Move scary stuff to its own class
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use base qw/DBIx::Class/;
6 use DBIx::Class::Carp;
7 use DBIx::Class::Exception;
8 use DBIx::Class::ResultSetColumn;
9 use Scalar::Util qw/blessed weaken/;
10 use Try::Tiny;
11 use Data::Compare (); # no imports!!! guard against insane architecture
12
13 # not importing first() as it will clash with our own method
14 use List::Util ();
15
16 BEGIN {
17   # De-duplication in _merge_attr() is disabled, but left in for reference
18   # (the merger is used for other things that ought not to be de-duped)
19   *__HM_DEDUP = sub () { 0 };
20 }
21
22 use namespace::clean;
23
24 use overload
25         '0+'     => "count",
26         'bool'   => "_bool",
27         fallback => 1;
28
29 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class result_source/);
30
31 =head1 NAME
32
33 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
34
35 =head1 SYNOPSIS
36
37   my $users_rs   = $schema->resultset('User');
38   while( $user = $users_rs->next) {
39     print $user->username;
40   }
41
42   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
43   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
44
45 =head1 DESCRIPTION
46
47 A ResultSet is an object which stores a set of conditions representing
48 a query. It is the backbone of DBIx::Class (i.e. the really
49 important/useful bit).
50
51 No SQL is executed on the database when a ResultSet is created, it
52 just stores all the conditions needed to create the query.
53
54 A basic ResultSet representing the data of an entire table is returned
55 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
56 L<Source|DBIx::Class::Manual::Glossary/Source> name.
57
58   my $users_rs = $schema->resultset('User');
59
60 A new ResultSet is returned from calling L</search> on an existing
61 ResultSet. The new one will contain all the conditions of the
62 original, plus any new conditions added in the C<search> call.
63
64 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
65 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
66 represents.
67
68 The query that the ResultSet represents is B<only> executed against
69 the database when these methods are called:
70 L</find>, L</next>, L</all>, L</first>, L</single>, L</count>.
71
72 If a resultset is used in a numeric context it returns the L</count>.
73 However, if it is used in a boolean context it is B<always> true.  So if
74 you want to check if a resultset has any results, you must use C<if $rs
75 != 0>.
76
77 =head1 CUSTOM ResultSet CLASSES THAT USE Moose
78
79 If you want to make your custom ResultSet classes with L<Moose>, use a template
80 similar to:
81
82     package MyApp::Schema::ResultSet::User;
83
84     use Moose;
85     use namespace::autoclean;
86     use MooseX::NonMoose;
87     extends 'DBIx::Class::ResultSet';
88
89     sub BUILDARGS { $_[2] }
90
91     ...your code...
92
93     __PACKAGE__->meta->make_immutable;
94
95     1;
96
97 The L<MooseX::NonMoose> is necessary so that the L<Moose> constructor does not
98 clash with the regular ResultSet constructor. Alternatively, you can use:
99
100     __PACKAGE__->meta->make_immutable(inline_constructor => 0);
101
102 The L<BUILDARGS|Moose::Manual::Construction/BUILDARGS> is necessary because the
103 signature of the ResultSet C<new> is C<< ->new($source, \%args) >>.
104
105 =head1 EXAMPLES
106
107 =head2 Chaining resultsets
108
109 Let's say you've got a query that needs to be run to return some data
110 to the user. But, you have an authorization system in place that
111 prevents certain users from seeing certain information. So, you want
112 to construct the basic query in one method, but add constraints to it in
113 another.
114
115   sub get_data {
116     my $self = shift;
117     my $request = $self->get_request; # Get a request object somehow.
118     my $schema = $self->result_source->schema;
119
120     my $cd_rs = $schema->resultset('CD')->search({
121       title => $request->param('title'),
122       year => $request->param('year'),
123     });
124
125     $cd_rs = $self->apply_security_policy( $cd_rs );
126
127     return $cd_rs->all();
128   }
129
130   sub apply_security_policy {
131     my $self = shift;
132     my ($rs) = @_;
133
134     return $rs->search({
135       subversive => 0,
136     });
137   }
138
139 =head3 Resolving conditions and attributes
140
141 When a resultset is chained from another resultset, conditions and
142 attributes with the same keys need resolving.
143
144 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
145 into the existing ones from the original resultset.
146
147 The L</where> and L</having> attributes, and any search conditions, are
148 merged with an SQL C<AND> to the existing condition from the original
149 resultset.
150
151 All other attributes are overridden by any new ones supplied in the
152 search attributes.
153
154 =head2 Multiple queries
155
156 Since a resultset just defines a query, you can do all sorts of
157 things with it with the same object.
158
159   # Don't hit the DB yet.
160   my $cd_rs = $schema->resultset('CD')->search({
161     title => 'something',
162     year => 2009,
163   });
164
165   # Each of these hits the DB individually.
166   my $count = $cd_rs->count;
167   my $most_recent = $cd_rs->get_column('date_released')->max();
168   my @records = $cd_rs->all;
169
170 And it's not just limited to SELECT statements.
171
172   $cd_rs->delete();
173
174 This is even cooler:
175
176   $cd_rs->create({ artist => 'Fred' });
177
178 Which is the same as:
179
180   $schema->resultset('CD')->create({
181     title => 'something',
182     year => 2009,
183     artist => 'Fred'
184   });
185
186 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
187
188 =head1 METHODS
189
190 =head2 new
191
192 =over 4
193
194 =item Arguments: $source, \%$attrs
195
196 =item Return Value: $rs
197
198 =back
199
200 The resultset constructor. Takes a source object (usually a
201 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
202 L</ATTRIBUTES> below).  Does not perform any queries -- these are
203 executed as needed by the other methods.
204
205 Generally you won't need to construct a resultset manually.  You'll
206 automatically get one from e.g. a L</search> called in scalar context:
207
208   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
209
210 IMPORTANT: If called on an object, proxies to new_result instead so
211
212   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
213
214 will return a CD object, not a ResultSet.
215
216 =cut
217
218 sub new {
219   my $class = shift;
220   return $class->new_result(@_) if ref $class;
221
222   my ($source, $attrs) = @_;
223   $source = $source->resolve
224     if $source->isa('DBIx::Class::ResultSourceHandle');
225   $attrs = { %{$attrs||{}} };
226
227   if ($attrs->{page}) {
228     $attrs->{rows} ||= 10;
229   }
230
231   $attrs->{alias} ||= 'me';
232
233   my $self = bless {
234     result_source => $source,
235     cond => $attrs->{where},
236     pager => undef,
237     attrs => $attrs,
238   }, $class;
239
240   # if there is a dark selector, this means we are already in a
241   # chain and the cleanup/sanification was taken care of by
242   # _search_rs already
243   $self->_normalize_selection($attrs)
244     unless $attrs->{_dark_selector};
245
246   $self->result_class(
247     $attrs->{result_class} || $source->result_class
248   );
249
250   $self;
251 }
252
253 =head2 search
254
255 =over 4
256
257 =item Arguments: $cond, \%attrs?
258
259 =item Return Value: $resultset (scalar context) ||  @row_objs (list context)
260
261 =back
262
263   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
264   my $new_rs = $cd_rs->search({ year => 2005 });
265
266   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
267                  # year = 2005 OR year = 2004
268
269 In list context, C<< ->all() >> is called implicitly on the resultset, thus
270 returning a list of row objects instead. To avoid that, use L</search_rs>.
271
272 If you need to pass in additional attributes but no additional condition,
273 call it as C<search(undef, \%attrs)>.
274
275   # "SELECT name, artistid FROM $artist_table"
276   my @all_artists = $schema->resultset('Artist')->search(undef, {
277     columns => [qw/name artistid/],
278   });
279
280 For a list of attributes that can be passed to C<search>, see
281 L</ATTRIBUTES>. For more examples of using this function, see
282 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
283 documentation for the first argument, see L<SQL::Abstract>
284 and its extension L<DBIx::Class::SQLMaker>.
285
286 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
287
288 =head3 CAVEAT
289
290 Note that L</search> does not process/deflate any of the values passed in the
291 L<SQL::Abstract>-compatible search condition structure. This is unlike other
292 condition-bound methods L</new>, L</create> and L</find>. The user must ensure
293 manually that any value passed to this method will stringify to something the
294 RDBMS knows how to deal with. A notable example is the handling of L<DateTime>
295 objects, for more info see:
296 L<DBIx::Class::Manual::Cookbook/Formatting DateTime objects in queries>.
297
298 =cut
299
300 sub search {
301   my $self = shift;
302   my $rs = $self->search_rs( @_ );
303
304   if (wantarray) {
305     return $rs->all;
306   }
307   elsif (defined wantarray) {
308     return $rs;
309   }
310   else {
311     # we can be called by a relationship helper, which in
312     # turn may be called in void context due to some braindead
313     # overload or whatever else the user decided to be clever
314     # at this particular day. Thus limit the exception to
315     # external code calls only
316     $self->throw_exception ('->search is *not* a mutator, calling it in void context makes no sense')
317       if (caller)[0] !~ /^\QDBIx::Class::/;
318
319     return ();
320   }
321 }
322
323 =head2 search_rs
324
325 =over 4
326
327 =item Arguments: $cond, \%attrs?
328
329 =item Return Value: $resultset
330
331 =back
332
333 This method does the same exact thing as search() except it will
334 always return a resultset, even in list context.
335
336 =cut
337
338 sub search_rs {
339   my $self = shift;
340
341   # Special-case handling for (undef, undef).
342   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
343     @_ = ();
344   }
345
346   my $call_attrs = {};
347   if (@_ > 1) {
348     if (ref $_[-1] eq 'HASH') {
349       # copy for _normalize_selection
350       $call_attrs = { %{ pop @_ } };
351     }
352     elsif (! defined $_[-1] ) {
353       pop @_;   # search({}, undef)
354     }
355   }
356
357   # see if we can keep the cache (no $rs changes)
358   my $cache;
359   my %safe = (alias => 1, cache => 1);
360   if ( ! List::Util::first { !$safe{$_} } keys %$call_attrs and (
361     ! defined $_[0]
362       or
363     ref $_[0] eq 'HASH' && ! keys %{$_[0]}
364       or
365     ref $_[0] eq 'ARRAY' && ! @{$_[0]}
366   )) {
367     $cache = $self->get_cache;
368   }
369
370   my $rsrc = $self->result_source;
371
372   my $old_attrs = { %{$self->{attrs}} };
373   my $old_having = delete $old_attrs->{having};
374   my $old_where = delete $old_attrs->{where};
375
376   my $new_attrs = { %$old_attrs };
377
378   # take care of call attrs (only if anything is changing)
379   if (keys %$call_attrs) {
380
381     my @selector_attrs = qw/select as columns cols +select +as +columns include_columns/;
382
383     # reset the current selector list if new selectors are supplied
384     if (List::Util::first { exists $call_attrs->{$_} } qw/columns cols select as/) {
385       delete @{$old_attrs}{(@selector_attrs, '_dark_selector')};
386     }
387
388     # Normalize the new selector list (operates on the passed-in attr structure)
389     # Need to do it on every chain instead of only once on _resolved_attrs, in
390     # order to allow detection of empty vs partial 'as'
391     $call_attrs->{_dark_selector} = $old_attrs->{_dark_selector}
392       if $old_attrs->{_dark_selector};
393     $self->_normalize_selection ($call_attrs);
394
395     # start with blind overwriting merge, exclude selector attrs
396     $new_attrs = { %{$old_attrs}, %{$call_attrs} };
397     delete @{$new_attrs}{@selector_attrs};
398
399     for (@selector_attrs) {
400       $new_attrs->{$_} = $self->_merge_attr($old_attrs->{$_}, $call_attrs->{$_})
401         if ( exists $old_attrs->{$_} or exists $call_attrs->{$_} );
402     }
403
404     # older deprecated name, use only if {columns} is not there
405     if (my $c = delete $new_attrs->{cols}) {
406       if ($new_attrs->{columns}) {
407         carp "Resultset specifies both the 'columns' and the legacy 'cols' attributes - ignoring 'cols'";
408       }
409       else {
410         $new_attrs->{columns} = $c;
411       }
412     }
413
414
415     # join/prefetch use their own crazy merging heuristics
416     foreach my $key (qw/join prefetch/) {
417       $new_attrs->{$key} = $self->_merge_joinpref_attr($old_attrs->{$key}, $call_attrs->{$key})
418         if exists $call_attrs->{$key};
419     }
420
421     # stack binds together
422     $new_attrs->{bind} = [ @{ $old_attrs->{bind} || [] }, @{ $call_attrs->{bind} || [] } ];
423   }
424
425
426   # rip apart the rest of @_, parse a condition
427   my $call_cond = do {
428
429     if (ref $_[0] eq 'HASH') {
430       (keys %{$_[0]}) ? $_[0] : undef
431     }
432     elsif (@_ == 1) {
433       $_[0]
434     }
435     elsif (@_ % 2) {
436       $self->throw_exception('Odd number of arguments to search')
437     }
438     else {
439       +{ @_ }
440     }
441
442   } if @_;
443
444   if( @_ > 1 and ! $rsrc->result_class->isa('DBIx::Class::CDBICompat') ) {
445     carp_unique 'search( %condition ) is deprecated, use search( \%condition ) instead';
446   }
447
448   for ($old_where, $call_cond) {
449     if (defined $_) {
450       $new_attrs->{where} = $self->_stack_cond (
451         $_, $new_attrs->{where}
452       );
453     }
454   }
455
456   if (defined $old_having) {
457     $new_attrs->{having} = $self->_stack_cond (
458       $old_having, $new_attrs->{having}
459     )
460   }
461
462   my $rs = (ref $self)->new($rsrc, $new_attrs);
463
464   $rs->set_cache($cache) if ($cache);
465
466   return $rs;
467 }
468
469 my $dark_sel_dumper;
470 sub _normalize_selection {
471   my ($self, $attrs) = @_;
472
473   # legacy syntax
474   $attrs->{'+columns'} = $self->_merge_attr($attrs->{'+columns'}, delete $attrs->{include_columns})
475     if exists $attrs->{include_columns};
476
477   # columns are always placed first, however
478
479   # Keep the X vs +X separation until _resolved_attrs time - this allows to
480   # delay the decision on whether to use a default select list ($rsrc->columns)
481   # allowing stuff like the remove_columns helper to work
482   #
483   # select/as +select/+as pairs need special handling - the amount of select/as
484   # elements in each pair does *not* have to be equal (think multicolumn
485   # selectors like distinct(foo, bar) ). If the selector is bare (no 'as'
486   # supplied at all) - try to infer the alias, either from the -as parameter
487   # of the selector spec, or use the parameter whole if it looks like a column
488   # name (ugly legacy heuristic). If all fails - leave the selector bare (which
489   # is ok as well), but make sure no more additions to the 'as' chain take place
490   for my $pref ('', '+') {
491
492     my ($sel, $as) = map {
493       my $key = "${pref}${_}";
494
495       my $val = [ ref $attrs->{$key} eq 'ARRAY'
496         ? @{$attrs->{$key}}
497         : $attrs->{$key} || ()
498       ];
499       delete $attrs->{$key};
500       $val;
501     } qw/select as/;
502
503     if (! @$as and ! @$sel ) {
504       next;
505     }
506     elsif (@$as and ! @$sel) {
507       $self->throw_exception(
508         "Unable to handle ${pref}as specification (@$as) without a corresponding ${pref}select"
509       );
510     }
511     elsif( ! @$as ) {
512       # no as part supplied at all - try to deduce (unless explicit end of named selection is declared)
513       # if any @$as has been supplied we assume the user knows what (s)he is doing
514       # and blindly keep stacking up pieces
515       unless ($attrs->{_dark_selector}) {
516         SELECTOR:
517         for (@$sel) {
518           if ( ref $_ eq 'HASH' and exists $_->{-as} ) {
519             push @$as, $_->{-as};
520           }
521           # assume any plain no-space, no-parenthesis string to be a column spec
522           # FIXME - this is retarded but is necessary to support shit like 'count(foo)'
523           elsif ( ! ref $_ and $_ =~ /^ [^\s\(\)]+ $/x) {
524             push @$as, $_;
525           }
526           # if all else fails - raise a flag that no more aliasing will be allowed
527           else {
528             $attrs->{_dark_selector} = {
529               plus_stage => $pref,
530               string => ($dark_sel_dumper ||= do {
531                   require Data::Dumper::Concise;
532                   Data::Dumper::Concise::DumperObject()->Indent(0);
533                 })->Values([$_])->Dump
534               ,
535             };
536             last SELECTOR;
537           }
538         }
539       }
540     }
541     elsif (@$as < @$sel) {
542       $self->throw_exception(
543         "Unable to handle an ${pref}as specification (@$as) with less elements than the corresponding ${pref}select"
544       );
545     }
546     elsif ($pref and $attrs->{_dark_selector}) {
547       $self->throw_exception(
548         "Unable to process named '+select', resultset contains an unnamed selector $attrs->{_dark_selector}{string}"
549       );
550     }
551
552
553     # merge result
554     $attrs->{"${pref}select"} = $self->_merge_attr($attrs->{"${pref}select"}, $sel);
555     $attrs->{"${pref}as"} = $self->_merge_attr($attrs->{"${pref}as"}, $as);
556   }
557 }
558
559 sub _stack_cond {
560   my ($self, $left, $right) = @_;
561
562   # collapse single element top-level conditions
563   # (single pass only, unlikely to need recursion)
564   for ($left, $right) {
565     if (ref $_ eq 'ARRAY') {
566       if (@$_ == 0) {
567         $_ = undef;
568       }
569       elsif (@$_ == 1) {
570         $_ = $_->[0];
571       }
572     }
573     elsif (ref $_ eq 'HASH') {
574       my ($first, $more) = keys %$_;
575
576       # empty hash
577       if (! defined $first) {
578         $_ = undef;
579       }
580       # one element hash
581       elsif (! defined $more) {
582         if ($first eq '-and' and ref $_->{'-and'} eq 'HASH') {
583           $_ = $_->{'-and'};
584         }
585         elsif ($first eq '-or' and ref $_->{'-or'} eq 'ARRAY') {
586           $_ = $_->{'-or'};
587         }
588       }
589     }
590   }
591
592   # merge hashes with weeding out of duplicates (simple cases only)
593   if (ref $left eq 'HASH' and ref $right eq 'HASH') {
594
595     # shallow copy to destroy
596     $right = { %$right };
597     for (grep { exists $right->{$_} } keys %$left) {
598       # the use of eq_deeply here is justified - the rhs of an
599       # expression can contain a lot of twisted weird stuff
600       delete $right->{$_} if Data::Compare::Compare( $left->{$_}, $right->{$_} );
601     }
602
603     $right = undef unless keys %$right;
604   }
605
606
607   if (defined $left xor defined $right) {
608     return defined $left ? $left : $right;
609   }
610   elsif (! defined $left) {
611     return undef;
612   }
613   else {
614     return { -and => [ $left, $right ] };
615   }
616 }
617
618 =head2 search_literal
619
620 =over 4
621
622 =item Arguments: $sql_fragment, @bind_values
623
624 =item Return Value: $resultset (scalar context) || @row_objs (list context)
625
626 =back
627
628   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
629   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
630
631 Pass a literal chunk of SQL to be added to the conditional part of the
632 resultset query.
633
634 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
635 only be used in that context. C<search_literal> is a convenience method.
636 It is equivalent to calling $schema->search(\[]), but if you want to ensure
637 columns are bound correctly, use C<search>.
638
639 Example of how to use C<search> instead of C<search_literal>
640
641   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
642   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
643
644
645 See L<DBIx::Class::Manual::Cookbook/Searching> and
646 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
647 require C<search_literal>.
648
649 =cut
650
651 sub search_literal {
652   my ($self, $sql, @bind) = @_;
653   my $attr;
654   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
655     $attr = pop @bind;
656   }
657   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
658 }
659
660 =head2 find
661
662 =over 4
663
664 =item Arguments: \%columns_values | @pk_values, \%attrs?
665
666 =item Return Value: $row_object | undef
667
668 =back
669
670 Finds and returns a single row based on supplied criteria. Takes either a
671 hashref with the same format as L</create> (including inference of foreign
672 keys from related objects), or a list of primary key values in the same
673 order as the L<primary columns|DBIx::Class::ResultSource/primary_columns>
674 declaration on the L</result_source>.
675
676 In either case an attempt is made to combine conditions already existing on
677 the resultset with the condition passed to this method.
678
679 To aid with preparing the correct query for the storage you may supply the
680 C<key> attribute, which is the name of a
681 L<unique constraint|DBIx::Class::ResultSource/add_unique_constraint> (the
682 unique constraint corresponding to the
683 L<primary columns|DBIx::Class::ResultSource/primary_columns> is always named
684 C<primary>). If the C<key> attribute has been supplied, and DBIC is unable
685 to construct a query that satisfies the named unique constraint fully (
686 non-NULL values for each column member of the constraint) an exception is
687 thrown.
688
689 If no C<key> is specified, the search is carried over all unique constraints
690 which are fully defined by the available condition.
691
692 If no such constraint is found, C<find> currently defaults to a simple
693 C<< search->(\%column_values) >> which may or may not do what you expect.
694 Note that this fallback behavior may be deprecated in further versions. If
695 you need to search with arbitrary conditions - use L</search>. If the query
696 resulting from this fallback produces more than one row, a warning to the
697 effect is issued, though only the first row is constructed and returned as
698 C<$row_object>.
699
700 In addition to C<key>, L</find> recognizes and applies standard
701 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
702
703 Note that if you have extra concerns about the correctness of the resulting
704 query you need to specify the C<key> attribute and supply the entire condition
705 as an argument to find (since it is not always possible to perform the
706 combination of the resultset condition with the supplied one, especially if
707 the resultset condition contains literal sql).
708
709 For example, to find a row by its primary key:
710
711   my $cd = $schema->resultset('CD')->find(5);
712
713 You can also find a row by a specific unique constraint:
714
715   my $cd = $schema->resultset('CD')->find(
716     {
717       artist => 'Massive Attack',
718       title  => 'Mezzanine',
719     },
720     { key => 'cd_artist_title' }
721   );
722
723 See also L</find_or_create> and L</update_or_create>.
724
725 =cut
726
727 sub find {
728   my $self = shift;
729   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
730
731   my $rsrc = $self->result_source;
732
733   my $constraint_name;
734   if (exists $attrs->{key}) {
735     $constraint_name = defined $attrs->{key}
736       ? $attrs->{key}
737       : $self->throw_exception("An undefined 'key' resultset attribute makes no sense")
738     ;
739   }
740
741   # Parse out the condition from input
742   my $call_cond;
743
744   if (ref $_[0] eq 'HASH') {
745     $call_cond = { %{$_[0]} };
746   }
747   else {
748     # if only values are supplied we need to default to 'primary'
749     $constraint_name = 'primary' unless defined $constraint_name;
750
751     my @c_cols = $rsrc->unique_constraint_columns($constraint_name);
752
753     $self->throw_exception(
754       "No constraint columns, maybe a malformed '$constraint_name' constraint?"
755     ) unless @c_cols;
756
757     $self->throw_exception (
758       'find() expects either a column/value hashref, or a list of values '
759     . "corresponding to the columns of the specified unique constraint '$constraint_name'"
760     ) unless @c_cols == @_;
761
762     $call_cond = {};
763     @{$call_cond}{@c_cols} = @_;
764   }
765
766   my %related;
767   for my $key (keys %$call_cond) {
768     if (
769       my $keyref = ref($call_cond->{$key})
770         and
771       my $relinfo = $rsrc->relationship_info($key)
772     ) {
773       my $val = delete $call_cond->{$key};
774
775       next if $keyref eq 'ARRAY'; # has_many for multi_create
776
777       my $rel_q = $rsrc->_resolve_condition(
778         $relinfo->{cond}, $val, $key, $key
779       );
780       die "Can't handle complex relationship conditions in find" if ref($rel_q) ne 'HASH';
781       @related{keys %$rel_q} = values %$rel_q;
782     }
783   }
784
785   # relationship conditions take precedence (?)
786   @{$call_cond}{keys %related} = values %related;
787
788   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
789   my $final_cond;
790   if (defined $constraint_name) {
791     $final_cond = $self->_qualify_cond_columns (
792
793       $self->_build_unique_cond (
794         $constraint_name,
795         $call_cond,
796       ),
797
798       $alias,
799     );
800   }
801   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
802     # This means that we got here after a merger of relationship conditions
803     # in ::Relationship::Base::search_related (the row method), and furthermore
804     # the relationship is of the 'single' type. This means that the condition
805     # provided by the relationship (already attached to $self) is sufficient,
806     # as there can be only one row in the database that would satisfy the
807     # relationship
808   }
809   else {
810     # no key was specified - fall down to heuristics mode:
811     # run through all unique queries registered on the resultset, and
812     # 'OR' all qualifying queries together
813     my (@unique_queries, %seen_column_combinations);
814     for my $c_name ($rsrc->unique_constraint_names) {
815       next if $seen_column_combinations{
816         join "\x00", sort $rsrc->unique_constraint_columns($c_name)
817       }++;
818
819       push @unique_queries, try {
820         $self->_build_unique_cond ($c_name, $call_cond, 'croak_on_nulls')
821       } || ();
822     }
823
824     $final_cond = @unique_queries
825       ? [ map { $self->_qualify_cond_columns($_, $alias) } @unique_queries ]
826       : $self->_non_unique_find_fallback ($call_cond, $attrs)
827     ;
828   }
829
830   # Run the query, passing the result_class since it should propagate for find
831   my $rs = $self->search ($final_cond, {result_class => $self->result_class, %$attrs});
832   if ($rs->_resolved_attrs->{collapse}) {
833     my $row = $rs->next;
834     carp "Query returned more than one row" if $rs->next;
835     return $row;
836   }
837   else {
838     return $rs->single;
839   }
840 }
841
842 # This is a stop-gap method as agreed during the discussion on find() cleanup:
843 # http://lists.scsys.co.uk/pipermail/dbix-class/2010-October/009535.html
844 #
845 # It is invoked when find() is called in legacy-mode with insufficiently-unique
846 # condition. It is provided for overrides until a saner way forward is devised
847 #
848 # *NOTE* This is not a public method, and it's *GUARANTEED* to disappear down
849 # the road. Please adjust your tests accordingly to catch this situation early
850 # DBIx::Class::ResultSet->can('_non_unique_find_fallback') is reasonable
851 #
852 # The method will not be removed without an adequately complete replacement
853 # for strict-mode enforcement
854 sub _non_unique_find_fallback {
855   my ($self, $cond, $attrs) = @_;
856
857   return $self->_qualify_cond_columns(
858     $cond,
859     exists $attrs->{alias}
860       ? $attrs->{alias}
861       : $self->{attrs}{alias}
862   );
863 }
864
865
866 sub _qualify_cond_columns {
867   my ($self, $cond, $alias) = @_;
868
869   my %aliased = %$cond;
870   for (keys %aliased) {
871     $aliased{"$alias.$_"} = delete $aliased{$_}
872       if $_ !~ /\./;
873   }
874
875   return \%aliased;
876 }
877
878 sub _build_unique_cond {
879   my ($self, $constraint_name, $extra_cond, $croak_on_null) = @_;
880
881   my @c_cols = $self->result_source->unique_constraint_columns($constraint_name);
882
883   # combination may fail if $self->{cond} is non-trivial
884   my ($final_cond) = try {
885     $self->_merge_with_rscond ($extra_cond)
886   } catch {
887     +{ %$extra_cond }
888   };
889
890   # trim out everything not in $columns
891   $final_cond = { map {
892     exists $final_cond->{$_}
893       ? ( $_ => $final_cond->{$_} )
894       : ()
895   } @c_cols };
896
897   if (my @missing = grep
898     { ! ($croak_on_null ? defined $final_cond->{$_} : exists $final_cond->{$_}) }
899     (@c_cols)
900   ) {
901     $self->throw_exception( sprintf ( "Unable to satisfy requested constraint '%s', no values for column(s): %s",
902       $constraint_name,
903       join (', ', map { "'$_'" } @missing),
904     ) );
905   }
906
907   if (
908     !$croak_on_null
909       and
910     !$ENV{DBIC_NULLABLE_KEY_NOWARN}
911       and
912     my @undefs = grep { ! defined $final_cond->{$_} } (keys %$final_cond)
913   ) {
914     carp_unique ( sprintf (
915       "NULL/undef values supplied for requested unique constraint '%s' (NULL "
916     . 'values in column(s): %s). This is almost certainly not what you wanted, '
917     . 'though you can set DBIC_NULLABLE_KEY_NOWARN to disable this warning.',
918       $constraint_name,
919       join (', ', map { "'$_'" } @undefs),
920     ));
921   }
922
923   return $final_cond;
924 }
925
926 =head2 search_related
927
928 =over 4
929
930 =item Arguments: $rel, $cond?, \%attrs?
931
932 =item Return Value: $new_resultset (scalar context) || @row_objs (list context)
933
934 =back
935
936   $new_rs = $cd_rs->search_related('artist', {
937     name => 'Emo-R-Us',
938   });
939
940 Searches the specified relationship, optionally specifying a condition and
941 attributes for matching records. See L</ATTRIBUTES> for more information.
942
943 In list context, C<< ->all() >> is called implicitly on the resultset, thus
944 returning a list of row objects instead. To avoid that, use L</search_related_rs>.
945
946 See also L</search_related_rs>.
947
948 =cut
949
950 sub search_related {
951   return shift->related_resultset(shift)->search(@_);
952 }
953
954 =head2 search_related_rs
955
956 This method works exactly the same as search_related, except that
957 it guarantees a resultset, even in list context.
958
959 =cut
960
961 sub search_related_rs {
962   return shift->related_resultset(shift)->search_rs(@_);
963 }
964
965 =head2 cursor
966
967 =over 4
968
969 =item Arguments: none
970
971 =item Return Value: $cursor
972
973 =back
974
975 Returns a storage-driven cursor to the given resultset. See
976 L<DBIx::Class::Cursor> for more information.
977
978 =cut
979
980 sub cursor {
981   my ($self) = @_;
982
983   my $attrs = $self->_resolved_attrs_copy;
984
985   return $self->{cursor}
986     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
987           $attrs->{where},$attrs);
988 }
989
990 =head2 single
991
992 =over 4
993
994 =item Arguments: $cond?
995
996 =item Return Value: $row_object | undef
997
998 =back
999
1000   my $cd = $schema->resultset('CD')->single({ year => 2001 });
1001
1002 Inflates the first result without creating a cursor if the resultset has
1003 any records in it; if not returns C<undef>. Used by L</find> as a lean version
1004 of L</search>.
1005
1006 While this method can take an optional search condition (just like L</search>)
1007 being a fast-code-path it does not recognize search attributes. If you need to
1008 add extra joins or similar, call L</search> and then chain-call L</single> on the
1009 L<DBIx::Class::ResultSet> returned.
1010
1011 =over
1012
1013 =item B<Note>
1014
1015 As of 0.08100, this method enforces the assumption that the preceding
1016 query returns only one row. If more than one row is returned, you will receive
1017 a warning:
1018
1019   Query returned more than one row
1020
1021 In this case, you should be using L</next> or L</find> instead, or if you really
1022 know what you are doing, use the L</rows> attribute to explicitly limit the size
1023 of the resultset.
1024
1025 This method will also throw an exception if it is called on a resultset prefetching
1026 has_many, as such a prefetch implies fetching multiple rows from the database in
1027 order to assemble the resulting object.
1028
1029 =back
1030
1031 =cut
1032
1033 sub single {
1034   my ($self, $where) = @_;
1035   if(@_ > 2) {
1036       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
1037   }
1038
1039   my $attrs = $self->_resolved_attrs_copy;
1040
1041   $self->throw_exception(
1042     'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
1043   ) if $attrs->{collapse};
1044
1045   if ($where) {
1046     if (defined $attrs->{where}) {
1047       $attrs->{where} = {
1048         '-and' =>
1049             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
1050                $where, delete $attrs->{where} ]
1051       };
1052     } else {
1053       $attrs->{where} = $where;
1054     }
1055   }
1056
1057   my $data = [ $self->result_source->storage->select_single(
1058     $attrs->{from}, $attrs->{select},
1059     $attrs->{where}, $attrs
1060   )];
1061
1062   return @$data ? $self->_construct_objects($data)->[0] : undef;
1063 }
1064
1065
1066 # _collapse_query
1067 #
1068 # Recursively collapse the query, accumulating values for each column.
1069
1070 sub _collapse_query {
1071   my ($self, $query, $collapsed) = @_;
1072
1073   $collapsed ||= {};
1074
1075   if (ref $query eq 'ARRAY') {
1076     foreach my $subquery (@$query) {
1077       next unless ref $subquery;  # -or
1078       $collapsed = $self->_collapse_query($subquery, $collapsed);
1079     }
1080   }
1081   elsif (ref $query eq 'HASH') {
1082     if (keys %$query and (keys %$query)[0] eq '-and') {
1083       foreach my $subquery (@{$query->{-and}}) {
1084         $collapsed = $self->_collapse_query($subquery, $collapsed);
1085       }
1086     }
1087     else {
1088       foreach my $col (keys %$query) {
1089         my $value = $query->{$col};
1090         $collapsed->{$col}{$value}++;
1091       }
1092     }
1093   }
1094
1095   return $collapsed;
1096 }
1097
1098 =head2 get_column
1099
1100 =over 4
1101
1102 =item Arguments: $cond?
1103
1104 =item Return Value: $resultsetcolumn
1105
1106 =back
1107
1108   my $max_length = $rs->get_column('length')->max;
1109
1110 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
1111
1112 =cut
1113
1114 sub get_column {
1115   my ($self, $column) = @_;
1116   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
1117   return $new;
1118 }
1119
1120 =head2 search_like
1121
1122 =over 4
1123
1124 =item Arguments: $cond, \%attrs?
1125
1126 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1127
1128 =back
1129
1130   # WHERE title LIKE '%blue%'
1131   $cd_rs = $rs->search_like({ title => '%blue%'});
1132
1133 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
1134 that this is simply a convenience method retained for ex Class::DBI users.
1135 You most likely want to use L</search> with specific operators.
1136
1137 For more information, see L<DBIx::Class::Manual::Cookbook>.
1138
1139 This method is deprecated and will be removed in 0.09. Use L</search()>
1140 instead. An example conversion is:
1141
1142   ->search_like({ foo => 'bar' });
1143
1144   # Becomes
1145
1146   ->search({ foo => { like => 'bar' } });
1147
1148 =cut
1149
1150 sub search_like {
1151   my $class = shift;
1152   carp_unique (
1153     'search_like() is deprecated and will be removed in DBIC version 0.09.'
1154    .' Instead use ->search({ x => { -like => "y%" } })'
1155    .' (note the outer pair of {}s - they are important!)'
1156   );
1157   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1158   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
1159   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
1160   return $class->search($query, { %$attrs });
1161 }
1162
1163 =head2 slice
1164
1165 =over 4
1166
1167 =item Arguments: $first, $last
1168
1169 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1170
1171 =back
1172
1173 Returns a resultset or object list representing a subset of elements from the
1174 resultset slice is called on. Indexes are from 0, i.e., to get the first
1175 three records, call:
1176
1177   my ($one, $two, $three) = $rs->slice(0, 2);
1178
1179 =cut
1180
1181 sub slice {
1182   my ($self, $min, $max) = @_;
1183   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
1184   $attrs->{offset} = $self->{attrs}{offset} || 0;
1185   $attrs->{offset} += $min;
1186   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
1187   return $self->search(undef, $attrs);
1188   #my $slice = (ref $self)->new($self->result_source, $attrs);
1189   #return (wantarray ? $slice->all : $slice);
1190 }
1191
1192 =head2 next
1193
1194 =over 4
1195
1196 =item Arguments: none
1197
1198 =item Return Value: $result | undef
1199
1200 =back
1201
1202 Returns the next element in the resultset (C<undef> is there is none).
1203
1204 Can be used to efficiently iterate over records in the resultset:
1205
1206   my $rs = $schema->resultset('CD')->search;
1207   while (my $cd = $rs->next) {
1208     print $cd->title;
1209   }
1210
1211 Note that you need to store the resultset object, and call C<next> on it.
1212 Calling C<< resultset('Table')->next >> repeatedly will always return the
1213 first record from the resultset.
1214
1215 =cut
1216
1217 sub next {
1218   my ($self) = @_;
1219
1220   if (my $cache = $self->get_cache) {
1221     $self->{all_cache_position} ||= 0;
1222     return $cache->[$self->{all_cache_position}++];
1223   }
1224
1225   if ($self->{attrs}{cache}) {
1226     delete $self->{pager};
1227     $self->{all_cache_position} = 1;
1228     return ($self->all)[0];
1229   }
1230
1231   return shift(@{$self->{stashed_objects}}) if @{ $self->{stashed_objects}||[] };
1232
1233   $self->{stashed_objects} = $self->_construct_objects
1234     or return undef;
1235
1236   return shift @{$self->{stashed_objects}};
1237 }
1238
1239 # takes a single DBI-row of data and coinstructs as many objects
1240 # as the resultset attributes call for.
1241 # This can be a bit of an action at a distance - it takes as an argument
1242 # the *current* cursor-row (already taken off the $sth), but if
1243 # collapsing is requested it will keep advancing the cursor either
1244 # until the current row-object is assembled (the collapser was able to
1245 # order the result sensibly) OR until the cursor is exhausted (an
1246 # unordered collapsing resultset effectively triggers ->all)
1247 sub _construct_objects {
1248   my ($self, $fetched_row, $fetch_all) = @_;
1249
1250   my $attrs = $self->_resolved_attrs;
1251   my $unordered = 0;  # will deal with this later
1252
1253   # this will be used as both initial raw-row collector AND as a RV of
1254   # _construct_objects. Not regrowing the   # array twice matters a lot...
1255   # a suprising amount actually
1256   my $rows;
1257
1258   # $fetch_all implies all() which means all stashes have been cleared
1259   # and the cursor reset
1260   if ($fetch_all) {
1261     # FIXME - we can do better, cursor->all (well a diff. method) should return a ref
1262     $rows = [ $self->cursor->all ];
1263   }
1264   elsif ($unordered) {
1265     $rows = [
1266       $fetched_row||(),
1267       @{ delete $self->{stashed_rows} || []},
1268       $self->cursor->all,
1269     ];
1270   }
1271   else {  # simple single object
1272     $rows = [ $fetched_row || ( @{$self->{stashed_rows}||[]} ? shift @{$self->{stashed_rows}} : [$self->cursor->next] ) ];
1273   }
1274
1275   return undef unless @{$rows->[0]||[]};
1276
1277   my $rsrc = $self->result_source;
1278   my $res_class = $self->result_class;
1279   my $inflator = $res_class->can ('inflate_result')
1280     or $self->throw_exception("Inflator $res_class does not provide an inflate_result() method");
1281
1282   # construct a much simpler array->hash folder for the one-table cases right here
1283   if ($attrs->{_single_object_inflation} and ! $attrs->{collapse}) {
1284     # FIXME SUBOPTIMAL this is a very very very hot spot
1285     # while rather optimal we can *still* do much better, by
1286     # building a smarter [Row|HRI]::inflate_result(), and
1287     # switch to feeding it data via a much leaner interface
1288     #
1289     my $infmap = $attrs->{as};
1290     my @as_idx = 0..$#$infmap;
1291     for my $r (@$rows) {
1292       $r = [{ map { $infmap->[$_] => $r->[$_] } @as_idx }]
1293     }
1294
1295     # FIXME - this seems to be faster than the hashmapper above, especially
1296     # on more rows, but need a better bench-environment to confirm
1297     #eval sprintf (
1298     #  '$_ = [{ %s }] for @$rows',
1299     #  join (', ', map { "\$infmap->[$_] => \$_->[$_]" } 0..$#$infmap )
1300     #);
1301   }
1302   else {
1303     push @$rows, @{$self->{stashed_rows}||[]};
1304
1305     my $perl = $rsrc->_mk_row_parser({
1306       inflate_map => $attrs->{as},
1307       selection => $attrs->{select},
1308       collapse => $attrs->{collapse},
1309       unordered => $unordered,
1310     });
1311
1312     (eval "sub { no warnings; no strict; $perl }")->( # disable of strictures seems to have some effect, weird
1313       $rows,  # modify in-place, shrinking/extending as necessary
1314       ($attrs->{collapse} and ! $fetch_all and ! $unordered)
1315         ? (
1316             sub { my @r = $self->cursor->next or return undef; \@r },
1317             ($self->{stashed_rows} = []), # this is where we empty things and prepare for leftovers
1318           )
1319         : ()
1320       ,
1321     );
1322   }
1323
1324   $_ = $res_class->$inflator($rsrc, @$_) for @$rows;
1325
1326   # CDBI compat stuff
1327   if ($attrs->{record_filter}) {
1328     $_ = $attrs->{record_filter}->($_) for @$rows;
1329   }
1330
1331   return $rows;
1332 }
1333
1334 =head2 result_source
1335
1336 =over 4
1337
1338 =item Arguments: $result_source?
1339
1340 =item Return Value: $result_source
1341
1342 =back
1343
1344 An accessor for the primary ResultSource object from which this ResultSet
1345 is derived.
1346
1347 =head2 result_class
1348
1349 =over 4
1350
1351 =item Arguments: $result_class?
1352
1353 =item Return Value: $result_class
1354
1355 =back
1356
1357 An accessor for the class to use when creating row objects. Defaults to
1358 C<< result_source->result_class >> - which in most cases is the name of the
1359 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1360
1361 Note that changing the result_class will also remove any components
1362 that were originally loaded in the source class via
1363 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1364 in the original source class will not run.
1365
1366 =cut
1367
1368 sub result_class {
1369   my ($self, $result_class) = @_;
1370   if ($result_class) {
1371     unless (ref $result_class) { # don't fire this for an object
1372       $self->ensure_class_loaded($result_class);
1373     }
1374     $self->_result_class($result_class);
1375     # THIS LINE WOULD BE A BUG - this accessor specifically exists to
1376     # permit the user to set result class on one result set only; it only
1377     # chains if provided to search()
1378     #$self->{attrs}{result_class} = $result_class if ref $self;
1379   }
1380   $self->_result_class;
1381 }
1382
1383 =head2 count
1384
1385 =over 4
1386
1387 =item Arguments: $cond, \%attrs??
1388
1389 =item Return Value: $count
1390
1391 =back
1392
1393 Performs an SQL C<COUNT> with the same query as the resultset was built
1394 with to find the number of elements. Passing arguments is equivalent to
1395 C<< $rs->search ($cond, \%attrs)->count >>
1396
1397 =cut
1398
1399 sub count {
1400   my $self = shift;
1401   return $self->search(@_)->count if @_ and defined $_[0];
1402   return scalar @{ $self->get_cache } if $self->get_cache;
1403
1404   my $attrs = $self->_resolved_attrs_copy;
1405
1406   # this is a little optimization - it is faster to do the limit
1407   # adjustments in software, instead of a subquery
1408   my $rows = delete $attrs->{rows};
1409   my $offset = delete $attrs->{offset};
1410
1411   my $crs;
1412   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1413     $crs = $self->_count_subq_rs ($attrs);
1414   }
1415   else {
1416     $crs = $self->_count_rs ($attrs);
1417   }
1418   my $count = $crs->next;
1419
1420   $count -= $offset if $offset;
1421   $count = $rows if $rows and $rows < $count;
1422   $count = 0 if ($count < 0);
1423
1424   return $count;
1425 }
1426
1427 =head2 count_rs
1428
1429 =over 4
1430
1431 =item Arguments: $cond, \%attrs??
1432
1433 =item Return Value: $count_rs
1434
1435 =back
1436
1437 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1438 This can be very handy for subqueries:
1439
1440   ->search( { amount => $some_rs->count_rs->as_query } )
1441
1442 As with regular resultsets the SQL query will be executed only after
1443 the resultset is accessed via L</next> or L</all>. That would return
1444 the same single value obtainable via L</count>.
1445
1446 =cut
1447
1448 sub count_rs {
1449   my $self = shift;
1450   return $self->search(@_)->count_rs if @_;
1451
1452   # this may look like a lack of abstraction (count() does about the same)
1453   # but in fact an _rs *must* use a subquery for the limits, as the
1454   # software based limiting can not be ported if this $rs is to be used
1455   # in a subquery itself (i.e. ->as_query)
1456   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1457     return $self->_count_subq_rs;
1458   }
1459   else {
1460     return $self->_count_rs;
1461   }
1462 }
1463
1464 #
1465 # returns a ResultSetColumn object tied to the count query
1466 #
1467 sub _count_rs {
1468   my ($self, $attrs) = @_;
1469
1470   my $rsrc = $self->result_source;
1471   $attrs ||= $self->_resolved_attrs;
1472
1473   my $tmp_attrs = { %$attrs };
1474   # take off any limits, record_filter is cdbi, and no point of ordering nor locking a count
1475   delete @{$tmp_attrs}{qw/rows offset order_by record_filter for/};
1476
1477   # overwrite the selector (supplied by the storage)
1478   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $attrs);
1479   $tmp_attrs->{as} = 'count';
1480   delete @{$tmp_attrs}{qw/columns/};
1481
1482   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1483
1484   return $tmp_rs;
1485 }
1486
1487 #
1488 # same as above but uses a subquery
1489 #
1490 sub _count_subq_rs {
1491   my ($self, $attrs) = @_;
1492
1493   my $rsrc = $self->result_source;
1494   $attrs ||= $self->_resolved_attrs;
1495
1496   my $sub_attrs = { %$attrs };
1497   # extra selectors do not go in the subquery and there is no point of ordering it, nor locking it
1498   delete @{$sub_attrs}{qw/collapse columns as select _prefetch_selector_range order_by for/};
1499
1500   # if we multi-prefetch we group_by something unique, as this is what we would
1501   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
1502   if ( $attrs->{collapse}  ) {
1503     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } @{
1504       $rsrc->_identifying_column_set || $self->throw_exception(
1505         'Unable to construct a unique group_by criteria properly collapsing the '
1506       . 'has_many prefetch before count()'
1507       );
1508     } ]
1509   }
1510
1511   # Calculate subquery selector
1512   if (my $g = $sub_attrs->{group_by}) {
1513
1514     my $sql_maker = $rsrc->storage->sql_maker;
1515
1516     # necessary as the group_by may refer to aliased functions
1517     my $sel_index;
1518     for my $sel (@{$attrs->{select}}) {
1519       $sel_index->{$sel->{-as}} = $sel
1520         if (ref $sel eq 'HASH' and $sel->{-as});
1521     }
1522
1523     # anything from the original select mentioned on the group-by needs to make it to the inner selector
1524     # also look for named aggregates referred in the having clause
1525     # having often contains scalarrefs - thus parse it out entirely
1526     my @parts = @$g;
1527     if ($attrs->{having}) {
1528       local $sql_maker->{having_bind};
1529       local $sql_maker->{quote_char} = $sql_maker->{quote_char};
1530       local $sql_maker->{name_sep} = $sql_maker->{name_sep};
1531       unless (defined $sql_maker->{quote_char} and length $sql_maker->{quote_char}) {
1532         $sql_maker->{quote_char} = [ "\x00", "\xFF" ];
1533         # if we don't unset it we screw up retarded but unfortunately working
1534         # 'MAX(foo.bar)' => { '>', 3 }
1535         $sql_maker->{name_sep} = '';
1536       }
1537
1538       my ($lquote, $rquote, $sep) = map { quotemeta $_ } ($sql_maker->_quote_chars, $sql_maker->name_sep);
1539
1540       my $sql = $sql_maker->_parse_rs_attrs ({ having => $attrs->{having} });
1541
1542       # search for both a proper quoted qualified string, for a naive unquoted scalarref
1543       # and if all fails for an utterly naive quoted scalar-with-function
1544       while ($sql =~ /
1545         $rquote $sep $lquote (.+?) $rquote
1546           |
1547         [\s,] \w+ \. (\w+) [\s,]
1548           |
1549         [\s,] $lquote (.+?) $rquote [\s,]
1550       /gx) {
1551         push @parts, ($1 || $2 || $3);  # one of them matched if we got here
1552       }
1553     }
1554
1555     for (@parts) {
1556       my $colpiece = $sel_index->{$_} || $_;
1557
1558       # unqualify join-based group_by's. Arcane but possible query
1559       # also horrible horrible hack to alias a column (not a func.)
1560       # (probably need to introduce SQLA syntax)
1561       if ($colpiece =~ /\./ && $colpiece !~ /^$attrs->{alias}\./) {
1562         my $as = $colpiece;
1563         $as =~ s/\./__/;
1564         $colpiece = \ sprintf ('%s AS %s', map { $sql_maker->_quote ($_) } ($colpiece, $as) );
1565       }
1566       push @{$sub_attrs->{select}}, $colpiece;
1567     }
1568   }
1569   else {
1570     my @pcols = map { "$attrs->{alias}.$_" } ($rsrc->primary_columns);
1571     $sub_attrs->{select} = @pcols ? \@pcols : [ 1 ];
1572   }
1573
1574   return $rsrc->resultset_class
1575                ->new ($rsrc, $sub_attrs)
1576                 ->as_subselect_rs
1577                  ->search ({}, { columns => { count => $rsrc->storage->_count_select ($rsrc, $attrs) } })
1578                   ->get_column ('count');
1579 }
1580
1581 sub _bool {
1582   return 1;
1583 }
1584
1585 =head2 count_literal
1586
1587 =over 4
1588
1589 =item Arguments: $sql_fragment, @bind_values
1590
1591 =item Return Value: $count
1592
1593 =back
1594
1595 Counts the results in a literal query. Equivalent to calling L</search_literal>
1596 with the passed arguments, then L</count>.
1597
1598 =cut
1599
1600 sub count_literal { shift->search_literal(@_)->count; }
1601
1602 =head2 all
1603
1604 =over 4
1605
1606 =item Arguments: none
1607
1608 =item Return Value: @objects
1609
1610 =back
1611
1612 Returns all elements in the resultset.
1613
1614 =cut
1615
1616 sub all {
1617   my $self = shift;
1618   if(@_) {
1619     $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1620   }
1621
1622   delete $self->{stashed_rows};
1623   delete $self->{stashed_objects};
1624
1625   if (my $c = $self->get_cache) {
1626     return @$c;
1627   }
1628
1629   $self->cursor->reset;
1630
1631   my $objs = $self->_construct_objects(undef, 'fetch_all') || [];
1632
1633   $self->set_cache($objs) if $self->{attrs}{cache};
1634
1635   return @$objs;
1636 }
1637
1638 =head2 reset
1639
1640 =over 4
1641
1642 =item Arguments: none
1643
1644 =item Return Value: $self
1645
1646 =back
1647
1648 Resets the resultset's cursor, so you can iterate through the elements again.
1649 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1650 another query.
1651
1652 =cut
1653
1654 sub reset {
1655   my ($self) = @_;
1656   delete $self->{_attrs};
1657   delete $self->{stashed_rows};
1658   delete $self->{stashed_objects};
1659
1660   $self->{all_cache_position} = 0;
1661   $self->cursor->reset;
1662   return $self;
1663 }
1664
1665 =head2 first
1666
1667 =over 4
1668
1669 =item Arguments: none
1670
1671 =item Return Value: $object | undef
1672
1673 =back
1674
1675 Resets the resultset and returns an object for the first result (or C<undef>
1676 if the resultset is empty).
1677
1678 =cut
1679
1680 sub first {
1681   return $_[0]->reset->next;
1682 }
1683
1684
1685 # _rs_update_delete
1686 #
1687 # Determines whether and what type of subquery is required for the $rs operation.
1688 # If grouping is necessary either supplies its own, or verifies the current one
1689 # After all is done delegates to the proper storage method.
1690
1691 sub _rs_update_delete {
1692   my ($self, $op, $values) = @_;
1693
1694   my $cond = $self->{cond};
1695   my $rsrc = $self->result_source;
1696   my $storage = $rsrc->schema->storage;
1697
1698   my $attrs = { %{$self->_resolved_attrs} };
1699
1700   # "needs" is a strong word here - if the subquery is part of an IN clause - no point of
1701   # even adding the group_by. It will really be used only when composing a poor-man's
1702   # multicolumn-IN equivalent OR set
1703   my $needs_group_by_subq = defined $attrs->{group_by};
1704
1705   # simplify the joinmap and maybe decide if a grouping (and thus subquery) is necessary
1706   my $relation_classifications;
1707   if (ref($attrs->{from}) eq 'ARRAY') {
1708     $attrs->{from} = $storage->_prune_unused_joins ($attrs->{from}, $attrs->{select}, $cond, $attrs);
1709
1710     $relation_classifications = $storage->_resolve_aliastypes_from_select_args (
1711       [ @{$attrs->{from}}[1 .. $#{$attrs->{from}}] ],
1712       $attrs->{select},
1713       $cond,
1714       $attrs
1715     ) unless $needs_group_by_subq;  # we already know we need a group, no point of resolving them
1716   }
1717   else {
1718     $needs_group_by_subq ||= 1; # if {from} is unparseable assume the worst
1719   }
1720
1721   $needs_group_by_subq ||= exists $relation_classifications->{multiplying};
1722
1723   # if no subquery - life is easy-ish
1724   unless (
1725     $needs_group_by_subq
1726       or
1727     keys %$relation_classifications # if any joins at all - need to wrap a subq
1728       or
1729     $self->_has_resolved_attr(qw/rows offset/) # limits call for a subq
1730   ) {
1731     # Most databases do not allow aliasing of tables in UPDATE/DELETE. Thus
1732     # a condition containing 'me' or other table prefixes will not work
1733     # at all. What this code tries to do (badly) is to generate a condition
1734     # with the qualifiers removed, by exploiting the quote mechanism of sqla
1735     #
1736     # this is atrocious and should be replaced by normal sqla introspection
1737     # one sunny day
1738     my ($sql, @bind) = do {
1739       my $sqla = $rsrc->storage->sql_maker;
1740       local $sqla->{_dequalify_idents} = 1;
1741       $sqla->_recurse_where($self->{cond});
1742     } if $self->{cond};
1743
1744     return $rsrc->storage->$op(
1745       $rsrc,
1746       $op eq 'update' ? $values : (),
1747       $self->{cond} ? \[$sql, @bind] : (),
1748     );
1749   }
1750
1751   # we got this far - means it is time to wrap a subquery
1752   my $idcols = $rsrc->_identifying_column_set || $self->throw_exception(
1753     sprintf(
1754       "Unable to perform complex resultset %s() without an identifying set of columns on source '%s'",
1755       $op,
1756       $rsrc->source_name,
1757     )
1758   );
1759   my $existing_group_by = delete $attrs->{group_by};
1760
1761   # make a new $rs selecting only the PKs (that's all we really need for the subq)
1762   delete $attrs->{$_} for qw/collapse select _prefetch_selector_range as/;
1763   $attrs->{columns} = [ map { "$attrs->{alias}.$_" } @$idcols ];
1764   $attrs->{group_by} = \ '';  # FIXME - this is an evil hack, it causes the optimiser to kick in and throw away the LEFT joins
1765   my $subrs = (ref $self)->new($rsrc, $attrs);
1766
1767   if (@$idcols == 1) {
1768     return $storage->$op (
1769       $rsrc,
1770       $op eq 'update' ? $values : (),
1771       { $idcols->[0] => { -in => $subrs->as_query } },
1772     );
1773   }
1774   elsif ($storage->_use_multicolumn_in) {
1775     # This is hideously ugly, but SQLA does not understand multicol IN expressions
1776     my $sql_maker = $storage->sql_maker;
1777     my ($sql, @bind) = @${$subrs->as_query};
1778     $sql = sprintf ('(%s) IN %s', # the as_query already comes with a set of parenthesis
1779       join (', ', map { $sql_maker->_quote ($_) } @$idcols),
1780       $sql,
1781     );
1782
1783     return $storage->$op (
1784       $rsrc,
1785       $op eq 'update' ? $values : (),
1786       \[$sql, @bind],
1787     );
1788   }
1789   else {
1790     # if all else fails - get all primary keys and operate over a ORed set
1791     # wrap in a transaction for consistency
1792     # this is where the group_by starts to matter
1793     my $subq_group_by;
1794     if ($needs_group_by_subq) {
1795       $subq_group_by = $attrs->{columns};
1796
1797       # make sure if there is a supplied group_by it matches the columns compiled above
1798       # perfectly. Anything else can not be sanely executed on most databases so croak
1799       # right then and there
1800       if ($existing_group_by) {
1801         my @current_group_by = map
1802           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1803           @$existing_group_by
1804         ;
1805
1806         if (
1807           join ("\x00", sort @current_group_by)
1808             ne
1809           join ("\x00", sort @$subq_group_by )
1810         ) {
1811           $self->throw_exception (
1812             "You have just attempted a $op operation on a resultset which does group_by"
1813             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1814             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1815             . ' kind of queries. Please retry the operation with a modified group_by or'
1816             . ' without using one at all.'
1817           );
1818         }
1819       }
1820     }
1821
1822     my $guard = $storage->txn_scope_guard;
1823
1824     my @op_condition;
1825     for my $row ($subrs->search({}, { group_by => $subq_group_by })->cursor->all) {
1826       push @op_condition, { map
1827         { $idcols->[$_] => $row->[$_] }
1828         (0 .. $#$idcols)
1829       };
1830     }
1831
1832     my $res = $storage->$op (
1833       $rsrc,
1834       $op eq 'update' ? $values : (),
1835       \@op_condition,
1836     );
1837
1838     $guard->commit;
1839
1840     return $res;
1841   }
1842 }
1843
1844 =head2 update
1845
1846 =over 4
1847
1848 =item Arguments: \%values
1849
1850 =item Return Value: $storage_rv
1851
1852 =back
1853
1854 Sets the specified columns in the resultset to the supplied values in a
1855 single query. Note that this will not run any accessor/set_column/update
1856 triggers, nor will it update any row object instances derived from this
1857 resultset (this includes the contents of the L<resultset cache|/set_cache>
1858 if any). See L</update_all> if you need to execute any on-update
1859 triggers or cascades defined either by you or a
1860 L<result component|DBIx::Class::Manual::Component/WHAT IS A COMPONENT>.
1861
1862 The return value is a pass through of what the underlying
1863 storage backend returned, and may vary. See L<DBI/execute> for the most
1864 common case.
1865
1866 =head3 CAVEAT
1867
1868 Note that L</update> does not process/deflate any of the values passed in.
1869 This is unlike the corresponding L<DBIx::Class::Row/update>. The user must
1870 ensure manually that any value passed to this method will stringify to
1871 something the RDBMS knows how to deal with. A notable example is the
1872 handling of L<DateTime> objects, for more info see:
1873 L<DBIx::Class::Manual::Cookbook/Formatting DateTime objects in queries>.
1874
1875 =cut
1876
1877 sub update {
1878   my ($self, $values) = @_;
1879   $self->throw_exception('Values for update must be a hash')
1880     unless ref $values eq 'HASH';
1881
1882   return $self->_rs_update_delete ('update', $values);
1883 }
1884
1885 =head2 update_all
1886
1887 =over 4
1888
1889 =item Arguments: \%values
1890
1891 =item Return Value: 1
1892
1893 =back
1894
1895 Fetches all objects and updates them one at a time via
1896 L<DBIx::Class::Row/update>. Note that C<update_all> will run DBIC defined
1897 triggers, while L</update> will not.
1898
1899 =cut
1900
1901 sub update_all {
1902   my ($self, $values) = @_;
1903   $self->throw_exception('Values for update_all must be a hash')
1904     unless ref $values eq 'HASH';
1905
1906   my $guard = $self->result_source->schema->txn_scope_guard;
1907   $_->update({%$values}) for $self->all;  # shallow copy - update will mangle it
1908   $guard->commit;
1909   return 1;
1910 }
1911
1912 =head2 delete
1913
1914 =over 4
1915
1916 =item Arguments: none
1917
1918 =item Return Value: $storage_rv
1919
1920 =back
1921
1922 Deletes the rows matching this resultset in a single query. Note that this
1923 will not run any delete triggers, nor will it alter the
1924 L<in_storage|DBIx::Class::Row/in_storage> status of any row object instances
1925 derived from this resultset (this includes the contents of the
1926 L<resultset cache|/set_cache> if any). See L</delete_all> if you need to
1927 execute any on-delete triggers or cascades defined either by you or a
1928 L<result component|DBIx::Class::Manual::Component/WHAT IS A COMPONENT>.
1929
1930 The return value is a pass through of what the underlying storage backend
1931 returned, and may vary. See L<DBI/execute> for the most common case.
1932
1933 =cut
1934
1935 sub delete {
1936   my $self = shift;
1937   $self->throw_exception('delete does not accept any arguments')
1938     if @_;
1939
1940   return $self->_rs_update_delete ('delete');
1941 }
1942
1943 =head2 delete_all
1944
1945 =over 4
1946
1947 =item Arguments: none
1948
1949 =item Return Value: 1
1950
1951 =back
1952
1953 Fetches all objects and deletes them one at a time via
1954 L<DBIx::Class::Row/delete>. Note that C<delete_all> will run DBIC defined
1955 triggers, while L</delete> will not.
1956
1957 =cut
1958
1959 sub delete_all {
1960   my $self = shift;
1961   $self->throw_exception('delete_all does not accept any arguments')
1962     if @_;
1963
1964   my $guard = $self->result_source->schema->txn_scope_guard;
1965   $_->delete for $self->all;
1966   $guard->commit;
1967   return 1;
1968 }
1969
1970 =head2 populate
1971
1972 =over 4
1973
1974 =item Arguments: \@data;
1975
1976 =back
1977
1978 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1979 For the arrayref of hashrefs style each hashref should be a structure suitable
1980 for submitting to a $resultset->create(...) method.
1981
1982 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1983 to insert the data, as this is a faster method.
1984
1985 Otherwise, each set of data is inserted into the database using
1986 L<DBIx::Class::ResultSet/create>, and the resulting objects are
1987 accumulated into an array. The array itself, or an array reference
1988 is returned depending on scalar or list context.
1989
1990 Example:  Assuming an Artist Class that has many CDs Classes relating:
1991
1992   my $Artist_rs = $schema->resultset("Artist");
1993
1994   ## Void Context Example
1995   $Artist_rs->populate([
1996      { artistid => 4, name => 'Manufactured Crap', cds => [
1997         { title => 'My First CD', year => 2006 },
1998         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1999       ],
2000      },
2001      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
2002         { title => 'My parents sold me to a record company', year => 2005 },
2003         { title => 'Why Am I So Ugly?', year => 2006 },
2004         { title => 'I Got Surgery and am now Popular', year => 2007 }
2005       ],
2006      },
2007   ]);
2008
2009   ## Array Context Example
2010   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
2011     { name => "Artist One"},
2012     { name => "Artist Two"},
2013     { name => "Artist Three", cds=> [
2014     { title => "First CD", year => 2007},
2015     { title => "Second CD", year => 2008},
2016   ]}
2017   ]);
2018
2019   print $ArtistOne->name; ## response is 'Artist One'
2020   print $ArtistThree->cds->count ## reponse is '2'
2021
2022 For the arrayref of arrayrefs style,  the first element should be a list of the
2023 fieldsnames to which the remaining elements are rows being inserted.  For
2024 example:
2025
2026   $Arstist_rs->populate([
2027     [qw/artistid name/],
2028     [100, 'A Formally Unknown Singer'],
2029     [101, 'A singer that jumped the shark two albums ago'],
2030     [102, 'An actually cool singer'],
2031   ]);
2032
2033 Please note an important effect on your data when choosing between void and
2034 wantarray context. Since void context goes straight to C<insert_bulk> in
2035 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
2036 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
2037 create primary keys for you, you will find that your PKs are empty.  In this
2038 case you will have to use the wantarray context in order to create those
2039 values.
2040
2041 =cut
2042
2043 sub populate {
2044   my $self = shift;
2045
2046   # cruft placed in standalone method
2047   my $data = $self->_normalize_populate_args(@_);
2048
2049   return unless @$data;
2050
2051   if(defined wantarray) {
2052     my @created;
2053     foreach my $item (@$data) {
2054       push(@created, $self->create($item));
2055     }
2056     return wantarray ? @created : \@created;
2057   }
2058   else {
2059     my $first = $data->[0];
2060
2061     # if a column is a registered relationship, and is a non-blessed hash/array, consider
2062     # it relationship data
2063     my (@rels, @columns);
2064     my $rsrc = $self->result_source;
2065     my $rels = { map { $_ => $rsrc->relationship_info($_) } $rsrc->relationships };
2066     for (keys %$first) {
2067       my $ref = ref $first->{$_};
2068       $rels->{$_} && ($ref eq 'ARRAY' or $ref eq 'HASH')
2069         ? push @rels, $_
2070         : push @columns, $_
2071       ;
2072     }
2073
2074     my @pks = $rsrc->primary_columns;
2075
2076     ## do the belongs_to relationships
2077     foreach my $index (0..$#$data) {
2078
2079       # delegate to create() for any dataset without primary keys with specified relationships
2080       if (grep { !defined $data->[$index]->{$_} } @pks ) {
2081         for my $r (@rels) {
2082           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
2083             my @ret = $self->populate($data);
2084             return;
2085           }
2086         }
2087       }
2088
2089       foreach my $rel (@rels) {
2090         next unless ref $data->[$index]->{$rel} eq "HASH";
2091         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
2092         my ($reverse_relname, $reverse_relinfo) = %{$rsrc->reverse_relationship_info($rel)};
2093         my $related = $result->result_source->_resolve_condition(
2094           $reverse_relinfo->{cond},
2095           $self,
2096           $result,
2097           $rel,
2098         );
2099
2100         delete $data->[$index]->{$rel};
2101         $data->[$index] = {%{$data->[$index]}, %$related};
2102
2103         push @columns, keys %$related if $index == 0;
2104       }
2105     }
2106
2107     ## inherit the data locked in the conditions of the resultset
2108     my ($rs_data) = $self->_merge_with_rscond({});
2109     delete @{$rs_data}{@columns};
2110     my @inherit_cols = keys %$rs_data;
2111     my @inherit_data = values %$rs_data;
2112
2113     ## do bulk insert on current row
2114     $rsrc->storage->insert_bulk(
2115       $rsrc,
2116       [@columns, @inherit_cols],
2117       [ map { [ @$_{@columns}, @inherit_data ] } @$data ],
2118     );
2119
2120     ## do the has_many relationships
2121     foreach my $item (@$data) {
2122
2123       my $main_row;
2124
2125       foreach my $rel (@rels) {
2126         next unless ref $item->{$rel} eq "ARRAY" && @{ $item->{$rel} };
2127
2128         $main_row ||= $self->new_result({map { $_ => $item->{$_} } @pks});
2129
2130         my $child = $main_row->$rel;
2131
2132         my $related = $child->result_source->_resolve_condition(
2133           $rels->{$rel}{cond},
2134           $child,
2135           $main_row,
2136           $rel,
2137         );
2138
2139         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
2140         my @populate = map { {%$_, %$related} } @rows_to_add;
2141
2142         $child->populate( \@populate );
2143       }
2144     }
2145   }
2146 }
2147
2148
2149 # populate() argumnets went over several incarnations
2150 # What we ultimately support is AoH
2151 sub _normalize_populate_args {
2152   my ($self, $arg) = @_;
2153
2154   if (ref $arg eq 'ARRAY') {
2155     if (!@$arg) {
2156       return [];
2157     }
2158     elsif (ref $arg->[0] eq 'HASH') {
2159       return $arg;
2160     }
2161     elsif (ref $arg->[0] eq 'ARRAY') {
2162       my @ret;
2163       my @colnames = @{$arg->[0]};
2164       foreach my $values (@{$arg}[1 .. $#$arg]) {
2165         push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
2166       }
2167       return \@ret;
2168     }
2169   }
2170
2171   $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
2172 }
2173
2174 =head2 pager
2175
2176 =over 4
2177
2178 =item Arguments: none
2179
2180 =item Return Value: $pager
2181
2182 =back
2183
2184 Return Value a L<Data::Page> object for the current resultset. Only makes
2185 sense for queries with a C<page> attribute.
2186
2187 To get the full count of entries for a paged resultset, call
2188 C<total_entries> on the L<Data::Page> object.
2189
2190 =cut
2191
2192 sub pager {
2193   my ($self) = @_;
2194
2195   return $self->{pager} if $self->{pager};
2196
2197   my $attrs = $self->{attrs};
2198   if (!defined $attrs->{page}) {
2199     $self->throw_exception("Can't create pager for non-paged rs");
2200   }
2201   elsif ($attrs->{page} <= 0) {
2202     $self->throw_exception('Invalid page number (page-numbers are 1-based)');
2203   }
2204   $attrs->{rows} ||= 10;
2205
2206   # throw away the paging flags and re-run the count (possibly
2207   # with a subselect) to get the real total count
2208   my $count_attrs = { %$attrs };
2209   delete $count_attrs->{$_} for qw/rows offset page pager/;
2210
2211   my $total_rs = (ref $self)->new($self->result_source, $count_attrs);
2212
2213   require DBIx::Class::ResultSet::Pager;
2214   return $self->{pager} = DBIx::Class::ResultSet::Pager->new(
2215     sub { $total_rs->count },  #lazy-get the total
2216     $attrs->{rows},
2217     $self->{attrs}{page},
2218   );
2219 }
2220
2221 =head2 page
2222
2223 =over 4
2224
2225 =item Arguments: $page_number
2226
2227 =item Return Value: $rs
2228
2229 =back
2230
2231 Returns a resultset for the $page_number page of the resultset on which page
2232 is called, where each page contains a number of rows equal to the 'rows'
2233 attribute set on the resultset (10 by default).
2234
2235 =cut
2236
2237 sub page {
2238   my ($self, $page) = @_;
2239   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
2240 }
2241
2242 =head2 new_result
2243
2244 =over 4
2245
2246 =item Arguments: \%vals
2247
2248 =item Return Value: $rowobject
2249
2250 =back
2251
2252 Creates a new row object in the resultset's result class and returns
2253 it. The row is not inserted into the database at this point, call
2254 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
2255 will tell you whether the row object has been inserted or not.
2256
2257 Passes the hashref of input on to L<DBIx::Class::Row/new>.
2258
2259 =cut
2260
2261 sub new_result {
2262   my ($self, $values) = @_;
2263   $self->throw_exception( "new_result needs a hash" )
2264     unless (ref $values eq 'HASH');
2265
2266   my ($merged_cond, $cols_from_relations) = $self->_merge_with_rscond($values);
2267
2268   my %new = (
2269     %$merged_cond,
2270     @$cols_from_relations
2271       ? (-cols_from_relations => $cols_from_relations)
2272       : (),
2273     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
2274   );
2275
2276   return $self->result_class->new(\%new);
2277 }
2278
2279 # _merge_with_rscond
2280 #
2281 # Takes a simple hash of K/V data and returns its copy merged with the
2282 # condition already present on the resultset. Additionally returns an
2283 # arrayref of value/condition names, which were inferred from related
2284 # objects (this is needed for in-memory related objects)
2285 sub _merge_with_rscond {
2286   my ($self, $data) = @_;
2287
2288   my (%new_data, @cols_from_relations);
2289
2290   my $alias = $self->{attrs}{alias};
2291
2292   if (! defined $self->{cond}) {
2293     # just massage $data below
2294   }
2295   elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
2296     %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
2297     @cols_from_relations = keys %new_data;
2298   }
2299   elsif (ref $self->{cond} ne 'HASH') {
2300     $self->throw_exception(
2301       "Can't abstract implicit construct, resultset condition not a hash"
2302     );
2303   }
2304   else {
2305     # precendence must be given to passed values over values inherited from
2306     # the cond, so the order here is important.
2307     my $collapsed_cond = $self->_collapse_cond($self->{cond});
2308     my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
2309
2310     while ( my($col, $value) = each %implied ) {
2311       my $vref = ref $value;
2312       if (
2313         $vref eq 'HASH'
2314           and
2315         keys(%$value) == 1
2316           and
2317         (keys %$value)[0] eq '='
2318       ) {
2319         $new_data{$col} = $value->{'='};
2320       }
2321       elsif( !$vref or $vref eq 'SCALAR' or blessed($value) ) {
2322         $new_data{$col} = $value;
2323       }
2324     }
2325   }
2326
2327   %new_data = (
2328     %new_data,
2329     %{ $self->_remove_alias($data, $alias) },
2330   );
2331
2332   return (\%new_data, \@cols_from_relations);
2333 }
2334
2335 # _has_resolved_attr
2336 #
2337 # determines if the resultset defines at least one
2338 # of the attributes supplied
2339 #
2340 # used to determine if a subquery is neccessary
2341 #
2342 # supports some virtual attributes:
2343 #   -join
2344 #     This will scan for any joins being present on the resultset.
2345 #     It is not a mere key-search but a deep inspection of {from}
2346 #
2347
2348 sub _has_resolved_attr {
2349   my ($self, @attr_names) = @_;
2350
2351   my $attrs = $self->_resolved_attrs;
2352
2353   my %extra_checks;
2354
2355   for my $n (@attr_names) {
2356     if (grep { $n eq $_ } (qw/-join/) ) {
2357       $extra_checks{$n}++;
2358       next;
2359     }
2360
2361     my $attr =  $attrs->{$n};
2362
2363     next if not defined $attr;
2364
2365     if (ref $attr eq 'HASH') {
2366       return 1 if keys %$attr;
2367     }
2368     elsif (ref $attr eq 'ARRAY') {
2369       return 1 if @$attr;
2370     }
2371     else {
2372       return 1 if $attr;
2373     }
2374   }
2375
2376   # a resolved join is expressed as a multi-level from
2377   return 1 if (
2378     $extra_checks{-join}
2379       and
2380     ref $attrs->{from} eq 'ARRAY'
2381       and
2382     @{$attrs->{from}} > 1
2383   );
2384
2385   return 0;
2386 }
2387
2388 # _collapse_cond
2389 #
2390 # Recursively collapse the condition.
2391
2392 sub _collapse_cond {
2393   my ($self, $cond, $collapsed) = @_;
2394
2395   $collapsed ||= {};
2396
2397   if (ref $cond eq 'ARRAY') {
2398     foreach my $subcond (@$cond) {
2399       next unless ref $subcond;  # -or
2400       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2401     }
2402   }
2403   elsif (ref $cond eq 'HASH') {
2404     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2405       foreach my $subcond (@{$cond->{-and}}) {
2406         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2407       }
2408     }
2409     else {
2410       foreach my $col (keys %$cond) {
2411         my $value = $cond->{$col};
2412         $collapsed->{$col} = $value;
2413       }
2414     }
2415   }
2416
2417   return $collapsed;
2418 }
2419
2420 # _remove_alias
2421 #
2422 # Remove the specified alias from the specified query hash. A copy is made so
2423 # the original query is not modified.
2424
2425 sub _remove_alias {
2426   my ($self, $query, $alias) = @_;
2427
2428   my %orig = %{ $query || {} };
2429   my %unaliased;
2430
2431   foreach my $key (keys %orig) {
2432     if ($key !~ /\./) {
2433       $unaliased{$key} = $orig{$key};
2434       next;
2435     }
2436     $unaliased{$1} = $orig{$key}
2437       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2438   }
2439
2440   return \%unaliased;
2441 }
2442
2443 =head2 as_query
2444
2445 =over 4
2446
2447 =item Arguments: none
2448
2449 =item Return Value: \[ $sql, @bind ]
2450
2451 =back
2452
2453 Returns the SQL query and bind vars associated with the invocant.
2454
2455 This is generally used as the RHS for a subquery.
2456
2457 =cut
2458
2459 sub as_query {
2460   my $self = shift;
2461
2462   my $attrs = $self->_resolved_attrs_copy;
2463
2464   # For future use:
2465   #
2466   # in list ctx:
2467   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2468   # $sql also has no wrapping parenthesis in list ctx
2469   #
2470   my $sqlbind = $self->result_source->storage
2471     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2472
2473   return $sqlbind;
2474 }
2475
2476 =head2 find_or_new
2477
2478 =over 4
2479
2480 =item Arguments: \%vals, \%attrs?
2481
2482 =item Return Value: $rowobject
2483
2484 =back
2485
2486   my $artist = $schema->resultset('Artist')->find_or_new(
2487     { artist => 'fred' }, { key => 'artists' });
2488
2489   $cd->cd_to_producer->find_or_new({ producer => $producer },
2490                                    { key => 'primary });
2491
2492 Find an existing record from this resultset using L</find>. if none exists,
2493 instantiate a new result object and return it. The object will not be saved
2494 into your storage until you call L<DBIx::Class::Row/insert> on it.
2495
2496 You most likely want this method when looking for existing rows using a unique
2497 constraint that is not the primary key, or looking for related rows.
2498
2499 If you want objects to be saved immediately, use L</find_or_create> instead.
2500
2501 B<Note>: Make sure to read the documentation of L</find> and understand the
2502 significance of the C<key> attribute, as its lack may skew your search, and
2503 subsequently result in spurious new objects.
2504
2505 B<Note>: Take care when using C<find_or_new> with a table having
2506 columns with default values that you intend to be automatically
2507 supplied by the database (e.g. an auto_increment primary key column).
2508 In normal usage, the value of such columns should NOT be included at
2509 all in the call to C<find_or_new>, even when set to C<undef>.
2510
2511 =cut
2512
2513 sub find_or_new {
2514   my $self     = shift;
2515   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2516   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2517   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2518     return $row;
2519   }
2520   return $self->new_result($hash);
2521 }
2522
2523 =head2 create
2524
2525 =over 4
2526
2527 =item Arguments: \%vals
2528
2529 =item Return Value: a L<DBIx::Class::Row> $object
2530
2531 =back
2532
2533 Attempt to create a single new row or a row with multiple related rows
2534 in the table represented by the resultset (and related tables). This
2535 will not check for duplicate rows before inserting, use
2536 L</find_or_create> to do that.
2537
2538 To create one row for this resultset, pass a hashref of key/value
2539 pairs representing the columns of the table and the values you wish to
2540 store. If the appropriate relationships are set up, foreign key fields
2541 can also be passed an object representing the foreign row, and the
2542 value will be set to its primary key.
2543
2544 To create related objects, pass a hashref of related-object column values
2545 B<keyed on the relationship name>. If the relationship is of type C<multi>
2546 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2547 The process will correctly identify columns holding foreign keys, and will
2548 transparently populate them from the keys of the corresponding relation.
2549 This can be applied recursively, and will work correctly for a structure
2550 with an arbitrary depth and width, as long as the relationships actually
2551 exists and the correct column data has been supplied.
2552
2553
2554 Instead of hashrefs of plain related data (key/value pairs), you may
2555 also pass new or inserted objects. New objects (not inserted yet, see
2556 L</new>), will be inserted into their appropriate tables.
2557
2558 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2559
2560 Example of creating a new row.
2561
2562   $person_rs->create({
2563     name=>"Some Person",
2564     email=>"somebody@someplace.com"
2565   });
2566
2567 Example of creating a new row and also creating rows in a related C<has_many>
2568 or C<has_one> resultset.  Note Arrayref.
2569
2570   $artist_rs->create(
2571      { artistid => 4, name => 'Manufactured Crap', cds => [
2572         { title => 'My First CD', year => 2006 },
2573         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2574       ],
2575      },
2576   );
2577
2578 Example of creating a new row and also creating a row in a related
2579 C<belongs_to> resultset. Note Hashref.
2580
2581   $cd_rs->create({
2582     title=>"Music for Silly Walks",
2583     year=>2000,
2584     artist => {
2585       name=>"Silly Musician",
2586     }
2587   });
2588
2589 =over
2590
2591 =item WARNING
2592
2593 When subclassing ResultSet never attempt to override this method. Since
2594 it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
2595 lot of the internals simply never call it, so your override will be
2596 bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
2597 or L<insert|DBIx::Class::Row/insert> depending on how early in the
2598 L</create> process you need to intervene.
2599
2600 =back
2601
2602 =cut
2603
2604 sub create {
2605   my ($self, $attrs) = @_;
2606   $self->throw_exception( "create needs a hashref" )
2607     unless ref $attrs eq 'HASH';
2608   return $self->new_result($attrs)->insert;
2609 }
2610
2611 =head2 find_or_create
2612
2613 =over 4
2614
2615 =item Arguments: \%vals, \%attrs?
2616
2617 =item Return Value: $rowobject
2618
2619 =back
2620
2621   $cd->cd_to_producer->find_or_create({ producer => $producer },
2622                                       { key => 'primary' });
2623
2624 Tries to find a record based on its primary key or unique constraints; if none
2625 is found, creates one and returns that instead.
2626
2627   my $cd = $schema->resultset('CD')->find_or_create({
2628     cdid   => 5,
2629     artist => 'Massive Attack',
2630     title  => 'Mezzanine',
2631     year   => 2005,
2632   });
2633
2634 Also takes an optional C<key> attribute, to search by a specific key or unique
2635 constraint. For example:
2636
2637   my $cd = $schema->resultset('CD')->find_or_create(
2638     {
2639       artist => 'Massive Attack',
2640       title  => 'Mezzanine',
2641     },
2642     { key => 'cd_artist_title' }
2643   );
2644
2645 B<Note>: Make sure to read the documentation of L</find> and understand the
2646 significance of the C<key> attribute, as its lack may skew your search, and
2647 subsequently result in spurious row creation.
2648
2649 B<Note>: Because find_or_create() reads from the database and then
2650 possibly inserts based on the result, this method is subject to a race
2651 condition. Another process could create a record in the table after
2652 the find has completed and before the create has started. To avoid
2653 this problem, use find_or_create() inside a transaction.
2654
2655 B<Note>: Take care when using C<find_or_create> with a table having
2656 columns with default values that you intend to be automatically
2657 supplied by the database (e.g. an auto_increment primary key column).
2658 In normal usage, the value of such columns should NOT be included at
2659 all in the call to C<find_or_create>, even when set to C<undef>.
2660
2661 See also L</find> and L</update_or_create>. For information on how to declare
2662 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2663
2664 If you need to know if an existing row was found or a new one created use
2665 L</find_or_new> and L<DBIx::Class::Row/in_storage> instead. Don't forget
2666 to call L<DBIx::Class::Row/insert> to save the newly created row to the
2667 database!
2668
2669   my $cd = $schema->resultset('CD')->find_or_new({
2670     cdid   => 5,
2671     artist => 'Massive Attack',
2672     title  => 'Mezzanine',
2673     year   => 2005,
2674   });
2675
2676   if( $cd->in_storage ) {
2677       # do some stuff
2678       $cd->insert;
2679   }
2680
2681 =cut
2682
2683 sub find_or_create {
2684   my $self     = shift;
2685   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2686   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2687   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2688     return $row;
2689   }
2690   return $self->create($hash);
2691 }
2692
2693 =head2 update_or_create
2694
2695 =over 4
2696
2697 =item Arguments: \%col_values, { key => $unique_constraint }?
2698
2699 =item Return Value: $row_object
2700
2701 =back
2702
2703   $resultset->update_or_create({ col => $val, ... });
2704
2705 Like L</find_or_create>, but if a row is found it is immediately updated via
2706 C<< $found_row->update (\%col_values) >>.
2707
2708
2709 Takes an optional C<key> attribute to search on a specific unique constraint.
2710 For example:
2711
2712   # In your application
2713   my $cd = $schema->resultset('CD')->update_or_create(
2714     {
2715       artist => 'Massive Attack',
2716       title  => 'Mezzanine',
2717       year   => 1998,
2718     },
2719     { key => 'cd_artist_title' }
2720   );
2721
2722   $cd->cd_to_producer->update_or_create({
2723     producer => $producer,
2724     name => 'harry',
2725   }, {
2726     key => 'primary',
2727   });
2728
2729 B<Note>: Make sure to read the documentation of L</find> and understand the
2730 significance of the C<key> attribute, as its lack may skew your search, and
2731 subsequently result in spurious row creation.
2732
2733 B<Note>: Take care when using C<update_or_create> with a table having
2734 columns with default values that you intend to be automatically
2735 supplied by the database (e.g. an auto_increment primary key column).
2736 In normal usage, the value of such columns should NOT be included at
2737 all in the call to C<update_or_create>, even when set to C<undef>.
2738
2739 See also L</find> and L</find_or_create>. For information on how to declare
2740 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2741
2742 If you need to know if an existing row was updated or a new one created use
2743 L</update_or_new> and L<DBIx::Class::Row/in_storage> instead. Don't forget
2744 to call L<DBIx::Class::Row/insert> to save the newly created row to the
2745 database!
2746
2747   my $cd = $schema->resultset('CD')->update_or_new(
2748     {
2749       artist => 'Massive Attack',
2750       title  => 'Mezzanine',
2751       year   => 1998,
2752     },
2753     { key => 'cd_artist_title' }
2754   );
2755
2756   if( $cd->in_storage ) {
2757       # do some stuff
2758       $cd->insert;
2759   }
2760
2761 =cut
2762
2763 sub update_or_create {
2764   my $self = shift;
2765   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2766   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2767
2768   my $row = $self->find($cond, $attrs);
2769   if (defined $row) {
2770     $row->update($cond);
2771     return $row;
2772   }
2773
2774   return $self->create($cond);
2775 }
2776
2777 =head2 update_or_new
2778
2779 =over 4
2780
2781 =item Arguments: \%col_values, { key => $unique_constraint }?
2782
2783 =item Return Value: $rowobject
2784
2785 =back
2786
2787   $resultset->update_or_new({ col => $val, ... });
2788
2789 Like L</find_or_new> but if a row is found it is immediately updated via
2790 C<< $found_row->update (\%col_values) >>.
2791
2792 For example:
2793
2794   # In your application
2795   my $cd = $schema->resultset('CD')->update_or_new(
2796     {
2797       artist => 'Massive Attack',
2798       title  => 'Mezzanine',
2799       year   => 1998,
2800     },
2801     { key => 'cd_artist_title' }
2802   );
2803
2804   if ($cd->in_storage) {
2805       # the cd was updated
2806   }
2807   else {
2808       # the cd is not yet in the database, let's insert it
2809       $cd->insert;
2810   }
2811
2812 B<Note>: Make sure to read the documentation of L</find> and understand the
2813 significance of the C<key> attribute, as its lack may skew your search, and
2814 subsequently result in spurious new objects.
2815
2816 B<Note>: Take care when using C<update_or_new> with a table having
2817 columns with default values that you intend to be automatically
2818 supplied by the database (e.g. an auto_increment primary key column).
2819 In normal usage, the value of such columns should NOT be included at
2820 all in the call to C<update_or_new>, even when set to C<undef>.
2821
2822 See also L</find>, L</find_or_create> and L</find_or_new>.
2823
2824 =cut
2825
2826 sub update_or_new {
2827     my $self  = shift;
2828     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2829     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2830
2831     my $row = $self->find( $cond, $attrs );
2832     if ( defined $row ) {
2833         $row->update($cond);
2834         return $row;
2835     }
2836
2837     return $self->new_result($cond);
2838 }
2839
2840 =head2 get_cache
2841
2842 =over 4
2843
2844 =item Arguments: none
2845
2846 =item Return Value: \@cache_objects | undef
2847
2848 =back
2849
2850 Gets the contents of the cache for the resultset, if the cache is set.
2851
2852 The cache is populated either by using the L</prefetch> attribute to
2853 L</search> or by calling L</set_cache>.
2854
2855 =cut
2856
2857 sub get_cache {
2858   shift->{all_cache};
2859 }
2860
2861 =head2 set_cache
2862
2863 =over 4
2864
2865 =item Arguments: \@cache_objects
2866
2867 =item Return Value: \@cache_objects
2868
2869 =back
2870
2871 Sets the contents of the cache for the resultset. Expects an arrayref
2872 of objects of the same class as those produced by the resultset. Note that
2873 if the cache is set the resultset will return the cached objects rather
2874 than re-querying the database even if the cache attr is not set.
2875
2876 The contents of the cache can also be populated by using the
2877 L</prefetch> attribute to L</search>.
2878
2879 =cut
2880
2881 sub set_cache {
2882   my ( $self, $data ) = @_;
2883   $self->throw_exception("set_cache requires an arrayref")
2884       if defined($data) && (ref $data ne 'ARRAY');
2885   $self->{all_cache} = $data;
2886 }
2887
2888 =head2 clear_cache
2889
2890 =over 4
2891
2892 =item Arguments: none
2893
2894 =item Return Value: undef
2895
2896 =back
2897
2898 Clears the cache for the resultset.
2899
2900 =cut
2901
2902 sub clear_cache {
2903   shift->set_cache(undef);
2904 }
2905
2906 =head2 is_paged
2907
2908 =over 4
2909
2910 =item Arguments: none
2911
2912 =item Return Value: true, if the resultset has been paginated
2913
2914 =back
2915
2916 =cut
2917
2918 sub is_paged {
2919   my ($self) = @_;
2920   return !!$self->{attrs}{page};
2921 }
2922
2923 =head2 is_ordered
2924
2925 =over 4
2926
2927 =item Arguments: none
2928
2929 =item Return Value: true, if the resultset has been ordered with C<order_by>.
2930
2931 =back
2932
2933 =cut
2934
2935 sub is_ordered {
2936   my ($self) = @_;
2937   return scalar $self->result_source->storage->_extract_order_criteria($self->{attrs}{order_by});
2938 }
2939
2940 =head2 related_resultset
2941
2942 =over 4
2943
2944 =item Arguments: $relationship_name
2945
2946 =item Return Value: $resultset
2947
2948 =back
2949
2950 Returns a related resultset for the supplied relationship name.
2951
2952   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2953
2954 =cut
2955
2956 sub related_resultset {
2957   my ($self, $rel) = @_;
2958
2959   $self->{related_resultsets} ||= {};
2960   return $self->{related_resultsets}{$rel} ||= do {
2961     my $rsrc = $self->result_source;
2962     my $rel_info = $rsrc->relationship_info($rel);
2963
2964     $self->throw_exception(
2965       "search_related: result source '" . $rsrc->source_name .
2966         "' has no such relationship $rel")
2967       unless $rel_info;
2968
2969     my $attrs = $self->_chain_relationship($rel);
2970
2971     my $join_count = $attrs->{seen_join}{$rel};
2972
2973     my $alias = $self->result_source->storage
2974         ->relname_to_table_alias($rel, $join_count);
2975
2976     # since this is search_related, and we already slid the select window inwards
2977     # (the select/as attrs were deleted in the beginning), we need to flip all
2978     # left joins to inner, so we get the expected results
2979     # read the comment on top of the actual function to see what this does
2980     $attrs->{from} = $rsrc->schema->storage->_inner_join_to_node ($attrs->{from}, $alias);
2981
2982
2983     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2984     delete @{$attrs}{qw(result_class alias)};
2985
2986     my $new_cache;
2987
2988     if (my $cache = $self->get_cache) {
2989       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2990         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache||[]} }
2991                         @$cache ];
2992       }
2993     }
2994
2995     my $rel_source = $rsrc->related_source($rel);
2996
2997     my $new = do {
2998
2999       # The reason we do this now instead of passing the alias to the
3000       # search_rs below is that if you wrap/overload resultset on the
3001       # source you need to know what alias it's -going- to have for things
3002       # to work sanely (e.g. RestrictWithObject wants to be able to add
3003       # extra query restrictions, and these may need to be $alias.)
3004
3005       my $rel_attrs = $rel_source->resultset_attributes;
3006       local $rel_attrs->{alias} = $alias;
3007
3008       $rel_source->resultset
3009                  ->search_rs(
3010                      undef, {
3011                        %$attrs,
3012                        where => $attrs->{where},
3013                    });
3014     };
3015     $new->set_cache($new_cache) if $new_cache;
3016     $new;
3017   };
3018 }
3019
3020 =head2 current_source_alias
3021
3022 =over 4
3023
3024 =item Arguments: none
3025
3026 =item Return Value: $source_alias
3027
3028 =back
3029
3030 Returns the current table alias for the result source this resultset is built
3031 on, that will be used in the SQL query. Usually it is C<me>.
3032
3033 Currently the source alias that refers to the result set returned by a
3034 L</search>/L</find> family method depends on how you got to the resultset: it's
3035 C<me> by default, but eg. L</search_related> aliases it to the related result
3036 source name (and keeps C<me> referring to the original result set). The long
3037 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
3038 (and make this method unnecessary).
3039
3040 Thus it's currently necessary to use this method in predefined queries (see
3041 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
3042 source alias of the current result set:
3043
3044   # in a result set class
3045   sub modified_by {
3046     my ($self, $user) = @_;
3047
3048     my $me = $self->current_source_alias;
3049
3050     return $self->search({
3051       "$me.modified" => $user->id,
3052     });
3053   }
3054
3055 =cut
3056
3057 sub current_source_alias {
3058   my ($self) = @_;
3059
3060   return ($self->{attrs} || {})->{alias} || 'me';
3061 }
3062
3063 =head2 as_subselect_rs
3064
3065 =over 4
3066
3067 =item Arguments: none
3068
3069 =item Return Value: $resultset
3070
3071 =back
3072
3073 Act as a barrier to SQL symbols.  The resultset provided will be made into a
3074 "virtual view" by including it as a subquery within the from clause.  From this
3075 point on, any joined tables are inaccessible to ->search on the resultset (as if
3076 it were simply where-filtered without joins).  For example:
3077
3078  my $rs = $schema->resultset('Bar')->search({'x.name' => 'abc'},{ join => 'x' });
3079
3080  # 'x' now pollutes the query namespace
3081
3082  # So the following works as expected
3083  my $ok_rs = $rs->search({'x.other' => 1});
3084
3085  # But this doesn't: instead of finding a 'Bar' related to two x rows (abc and
3086  # def) we look for one row with contradictory terms and join in another table
3087  # (aliased 'x_2') which we never use
3088  my $broken_rs = $rs->search({'x.name' => 'def'});
3089
3090  my $rs2 = $rs->as_subselect_rs;
3091
3092  # doesn't work - 'x' is no longer accessible in $rs2, having been sealed away
3093  my $not_joined_rs = $rs2->search({'x.other' => 1});
3094
3095  # works as expected: finds a 'table' row related to two x rows (abc and def)
3096  my $correctly_joined_rs = $rs2->search({'x.name' => 'def'});
3097
3098 Another example of when one might use this would be to select a subset of
3099 columns in a group by clause:
3100
3101  my $rs = $schema->resultset('Bar')->search(undef, {
3102    group_by => [qw{ id foo_id baz_id }],
3103  })->as_subselect_rs->search(undef, {
3104    columns => [qw{ id foo_id }]
3105  });
3106
3107 In the above example normally columns would have to be equal to the group by,
3108 but because we isolated the group by into a subselect the above works.
3109
3110 =cut
3111
3112 sub as_subselect_rs {
3113   my $self = shift;
3114
3115   my $attrs = $self->_resolved_attrs;
3116
3117   my $fresh_rs = (ref $self)->new (
3118     $self->result_source
3119   );
3120
3121   # these pieces will be locked in the subquery
3122   delete $fresh_rs->{cond};
3123   delete @{$fresh_rs->{attrs}}{qw/where bind/};
3124
3125   return $fresh_rs->search( {}, {
3126     from => [{
3127       $attrs->{alias} => $self->as_query,
3128       -alias  => $attrs->{alias},
3129       -rsrc   => $self->result_source,
3130     }],
3131     alias => $attrs->{alias},
3132   });
3133 }
3134
3135 # This code is called by search_related, and makes sure there
3136 # is clear separation between the joins before, during, and
3137 # after the relationship. This information is needed later
3138 # in order to properly resolve prefetch aliases (any alias
3139 # with a relation_chain_depth less than the depth of the
3140 # current prefetch is not considered)
3141 #
3142 # The increments happen twice per join. An even number means a
3143 # relationship specified via a search_related, whereas an odd
3144 # number indicates a join/prefetch added via attributes
3145 #
3146 # Also this code will wrap the current resultset (the one we
3147 # chain to) in a subselect IFF it contains limiting attributes
3148 sub _chain_relationship {
3149   my ($self, $rel) = @_;
3150   my $source = $self->result_source;
3151   my $attrs = { %{$self->{attrs}||{}} };
3152
3153   # we need to take the prefetch the attrs into account before we
3154   # ->_resolve_join as otherwise they get lost - captainL
3155   my $join = $self->_merge_joinpref_attr( $attrs->{join}, $attrs->{prefetch} );
3156
3157   delete @{$attrs}{qw/join prefetch collapse group_by distinct select as columns +select +as +columns/};
3158
3159   my $seen = { %{ (delete $attrs->{seen_join}) || {} } };
3160
3161   my $from;
3162   my @force_subq_attrs = qw/offset rows group_by having/;
3163
3164   if (
3165     ($attrs->{from} && ref $attrs->{from} ne 'ARRAY')
3166       ||
3167     $self->_has_resolved_attr (@force_subq_attrs)
3168   ) {
3169     # Nuke the prefetch (if any) before the new $rs attrs
3170     # are resolved (prefetch is useless - we are wrapping
3171     # a subquery anyway).
3172     my $rs_copy = $self->search;
3173     $rs_copy->{attrs}{join} = $self->_merge_joinpref_attr (
3174       $rs_copy->{attrs}{join},
3175       delete $rs_copy->{attrs}{prefetch},
3176     );
3177
3178     $from = [{
3179       -rsrc   => $source,
3180       -alias  => $attrs->{alias},
3181       $attrs->{alias} => $rs_copy->as_query,
3182     }];
3183     delete @{$attrs}{@force_subq_attrs, qw/where bind/};
3184     $seen->{-relation_chain_depth} = 0;
3185   }
3186   elsif ($attrs->{from}) {  #shallow copy suffices
3187     $from = [ @{$attrs->{from}} ];
3188   }
3189   else {
3190     $from = [{
3191       -rsrc  => $source,
3192       -alias => $attrs->{alias},
3193       $attrs->{alias} => $source->from,
3194     }];
3195   }
3196
3197   my $jpath = ($seen->{-relation_chain_depth})
3198     ? $from->[-1][0]{-join_path}
3199     : [];
3200
3201   my @requested_joins = $source->_resolve_join(
3202     $join,
3203     $attrs->{alias},
3204     $seen,
3205     $jpath,
3206   );
3207
3208   push @$from, @requested_joins;
3209
3210   $seen->{-relation_chain_depth}++;
3211
3212   # if $self already had a join/prefetch specified on it, the requested
3213   # $rel might very well be already included. What we do in this case
3214   # is effectively a no-op (except that we bump up the chain_depth on
3215   # the join in question so we could tell it *is* the search_related)
3216   my $already_joined;
3217
3218   # we consider the last one thus reverse
3219   for my $j (reverse @requested_joins) {
3220     my ($last_j) = keys %{$j->[0]{-join_path}[-1]};
3221     if ($rel eq $last_j) {
3222       $j->[0]{-relation_chain_depth}++;
3223       $already_joined++;
3224       last;
3225     }
3226   }
3227
3228   unless ($already_joined) {
3229     push @$from, $source->_resolve_join(
3230       $rel,
3231       $attrs->{alias},
3232       $seen,
3233       $jpath,
3234     );
3235   }
3236
3237   $seen->{-relation_chain_depth}++;
3238
3239   return {%$attrs, from => $from, seen_join => $seen};
3240 }
3241
3242 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
3243 sub _resolved_attrs_copy {
3244   my $self = shift;
3245   return { %{$self->_resolved_attrs (@_)} };
3246 }
3247
3248 sub _resolved_attrs {
3249   my $self = shift;
3250   return $self->{_attrs} if $self->{_attrs};
3251
3252   my $attrs  = { %{ $self->{attrs} || {} } };
3253   my $source = $self->result_source;
3254   my $alias  = $attrs->{alias};
3255
3256   # default selection list
3257   $attrs->{columns} = [ $source->columns ]
3258     unless List::Util::first { exists $attrs->{$_} } qw/columns cols select as/;
3259
3260   # merge selectors together
3261   for (qw/columns select as/) {
3262     $attrs->{$_} = $self->_merge_attr($attrs->{$_}, delete $attrs->{"+$_"})
3263       if $attrs->{$_} or $attrs->{"+$_"};
3264   }
3265
3266   # disassemble columns
3267   my (@sel, @as);
3268   if (my $cols = delete $attrs->{columns}) {
3269     for my $c (ref $cols eq 'ARRAY' ? @$cols : $cols) {
3270       if (ref $c eq 'HASH') {
3271         for my $as (keys %$c) {
3272           push @sel, $c->{$as};
3273           push @as, $as;
3274         }
3275       }
3276       else {
3277         push @sel, $c;
3278         push @as, $c;
3279       }
3280     }
3281   }
3282
3283   # when trying to weed off duplicates later do not go past this point -
3284   # everything added from here on is unbalanced "anyone's guess" stuff
3285   my $dedup_stop_idx = $#as;
3286
3287   push @as, @{ ref $attrs->{as} eq 'ARRAY' ? $attrs->{as} : [ $attrs->{as} ] }
3288     if $attrs->{as};
3289   push @sel, @{ ref $attrs->{select} eq 'ARRAY' ? $attrs->{select} : [ $attrs->{select} ] }
3290     if $attrs->{select};
3291
3292   # assume all unqualified selectors to apply to the current alias (legacy stuff)
3293   $_ = (ref $_ or $_ =~ /\./) ? $_ : "$alias.$_" for @sel;
3294
3295   # disqualify all $alias.col as-bits (inflate-map mandated)
3296   $_ = ($_ =~ /^\Q$alias.\E(.+)$/) ? $1 : $_ for @as;
3297
3298   # de-duplicate the result (remove *identical* select/as pairs)
3299   # and also die on duplicate {as} pointing to different {select}s
3300   # not using a c-style for as the condition is prone to shrinkage
3301   my $seen;
3302   my $i = 0;
3303   while ($i <= $dedup_stop_idx) {
3304     if ($seen->{"$sel[$i] \x00\x00 $as[$i]"}++) {
3305       splice @sel, $i, 1;
3306       splice @as, $i, 1;
3307       $dedup_stop_idx--;
3308     }
3309     elsif ($seen->{$as[$i]}++) {
3310       $self->throw_exception(
3311         "inflate_result() alias '$as[$i]' specified twice with different SQL-side {select}-ors"
3312       );
3313     }
3314     else {
3315       $i++;
3316     }
3317   }
3318
3319   $attrs->{select} = \@sel;
3320   $attrs->{as} = \@as;
3321
3322   $attrs->{from} ||= [{
3323     -rsrc   => $source,
3324     -alias  => $self->{attrs}{alias},
3325     $self->{attrs}{alias} => $source->from,
3326   }];
3327
3328   if ( $attrs->{join} || $attrs->{prefetch} ) {
3329
3330     $self->throw_exception ('join/prefetch can not be used with a custom {from}')
3331       if ref $attrs->{from} ne 'ARRAY';
3332
3333     my $join = (delete $attrs->{join}) || {};
3334
3335     if ( defined $attrs->{prefetch} ) {
3336       $join = $self->_merge_joinpref_attr( $join, $attrs->{prefetch} );
3337     }
3338
3339     $attrs->{from} =    # have to copy here to avoid corrupting the original
3340       [
3341         @{ $attrs->{from} },
3342         $source->_resolve_join(
3343           $join,
3344           $alias,
3345           { %{ $attrs->{seen_join} || {} } },
3346           ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
3347             ? $attrs->{from}[-1][0]{-join_path}
3348             : []
3349           ,
3350         )
3351       ];
3352   }
3353
3354   if ( defined $attrs->{order_by} ) {
3355     $attrs->{order_by} = (
3356       ref( $attrs->{order_by} ) eq 'ARRAY'
3357       ? [ @{ $attrs->{order_by} } ]
3358       : [ $attrs->{order_by} || () ]
3359     );
3360   }
3361
3362   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
3363     $attrs->{group_by} = [ $attrs->{group_by} ];
3364   }
3365
3366   # generate the distinct induced group_by early, as prefetch will be carried via a
3367   # subquery (since a group_by is present)
3368   if (delete $attrs->{distinct}) {
3369     if ($attrs->{group_by}) {
3370       carp_unique ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
3371     }
3372     else {
3373       # distinct affects only the main selection part, not what prefetch may
3374       # add below.
3375       $attrs->{group_by} = $source->storage->_group_over_selection (
3376         $attrs->{from},
3377         $attrs->{select},
3378         $attrs->{order_by},
3379       );
3380     }
3381   }
3382
3383   # generate selections based on the prefetch helper
3384   my $prefetch;
3385   $prefetch = $self->_merge_joinpref_attr( {}, delete $attrs->{prefetch} )
3386     if defined $attrs->{prefetch};
3387
3388   if ($prefetch) {
3389
3390     $self->throw_exception("Unable to prefetch, resultset contains an unnamed selector $attrs->{_dark_selector}{string}")
3391       if $attrs->{_dark_selector};
3392
3393     $attrs->{collapse} = 1;
3394
3395     # this is a separate structure (we don't look in {from} directly)
3396     # as the resolver needs to shift things off the lists to work
3397     # properly (identical-prefetches on different branches)
3398     my $join_map = {};
3399     if (ref $attrs->{from} eq 'ARRAY') {
3400
3401       my $start_depth = $attrs->{seen_join}{-relation_chain_depth} || 0;
3402
3403       for my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
3404         next unless $j->[0]{-alias};
3405         next unless $j->[0]{-join_path};
3406         next if ($j->[0]{-relation_chain_depth} || 0) < $start_depth;
3407
3408         my @jpath = map { keys %$_ } @{$j->[0]{-join_path}};
3409
3410         my $p = $join_map;
3411         $p = $p->{$_} ||= {} for @jpath[ ($start_depth/2) .. $#jpath]; #only even depths are actual jpath boundaries
3412         push @{$p->{-join_aliases} }, $j->[0]{-alias};
3413       }
3414     }
3415
3416     my @prefetch = $source->_resolve_prefetch( $prefetch, $alias, $join_map );
3417
3418     # we need to somehow mark which columns came from prefetch
3419     if (@prefetch) {
3420       my $sel_end = $#{$attrs->{select}};
3421       $attrs->{_prefetch_selector_range} = [ $sel_end + 1, $sel_end + @prefetch ];
3422     }
3423
3424     push @{ $attrs->{select} }, (map { $_->[0] } @prefetch);
3425     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
3426   }
3427
3428   # run through the resulting joinstructure (starting from our current slot)
3429   # and unset collapse if proven unnesessary
3430   if ($attrs->{collapse} && ref $attrs->{from} eq 'ARRAY') {
3431
3432     if (@{$attrs->{from}} > 1) {
3433
3434       # find where our table-spec starts and consider only things after us
3435       my @fromlist = @{$attrs->{from}};
3436       while (@fromlist) {
3437         my $t = shift @fromlist;
3438         $t = $t->[0] if ref $t eq 'ARRAY';  #me vs join from-spec mismatch
3439         last if ($t->{-alias} && $t->{-alias} eq $alias);
3440       }
3441
3442       for (@fromlist) {
3443         $attrs->{collapse} = ! $_->[0]{-is_single}
3444           and last;
3445       }
3446     }
3447     else {
3448       # no joins - no collapse
3449       $attrs->{collapse} = 0;
3450     }
3451   }
3452
3453   $attrs->{_single_object_inflation} = ! List::Util::first { $_ =~ /\./ } @{$attrs->{as}};
3454
3455   # if both page and offset are specified, produce a combined offset
3456   # even though it doesn't make much sense, this is what pre 081xx has
3457   # been doing
3458   if (my $page = delete $attrs->{page}) {
3459     $attrs->{offset} =
3460       ($attrs->{rows} * ($page - 1))
3461             +
3462       ($attrs->{offset} || 0)
3463     ;
3464   }
3465
3466   return $self->{_attrs} = $attrs;
3467 }
3468
3469 sub _rollout_attr {
3470   my ($self, $attr) = @_;
3471
3472   if (ref $attr eq 'HASH') {
3473     return $self->_rollout_hash($attr);
3474   } elsif (ref $attr eq 'ARRAY') {
3475     return $self->_rollout_array($attr);
3476   } else {
3477     return [$attr];
3478   }
3479 }
3480
3481 sub _rollout_array {
3482   my ($self, $attr) = @_;
3483
3484   my @rolled_array;
3485   foreach my $element (@{$attr}) {
3486     if (ref $element eq 'HASH') {
3487       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3488     } elsif (ref $element eq 'ARRAY') {
3489       #  XXX - should probably recurse here
3490       push( @rolled_array, @{$self->_rollout_array($element)} );
3491     } else {
3492       push( @rolled_array, $element );
3493     }
3494   }
3495   return \@rolled_array;
3496 }
3497
3498 sub _rollout_hash {
3499   my ($self, $attr) = @_;
3500
3501   my @rolled_array;
3502   foreach my $key (keys %{$attr}) {
3503     push( @rolled_array, { $key => $attr->{$key} } );
3504   }
3505   return \@rolled_array;
3506 }
3507
3508 sub _calculate_score {
3509   my ($self, $a, $b) = @_;
3510
3511   if (defined $a xor defined $b) {
3512     return 0;
3513   }
3514   elsif (not defined $a) {
3515     return 1;
3516   }
3517
3518   if (ref $b eq 'HASH') {
3519     my ($b_key) = keys %{$b};
3520     if (ref $a eq 'HASH') {
3521       my ($a_key) = keys %{$a};
3522       if ($a_key eq $b_key) {
3523         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3524       } else {
3525         return 0;
3526       }
3527     } else {
3528       return ($a eq $b_key) ? 1 : 0;
3529     }
3530   } else {
3531     if (ref $a eq 'HASH') {
3532       my ($a_key) = keys %{$a};
3533       return ($b eq $a_key) ? 1 : 0;
3534     } else {
3535       return ($b eq $a) ? 1 : 0;
3536     }
3537   }
3538 }
3539
3540 sub _merge_joinpref_attr {
3541   my ($self, $orig, $import) = @_;
3542
3543   return $import unless defined($orig);
3544   return $orig unless defined($import);
3545
3546   $orig = $self->_rollout_attr($orig);
3547   $import = $self->_rollout_attr($import);
3548
3549   my $seen_keys;
3550   foreach my $import_element ( @{$import} ) {
3551     # find best candidate from $orig to merge $b_element into
3552     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3553     foreach my $orig_element ( @{$orig} ) {
3554       my $score = $self->_calculate_score( $orig_element, $import_element );
3555       if ($score > $best_candidate->{score}) {
3556         $best_candidate->{position} = $position;
3557         $best_candidate->{score} = $score;
3558       }
3559       $position++;
3560     }
3561     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3562     $import_key = '' if not defined $import_key;
3563
3564     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3565       push( @{$orig}, $import_element );
3566     } else {
3567       my $orig_best = $orig->[$best_candidate->{position}];
3568       # merge orig_best and b_element together and replace original with merged
3569       if (ref $orig_best ne 'HASH') {
3570         $orig->[$best_candidate->{position}] = $import_element;
3571       } elsif (ref $import_element eq 'HASH') {
3572         my ($key) = keys %{$orig_best};
3573         $orig->[$best_candidate->{position}] = { $key => $self->_merge_joinpref_attr($orig_best->{$key}, $import_element->{$key}) };
3574       }
3575     }
3576     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3577   }
3578
3579   return $orig;
3580 }
3581
3582 {
3583   my $hm;
3584
3585   sub _merge_attr {
3586     $hm ||= do {
3587       require Hash::Merge;
3588       my $hm = Hash::Merge->new;
3589
3590       $hm->specify_behavior({
3591         SCALAR => {
3592           SCALAR => sub {
3593             my ($defl, $defr) = map { defined $_ } (@_[0,1]);
3594
3595             if ($defl xor $defr) {
3596               return [ $defl ? $_[0] : $_[1] ];
3597             }
3598             elsif (! $defl) {
3599               return [];
3600             }
3601             elsif (__HM_DEDUP and $_[0] eq $_[1]) {
3602               return [ $_[0] ];
3603             }
3604             else {
3605               return [$_[0], $_[1]];
3606             }
3607           },
3608           ARRAY => sub {
3609             return $_[1] if !defined $_[0];
3610             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3611             return [$_[0], @{$_[1]}]
3612           },
3613           HASH  => sub {
3614             return [] if !defined $_[0] and !keys %{$_[1]};
3615             return [ $_[1] ] if !defined $_[0];
3616             return [ $_[0] ] if !keys %{$_[1]};
3617             return [$_[0], $_[1]]
3618           },
3619         },
3620         ARRAY => {
3621           SCALAR => sub {
3622             return $_[0] if !defined $_[1];
3623             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3624             return [@{$_[0]}, $_[1]]
3625           },
3626           ARRAY => sub {
3627             my @ret = @{$_[0]} or return $_[1];
3628             return [ @ret, @{$_[1]} ] unless __HM_DEDUP;
3629             my %idx = map { $_ => 1 } @ret;
3630             push @ret, grep { ! defined $idx{$_} } (@{$_[1]});
3631             \@ret;
3632           },
3633           HASH => sub {
3634             return [ $_[1] ] if ! @{$_[0]};
3635             return $_[0] if !keys %{$_[1]};
3636             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3637             return [ @{$_[0]}, $_[1] ];
3638           },
3639         },
3640         HASH => {
3641           SCALAR => sub {
3642             return [] if !keys %{$_[0]} and !defined $_[1];
3643             return [ $_[0] ] if !defined $_[1];
3644             return [ $_[1] ] if !keys %{$_[0]};
3645             return [$_[0], $_[1]]
3646           },
3647           ARRAY => sub {
3648             return [] if !keys %{$_[0]} and !@{$_[1]};
3649             return [ $_[0] ] if !@{$_[1]};
3650             return $_[1] if !keys %{$_[0]};
3651             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3652             return [ $_[0], @{$_[1]} ];
3653           },
3654           HASH => sub {
3655             return [] if !keys %{$_[0]} and !keys %{$_[1]};
3656             return [ $_[0] ] if !keys %{$_[1]};
3657             return [ $_[1] ] if !keys %{$_[0]};
3658             return [ $_[0] ] if $_[0] eq $_[1];
3659             return [ $_[0], $_[1] ];
3660           },
3661         }
3662       } => 'DBIC_RS_ATTR_MERGER');
3663       $hm;
3664     };
3665
3666     return $hm->merge ($_[1], $_[2]);
3667   }
3668 }
3669
3670 sub STORABLE_freeze {
3671   my ($self, $cloning) = @_;
3672   my $to_serialize = { %$self };
3673
3674   # A cursor in progress can't be serialized (and would make little sense anyway)
3675   delete $to_serialize->{cursor};
3676
3677   # nor is it sensical to store a not-yet-fired-count pager
3678   if ($to_serialize->{pager} and ref $to_serialize->{pager}{total_entries} eq 'CODE') {
3679     delete $to_serialize->{pager};
3680   }
3681
3682   Storable::nfreeze($to_serialize);
3683 }
3684
3685 # need this hook for symmetry
3686 sub STORABLE_thaw {
3687   my ($self, $cloning, $serialized) = @_;
3688
3689   %$self = %{ Storable::thaw($serialized) };
3690
3691   $self;
3692 }
3693
3694
3695 =head2 throw_exception
3696
3697 See L<DBIx::Class::Schema/throw_exception> for details.
3698
3699 =cut
3700
3701 sub throw_exception {
3702   my $self=shift;
3703
3704   if (ref $self and my $rsrc = $self->result_source) {
3705     $rsrc->throw_exception(@_)
3706   }
3707   else {
3708     DBIx::Class::Exception->throw(@_);
3709   }
3710 }
3711
3712 # XXX: FIXME: Attributes docs need clearing up
3713
3714 =head1 ATTRIBUTES
3715
3716 Attributes are used to refine a ResultSet in various ways when
3717 searching for data. They can be passed to any method which takes an
3718 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3719 L</count>.
3720
3721 These are in no particular order:
3722
3723 =head2 order_by
3724
3725 =over 4
3726
3727 =item Value: ( $order_by | \@order_by | \%order_by )
3728
3729 =back
3730
3731 Which column(s) to order the results by.
3732
3733 [The full list of suitable values is documented in
3734 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3735 common options.]
3736
3737 If a single column name, or an arrayref of names is supplied, the
3738 argument is passed through directly to SQL. The hashref syntax allows
3739 for connection-agnostic specification of ordering direction:
3740
3741  For descending order:
3742
3743   order_by => { -desc => [qw/col1 col2 col3/] }
3744
3745  For explicit ascending order:
3746
3747   order_by => { -asc => 'col' }
3748
3749 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3750 supported, although you are strongly encouraged to use the hashref
3751 syntax as outlined above.
3752
3753 =head2 columns
3754
3755 =over 4
3756
3757 =item Value: \@columns
3758
3759 =back
3760
3761 Shortcut to request a particular set of columns to be retrieved. Each
3762 column spec may be a string (a table column name), or a hash (in which
3763 case the key is the C<as> value, and the value is used as the C<select>
3764 expression). Adds C<me.> onto the start of any column without a C<.> in
3765 it and sets C<select> from that, then auto-populates C<as> from
3766 C<select> as normal. (You may also use the C<cols> attribute, as in
3767 earlier versions of DBIC.)
3768
3769 Essentially C<columns> does the same as L</select> and L</as>.
3770
3771     columns => [ 'foo', { bar => 'baz' } ]
3772
3773 is the same as
3774
3775     select => [qw/foo baz/],
3776     as => [qw/foo bar/]
3777
3778 =head2 +columns
3779
3780 =over 4
3781
3782 =item Value: \@columns
3783
3784 =back
3785
3786 Indicates additional columns to be selected from storage. Works the same
3787 as L</columns> but adds columns to the selection. (You may also use the
3788 C<include_columns> attribute, as in earlier versions of DBIC). For
3789 example:-
3790
3791   $schema->resultset('CD')->search(undef, {
3792     '+columns' => ['artist.name'],
3793     join => ['artist']
3794   });
3795
3796 would return all CDs and include a 'name' column to the information
3797 passed to object inflation. Note that the 'artist' is the name of the
3798 column (or relationship) accessor, and 'name' is the name of the column
3799 accessor in the related table.
3800
3801 B<NOTE:> You need to explicitly quote '+columns' when defining the attribute.
3802 Not doing so causes Perl to incorrectly interpret +columns as a bareword with a
3803 unary plus operator before it.
3804
3805 =head2 include_columns
3806
3807 =over 4
3808
3809 =item Value: \@columns
3810
3811 =back
3812
3813 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3814
3815 =head2 select
3816
3817 =over 4
3818
3819 =item Value: \@select_columns
3820
3821 =back
3822
3823 Indicates which columns should be selected from the storage. You can use
3824 column names, or in the case of RDBMS back ends, function or stored procedure
3825 names:
3826
3827   $rs = $schema->resultset('Employee')->search(undef, {
3828     select => [
3829       'name',
3830       { count => 'employeeid' },
3831       { max => { length => 'name' }, -as => 'longest_name' }
3832     ]
3833   });
3834
3835   # Equivalent SQL
3836   SELECT name, COUNT( employeeid ), MAX( LENGTH( name ) ) AS longest_name FROM employee
3837
3838 B<NOTE:> You will almost always need a corresponding L</as> attribute when you
3839 use L</select>, to instruct DBIx::Class how to store the result of the column.
3840 Also note that the L</as> attribute has nothing to do with the SQL-side 'AS'
3841 identifier aliasing. You can however alias a function, so you can use it in
3842 e.g. an C<ORDER BY> clause. This is done via the C<-as> B<select function
3843 attribute> supplied as shown in the example above.
3844
3845 B<NOTE:> You need to explicitly quote '+select'/'+as' when defining the attributes.
3846 Not doing so causes Perl to incorrectly interpret them as a bareword with a
3847 unary plus operator before it.
3848
3849 =head2 +select
3850
3851 =over 4
3852
3853 Indicates additional columns to be selected from storage.  Works the same as
3854 L</select> but adds columns to the default selection, instead of specifying
3855 an explicit list.
3856
3857 =back
3858
3859 =head2 +as
3860
3861 =over 4
3862
3863 Indicates additional column names for those added via L</+select>. See L</as>.
3864
3865 =back
3866
3867 =head2 as
3868
3869 =over 4
3870
3871 =item Value: \@inflation_names
3872
3873 =back
3874
3875 Indicates column names for object inflation. That is L</as> indicates the
3876 slot name in which the column value will be stored within the
3877 L<Row|DBIx::Class::Row> object. The value will then be accessible via this
3878 identifier by the C<get_column> method (or via the object accessor B<if one
3879 with the same name already exists>) as shown below. The L</as> attribute has
3880 B<nothing to do> with the SQL-side C<AS>. See L</select> for details.
3881
3882   $rs = $schema->resultset('Employee')->search(undef, {
3883     select => [
3884       'name',
3885       { count => 'employeeid' },
3886       { max => { length => 'name' }, -as => 'longest_name' }
3887     ],
3888     as => [qw/
3889       name
3890       employee_count
3891       max_name_length
3892     /],
3893   });
3894
3895 If the object against which the search is performed already has an accessor
3896 matching a column name specified in C<as>, the value can be retrieved using
3897 the accessor as normal:
3898
3899   my $name = $employee->name();
3900
3901 If on the other hand an accessor does not exist in the object, you need to
3902 use C<get_column> instead:
3903
3904   my $employee_count = $employee->get_column('employee_count');
3905
3906 You can create your own accessors if required - see
3907 L<DBIx::Class::Manual::Cookbook> for details.
3908
3909 =head2 join
3910
3911 =over 4
3912
3913 =item Value: ($rel_name | \@rel_names | \%rel_names)
3914
3915 =back
3916
3917 Contains a list of relationships that should be joined for this query.  For
3918 example:
3919
3920   # Get CDs by Nine Inch Nails
3921   my $rs = $schema->resultset('CD')->search(
3922     { 'artist.name' => 'Nine Inch Nails' },
3923     { join => 'artist' }
3924   );
3925
3926 Can also contain a hash reference to refer to the other relation's relations.
3927 For example:
3928
3929   package MyApp::Schema::Track;
3930   use base qw/DBIx::Class/;
3931   __PACKAGE__->table('track');
3932   __PACKAGE__->add_columns(qw/trackid cd position title/);
3933   __PACKAGE__->set_primary_key('trackid');
3934   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3935   1;
3936
3937   # In your application
3938   my $rs = $schema->resultset('Artist')->search(
3939     { 'track.title' => 'Teardrop' },
3940     {
3941       join     => { cd => 'track' },
3942       order_by => 'artist.name',
3943     }
3944   );
3945
3946 You need to use the relationship (not the table) name in  conditions,
3947 because they are aliased as such. The current table is aliased as "me", so
3948 you need to use me.column_name in order to avoid ambiguity. For example:
3949
3950   # Get CDs from 1984 with a 'Foo' track
3951   my $rs = $schema->resultset('CD')->search(
3952     {
3953       'me.year' => 1984,
3954       'tracks.name' => 'Foo'
3955     },
3956     { join => 'tracks' }
3957   );
3958
3959 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3960 similarly for a third time). For e.g.
3961
3962   my $rs = $schema->resultset('Artist')->search({
3963     'cds.title'   => 'Down to Earth',
3964     'cds_2.title' => 'Popular',
3965   }, {
3966     join => [ qw/cds cds/ ],
3967   });
3968
3969 will return a set of all artists that have both a cd with title 'Down
3970 to Earth' and a cd with title 'Popular'.
3971
3972 If you want to fetch related objects from other tables as well, see C<prefetch>
3973 below.
3974
3975 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
3976
3977 =head2 prefetch
3978
3979 =over 4
3980
3981 =item Value: ($rel_name | \@rel_names | \%rel_names)
3982
3983 =back
3984
3985 Contains one or more relationships that should be fetched along with
3986 the main query (when they are accessed afterwards the data will
3987 already be available, without extra queries to the database).  This is
3988 useful for when you know you will need the related objects, because it
3989 saves at least one query:
3990
3991   my $rs = $schema->resultset('Tag')->search(
3992     undef,
3993     {
3994       prefetch => {
3995         cd => 'artist'
3996       }
3997     }
3998   );
3999
4000 The initial search results in SQL like the following:
4001
4002   SELECT tag.*, cd.*, artist.* FROM tag
4003   JOIN cd ON tag.cd = cd.cdid
4004   JOIN artist ON cd.artist = artist.artistid
4005
4006 L<DBIx::Class> has no need to go back to the database when we access the
4007 C<cd> or C<artist> relationships, which saves us two SQL statements in this
4008 case.
4009
4010 Simple prefetches will be joined automatically, so there is no need
4011 for a C<join> attribute in the above search.
4012
4013 L</prefetch> can be used with the any of the relationship types and
4014 multiple prefetches can be specified together. Below is a more complex
4015 example that prefetches a CD's artist, its liner notes (if present),
4016 the cover image, the tracks on that cd, and the guests on those
4017 tracks.
4018
4019  # Assuming:
4020  My::Schema::CD->belongs_to( artist      => 'My::Schema::Artist'     );
4021  My::Schema::CD->might_have( liner_note  => 'My::Schema::LinerNotes' );
4022  My::Schema::CD->has_one(    cover_image => 'My::Schema::Artwork'    );
4023  My::Schema::CD->has_many(   tracks      => 'My::Schema::Track'      );
4024
4025  My::Schema::Artist->belongs_to( record_label => 'My::Schema::RecordLabel' );
4026
4027  My::Schema::Track->has_many( guests => 'My::Schema::Guest' );
4028
4029
4030  my $rs = $schema->resultset('CD')->search(
4031    undef,
4032    {
4033      prefetch => [
4034        { artist => 'record_label'},  # belongs_to => belongs_to
4035        'liner_note',                 # might_have
4036        'cover_image',                # has_one
4037        { tracks => 'guests' },       # has_many => has_many
4038      ]
4039    }
4040  );
4041
4042 This will produce SQL like the following:
4043
4044  SELECT cd.*, artist.*, record_label.*, liner_note.*, cover_image.*,
4045         tracks.*, guests.*
4046    FROM cd me
4047    JOIN artist artist
4048      ON artist.artistid = me.artistid
4049    JOIN record_label record_label
4050      ON record_label.labelid = artist.labelid
4051    LEFT JOIN track tracks
4052      ON tracks.cdid = me.cdid
4053    LEFT JOIN guest guests
4054      ON guests.trackid = track.trackid
4055    LEFT JOIN liner_notes liner_note
4056      ON liner_note.cdid = me.cdid
4057    JOIN cd_artwork cover_image
4058      ON cover_image.cdid = me.cdid
4059  ORDER BY tracks.cd
4060
4061 Now the C<artist>, C<record_label>, C<liner_note>, C<cover_image>,
4062 C<tracks>, and C<guests> of the CD will all be available through the
4063 relationship accessors without the need for additional queries to the
4064 database.
4065
4066 However, there is one caveat to be observed: it can be dangerous to
4067 prefetch more than one L<has_many|DBIx::Class::Relationship/has_many>
4068 relationship on a given level. e.g.:
4069
4070  my $rs = $schema->resultset('CD')->search(
4071    undef,
4072    {
4073      prefetch => [
4074        'tracks',                         # has_many
4075        { cd_to_producer => 'producer' }, # has_many => belongs_to (i.e. m2m)
4076      ]
4077    }
4078  );
4079
4080 In fact, C<DBIx::Class> will emit the following warning:
4081
4082  Prefetching multiple has_many rels tracks and cd_to_producer at top
4083  level will explode the number of row objects retrievable via ->next
4084  or ->all. Use at your own risk.
4085
4086 The collapser currently can't identify duplicate tuples for multiple
4087 L<has_many|DBIx::Class::Relationship/has_many> relationships and as a
4088 result the second L<has_many|DBIx::Class::Relationship/has_many>
4089 relation could contain redundant objects.
4090
4091 =head3 Using L</prefetch> with L</join>
4092
4093 L</prefetch> implies a L</join> with the equivalent argument, and is
4094 properly merged with any existing L</join> specification. So the
4095 following:
4096
4097   my $rs = $schema->resultset('CD')->search(
4098    {'record_label.name' => 'Music Product Ltd.'},
4099    {
4100      join     => {artist => 'record_label'},
4101      prefetch => 'artist',
4102    }
4103  );
4104
4105 ... will work, searching on the record label's name, but only
4106 prefetching the C<artist>.
4107
4108 =head3 Using L</prefetch> with L</select> / L</+select> / L</as> / L</+as>
4109
4110 L</prefetch> implies a L</+select>/L</+as> with the fields of the
4111 prefetched relations.  So given:
4112
4113   my $rs = $schema->resultset('CD')->search(
4114    undef,
4115    {
4116      select   => ['cd.title'],
4117      as       => ['cd_title'],
4118      prefetch => 'artist',
4119    }
4120  );
4121
4122 The L</select> becomes: C<'cd.title', 'artist.*'> and the L</as>
4123 becomes: C<'cd_title', 'artist.*'>.
4124
4125 =head3 CAVEATS
4126
4127 Prefetch does a lot of deep magic. As such, it may not behave exactly
4128 as you might expect.
4129
4130 =over 4
4131
4132 =item *
4133
4134 Prefetch uses the L</cache> to populate the prefetched relationships. This
4135 may or may not be what you want.
4136
4137 =item *
4138
4139 If you specify a condition on a prefetched relationship, ONLY those
4140 rows that match the prefetched condition will be fetched into that relationship.
4141 This means that adding prefetch to a search() B<may alter> what is returned by
4142 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
4143
4144   my $artist_rs = $schema->resultset('Artist')->search({
4145       'cds.year' => 2008,
4146   }, {
4147       join => 'cds',
4148   });
4149
4150   my $count = $artist_rs->first->cds->count;
4151
4152   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
4153
4154   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
4155
4156   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
4157
4158 that cmp_ok() may or may not pass depending on the datasets involved. This
4159 behavior may or may not survive the 0.09 transition.
4160
4161 =back
4162
4163 =head2 page
4164
4165 =over 4
4166
4167 =item Value: $page
4168
4169 =back
4170
4171 Makes the resultset paged and specifies the page to retrieve. Effectively
4172 identical to creating a non-pages resultset and then calling ->page($page)
4173 on it.
4174
4175 If L</rows> attribute is not specified it defaults to 10 rows per page.
4176
4177 When you have a paged resultset, L</count> will only return the number
4178 of rows in the page. To get the total, use the L</pager> and call
4179 C<total_entries> on it.
4180
4181 =head2 rows
4182
4183 =over 4
4184
4185 =item Value: $rows
4186
4187 =back
4188
4189 Specifies the maximum number of rows for direct retrieval or the number of
4190 rows per page if the page attribute or method is used.
4191
4192 =head2 offset
4193
4194 =over 4
4195
4196 =item Value: $offset
4197
4198 =back
4199
4200 Specifies the (zero-based) row number for the  first row to be returned, or the
4201 of the first row of the first page if paging is used.
4202
4203 =head2 software_limit
4204
4205 =over 4
4206
4207 =item Value: (0 | 1)
4208
4209 =back
4210
4211 When combined with L</rows> and/or L</offset> the generated SQL will not
4212 include any limit dialect stanzas. Instead the entire result will be selected
4213 as if no limits were specified, and DBIC will perform the limit locally, by
4214 artificially advancing and finishing the resulting L</cursor>.
4215
4216 This is the recommended way of performing resultset limiting when no sane RDBMS
4217 implementation is available (e.g.
4218 L<Sybase ASE|DBIx::Class::Storage::DBI::Sybase::ASE> using the
4219 L<Generic Sub Query|DBIx::Class::SQLMaker::LimitDialects/GenericSubQ> hack)
4220
4221 =head2 group_by
4222
4223 =over 4
4224
4225 =item Value: \@columns
4226
4227 =back
4228
4229 A arrayref of columns to group by. Can include columns of joined tables.
4230
4231   group_by => [qw/ column1 column2 ... /]
4232
4233 =head2 having
4234
4235 =over 4
4236
4237 =item Value: $condition
4238
4239 =back
4240
4241 HAVING is a select statement attribute that is applied between GROUP BY and
4242 ORDER BY. It is applied to the after the grouping calculations have been
4243 done.
4244
4245   having => { 'count_employee' => { '>=', 100 } }
4246
4247 or with an in-place function in which case literal SQL is required:
4248
4249   having => \[ 'count(employee) >= ?', [ count => 100 ] ]
4250
4251 =head2 distinct
4252
4253 =over 4
4254
4255 =item Value: (0 | 1)
4256
4257 =back
4258
4259 Set to 1 to group by all columns. If the resultset already has a group_by
4260 attribute, this setting is ignored and an appropriate warning is issued.
4261
4262 =head2 where
4263
4264 =over 4
4265
4266 Adds to the WHERE clause.
4267
4268   # only return rows WHERE deleted IS NULL for all searches
4269   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
4270
4271 Can be overridden by passing C<< { where => undef } >> as an attribute
4272 to a resultset.
4273
4274 For more complicated where clauses see L<SQL::Abstract/WHERE CLAUSES>.
4275
4276 =back
4277
4278 =head2 cache
4279
4280 Set to 1 to cache search results. This prevents extra SQL queries if you
4281 revisit rows in your ResultSet:
4282
4283   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
4284
4285   while( my $artist = $resultset->next ) {
4286     ... do stuff ...
4287   }
4288
4289   $rs->first; # without cache, this would issue a query
4290
4291 By default, searches are not cached.
4292
4293 For more examples of using these attributes, see
4294 L<DBIx::Class::Manual::Cookbook>.
4295
4296 =head2 for
4297
4298 =over 4
4299
4300 =item Value: ( 'update' | 'shared' )
4301
4302 =back
4303
4304 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
4305 ... FOR SHARED.
4306
4307 =cut
4308
4309 1;