Back out constructor/prefetch rewrite introduced mainly by 43245ada4a
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use base qw/DBIx::Class/;
6 use DBIx::Class::Carp;
7 use DBIx::Class::Exception;
8 use DBIx::Class::ResultSetColumn;
9 use Scalar::Util qw/blessed weaken/;
10 use Try::Tiny;
11 use Data::Compare (); # no imports!!! guard against insane architecture
12
13 # not importing first() as it will clash with our own method
14 use List::Util ();
15
16 BEGIN {
17   # De-duplication in _merge_attr() is disabled, but left in for reference
18   # (the merger is used for other things that ought not to be de-duped)
19   *__HM_DEDUP = sub () { 0 };
20 }
21
22 use namespace::clean;
23
24 use overload
25         '0+'     => "count",
26         'bool'   => "_bool",
27         fallback => 1;
28
29 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class result_source/);
30
31 =head1 NAME
32
33 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
34
35 =head1 SYNOPSIS
36
37   my $users_rs   = $schema->resultset('User');
38   while( $user = $users_rs->next) {
39     print $user->username;
40   }
41
42   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
43   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
44
45 =head1 DESCRIPTION
46
47 A ResultSet is an object which stores a set of conditions representing
48 a query. It is the backbone of DBIx::Class (i.e. the really
49 important/useful bit).
50
51 No SQL is executed on the database when a ResultSet is created, it
52 just stores all the conditions needed to create the query.
53
54 A basic ResultSet representing the data of an entire table is returned
55 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
56 L<Source|DBIx::Class::Manual::Glossary/Source> name.
57
58   my $users_rs = $schema->resultset('User');
59
60 A new ResultSet is returned from calling L</search> on an existing
61 ResultSet. The new one will contain all the conditions of the
62 original, plus any new conditions added in the C<search> call.
63
64 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
65 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
66 represents.
67
68 The query that the ResultSet represents is B<only> executed against
69 the database when these methods are called:
70 L</find>, L</next>, L</all>, L</first>, L</single>, L</count>.
71
72 If a resultset is used in a numeric context it returns the L</count>.
73 However, if it is used in a boolean context it is B<always> true.  So if
74 you want to check if a resultset has any results, you must use C<if $rs
75 != 0>.
76
77 =head1 CUSTOM ResultSet CLASSES THAT USE Moose
78
79 If you want to make your custom ResultSet classes with L<Moose>, use a template
80 similar to:
81
82     package MyApp::Schema::ResultSet::User;
83
84     use Moose;
85     use namespace::autoclean;
86     use MooseX::NonMoose;
87     extends 'DBIx::Class::ResultSet';
88
89     sub BUILDARGS { $_[2] }
90
91     ...your code...
92
93     __PACKAGE__->meta->make_immutable;
94
95     1;
96
97 The L<MooseX::NonMoose> is necessary so that the L<Moose> constructor does not
98 clash with the regular ResultSet constructor. Alternatively, you can use:
99
100     __PACKAGE__->meta->make_immutable(inline_constructor => 0);
101
102 The L<BUILDARGS|Moose::Manual::Construction/BUILDARGS> is necessary because the
103 signature of the ResultSet C<new> is C<< ->new($source, \%args) >>.
104
105 =head1 EXAMPLES
106
107 =head2 Chaining resultsets
108
109 Let's say you've got a query that needs to be run to return some data
110 to the user. But, you have an authorization system in place that
111 prevents certain users from seeing certain information. So, you want
112 to construct the basic query in one method, but add constraints to it in
113 another.
114
115   sub get_data {
116     my $self = shift;
117     my $request = $self->get_request; # Get a request object somehow.
118     my $schema = $self->result_source->schema;
119
120     my $cd_rs = $schema->resultset('CD')->search({
121       title => $request->param('title'),
122       year => $request->param('year'),
123     });
124
125     $cd_rs = $self->apply_security_policy( $cd_rs );
126
127     return $cd_rs->all();
128   }
129
130   sub apply_security_policy {
131     my $self = shift;
132     my ($rs) = @_;
133
134     return $rs->search({
135       subversive => 0,
136     });
137   }
138
139 =head3 Resolving conditions and attributes
140
141 When a resultset is chained from another resultset, conditions and
142 attributes with the same keys need resolving.
143
144 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
145 into the existing ones from the original resultset.
146
147 The L</where> and L</having> attributes, and any search conditions, are
148 merged with an SQL C<AND> to the existing condition from the original
149 resultset.
150
151 All other attributes are overridden by any new ones supplied in the
152 search attributes.
153
154 =head2 Multiple queries
155
156 Since a resultset just defines a query, you can do all sorts of
157 things with it with the same object.
158
159   # Don't hit the DB yet.
160   my $cd_rs = $schema->resultset('CD')->search({
161     title => 'something',
162     year => 2009,
163   });
164
165   # Each of these hits the DB individually.
166   my $count = $cd_rs->count;
167   my $most_recent = $cd_rs->get_column('date_released')->max();
168   my @records = $cd_rs->all;
169
170 And it's not just limited to SELECT statements.
171
172   $cd_rs->delete();
173
174 This is even cooler:
175
176   $cd_rs->create({ artist => 'Fred' });
177
178 Which is the same as:
179
180   $schema->resultset('CD')->create({
181     title => 'something',
182     year => 2009,
183     artist => 'Fred'
184   });
185
186 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
187
188 =head1 METHODS
189
190 =head2 new
191
192 =over 4
193
194 =item Arguments: $source, \%$attrs
195
196 =item Return Value: $rs
197
198 =back
199
200 The resultset constructor. Takes a source object (usually a
201 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
202 L</ATTRIBUTES> below).  Does not perform any queries -- these are
203 executed as needed by the other methods.
204
205 Generally you won't need to construct a resultset manually.  You'll
206 automatically get one from e.g. a L</search> called in scalar context:
207
208   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
209
210 IMPORTANT: If called on an object, proxies to new_result instead so
211
212   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
213
214 will return a CD object, not a ResultSet.
215
216 =cut
217
218 sub new {
219   my $class = shift;
220   return $class->new_result(@_) if ref $class;
221
222   my ($source, $attrs) = @_;
223   $source = $source->resolve
224     if $source->isa('DBIx::Class::ResultSourceHandle');
225   $attrs = { %{$attrs||{}} };
226
227   if ($attrs->{page}) {
228     $attrs->{rows} ||= 10;
229   }
230
231   $attrs->{alias} ||= 'me';
232
233   my $self = bless {
234     result_source => $source,
235     cond => $attrs->{where},
236     pager => undef,
237     attrs => $attrs,
238   }, $class;
239
240   # if there is a dark selector, this means we are already in a
241   # chain and the cleanup/sanification was taken care of by
242   # _search_rs already
243   $self->_normalize_selection($attrs)
244     unless $attrs->{_dark_selector};
245
246   $self->result_class(
247     $attrs->{result_class} || $source->result_class
248   );
249
250   $self;
251 }
252
253 =head2 search
254
255 =over 4
256
257 =item Arguments: $cond, \%attrs?
258
259 =item Return Value: $resultset (scalar context) ||  @row_objs (list context)
260
261 =back
262
263   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
264   my $new_rs = $cd_rs->search({ year => 2005 });
265
266   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
267                  # year = 2005 OR year = 2004
268
269 In list context, C<< ->all() >> is called implicitly on the resultset, thus
270 returning a list of row objects instead. To avoid that, use L</search_rs>.
271
272 If you need to pass in additional attributes but no additional condition,
273 call it as C<search(undef, \%attrs)>.
274
275   # "SELECT name, artistid FROM $artist_table"
276   my @all_artists = $schema->resultset('Artist')->search(undef, {
277     columns => [qw/name artistid/],
278   });
279
280 For a list of attributes that can be passed to C<search>, see
281 L</ATTRIBUTES>. For more examples of using this function, see
282 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
283 documentation for the first argument, see L<SQL::Abstract>
284 and its extension L<DBIx::Class::SQLMaker>.
285
286 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
287
288 =head3 CAVEAT
289
290 Note that L</search> does not process/deflate any of the values passed in the
291 L<SQL::Abstract>-compatible search condition structure. This is unlike other
292 condition-bound methods L</new>, L</create> and L</find>. The user must ensure
293 manually that any value passed to this method will stringify to something the
294 RDBMS knows how to deal with. A notable example is the handling of L<DateTime>
295 objects, for more info see:
296 L<DBIx::Class::Manual::Cookbook/Formatting DateTime objects in queries>.
297
298 =cut
299
300 sub search {
301   my $self = shift;
302   my $rs = $self->search_rs( @_ );
303
304   if (wantarray) {
305     return $rs->all;
306   }
307   elsif (defined wantarray) {
308     return $rs;
309   }
310   else {
311     # we can be called by a relationship helper, which in
312     # turn may be called in void context due to some braindead
313     # overload or whatever else the user decided to be clever
314     # at this particular day. Thus limit the exception to
315     # external code calls only
316     $self->throw_exception ('->search is *not* a mutator, calling it in void context makes no sense')
317       if (caller)[0] !~ /^\QDBIx::Class::/;
318
319     return ();
320   }
321 }
322
323 =head2 search_rs
324
325 =over 4
326
327 =item Arguments: $cond, \%attrs?
328
329 =item Return Value: $resultset
330
331 =back
332
333 This method does the same exact thing as search() except it will
334 always return a resultset, even in list context.
335
336 =cut
337
338 sub search_rs {
339   my $self = shift;
340
341   # Special-case handling for (undef, undef).
342   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
343     @_ = ();
344   }
345
346   my $call_attrs = {};
347   if (@_ > 1) {
348     if (ref $_[-1] eq 'HASH') {
349       # copy for _normalize_selection
350       $call_attrs = { %{ pop @_ } };
351     }
352     elsif (! defined $_[-1] ) {
353       pop @_;   # search({}, undef)
354     }
355   }
356
357   # see if we can keep the cache (no $rs changes)
358   my $cache;
359   my %safe = (alias => 1, cache => 1);
360   if ( ! List::Util::first { !$safe{$_} } keys %$call_attrs and (
361     ! defined $_[0]
362       or
363     ref $_[0] eq 'HASH' && ! keys %{$_[0]}
364       or
365     ref $_[0] eq 'ARRAY' && ! @{$_[0]}
366   )) {
367     $cache = $self->get_cache;
368   }
369
370   my $rsrc = $self->result_source;
371
372   my $old_attrs = { %{$self->{attrs}} };
373   my $old_having = delete $old_attrs->{having};
374   my $old_where = delete $old_attrs->{where};
375
376   my $new_attrs = { %$old_attrs };
377
378   # take care of call attrs (only if anything is changing)
379   if (keys %$call_attrs) {
380
381     my @selector_attrs = qw/select as columns cols +select +as +columns include_columns/;
382
383     # reset the current selector list if new selectors are supplied
384     if (List::Util::first { exists $call_attrs->{$_} } qw/columns cols select as/) {
385       delete @{$old_attrs}{(@selector_attrs, '_dark_selector')};
386     }
387
388     # Normalize the new selector list (operates on the passed-in attr structure)
389     # Need to do it on every chain instead of only once on _resolved_attrs, in
390     # order to allow detection of empty vs partial 'as'
391     $call_attrs->{_dark_selector} = $old_attrs->{_dark_selector}
392       if $old_attrs->{_dark_selector};
393     $self->_normalize_selection ($call_attrs);
394
395     # start with blind overwriting merge, exclude selector attrs
396     $new_attrs = { %{$old_attrs}, %{$call_attrs} };
397     delete @{$new_attrs}{@selector_attrs};
398
399     for (@selector_attrs) {
400       $new_attrs->{$_} = $self->_merge_attr($old_attrs->{$_}, $call_attrs->{$_})
401         if ( exists $old_attrs->{$_} or exists $call_attrs->{$_} );
402     }
403
404     # older deprecated name, use only if {columns} is not there
405     if (my $c = delete $new_attrs->{cols}) {
406       if ($new_attrs->{columns}) {
407         carp "Resultset specifies both the 'columns' and the legacy 'cols' attributes - ignoring 'cols'";
408       }
409       else {
410         $new_attrs->{columns} = $c;
411       }
412     }
413
414
415     # join/prefetch use their own crazy merging heuristics
416     foreach my $key (qw/join prefetch/) {
417       $new_attrs->{$key} = $self->_merge_joinpref_attr($old_attrs->{$key}, $call_attrs->{$key})
418         if exists $call_attrs->{$key};
419     }
420
421     # stack binds together
422     $new_attrs->{bind} = [ @{ $old_attrs->{bind} || [] }, @{ $call_attrs->{bind} || [] } ];
423   }
424
425
426   # rip apart the rest of @_, parse a condition
427   my $call_cond = do {
428
429     if (ref $_[0] eq 'HASH') {
430       (keys %{$_[0]}) ? $_[0] : undef
431     }
432     elsif (@_ == 1) {
433       $_[0]
434     }
435     elsif (@_ % 2) {
436       $self->throw_exception('Odd number of arguments to search')
437     }
438     else {
439       +{ @_ }
440     }
441
442   } if @_;
443
444   if( @_ > 1 and ! $rsrc->result_class->isa('DBIx::Class::CDBICompat') ) {
445     carp_unique 'search( %condition ) is deprecated, use search( \%condition ) instead';
446   }
447
448   for ($old_where, $call_cond) {
449     if (defined $_) {
450       $new_attrs->{where} = $self->_stack_cond (
451         $_, $new_attrs->{where}
452       );
453     }
454   }
455
456   if (defined $old_having) {
457     $new_attrs->{having} = $self->_stack_cond (
458       $old_having, $new_attrs->{having}
459     )
460   }
461
462   my $rs = (ref $self)->new($rsrc, $new_attrs);
463
464   $rs->set_cache($cache) if ($cache);
465
466   return $rs;
467 }
468
469 my $dark_sel_dumper;
470 sub _normalize_selection {
471   my ($self, $attrs) = @_;
472
473   # legacy syntax
474   $attrs->{'+columns'} = $self->_merge_attr($attrs->{'+columns'}, delete $attrs->{include_columns})
475     if exists $attrs->{include_columns};
476
477   # columns are always placed first, however
478
479   # Keep the X vs +X separation until _resolved_attrs time - this allows to
480   # delay the decision on whether to use a default select list ($rsrc->columns)
481   # allowing stuff like the remove_columns helper to work
482   #
483   # select/as +select/+as pairs need special handling - the amount of select/as
484   # elements in each pair does *not* have to be equal (think multicolumn
485   # selectors like distinct(foo, bar) ). If the selector is bare (no 'as'
486   # supplied at all) - try to infer the alias, either from the -as parameter
487   # of the selector spec, or use the parameter whole if it looks like a column
488   # name (ugly legacy heuristic). If all fails - leave the selector bare (which
489   # is ok as well), but make sure no more additions to the 'as' chain take place
490   for my $pref ('', '+') {
491
492     my ($sel, $as) = map {
493       my $key = "${pref}${_}";
494
495       my $val = [ ref $attrs->{$key} eq 'ARRAY'
496         ? @{$attrs->{$key}}
497         : $attrs->{$key} || ()
498       ];
499       delete $attrs->{$key};
500       $val;
501     } qw/select as/;
502
503     if (! @$as and ! @$sel ) {
504       next;
505     }
506     elsif (@$as and ! @$sel) {
507       $self->throw_exception(
508         "Unable to handle ${pref}as specification (@$as) without a corresponding ${pref}select"
509       );
510     }
511     elsif( ! @$as ) {
512       # no as part supplied at all - try to deduce (unless explicit end of named selection is declared)
513       # if any @$as has been supplied we assume the user knows what (s)he is doing
514       # and blindly keep stacking up pieces
515       unless ($attrs->{_dark_selector}) {
516         SELECTOR:
517         for (@$sel) {
518           if ( ref $_ eq 'HASH' and exists $_->{-as} ) {
519             push @$as, $_->{-as};
520           }
521           # assume any plain no-space, no-parenthesis string to be a column spec
522           # FIXME - this is retarded but is necessary to support shit like 'count(foo)'
523           elsif ( ! ref $_ and $_ =~ /^ [^\s\(\)]+ $/x) {
524             push @$as, $_;
525           }
526           # if all else fails - raise a flag that no more aliasing will be allowed
527           else {
528             $attrs->{_dark_selector} = {
529               plus_stage => $pref,
530               string => ($dark_sel_dumper ||= do {
531                   require Data::Dumper::Concise;
532                   Data::Dumper::Concise::DumperObject()->Indent(0);
533                 })->Values([$_])->Dump
534               ,
535             };
536             last SELECTOR;
537           }
538         }
539       }
540     }
541     elsif (@$as < @$sel) {
542       $self->throw_exception(
543         "Unable to handle an ${pref}as specification (@$as) with less elements than the corresponding ${pref}select"
544       );
545     }
546     elsif ($pref and $attrs->{_dark_selector}) {
547       $self->throw_exception(
548         "Unable to process named '+select', resultset contains an unnamed selector $attrs->{_dark_selector}{string}"
549       );
550     }
551
552
553     # merge result
554     $attrs->{"${pref}select"} = $self->_merge_attr($attrs->{"${pref}select"}, $sel);
555     $attrs->{"${pref}as"} = $self->_merge_attr($attrs->{"${pref}as"}, $as);
556   }
557 }
558
559 sub _stack_cond {
560   my ($self, $left, $right) = @_;
561
562   # collapse single element top-level conditions
563   # (single pass only, unlikely to need recursion)
564   for ($left, $right) {
565     if (ref $_ eq 'ARRAY') {
566       if (@$_ == 0) {
567         $_ = undef;
568       }
569       elsif (@$_ == 1) {
570         $_ = $_->[0];
571       }
572     }
573     elsif (ref $_ eq 'HASH') {
574       my ($first, $more) = keys %$_;
575
576       # empty hash
577       if (! defined $first) {
578         $_ = undef;
579       }
580       # one element hash
581       elsif (! defined $more) {
582         if ($first eq '-and' and ref $_->{'-and'} eq 'HASH') {
583           $_ = $_->{'-and'};
584         }
585         elsif ($first eq '-or' and ref $_->{'-or'} eq 'ARRAY') {
586           $_ = $_->{'-or'};
587         }
588       }
589     }
590   }
591
592   # merge hashes with weeding out of duplicates (simple cases only)
593   if (ref $left eq 'HASH' and ref $right eq 'HASH') {
594
595     # shallow copy to destroy
596     $right = { %$right };
597     for (grep { exists $right->{$_} } keys %$left) {
598       # the use of eq_deeply here is justified - the rhs of an
599       # expression can contain a lot of twisted weird stuff
600       delete $right->{$_} if Data::Compare::Compare( $left->{$_}, $right->{$_} );
601     }
602
603     $right = undef unless keys %$right;
604   }
605
606
607   if (defined $left xor defined $right) {
608     return defined $left ? $left : $right;
609   }
610   elsif (! defined $left) {
611     return undef;
612   }
613   else {
614     return { -and => [ $left, $right ] };
615   }
616 }
617
618 =head2 search_literal
619
620 =over 4
621
622 =item Arguments: $sql_fragment, @bind_values
623
624 =item Return Value: $resultset (scalar context) || @row_objs (list context)
625
626 =back
627
628   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
629   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
630
631 Pass a literal chunk of SQL to be added to the conditional part of the
632 resultset query.
633
634 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
635 only be used in that context. C<search_literal> is a convenience method.
636 It is equivalent to calling $schema->search(\[]), but if you want to ensure
637 columns are bound correctly, use C<search>.
638
639 Example of how to use C<search> instead of C<search_literal>
640
641   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
642   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
643
644
645 See L<DBIx::Class::Manual::Cookbook/Searching> and
646 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
647 require C<search_literal>.
648
649 =cut
650
651 sub search_literal {
652   my ($self, $sql, @bind) = @_;
653   my $attr;
654   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
655     $attr = pop @bind;
656   }
657   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
658 }
659
660 =head2 find
661
662 =over 4
663
664 =item Arguments: \%columns_values | @pk_values, \%attrs?
665
666 =item Return Value: $row_object | undef
667
668 =back
669
670 Finds and returns a single row based on supplied criteria. Takes either a
671 hashref with the same format as L</create> (including inference of foreign
672 keys from related objects), or a list of primary key values in the same
673 order as the L<primary columns|DBIx::Class::ResultSource/primary_columns>
674 declaration on the L</result_source>.
675
676 In either case an attempt is made to combine conditions already existing on
677 the resultset with the condition passed to this method.
678
679 To aid with preparing the correct query for the storage you may supply the
680 C<key> attribute, which is the name of a
681 L<unique constraint|DBIx::Class::ResultSource/add_unique_constraint> (the
682 unique constraint corresponding to the
683 L<primary columns|DBIx::Class::ResultSource/primary_columns> is always named
684 C<primary>). If the C<key> attribute has been supplied, and DBIC is unable
685 to construct a query that satisfies the named unique constraint fully (
686 non-NULL values for each column member of the constraint) an exception is
687 thrown.
688
689 If no C<key> is specified, the search is carried over all unique constraints
690 which are fully defined by the available condition.
691
692 If no such constraint is found, C<find> currently defaults to a simple
693 C<< search->(\%column_values) >> which may or may not do what you expect.
694 Note that this fallback behavior may be deprecated in further versions. If
695 you need to search with arbitrary conditions - use L</search>. If the query
696 resulting from this fallback produces more than one row, a warning to the
697 effect is issued, though only the first row is constructed and returned as
698 C<$row_object>.
699
700 In addition to C<key>, L</find> recognizes and applies standard
701 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
702
703 Note that if you have extra concerns about the correctness of the resulting
704 query you need to specify the C<key> attribute and supply the entire condition
705 as an argument to find (since it is not always possible to perform the
706 combination of the resultset condition with the supplied one, especially if
707 the resultset condition contains literal sql).
708
709 For example, to find a row by its primary key:
710
711   my $cd = $schema->resultset('CD')->find(5);
712
713 You can also find a row by a specific unique constraint:
714
715   my $cd = $schema->resultset('CD')->find(
716     {
717       artist => 'Massive Attack',
718       title  => 'Mezzanine',
719     },
720     { key => 'cd_artist_title' }
721   );
722
723 See also L</find_or_create> and L</update_or_create>.
724
725 =cut
726
727 sub find {
728   my $self = shift;
729   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
730
731   my $rsrc = $self->result_source;
732
733   my $constraint_name;
734   if (exists $attrs->{key}) {
735     $constraint_name = defined $attrs->{key}
736       ? $attrs->{key}
737       : $self->throw_exception("An undefined 'key' resultset attribute makes no sense")
738     ;
739   }
740
741   # Parse out the condition from input
742   my $call_cond;
743
744   if (ref $_[0] eq 'HASH') {
745     $call_cond = { %{$_[0]} };
746   }
747   else {
748     # if only values are supplied we need to default to 'primary'
749     $constraint_name = 'primary' unless defined $constraint_name;
750
751     my @c_cols = $rsrc->unique_constraint_columns($constraint_name);
752
753     $self->throw_exception(
754       "No constraint columns, maybe a malformed '$constraint_name' constraint?"
755     ) unless @c_cols;
756
757     $self->throw_exception (
758       'find() expects either a column/value hashref, or a list of values '
759     . "corresponding to the columns of the specified unique constraint '$constraint_name'"
760     ) unless @c_cols == @_;
761
762     $call_cond = {};
763     @{$call_cond}{@c_cols} = @_;
764   }
765
766   my %related;
767   for my $key (keys %$call_cond) {
768     if (
769       my $keyref = ref($call_cond->{$key})
770         and
771       my $relinfo = $rsrc->relationship_info($key)
772     ) {
773       my $val = delete $call_cond->{$key};
774
775       next if $keyref eq 'ARRAY'; # has_many for multi_create
776
777       my $rel_q = $rsrc->_resolve_condition(
778         $relinfo->{cond}, $val, $key, $key
779       );
780       die "Can't handle complex relationship conditions in find" if ref($rel_q) ne 'HASH';
781       @related{keys %$rel_q} = values %$rel_q;
782     }
783   }
784
785   # relationship conditions take precedence (?)
786   @{$call_cond}{keys %related} = values %related;
787
788   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
789   my $final_cond;
790   if (defined $constraint_name) {
791     $final_cond = $self->_qualify_cond_columns (
792
793       $self->_build_unique_cond (
794         $constraint_name,
795         $call_cond,
796       ),
797
798       $alias,
799     );
800   }
801   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
802     # This means that we got here after a merger of relationship conditions
803     # in ::Relationship::Base::search_related (the row method), and furthermore
804     # the relationship is of the 'single' type. This means that the condition
805     # provided by the relationship (already attached to $self) is sufficient,
806     # as there can be only one row in the database that would satisfy the
807     # relationship
808   }
809   else {
810     # no key was specified - fall down to heuristics mode:
811     # run through all unique queries registered on the resultset, and
812     # 'OR' all qualifying queries together
813     my (@unique_queries, %seen_column_combinations);
814     for my $c_name ($rsrc->unique_constraint_names) {
815       next if $seen_column_combinations{
816         join "\x00", sort $rsrc->unique_constraint_columns($c_name)
817       }++;
818
819       push @unique_queries, try {
820         $self->_build_unique_cond ($c_name, $call_cond, 'croak_on_nulls')
821       } || ();
822     }
823
824     $final_cond = @unique_queries
825       ? [ map { $self->_qualify_cond_columns($_, $alias) } @unique_queries ]
826       : $self->_non_unique_find_fallback ($call_cond, $attrs)
827     ;
828   }
829
830   # Run the query, passing the result_class since it should propagate for find
831   my $rs = $self->search ($final_cond, {result_class => $self->result_class, %$attrs});
832   if (keys %{$rs->_resolved_attrs->{collapse}}) {
833     my $row = $rs->next;
834     carp "Query returned more than one row" if $rs->next;
835     return $row;
836   }
837   else {
838     return $rs->single;
839   }
840 }
841
842 # This is a stop-gap method as agreed during the discussion on find() cleanup:
843 # http://lists.scsys.co.uk/pipermail/dbix-class/2010-October/009535.html
844 #
845 # It is invoked when find() is called in legacy-mode with insufficiently-unique
846 # condition. It is provided for overrides until a saner way forward is devised
847 #
848 # *NOTE* This is not a public method, and it's *GUARANTEED* to disappear down
849 # the road. Please adjust your tests accordingly to catch this situation early
850 # DBIx::Class::ResultSet->can('_non_unique_find_fallback') is reasonable
851 #
852 # The method will not be removed without an adequately complete replacement
853 # for strict-mode enforcement
854 sub _non_unique_find_fallback {
855   my ($self, $cond, $attrs) = @_;
856
857   return $self->_qualify_cond_columns(
858     $cond,
859     exists $attrs->{alias}
860       ? $attrs->{alias}
861       : $self->{attrs}{alias}
862   );
863 }
864
865
866 sub _qualify_cond_columns {
867   my ($self, $cond, $alias) = @_;
868
869   my %aliased = %$cond;
870   for (keys %aliased) {
871     $aliased{"$alias.$_"} = delete $aliased{$_}
872       if $_ !~ /\./;
873   }
874
875   return \%aliased;
876 }
877
878 sub _build_unique_cond {
879   my ($self, $constraint_name, $extra_cond, $croak_on_null) = @_;
880
881   my @c_cols = $self->result_source->unique_constraint_columns($constraint_name);
882
883   # combination may fail if $self->{cond} is non-trivial
884   my ($final_cond) = try {
885     $self->_merge_with_rscond ($extra_cond)
886   } catch {
887     +{ %$extra_cond }
888   };
889
890   # trim out everything not in $columns
891   $final_cond = { map {
892     exists $final_cond->{$_}
893       ? ( $_ => $final_cond->{$_} )
894       : ()
895   } @c_cols };
896
897   if (my @missing = grep
898     { ! ($croak_on_null ? defined $final_cond->{$_} : exists $final_cond->{$_}) }
899     (@c_cols)
900   ) {
901     $self->throw_exception( sprintf ( "Unable to satisfy requested constraint '%s', no values for column(s): %s",
902       $constraint_name,
903       join (', ', map { "'$_'" } @missing),
904     ) );
905   }
906
907   if (
908     !$croak_on_null
909       and
910     !$ENV{DBIC_NULLABLE_KEY_NOWARN}
911       and
912     my @undefs = grep { ! defined $final_cond->{$_} } (keys %$final_cond)
913   ) {
914     carp_unique ( sprintf (
915       "NULL/undef values supplied for requested unique constraint '%s' (NULL "
916     . 'values in column(s): %s). This is almost certainly not what you wanted, '
917     . 'though you can set DBIC_NULLABLE_KEY_NOWARN to disable this warning.',
918       $constraint_name,
919       join (', ', map { "'$_'" } @undefs),
920     ));
921   }
922
923   return $final_cond;
924 }
925
926 =head2 search_related
927
928 =over 4
929
930 =item Arguments: $rel, $cond?, \%attrs?
931
932 =item Return Value: $new_resultset (scalar context) || @row_objs (list context)
933
934 =back
935
936   $new_rs = $cd_rs->search_related('artist', {
937     name => 'Emo-R-Us',
938   });
939
940 Searches the specified relationship, optionally specifying a condition and
941 attributes for matching records. See L</ATTRIBUTES> for more information.
942
943 In list context, C<< ->all() >> is called implicitly on the resultset, thus
944 returning a list of row objects instead. To avoid that, use L</search_related_rs>.
945
946 See also L</search_related_rs>.
947
948 =cut
949
950 sub search_related {
951   return shift->related_resultset(shift)->search(@_);
952 }
953
954 =head2 search_related_rs
955
956 This method works exactly the same as search_related, except that
957 it guarantees a resultset, even in list context.
958
959 =cut
960
961 sub search_related_rs {
962   return shift->related_resultset(shift)->search_rs(@_);
963 }
964
965 =head2 cursor
966
967 =over 4
968
969 =item Arguments: none
970
971 =item Return Value: $cursor
972
973 =back
974
975 Returns a storage-driven cursor to the given resultset. See
976 L<DBIx::Class::Cursor> for more information.
977
978 =cut
979
980 sub cursor {
981   my ($self) = @_;
982
983   my $attrs = $self->_resolved_attrs_copy;
984
985   return $self->{cursor}
986     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
987           $attrs->{where},$attrs);
988 }
989
990 =head2 single
991
992 =over 4
993
994 =item Arguments: $cond?
995
996 =item Return Value: $row_object | undef
997
998 =back
999
1000   my $cd = $schema->resultset('CD')->single({ year => 2001 });
1001
1002 Inflates the first result without creating a cursor if the resultset has
1003 any records in it; if not returns C<undef>. Used by L</find> as a lean version
1004 of L</search>.
1005
1006 While this method can take an optional search condition (just like L</search>)
1007 being a fast-code-path it does not recognize search attributes. If you need to
1008 add extra joins or similar, call L</search> and then chain-call L</single> on the
1009 L<DBIx::Class::ResultSet> returned.
1010
1011 =over
1012
1013 =item B<Note>
1014
1015 As of 0.08100, this method enforces the assumption that the preceding
1016 query returns only one row. If more than one row is returned, you will receive
1017 a warning:
1018
1019   Query returned more than one row
1020
1021 In this case, you should be using L</next> or L</find> instead, or if you really
1022 know what you are doing, use the L</rows> attribute to explicitly limit the size
1023 of the resultset.
1024
1025 This method will also throw an exception if it is called on a resultset prefetching
1026 has_many, as such a prefetch implies fetching multiple rows from the database in
1027 order to assemble the resulting object.
1028
1029 =back
1030
1031 =cut
1032
1033 sub single {
1034   my ($self, $where) = @_;
1035   if(@_ > 2) {
1036       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
1037   }
1038
1039   my $attrs = $self->_resolved_attrs_copy;
1040
1041   if (keys %{$attrs->{collapse}}) {
1042     $self->throw_exception(
1043       'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
1044     );
1045   }
1046
1047   if ($where) {
1048     if (defined $attrs->{where}) {
1049       $attrs->{where} = {
1050         '-and' =>
1051             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
1052                $where, delete $attrs->{where} ]
1053       };
1054     } else {
1055       $attrs->{where} = $where;
1056     }
1057   }
1058
1059   my @data = $self->result_source->storage->select_single(
1060     $attrs->{from}, $attrs->{select},
1061     $attrs->{where}, $attrs
1062   );
1063
1064   return (@data ? ($self->_construct_object(@data))[0] : undef);
1065 }
1066
1067
1068 # _collapse_query
1069 #
1070 # Recursively collapse the query, accumulating values for each column.
1071
1072 sub _collapse_query {
1073   my ($self, $query, $collapsed) = @_;
1074
1075   $collapsed ||= {};
1076
1077   if (ref $query eq 'ARRAY') {
1078     foreach my $subquery (@$query) {
1079       next unless ref $subquery;  # -or
1080       $collapsed = $self->_collapse_query($subquery, $collapsed);
1081     }
1082   }
1083   elsif (ref $query eq 'HASH') {
1084     if (keys %$query and (keys %$query)[0] eq '-and') {
1085       foreach my $subquery (@{$query->{-and}}) {
1086         $collapsed = $self->_collapse_query($subquery, $collapsed);
1087       }
1088     }
1089     else {
1090       foreach my $col (keys %$query) {
1091         my $value = $query->{$col};
1092         $collapsed->{$col}{$value}++;
1093       }
1094     }
1095   }
1096
1097   return $collapsed;
1098 }
1099
1100 =head2 get_column
1101
1102 =over 4
1103
1104 =item Arguments: $cond?
1105
1106 =item Return Value: $resultsetcolumn
1107
1108 =back
1109
1110   my $max_length = $rs->get_column('length')->max;
1111
1112 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
1113
1114 =cut
1115
1116 sub get_column {
1117   my ($self, $column) = @_;
1118   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
1119   return $new;
1120 }
1121
1122 =head2 search_like
1123
1124 =over 4
1125
1126 =item Arguments: $cond, \%attrs?
1127
1128 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1129
1130 =back
1131
1132   # WHERE title LIKE '%blue%'
1133   $cd_rs = $rs->search_like({ title => '%blue%'});
1134
1135 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
1136 that this is simply a convenience method retained for ex Class::DBI users.
1137 You most likely want to use L</search> with specific operators.
1138
1139 For more information, see L<DBIx::Class::Manual::Cookbook>.
1140
1141 This method is deprecated and will be removed in 0.09. Use L</search()>
1142 instead. An example conversion is:
1143
1144   ->search_like({ foo => 'bar' });
1145
1146   # Becomes
1147
1148   ->search({ foo => { like => 'bar' } });
1149
1150 =cut
1151
1152 sub search_like {
1153   my $class = shift;
1154   carp_unique (
1155     'search_like() is deprecated and will be removed in DBIC version 0.09.'
1156    .' Instead use ->search({ x => { -like => "y%" } })'
1157    .' (note the outer pair of {}s - they are important!)'
1158   );
1159   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1160   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
1161   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
1162   return $class->search($query, { %$attrs });
1163 }
1164
1165 =head2 slice
1166
1167 =over 4
1168
1169 =item Arguments: $first, $last
1170
1171 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1172
1173 =back
1174
1175 Returns a resultset or object list representing a subset of elements from the
1176 resultset slice is called on. Indexes are from 0, i.e., to get the first
1177 three records, call:
1178
1179   my ($one, $two, $three) = $rs->slice(0, 2);
1180
1181 =cut
1182
1183 sub slice {
1184   my ($self, $min, $max) = @_;
1185   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
1186   $attrs->{offset} = $self->{attrs}{offset} || 0;
1187   $attrs->{offset} += $min;
1188   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
1189   return $self->search(undef, $attrs);
1190   #my $slice = (ref $self)->new($self->result_source, $attrs);
1191   #return (wantarray ? $slice->all : $slice);
1192 }
1193
1194 =head2 next
1195
1196 =over 4
1197
1198 =item Arguments: none
1199
1200 =item Return Value: $result | undef
1201
1202 =back
1203
1204 Returns the next element in the resultset (C<undef> is there is none).
1205
1206 Can be used to efficiently iterate over records in the resultset:
1207
1208   my $rs = $schema->resultset('CD')->search;
1209   while (my $cd = $rs->next) {
1210     print $cd->title;
1211   }
1212
1213 Note that you need to store the resultset object, and call C<next> on it.
1214 Calling C<< resultset('Table')->next >> repeatedly will always return the
1215 first record from the resultset.
1216
1217 =cut
1218
1219 sub next {
1220   my ($self) = @_;
1221   if (my $cache = $self->get_cache) {
1222     $self->{all_cache_position} ||= 0;
1223     return $cache->[$self->{all_cache_position}++];
1224   }
1225   if ($self->{attrs}{cache}) {
1226     delete $self->{pager};
1227     $self->{all_cache_position} = 1;
1228     return ($self->all)[0];
1229   }
1230   if ($self->{stashed_objects}) {
1231     my $obj = shift(@{$self->{stashed_objects}});
1232     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
1233     return $obj;
1234   }
1235   my @row = (
1236     exists $self->{stashed_row}
1237       ? @{delete $self->{stashed_row}}
1238       : $self->cursor->next
1239   );
1240   return undef unless (@row);
1241   my ($row, @more) = $self->_construct_object(@row);
1242   $self->{stashed_objects} = \@more if @more;
1243   return $row;
1244 }
1245
1246 sub _construct_object {
1247   my ($self, @row) = @_;
1248
1249   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row)
1250     or return ();
1251   my @new = $self->result_class->inflate_result($self->result_source, @$info);
1252   @new = $self->{_attrs}{record_filter}->(@new)
1253     if exists $self->{_attrs}{record_filter};
1254   return @new;
1255 }
1256
1257 sub _collapse_result {
1258   my ($self, $as_proto, $row) = @_;
1259
1260   my @copy = @$row;
1261
1262   # 'foo'         => [ undef, 'foo' ]
1263   # 'foo.bar'     => [ 'foo', 'bar' ]
1264   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
1265
1266   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
1267
1268   my %collapse = %{$self->{_attrs}{collapse}||{}};
1269
1270   my @pri_index;
1271
1272   # if we're doing collapsing (has_many prefetch) we need to grab records
1273   # until the PK changes, so fill @pri_index. if not, we leave it empty so
1274   # we know we don't have to bother.
1275
1276   # the reason for not using the collapse stuff directly is because if you
1277   # had for e.g. two artists in a row with no cds, the collapse info for
1278   # both would be NULL (undef) so you'd lose the second artist
1279
1280   # store just the index so we can check the array positions from the row
1281   # without having to contruct the full hash
1282
1283   if (keys %collapse) {
1284     my %pri = map { ($_ => 1) } $self->result_source->_pri_cols;
1285     foreach my $i (0 .. $#construct_as) {
1286       next if defined($construct_as[$i][0]); # only self table
1287       if (delete $pri{$construct_as[$i][1]}) {
1288         push(@pri_index, $i);
1289       }
1290       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
1291     }
1292   }
1293
1294   # no need to do an if, it'll be empty if @pri_index is empty anyway
1295
1296   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
1297
1298   my @const_rows;
1299
1300   do { # no need to check anything at the front, we always want the first row
1301
1302     my %const;
1303
1304     foreach my $this_as (@construct_as) {
1305       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
1306     }
1307
1308     push(@const_rows, \%const);
1309
1310   } until ( # no pri_index => no collapse => drop straight out
1311       !@pri_index
1312     or
1313       do { # get another row, stash it, drop out if different PK
1314
1315         @copy = $self->cursor->next;
1316         $self->{stashed_row} = \@copy;
1317
1318         # last thing in do block, counts as true if anything doesn't match
1319
1320         # check xor defined first for NULL vs. NOT NULL then if one is
1321         # defined the other must be so check string equality
1322
1323         grep {
1324           (defined $pri_vals{$_} ^ defined $copy[$_])
1325           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1326         } @pri_index;
1327       }
1328   );
1329
1330   my $alias = $self->{attrs}{alias};
1331   my $info = [];
1332
1333   my %collapse_pos;
1334
1335   my @const_keys;
1336
1337   foreach my $const (@const_rows) {
1338     scalar @const_keys or do {
1339       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1340     };
1341     foreach my $key (@const_keys) {
1342       if (length $key) {
1343         my $target = $info;
1344         my @parts = split(/\./, $key);
1345         my $cur = '';
1346         my $data = $const->{$key};
1347         foreach my $p (@parts) {
1348           $target = $target->[1]->{$p} ||= [];
1349           $cur .= ".${p}";
1350           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) {
1351             # collapsing at this point and on final part
1352             my $pos = $collapse_pos{$cur};
1353             CK: foreach my $ck (@ckey) {
1354               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1355                 $collapse_pos{$cur} = $data;
1356                 delete @collapse_pos{ # clear all positioning for sub-entries
1357                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1358                 };
1359                 push(@$target, []);
1360                 last CK;
1361               }
1362             }
1363           }
1364           if (exists $collapse{$cur}) {
1365             $target = $target->[-1];
1366           }
1367         }
1368         $target->[0] = $data;
1369       } else {
1370         $info->[0] = $const->{$key};
1371       }
1372     }
1373   }
1374
1375   return $info;
1376 }
1377
1378 =head2 result_source
1379
1380 =over 4
1381
1382 =item Arguments: $result_source?
1383
1384 =item Return Value: $result_source
1385
1386 =back
1387
1388 An accessor for the primary ResultSource object from which this ResultSet
1389 is derived.
1390
1391 =head2 result_class
1392
1393 =over 4
1394
1395 =item Arguments: $result_class?
1396
1397 =item Return Value: $result_class
1398
1399 =back
1400
1401 An accessor for the class to use when creating row objects. Defaults to
1402 C<< result_source->result_class >> - which in most cases is the name of the
1403 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1404
1405 Note that changing the result_class will also remove any components
1406 that were originally loaded in the source class via
1407 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1408 in the original source class will not run.
1409
1410 =cut
1411
1412 sub result_class {
1413   my ($self, $result_class) = @_;
1414   if ($result_class) {
1415     unless (ref $result_class) { # don't fire this for an object
1416       $self->ensure_class_loaded($result_class);
1417     }
1418     $self->_result_class($result_class);
1419     # THIS LINE WOULD BE A BUG - this accessor specifically exists to
1420     # permit the user to set result class on one result set only; it only
1421     # chains if provided to search()
1422     #$self->{attrs}{result_class} = $result_class if ref $self;
1423   }
1424   $self->_result_class;
1425 }
1426
1427 =head2 count
1428
1429 =over 4
1430
1431 =item Arguments: $cond, \%attrs??
1432
1433 =item Return Value: $count
1434
1435 =back
1436
1437 Performs an SQL C<COUNT> with the same query as the resultset was built
1438 with to find the number of elements. Passing arguments is equivalent to
1439 C<< $rs->search ($cond, \%attrs)->count >>
1440
1441 =cut
1442
1443 sub count {
1444   my $self = shift;
1445   return $self->search(@_)->count if @_ and defined $_[0];
1446   return scalar @{ $self->get_cache } if $self->get_cache;
1447
1448   my $attrs = $self->_resolved_attrs_copy;
1449
1450   # this is a little optimization - it is faster to do the limit
1451   # adjustments in software, instead of a subquery
1452   my $rows = delete $attrs->{rows};
1453   my $offset = delete $attrs->{offset};
1454
1455   my $crs;
1456   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1457     $crs = $self->_count_subq_rs ($attrs);
1458   }
1459   else {
1460     $crs = $self->_count_rs ($attrs);
1461   }
1462   my $count = $crs->next;
1463
1464   $count -= $offset if $offset;
1465   $count = $rows if $rows and $rows < $count;
1466   $count = 0 if ($count < 0);
1467
1468   return $count;
1469 }
1470
1471 =head2 count_rs
1472
1473 =over 4
1474
1475 =item Arguments: $cond, \%attrs??
1476
1477 =item Return Value: $count_rs
1478
1479 =back
1480
1481 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1482 This can be very handy for subqueries:
1483
1484   ->search( { amount => $some_rs->count_rs->as_query } )
1485
1486 As with regular resultsets the SQL query will be executed only after
1487 the resultset is accessed via L</next> or L</all>. That would return
1488 the same single value obtainable via L</count>.
1489
1490 =cut
1491
1492 sub count_rs {
1493   my $self = shift;
1494   return $self->search(@_)->count_rs if @_;
1495
1496   # this may look like a lack of abstraction (count() does about the same)
1497   # but in fact an _rs *must* use a subquery for the limits, as the
1498   # software based limiting can not be ported if this $rs is to be used
1499   # in a subquery itself (i.e. ->as_query)
1500   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1501     return $self->_count_subq_rs;
1502   }
1503   else {
1504     return $self->_count_rs;
1505   }
1506 }
1507
1508 #
1509 # returns a ResultSetColumn object tied to the count query
1510 #
1511 sub _count_rs {
1512   my ($self, $attrs) = @_;
1513
1514   my $rsrc = $self->result_source;
1515   $attrs ||= $self->_resolved_attrs;
1516
1517   my $tmp_attrs = { %$attrs };
1518   # take off any limits, record_filter is cdbi, and no point of ordering nor locking a count
1519   delete @{$tmp_attrs}{qw/rows offset order_by record_filter for/};
1520
1521   # overwrite the selector (supplied by the storage)
1522   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $attrs);
1523   $tmp_attrs->{as} = 'count';
1524   delete @{$tmp_attrs}{qw/columns/};
1525
1526   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1527
1528   return $tmp_rs;
1529 }
1530
1531 #
1532 # same as above but uses a subquery
1533 #
1534 sub _count_subq_rs {
1535   my ($self, $attrs) = @_;
1536
1537   my $rsrc = $self->result_source;
1538   $attrs ||= $self->_resolved_attrs;
1539
1540   my $sub_attrs = { %$attrs };
1541   # extra selectors do not go in the subquery and there is no point of ordering it, nor locking it
1542   delete @{$sub_attrs}{qw/collapse columns as select _prefetch_selector_range order_by for/};
1543
1544   # if we multi-prefetch we group_by something unique, as this is what we would
1545   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
1546   if ( keys %{$attrs->{collapse}}  ) {
1547     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } @{
1548       $rsrc->_identifying_column_set || $self->throw_exception(
1549         'Unable to construct a unique group_by criteria properly collapsing the '
1550       . 'has_many prefetch before count()'
1551       );
1552     } ]
1553   }
1554
1555   # Calculate subquery selector
1556   if (my $g = $sub_attrs->{group_by}) {
1557
1558     my $sql_maker = $rsrc->storage->sql_maker;
1559
1560     # necessary as the group_by may refer to aliased functions
1561     my $sel_index;
1562     for my $sel (@{$attrs->{select}}) {
1563       $sel_index->{$sel->{-as}} = $sel
1564         if (ref $sel eq 'HASH' and $sel->{-as});
1565     }
1566
1567     # anything from the original select mentioned on the group-by needs to make it to the inner selector
1568     # also look for named aggregates referred in the having clause
1569     # having often contains scalarrefs - thus parse it out entirely
1570     my @parts = @$g;
1571     if ($attrs->{having}) {
1572       local $sql_maker->{having_bind};
1573       local $sql_maker->{quote_char} = $sql_maker->{quote_char};
1574       local $sql_maker->{name_sep} = $sql_maker->{name_sep};
1575       unless (defined $sql_maker->{quote_char} and length $sql_maker->{quote_char}) {
1576         $sql_maker->{quote_char} = [ "\x00", "\xFF" ];
1577         # if we don't unset it we screw up retarded but unfortunately working
1578         # 'MAX(foo.bar)' => { '>', 3 }
1579         $sql_maker->{name_sep} = '';
1580       }
1581
1582       my ($lquote, $rquote, $sep) = map { quotemeta $_ } ($sql_maker->_quote_chars, $sql_maker->name_sep);
1583
1584       my $sql = $sql_maker->_parse_rs_attrs ({ having => $attrs->{having} });
1585
1586       # search for both a proper quoted qualified string, for a naive unquoted scalarref
1587       # and if all fails for an utterly naive quoted scalar-with-function
1588       while ($sql =~ /
1589         $rquote $sep $lquote (.+?) $rquote
1590           |
1591         [\s,] \w+ \. (\w+) [\s,]
1592           |
1593         [\s,] $lquote (.+?) $rquote [\s,]
1594       /gx) {
1595         push @parts, ($1 || $2 || $3);  # one of them matched if we got here
1596       }
1597     }
1598
1599     for (@parts) {
1600       my $colpiece = $sel_index->{$_} || $_;
1601
1602       # unqualify join-based group_by's. Arcane but possible query
1603       # also horrible horrible hack to alias a column (not a func.)
1604       # (probably need to introduce SQLA syntax)
1605       if ($colpiece =~ /\./ && $colpiece !~ /^$attrs->{alias}\./) {
1606         my $as = $colpiece;
1607         $as =~ s/\./__/;
1608         $colpiece = \ sprintf ('%s AS %s', map { $sql_maker->_quote ($_) } ($colpiece, $as) );
1609       }
1610       push @{$sub_attrs->{select}}, $colpiece;
1611     }
1612   }
1613   else {
1614     my @pcols = map { "$attrs->{alias}.$_" } ($rsrc->primary_columns);
1615     $sub_attrs->{select} = @pcols ? \@pcols : [ 1 ];
1616   }
1617
1618   return $rsrc->resultset_class
1619                ->new ($rsrc, $sub_attrs)
1620                 ->as_subselect_rs
1621                  ->search ({}, { columns => { count => $rsrc->storage->_count_select ($rsrc, $attrs) } })
1622                   ->get_column ('count');
1623 }
1624
1625 sub _bool {
1626   return 1;
1627 }
1628
1629 =head2 count_literal
1630
1631 =over 4
1632
1633 =item Arguments: $sql_fragment, @bind_values
1634
1635 =item Return Value: $count
1636
1637 =back
1638
1639 Counts the results in a literal query. Equivalent to calling L</search_literal>
1640 with the passed arguments, then L</count>.
1641
1642 =cut
1643
1644 sub count_literal { shift->search_literal(@_)->count; }
1645
1646 =head2 all
1647
1648 =over 4
1649
1650 =item Arguments: none
1651
1652 =item Return Value: @objects
1653
1654 =back
1655
1656 Returns all elements in the resultset.
1657
1658 =cut
1659
1660 sub all {
1661   my $self = shift;
1662   if(@_) {
1663       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1664   }
1665
1666   return @{ $self->get_cache } if $self->get_cache;
1667
1668   my @obj;
1669
1670   if (keys %{$self->_resolved_attrs->{collapse}}) {
1671     # Using $self->cursor->all is really just an optimisation.
1672     # If we're collapsing has_many prefetches it probably makes
1673     # very little difference, and this is cleaner than hacking
1674     # _construct_object to survive the approach
1675     $self->cursor->reset;
1676     my @row = $self->cursor->next;
1677     while (@row) {
1678       push(@obj, $self->_construct_object(@row));
1679       @row = (exists $self->{stashed_row}
1680                ? @{delete $self->{stashed_row}}
1681                : $self->cursor->next);
1682     }
1683   } else {
1684     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1685   }
1686
1687   $self->set_cache(\@obj) if $self->{attrs}{cache};
1688
1689   return @obj;
1690 }
1691
1692 =head2 reset
1693
1694 =over 4
1695
1696 =item Arguments: none
1697
1698 =item Return Value: $self
1699
1700 =back
1701
1702 Resets the resultset's cursor, so you can iterate through the elements again.
1703 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1704 another query.
1705
1706 =cut
1707
1708 sub reset {
1709   my ($self) = @_;
1710   delete $self->{_attrs} if exists $self->{_attrs};
1711   $self->{all_cache_position} = 0;
1712   $self->cursor->reset;
1713   return $self;
1714 }
1715
1716 =head2 first
1717
1718 =over 4
1719
1720 =item Arguments: none
1721
1722 =item Return Value: $object | undef
1723
1724 =back
1725
1726 Resets the resultset and returns an object for the first result (or C<undef>
1727 if the resultset is empty).
1728
1729 =cut
1730
1731 sub first {
1732   return $_[0]->reset->next;
1733 }
1734
1735
1736 # _rs_update_delete
1737 #
1738 # Determines whether and what type of subquery is required for the $rs operation.
1739 # If grouping is necessary either supplies its own, or verifies the current one
1740 # After all is done delegates to the proper storage method.
1741
1742 sub _rs_update_delete {
1743   my ($self, $op, $values) = @_;
1744
1745   my $cond = $self->{cond};
1746   my $rsrc = $self->result_source;
1747   my $storage = $rsrc->schema->storage;
1748
1749   my $attrs = { %{$self->_resolved_attrs} };
1750
1751   # "needs" is a strong word here - if the subquery is part of an IN clause - no point of
1752   # even adding the group_by. It will really be used only when composing a poor-man's
1753   # multicolumn-IN equivalent OR set
1754   my $needs_group_by_subq = defined $attrs->{group_by};
1755
1756   # simplify the joinmap and maybe decide if a grouping (and thus subquery) is necessary
1757   my $relation_classifications;
1758   if (ref($attrs->{from}) eq 'ARRAY') {
1759     if (@{$attrs->{from}} == 1) {
1760       # not a fucking JOIN at all, quit with the dickery
1761       $relation_classifications = {};
1762     } else {
1763       $attrs->{from} = $storage->_prune_unused_joins ($attrs->{from}, $attrs->{select}, $cond, $attrs);
1764
1765       $relation_classifications = $storage->_resolve_aliastypes_from_select_args (
1766         [ @{$attrs->{from}}[1 .. $#{$attrs->{from}}] ],
1767         $attrs->{select},
1768         $cond,
1769         $attrs
1770       ) unless $needs_group_by_subq;  # we already know we need a group, no point of resolving them
1771     }
1772   }
1773   else {
1774     $needs_group_by_subq ||= 1; # if {from} is unparseable assume the worst
1775   }
1776
1777   $needs_group_by_subq ||= exists $relation_classifications->{multiplying};
1778
1779   # if no subquery - life is easy-ish
1780   unless (
1781     $needs_group_by_subq
1782       or
1783     keys %$relation_classifications # if any joins at all - need to wrap a subq
1784       or
1785     $self->_has_resolved_attr(qw/rows offset/) # limits call for a subq
1786   ) {
1787     # Most databases do not allow aliasing of tables in UPDATE/DELETE. Thus
1788     # a condition containing 'me' or other table prefixes will not work
1789     # at all. Tell SQLMaker to dequalify idents via a gross hack.
1790     my $sqla = $rsrc->storage->sql_maker;
1791     local $sqla->{_dequalify_idents} = 1;
1792     return $rsrc->storage->$op(
1793       $rsrc,
1794       $op eq 'update' ? $values : (),
1795       $self->{cond},
1796     );
1797   }
1798
1799   # we got this far - means it is time to wrap a subquery
1800   my $idcols = $rsrc->_identifying_column_set || $self->throw_exception(
1801     sprintf(
1802       "Unable to perform complex resultset %s() without an identifying set of columns on source '%s'",
1803       $op,
1804       $rsrc->source_name,
1805     )
1806   );
1807   my $existing_group_by = delete $attrs->{group_by};
1808
1809   # make a new $rs selecting only the PKs (that's all we really need for the subq)
1810   delete $attrs->{$_} for qw/collapse _collapse_order_by select _prefetch_selector_range as/;
1811   $attrs->{columns} = [ map { "$attrs->{alias}.$_" } @$idcols ];
1812   $attrs->{group_by} = \ '';  # FIXME - this is an evil hack, it causes the optimiser to kick in and throw away the LEFT joins
1813   my $subrs = (ref $self)->new($rsrc, $attrs);
1814
1815   if (@$idcols == 1) {
1816     return $storage->$op (
1817       $rsrc,
1818       $op eq 'update' ? $values : (),
1819       { $idcols->[0] => { -in => $subrs->as_query } },
1820     );
1821   }
1822   elsif ($storage->_use_multicolumn_in) {
1823     # This is hideously ugly, but SQLA does not understand multicol IN expressions
1824     my $sql_maker = $storage->sql_maker;
1825     my ($sql, @bind) = @${$subrs->as_query};
1826     $sql = sprintf ('(%s) IN %s', # the as_query already comes with a set of parenthesis
1827       join (', ', map { $sql_maker->_quote ($_) } @$idcols),
1828       $sql,
1829     );
1830
1831     return $storage->$op (
1832       $rsrc,
1833       $op eq 'update' ? $values : (),
1834       \[$sql, @bind],
1835     );
1836   }
1837   else {
1838     # if all else fails - get all primary keys and operate over a ORed set
1839     # wrap in a transaction for consistency
1840     # this is where the group_by starts to matter
1841     my $subq_group_by;
1842     if ($needs_group_by_subq) {
1843       $subq_group_by = $attrs->{columns};
1844
1845       # make sure if there is a supplied group_by it matches the columns compiled above
1846       # perfectly. Anything else can not be sanely executed on most databases so croak
1847       # right then and there
1848       if ($existing_group_by) {
1849         my @current_group_by = map
1850           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1851           @$existing_group_by
1852         ;
1853
1854         if (
1855           join ("\x00", sort @current_group_by)
1856             ne
1857           join ("\x00", sort @$subq_group_by )
1858         ) {
1859           $self->throw_exception (
1860             "You have just attempted a $op operation on a resultset which does group_by"
1861             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1862             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1863             . ' kind of queries. Please retry the operation with a modified group_by or'
1864             . ' without using one at all.'
1865           );
1866         }
1867       }
1868     }
1869
1870     my $guard = $storage->txn_scope_guard;
1871
1872     my @op_condition;
1873     for my $row ($subrs->search({}, { group_by => $subq_group_by })->cursor->all) {
1874       push @op_condition, { map
1875         { $idcols->[$_] => $row->[$_] }
1876         (0 .. $#$idcols)
1877       };
1878     }
1879
1880     my $res = $storage->$op (
1881       $rsrc,
1882       $op eq 'update' ? $values : (),
1883       \@op_condition,
1884     );
1885
1886     $guard->commit;
1887
1888     return $res;
1889   }
1890 }
1891
1892 =head2 update
1893
1894 =over 4
1895
1896 =item Arguments: \%values
1897
1898 =item Return Value: $storage_rv
1899
1900 =back
1901
1902 Sets the specified columns in the resultset to the supplied values in a
1903 single query. Note that this will not run any accessor/set_column/update
1904 triggers, nor will it update any row object instances derived from this
1905 resultset (this includes the contents of the L<resultset cache|/set_cache>
1906 if any). See L</update_all> if you need to execute any on-update
1907 triggers or cascades defined either by you or a
1908 L<result component|DBIx::Class::Manual::Component/WHAT IS A COMPONENT>.
1909
1910 The return value is a pass through of what the underlying
1911 storage backend returned, and may vary. See L<DBI/execute> for the most
1912 common case.
1913
1914 =head3 CAVEAT
1915
1916 Note that L</update> does not process/deflate any of the values passed in.
1917 This is unlike the corresponding L<DBIx::Class::Row/update>. The user must
1918 ensure manually that any value passed to this method will stringify to
1919 something the RDBMS knows how to deal with. A notable example is the
1920 handling of L<DateTime> objects, for more info see:
1921 L<DBIx::Class::Manual::Cookbook/Formatting DateTime objects in queries>.
1922
1923 =cut
1924
1925 sub update {
1926   my ($self, $values) = @_;
1927   $self->throw_exception('Values for update must be a hash')
1928     unless ref $values eq 'HASH';
1929
1930   return $self->_rs_update_delete ('update', $values);
1931 }
1932
1933 =head2 update_all
1934
1935 =over 4
1936
1937 =item Arguments: \%values
1938
1939 =item Return Value: 1
1940
1941 =back
1942
1943 Fetches all objects and updates them one at a time via
1944 L<DBIx::Class::Row/update>. Note that C<update_all> will run DBIC defined
1945 triggers, while L</update> will not.
1946
1947 =cut
1948
1949 sub update_all {
1950   my ($self, $values) = @_;
1951   $self->throw_exception('Values for update_all must be a hash')
1952     unless ref $values eq 'HASH';
1953
1954   my $guard = $self->result_source->schema->txn_scope_guard;
1955   $_->update({%$values}) for $self->all;  # shallow copy - update will mangle it
1956   $guard->commit;
1957   return 1;
1958 }
1959
1960 =head2 delete
1961
1962 =over 4
1963
1964 =item Arguments: none
1965
1966 =item Return Value: $storage_rv
1967
1968 =back
1969
1970 Deletes the rows matching this resultset in a single query. Note that this
1971 will not run any delete triggers, nor will it alter the
1972 L<in_storage|DBIx::Class::Row/in_storage> status of any row object instances
1973 derived from this resultset (this includes the contents of the
1974 L<resultset cache|/set_cache> if any). See L</delete_all> if you need to
1975 execute any on-delete triggers or cascades defined either by you or a
1976 L<result component|DBIx::Class::Manual::Component/WHAT IS A COMPONENT>.
1977
1978 The return value is a pass through of what the underlying storage backend
1979 returned, and may vary. See L<DBI/execute> for the most common case.
1980
1981 =cut
1982
1983 sub delete {
1984   my $self = shift;
1985   $self->throw_exception('delete does not accept any arguments')
1986     if @_;
1987
1988   return $self->_rs_update_delete ('delete');
1989 }
1990
1991 =head2 delete_all
1992
1993 =over 4
1994
1995 =item Arguments: none
1996
1997 =item Return Value: 1
1998
1999 =back
2000
2001 Fetches all objects and deletes them one at a time via
2002 L<DBIx::Class::Row/delete>. Note that C<delete_all> will run DBIC defined
2003 triggers, while L</delete> will not.
2004
2005 =cut
2006
2007 sub delete_all {
2008   my $self = shift;
2009   $self->throw_exception('delete_all does not accept any arguments')
2010     if @_;
2011
2012   my $guard = $self->result_source->schema->txn_scope_guard;
2013   $_->delete for $self->all;
2014   $guard->commit;
2015   return 1;
2016 }
2017
2018 =head2 populate
2019
2020 =over 4
2021
2022 =item Arguments: \@data;
2023
2024 =back
2025
2026 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
2027 For the arrayref of hashrefs style each hashref should be a structure suitable
2028 for submitting to a $resultset->create(...) method.
2029
2030 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
2031 to insert the data, as this is a faster method.
2032
2033 Otherwise, each set of data is inserted into the database using
2034 L<DBIx::Class::ResultSet/create>, and the resulting objects are
2035 accumulated into an array. The array itself, or an array reference
2036 is returned depending on scalar or list context.
2037
2038 Example:  Assuming an Artist Class that has many CDs Classes relating:
2039
2040   my $Artist_rs = $schema->resultset("Artist");
2041
2042   ## Void Context Example
2043   $Artist_rs->populate([
2044      { artistid => 4, name => 'Manufactured Crap', cds => [
2045         { title => 'My First CD', year => 2006 },
2046         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2047       ],
2048      },
2049      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
2050         { title => 'My parents sold me to a record company', year => 2005 },
2051         { title => 'Why Am I So Ugly?', year => 2006 },
2052         { title => 'I Got Surgery and am now Popular', year => 2007 }
2053       ],
2054      },
2055   ]);
2056
2057   ## Array Context Example
2058   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
2059     { name => "Artist One"},
2060     { name => "Artist Two"},
2061     { name => "Artist Three", cds=> [
2062     { title => "First CD", year => 2007},
2063     { title => "Second CD", year => 2008},
2064   ]}
2065   ]);
2066
2067   print $ArtistOne->name; ## response is 'Artist One'
2068   print $ArtistThree->cds->count ## reponse is '2'
2069
2070 For the arrayref of arrayrefs style,  the first element should be a list of the
2071 fieldsnames to which the remaining elements are rows being inserted.  For
2072 example:
2073
2074   $Arstist_rs->populate([
2075     [qw/artistid name/],
2076     [100, 'A Formally Unknown Singer'],
2077     [101, 'A singer that jumped the shark two albums ago'],
2078     [102, 'An actually cool singer'],
2079   ]);
2080
2081 Please note an important effect on your data when choosing between void and
2082 wantarray context. Since void context goes straight to C<insert_bulk> in
2083 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
2084 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
2085 create primary keys for you, you will find that your PKs are empty.  In this
2086 case you will have to use the wantarray context in order to create those
2087 values.
2088
2089 =cut
2090
2091 sub populate {
2092   my $self = shift;
2093
2094   # cruft placed in standalone method
2095   my $data = $self->_normalize_populate_args(@_);
2096
2097   return unless @$data;
2098
2099   if(defined wantarray) {
2100     my @created;
2101     foreach my $item (@$data) {
2102       push(@created, $self->create($item));
2103     }
2104     return wantarray ? @created : \@created;
2105   }
2106   else {
2107     my $first = $data->[0];
2108
2109     # if a column is a registered relationship, and is a non-blessed hash/array, consider
2110     # it relationship data
2111     my (@rels, @columns);
2112     my $rsrc = $self->result_source;
2113     my $rels = { map { $_ => $rsrc->relationship_info($_) } $rsrc->relationships };
2114     for (keys %$first) {
2115       my $ref = ref $first->{$_};
2116       $rels->{$_} && ($ref eq 'ARRAY' or $ref eq 'HASH')
2117         ? push @rels, $_
2118         : push @columns, $_
2119       ;
2120     }
2121
2122     my @pks = $rsrc->primary_columns;
2123
2124     ## do the belongs_to relationships
2125     foreach my $index (0..$#$data) {
2126
2127       # delegate to create() for any dataset without primary keys with specified relationships
2128       if (grep { !defined $data->[$index]->{$_} } @pks ) {
2129         for my $r (@rels) {
2130           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
2131             my @ret = $self->populate($data);
2132             return;
2133           }
2134         }
2135       }
2136
2137       foreach my $rel (@rels) {
2138         next unless ref $data->[$index]->{$rel} eq "HASH";
2139         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
2140         my ($reverse_relname, $reverse_relinfo) = %{$rsrc->reverse_relationship_info($rel)};
2141         my $related = $result->result_source->_resolve_condition(
2142           $reverse_relinfo->{cond},
2143           $self,
2144           $result,
2145           $rel,
2146         );
2147
2148         delete $data->[$index]->{$rel};
2149         $data->[$index] = {%{$data->[$index]}, %$related};
2150
2151         push @columns, keys %$related if $index == 0;
2152       }
2153     }
2154
2155     ## inherit the data locked in the conditions of the resultset
2156     my ($rs_data) = $self->_merge_with_rscond({});
2157     delete @{$rs_data}{@columns};
2158     my @inherit_cols = keys %$rs_data;
2159     my @inherit_data = values %$rs_data;
2160
2161     ## do bulk insert on current row
2162     $rsrc->storage->insert_bulk(
2163       $rsrc,
2164       [@columns, @inherit_cols],
2165       [ map { [ @$_{@columns}, @inherit_data ] } @$data ],
2166     );
2167
2168     ## do the has_many relationships
2169     foreach my $item (@$data) {
2170
2171       my $main_row;
2172
2173       foreach my $rel (@rels) {
2174         next unless ref $item->{$rel} eq "ARRAY" && @{ $item->{$rel} };
2175
2176         $main_row ||= $self->new_result({map { $_ => $item->{$_} } @pks});
2177
2178         my $child = $main_row->$rel;
2179
2180         my $related = $child->result_source->_resolve_condition(
2181           $rels->{$rel}{cond},
2182           $child,
2183           $main_row,
2184           $rel,
2185         );
2186
2187         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
2188         my @populate = map { {%$_, %$related} } @rows_to_add;
2189
2190         $child->populate( \@populate );
2191       }
2192     }
2193   }
2194 }
2195
2196
2197 # populate() argumnets went over several incarnations
2198 # What we ultimately support is AoH
2199 sub _normalize_populate_args {
2200   my ($self, $arg) = @_;
2201
2202   if (ref $arg eq 'ARRAY') {
2203     if (!@$arg) {
2204       return [];
2205     }
2206     elsif (ref $arg->[0] eq 'HASH') {
2207       return $arg;
2208     }
2209     elsif (ref $arg->[0] eq 'ARRAY') {
2210       my @ret;
2211       my @colnames = @{$arg->[0]};
2212       foreach my $values (@{$arg}[1 .. $#$arg]) {
2213         push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
2214       }
2215       return \@ret;
2216     }
2217   }
2218
2219   $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
2220 }
2221
2222 =head2 pager
2223
2224 =over 4
2225
2226 =item Arguments: none
2227
2228 =item Return Value: $pager
2229
2230 =back
2231
2232 Return Value a L<Data::Page> object for the current resultset. Only makes
2233 sense for queries with a C<page> attribute.
2234
2235 To get the full count of entries for a paged resultset, call
2236 C<total_entries> on the L<Data::Page> object.
2237
2238 =cut
2239
2240 sub pager {
2241   my ($self) = @_;
2242
2243   return $self->{pager} if $self->{pager};
2244
2245   my $attrs = $self->{attrs};
2246   if (!defined $attrs->{page}) {
2247     $self->throw_exception("Can't create pager for non-paged rs");
2248   }
2249   elsif ($attrs->{page} <= 0) {
2250     $self->throw_exception('Invalid page number (page-numbers are 1-based)');
2251   }
2252   $attrs->{rows} ||= 10;
2253
2254   # throw away the paging flags and re-run the count (possibly
2255   # with a subselect) to get the real total count
2256   my $count_attrs = { %$attrs };
2257   delete $count_attrs->{$_} for qw/rows offset page pager/;
2258
2259   my $total_rs = (ref $self)->new($self->result_source, $count_attrs);
2260
2261   require DBIx::Class::ResultSet::Pager;
2262   return $self->{pager} = DBIx::Class::ResultSet::Pager->new(
2263     sub { $total_rs->count },  #lazy-get the total
2264     $attrs->{rows},
2265     $self->{attrs}{page},
2266   );
2267 }
2268
2269 =head2 page
2270
2271 =over 4
2272
2273 =item Arguments: $page_number
2274
2275 =item Return Value: $rs
2276
2277 =back
2278
2279 Returns a resultset for the $page_number page of the resultset on which page
2280 is called, where each page contains a number of rows equal to the 'rows'
2281 attribute set on the resultset (10 by default).
2282
2283 =cut
2284
2285 sub page {
2286   my ($self, $page) = @_;
2287   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
2288 }
2289
2290 =head2 new_result
2291
2292 =over 4
2293
2294 =item Arguments: \%vals
2295
2296 =item Return Value: $rowobject
2297
2298 =back
2299
2300 Creates a new row object in the resultset's result class and returns
2301 it. The row is not inserted into the database at this point, call
2302 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
2303 will tell you whether the row object has been inserted or not.
2304
2305 Passes the hashref of input on to L<DBIx::Class::Row/new>.
2306
2307 =cut
2308
2309 sub new_result {
2310   my ($self, $values) = @_;
2311   $self->throw_exception( "new_result needs a hash" )
2312     unless (ref $values eq 'HASH');
2313
2314   my ($merged_cond, $cols_from_relations) = $self->_merge_with_rscond($values);
2315
2316   my %new = (
2317     %$merged_cond,
2318     @$cols_from_relations
2319       ? (-cols_from_relations => $cols_from_relations)
2320       : (),
2321     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
2322   );
2323
2324   return $self->result_class->new(\%new);
2325 }
2326
2327 # _merge_with_rscond
2328 #
2329 # Takes a simple hash of K/V data and returns its copy merged with the
2330 # condition already present on the resultset. Additionally returns an
2331 # arrayref of value/condition names, which were inferred from related
2332 # objects (this is needed for in-memory related objects)
2333 sub _merge_with_rscond {
2334   my ($self, $data) = @_;
2335
2336   my (%new_data, @cols_from_relations);
2337
2338   my $alias = $self->{attrs}{alias};
2339
2340   if (! defined $self->{cond}) {
2341     # just massage $data below
2342   }
2343   elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
2344     %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
2345     @cols_from_relations = keys %new_data;
2346   }
2347   elsif (ref $self->{cond} ne 'HASH') {
2348     $self->throw_exception(
2349       "Can't abstract implicit construct, resultset condition not a hash"
2350     );
2351   }
2352   else {
2353     # precendence must be given to passed values over values inherited from
2354     # the cond, so the order here is important.
2355     my $collapsed_cond = $self->_collapse_cond($self->{cond});
2356     my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
2357
2358     while ( my($col, $value) = each %implied ) {
2359       my $vref = ref $value;
2360       if (
2361         $vref eq 'HASH'
2362           and
2363         keys(%$value) == 1
2364           and
2365         (keys %$value)[0] eq '='
2366       ) {
2367         $new_data{$col} = $value->{'='};
2368       }
2369       elsif( !$vref or $vref eq 'SCALAR' or blessed($value) ) {
2370         $new_data{$col} = $value;
2371       }
2372     }
2373   }
2374
2375   %new_data = (
2376     %new_data,
2377     %{ $self->_remove_alias($data, $alias) },
2378   );
2379
2380   return (\%new_data, \@cols_from_relations);
2381 }
2382
2383 # _has_resolved_attr
2384 #
2385 # determines if the resultset defines at least one
2386 # of the attributes supplied
2387 #
2388 # used to determine if a subquery is neccessary
2389 #
2390 # supports some virtual attributes:
2391 #   -join
2392 #     This will scan for any joins being present on the resultset.
2393 #     It is not a mere key-search but a deep inspection of {from}
2394 #
2395
2396 sub _has_resolved_attr {
2397   my ($self, @attr_names) = @_;
2398
2399   my $attrs = $self->_resolved_attrs;
2400
2401   my %extra_checks;
2402
2403   for my $n (@attr_names) {
2404     if (grep { $n eq $_ } (qw/-join/) ) {
2405       $extra_checks{$n}++;
2406       next;
2407     }
2408
2409     my $attr =  $attrs->{$n};
2410
2411     next if not defined $attr;
2412
2413     if (ref $attr eq 'HASH') {
2414       return 1 if keys %$attr;
2415     }
2416     elsif (ref $attr eq 'ARRAY') {
2417       return 1 if @$attr;
2418     }
2419     else {
2420       return 1 if $attr;
2421     }
2422   }
2423
2424   # a resolved join is expressed as a multi-level from
2425   return 1 if (
2426     $extra_checks{-join}
2427       and
2428     ref $attrs->{from} eq 'ARRAY'
2429       and
2430     @{$attrs->{from}} > 1
2431   );
2432
2433   return 0;
2434 }
2435
2436 # _collapse_cond
2437 #
2438 # Recursively collapse the condition.
2439
2440 sub _collapse_cond {
2441   my ($self, $cond, $collapsed) = @_;
2442
2443   $collapsed ||= {};
2444
2445   if (ref $cond eq 'ARRAY') {
2446     foreach my $subcond (@$cond) {
2447       next unless ref $subcond;  # -or
2448       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2449     }
2450   }
2451   elsif (ref $cond eq 'HASH') {
2452     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2453       foreach my $subcond (@{$cond->{-and}}) {
2454         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2455       }
2456     }
2457     else {
2458       foreach my $col (keys %$cond) {
2459         my $value = $cond->{$col};
2460         $collapsed->{$col} = $value;
2461       }
2462     }
2463   }
2464
2465   return $collapsed;
2466 }
2467
2468 # _remove_alias
2469 #
2470 # Remove the specified alias from the specified query hash. A copy is made so
2471 # the original query is not modified.
2472
2473 sub _remove_alias {
2474   my ($self, $query, $alias) = @_;
2475
2476   my %orig = %{ $query || {} };
2477   my %unaliased;
2478
2479   foreach my $key (keys %orig) {
2480     if ($key !~ /\./) {
2481       $unaliased{$key} = $orig{$key};
2482       next;
2483     }
2484     $unaliased{$1} = $orig{$key}
2485       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2486   }
2487
2488   return \%unaliased;
2489 }
2490
2491 =head2 as_query
2492
2493 =over 4
2494
2495 =item Arguments: none
2496
2497 =item Return Value: \[ $sql, @bind ]
2498
2499 =back
2500
2501 Returns the SQL query and bind vars associated with the invocant.
2502
2503 This is generally used as the RHS for a subquery.
2504
2505 =cut
2506
2507 sub as_query {
2508   my $self = shift;
2509
2510   my $attrs = $self->_resolved_attrs_copy;
2511
2512   # For future use:
2513   #
2514   # in list ctx:
2515   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2516   # $sql also has no wrapping parenthesis in list ctx
2517   #
2518   my $sqlbind = $self->result_source->storage
2519     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2520
2521   return $sqlbind;
2522 }
2523
2524 =head2 find_or_new
2525
2526 =over 4
2527
2528 =item Arguments: \%vals, \%attrs?
2529
2530 =item Return Value: $rowobject
2531
2532 =back
2533
2534   my $artist = $schema->resultset('Artist')->find_or_new(
2535     { artist => 'fred' }, { key => 'artists' });
2536
2537   $cd->cd_to_producer->find_or_new({ producer => $producer },
2538                                    { key => 'primary });
2539
2540 Find an existing record from this resultset using L</find>. if none exists,
2541 instantiate a new result object and return it. The object will not be saved
2542 into your storage until you call L<DBIx::Class::Row/insert> on it.
2543
2544 You most likely want this method when looking for existing rows using a unique
2545 constraint that is not the primary key, or looking for related rows.
2546
2547 If you want objects to be saved immediately, use L</find_or_create> instead.
2548
2549 B<Note>: Make sure to read the documentation of L</find> and understand the
2550 significance of the C<key> attribute, as its lack may skew your search, and
2551 subsequently result in spurious new objects.
2552
2553 B<Note>: Take care when using C<find_or_new> with a table having
2554 columns with default values that you intend to be automatically
2555 supplied by the database (e.g. an auto_increment primary key column).
2556 In normal usage, the value of such columns should NOT be included at
2557 all in the call to C<find_or_new>, even when set to C<undef>.
2558
2559 =cut
2560
2561 sub find_or_new {
2562   my $self     = shift;
2563   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2564   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2565   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2566     return $row;
2567   }
2568   return $self->new_result($hash);
2569 }
2570
2571 =head2 create
2572
2573 =over 4
2574
2575 =item Arguments: \%vals
2576
2577 =item Return Value: a L<DBIx::Class::Row> $object
2578
2579 =back
2580
2581 Attempt to create a single new row or a row with multiple related rows
2582 in the table represented by the resultset (and related tables). This
2583 will not check for duplicate rows before inserting, use
2584 L</find_or_create> to do that.
2585
2586 To create one row for this resultset, pass a hashref of key/value
2587 pairs representing the columns of the table and the values you wish to
2588 store. If the appropriate relationships are set up, foreign key fields
2589 can also be passed an object representing the foreign row, and the
2590 value will be set to its primary key.
2591
2592 To create related objects, pass a hashref of related-object column values
2593 B<keyed on the relationship name>. If the relationship is of type C<multi>
2594 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2595 The process will correctly identify columns holding foreign keys, and will
2596 transparently populate them from the keys of the corresponding relation.
2597 This can be applied recursively, and will work correctly for a structure
2598 with an arbitrary depth and width, as long as the relationships actually
2599 exists and the correct column data has been supplied.
2600
2601
2602 Instead of hashrefs of plain related data (key/value pairs), you may
2603 also pass new or inserted objects. New objects (not inserted yet, see
2604 L</new>), will be inserted into their appropriate tables.
2605
2606 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2607
2608 Example of creating a new row.
2609
2610   $person_rs->create({
2611     name=>"Some Person",
2612     email=>"somebody@someplace.com"
2613   });
2614
2615 Example of creating a new row and also creating rows in a related C<has_many>
2616 or C<has_one> resultset.  Note Arrayref.
2617
2618   $artist_rs->create(
2619      { artistid => 4, name => 'Manufactured Crap', cds => [
2620         { title => 'My First CD', year => 2006 },
2621         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2622       ],
2623      },
2624   );
2625
2626 Example of creating a new row and also creating a row in a related
2627 C<belongs_to> resultset. Note Hashref.
2628
2629   $cd_rs->create({
2630     title=>"Music for Silly Walks",
2631     year=>2000,
2632     artist => {
2633       name=>"Silly Musician",
2634     }
2635   });
2636
2637 =over
2638
2639 =item WARNING
2640
2641 When subclassing ResultSet never attempt to override this method. Since
2642 it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
2643 lot of the internals simply never call it, so your override will be
2644 bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
2645 or L<insert|DBIx::Class::Row/insert> depending on how early in the
2646 L</create> process you need to intervene.
2647
2648 =back
2649
2650 =cut
2651
2652 sub create {
2653   my ($self, $attrs) = @_;
2654   $self->throw_exception( "create needs a hashref" )
2655     unless ref $attrs eq 'HASH';
2656   return $self->new_result($attrs)->insert;
2657 }
2658
2659 =head2 find_or_create
2660
2661 =over 4
2662
2663 =item Arguments: \%vals, \%attrs?
2664
2665 =item Return Value: $rowobject
2666
2667 =back
2668
2669   $cd->cd_to_producer->find_or_create({ producer => $producer },
2670                                       { key => 'primary' });
2671
2672 Tries to find a record based on its primary key or unique constraints; if none
2673 is found, creates one and returns that instead.
2674
2675   my $cd = $schema->resultset('CD')->find_or_create({
2676     cdid   => 5,
2677     artist => 'Massive Attack',
2678     title  => 'Mezzanine',
2679     year   => 2005,
2680   });
2681
2682 Also takes an optional C<key> attribute, to search by a specific key or unique
2683 constraint. For example:
2684
2685   my $cd = $schema->resultset('CD')->find_or_create(
2686     {
2687       artist => 'Massive Attack',
2688       title  => 'Mezzanine',
2689     },
2690     { key => 'cd_artist_title' }
2691   );
2692
2693 B<Note>: Make sure to read the documentation of L</find> and understand the
2694 significance of the C<key> attribute, as its lack may skew your search, and
2695 subsequently result in spurious row creation.
2696
2697 B<Note>: Because find_or_create() reads from the database and then
2698 possibly inserts based on the result, this method is subject to a race
2699 condition. Another process could create a record in the table after
2700 the find has completed and before the create has started. To avoid
2701 this problem, use find_or_create() inside a transaction.
2702
2703 B<Note>: Take care when using C<find_or_create> with a table having
2704 columns with default values that you intend to be automatically
2705 supplied by the database (e.g. an auto_increment primary key column).
2706 In normal usage, the value of such columns should NOT be included at
2707 all in the call to C<find_or_create>, even when set to C<undef>.
2708
2709 See also L</find> and L</update_or_create>. For information on how to declare
2710 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2711
2712 If you need to know if an existing row was found or a new one created use
2713 L</find_or_new> and L<DBIx::Class::Row/in_storage> instead. Don't forget
2714 to call L<DBIx::Class::Row/insert> to save the newly created row to the
2715 database!
2716
2717   my $cd = $schema->resultset('CD')->find_or_new({
2718     cdid   => 5,
2719     artist => 'Massive Attack',
2720     title  => 'Mezzanine',
2721     year   => 2005,
2722   });
2723
2724   if( $cd->in_storage ) {
2725       # do some stuff
2726       $cd->insert;
2727   }
2728
2729 =cut
2730
2731 sub find_or_create {
2732   my $self     = shift;
2733   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2734   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2735   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2736     return $row;
2737   }
2738   return $self->create($hash);
2739 }
2740
2741 =head2 update_or_create
2742
2743 =over 4
2744
2745 =item Arguments: \%col_values, { key => $unique_constraint }?
2746
2747 =item Return Value: $row_object
2748
2749 =back
2750
2751   $resultset->update_or_create({ col => $val, ... });
2752
2753 Like L</find_or_create>, but if a row is found it is immediately updated via
2754 C<< $found_row->update (\%col_values) >>.
2755
2756
2757 Takes an optional C<key> attribute to search on a specific unique constraint.
2758 For example:
2759
2760   # In your application
2761   my $cd = $schema->resultset('CD')->update_or_create(
2762     {
2763       artist => 'Massive Attack',
2764       title  => 'Mezzanine',
2765       year   => 1998,
2766     },
2767     { key => 'cd_artist_title' }
2768   );
2769
2770   $cd->cd_to_producer->update_or_create({
2771     producer => $producer,
2772     name => 'harry',
2773   }, {
2774     key => 'primary',
2775   });
2776
2777 B<Note>: Make sure to read the documentation of L</find> and understand the
2778 significance of the C<key> attribute, as its lack may skew your search, and
2779 subsequently result in spurious row creation.
2780
2781 B<Note>: Take care when using C<update_or_create> with a table having
2782 columns with default values that you intend to be automatically
2783 supplied by the database (e.g. an auto_increment primary key column).
2784 In normal usage, the value of such columns should NOT be included at
2785 all in the call to C<update_or_create>, even when set to C<undef>.
2786
2787 See also L</find> and L</find_or_create>. For information on how to declare
2788 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2789
2790 If you need to know if an existing row was updated or a new one created use
2791 L</update_or_new> and L<DBIx::Class::Row/in_storage> instead. Don't forget
2792 to call L<DBIx::Class::Row/insert> to save the newly created row to the
2793 database!
2794
2795   my $cd = $schema->resultset('CD')->update_or_new(
2796     {
2797       artist => 'Massive Attack',
2798       title  => 'Mezzanine',
2799       year   => 1998,
2800     },
2801     { key => 'cd_artist_title' }
2802   );
2803
2804   if( $cd->in_storage ) {
2805       # do some stuff
2806       $cd->insert;
2807   }
2808
2809 =cut
2810
2811 sub update_or_create {
2812   my $self = shift;
2813   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2814   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2815
2816   my $row = $self->find($cond, $attrs);
2817   if (defined $row) {
2818     $row->update($cond);
2819     return $row;
2820   }
2821
2822   return $self->create($cond);
2823 }
2824
2825 =head2 update_or_new
2826
2827 =over 4
2828
2829 =item Arguments: \%col_values, { key => $unique_constraint }?
2830
2831 =item Return Value: $rowobject
2832
2833 =back
2834
2835   $resultset->update_or_new({ col => $val, ... });
2836
2837 Like L</find_or_new> but if a row is found it is immediately updated via
2838 C<< $found_row->update (\%col_values) >>.
2839
2840 For example:
2841
2842   # In your application
2843   my $cd = $schema->resultset('CD')->update_or_new(
2844     {
2845       artist => 'Massive Attack',
2846       title  => 'Mezzanine',
2847       year   => 1998,
2848     },
2849     { key => 'cd_artist_title' }
2850   );
2851
2852   if ($cd->in_storage) {
2853       # the cd was updated
2854   }
2855   else {
2856       # the cd is not yet in the database, let's insert it
2857       $cd->insert;
2858   }
2859
2860 B<Note>: Make sure to read the documentation of L</find> and understand the
2861 significance of the C<key> attribute, as its lack may skew your search, and
2862 subsequently result in spurious new objects.
2863
2864 B<Note>: Take care when using C<update_or_new> with a table having
2865 columns with default values that you intend to be automatically
2866 supplied by the database (e.g. an auto_increment primary key column).
2867 In normal usage, the value of such columns should NOT be included at
2868 all in the call to C<update_or_new>, even when set to C<undef>.
2869
2870 See also L</find>, L</find_or_create> and L</find_or_new>.
2871
2872 =cut
2873
2874 sub update_or_new {
2875     my $self  = shift;
2876     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2877     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2878
2879     my $row = $self->find( $cond, $attrs );
2880     if ( defined $row ) {
2881         $row->update($cond);
2882         return $row;
2883     }
2884
2885     return $self->new_result($cond);
2886 }
2887
2888 =head2 get_cache
2889
2890 =over 4
2891
2892 =item Arguments: none
2893
2894 =item Return Value: \@cache_objects | undef
2895
2896 =back
2897
2898 Gets the contents of the cache for the resultset, if the cache is set.
2899
2900 The cache is populated either by using the L</prefetch> attribute to
2901 L</search> or by calling L</set_cache>.
2902
2903 =cut
2904
2905 sub get_cache {
2906   shift->{all_cache};
2907 }
2908
2909 =head2 set_cache
2910
2911 =over 4
2912
2913 =item Arguments: \@cache_objects
2914
2915 =item Return Value: \@cache_objects
2916
2917 =back
2918
2919 Sets the contents of the cache for the resultset. Expects an arrayref
2920 of objects of the same class as those produced by the resultset. Note that
2921 if the cache is set the resultset will return the cached objects rather
2922 than re-querying the database even if the cache attr is not set.
2923
2924 The contents of the cache can also be populated by using the
2925 L</prefetch> attribute to L</search>.
2926
2927 =cut
2928
2929 sub set_cache {
2930   my ( $self, $data ) = @_;
2931   $self->throw_exception("set_cache requires an arrayref")
2932       if defined($data) && (ref $data ne 'ARRAY');
2933   $self->{all_cache} = $data;
2934 }
2935
2936 =head2 clear_cache
2937
2938 =over 4
2939
2940 =item Arguments: none
2941
2942 =item Return Value: undef
2943
2944 =back
2945
2946 Clears the cache for the resultset.
2947
2948 =cut
2949
2950 sub clear_cache {
2951   shift->set_cache(undef);
2952 }
2953
2954 =head2 is_paged
2955
2956 =over 4
2957
2958 =item Arguments: none
2959
2960 =item Return Value: true, if the resultset has been paginated
2961
2962 =back
2963
2964 =cut
2965
2966 sub is_paged {
2967   my ($self) = @_;
2968   return !!$self->{attrs}{page};
2969 }
2970
2971 =head2 is_ordered
2972
2973 =over 4
2974
2975 =item Arguments: none
2976
2977 =item Return Value: true, if the resultset has been ordered with C<order_by>.
2978
2979 =back
2980
2981 =cut
2982
2983 sub is_ordered {
2984   my ($self) = @_;
2985   return scalar $self->result_source->storage->_extract_order_criteria($self->{attrs}{order_by});
2986 }
2987
2988 =head2 related_resultset
2989
2990 =over 4
2991
2992 =item Arguments: $relationship_name
2993
2994 =item Return Value: $resultset
2995
2996 =back
2997
2998 Returns a related resultset for the supplied relationship name.
2999
3000   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
3001
3002 =cut
3003
3004 sub related_resultset {
3005   my ($self, $rel) = @_;
3006
3007   $self->{related_resultsets} ||= {};
3008   return $self->{related_resultsets}{$rel} ||= do {
3009     my $rsrc = $self->result_source;
3010     my $rel_info = $rsrc->relationship_info($rel);
3011
3012     $self->throw_exception(
3013       "search_related: result source '" . $rsrc->source_name .
3014         "' has no such relationship $rel")
3015       unless $rel_info;
3016
3017     my $attrs = $self->_chain_relationship($rel);
3018
3019     my $join_count = $attrs->{seen_join}{$rel};
3020
3021     my $alias = $self->result_source->storage
3022         ->relname_to_table_alias($rel, $join_count);
3023
3024     # since this is search_related, and we already slid the select window inwards
3025     # (the select/as attrs were deleted in the beginning), we need to flip all
3026     # left joins to inner, so we get the expected results
3027     # read the comment on top of the actual function to see what this does
3028     $attrs->{from} = $rsrc->schema->storage->_inner_join_to_node ($attrs->{from}, $alias);
3029
3030
3031     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
3032     delete @{$attrs}{qw(result_class alias)};
3033
3034     my $new_cache;
3035
3036     if (my $cache = $self->get_cache) {
3037       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
3038         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
3039                         @$cache ];
3040       }
3041     }
3042
3043     my $rel_source = $rsrc->related_source($rel);
3044
3045     my $new = do {
3046
3047       # The reason we do this now instead of passing the alias to the
3048       # search_rs below is that if you wrap/overload resultset on the
3049       # source you need to know what alias it's -going- to have for things
3050       # to work sanely (e.g. RestrictWithObject wants to be able to add
3051       # extra query restrictions, and these may need to be $alias.)
3052
3053       my $rel_attrs = $rel_source->resultset_attributes;
3054       local $rel_attrs->{alias} = $alias;
3055
3056       $rel_source->resultset
3057                  ->search_rs(
3058                      undef, {
3059                        %$attrs,
3060                        where => $attrs->{where},
3061                    });
3062     };
3063     $new->set_cache($new_cache) if $new_cache;
3064     $new;
3065   };
3066 }
3067
3068 =head2 current_source_alias
3069
3070 =over 4
3071
3072 =item Arguments: none
3073
3074 =item Return Value: $source_alias
3075
3076 =back
3077
3078 Returns the current table alias for the result source this resultset is built
3079 on, that will be used in the SQL query. Usually it is C<me>.
3080
3081 Currently the source alias that refers to the result set returned by a
3082 L</search>/L</find> family method depends on how you got to the resultset: it's
3083 C<me> by default, but eg. L</search_related> aliases it to the related result
3084 source name (and keeps C<me> referring to the original result set). The long
3085 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
3086 (and make this method unnecessary).
3087
3088 Thus it's currently necessary to use this method in predefined queries (see
3089 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
3090 source alias of the current result set:
3091
3092   # in a result set class
3093   sub modified_by {
3094     my ($self, $user) = @_;
3095
3096     my $me = $self->current_source_alias;
3097
3098     return $self->search({
3099       "$me.modified" => $user->id,
3100     });
3101   }
3102
3103 =cut
3104
3105 sub current_source_alias {
3106   my ($self) = @_;
3107
3108   return ($self->{attrs} || {})->{alias} || 'me';
3109 }
3110
3111 =head2 as_subselect_rs
3112
3113 =over 4
3114
3115 =item Arguments: none
3116
3117 =item Return Value: $resultset
3118
3119 =back
3120
3121 Act as a barrier to SQL symbols.  The resultset provided will be made into a
3122 "virtual view" by including it as a subquery within the from clause.  From this
3123 point on, any joined tables are inaccessible to ->search on the resultset (as if
3124 it were simply where-filtered without joins).  For example:
3125
3126  my $rs = $schema->resultset('Bar')->search({'x.name' => 'abc'},{ join => 'x' });
3127
3128  # 'x' now pollutes the query namespace
3129
3130  # So the following works as expected
3131  my $ok_rs = $rs->search({'x.other' => 1});
3132
3133  # But this doesn't: instead of finding a 'Bar' related to two x rows (abc and
3134  # def) we look for one row with contradictory terms and join in another table
3135  # (aliased 'x_2') which we never use
3136  my $broken_rs = $rs->search({'x.name' => 'def'});
3137
3138  my $rs2 = $rs->as_subselect_rs;
3139
3140  # doesn't work - 'x' is no longer accessible in $rs2, having been sealed away
3141  my $not_joined_rs = $rs2->search({'x.other' => 1});
3142
3143  # works as expected: finds a 'table' row related to two x rows (abc and def)
3144  my $correctly_joined_rs = $rs2->search({'x.name' => 'def'});
3145
3146 Another example of when one might use this would be to select a subset of
3147 columns in a group by clause:
3148
3149  my $rs = $schema->resultset('Bar')->search(undef, {
3150    group_by => [qw{ id foo_id baz_id }],
3151  })->as_subselect_rs->search(undef, {
3152    columns => [qw{ id foo_id }]
3153  });
3154
3155 In the above example normally columns would have to be equal to the group by,
3156 but because we isolated the group by into a subselect the above works.
3157
3158 =cut
3159
3160 sub as_subselect_rs {
3161   my $self = shift;
3162
3163   my $attrs = $self->_resolved_attrs;
3164
3165   my $fresh_rs = (ref $self)->new (
3166     $self->result_source
3167   );
3168
3169   # these pieces will be locked in the subquery
3170   delete $fresh_rs->{cond};
3171   delete @{$fresh_rs->{attrs}}{qw/where bind/};
3172
3173   return $fresh_rs->search( {}, {
3174     from => [{
3175       $attrs->{alias} => $self->as_query,
3176       -alias  => $attrs->{alias},
3177       -rsrc   => $self->result_source,
3178     }],
3179     alias => $attrs->{alias},
3180   });
3181 }
3182
3183 # This code is called by search_related, and makes sure there
3184 # is clear separation between the joins before, during, and
3185 # after the relationship. This information is needed later
3186 # in order to properly resolve prefetch aliases (any alias
3187 # with a relation_chain_depth less than the depth of the
3188 # current prefetch is not considered)
3189 #
3190 # The increments happen twice per join. An even number means a
3191 # relationship specified via a search_related, whereas an odd
3192 # number indicates a join/prefetch added via attributes
3193 #
3194 # Also this code will wrap the current resultset (the one we
3195 # chain to) in a subselect IFF it contains limiting attributes
3196 sub _chain_relationship {
3197   my ($self, $rel) = @_;
3198   my $source = $self->result_source;
3199   my $attrs = { %{$self->{attrs}||{}} };
3200
3201   # we need to take the prefetch the attrs into account before we
3202   # ->_resolve_join as otherwise they get lost - captainL
3203   my $join = $self->_merge_joinpref_attr( $attrs->{join}, $attrs->{prefetch} );
3204
3205   delete @{$attrs}{qw/join prefetch collapse group_by distinct select as columns +select +as +columns/};
3206
3207   my $seen = { %{ (delete $attrs->{seen_join}) || {} } };
3208
3209   my $from;
3210   my @force_subq_attrs = qw/offset rows group_by having/;
3211
3212   if (
3213     ($attrs->{from} && ref $attrs->{from} ne 'ARRAY')
3214       ||
3215     $self->_has_resolved_attr (@force_subq_attrs)
3216   ) {
3217     # Nuke the prefetch (if any) before the new $rs attrs
3218     # are resolved (prefetch is useless - we are wrapping
3219     # a subquery anyway).
3220     my $rs_copy = $self->search;
3221     $rs_copy->{attrs}{join} = $self->_merge_joinpref_attr (
3222       $rs_copy->{attrs}{join},
3223       delete $rs_copy->{attrs}{prefetch},
3224     );
3225
3226     $from = [{
3227       -rsrc   => $source,
3228       -alias  => $attrs->{alias},
3229       $attrs->{alias} => $rs_copy->as_query,
3230     }];
3231     delete @{$attrs}{@force_subq_attrs, qw/where bind/};
3232     $seen->{-relation_chain_depth} = 0;
3233   }
3234   elsif ($attrs->{from}) {  #shallow copy suffices
3235     $from = [ @{$attrs->{from}} ];
3236   }
3237   else {
3238     $from = [{
3239       -rsrc  => $source,
3240       -alias => $attrs->{alias},
3241       $attrs->{alias} => $source->from,
3242     }];
3243   }
3244
3245   my $jpath = ($seen->{-relation_chain_depth})
3246     ? $from->[-1][0]{-join_path}
3247     : [];
3248
3249   my @requested_joins = $source->_resolve_join(
3250     $join,
3251     $attrs->{alias},
3252     $seen,
3253     $jpath,
3254   );
3255
3256   push @$from, @requested_joins;
3257
3258   $seen->{-relation_chain_depth}++;
3259
3260   # if $self already had a join/prefetch specified on it, the requested
3261   # $rel might very well be already included. What we do in this case
3262   # is effectively a no-op (except that we bump up the chain_depth on
3263   # the join in question so we could tell it *is* the search_related)
3264   my $already_joined;
3265
3266   # we consider the last one thus reverse
3267   for my $j (reverse @requested_joins) {
3268     my ($last_j) = keys %{$j->[0]{-join_path}[-1]};
3269     if ($rel eq $last_j) {
3270       $j->[0]{-relation_chain_depth}++;
3271       $already_joined++;
3272       last;
3273     }
3274   }
3275
3276   unless ($already_joined) {
3277     push @$from, $source->_resolve_join(
3278       $rel,
3279       $attrs->{alias},
3280       $seen,
3281       $jpath,
3282     );
3283   }
3284
3285   $seen->{-relation_chain_depth}++;
3286
3287   return {%$attrs, from => $from, seen_join => $seen};
3288 }
3289
3290 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
3291 sub _resolved_attrs_copy {
3292   my $self = shift;
3293   return { %{$self->_resolved_attrs (@_)} };
3294 }
3295
3296 sub _resolved_attrs {
3297   my $self = shift;
3298   return $self->{_attrs} if $self->{_attrs};
3299
3300   my $attrs  = { %{ $self->{attrs} || {} } };
3301   my $source = $self->result_source;
3302   my $alias  = $attrs->{alias};
3303
3304   # default selection list
3305   $attrs->{columns} = [ $source->columns ]
3306     unless List::Util::first { exists $attrs->{$_} } qw/columns cols select as/;
3307
3308   # merge selectors together
3309   for (qw/columns select as/) {
3310     $attrs->{$_} = $self->_merge_attr($attrs->{$_}, delete $attrs->{"+$_"})
3311       if $attrs->{$_} or $attrs->{"+$_"};
3312   }
3313
3314   # disassemble columns
3315   my (@sel, @as);
3316   if (my $cols = delete $attrs->{columns}) {
3317     for my $c (ref $cols eq 'ARRAY' ? @$cols : $cols) {
3318       if (ref $c eq 'HASH') {
3319         for my $as (keys %$c) {
3320           push @sel, $c->{$as};
3321           push @as, $as;
3322         }
3323       }
3324       else {
3325         push @sel, $c;
3326         push @as, $c;
3327       }
3328     }
3329   }
3330
3331   # when trying to weed off duplicates later do not go past this point -
3332   # everything added from here on is unbalanced "anyone's guess" stuff
3333   my $dedup_stop_idx = $#as;
3334
3335   push @as, @{ ref $attrs->{as} eq 'ARRAY' ? $attrs->{as} : [ $attrs->{as} ] }
3336     if $attrs->{as};
3337   push @sel, @{ ref $attrs->{select} eq 'ARRAY' ? $attrs->{select} : [ $attrs->{select} ] }
3338     if $attrs->{select};
3339
3340   # assume all unqualified selectors to apply to the current alias (legacy stuff)
3341   for (@sel) {
3342     $_ = (ref $_ or $_ =~ /\./) ? $_ : "$alias.$_";
3343   }
3344
3345   # disqualify all $alias.col as-bits (collapser mandated)
3346   for (@as) {
3347     $_ = ($_ =~ /^\Q$alias.\E(.+)$/) ? $1 : $_;
3348   }
3349
3350   # de-duplicate the result (remove *identical* select/as pairs)
3351   # and also die on duplicate {as} pointing to different {select}s
3352   # not using a c-style for as the condition is prone to shrinkage
3353   my $seen;
3354   my $i = 0;
3355   while ($i <= $dedup_stop_idx) {
3356     if ($seen->{"$sel[$i] \x00\x00 $as[$i]"}++) {
3357       splice @sel, $i, 1;
3358       splice @as, $i, 1;
3359       $dedup_stop_idx--;
3360     }
3361     elsif ($seen->{$as[$i]}++) {
3362       $self->throw_exception(
3363         "inflate_result() alias '$as[$i]' specified twice with different SQL-side {select}-ors"
3364       );
3365     }
3366     else {
3367       $i++;
3368     }
3369   }
3370
3371   $attrs->{select} = \@sel;
3372   $attrs->{as} = \@as;
3373
3374   $attrs->{from} ||= [{
3375     -rsrc   => $source,
3376     -alias  => $self->{attrs}{alias},
3377     $self->{attrs}{alias} => $source->from,
3378   }];
3379
3380   if ( $attrs->{join} || $attrs->{prefetch} ) {
3381
3382     $self->throw_exception ('join/prefetch can not be used with a custom {from}')
3383       if ref $attrs->{from} ne 'ARRAY';
3384
3385     my $join = (delete $attrs->{join}) || {};
3386
3387     if ( defined $attrs->{prefetch} ) {
3388       $join = $self->_merge_joinpref_attr( $join, $attrs->{prefetch} );
3389     }
3390
3391     $attrs->{from} =    # have to copy here to avoid corrupting the original
3392       [
3393         @{ $attrs->{from} },
3394         $source->_resolve_join(
3395           $join,
3396           $alias,
3397           { %{ $attrs->{seen_join} || {} } },
3398           ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
3399             ? $attrs->{from}[-1][0]{-join_path}
3400             : []
3401           ,
3402         )
3403       ];
3404   }
3405
3406   if ( defined $attrs->{order_by} ) {
3407     $attrs->{order_by} = (
3408       ref( $attrs->{order_by} ) eq 'ARRAY'
3409       ? [ @{ $attrs->{order_by} } ]
3410       : [ $attrs->{order_by} || () ]
3411     );
3412   }
3413
3414   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
3415     $attrs->{group_by} = [ $attrs->{group_by} ];
3416   }
3417
3418   # generate the distinct induced group_by early, as prefetch will be carried via a
3419   # subquery (since a group_by is present)
3420   if (delete $attrs->{distinct}) {
3421     if ($attrs->{group_by}) {
3422       carp_unique ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
3423     }
3424     else {
3425       # distinct affects only the main selection part, not what prefetch may
3426       # add below.
3427       $attrs->{group_by} = $source->storage->_group_over_selection (
3428         $attrs->{from},
3429         $attrs->{select},
3430         $attrs->{order_by},
3431       );
3432     }
3433   }
3434
3435   $attrs->{collapse} ||= {};
3436   if ($attrs->{prefetch}) {
3437
3438     $self->throw_exception("Unable to prefetch, resultset contains an unnamed selector $attrs->{_dark_selector}{string}")
3439       if $attrs->{_dark_selector};
3440
3441     my $prefetch = $self->_merge_joinpref_attr( {}, delete $attrs->{prefetch} );
3442
3443     my $prefetch_ordering = [];
3444
3445     # this is a separate structure (we don't look in {from} directly)
3446     # as the resolver needs to shift things off the lists to work
3447     # properly (identical-prefetches on different branches)
3448     my $join_map = {};
3449     if (ref $attrs->{from} eq 'ARRAY') {
3450
3451       my $start_depth = $attrs->{seen_join}{-relation_chain_depth} || 0;
3452
3453       for my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
3454         next unless $j->[0]{-alias};
3455         next unless $j->[0]{-join_path};
3456         next if ($j->[0]{-relation_chain_depth} || 0) < $start_depth;
3457
3458         my @jpath = map { keys %$_ } @{$j->[0]{-join_path}};
3459
3460         my $p = $join_map;
3461         $p = $p->{$_} ||= {} for @jpath[ ($start_depth/2) .. $#jpath]; #only even depths are actual jpath boundaries
3462         push @{$p->{-join_aliases} }, $j->[0]{-alias};
3463       }
3464     }
3465
3466     my @prefetch =
3467       $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
3468
3469     # we need to somehow mark which columns came from prefetch
3470     if (@prefetch) {
3471       my $sel_end = $#{$attrs->{select}};
3472       $attrs->{_prefetch_selector_range} = [ $sel_end + 1, $sel_end + @prefetch ];
3473     }
3474
3475     push @{ $attrs->{select} }, (map { $_->[0] } @prefetch);
3476     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
3477
3478     push( @{$attrs->{order_by}}, @$prefetch_ordering );
3479     $attrs->{_collapse_order_by} = \@$prefetch_ordering;
3480   }
3481
3482   # if both page and offset are specified, produce a combined offset
3483   # even though it doesn't make much sense, this is what pre 081xx has
3484   # been doing
3485   if (my $page = delete $attrs->{page}) {
3486     $attrs->{offset} =
3487       ($attrs->{rows} * ($page - 1))
3488             +
3489       ($attrs->{offset} || 0)
3490     ;
3491   }
3492
3493   return $self->{_attrs} = $attrs;
3494 }
3495
3496 sub _rollout_attr {
3497   my ($self, $attr) = @_;
3498
3499   if (ref $attr eq 'HASH') {
3500     return $self->_rollout_hash($attr);
3501   } elsif (ref $attr eq 'ARRAY') {
3502     return $self->_rollout_array($attr);
3503   } else {
3504     return [$attr];
3505   }
3506 }
3507
3508 sub _rollout_array {
3509   my ($self, $attr) = @_;
3510
3511   my @rolled_array;
3512   foreach my $element (@{$attr}) {
3513     if (ref $element eq 'HASH') {
3514       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3515     } elsif (ref $element eq 'ARRAY') {
3516       #  XXX - should probably recurse here
3517       push( @rolled_array, @{$self->_rollout_array($element)} );
3518     } else {
3519       push( @rolled_array, $element );
3520     }
3521   }
3522   return \@rolled_array;
3523 }
3524
3525 sub _rollout_hash {
3526   my ($self, $attr) = @_;
3527
3528   my @rolled_array;
3529   foreach my $key (keys %{$attr}) {
3530     push( @rolled_array, { $key => $attr->{$key} } );
3531   }
3532   return \@rolled_array;
3533 }
3534
3535 sub _calculate_score {
3536   my ($self, $a, $b) = @_;
3537
3538   if (defined $a xor defined $b) {
3539     return 0;
3540   }
3541   elsif (not defined $a) {
3542     return 1;
3543   }
3544
3545   if (ref $b eq 'HASH') {
3546     my ($b_key) = keys %{$b};
3547     if (ref $a eq 'HASH') {
3548       my ($a_key) = keys %{$a};
3549       if ($a_key eq $b_key) {
3550         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3551       } else {
3552         return 0;
3553       }
3554     } else {
3555       return ($a eq $b_key) ? 1 : 0;
3556     }
3557   } else {
3558     if (ref $a eq 'HASH') {
3559       my ($a_key) = keys %{$a};
3560       return ($b eq $a_key) ? 1 : 0;
3561     } else {
3562       return ($b eq $a) ? 1 : 0;
3563     }
3564   }
3565 }
3566
3567 sub _merge_joinpref_attr {
3568   my ($self, $orig, $import) = @_;
3569
3570   return $import unless defined($orig);
3571   return $orig unless defined($import);
3572
3573   $orig = $self->_rollout_attr($orig);
3574   $import = $self->_rollout_attr($import);
3575
3576   my $seen_keys;
3577   foreach my $import_element ( @{$import} ) {
3578     # find best candidate from $orig to merge $b_element into
3579     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3580     foreach my $orig_element ( @{$orig} ) {
3581       my $score = $self->_calculate_score( $orig_element, $import_element );
3582       if ($score > $best_candidate->{score}) {
3583         $best_candidate->{position} = $position;
3584         $best_candidate->{score} = $score;
3585       }
3586       $position++;
3587     }
3588     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3589     $import_key = '' if not defined $import_key;
3590
3591     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3592       push( @{$orig}, $import_element );
3593     } else {
3594       my $orig_best = $orig->[$best_candidate->{position}];
3595       # merge orig_best and b_element together and replace original with merged
3596       if (ref $orig_best ne 'HASH') {
3597         $orig->[$best_candidate->{position}] = $import_element;
3598       } elsif (ref $import_element eq 'HASH') {
3599         my ($key) = keys %{$orig_best};
3600         $orig->[$best_candidate->{position}] = { $key => $self->_merge_joinpref_attr($orig_best->{$key}, $import_element->{$key}) };
3601       }
3602     }
3603     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3604   }
3605
3606   return $orig;
3607 }
3608
3609 {
3610   my $hm;
3611
3612   sub _merge_attr {
3613     $hm ||= do {
3614       require Hash::Merge;
3615       my $hm = Hash::Merge->new;
3616
3617       $hm->specify_behavior({
3618         SCALAR => {
3619           SCALAR => sub {
3620             my ($defl, $defr) = map { defined $_ } (@_[0,1]);
3621
3622             if ($defl xor $defr) {
3623               return [ $defl ? $_[0] : $_[1] ];
3624             }
3625             elsif (! $defl) {
3626               return [];
3627             }
3628             elsif (__HM_DEDUP and $_[0] eq $_[1]) {
3629               return [ $_[0] ];
3630             }
3631             else {
3632               return [$_[0], $_[1]];
3633             }
3634           },
3635           ARRAY => sub {
3636             return $_[1] if !defined $_[0];
3637             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3638             return [$_[0], @{$_[1]}]
3639           },
3640           HASH  => sub {
3641             return [] if !defined $_[0] and !keys %{$_[1]};
3642             return [ $_[1] ] if !defined $_[0];
3643             return [ $_[0] ] if !keys %{$_[1]};
3644             return [$_[0], $_[1]]
3645           },
3646         },
3647         ARRAY => {
3648           SCALAR => sub {
3649             return $_[0] if !defined $_[1];
3650             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3651             return [@{$_[0]}, $_[1]]
3652           },
3653           ARRAY => sub {
3654             my @ret = @{$_[0]} or return $_[1];
3655             return [ @ret, @{$_[1]} ] unless __HM_DEDUP;
3656             my %idx = map { $_ => 1 } @ret;
3657             push @ret, grep { ! defined $idx{$_} } (@{$_[1]});
3658             \@ret;
3659           },
3660           HASH => sub {
3661             return [ $_[1] ] if ! @{$_[0]};
3662             return $_[0] if !keys %{$_[1]};
3663             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3664             return [ @{$_[0]}, $_[1] ];
3665           },
3666         },
3667         HASH => {
3668           SCALAR => sub {
3669             return [] if !keys %{$_[0]} and !defined $_[1];
3670             return [ $_[0] ] if !defined $_[1];
3671             return [ $_[1] ] if !keys %{$_[0]};
3672             return [$_[0], $_[1]]
3673           },
3674           ARRAY => sub {
3675             return [] if !keys %{$_[0]} and !@{$_[1]};
3676             return [ $_[0] ] if !@{$_[1]};
3677             return $_[1] if !keys %{$_[0]};
3678             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3679             return [ $_[0], @{$_[1]} ];
3680           },
3681           HASH => sub {
3682             return [] if !keys %{$_[0]} and !keys %{$_[1]};
3683             return [ $_[0] ] if !keys %{$_[1]};
3684             return [ $_[1] ] if !keys %{$_[0]};
3685             return [ $_[0] ] if $_[0] eq $_[1];
3686             return [ $_[0], $_[1] ];
3687           },
3688         }
3689       } => 'DBIC_RS_ATTR_MERGER');
3690       $hm;
3691     };
3692
3693     return $hm->merge ($_[1], $_[2]);
3694   }
3695 }
3696
3697 sub STORABLE_freeze {
3698   my ($self, $cloning) = @_;
3699   my $to_serialize = { %$self };
3700
3701   # A cursor in progress can't be serialized (and would make little sense anyway)
3702   delete $to_serialize->{cursor};
3703
3704   # nor is it sensical to store a not-yet-fired-count pager
3705   if ($to_serialize->{pager} and ref $to_serialize->{pager}{total_entries} eq 'CODE') {
3706     delete $to_serialize->{pager};
3707   }
3708
3709   Storable::nfreeze($to_serialize);
3710 }
3711
3712 # need this hook for symmetry
3713 sub STORABLE_thaw {
3714   my ($self, $cloning, $serialized) = @_;
3715
3716   %$self = %{ Storable::thaw($serialized) };
3717
3718   $self;
3719 }
3720
3721
3722 =head2 throw_exception
3723
3724 See L<DBIx::Class::Schema/throw_exception> for details.
3725
3726 =cut
3727
3728 sub throw_exception {
3729   my $self=shift;
3730
3731   if (ref $self and my $rsrc = $self->result_source) {
3732     $rsrc->throw_exception(@_)
3733   }
3734   else {
3735     DBIx::Class::Exception->throw(@_);
3736   }
3737 }
3738
3739 # XXX: FIXME: Attributes docs need clearing up
3740
3741 =head1 ATTRIBUTES
3742
3743 Attributes are used to refine a ResultSet in various ways when
3744 searching for data. They can be passed to any method which takes an
3745 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3746 L</count>.
3747
3748 These are in no particular order:
3749
3750 =head2 order_by
3751
3752 =over 4
3753
3754 =item Value: ( $order_by | \@order_by | \%order_by )
3755
3756 =back
3757
3758 Which column(s) to order the results by.
3759
3760 [The full list of suitable values is documented in
3761 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3762 common options.]
3763
3764 If a single column name, or an arrayref of names is supplied, the
3765 argument is passed through directly to SQL. The hashref syntax allows
3766 for connection-agnostic specification of ordering direction:
3767
3768  For descending order:
3769
3770   order_by => { -desc => [qw/col1 col2 col3/] }
3771
3772  For explicit ascending order:
3773
3774   order_by => { -asc => 'col' }
3775
3776 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3777 supported, although you are strongly encouraged to use the hashref
3778 syntax as outlined above.
3779
3780 =head2 columns
3781
3782 =over 4
3783
3784 =item Value: \@columns
3785
3786 =back
3787
3788 Shortcut to request a particular set of columns to be retrieved. Each
3789 column spec may be a string (a table column name), or a hash (in which
3790 case the key is the C<as> value, and the value is used as the C<select>
3791 expression). Adds C<me.> onto the start of any column without a C<.> in
3792 it and sets C<select> from that, then auto-populates C<as> from
3793 C<select> as normal. (You may also use the C<cols> attribute, as in
3794 earlier versions of DBIC.)
3795
3796 Essentially C<columns> does the same as L</select> and L</as>.
3797
3798     columns => [ 'foo', { bar => 'baz' } ]
3799
3800 is the same as
3801
3802     select => [qw/foo baz/],
3803     as => [qw/foo bar/]
3804
3805 =head2 +columns
3806
3807 =over 4
3808
3809 =item Value: \@columns
3810
3811 =back
3812
3813 Indicates additional columns to be selected from storage. Works the same
3814 as L</columns> but adds columns to the selection. (You may also use the
3815 C<include_columns> attribute, as in earlier versions of DBIC). For
3816 example:-
3817
3818   $schema->resultset('CD')->search(undef, {
3819     '+columns' => ['artist.name'],
3820     join => ['artist']
3821   });
3822
3823 would return all CDs and include a 'name' column to the information
3824 passed to object inflation. Note that the 'artist' is the name of the
3825 column (or relationship) accessor, and 'name' is the name of the column
3826 accessor in the related table.
3827
3828 B<NOTE:> You need to explicitly quote '+columns' when defining the attribute.
3829 Not doing so causes Perl to incorrectly interpret +columns as a bareword with a
3830 unary plus operator before it.
3831
3832 =head2 include_columns
3833
3834 =over 4
3835
3836 =item Value: \@columns
3837
3838 =back
3839
3840 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3841
3842 =head2 select
3843
3844 =over 4
3845
3846 =item Value: \@select_columns
3847
3848 =back
3849
3850 Indicates which columns should be selected from the storage. You can use
3851 column names, or in the case of RDBMS back ends, function or stored procedure
3852 names:
3853
3854   $rs = $schema->resultset('Employee')->search(undef, {
3855     select => [
3856       'name',
3857       { count => 'employeeid' },
3858       { max => { length => 'name' }, -as => 'longest_name' }
3859     ]
3860   });
3861
3862   # Equivalent SQL
3863   SELECT name, COUNT( employeeid ), MAX( LENGTH( name ) ) AS longest_name FROM employee
3864
3865 B<NOTE:> You will almost always need a corresponding L</as> attribute when you
3866 use L</select>, to instruct DBIx::Class how to store the result of the column.
3867 Also note that the L</as> attribute has nothing to do with the SQL-side 'AS'
3868 identifier aliasing. You can however alias a function, so you can use it in
3869 e.g. an C<ORDER BY> clause. This is done via the C<-as> B<select function
3870 attribute> supplied as shown in the example above.
3871
3872 B<NOTE:> You need to explicitly quote '+select'/'+as' when defining the attributes.
3873 Not doing so causes Perl to incorrectly interpret them as a bareword with a
3874 unary plus operator before it.
3875
3876 =head2 +select
3877
3878 =over 4
3879
3880 Indicates additional columns to be selected from storage.  Works the same as
3881 L</select> but adds columns to the default selection, instead of specifying
3882 an explicit list.
3883
3884 =back
3885
3886 =head2 +as
3887
3888 =over 4
3889
3890 Indicates additional column names for those added via L</+select>. See L</as>.
3891
3892 =back
3893
3894 =head2 as
3895
3896 =over 4
3897
3898 =item Value: \@inflation_names
3899
3900 =back
3901
3902 Indicates column names for object inflation. That is L</as> indicates the
3903 slot name in which the column value will be stored within the
3904 L<Row|DBIx::Class::Row> object. The value will then be accessible via this
3905 identifier by the C<get_column> method (or via the object accessor B<if one
3906 with the same name already exists>) as shown below. The L</as> attribute has
3907 B<nothing to do> with the SQL-side C<AS>. See L</select> for details.
3908
3909   $rs = $schema->resultset('Employee')->search(undef, {
3910     select => [
3911       'name',
3912       { count => 'employeeid' },
3913       { max => { length => 'name' }, -as => 'longest_name' }
3914     ],
3915     as => [qw/
3916       name
3917       employee_count
3918       max_name_length
3919     /],
3920   });
3921
3922 If the object against which the search is performed already has an accessor
3923 matching a column name specified in C<as>, the value can be retrieved using
3924 the accessor as normal:
3925
3926   my $name = $employee->name();
3927
3928 If on the other hand an accessor does not exist in the object, you need to
3929 use C<get_column> instead:
3930
3931   my $employee_count = $employee->get_column('employee_count');
3932
3933 You can create your own accessors if required - see
3934 L<DBIx::Class::Manual::Cookbook> for details.
3935
3936 =head2 join
3937
3938 =over 4
3939
3940 =item Value: ($rel_name | \@rel_names | \%rel_names)
3941
3942 =back
3943
3944 Contains a list of relationships that should be joined for this query.  For
3945 example:
3946
3947   # Get CDs by Nine Inch Nails
3948   my $rs = $schema->resultset('CD')->search(
3949     { 'artist.name' => 'Nine Inch Nails' },
3950     { join => 'artist' }
3951   );
3952
3953 Can also contain a hash reference to refer to the other relation's relations.
3954 For example:
3955
3956   package MyApp::Schema::Track;
3957   use base qw/DBIx::Class/;
3958   __PACKAGE__->table('track');
3959   __PACKAGE__->add_columns(qw/trackid cd position title/);
3960   __PACKAGE__->set_primary_key('trackid');
3961   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3962   1;
3963
3964   # In your application
3965   my $rs = $schema->resultset('Artist')->search(
3966     { 'track.title' => 'Teardrop' },
3967     {
3968       join     => { cd => 'track' },
3969       order_by => 'artist.name',
3970     }
3971   );
3972
3973 You need to use the relationship (not the table) name in  conditions,
3974 because they are aliased as such. The current table is aliased as "me", so
3975 you need to use me.column_name in order to avoid ambiguity. For example:
3976
3977   # Get CDs from 1984 with a 'Foo' track
3978   my $rs = $schema->resultset('CD')->search(
3979     {
3980       'me.year' => 1984,
3981       'tracks.name' => 'Foo'
3982     },
3983     { join => 'tracks' }
3984   );
3985
3986 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3987 similarly for a third time). For e.g.
3988
3989   my $rs = $schema->resultset('Artist')->search({
3990     'cds.title'   => 'Down to Earth',
3991     'cds_2.title' => 'Popular',
3992   }, {
3993     join => [ qw/cds cds/ ],
3994   });
3995
3996 will return a set of all artists that have both a cd with title 'Down
3997 to Earth' and a cd with title 'Popular'.
3998
3999 If you want to fetch related objects from other tables as well, see C<prefetch>
4000 below.
4001
4002 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
4003
4004 =head2 prefetch
4005
4006 =over 4
4007
4008 =item Value: ($rel_name | \@rel_names | \%rel_names)
4009
4010 =back
4011
4012 Contains one or more relationships that should be fetched along with
4013 the main query (when they are accessed afterwards the data will
4014 already be available, without extra queries to the database).  This is
4015 useful for when you know you will need the related objects, because it
4016 saves at least one query:
4017
4018   my $rs = $schema->resultset('Tag')->search(
4019     undef,
4020     {
4021       prefetch => {
4022         cd => 'artist'
4023       }
4024     }
4025   );
4026
4027 The initial search results in SQL like the following:
4028
4029   SELECT tag.*, cd.*, artist.* FROM tag
4030   JOIN cd ON tag.cd = cd.cdid
4031   JOIN artist ON cd.artist = artist.artistid
4032
4033 L<DBIx::Class> has no need to go back to the database when we access the
4034 C<cd> or C<artist> relationships, which saves us two SQL statements in this
4035 case.
4036
4037 Simple prefetches will be joined automatically, so there is no need
4038 for a C<join> attribute in the above search.
4039
4040 L</prefetch> can be used with the any of the relationship types and
4041 multiple prefetches can be specified together. Below is a more complex
4042 example that prefetches a CD's artist, its liner notes (if present),
4043 the cover image, the tracks on that cd, and the guests on those
4044 tracks.
4045
4046  # Assuming:
4047  My::Schema::CD->belongs_to( artist      => 'My::Schema::Artist'     );
4048  My::Schema::CD->might_have( liner_note  => 'My::Schema::LinerNotes' );
4049  My::Schema::CD->has_one(    cover_image => 'My::Schema::Artwork'    );
4050  My::Schema::CD->has_many(   tracks      => 'My::Schema::Track'      );
4051
4052  My::Schema::Artist->belongs_to( record_label => 'My::Schema::RecordLabel' );
4053
4054  My::Schema::Track->has_many( guests => 'My::Schema::Guest' );
4055
4056
4057  my $rs = $schema->resultset('CD')->search(
4058    undef,
4059    {
4060      prefetch => [
4061        { artist => 'record_label'},  # belongs_to => belongs_to
4062        'liner_note',                 # might_have
4063        'cover_image',                # has_one
4064        { tracks => 'guests' },       # has_many => has_many
4065      ]
4066    }
4067  );
4068
4069 This will produce SQL like the following:
4070
4071  SELECT cd.*, artist.*, record_label.*, liner_note.*, cover_image.*,
4072         tracks.*, guests.*
4073    FROM cd me
4074    JOIN artist artist
4075      ON artist.artistid = me.artistid
4076    JOIN record_label record_label
4077      ON record_label.labelid = artist.labelid
4078    LEFT JOIN track tracks
4079      ON tracks.cdid = me.cdid
4080    LEFT JOIN guest guests
4081      ON guests.trackid = track.trackid
4082    LEFT JOIN liner_notes liner_note
4083      ON liner_note.cdid = me.cdid
4084    JOIN cd_artwork cover_image
4085      ON cover_image.cdid = me.cdid
4086  ORDER BY tracks.cd
4087
4088 Now the C<artist>, C<record_label>, C<liner_note>, C<cover_image>,
4089 C<tracks>, and C<guests> of the CD will all be available through the
4090 relationship accessors without the need for additional queries to the
4091 database.
4092
4093 However, there is one caveat to be observed: it can be dangerous to
4094 prefetch more than one L<has_many|DBIx::Class::Relationship/has_many>
4095 relationship on a given level. e.g.:
4096
4097  my $rs = $schema->resultset('CD')->search(
4098    undef,
4099    {
4100      prefetch => [
4101        'tracks',                         # has_many
4102        { cd_to_producer => 'producer' }, # has_many => belongs_to (i.e. m2m)
4103      ]
4104    }
4105  );
4106
4107 In fact, C<DBIx::Class> will emit the following warning:
4108
4109  Prefetching multiple has_many rels tracks and cd_to_producer at top
4110  level will explode the number of row objects retrievable via ->next
4111  or ->all. Use at your own risk.
4112
4113 The collapser currently can't identify duplicate tuples for multiple
4114 L<has_many|DBIx::Class::Relationship/has_many> relationships and as a
4115 result the second L<has_many|DBIx::Class::Relationship/has_many>
4116 relation could contain redundant objects.
4117
4118 =head3 Using L</prefetch> with L</join>
4119
4120 L</prefetch> implies a L</join> with the equivalent argument, and is
4121 properly merged with any existing L</join> specification. So the
4122 following:
4123
4124   my $rs = $schema->resultset('CD')->search(
4125    {'record_label.name' => 'Music Product Ltd.'},
4126    {
4127      join     => {artist => 'record_label'},
4128      prefetch => 'artist',
4129    }
4130  );
4131
4132 ... will work, searching on the record label's name, but only
4133 prefetching the C<artist>.
4134
4135 =head3 Using L</prefetch> with L</select> / L</+select> / L</as> / L</+as>
4136
4137 L</prefetch> implies a L</+select>/L</+as> with the fields of the
4138 prefetched relations.  So given:
4139
4140   my $rs = $schema->resultset('CD')->search(
4141    undef,
4142    {
4143      select   => ['cd.title'],
4144      as       => ['cd_title'],
4145      prefetch => 'artist',
4146    }
4147  );
4148
4149 The L</select> becomes: C<'cd.title', 'artist.*'> and the L</as>
4150 becomes: C<'cd_title', 'artist.*'>.
4151
4152 =head3 CAVEATS
4153
4154 Prefetch does a lot of deep magic. As such, it may not behave exactly
4155 as you might expect.
4156
4157 =over 4
4158
4159 =item *
4160
4161 Prefetch uses the L</cache> to populate the prefetched relationships. This
4162 may or may not be what you want.
4163
4164 =item *
4165
4166 If you specify a condition on a prefetched relationship, ONLY those
4167 rows that match the prefetched condition will be fetched into that relationship.
4168 This means that adding prefetch to a search() B<may alter> what is returned by
4169 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
4170
4171   my $artist_rs = $schema->resultset('Artist')->search({
4172       'cds.year' => 2008,
4173   }, {
4174       join => 'cds',
4175   });
4176
4177   my $count = $artist_rs->first->cds->count;
4178
4179   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
4180
4181   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
4182
4183   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
4184
4185 that cmp_ok() may or may not pass depending on the datasets involved. This
4186 behavior may or may not survive the 0.09 transition.
4187
4188 =back
4189
4190 =head2 page
4191
4192 =over 4
4193
4194 =item Value: $page
4195
4196 =back
4197
4198 Makes the resultset paged and specifies the page to retrieve. Effectively
4199 identical to creating a non-pages resultset and then calling ->page($page)
4200 on it.
4201
4202 If L</rows> attribute is not specified it defaults to 10 rows per page.
4203
4204 When you have a paged resultset, L</count> will only return the number
4205 of rows in the page. To get the total, use the L</pager> and call
4206 C<total_entries> on it.
4207
4208 =head2 rows
4209
4210 =over 4
4211
4212 =item Value: $rows
4213
4214 =back
4215
4216 Specifies the maximum number of rows for direct retrieval or the number of
4217 rows per page if the page attribute or method is used.
4218
4219 =head2 offset
4220
4221 =over 4
4222
4223 =item Value: $offset
4224
4225 =back
4226
4227 Specifies the (zero-based) row number for the  first row to be returned, or the
4228 of the first row of the first page if paging is used.
4229
4230 =head2 software_limit
4231
4232 =over 4
4233
4234 =item Value: (0 | 1)
4235
4236 =back
4237
4238 When combined with L</rows> and/or L</offset> the generated SQL will not
4239 include any limit dialect stanzas. Instead the entire result will be selected
4240 as if no limits were specified, and DBIC will perform the limit locally, by
4241 artificially advancing and finishing the resulting L</cursor>.
4242
4243 This is the recommended way of performing resultset limiting when no sane RDBMS
4244 implementation is available (e.g.
4245 L<Sybase ASE|DBIx::Class::Storage::DBI::Sybase::ASE> using the
4246 L<Generic Sub Query|DBIx::Class::SQLMaker::LimitDialects/GenericSubQ> hack)
4247
4248 =head2 group_by
4249
4250 =over 4
4251
4252 =item Value: \@columns
4253
4254 =back
4255
4256 A arrayref of columns to group by. Can include columns of joined tables.
4257
4258   group_by => [qw/ column1 column2 ... /]
4259
4260 =head2 having
4261
4262 =over 4
4263
4264 =item Value: $condition
4265
4266 =back
4267
4268 HAVING is a select statement attribute that is applied between GROUP BY and
4269 ORDER BY. It is applied to the after the grouping calculations have been
4270 done.
4271
4272   having => { 'count_employee' => { '>=', 100 } }
4273
4274 or with an in-place function in which case literal SQL is required:
4275
4276   having => \[ 'count(employee) >= ?', [ count => 100 ] ]
4277
4278 =head2 distinct
4279
4280 =over 4
4281
4282 =item Value: (0 | 1)
4283
4284 =back
4285
4286 Set to 1 to group by all columns. If the resultset already has a group_by
4287 attribute, this setting is ignored and an appropriate warning is issued.
4288
4289 =head2 where
4290
4291 =over 4
4292
4293 Adds to the WHERE clause.
4294
4295   # only return rows WHERE deleted IS NULL for all searches
4296   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
4297
4298 Can be overridden by passing C<< { where => undef } >> as an attribute
4299 to a resultset.
4300
4301 For more complicated where clauses see L<SQL::Abstract/WHERE CLAUSES>.
4302
4303 =back
4304
4305 =head2 cache
4306
4307 Set to 1 to cache search results. This prevents extra SQL queries if you
4308 revisit rows in your ResultSet:
4309
4310   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
4311
4312   while( my $artist = $resultset->next ) {
4313     ... do stuff ...
4314   }
4315
4316   $rs->first; # without cache, this would issue a query
4317
4318 By default, searches are not cached.
4319
4320 For more examples of using these attributes, see
4321 L<DBIx::Class::Manual::Cookbook>.
4322
4323 =head2 for
4324
4325 =over 4
4326
4327 =item Value: ( 'update' | 'shared' )
4328
4329 =back
4330
4331 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
4332 ... FOR SHARED.
4333
4334 =cut
4335
4336 1;