ac349bb606acd4d457f4b51b7fdef66f9779abba
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => "count",
7         'bool'   => "_bool",
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use DBIx::Class::Exception;
11 use Data::Page;
12 use Storable;
13 use DBIx::Class::ResultSetColumn;
14 use DBIx::Class::ResultSourceHandle;
15 use List::Util ();
16 use Scalar::Util ();
17 use base qw/DBIx::Class/;
18
19 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
20
21 =head1 NAME
22
23 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
24
25 =head1 SYNOPSIS
26
27   my $users_rs   = $schema->resultset('User');
28   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
29   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
30
31 =head1 DESCRIPTION
32
33 A ResultSet is an object which stores a set of conditions representing
34 a query. It is the backbone of DBIx::Class (i.e. the really
35 important/useful bit).
36
37 No SQL is executed on the database when a ResultSet is created, it
38 just stores all the conditions needed to create the query.
39
40 A basic ResultSet representing the data of an entire table is returned
41 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
42 L<Source|DBIx::Class::Manual::Glossary/Source> name.
43
44   my $users_rs = $schema->resultset('User');
45
46 A new ResultSet is returned from calling L</search> on an existing
47 ResultSet. The new one will contain all the conditions of the
48 original, plus any new conditions added in the C<search> call.
49
50 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
51 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
52 represents.
53
54 The query that the ResultSet represents is B<only> executed against
55 the database when these methods are called:
56 L</find> L</next> L</all> L</first> L</single> L</count>
57
58 =head1 EXAMPLES
59
60 =head2 Chaining resultsets
61
62 Let's say you've got a query that needs to be run to return some data
63 to the user. But, you have an authorization system in place that
64 prevents certain users from seeing certain information. So, you want
65 to construct the basic query in one method, but add constraints to it in
66 another.
67
68   sub get_data {
69     my $self = shift;
70     my $request = $self->get_request; # Get a request object somehow.
71     my $schema = $self->get_schema;   # Get the DBIC schema object somehow.
72
73     my $cd_rs = $schema->resultset('CD')->search({
74       title => $request->param('title'),
75       year => $request->param('year'),
76     });
77
78     $self->apply_security_policy( $cd_rs );
79
80     return $cd_rs->all();
81   }
82
83   sub apply_security_policy {
84     my $self = shift;
85     my ($rs) = @_;
86
87     return $rs->search({
88       subversive => 0,
89     });
90   }
91
92 =head3 Resolving conditions and attributes
93
94 When a resultset is chained from another resultset, conditions and
95 attributes with the same keys need resolving.
96
97 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
98 into the existing ones from the original resultset.
99
100 The L</where>, L</having> attribute, and any search conditions are
101 merged with an SQL C<AND> to the existing condition from the original
102 resultset.
103
104 All other attributes are overridden by any new ones supplied in the
105 search attributes.
106
107 =head2 Multiple queries
108
109 Since a resultset just defines a query, you can do all sorts of
110 things with it with the same object.
111
112   # Don't hit the DB yet.
113   my $cd_rs = $schema->resultset('CD')->search({
114     title => 'something',
115     year => 2009,
116   });
117
118   # Each of these hits the DB individually.
119   my $count = $cd_rs->count;
120   my $most_recent = $cd_rs->get_column('date_released')->max();
121   my @records = $cd_rs->all;
122
123 And it's not just limited to SELECT statements.
124
125   $cd_rs->delete();
126
127 This is even cooler:
128
129   $cd_rs->create({ artist => 'Fred' });
130
131 Which is the same as:
132
133   $schema->resultset('CD')->create({
134     title => 'something',
135     year => 2009,
136     artist => 'Fred'
137   });
138
139 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
140
141 =head1 OVERLOADING
142
143 If a resultset is used in a numeric context it returns the L</count>.
144 However, if it is used in a booleand context it is always true.  So if
145 you want to check if a resultset has any results use C<if $rs != 0>.
146 C<if $rs> will always be true.
147
148 =head1 METHODS
149
150 =head2 new
151
152 =over 4
153
154 =item Arguments: $source, \%$attrs
155
156 =item Return Value: $rs
157
158 =back
159
160 The resultset constructor. Takes a source object (usually a
161 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
162 L</ATTRIBUTES> below).  Does not perform any queries -- these are
163 executed as needed by the other methods.
164
165 Generally you won't need to construct a resultset manually.  You'll
166 automatically get one from e.g. a L</search> called in scalar context:
167
168   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
169
170 IMPORTANT: If called on an object, proxies to new_result instead so
171
172   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
173
174 will return a CD object, not a ResultSet.
175
176 =cut
177
178 sub new {
179   my $class = shift;
180   return $class->new_result(@_) if ref $class;
181
182   my ($source, $attrs) = @_;
183   $source = $source->handle
184     unless $source->isa('DBIx::Class::ResultSourceHandle');
185   $attrs = { %{$attrs||{}} };
186
187   if ($attrs->{page}) {
188     $attrs->{rows} ||= 10;
189   }
190
191   $attrs->{alias} ||= 'me';
192
193   # Creation of {} and bless separated to mitigate RH perl bug
194   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
195   my $self = {
196     _source_handle => $source,
197     cond => $attrs->{where},
198     count => undef,
199     pager => undef,
200     attrs => $attrs
201   };
202
203   bless $self, $class;
204
205   $self->result_class(
206     $attrs->{result_class} || $source->resolve->result_class
207   );
208
209   return $self;
210 }
211
212 =head2 search
213
214 =over 4
215
216 =item Arguments: $cond, \%attrs?
217
218 =item Return Value: $resultset (scalar context), @row_objs (list context)
219
220 =back
221
222   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
223   my $new_rs = $cd_rs->search({ year => 2005 });
224
225   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
226                  # year = 2005 OR year = 2004
227
228 If you need to pass in additional attributes but no additional condition,
229 call it as C<search(undef, \%attrs)>.
230
231   # "SELECT name, artistid FROM $artist_table"
232   my @all_artists = $schema->resultset('Artist')->search(undef, {
233     columns => [qw/name artistid/],
234   });
235
236 For a list of attributes that can be passed to C<search>, see
237 L</ATTRIBUTES>. For more examples of using this function, see
238 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
239 documentation for the first argument, see L<SQL::Abstract>.
240
241 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
242
243 =cut
244
245 sub search {
246   my $self = shift;
247   my $rs = $self->search_rs( @_ );
248   return (wantarray ? $rs->all : $rs);
249 }
250
251 =head2 search_rs
252
253 =over 4
254
255 =item Arguments: $cond, \%attrs?
256
257 =item Return Value: $resultset
258
259 =back
260
261 This method does the same exact thing as search() except it will
262 always return a resultset, even in list context.
263
264 =cut
265
266 sub search_rs {
267   my $self = shift;
268
269   # Special-case handling for (undef, undef).
270   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
271     pop(@_); pop(@_);
272   }
273
274   my $attrs = {};
275   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
276   my $our_attrs = { %{$self->{attrs}} };
277   my $having = delete $our_attrs->{having};
278   my $where = delete $our_attrs->{where};
279
280   my $rows;
281
282   my %safe = (alias => 1, cache => 1);
283
284   unless (
285     (@_ && defined($_[0])) # @_ == () or (undef)
286     ||
287     (keys %$attrs # empty attrs or only 'safe' attrs
288     && List::Util::first { !$safe{$_} } keys %$attrs)
289   ) {
290     # no search, effectively just a clone
291     $rows = $self->get_cache;
292   }
293
294   my $new_attrs = { %{$our_attrs}, %{$attrs} };
295
296   # merge new attrs into inherited
297   foreach my $key (qw/join prefetch +select +as bind/) {
298     next unless exists $attrs->{$key};
299     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
300   }
301
302   my $cond = (@_
303     ? (
304         (@_ == 1 || ref $_[0] eq "HASH")
305           ? (
306               (ref $_[0] eq 'HASH')
307                 ? (
308                     (keys %{ $_[0] }  > 0)
309                       ? shift
310                       : undef
311                    )
312                 :  shift
313              )
314           : (
315               (@_ % 2)
316                 ? $self->throw_exception("Odd number of arguments to search")
317                 : {@_}
318              )
319       )
320     : undef
321   );
322
323   if (defined $where) {
324     $new_attrs->{where} = (
325       defined $new_attrs->{where}
326         ? { '-and' => [
327               map {
328                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
329               } $where, $new_attrs->{where}
330             ]
331           }
332         : $where);
333   }
334
335   if (defined $cond) {
336     $new_attrs->{where} = (
337       defined $new_attrs->{where}
338         ? { '-and' => [
339               map {
340                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
341               } $cond, $new_attrs->{where}
342             ]
343           }
344         : $cond);
345   }
346
347   if (defined $having) {
348     $new_attrs->{having} = (
349       defined $new_attrs->{having}
350         ? { '-and' => [
351               map {
352                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
353               } $having, $new_attrs->{having}
354             ]
355           }
356         : $having);
357   }
358
359   my $rs = (ref $self)->new($self->result_source, $new_attrs);
360   if ($rows) {
361     $rs->set_cache($rows);
362   }
363   return $rs;
364 }
365
366 =head2 search_literal
367
368 =over 4
369
370 =item Arguments: $sql_fragment, @bind_values
371
372 =item Return Value: $resultset (scalar context), @row_objs (list context)
373
374 =back
375
376   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
377   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
378
379 Pass a literal chunk of SQL to be added to the conditional part of the
380 resultset query.
381
382 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
383 only be used in that context. C<search_literal> is a convenience method.
384 It is equivalent to calling $schema->search(\[]), but if you want to ensure
385 columns are bound correctly, use C<search>.
386
387 Example of how to use C<search> instead of C<search_literal>
388
389   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
390   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
391
392
393 See L<DBIx::Class::Manual::Cookbook/Searching> and
394 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
395 require C<search_literal>.
396
397 =cut
398
399 sub search_literal {
400   my ($self, $sql, @bind) = @_;
401   my $attr;
402   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
403     $attr = pop @bind;
404   }
405   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
406 }
407
408 =head2 find
409
410 =over 4
411
412 =item Arguments: @values | \%cols, \%attrs?
413
414 =item Return Value: $row_object | undef
415
416 =back
417
418 Finds a row based on its primary key or unique constraint. For example, to find
419 a row by its primary key:
420
421   my $cd = $schema->resultset('CD')->find(5);
422
423 You can also find a row by a specific unique constraint using the C<key>
424 attribute. For example:
425
426   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
427     key => 'cd_artist_title'
428   });
429
430 Additionally, you can specify the columns explicitly by name:
431
432   my $cd = $schema->resultset('CD')->find(
433     {
434       artist => 'Massive Attack',
435       title  => 'Mezzanine',
436     },
437     { key => 'cd_artist_title' }
438   );
439
440 If the C<key> is specified as C<primary>, it searches only on the primary key.
441
442 If no C<key> is specified, it searches on all unique constraints defined on the
443 source for which column data is provided, including the primary key.
444
445 If your table does not have a primary key, you B<must> provide a value for the
446 C<key> attribute matching one of the unique constraints on the source.
447
448 In addition to C<key>, L</find> recognizes and applies standard
449 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
450
451 Note: If your query does not return only one row, a warning is generated:
452
453   Query returned more than one row
454
455 See also L</find_or_create> and L</update_or_create>. For information on how to
456 declare unique constraints, see
457 L<DBIx::Class::ResultSource/add_unique_constraint>.
458
459 =cut
460
461 sub find {
462   my $self = shift;
463   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
464
465   # Default to the primary key, but allow a specific key
466   my @cols = exists $attrs->{key}
467     ? $self->result_source->unique_constraint_columns($attrs->{key})
468     : $self->result_source->primary_columns;
469   $self->throw_exception(
470     "Can't find unless a primary key is defined or unique constraint is specified"
471   ) unless @cols;
472
473   # Parse out a hashref from input
474   my $input_query;
475   if (ref $_[0] eq 'HASH') {
476     $input_query = { %{$_[0]} };
477   }
478   elsif (@_ == @cols) {
479     $input_query = {};
480     @{$input_query}{@cols} = @_;
481   }
482   else {
483     # Compatibility: Allow e.g. find(id => $value)
484     carp "Find by key => value deprecated; please use a hashref instead";
485     $input_query = {@_};
486   }
487
488   my (%related, $info);
489
490   KEY: foreach my $key (keys %$input_query) {
491     if (ref($input_query->{$key})
492         && ($info = $self->result_source->relationship_info($key))) {
493       my $val = delete $input_query->{$key};
494       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
495       my $rel_q = $self->result_source->_resolve_condition(
496                     $info->{cond}, $val, $key
497                   );
498       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
499       @related{keys %$rel_q} = values %$rel_q;
500     }
501   }
502   if (my @keys = keys %related) {
503     @{$input_query}{@keys} = values %related;
504   }
505
506
507   # Build the final query: Default to the disjunction of the unique queries,
508   # but allow the input query in case the ResultSet defines the query or the
509   # user is abusing find
510   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
511   my $query;
512   if (exists $attrs->{key}) {
513     my @unique_cols = $self->result_source->unique_constraint_columns($attrs->{key});
514     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
515     $query = $self->_add_alias($unique_query, $alias);
516   }
517   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
518     # This means that we got here after a merger of relationship conditions
519     # in ::Relationship::Base::search_related (the row method), and furthermore
520     # the relationship is of the 'single' type. This means that the condition
521     # provided by the relationship (already attached to $self) is sufficient,
522     # as there can be only one row in the databse that would satisfy the 
523     # relationship
524   }
525   else {
526     my @unique_queries = $self->_unique_queries($input_query, $attrs);
527     $query = @unique_queries
528       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
529       : $self->_add_alias($input_query, $alias);
530   }
531
532   # Run the query
533   my $rs = $self->search ($query, $attrs);
534   if (keys %{$rs->_resolved_attrs->{collapse}}) {
535     my $row = $rs->next;
536     carp "Query returned more than one row" if $rs->next;
537     return $row;
538   }
539   else {
540     return $rs->single;
541   }
542 }
543
544 # _add_alias
545 #
546 # Add the specified alias to the specified query hash. A copy is made so the
547 # original query is not modified.
548
549 sub _add_alias {
550   my ($self, $query, $alias) = @_;
551
552   my %aliased = %$query;
553   foreach my $col (grep { ! m/\./ } keys %aliased) {
554     $aliased{"$alias.$col"} = delete $aliased{$col};
555   }
556
557   return \%aliased;
558 }
559
560 # _unique_queries
561 #
562 # Build a list of queries which satisfy unique constraints.
563
564 sub _unique_queries {
565   my ($self, $query, $attrs) = @_;
566
567   my @constraint_names = exists $attrs->{key}
568     ? ($attrs->{key})
569     : $self->result_source->unique_constraint_names;
570
571   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
572   my $num_where = scalar keys %$where;
573
574   my (@unique_queries, %seen_column_combinations);
575   foreach my $name (@constraint_names) {
576     my @constraint_cols = $self->result_source->unique_constraint_columns($name);
577
578     my $constraint_sig = join "\x00", sort @constraint_cols;
579     next if $seen_column_combinations{$constraint_sig}++;
580
581     my $unique_query = $self->_build_unique_query($query, \@constraint_cols);
582
583     my $num_cols = scalar @constraint_cols;
584     my $num_query = scalar keys %$unique_query;
585
586     my $total = $num_query + $num_where;
587     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
588       # The query is either unique on its own or is unique in combination with
589       # the existing where clause
590       push @unique_queries, $unique_query;
591     }
592   }
593
594   return @unique_queries;
595 }
596
597 # _build_unique_query
598 #
599 # Constrain the specified query hash based on the specified column names.
600
601 sub _build_unique_query {
602   my ($self, $query, $unique_cols) = @_;
603
604   return {
605     map  { $_ => $query->{$_} }
606     grep { exists $query->{$_} }
607       @$unique_cols
608   };
609 }
610
611 =head2 search_related
612
613 =over 4
614
615 =item Arguments: $rel, $cond, \%attrs?
616
617 =item Return Value: $new_resultset
618
619 =back
620
621   $new_rs = $cd_rs->search_related('artist', {
622     name => 'Emo-R-Us',
623   });
624
625 Searches the specified relationship, optionally specifying a condition and
626 attributes for matching records. See L</ATTRIBUTES> for more information.
627
628 =cut
629
630 sub search_related {
631   return shift->related_resultset(shift)->search(@_);
632 }
633
634 =head2 search_related_rs
635
636 This method works exactly the same as search_related, except that
637 it guarantees a restultset, even in list context.
638
639 =cut
640
641 sub search_related_rs {
642   return shift->related_resultset(shift)->search_rs(@_);
643 }
644
645 =head2 cursor
646
647 =over 4
648
649 =item Arguments: none
650
651 =item Return Value: $cursor
652
653 =back
654
655 Returns a storage-driven cursor to the given resultset. See
656 L<DBIx::Class::Cursor> for more information.
657
658 =cut
659
660 sub cursor {
661   my ($self) = @_;
662
663   my $attrs = $self->_resolved_attrs_copy;
664
665   return $self->{cursor}
666     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
667           $attrs->{where},$attrs);
668 }
669
670 =head2 single
671
672 =over 4
673
674 =item Arguments: $cond?
675
676 =item Return Value: $row_object?
677
678 =back
679
680   my $cd = $schema->resultset('CD')->single({ year => 2001 });
681
682 Inflates the first result without creating a cursor if the resultset has
683 any records in it; if not returns nothing. Used by L</find> as a lean version of
684 L</search>.
685
686 While this method can take an optional search condition (just like L</search>)
687 being a fast-code-path it does not recognize search attributes. If you need to
688 add extra joins or similar, call L</search> and then chain-call L</single> on the
689 L<DBIx::Class::ResultSet> returned.
690
691 =over
692
693 =item B<Note>
694
695 As of 0.08100, this method enforces the assumption that the preceeding
696 query returns only one row. If more than one row is returned, you will receive
697 a warning:
698
699   Query returned more than one row
700
701 In this case, you should be using L</next> or L</find> instead, or if you really
702 know what you are doing, use the L</rows> attribute to explicitly limit the size
703 of the resultset.
704
705 This method will also throw an exception if it is called on a resultset prefetching
706 has_many, as such a prefetch implies fetching multiple rows from the database in
707 order to assemble the resulting object.
708
709 =back
710
711 =cut
712
713 sub single {
714   my ($self, $where) = @_;
715   if(@_ > 2) {
716       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
717   }
718
719   my $attrs = $self->_resolved_attrs_copy;
720
721   if (keys %{$attrs->{collapse}}) {
722     $self->throw_exception(
723       'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
724     );
725   }
726
727   if ($where) {
728     if (defined $attrs->{where}) {
729       $attrs->{where} = {
730         '-and' =>
731             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
732                $where, delete $attrs->{where} ]
733       };
734     } else {
735       $attrs->{where} = $where;
736     }
737   }
738
739 #  XXX: Disabled since it doesn't infer uniqueness in all cases
740 #  unless ($self->_is_unique_query($attrs->{where})) {
741 #    carp "Query not guaranteed to return a single row"
742 #      . "; please declare your unique constraints or use search instead";
743 #  }
744
745   my @data = $self->result_source->storage->select_single(
746     $attrs->{from}, $attrs->{select},
747     $attrs->{where}, $attrs
748   );
749
750   return (@data ? ($self->_construct_object(@data))[0] : undef);
751 }
752
753
754 # _is_unique_query
755 #
756 # Try to determine if the specified query is guaranteed to be unique, based on
757 # the declared unique constraints.
758
759 sub _is_unique_query {
760   my ($self, $query) = @_;
761
762   my $collapsed = $self->_collapse_query($query);
763   my $alias = $self->{attrs}{alias};
764
765   foreach my $name ($self->result_source->unique_constraint_names) {
766     my @unique_cols = map {
767       "$alias.$_"
768     } $self->result_source->unique_constraint_columns($name);
769
770     # Count the values for each unique column
771     my %seen = map { $_ => 0 } @unique_cols;
772
773     foreach my $key (keys %$collapsed) {
774       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
775       next unless exists $seen{$aliased};  # Additional constraints are okay
776       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
777     }
778
779     # If we get 0 or more than 1 value for a column, it's not necessarily unique
780     return 1 unless grep { $_ != 1 } values %seen;
781   }
782
783   return 0;
784 }
785
786 # _collapse_query
787 #
788 # Recursively collapse the query, accumulating values for each column.
789
790 sub _collapse_query {
791   my ($self, $query, $collapsed) = @_;
792
793   $collapsed ||= {};
794
795   if (ref $query eq 'ARRAY') {
796     foreach my $subquery (@$query) {
797       next unless ref $subquery;  # -or
798       $collapsed = $self->_collapse_query($subquery, $collapsed);
799     }
800   }
801   elsif (ref $query eq 'HASH') {
802     if (keys %$query and (keys %$query)[0] eq '-and') {
803       foreach my $subquery (@{$query->{-and}}) {
804         $collapsed = $self->_collapse_query($subquery, $collapsed);
805       }
806     }
807     else {
808       foreach my $col (keys %$query) {
809         my $value = $query->{$col};
810         $collapsed->{$col}{$value}++;
811       }
812     }
813   }
814
815   return $collapsed;
816 }
817
818 =head2 get_column
819
820 =over 4
821
822 =item Arguments: $cond?
823
824 =item Return Value: $resultsetcolumn
825
826 =back
827
828   my $max_length = $rs->get_column('length')->max;
829
830 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
831
832 =cut
833
834 sub get_column {
835   my ($self, $column) = @_;
836   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
837   return $new;
838 }
839
840 =head2 search_like
841
842 =over 4
843
844 =item Arguments: $cond, \%attrs?
845
846 =item Return Value: $resultset (scalar context), @row_objs (list context)
847
848 =back
849
850   # WHERE title LIKE '%blue%'
851   $cd_rs = $rs->search_like({ title => '%blue%'});
852
853 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
854 that this is simply a convenience method retained for ex Class::DBI users.
855 You most likely want to use L</search> with specific operators.
856
857 For more information, see L<DBIx::Class::Manual::Cookbook>.
858
859 This method is deprecated and will be removed in 0.09. Use L</search()>
860 instead. An example conversion is:
861
862   ->search_like({ foo => 'bar' });
863
864   # Becomes
865
866   ->search({ foo => { like => 'bar' } });
867
868 =cut
869
870 sub search_like {
871   my $class = shift;
872   carp (
873     'search_like() is deprecated and will be removed in DBIC version 0.09.'
874    .' Instead use ->search({ x => { -like => "y%" } })'
875    .' (note the outer pair of {}s - they are important!)'
876   );
877   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
878   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
879   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
880   return $class->search($query, { %$attrs });
881 }
882
883 =head2 slice
884
885 =over 4
886
887 =item Arguments: $first, $last
888
889 =item Return Value: $resultset (scalar context), @row_objs (list context)
890
891 =back
892
893 Returns a resultset or object list representing a subset of elements from the
894 resultset slice is called on. Indexes are from 0, i.e., to get the first
895 three records, call:
896
897   my ($one, $two, $three) = $rs->slice(0, 2);
898
899 =cut
900
901 sub slice {
902   my ($self, $min, $max) = @_;
903   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
904   $attrs->{offset} = $self->{attrs}{offset} || 0;
905   $attrs->{offset} += $min;
906   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
907   return $self->search(undef(), $attrs);
908   #my $slice = (ref $self)->new($self->result_source, $attrs);
909   #return (wantarray ? $slice->all : $slice);
910 }
911
912 =head2 next
913
914 =over 4
915
916 =item Arguments: none
917
918 =item Return Value: $result?
919
920 =back
921
922 Returns the next element in the resultset (C<undef> is there is none).
923
924 Can be used to efficiently iterate over records in the resultset:
925
926   my $rs = $schema->resultset('CD')->search;
927   while (my $cd = $rs->next) {
928     print $cd->title;
929   }
930
931 Note that you need to store the resultset object, and call C<next> on it.
932 Calling C<< resultset('Table')->next >> repeatedly will always return the
933 first record from the resultset.
934
935 =cut
936
937 sub next {
938   my ($self) = @_;
939   if (my $cache = $self->get_cache) {
940     $self->{all_cache_position} ||= 0;
941     return $cache->[$self->{all_cache_position}++];
942   }
943   if ($self->{attrs}{cache}) {
944     $self->{all_cache_position} = 1;
945     return ($self->all)[0];
946   }
947   if ($self->{stashed_objects}) {
948     my $obj = shift(@{$self->{stashed_objects}});
949     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
950     return $obj;
951   }
952   my @row = (
953     exists $self->{stashed_row}
954       ? @{delete $self->{stashed_row}}
955       : $self->cursor->next
956   );
957   return undef unless (@row);
958   my ($row, @more) = $self->_construct_object(@row);
959   $self->{stashed_objects} = \@more if @more;
960   return $row;
961 }
962
963 sub _construct_object {
964   my ($self, @row) = @_;
965
966   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row)
967     or return ();
968   my @new = $self->result_class->inflate_result($self->result_source, @$info);
969   @new = $self->{_attrs}{record_filter}->(@new)
970     if exists $self->{_attrs}{record_filter};
971   return @new;
972 }
973
974 # _unflatten_result takes a row hashref which looks like this:
975 # $VAR1 = {
976 #   'cd.artist.artistid' => '1',
977 #   'cd.artist' => '1',
978 #   'cd_id' => '1',
979 #   'cd.genreid' => undef,
980 #   'cd.year' => '1999',
981 #   'cd.title' => 'Spoonful of bees',
982 #   'cd.single_track' => undef,
983 #   'cd.artist.name' => 'Caterwauler McCrae',
984 #   'cd.artist.rank' => '13',
985 #   'cd.artist.charfield' => undef,
986 #   'cd.cdid' => '1'
987 # };
988
989 # and generates the following structure:
990
991 # $VAR1 = [
992 #   {
993 #     'cd_id' => '1'
994 #   },
995 #   {
996 #     'cd' => [
997 #       {
998 #         'single_track' => undef,
999 #         'cdid' => '1',
1000 #         'artist' => '1',
1001 #         'title' => 'Spoonful of bees',
1002 #         'year' => '1999',
1003 #         'genreid' => undef
1004 #       },
1005 #       {
1006 #         'artist' => [
1007 #           {
1008 #             'artistid' => '1',
1009 #             'charfield' => undef,
1010 #             'name' => 'Caterwauler McCrae',
1011 #             'rank' => '13'
1012 #           }
1013 #         ]
1014 #       }
1015 #     ]
1016 #   }
1017 # ];  
1018
1019 # It returns one row object which consists of an arrayref with two
1020 # elements. The first contains the plain column data, the second 
1021 # contains the data of relationships. Those are row arrayrefs, themselves.
1022
1023 # it's a recursive function. It needs to request the relationship_info
1024 # to decide whether to put the data of a relationship in a hashref
1025 # (i.e. belongs_to) or an arrayref (i.e. has_many).
1026
1027 sub _unflatten_result {
1028     my ( $self, $row ) = @_;
1029
1030     my $columns = {};
1031     my $rels    = {};
1032
1033     foreach my $column ( sort keys %$row ) {
1034         if ( $column =~ /^(.*?)\.(.*)$/ ) {
1035             $rels->{$1} ||= {};
1036             $rels->{$1}->{$2} = $row->{$column};
1037         }
1038         else {
1039             $columns->{$column} = $row->{$column};
1040         }
1041     }
1042
1043     foreach my $rel ( sort keys %$rels ) {
1044         my $rel_info = $self->result_source->relationship_info($rel);
1045         $rels->{$rel} =
1046           $self->related_resultset($rel)->_unflatten_result( $rels->{$rel} );
1047         $rels->{$rel} = [ $rels->{$rel} ]
1048           if ( $rel_info->{attrs}->{accessor} eq 'multi' );
1049     }
1050
1051     return keys %$rels ? [ $columns, $rels ] : [$columns];
1052 }
1053
1054 # two arguments: $as_proto is an arrayref of column names,
1055 # $row_ref is an arrayref of the data. If none of the row data
1056 # is defined we return undef (that's copied from the old
1057 # _collapse_result). Next we decide whether we need to collapse
1058 # the resultset (i.e. we prefetch something) or not. $collapse
1059 # indicates that. The do-while loop will run once if we do not need
1060 # to collapse the result and will run as long as _merge_result returns
1061 # a true value. It will return undef if the current added row does not
1062 # match the previous row. A bit of stashing and cursor magic is
1063 # required so that the cursor is not mixed up.
1064
1065 # "$rows" is a bit misleading. In the end, there should only be one
1066 # element in this arrayref. 
1067
1068 sub _collapse_result {
1069     my ( $self, $as_proto, $row_ref ) = @_;
1070     my $has_def;
1071     for (@$row_ref) {
1072         if ( defined $_ ) {
1073             $has_def++;
1074             last;
1075         }
1076     }
1077     return undef unless $has_def;
1078
1079     my $collapse = keys %{ $self->{_attrs}{collapse} || {} };
1080     my $rows     = [];
1081     my @row      = @$row_ref;
1082     do {
1083         my $i = 0;
1084         my $row = { map { $_ => $row[ $i++ ] } @$as_proto };
1085         $row = $self->_unflatten_result($row);
1086         unless ( scalar @$rows ) {
1087             push( @$rows, $row );
1088         }
1089         $collapse = undef unless ( $self->_merge_result( $rows, $row ) );
1090       } while (
1091         $collapse
1092         && do { @row = $self->cursor->next; $self->{stashed_row} = \@row if @row; }
1093       );
1094
1095     return $rows->[0];
1096
1097 }
1098
1099 # _merge_result accepts an arrayref of rows objects (again, an arrayref of two elements)
1100 # and a row object which should be merged into the first object.
1101 # First we try to find out whether $row is already in $rows. If this is the case
1102 # we try to merge them by iteration through their relationship data. We call
1103 # _merge_result again on them, so they get merged.
1104
1105 # If we don't find the $row in $rows, we append it to $rows and return undef.
1106 # _merge_result returns 1 otherwise (i.e. $row has been found in $rows).
1107
1108 sub _merge_result {
1109     my ( $self, $rows, $row ) = @_;
1110     my ( $columns, $rels ) = @$row;
1111     my $found = undef;
1112     foreach my $seen (@$rows) {
1113         my $match = 1;
1114         foreach my $column ( keys %$columns ) {
1115             if (   defined $seen->[0]->{$column} ^ defined $columns->{$column}
1116                 or defined $columns->{$column}
1117                 && $seen->[0]->{$column} ne $columns->{$column} )
1118             {
1119
1120                 $match = 0;
1121                 last;
1122             }
1123         }
1124         if ($match) {
1125             $found = $seen;
1126             last;
1127         }
1128     }
1129     if ($found) {
1130         foreach my $rel ( keys %$rels ) {
1131             my $old_rows = $found->[1]->{$rel};
1132             $self->_merge_result(
1133                 ref $found->[1]->{$rel}->[0] eq 'HASH' ? [ $found->[1]->{$rel} ]
1134                 : $found->[1]->{$rel},
1135                 ref $rels->{$rel}->[0] eq 'HASH' ? [ $rels->{$rel}->[0], $rels->{$rel}->[1] ]
1136                 : $rels->{$rel}->[0]
1137             );
1138
1139         }
1140
1141     }
1142     else {
1143         push( @$rows, $row );
1144         return undef;
1145     }
1146
1147     return 1;
1148 }
1149
1150
1151 =head2 result_source
1152
1153 =over 4
1154
1155 =item Arguments: $result_source?
1156
1157 =item Return Value: $result_source
1158
1159 =back
1160
1161 An accessor for the primary ResultSource object from which this ResultSet
1162 is derived.
1163
1164 =head2 result_class
1165
1166 =over 4
1167
1168 =item Arguments: $result_class?
1169
1170 =item Return Value: $result_class
1171
1172 =back
1173
1174 An accessor for the class to use when creating row objects. Defaults to
1175 C<< result_source->result_class >> - which in most cases is the name of the
1176 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1177
1178 Note that changing the result_class will also remove any components
1179 that were originally loaded in the source class via
1180 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1181 in the original source class will not run.
1182
1183 =cut
1184
1185 sub result_class {
1186   my ($self, $result_class) = @_;
1187   if ($result_class) {
1188     $self->ensure_class_loaded($result_class);
1189     $self->_result_class($result_class);
1190   }
1191   $self->_result_class;
1192 }
1193
1194 =head2 count
1195
1196 =over 4
1197
1198 =item Arguments: $cond, \%attrs??
1199
1200 =item Return Value: $count
1201
1202 =back
1203
1204 Performs an SQL C<COUNT> with the same query as the resultset was built
1205 with to find the number of elements. Passing arguments is equivalent to
1206 C<< $rs->search ($cond, \%attrs)->count >>
1207
1208 =cut
1209
1210 sub count {
1211   my $self = shift;
1212   return $self->search(@_)->count if @_ and defined $_[0];
1213   return scalar @{ $self->get_cache } if $self->get_cache;
1214
1215   my $attrs = $self->_resolved_attrs_copy;
1216
1217   # this is a little optimization - it is faster to do the limit
1218   # adjustments in software, instead of a subquery
1219   my $rows = delete $attrs->{rows};
1220   my $offset = delete $attrs->{offset};
1221
1222   my $crs;
1223   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1224     $crs = $self->_count_subq_rs ($attrs);
1225   }
1226   else {
1227     $crs = $self->_count_rs ($attrs);
1228   }
1229   my $count = $crs->next;
1230
1231   $count -= $offset if $offset;
1232   $count = $rows if $rows and $rows < $count;
1233   $count = 0 if ($count < 0);
1234
1235   return $count;
1236 }
1237
1238 =head2 count_rs
1239
1240 =over 4
1241
1242 =item Arguments: $cond, \%attrs??
1243
1244 =item Return Value: $count_rs
1245
1246 =back
1247
1248 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1249 This can be very handy for subqueries:
1250
1251   ->search( { amount => $some_rs->count_rs->as_query } )
1252
1253 As with regular resultsets the SQL query will be executed only after
1254 the resultset is accessed via L</next> or L</all>. That would return
1255 the same single value obtainable via L</count>.
1256
1257 =cut
1258
1259 sub count_rs {
1260   my $self = shift;
1261   return $self->search(@_)->count_rs if @_;
1262
1263   # this may look like a lack of abstraction (count() does about the same)
1264   # but in fact an _rs *must* use a subquery for the limits, as the
1265   # software based limiting can not be ported if this $rs is to be used
1266   # in a subquery itself (i.e. ->as_query)
1267   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1268     return $self->_count_subq_rs;
1269   }
1270   else {
1271     return $self->_count_rs;
1272   }
1273 }
1274
1275 #
1276 # returns a ResultSetColumn object tied to the count query
1277 #
1278 sub _count_rs {
1279   my ($self, $attrs) = @_;
1280
1281   my $rsrc = $self->result_source;
1282   $attrs ||= $self->_resolved_attrs;
1283
1284   my $tmp_attrs = { %$attrs };
1285
1286   # take off any limits, record_filter is cdbi, and no point of ordering a count 
1287   delete $tmp_attrs->{$_} for (qw/select as rows offset order_by record_filter/);
1288
1289   # overwrite the selector (supplied by the storage)
1290   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $tmp_attrs);
1291   $tmp_attrs->{as} = 'count';
1292
1293   # read the comment on top of the actual function to see what this does
1294   $tmp_attrs->{from} = $self->_switch_to_inner_join_if_needed (
1295     $tmp_attrs->{from}, $tmp_attrs->{alias}
1296   );
1297
1298   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1299
1300   return $tmp_rs;
1301 }
1302
1303 #
1304 # same as above but uses a subquery
1305 #
1306 sub _count_subq_rs {
1307   my ($self, $attrs) = @_;
1308
1309   my $rsrc = $self->result_source;
1310   $attrs ||= $self->_resolved_attrs_copy;
1311
1312   my $sub_attrs = { %$attrs };
1313
1314   # extra selectors do not go in the subquery and there is no point of ordering it
1315   delete $sub_attrs->{$_} for qw/collapse select _prefetch_select as order_by/;
1316
1317   # if we prefetch, we group_by primary keys only as this is what we would get out
1318   # of the rs via ->next/->all. We DO WANT to clobber old group_by regardless
1319   if ( keys %{$attrs->{collapse}} ) {
1320     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } ($rsrc->primary_columns) ]
1321   }
1322
1323   $sub_attrs->{select} = $rsrc->storage->_subq_count_select ($rsrc, $sub_attrs);
1324
1325   # read the comment on top of the actual function to see what this does
1326   $sub_attrs->{from} = $self->_switch_to_inner_join_if_needed (
1327     $sub_attrs->{from}, $sub_attrs->{alias}
1328   );
1329
1330   # this is so that ordering can be thrown away in things like Top limit
1331   $sub_attrs->{-for_count_only} = 1;
1332
1333   my $sub_rs = $rsrc->resultset_class->new ($rsrc, $sub_attrs);
1334
1335   $attrs->{from} = [{
1336     -alias => 'count_subq',
1337     -source_handle => $rsrc->handle,
1338     count_subq => $sub_rs->as_query,
1339   }];
1340
1341   # the subquery replaces this
1342   delete $attrs->{$_} for qw/where bind collapse group_by having having_bind rows offset/;
1343
1344   return $self->_count_rs ($attrs);
1345 }
1346
1347
1348 # The DBIC relationship chaining implementation is pretty simple - every
1349 # new related_relationship is pushed onto the {from} stack, and the {select}
1350 # window simply slides further in. This means that when we count somewhere
1351 # in the middle, we got to make sure that everything in the join chain is an
1352 # actual inner join, otherwise the count will come back with unpredictable
1353 # results (a resultset may be generated with _some_ rows regardless of if
1354 # the relation which the $rs currently selects has rows or not). E.g.
1355 # $artist_rs->cds->count - normally generates:
1356 # SELECT COUNT( * ) FROM artist me LEFT JOIN cd cds ON cds.artist = me.artistid
1357 # which actually returns the number of artists * (number of cds || 1)
1358 #
1359 # So what we do here is crawl {from}, determine if the current alias is at
1360 # the top of the stack, and if not - make sure the chain is inner-joined down
1361 # to the root.
1362 #
1363 sub _switch_to_inner_join_if_needed {
1364   my ($self, $from, $alias) = @_;
1365
1366   # subqueries and other oddness is naturally not supported
1367   return $from if (
1368     ref $from ne 'ARRAY'
1369       ||
1370     @$from <= 1
1371       ||
1372     ref $from->[0] ne 'HASH'
1373       ||
1374     ! $from->[0]{-alias}
1375       ||
1376     $from->[0]{-alias} eq $alias
1377   );
1378
1379   my $switch_branch;
1380   JOINSCAN:
1381   for my $j (@{$from}[1 .. $#$from]) {
1382     if ($j->[0]{-alias} eq $alias) {
1383       $switch_branch = $j->[0]{-join_path};
1384       last JOINSCAN;
1385     }
1386   }
1387
1388   # something else went wrong
1389   return $from unless $switch_branch;
1390
1391   # So it looks like we will have to switch some stuff around.
1392   # local() is useless here as we will be leaving the scope
1393   # anyway, and deep cloning is just too fucking expensive
1394   # So replace the inner hashref manually
1395   my @new_from = ($from->[0]);
1396   my $sw_idx = { map { $_ => 1 } @$switch_branch };
1397
1398   for my $j (@{$from}[1 .. $#$from]) {
1399     my $jalias = $j->[0]{-alias};
1400
1401     if ($sw_idx->{$jalias}) {
1402       my %attrs = %{$j->[0]};
1403       delete $attrs{-join_type};
1404       push @new_from, [
1405         \%attrs,
1406         @{$j}[ 1 .. $#$j ],
1407       ];
1408     }
1409     else {
1410       push @new_from, $j;
1411     }
1412   }
1413
1414   return \@new_from;
1415 }
1416
1417
1418 sub _bool {
1419   return 1;
1420 }
1421
1422 =head2 count_literal
1423
1424 =over 4
1425
1426 =item Arguments: $sql_fragment, @bind_values
1427
1428 =item Return Value: $count
1429
1430 =back
1431
1432 Counts the results in a literal query. Equivalent to calling L</search_literal>
1433 with the passed arguments, then L</count>.
1434
1435 =cut
1436
1437 sub count_literal { shift->search_literal(@_)->count; }
1438
1439 =head2 all
1440
1441 =over 4
1442
1443 =item Arguments: none
1444
1445 =item Return Value: @objects
1446
1447 =back
1448
1449 Returns all elements in the resultset. Called implicitly if the resultset
1450 is returned in list context.
1451
1452 =cut
1453
1454 sub all {
1455   my $self = shift;
1456   if(@_) {
1457       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1458   }
1459
1460   return @{ $self->get_cache } if $self->get_cache;
1461
1462   my @obj;
1463
1464   if (keys %{$self->_resolved_attrs->{collapse}}) {
1465     # Using $self->cursor->all is really just an optimisation.
1466     # If we're collapsing has_many prefetches it probably makes
1467     # very little difference, and this is cleaner than hacking
1468     # _construct_object to survive the approach
1469     $self->cursor->reset;
1470     my @row = $self->cursor->next;
1471     while (@row) {
1472       push(@obj, $self->_construct_object(@row));
1473       @row = (exists $self->{stashed_row}
1474                ? @{delete $self->{stashed_row}}
1475                : $self->cursor->next);
1476     }
1477   } else {
1478     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1479   }
1480
1481   $self->set_cache(\@obj) if $self->{attrs}{cache};
1482
1483   return @obj;
1484 }
1485
1486 =head2 reset
1487
1488 =over 4
1489
1490 =item Arguments: none
1491
1492 =item Return Value: $self
1493
1494 =back
1495
1496 Resets the resultset's cursor, so you can iterate through the elements again.
1497 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1498 another query.
1499
1500 =cut
1501
1502 sub reset {
1503   my ($self) = @_;
1504   delete $self->{_attrs} if exists $self->{_attrs};
1505   $self->{all_cache_position} = 0;
1506   $self->cursor->reset;
1507   return $self;
1508 }
1509
1510 =head2 first
1511
1512 =over 4
1513
1514 =item Arguments: none
1515
1516 =item Return Value: $object?
1517
1518 =back
1519
1520 Resets the resultset and returns an object for the first result (if the
1521 resultset returns anything).
1522
1523 =cut
1524
1525 sub first {
1526   return $_[0]->reset->next;
1527 }
1528
1529
1530 # _rs_update_delete
1531 #
1532 # Determines whether and what type of subquery is required for the $rs operation.
1533 # If grouping is necessary either supplies its own, or verifies the current one
1534 # After all is done delegates to the proper storage method.
1535
1536 sub _rs_update_delete {
1537   my ($self, $op, $values) = @_;
1538
1539   my $rsrc = $self->result_source;
1540
1541   my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
1542   my $needs_subq = $self->_has_resolved_attr (qw/row offset/);
1543
1544   if ($needs_group_by_subq or $needs_subq) {
1545
1546     # make a new $rs selecting only the PKs (that's all we really need)
1547     my $attrs = $self->_resolved_attrs_copy;
1548
1549     delete $attrs->{$_} for qw/collapse select as/;
1550     $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->primary_columns) ];
1551
1552     if ($needs_group_by_subq) {
1553       # make sure no group_by was supplied, or if there is one - make sure it matches
1554       # the columns compiled above perfectly. Anything else can not be sanely executed
1555       # on most databases so croak right then and there
1556
1557       if (my $g = $attrs->{group_by}) {
1558         my @current_group_by = map
1559           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1560           @$g
1561         ;
1562
1563         if (
1564           join ("\x00", sort @current_group_by)
1565             ne
1566           join ("\x00", sort @{$attrs->{columns}} )
1567         ) {
1568           $self->throw_exception (
1569             "You have just attempted a $op operation on a resultset which does group_by"
1570             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1571             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1572             . ' kind of queries. Please retry the operation with a modified group_by or'
1573             . ' without using one at all.'
1574           );
1575         }
1576       }
1577       else {
1578         $attrs->{group_by} = $attrs->{columns};
1579       }
1580     }
1581
1582     my $subrs = (ref $self)->new($rsrc, $attrs);
1583
1584     return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
1585   }
1586   else {
1587     return $rsrc->storage->$op(
1588       $rsrc,
1589       $op eq 'update' ? $values : (),
1590       $self->_cond_for_update_delete,
1591     );
1592   }
1593 }
1594
1595
1596 # _cond_for_update_delete
1597 #
1598 # update/delete require the condition to be modified to handle
1599 # the differing SQL syntax available.  This transforms the $self->{cond}
1600 # appropriately, returning the new condition.
1601
1602 sub _cond_for_update_delete {
1603   my ($self, $full_cond) = @_;
1604   my $cond = {};
1605
1606   $full_cond ||= $self->{cond};
1607   # No-op. No condition, we're updating/deleting everything
1608   return $cond unless ref $full_cond;
1609
1610   if (ref $full_cond eq 'ARRAY') {
1611     $cond = [
1612       map {
1613         my %hash;
1614         foreach my $key (keys %{$_}) {
1615           $key =~ /([^.]+)$/;
1616           $hash{$1} = $_->{$key};
1617         }
1618         \%hash;
1619       } @{$full_cond}
1620     ];
1621   }
1622   elsif (ref $full_cond eq 'HASH') {
1623     if ((keys %{$full_cond})[0] eq '-and') {
1624       $cond->{-and} = [];
1625       my @cond = @{$full_cond->{-and}};
1626        for (my $i = 0; $i < @cond; $i++) {
1627         my $entry = $cond[$i];
1628         my $hash;
1629         if (ref $entry eq 'HASH') {
1630           $hash = $self->_cond_for_update_delete($entry);
1631         }
1632         else {
1633           $entry =~ /([^.]+)$/;
1634           $hash->{$1} = $cond[++$i];
1635         }
1636         push @{$cond->{-and}}, $hash;
1637       }
1638     }
1639     else {
1640       foreach my $key (keys %{$full_cond}) {
1641         $key =~ /([^.]+)$/;
1642         $cond->{$1} = $full_cond->{$key};
1643       }
1644     }
1645   }
1646   else {
1647     $self->throw_exception("Can't update/delete on resultset with condition unless hash or array");
1648   }
1649
1650   return $cond;
1651 }
1652
1653
1654 =head2 update
1655
1656 =over 4
1657
1658 =item Arguments: \%values
1659
1660 =item Return Value: $storage_rv
1661
1662 =back
1663
1664 Sets the specified columns in the resultset to the supplied values in a
1665 single query. Return value will be true if the update succeeded or false
1666 if no records were updated; exact type of success value is storage-dependent.
1667
1668 =cut
1669
1670 sub update {
1671   my ($self, $values) = @_;
1672   $self->throw_exception('Values for update must be a hash')
1673     unless ref $values eq 'HASH';
1674
1675   return $self->_rs_update_delete ('update', $values);
1676 }
1677
1678 =head2 update_all
1679
1680 =over 4
1681
1682 =item Arguments: \%values
1683
1684 =item Return Value: 1
1685
1686 =back
1687
1688 Fetches all objects and updates them one at a time. Note that C<update_all>
1689 will run DBIC cascade triggers, while L</update> will not.
1690
1691 =cut
1692
1693 sub update_all {
1694   my ($self, $values) = @_;
1695   $self->throw_exception('Values for update_all must be a hash')
1696     unless ref $values eq 'HASH';
1697   foreach my $obj ($self->all) {
1698     $obj->set_columns($values)->update;
1699   }
1700   return 1;
1701 }
1702
1703 =head2 delete
1704
1705 =over 4
1706
1707 =item Arguments: none
1708
1709 =item Return Value: $storage_rv
1710
1711 =back
1712
1713 Deletes the contents of the resultset from its result source. Note that this
1714 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1715 to run. See also L<DBIx::Class::Row/delete>.
1716
1717 Return value will be the amount of rows deleted; exact type of return value
1718 is storage-dependent.
1719
1720 =cut
1721
1722 sub delete {
1723   my $self = shift;
1724   $self->throw_exception('delete does not accept any arguments')
1725     if @_;
1726
1727   return $self->_rs_update_delete ('delete');
1728 }
1729
1730 =head2 delete_all
1731
1732 =over 4
1733
1734 =item Arguments: none
1735
1736 =item Return Value: 1
1737
1738 =back
1739
1740 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1741 will run DBIC cascade triggers, while L</delete> will not.
1742
1743 =cut
1744
1745 sub delete_all {
1746   my $self = shift;
1747   $self->throw_exception('delete_all does not accept any arguments')
1748     if @_;
1749
1750   $_->delete for $self->all;
1751   return 1;
1752 }
1753
1754 =head2 populate
1755
1756 =over 4
1757
1758 =item Arguments: \@data;
1759
1760 =back
1761
1762 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1763 For the arrayref of hashrefs style each hashref should be a structure suitable
1764 forsubmitting to a $resultset->create(...) method.
1765
1766 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1767 to insert the data, as this is a faster method.
1768
1769 Otherwise, each set of data is inserted into the database using
1770 L<DBIx::Class::ResultSet/create>, and the resulting objects are
1771 accumulated into an array. The array itself, or an array reference
1772 is returned depending on scalar or list context.
1773
1774 Example:  Assuming an Artist Class that has many CDs Classes relating:
1775
1776   my $Artist_rs = $schema->resultset("Artist");
1777
1778   ## Void Context Example
1779   $Artist_rs->populate([
1780      { artistid => 4, name => 'Manufactured Crap', cds => [
1781         { title => 'My First CD', year => 2006 },
1782         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1783       ],
1784      },
1785      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1786         { title => 'My parents sold me to a record company' ,year => 2005 },
1787         { title => 'Why Am I So Ugly?', year => 2006 },
1788         { title => 'I Got Surgery and am now Popular', year => 2007 }
1789       ],
1790      },
1791   ]);
1792
1793   ## Array Context Example
1794   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1795     { name => "Artist One"},
1796     { name => "Artist Two"},
1797     { name => "Artist Three", cds=> [
1798     { title => "First CD", year => 2007},
1799     { title => "Second CD", year => 2008},
1800   ]}
1801   ]);
1802
1803   print $ArtistOne->name; ## response is 'Artist One'
1804   print $ArtistThree->cds->count ## reponse is '2'
1805
1806 For the arrayref of arrayrefs style,  the first element should be a list of the
1807 fieldsnames to which the remaining elements are rows being inserted.  For
1808 example:
1809
1810   $Arstist_rs->populate([
1811     [qw/artistid name/],
1812     [100, 'A Formally Unknown Singer'],
1813     [101, 'A singer that jumped the shark two albums ago'],
1814     [102, 'An actually cool singer.'],
1815   ]);
1816
1817 Please note an important effect on your data when choosing between void and
1818 wantarray context. Since void context goes straight to C<insert_bulk> in
1819 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1820 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
1821 create primary keys for you, you will find that your PKs are empty.  In this
1822 case you will have to use the wantarray context in order to create those
1823 values.
1824
1825 =cut
1826
1827 sub populate {
1828   my $self = shift @_;
1829   my $data = ref $_[0][0] eq 'HASH'
1830     ? $_[0] : ref $_[0][0] eq 'ARRAY' ? $self->_normalize_populate_args($_[0]) :
1831     $self->throw_exception('Populate expects an arrayref of hashes or arrayref of arrayrefs');
1832
1833   if(defined wantarray) {
1834     my @created;
1835     foreach my $item (@$data) {
1836       push(@created, $self->create($item));
1837     }
1838     return wantarray ? @created : \@created;
1839   } else {
1840     my ($first, @rest) = @$data;
1841
1842     my @names = grep {!ref $first->{$_}} keys %$first;
1843     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1844     my @pks = $self->result_source->primary_columns;
1845
1846     ## do the belongs_to relationships
1847     foreach my $index (0..$#$data) {
1848
1849       # delegate to create() for any dataset without primary keys with specified relationships
1850       if (grep { !defined $data->[$index]->{$_} } @pks ) {
1851         for my $r (@rels) {
1852           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
1853             my @ret = $self->populate($data);
1854             return;
1855           }
1856         }
1857       }
1858
1859       foreach my $rel (@rels) {
1860         next unless ref $data->[$index]->{$rel} eq "HASH";
1861         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1862         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1863         my $related = $result->result_source->_resolve_condition(
1864           $result->result_source->relationship_info($reverse)->{cond},
1865           $self,
1866           $result,
1867         );
1868
1869         delete $data->[$index]->{$rel};
1870         $data->[$index] = {%{$data->[$index]}, %$related};
1871
1872         push @names, keys %$related if $index == 0;
1873       }
1874     }
1875
1876     ## do bulk insert on current row
1877     my @values = map { [ @$_{@names} ] } @$data;
1878
1879     $self->result_source->storage->insert_bulk(
1880       $self->result_source,
1881       \@names,
1882       \@values,
1883     );
1884
1885     ## do the has_many relationships
1886     foreach my $item (@$data) {
1887
1888       foreach my $rel (@rels) {
1889         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1890
1891         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks)
1892      || $self->throw_exception('Cannot find the relating object.');
1893
1894         my $child = $parent->$rel;
1895
1896         my $related = $child->result_source->_resolve_condition(
1897           $parent->result_source->relationship_info($rel)->{cond},
1898           $child,
1899           $parent,
1900         );
1901
1902         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1903         my @populate = map { {%$_, %$related} } @rows_to_add;
1904
1905         $child->populate( \@populate );
1906       }
1907     }
1908   }
1909 }
1910
1911 =head2 _normalize_populate_args ($args)
1912
1913 Private method used by L</populate> to normalize its incoming arguments.  Factored
1914 out in case you want to subclass and accept new argument structures to the
1915 L</populate> method.
1916
1917 =cut
1918
1919 sub _normalize_populate_args {
1920   my ($self, $data) = @_;
1921   my @names = @{shift(@$data)};
1922   my @results_to_create;
1923   foreach my $datum (@$data) {
1924     my %result_to_create;
1925     foreach my $index (0..$#names) {
1926       $result_to_create{$names[$index]} = $$datum[$index];
1927     }
1928     push @results_to_create, \%result_to_create;
1929   }
1930   return \@results_to_create;
1931 }
1932
1933 =head2 pager
1934
1935 =over 4
1936
1937 =item Arguments: none
1938
1939 =item Return Value: $pager
1940
1941 =back
1942
1943 Return Value a L<Data::Page> object for the current resultset. Only makes
1944 sense for queries with a C<page> attribute.
1945
1946 To get the full count of entries for a paged resultset, call
1947 C<total_entries> on the L<Data::Page> object.
1948
1949 =cut
1950
1951 sub pager {
1952   my ($self) = @_;
1953
1954   return $self->{pager} if $self->{pager};
1955
1956   my $attrs = $self->{attrs};
1957   $self->throw_exception("Can't create pager for non-paged rs")
1958     unless $self->{attrs}{page};
1959   $attrs->{rows} ||= 10;
1960
1961   # throw away the paging flags and re-run the count (possibly
1962   # with a subselect) to get the real total count
1963   my $count_attrs = { %$attrs };
1964   delete $count_attrs->{$_} for qw/rows offset page pager/;
1965   my $total_count = (ref $self)->new($self->result_source, $count_attrs)->count;
1966
1967   return $self->{pager} = Data::Page->new(
1968     $total_count,
1969     $attrs->{rows},
1970     $self->{attrs}{page}
1971   );
1972 }
1973
1974 =head2 page
1975
1976 =over 4
1977
1978 =item Arguments: $page_number
1979
1980 =item Return Value: $rs
1981
1982 =back
1983
1984 Returns a resultset for the $page_number page of the resultset on which page
1985 is called, where each page contains a number of rows equal to the 'rows'
1986 attribute set on the resultset (10 by default).
1987
1988 =cut
1989
1990 sub page {
1991   my ($self, $page) = @_;
1992   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1993 }
1994
1995 =head2 new_result
1996
1997 =over 4
1998
1999 =item Arguments: \%vals
2000
2001 =item Return Value: $rowobject
2002
2003 =back
2004
2005 Creates a new row object in the resultset's result class and returns
2006 it. The row is not inserted into the database at this point, call
2007 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
2008 will tell you whether the row object has been inserted or not.
2009
2010 Passes the hashref of input on to L<DBIx::Class::Row/new>.
2011
2012 =cut
2013
2014 sub new_result {
2015   my ($self, $values) = @_;
2016   $self->throw_exception( "new_result needs a hash" )
2017     unless (ref $values eq 'HASH');
2018
2019   my %new;
2020   my $alias = $self->{attrs}{alias};
2021
2022   if (
2023     defined $self->{cond}
2024     && $self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION
2025   ) {
2026     %new = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
2027     $new{-from_resultset} = [ keys %new ] if keys %new;
2028   } else {
2029     $self->throw_exception(
2030       "Can't abstract implicit construct, condition not a hash"
2031     ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
2032
2033     my $collapsed_cond = (
2034       $self->{cond}
2035         ? $self->_collapse_cond($self->{cond})
2036         : {}
2037     );
2038
2039     # precendence must be given to passed values over values inherited from
2040     # the cond, so the order here is important.
2041     my %implied =  %{$self->_remove_alias($collapsed_cond, $alias)};
2042     while( my($col,$value) = each %implied ){
2043       if(ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '='){
2044         $new{$col} = $value->{'='};
2045         next;
2046       }
2047       $new{$col} = $value if $self->_is_deterministic_value($value);
2048     }
2049   }
2050
2051   %new = (
2052     %new,
2053     %{ $self->_remove_alias($values, $alias) },
2054     -source_handle => $self->_source_handle,
2055     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
2056   );
2057
2058   return $self->result_class->new(\%new);
2059 }
2060
2061 # _is_deterministic_value
2062 #
2063 # Make an effor to strip non-deterministic values from the condition,
2064 # to make sure new_result chokes less
2065
2066 sub _is_deterministic_value {
2067   my $self = shift;
2068   my $value = shift;
2069   my $ref_type = ref $value;
2070   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
2071   return 1 if Scalar::Util::blessed($value);
2072   return 0;
2073 }
2074
2075 # _has_resolved_attr
2076 #
2077 # determines if the resultset defines at least one
2078 # of the attributes supplied
2079 #
2080 # used to determine if a subquery is neccessary
2081 #
2082 # supports some virtual attributes:
2083 #   -join
2084 #     This will scan for any joins being present on the resultset.
2085 #     It is not a mere key-search but a deep inspection of {from}
2086 #
2087
2088 sub _has_resolved_attr {
2089   my ($self, @attr_names) = @_;
2090
2091   my $attrs = $self->_resolved_attrs;
2092
2093   my %extra_checks;
2094
2095   for my $n (@attr_names) {
2096     if (grep { $n eq $_ } (qw/-join/) ) {
2097       $extra_checks{$n}++;
2098       next;
2099     }
2100
2101     my $attr =  $attrs->{$n};
2102
2103     next if not defined $attr;
2104
2105     if (ref $attr eq 'HASH') {
2106       return 1 if keys %$attr;
2107     }
2108     elsif (ref $attr eq 'ARRAY') {
2109       return 1 if @$attr;
2110     }
2111     else {
2112       return 1 if $attr;
2113     }
2114   }
2115
2116   # a resolved join is expressed as a multi-level from
2117   return 1 if (
2118     $extra_checks{-join}
2119       and
2120     ref $attrs->{from} eq 'ARRAY'
2121       and
2122     @{$attrs->{from}} > 1
2123   );
2124
2125   return 0;
2126 }
2127
2128 # _collapse_cond
2129 #
2130 # Recursively collapse the condition.
2131
2132 sub _collapse_cond {
2133   my ($self, $cond, $collapsed) = @_;
2134
2135   $collapsed ||= {};
2136
2137   if (ref $cond eq 'ARRAY') {
2138     foreach my $subcond (@$cond) {
2139       next unless ref $subcond;  # -or
2140       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2141     }
2142   }
2143   elsif (ref $cond eq 'HASH') {
2144     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2145       foreach my $subcond (@{$cond->{-and}}) {
2146         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2147       }
2148     }
2149     else {
2150       foreach my $col (keys %$cond) {
2151         my $value = $cond->{$col};
2152         $collapsed->{$col} = $value;
2153       }
2154     }
2155   }
2156
2157   return $collapsed;
2158 }
2159
2160 # _remove_alias
2161 #
2162 # Remove the specified alias from the specified query hash. A copy is made so
2163 # the original query is not modified.
2164
2165 sub _remove_alias {
2166   my ($self, $query, $alias) = @_;
2167
2168   my %orig = %{ $query || {} };
2169   my %unaliased;
2170
2171   foreach my $key (keys %orig) {
2172     if ($key !~ /\./) {
2173       $unaliased{$key} = $orig{$key};
2174       next;
2175     }
2176     $unaliased{$1} = $orig{$key}
2177       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2178   }
2179
2180   return \%unaliased;
2181 }
2182
2183 =head2 as_query (EXPERIMENTAL)
2184
2185 =over 4
2186
2187 =item Arguments: none
2188
2189 =item Return Value: \[ $sql, @bind ]
2190
2191 =back
2192
2193 Returns the SQL query and bind vars associated with the invocant.
2194
2195 This is generally used as the RHS for a subquery.
2196
2197 B<NOTE>: This feature is still experimental.
2198
2199 =cut
2200
2201 sub as_query {
2202   my $self = shift;
2203
2204   my $attrs = $self->_resolved_attrs_copy;
2205
2206   # For future use:
2207   #
2208   # in list ctx:
2209   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2210   # $sql also has no wrapping parenthesis in list ctx
2211   #
2212   my $sqlbind = $self->result_source->storage
2213     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2214
2215   return $sqlbind;
2216 }
2217
2218 =head2 find_or_new
2219
2220 =over 4
2221
2222 =item Arguments: \%vals, \%attrs?
2223
2224 =item Return Value: $rowobject
2225
2226 =back
2227
2228   my $artist = $schema->resultset('Artist')->find_or_new(
2229     { artist => 'fred' }, { key => 'artists' });
2230
2231   $cd->cd_to_producer->find_or_new({ producer => $producer },
2232                                    { key => 'primary });
2233
2234 Find an existing record from this resultset, based on its primary
2235 key, or a unique constraint. If none exists, instantiate a new result
2236 object and return it. The object will not be saved into your storage
2237 until you call L<DBIx::Class::Row/insert> on it.
2238
2239 You most likely want this method when looking for existing rows using
2240 a unique constraint that is not the primary key, or looking for
2241 related rows.
2242
2243 If you want objects to be saved immediately, use L</find_or_create>
2244 instead.
2245
2246 B<Note>: Take care when using C<find_or_new> with a table having
2247 columns with default values that you intend to be automatically
2248 supplied by the database (e.g. an auto_increment primary key column).
2249 In normal usage, the value of such columns should NOT be included at
2250 all in the call to C<find_or_new>, even when set to C<undef>.
2251
2252 =cut
2253
2254 sub find_or_new {
2255   my $self     = shift;
2256   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2257   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2258   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2259     return $row;
2260   }
2261   return $self->new_result($hash);
2262 }
2263
2264 =head2 create
2265
2266 =over 4
2267
2268 =item Arguments: \%vals
2269
2270 =item Return Value: a L<DBIx::Class::Row> $object
2271
2272 =back
2273
2274 Attempt to create a single new row or a row with multiple related rows
2275 in the table represented by the resultset (and related tables). This
2276 will not check for duplicate rows before inserting, use
2277 L</find_or_create> to do that.
2278
2279 To create one row for this resultset, pass a hashref of key/value
2280 pairs representing the columns of the table and the values you wish to
2281 store. If the appropriate relationships are set up, foreign key fields
2282 can also be passed an object representing the foreign row, and the
2283 value will be set to its primary key.
2284
2285 To create related objects, pass a hashref of related-object column values
2286 B<keyed on the relationship name>. If the relationship is of type C<multi>
2287 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2288 The process will correctly identify columns holding foreign keys, and will
2289 transparrently populate them from the keys of the corresponding relation.
2290 This can be applied recursively, and will work correctly for a structure
2291 with an arbitrary depth and width, as long as the relationships actually
2292 exists and the correct column data has been supplied.
2293
2294
2295 Instead of hashrefs of plain related data (key/value pairs), you may
2296 also pass new or inserted objects. New objects (not inserted yet, see
2297 L</new>), will be inserted into their appropriate tables.
2298
2299 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2300
2301 Example of creating a new row.
2302
2303   $person_rs->create({
2304     name=>"Some Person",
2305     email=>"somebody@someplace.com"
2306   });
2307
2308 Example of creating a new row and also creating rows in a related C<has_many>
2309 or C<has_one> resultset.  Note Arrayref.
2310
2311   $artist_rs->create(
2312      { artistid => 4, name => 'Manufactured Crap', cds => [
2313         { title => 'My First CD', year => 2006 },
2314         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2315       ],
2316      },
2317   );
2318
2319 Example of creating a new row and also creating a row in a related
2320 C<belongs_to>resultset. Note Hashref.
2321
2322   $cd_rs->create({
2323     title=>"Music for Silly Walks",
2324     year=>2000,
2325     artist => {
2326       name=>"Silly Musician",
2327     }
2328   });
2329
2330 =over
2331
2332 =item WARNING
2333
2334 When subclassing ResultSet never attempt to override this method. Since
2335 it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
2336 lot of the internals simply never call it, so your override will be
2337 bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
2338 or L<insert|DBIx::Class::Row/insert> depending on how early in the
2339 L</create> process you need to intervene.
2340
2341 =back
2342
2343 =cut
2344
2345 sub create {
2346   my ($self, $attrs) = @_;
2347   $self->throw_exception( "create needs a hashref" )
2348     unless ref $attrs eq 'HASH';
2349   return $self->new_result($attrs)->insert;
2350 }
2351
2352 =head2 find_or_create
2353
2354 =over 4
2355
2356 =item Arguments: \%vals, \%attrs?
2357
2358 =item Return Value: $rowobject
2359
2360 =back
2361
2362   $cd->cd_to_producer->find_or_create({ producer => $producer },
2363                                       { key => 'primary' });
2364
2365 Tries to find a record based on its primary key or unique constraints; if none
2366 is found, creates one and returns that instead.
2367
2368   my $cd = $schema->resultset('CD')->find_or_create({
2369     cdid   => 5,
2370     artist => 'Massive Attack',
2371     title  => 'Mezzanine',
2372     year   => 2005,
2373   });
2374
2375 Also takes an optional C<key> attribute, to search by a specific key or unique
2376 constraint. For example:
2377
2378   my $cd = $schema->resultset('CD')->find_or_create(
2379     {
2380       artist => 'Massive Attack',
2381       title  => 'Mezzanine',
2382     },
2383     { key => 'cd_artist_title' }
2384   );
2385
2386 B<Note>: Because find_or_create() reads from the database and then
2387 possibly inserts based on the result, this method is subject to a race
2388 condition. Another process could create a record in the table after
2389 the find has completed and before the create has started. To avoid
2390 this problem, use find_or_create() inside a transaction.
2391
2392 B<Note>: Take care when using C<find_or_create> with a table having
2393 columns with default values that you intend to be automatically
2394 supplied by the database (e.g. an auto_increment primary key column).
2395 In normal usage, the value of such columns should NOT be included at
2396 all in the call to C<find_or_create>, even when set to C<undef>.
2397
2398 See also L</find> and L</update_or_create>. For information on how to declare
2399 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2400
2401 =cut
2402
2403 sub find_or_create {
2404   my $self     = shift;
2405   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2406   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2407   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2408     return $row;
2409   }
2410   return $self->create($hash);
2411 }
2412
2413 =head2 update_or_create
2414
2415 =over 4
2416
2417 =item Arguments: \%col_values, { key => $unique_constraint }?
2418
2419 =item Return Value: $rowobject
2420
2421 =back
2422
2423   $resultset->update_or_create({ col => $val, ... });
2424
2425 First, searches for an existing row matching one of the unique constraints
2426 (including the primary key) on the source of this resultset. If a row is
2427 found, updates it with the other given column values. Otherwise, creates a new
2428 row.
2429
2430 Takes an optional C<key> attribute to search on a specific unique constraint.
2431 For example:
2432
2433   # In your application
2434   my $cd = $schema->resultset('CD')->update_or_create(
2435     {
2436       artist => 'Massive Attack',
2437       title  => 'Mezzanine',
2438       year   => 1998,
2439     },
2440     { key => 'cd_artist_title' }
2441   );
2442
2443   $cd->cd_to_producer->update_or_create({
2444     producer => $producer,
2445     name => 'harry',
2446   }, {
2447     key => 'primary,
2448   });
2449
2450
2451 If no C<key> is specified, it searches on all unique constraints defined on the
2452 source, including the primary key.
2453
2454 If the C<key> is specified as C<primary>, it searches only on the primary key.
2455
2456 See also L</find> and L</find_or_create>. For information on how to declare
2457 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2458
2459 B<Note>: Take care when using C<update_or_create> with a table having
2460 columns with default values that you intend to be automatically
2461 supplied by the database (e.g. an auto_increment primary key column).
2462 In normal usage, the value of such columns should NOT be included at
2463 all in the call to C<update_or_create>, even when set to C<undef>.
2464
2465 =cut
2466
2467 sub update_or_create {
2468   my $self = shift;
2469   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2470   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2471
2472   my $row = $self->find($cond, $attrs);
2473   if (defined $row) {
2474     $row->update($cond);
2475     return $row;
2476   }
2477
2478   return $self->create($cond);
2479 }
2480
2481 =head2 update_or_new
2482
2483 =over 4
2484
2485 =item Arguments: \%col_values, { key => $unique_constraint }?
2486
2487 =item Return Value: $rowobject
2488
2489 =back
2490
2491   $resultset->update_or_new({ col => $val, ... });
2492
2493 First, searches for an existing row matching one of the unique constraints
2494 (including the primary key) on the source of this resultset. If a row is
2495 found, updates it with the other given column values. Otherwise, instantiate
2496 a new result object and return it. The object will not be saved into your storage
2497 until you call L<DBIx::Class::Row/insert> on it.
2498
2499 Takes an optional C<key> attribute to search on a specific unique constraint.
2500 For example:
2501
2502   # In your application
2503   my $cd = $schema->resultset('CD')->update_or_new(
2504     {
2505       artist => 'Massive Attack',
2506       title  => 'Mezzanine',
2507       year   => 1998,
2508     },
2509     { key => 'cd_artist_title' }
2510   );
2511
2512   if ($cd->in_storage) {
2513       # the cd was updated
2514   }
2515   else {
2516       # the cd is not yet in the database, let's insert it
2517       $cd->insert;
2518   }
2519
2520 B<Note>: Take care when using C<update_or_new> with a table having
2521 columns with default values that you intend to be automatically
2522 supplied by the database (e.g. an auto_increment primary key column).
2523 In normal usage, the value of such columns should NOT be included at
2524 all in the call to C<update_or_new>, even when set to C<undef>.
2525
2526 See also L</find>, L</find_or_create> and L</find_or_new>.
2527
2528 =cut
2529
2530 sub update_or_new {
2531     my $self  = shift;
2532     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2533     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2534
2535     my $row = $self->find( $cond, $attrs );
2536     if ( defined $row ) {
2537         $row->update($cond);
2538         return $row;
2539     }
2540
2541     return $self->new_result($cond);
2542 }
2543
2544 =head2 get_cache
2545
2546 =over 4
2547
2548 =item Arguments: none
2549
2550 =item Return Value: \@cache_objects?
2551
2552 =back
2553
2554 Gets the contents of the cache for the resultset, if the cache is set.
2555
2556 The cache is populated either by using the L</prefetch> attribute to
2557 L</search> or by calling L</set_cache>.
2558
2559 =cut
2560
2561 sub get_cache {
2562   shift->{all_cache};
2563 }
2564
2565 =head2 set_cache
2566
2567 =over 4
2568
2569 =item Arguments: \@cache_objects
2570
2571 =item Return Value: \@cache_objects
2572
2573 =back
2574
2575 Sets the contents of the cache for the resultset. Expects an arrayref
2576 of objects of the same class as those produced by the resultset. Note that
2577 if the cache is set the resultset will return the cached objects rather
2578 than re-querying the database even if the cache attr is not set.
2579
2580 The contents of the cache can also be populated by using the
2581 L</prefetch> attribute to L</search>.
2582
2583 =cut
2584
2585 sub set_cache {
2586   my ( $self, $data ) = @_;
2587   $self->throw_exception("set_cache requires an arrayref")
2588       if defined($data) && (ref $data ne 'ARRAY');
2589   $self->{all_cache} = $data;
2590 }
2591
2592 =head2 clear_cache
2593
2594 =over 4
2595
2596 =item Arguments: none
2597
2598 =item Return Value: []
2599
2600 =back
2601
2602 Clears the cache for the resultset.
2603
2604 =cut
2605
2606 sub clear_cache {
2607   shift->set_cache(undef);
2608 }
2609
2610 =head2 related_resultset
2611
2612 =over 4
2613
2614 =item Arguments: $relationship_name
2615
2616 =item Return Value: $resultset
2617
2618 =back
2619
2620 Returns a related resultset for the supplied relationship name.
2621
2622   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2623
2624 =cut
2625
2626 sub related_resultset {
2627   my ($self, $rel) = @_;
2628
2629   $self->{related_resultsets} ||= {};
2630   return $self->{related_resultsets}{$rel} ||= do {
2631     my $rel_info = $self->result_source->relationship_info($rel);
2632
2633     $self->throw_exception(
2634       "search_related: result source '" . $self->result_source->source_name .
2635         "' has no such relationship $rel")
2636       unless $rel_info;
2637
2638     my ($from,$seen) = $self->_chain_relationship($rel);
2639
2640     my $join_count = $seen->{$rel};
2641     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
2642
2643     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2644     my %attrs = %{$self->{attrs}||{}};
2645     delete @attrs{qw(result_class alias)};
2646
2647     my $new_cache;
2648
2649     if (my $cache = $self->get_cache) {
2650       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2651         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2652                         @$cache ];
2653       }
2654     }
2655
2656     my $rel_source = $self->result_source->related_source($rel);
2657
2658     my $new = do {
2659
2660       # The reason we do this now instead of passing the alias to the
2661       # search_rs below is that if you wrap/overload resultset on the
2662       # source you need to know what alias it's -going- to have for things
2663       # to work sanely (e.g. RestrictWithObject wants to be able to add
2664       # extra query restrictions, and these may need to be $alias.)
2665
2666       my $attrs = $rel_source->resultset_attributes;
2667       local $attrs->{alias} = $alias;
2668
2669       $rel_source->resultset
2670                  ->search_rs(
2671                      undef, {
2672                        %attrs,
2673                        join => undef,
2674                        prefetch => undef,
2675                        select => undef,
2676                        as => undef,
2677                        where => $self->{cond},
2678                        seen_join => $seen,
2679                        from => $from,
2680                    });
2681     };
2682     $new->set_cache($new_cache) if $new_cache;
2683     $new;
2684   };
2685 }
2686
2687 =head2 current_source_alias
2688
2689 =over 4
2690
2691 =item Arguments: none
2692
2693 =item Return Value: $source_alias
2694
2695 =back
2696
2697 Returns the current table alias for the result source this resultset is built
2698 on, that will be used in the SQL query. Usually it is C<me>.
2699
2700 Currently the source alias that refers to the result set returned by a
2701 L</search>/L</find> family method depends on how you got to the resultset: it's
2702 C<me> by default, but eg. L</search_related> aliases it to the related result
2703 source name (and keeps C<me> referring to the original result set). The long
2704 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2705 (and make this method unnecessary).
2706
2707 Thus it's currently necessary to use this method in predefined queries (see
2708 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2709 source alias of the current result set:
2710
2711   # in a result set class
2712   sub modified_by {
2713     my ($self, $user) = @_;
2714
2715     my $me = $self->current_source_alias;
2716
2717     return $self->search(
2718       "$me.modified" => $user->id,
2719     );
2720   }
2721
2722 =cut
2723
2724 sub current_source_alias {
2725   my ($self) = @_;
2726
2727   return ($self->{attrs} || {})->{alias} || 'me';
2728 }
2729
2730 # This code is called by search_related, and makes sure there
2731 # is clear separation between the joins before, during, and
2732 # after the relationship. This information is needed later
2733 # in order to properly resolve prefetch aliases (any alias
2734 # with a relation_chain_depth less than the depth of the
2735 # current prefetch is not considered)
2736 #
2737 # The increments happen in 1/2s to make it easier to correlate the
2738 # join depth with the join path. An integer means a relationship
2739 # specified via a search_related, whereas a fraction means an added
2740 # join/prefetch via attributes
2741 sub _chain_relationship {
2742   my ($self, $rel) = @_;
2743   my $source = $self->result_source;
2744   my $attrs = $self->{attrs};
2745
2746   my $from = [ @{
2747       $attrs->{from}
2748         ||
2749       [{
2750         -source_handle => $source->handle,
2751         -alias => $attrs->{alias},
2752         $attrs->{alias} => $source->from,
2753       }]
2754   }];
2755
2756   my $seen = { %{$attrs->{seen_join} || {} } };
2757   my $jpath = ($attrs->{seen_join} && keys %{$attrs->{seen_join}}) 
2758     ? $from->[-1][0]{-join_path} 
2759     : [];
2760
2761
2762   # we need to take the prefetch the attrs into account before we
2763   # ->_resolve_join as otherwise they get lost - captainL
2764   my $merged = $self->_merge_attr( $attrs->{join}, $attrs->{prefetch} );
2765
2766   my @requested_joins = $source->_resolve_join(
2767     $merged,
2768     $attrs->{alias},
2769     $seen,
2770     $jpath,
2771   );
2772
2773   push @$from, @requested_joins;
2774
2775   $seen->{-relation_chain_depth} += 0.5;
2776
2777   # if $self already had a join/prefetch specified on it, the requested
2778   # $rel might very well be already included. What we do in this case
2779   # is effectively a no-op (except that we bump up the chain_depth on
2780   # the join in question so we could tell it *is* the search_related)
2781   my $already_joined;
2782
2783
2784   # we consider the last one thus reverse
2785   for my $j (reverse @requested_joins) {
2786     if ($rel eq $j->[0]{-join_path}[-1]) {
2787       $j->[0]{-relation_chain_depth} += 0.5;
2788       $already_joined++;
2789       last;
2790     }
2791   }
2792
2793 # alternative way to scan the entire chain - not backwards compatible
2794 #  for my $j (reverse @$from) {
2795 #    next unless ref $j eq 'ARRAY';
2796 #    if ($j->[0]{-join_path} && $j->[0]{-join_path}[-1] eq $rel) {
2797 #      $j->[0]{-relation_chain_depth} += 0.5;
2798 #      $already_joined++;
2799 #      last;
2800 #    }
2801 #  }
2802
2803   unless ($already_joined) {
2804     push @$from, $source->_resolve_join(
2805       $rel,
2806       $attrs->{alias},
2807       $seen,
2808       $jpath,
2809     );
2810   }
2811
2812   $seen->{-relation_chain_depth} += 0.5;
2813
2814   return ($from,$seen);
2815 }
2816
2817 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
2818 sub _resolved_attrs_copy {
2819   my $self = shift;
2820   return { %{$self->_resolved_attrs (@_)} };
2821 }
2822
2823 sub _resolved_attrs {
2824   my $self = shift;
2825   return $self->{_attrs} if $self->{_attrs};
2826
2827   my $attrs  = { %{ $self->{attrs} || {} } };
2828   my $source = $self->result_source;
2829   my $alias  = $attrs->{alias};
2830
2831   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2832   my @colbits;
2833
2834   # build columns (as long as select isn't set) into a set of as/select hashes
2835   unless ( $attrs->{select} ) {
2836
2837     my @cols = ( ref($attrs->{columns}) eq 'ARRAY' )
2838       ? @{ delete $attrs->{columns}}
2839       : (
2840           ( delete $attrs->{columns} )
2841             ||
2842           $source->columns
2843         )
2844     ;
2845
2846     @colbits = map {
2847       ( ref($_) eq 'HASH' )
2848       ? $_
2849       : {
2850           (
2851             /^\Q${alias}.\E(.+)$/
2852               ? "$1"
2853               : "$_"
2854           )
2855             =>
2856           (
2857             /\./
2858               ? "$_"
2859               : "${alias}.$_"
2860           )
2861         }
2862     } @cols;
2863   }
2864
2865   # add the additional columns on
2866   foreach ( 'include_columns', '+columns' ) {
2867       push @colbits, map {
2868           ( ref($_) eq 'HASH' )
2869             ? $_
2870             : { ( split( /\./, $_ ) )[-1] => ( /\./ ? $_ : "${alias}.$_" ) }
2871       } ( ref($attrs->{$_}) eq 'ARRAY' ) ? @{ delete $attrs->{$_} } : delete $attrs->{$_} if ( $attrs->{$_} );
2872   }
2873
2874   # start with initial select items
2875   if ( $attrs->{select} ) {
2876     $attrs->{select} =
2877         ( ref $attrs->{select} eq 'ARRAY' )
2878       ? [ @{ $attrs->{select} } ]
2879       : [ $attrs->{select} ];
2880     $attrs->{as} = (
2881       $attrs->{as}
2882       ? (
2883         ref $attrs->{as} eq 'ARRAY'
2884         ? [ @{ $attrs->{as} } ]
2885         : [ $attrs->{as} ]
2886         )
2887       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{ $attrs->{select} } ]
2888     );
2889   }
2890   else {
2891
2892     # otherwise we intialise select & as to empty
2893     $attrs->{select} = [];
2894     $attrs->{as}     = [];
2895   }
2896
2897   # now add colbits to select/as
2898   push( @{ $attrs->{select} }, map { values( %{$_} ) } @colbits );
2899   push( @{ $attrs->{as} },     map { keys( %{$_} ) } @colbits );
2900
2901   my $adds;
2902   if ( $adds = delete $attrs->{'+select'} ) {
2903     $adds = [$adds] unless ref $adds eq 'ARRAY';
2904     push(
2905       @{ $attrs->{select} },
2906       map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds
2907     );
2908   }
2909   if ( $adds = delete $attrs->{'+as'} ) {
2910     $adds = [$adds] unless ref $adds eq 'ARRAY';
2911     push( @{ $attrs->{as} }, @$adds );
2912   }
2913
2914   $attrs->{from} ||= [ {
2915     -source_handle => $source->handle,
2916     -alias => $self->{attrs}{alias},
2917     $self->{attrs}{alias} => $source->from,
2918   } ];
2919
2920   if ( $attrs->{join} || $attrs->{prefetch} ) {
2921
2922     $self->throw_exception ('join/prefetch can not be used with a custom {from}')
2923       if ref $attrs->{from} ne 'ARRAY';
2924
2925     my $join = delete $attrs->{join} || {};
2926
2927     if ( defined $attrs->{prefetch} ) {
2928       $join = $self->_merge_attr( $join, $attrs->{prefetch} );
2929     }
2930
2931     $attrs->{from} =    # have to copy here to avoid corrupting the original
2932       [
2933         @{ $attrs->{from} },
2934         $source->_resolve_join(
2935           $join,
2936           $alias,
2937           { %{ $attrs->{seen_join} || {} } },
2938           ($attrs->{seen_join} && keys %{$attrs->{seen_join}})
2939             ? $attrs->{from}[-1][0]{-join_path}
2940             : []
2941           ,
2942         )
2943       ];
2944   }
2945
2946   if ( defined $attrs->{order_by} ) {
2947     $attrs->{order_by} = (
2948       ref( $attrs->{order_by} ) eq 'ARRAY'
2949       ? [ @{ $attrs->{order_by} } ]
2950       : [ $attrs->{order_by} || () ]
2951     );
2952   }
2953
2954   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
2955     $attrs->{group_by} = [ $attrs->{group_by} ];
2956   }
2957
2958   # generate the distinct induced group_by early, as prefetch will be carried via a
2959   # subquery (since a group_by is present)
2960   if (delete $attrs->{distinct}) {
2961     if ($attrs->{group_by}) {
2962       carp ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
2963     }
2964     else {
2965       $attrs->{group_by} = [ grep { !ref($_) || (ref($_) ne 'HASH') } @{$attrs->{select}} ];
2966     }
2967   }
2968
2969   $attrs->{collapse} ||= {};
2970   if ( my $prefetch = delete $attrs->{prefetch} ) {
2971     $prefetch = $self->_merge_attr( {}, $prefetch );
2972
2973     my $prefetch_ordering = [];
2974
2975     my $join_map = $self->_joinpath_aliases ($attrs->{from}, $attrs->{seen_join});
2976
2977     my @prefetch =
2978       $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
2979
2980     # we need to somehow mark which columns came from prefetch
2981     $attrs->{_prefetch_select} = [ map { $_->[0] } @prefetch ];
2982
2983     push @{ $attrs->{select} }, @{$attrs->{_prefetch_select}};
2984     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
2985
2986     push( @{$attrs->{order_by}}, @$prefetch_ordering );
2987     $attrs->{_collapse_order_by} = \@$prefetch_ordering;
2988   }
2989
2990   # if both page and offset are specified, produce a combined offset
2991   # even though it doesn't make much sense, this is what pre 081xx has
2992   # been doing
2993   if (my $page = delete $attrs->{page}) {
2994     $attrs->{offset} = 
2995       ($attrs->{rows} * ($page - 1))
2996             +
2997       ($attrs->{offset} || 0)
2998     ;
2999   }
3000
3001   return $self->{_attrs} = $attrs;
3002 }
3003
3004 sub _joinpath_aliases {
3005   my ($self, $fromspec, $seen) = @_;
3006
3007   my $paths = {};
3008   return $paths unless ref $fromspec eq 'ARRAY';
3009
3010   my $cur_depth = $seen->{-relation_chain_depth} || 0;
3011
3012   if (int ($cur_depth) != $cur_depth) {
3013     $self->throw_exception ("-relation_chain_depth is not an integer, something went horribly wrong ($cur_depth)");
3014   }
3015
3016   for my $j (@$fromspec) {
3017
3018     next if ref $j ne 'ARRAY';
3019     next if ($j->[0]{-relation_chain_depth} || 0) < $cur_depth;
3020
3021     my $jpath = $j->[0]{-join_path};
3022
3023     my $p = $paths;
3024     $p = $p->{$_} ||= {} for @{$jpath}[$cur_depth .. $#$jpath];
3025     push @{$p->{-join_aliases} }, $j->[0]{-alias};
3026   }
3027
3028   return $paths;
3029 }
3030
3031 sub _rollout_attr {
3032   my ($self, $attr) = @_;
3033
3034   if (ref $attr eq 'HASH') {
3035     return $self->_rollout_hash($attr);
3036   } elsif (ref $attr eq 'ARRAY') {
3037     return $self->_rollout_array($attr);
3038   } else {
3039     return [$attr];
3040   }
3041 }
3042
3043 sub _rollout_array {
3044   my ($self, $attr) = @_;
3045
3046   my @rolled_array;
3047   foreach my $element (@{$attr}) {
3048     if (ref $element eq 'HASH') {
3049       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3050     } elsif (ref $element eq 'ARRAY') {
3051       #  XXX - should probably recurse here
3052       push( @rolled_array, @{$self->_rollout_array($element)} );
3053     } else {
3054       push( @rolled_array, $element );
3055     }
3056   }
3057   return \@rolled_array;
3058 }
3059
3060 sub _rollout_hash {
3061   my ($self, $attr) = @_;
3062
3063   my @rolled_array;
3064   foreach my $key (keys %{$attr}) {
3065     push( @rolled_array, { $key => $attr->{$key} } );
3066   }
3067   return \@rolled_array;
3068 }
3069
3070 sub _calculate_score {
3071   my ($self, $a, $b) = @_;
3072
3073   if (defined $a xor defined $b) {
3074     return 0;
3075   }
3076   elsif (not defined $a) {
3077     return 1;
3078   }
3079
3080   if (ref $b eq 'HASH') {
3081     my ($b_key) = keys %{$b};
3082     if (ref $a eq 'HASH') {
3083       my ($a_key) = keys %{$a};
3084       if ($a_key eq $b_key) {
3085         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3086       } else {
3087         return 0;
3088       }
3089     } else {
3090       return ($a eq $b_key) ? 1 : 0;
3091     }
3092   } else {
3093     if (ref $a eq 'HASH') {
3094       my ($a_key) = keys %{$a};
3095       return ($b eq $a_key) ? 1 : 0;
3096     } else {
3097       return ($b eq $a) ? 1 : 0;
3098     }
3099   }
3100 }
3101
3102 sub _merge_attr {
3103   my ($self, $orig, $import) = @_;
3104
3105   return $import unless defined($orig);
3106   return $orig unless defined($import);
3107
3108   $orig = $self->_rollout_attr($orig);
3109   $import = $self->_rollout_attr($import);
3110
3111   my $seen_keys;
3112   foreach my $import_element ( @{$import} ) {
3113     # find best candidate from $orig to merge $b_element into
3114     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3115     foreach my $orig_element ( @{$orig} ) {
3116       my $score = $self->_calculate_score( $orig_element, $import_element );
3117       if ($score > $best_candidate->{score}) {
3118         $best_candidate->{position} = $position;
3119         $best_candidate->{score} = $score;
3120       }
3121       $position++;
3122     }
3123     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3124
3125     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3126       push( @{$orig}, $import_element );
3127     } else {
3128       my $orig_best = $orig->[$best_candidate->{position}];
3129       # merge orig_best and b_element together and replace original with merged
3130       if (ref $orig_best ne 'HASH') {
3131         $orig->[$best_candidate->{position}] = $import_element;
3132       } elsif (ref $import_element eq 'HASH') {
3133         my ($key) = keys %{$orig_best};
3134         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
3135       }
3136     }
3137     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3138   }
3139
3140   return $orig;
3141 }
3142
3143 sub result_source {
3144     my $self = shift;
3145
3146     if (@_) {
3147         $self->_source_handle($_[0]->handle);
3148     } else {
3149         $self->_source_handle->resolve;
3150     }
3151 }
3152
3153 =head2 throw_exception
3154
3155 See L<DBIx::Class::Schema/throw_exception> for details.
3156
3157 =cut
3158
3159 sub throw_exception {
3160   my $self=shift;
3161
3162   if (ref $self && $self->_source_handle->schema) {
3163     $self->_source_handle->schema->throw_exception(@_)
3164   }
3165   else {
3166     DBIx::Class::Exception->throw(@_);
3167   }
3168 }
3169
3170 # XXX: FIXME: Attributes docs need clearing up
3171
3172 =head1 ATTRIBUTES
3173
3174 Attributes are used to refine a ResultSet in various ways when
3175 searching for data. They can be passed to any method which takes an
3176 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3177 L</count>.
3178
3179 These are in no particular order:
3180
3181 =head2 order_by
3182
3183 =over 4
3184
3185 =item Value: ( $order_by | \@order_by | \%order_by )
3186
3187 =back
3188
3189 Which column(s) to order the results by. 
3190
3191 [The full list of suitable values is documented in
3192 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3193 common options.]
3194
3195 If a single column name, or an arrayref of names is supplied, the
3196 argument is passed through directly to SQL. The hashref syntax allows
3197 for connection-agnostic specification of ordering direction:
3198
3199  For descending order:
3200
3201   order_by => { -desc => [qw/col1 col2 col3/] }
3202
3203  For explicit ascending order:
3204
3205   order_by => { -asc => 'col' }
3206
3207 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3208 supported, although you are strongly encouraged to use the hashref
3209 syntax as outlined above.
3210
3211 =head2 columns
3212
3213 =over 4
3214
3215 =item Value: \@columns
3216
3217 =back
3218
3219 Shortcut to request a particular set of columns to be retrieved. Each
3220 column spec may be a string (a table column name), or a hash (in which
3221 case the key is the C<as> value, and the value is used as the C<select>
3222 expression). Adds C<me.> onto the start of any column without a C<.> in
3223 it and sets C<select> from that, then auto-populates C<as> from
3224 C<select> as normal. (You may also use the C<cols> attribute, as in
3225 earlier versions of DBIC.)
3226
3227 =head2 +columns
3228
3229 =over 4
3230
3231 =item Value: \@columns
3232
3233 =back
3234
3235 Indicates additional columns to be selected from storage. Works the same
3236 as L</columns> but adds columns to the selection. (You may also use the
3237 C<include_columns> attribute, as in earlier versions of DBIC). For
3238 example:-
3239
3240   $schema->resultset('CD')->search(undef, {
3241     '+columns' => ['artist.name'],
3242     join => ['artist']
3243   });
3244
3245 would return all CDs and include a 'name' column to the information
3246 passed to object inflation. Note that the 'artist' is the name of the
3247 column (or relationship) accessor, and 'name' is the name of the column
3248 accessor in the related table.
3249
3250 =head2 include_columns
3251
3252 =over 4
3253
3254 =item Value: \@columns
3255
3256 =back
3257
3258 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3259
3260 =head2 select
3261
3262 =over 4
3263
3264 =item Value: \@select_columns
3265
3266 =back
3267
3268 Indicates which columns should be selected from the storage. You can use
3269 column names, or in the case of RDBMS back ends, function or stored procedure
3270 names:
3271
3272   $rs = $schema->resultset('Employee')->search(undef, {
3273     select => [
3274       'name',
3275       { count => 'employeeid' },
3276       { sum => 'salary' }
3277     ]
3278   });
3279
3280 When you use function/stored procedure names and do not supply an C<as>
3281 attribute, the column names returned are storage-dependent. E.g. MySQL would
3282 return a column named C<count(employeeid)> in the above example.
3283
3284 =head2 +select
3285
3286 =over 4
3287
3288 Indicates additional columns to be selected from storage.  Works the same as
3289 L</select> but adds columns to the selection.
3290
3291 =back
3292
3293 =head2 +as
3294
3295 =over 4
3296
3297 Indicates additional column names for those added via L</+select>. See L</as>.
3298
3299 =back
3300
3301 =head2 as
3302
3303 =over 4
3304
3305 =item Value: \@inflation_names
3306
3307 =back
3308
3309 Indicates column names for object inflation. That is, C<as>
3310 indicates the name that the column can be accessed as via the
3311 C<get_column> method (or via the object accessor, B<if one already
3312 exists>).  It has nothing to do with the SQL code C<SELECT foo AS bar>.
3313
3314 The C<as> attribute is used in conjunction with C<select>,
3315 usually when C<select> contains one or more function or stored
3316 procedure names:
3317
3318   $rs = $schema->resultset('Employee')->search(undef, {
3319     select => [
3320       'name',
3321       { count => 'employeeid' }
3322     ],
3323     as => ['name', 'employee_count'],
3324   });
3325
3326   my $employee = $rs->first(); # get the first Employee
3327
3328 If the object against which the search is performed already has an accessor
3329 matching a column name specified in C<as>, the value can be retrieved using
3330 the accessor as normal:
3331
3332   my $name = $employee->name();
3333
3334 If on the other hand an accessor does not exist in the object, you need to
3335 use C<get_column> instead:
3336
3337   my $employee_count = $employee->get_column('employee_count');
3338
3339 You can create your own accessors if required - see
3340 L<DBIx::Class::Manual::Cookbook> for details.
3341
3342 Please note: This will NOT insert an C<AS employee_count> into the SQL
3343 statement produced, it is used for internal access only. Thus
3344 attempting to use the accessor in an C<order_by> clause or similar
3345 will fail miserably.
3346
3347 To get around this limitation, you can supply literal SQL to your
3348 C<select> attibute that contains the C<AS alias> text, eg:
3349
3350   select => [\'myfield AS alias']
3351
3352 =head2 join
3353
3354 =over 4
3355
3356 =item Value: ($rel_name | \@rel_names | \%rel_names)
3357
3358 =back
3359
3360 Contains a list of relationships that should be joined for this query.  For
3361 example:
3362
3363   # Get CDs by Nine Inch Nails
3364   my $rs = $schema->resultset('CD')->search(
3365     { 'artist.name' => 'Nine Inch Nails' },
3366     { join => 'artist' }
3367   );
3368
3369 Can also contain a hash reference to refer to the other relation's relations.
3370 For example:
3371
3372   package MyApp::Schema::Track;
3373   use base qw/DBIx::Class/;
3374   __PACKAGE__->table('track');
3375   __PACKAGE__->add_columns(qw/trackid cd position title/);
3376   __PACKAGE__->set_primary_key('trackid');
3377   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3378   1;
3379
3380   # In your application
3381   my $rs = $schema->resultset('Artist')->search(
3382     { 'track.title' => 'Teardrop' },
3383     {
3384       join     => { cd => 'track' },
3385       order_by => 'artist.name',
3386     }
3387   );
3388
3389 You need to use the relationship (not the table) name in  conditions,
3390 because they are aliased as such. The current table is aliased as "me", so
3391 you need to use me.column_name in order to avoid ambiguity. For example:
3392
3393   # Get CDs from 1984 with a 'Foo' track
3394   my $rs = $schema->resultset('CD')->search(
3395     {
3396       'me.year' => 1984,
3397       'tracks.name' => 'Foo'
3398     },
3399     { join => 'tracks' }
3400   );
3401
3402 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3403 similarly for a third time). For e.g.
3404
3405   my $rs = $schema->resultset('Artist')->search({
3406     'cds.title'   => 'Down to Earth',
3407     'cds_2.title' => 'Popular',
3408   }, {
3409     join => [ qw/cds cds/ ],
3410   });
3411
3412 will return a set of all artists that have both a cd with title 'Down
3413 to Earth' and a cd with title 'Popular'.
3414
3415 If you want to fetch related objects from other tables as well, see C<prefetch>
3416 below.
3417
3418 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
3419
3420 =head2 prefetch
3421
3422 =over 4
3423
3424 =item Value: ($rel_name | \@rel_names | \%rel_names)
3425
3426 =back
3427
3428 Contains one or more relationships that should be fetched along with
3429 the main query (when they are accessed afterwards the data will
3430 already be available, without extra queries to the database).  This is
3431 useful for when you know you will need the related objects, because it
3432 saves at least one query:
3433
3434   my $rs = $schema->resultset('Tag')->search(
3435     undef,
3436     {
3437       prefetch => {
3438         cd => 'artist'
3439       }
3440     }
3441   );
3442
3443 The initial search results in SQL like the following:
3444
3445   SELECT tag.*, cd.*, artist.* FROM tag
3446   JOIN cd ON tag.cd = cd.cdid
3447   JOIN artist ON cd.artist = artist.artistid
3448
3449 L<DBIx::Class> has no need to go back to the database when we access the
3450 C<cd> or C<artist> relationships, which saves us two SQL statements in this
3451 case.
3452
3453 Simple prefetches will be joined automatically, so there is no need
3454 for a C<join> attribute in the above search.
3455
3456 C<prefetch> can be used with the following relationship types: C<belongs_to>,
3457 C<has_one> (or if you're using C<add_relationship>, any relationship declared
3458 with an accessor type of 'single' or 'filter'). A more complex example that
3459 prefetches an artists cds, the tracks on those cds, and the tags associted
3460 with that artist is given below (assuming many-to-many from artists to tags):
3461
3462  my $rs = $schema->resultset('Artist')->search(
3463    undef,
3464    {
3465      prefetch => [
3466        { cds => 'tracks' },
3467        { artist_tags => 'tags' }
3468      ]
3469    }
3470  );
3471
3472
3473 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
3474 attributes will be ignored.
3475
3476 B<CAVEATs>: Prefetch does a lot of deep magic. As such, it may not behave
3477 exactly as you might expect.
3478
3479 =over 4
3480
3481 =item * 
3482
3483 Prefetch uses the L</cache> to populate the prefetched relationships. This
3484 may or may not be what you want.
3485
3486 =item * 
3487
3488 If you specify a condition on a prefetched relationship, ONLY those
3489 rows that match the prefetched condition will be fetched into that relationship.
3490 This means that adding prefetch to a search() B<may alter> what is returned by
3491 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
3492
3493   my $artist_rs = $schema->resultset('Artist')->search({
3494       'cds.year' => 2008,
3495   }, {
3496       join => 'cds',
3497   });
3498
3499   my $count = $artist_rs->first->cds->count;
3500
3501   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
3502
3503   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
3504
3505   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
3506
3507 that cmp_ok() may or may not pass depending on the datasets involved. This
3508 behavior may or may not survive the 0.09 transition.
3509
3510 =back
3511
3512 =head2 page
3513
3514 =over 4
3515
3516 =item Value: $page
3517
3518 =back
3519
3520 Makes the resultset paged and specifies the page to retrieve. Effectively
3521 identical to creating a non-pages resultset and then calling ->page($page)
3522 on it.
3523
3524 If L<rows> attribute is not specified it defaults to 10 rows per page.
3525
3526 When you have a paged resultset, L</count> will only return the number
3527 of rows in the page. To get the total, use the L</pager> and call
3528 C<total_entries> on it.
3529
3530 =head2 rows
3531
3532 =over 4
3533
3534 =item Value: $rows
3535
3536 =back
3537
3538 Specifes the maximum number of rows for direct retrieval or the number of
3539 rows per page if the page attribute or method is used.
3540
3541 =head2 offset
3542
3543 =over 4
3544
3545 =item Value: $offset
3546
3547 =back
3548
3549 Specifies the (zero-based) row number for the  first row to be returned, or the
3550 of the first row of the first page if paging is used.
3551
3552 =head2 group_by
3553
3554 =over 4
3555
3556 =item Value: \@columns
3557
3558 =back
3559
3560 A arrayref of columns to group by. Can include columns of joined tables.
3561
3562   group_by => [qw/ column1 column2 ... /]
3563
3564 =head2 having
3565
3566 =over 4
3567
3568 =item Value: $condition
3569
3570 =back
3571
3572 HAVING is a select statement attribute that is applied between GROUP BY and
3573 ORDER BY. It is applied to the after the grouping calculations have been
3574 done.
3575
3576   having => { 'count(employee)' => { '>=', 100 } }
3577
3578 =head2 distinct
3579
3580 =over 4
3581
3582 =item Value: (0 | 1)
3583
3584 =back
3585
3586 Set to 1 to group by all columns. If the resultset already has a group_by
3587 attribute, this setting is ignored and an appropriate warning is issued.
3588
3589 =head2 where
3590
3591 =over 4
3592
3593 Adds to the WHERE clause.
3594
3595   # only return rows WHERE deleted IS NULL for all searches
3596   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
3597
3598 Can be overridden by passing C<{ where => undef }> as an attribute
3599 to a resulset.
3600
3601 =back
3602
3603 =head2 cache
3604
3605 Set to 1 to cache search results. This prevents extra SQL queries if you
3606 revisit rows in your ResultSet:
3607
3608   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
3609
3610   while( my $artist = $resultset->next ) {
3611     ... do stuff ...
3612   }
3613
3614   $rs->first; # without cache, this would issue a query
3615
3616 By default, searches are not cached.
3617
3618 For more examples of using these attributes, see
3619 L<DBIx::Class::Manual::Cookbook>.
3620
3621 =head2 for
3622
3623 =over 4
3624
3625 =item Value: ( 'update' | 'shared' )
3626
3627 =back
3628
3629 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
3630 ... FOR SHARED.
3631
3632 =cut
3633
3634 1;