Fix find() pessimization when invoked with bare values
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use base qw/DBIx::Class/;
6 use DBIx::Class::Carp;
7 use DBIx::Class::Exception;
8 use DBIx::Class::ResultSetColumn;
9 use Scalar::Util qw/blessed weaken/;
10 use Try::Tiny;
11 use Data::Compare (); # no imports!!! guard against insane architecture
12
13 # not importing first() as it will clash with our own method
14 use List::Util ();
15
16 BEGIN {
17   # De-duplication in _merge_attr() is disabled, but left in for reference
18   # (the merger is used for other things that ought not to be de-duped)
19   *__HM_DEDUP = sub () { 0 };
20 }
21
22 use namespace::clean;
23
24 use overload
25         '0+'     => "count",
26         'bool'   => "_bool",
27         fallback => 1;
28
29 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class result_source/);
30
31 =head1 NAME
32
33 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
34
35 =head1 SYNOPSIS
36
37   my $users_rs   = $schema->resultset('User');
38   while( $user = $users_rs->next) {
39     print $user->username;
40   }
41
42   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
43   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
44
45 =head1 DESCRIPTION
46
47 A ResultSet is an object which stores a set of conditions representing
48 a query. It is the backbone of DBIx::Class (i.e. the really
49 important/useful bit).
50
51 No SQL is executed on the database when a ResultSet is created, it
52 just stores all the conditions needed to create the query.
53
54 A basic ResultSet representing the data of an entire table is returned
55 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
56 L<Source|DBIx::Class::Manual::Glossary/Source> name.
57
58   my $users_rs = $schema->resultset('User');
59
60 A new ResultSet is returned from calling L</search> on an existing
61 ResultSet. The new one will contain all the conditions of the
62 original, plus any new conditions added in the C<search> call.
63
64 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
65 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
66 represents.
67
68 The query that the ResultSet represents is B<only> executed against
69 the database when these methods are called:
70 L</find>, L</next>, L</all>, L</first>, L</single>, L</count>.
71
72 If a resultset is used in a numeric context it returns the L</count>.
73 However, if it is used in a boolean context it is B<always> true.  So if
74 you want to check if a resultset has any results, you must use C<if $rs
75 != 0>.
76
77 =head1 EXAMPLES
78
79 =head2 Chaining resultsets
80
81 Let's say you've got a query that needs to be run to return some data
82 to the user. But, you have an authorization system in place that
83 prevents certain users from seeing certain information. So, you want
84 to construct the basic query in one method, but add constraints to it in
85 another.
86
87   sub get_data {
88     my $self = shift;
89     my $request = $self->get_request; # Get a request object somehow.
90     my $schema = $self->result_source->schema;
91
92     my $cd_rs = $schema->resultset('CD')->search({
93       title => $request->param('title'),
94       year => $request->param('year'),
95     });
96
97     $cd_rs = $self->apply_security_policy( $cd_rs );
98
99     return $cd_rs->all();
100   }
101
102   sub apply_security_policy {
103     my $self = shift;
104     my ($rs) = @_;
105
106     return $rs->search({
107       subversive => 0,
108     });
109   }
110
111 =head3 Resolving conditions and attributes
112
113 When a resultset is chained from another resultset, conditions and
114 attributes with the same keys need resolving.
115
116 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
117 into the existing ones from the original resultset.
118
119 The L</where> and L</having> attributes, and any search conditions, are
120 merged with an SQL C<AND> to the existing condition from the original
121 resultset.
122
123 All other attributes are overridden by any new ones supplied in the
124 search attributes.
125
126 =head2 Multiple queries
127
128 Since a resultset just defines a query, you can do all sorts of
129 things with it with the same object.
130
131   # Don't hit the DB yet.
132   my $cd_rs = $schema->resultset('CD')->search({
133     title => 'something',
134     year => 2009,
135   });
136
137   # Each of these hits the DB individually.
138   my $count = $cd_rs->count;
139   my $most_recent = $cd_rs->get_column('date_released')->max();
140   my @records = $cd_rs->all;
141
142 And it's not just limited to SELECT statements.
143
144   $cd_rs->delete();
145
146 This is even cooler:
147
148   $cd_rs->create({ artist => 'Fred' });
149
150 Which is the same as:
151
152   $schema->resultset('CD')->create({
153     title => 'something',
154     year => 2009,
155     artist => 'Fred'
156   });
157
158 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
159
160 =head1 METHODS
161
162 =head2 new
163
164 =over 4
165
166 =item Arguments: $source, \%$attrs
167
168 =item Return Value: $rs
169
170 =back
171
172 The resultset constructor. Takes a source object (usually a
173 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
174 L</ATTRIBUTES> below).  Does not perform any queries -- these are
175 executed as needed by the other methods.
176
177 Generally you won't need to construct a resultset manually.  You'll
178 automatically get one from e.g. a L</search> called in scalar context:
179
180   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
181
182 IMPORTANT: If called on an object, proxies to new_result instead so
183
184   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
185
186 will return a CD object, not a ResultSet.
187
188 =cut
189
190 sub new {
191   my $class = shift;
192   return $class->new_result(@_) if ref $class;
193
194   my ($source, $attrs) = @_;
195   $source = $source->resolve
196     if $source->isa('DBIx::Class::ResultSourceHandle');
197   $attrs = { %{$attrs||{}} };
198
199   if ($attrs->{page}) {
200     $attrs->{rows} ||= 10;
201   }
202
203   $attrs->{alias} ||= 'me';
204
205   my $self = bless {
206     result_source => $source,
207     cond => $attrs->{where},
208     pager => undef,
209     attrs => $attrs,
210   }, $class;
211
212   # if there is a dark selector, this means we are already in a
213   # chain and the cleanup/sanification was taken care of by
214   # _search_rs already
215   $self->_normalize_selection($attrs)
216     unless $attrs->{_dark_selector};
217
218   $self->result_class(
219     $attrs->{result_class} || $source->result_class
220   );
221
222   $self;
223 }
224
225 =head2 search
226
227 =over 4
228
229 =item Arguments: $cond, \%attrs?
230
231 =item Return Value: $resultset (scalar context) ||  @row_objs (list context)
232
233 =back
234
235   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
236   my $new_rs = $cd_rs->search({ year => 2005 });
237
238   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
239                  # year = 2005 OR year = 2004
240
241 In list context, C<< ->all() >> is called implicitly on the resultset, thus
242 returning a list of row objects instead. To avoid that, use L</search_rs>.
243
244 If you need to pass in additional attributes but no additional condition,
245 call it as C<search(undef, \%attrs)>.
246
247   # "SELECT name, artistid FROM $artist_table"
248   my @all_artists = $schema->resultset('Artist')->search(undef, {
249     columns => [qw/name artistid/],
250   });
251
252 For a list of attributes that can be passed to C<search>, see
253 L</ATTRIBUTES>. For more examples of using this function, see
254 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
255 documentation for the first argument, see L<SQL::Abstract>
256 and its extension L<DBIx::Class::SQLMaker>.
257
258 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
259
260 =head3 CAVEAT
261
262 Note that L</search> does not process/deflate any of the values passed in the
263 L<SQL::Abstract>-compatible search condition structure. This is unlike other
264 condition-bound methods L</new>, L</create> and L</find>. The user must ensure
265 manually that any value passed to this method will stringify to something the
266 RDBMS knows how to deal with. A notable example is the handling of L<DateTime>
267 objects, for more info see:
268 L<DBIx::Class::Manual::Cookbook/Formatting_DateTime_objects_in_queries>.
269
270 =cut
271
272 sub search {
273   my $self = shift;
274   my $rs = $self->search_rs( @_ );
275
276   if (wantarray) {
277     return $rs->all;
278   }
279   elsif (defined wantarray) {
280     return $rs;
281   }
282   else {
283     # we can be called by a relationship helper, which in
284     # turn may be called in void context due to some braindead
285     # overload or whatever else the user decided to be clever
286     # at this particular day. Thus limit the exception to
287     # external code calls only
288     $self->throw_exception ('->search is *not* a mutator, calling it in void context makes no sense')
289       if (caller)[0] !~ /^\QDBIx::Class::/;
290
291     return ();
292   }
293 }
294
295 =head2 search_rs
296
297 =over 4
298
299 =item Arguments: $cond, \%attrs?
300
301 =item Return Value: $resultset
302
303 =back
304
305 This method does the same exact thing as search() except it will
306 always return a resultset, even in list context.
307
308 =cut
309
310 sub search_rs {
311   my $self = shift;
312
313   # Special-case handling for (undef, undef).
314   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
315     @_ = ();
316   }
317
318   my $call_attrs = {};
319   if (@_ > 1) {
320     if (ref $_[-1] eq 'HASH') {
321       # copy for _normalize_selection
322       $call_attrs = { %{ pop @_ } };
323     }
324     elsif (! defined $_[-1] ) {
325       pop @_;   # search({}, undef)
326     }
327   }
328
329   # see if we can keep the cache (no $rs changes)
330   my $cache;
331   my %safe = (alias => 1, cache => 1);
332   if ( ! List::Util::first { !$safe{$_} } keys %$call_attrs and (
333     ! defined $_[0]
334       or
335     ref $_[0] eq 'HASH' && ! keys %{$_[0]}
336       or
337     ref $_[0] eq 'ARRAY' && ! @{$_[0]}
338   )) {
339     $cache = $self->get_cache;
340   }
341
342   my $rsrc = $self->result_source;
343
344   my $old_attrs = { %{$self->{attrs}} };
345   my $old_having = delete $old_attrs->{having};
346   my $old_where = delete $old_attrs->{where};
347
348   my $new_attrs = { %$old_attrs };
349
350   # take care of call attrs (only if anything is changing)
351   if (keys %$call_attrs) {
352
353     my @selector_attrs = qw/select as columns cols +select +as +columns include_columns/;
354
355     # reset the current selector list if new selectors are supplied
356     if (List::Util::first { exists $call_attrs->{$_} } qw/columns cols select as/) {
357       delete @{$old_attrs}{(@selector_attrs, '_dark_selector')};
358     }
359
360     # Normalize the new selector list (operates on the passed-in attr structure)
361     # Need to do it on every chain instead of only once on _resolved_attrs, in
362     # order to allow detection of empty vs partial 'as'
363     $call_attrs->{_dark_selector} = $old_attrs->{_dark_selector}
364       if $old_attrs->{_dark_selector};
365     $self->_normalize_selection ($call_attrs);
366
367     # start with blind overwriting merge, exclude selector attrs
368     $new_attrs = { %{$old_attrs}, %{$call_attrs} };
369     delete @{$new_attrs}{@selector_attrs};
370
371     for (@selector_attrs) {
372       $new_attrs->{$_} = $self->_merge_attr($old_attrs->{$_}, $call_attrs->{$_})
373         if ( exists $old_attrs->{$_} or exists $call_attrs->{$_} );
374     }
375
376     # older deprecated name, use only if {columns} is not there
377     if (my $c = delete $new_attrs->{cols}) {
378       if ($new_attrs->{columns}) {
379         carp "Resultset specifies both the 'columns' and the legacy 'cols' attributes - ignoring 'cols'";
380       }
381       else {
382         $new_attrs->{columns} = $c;
383       }
384     }
385
386
387     # join/prefetch use their own crazy merging heuristics
388     foreach my $key (qw/join prefetch/) {
389       $new_attrs->{$key} = $self->_merge_joinpref_attr($old_attrs->{$key}, $call_attrs->{$key})
390         if exists $call_attrs->{$key};
391     }
392
393     # stack binds together
394     $new_attrs->{bind} = [ @{ $old_attrs->{bind} || [] }, @{ $call_attrs->{bind} || [] } ];
395   }
396
397
398   # rip apart the rest of @_, parse a condition
399   my $call_cond = do {
400
401     if (ref $_[0] eq 'HASH') {
402       (keys %{$_[0]}) ? $_[0] : undef
403     }
404     elsif (@_ == 1) {
405       $_[0]
406     }
407     elsif (@_ % 2) {
408       $self->throw_exception('Odd number of arguments to search')
409     }
410     else {
411       +{ @_ }
412     }
413
414   } if @_;
415
416   if( @_ > 1 and ! $rsrc->result_class->isa('DBIx::Class::CDBICompat') ) {
417     carp_unique 'search( %condition ) is deprecated, use search( \%condition ) instead';
418   }
419
420   for ($old_where, $call_cond) {
421     if (defined $_) {
422       $new_attrs->{where} = $self->_stack_cond (
423         $_, $new_attrs->{where}
424       );
425     }
426   }
427
428   if (defined $old_having) {
429     $new_attrs->{having} = $self->_stack_cond (
430       $old_having, $new_attrs->{having}
431     )
432   }
433
434   my $rs = (ref $self)->new($rsrc, $new_attrs);
435
436   $rs->set_cache($cache) if ($cache);
437
438   return $rs;
439 }
440
441 my $dark_sel_dumper;
442 sub _normalize_selection {
443   my ($self, $attrs) = @_;
444
445   # legacy syntax
446   $attrs->{'+columns'} = $self->_merge_attr($attrs->{'+columns'}, delete $attrs->{include_columns})
447     if exists $attrs->{include_columns};
448
449   # columns are always placed first, however 
450
451   # Keep the X vs +X separation until _resolved_attrs time - this allows to
452   # delay the decision on whether to use a default select list ($rsrc->columns)
453   # allowing stuff like the remove_columns helper to work
454   #
455   # select/as +select/+as pairs need special handling - the amount of select/as
456   # elements in each pair does *not* have to be equal (think multicolumn
457   # selectors like distinct(foo, bar) ). If the selector is bare (no 'as'
458   # supplied at all) - try to infer the alias, either from the -as parameter
459   # of the selector spec, or use the parameter whole if it looks like a column
460   # name (ugly legacy heuristic). If all fails - leave the selector bare (which
461   # is ok as well), but make sure no more additions to the 'as' chain take place
462   for my $pref ('', '+') {
463
464     my ($sel, $as) = map {
465       my $key = "${pref}${_}";
466
467       my $val = [ ref $attrs->{$key} eq 'ARRAY'
468         ? @{$attrs->{$key}}
469         : $attrs->{$key} || ()
470       ];
471       delete $attrs->{$key};
472       $val;
473     } qw/select as/;
474
475     if (! @$as and ! @$sel ) {
476       next;
477     }
478     elsif (@$as and ! @$sel) {
479       $self->throw_exception(
480         "Unable to handle ${pref}as specification (@$as) without a corresponding ${pref}select"
481       );
482     }
483     elsif( ! @$as ) {
484       # no as part supplied at all - try to deduce (unless explicit end of named selection is declared)
485       # if any @$as has been supplied we assume the user knows what (s)he is doing
486       # and blindly keep stacking up pieces
487       unless ($attrs->{_dark_selector}) {
488         SELECTOR:
489         for (@$sel) {
490           if ( ref $_ eq 'HASH' and exists $_->{-as} ) {
491             push @$as, $_->{-as};
492           }
493           # assume any plain no-space, no-parenthesis string to be a column spec
494           # FIXME - this is retarded but is necessary to support shit like 'count(foo)'
495           elsif ( ! ref $_ and $_ =~ /^ [^\s\(\)]+ $/x) {
496             push @$as, $_;
497           }
498           # if all else fails - raise a flag that no more aliasing will be allowed
499           else {
500             $attrs->{_dark_selector} = {
501               plus_stage => $pref,
502               string => ($dark_sel_dumper ||= do {
503                   require Data::Dumper::Concise;
504                   Data::Dumper::Concise::DumperObject()->Indent(0);
505                 })->Values([$_])->Dump
506               ,
507             };
508             last SELECTOR;
509           }
510         }
511       }
512     }
513     elsif (@$as < @$sel) {
514       $self->throw_exception(
515         "Unable to handle an ${pref}as specification (@$as) with less elements than the corresponding ${pref}select"
516       );
517     }
518     elsif ($pref and $attrs->{_dark_selector}) {
519       $self->throw_exception(
520         "Unable to process named '+select', resultset contains an unnamed selector $attrs->{_dark_selector}{string}"
521       );
522     }
523
524
525     # merge result
526     $attrs->{"${pref}select"} = $self->_merge_attr($attrs->{"${pref}select"}, $sel);
527     $attrs->{"${pref}as"} = $self->_merge_attr($attrs->{"${pref}as"}, $as);
528   }
529 }
530
531 sub _stack_cond {
532   my ($self, $left, $right) = @_;
533
534   # collapse single element top-level conditions
535   # (single pass only, unlikely to need recursion)
536   for ($left, $right) {
537     if (ref $_ eq 'ARRAY') {
538       if (@$_ == 0) {
539         $_ = undef;
540       }
541       elsif (@$_ == 1) {
542         $_ = $_->[0];
543       }
544     }
545     elsif (ref $_ eq 'HASH') {
546       my ($first, $more) = keys %$_;
547
548       # empty hash
549       if (! defined $first) {
550         $_ = undef;
551       }
552       # one element hash
553       elsif (! defined $more) {
554         if ($first eq '-and' and ref $_->{'-and'} eq 'HASH') {
555           $_ = $_->{'-and'};
556         }
557         elsif ($first eq '-or' and ref $_->{'-or'} eq 'ARRAY') {
558           $_ = $_->{'-or'};
559         }
560       }
561     }
562   }
563
564   # merge hashes with weeding out of duplicates (simple cases only)
565   if (ref $left eq 'HASH' and ref $right eq 'HASH') {
566
567     # shallow copy to destroy
568     $right = { %$right };
569     for (grep { exists $right->{$_} } keys %$left) {
570       # the use of eq_deeply here is justified - the rhs of an
571       # expression can contain a lot of twisted weird stuff
572       delete $right->{$_} if Data::Compare::Compare( $left->{$_}, $right->{$_} );
573     }
574
575     $right = undef unless keys %$right;
576   }
577
578
579   if (defined $left xor defined $right) {
580     return defined $left ? $left : $right;
581   }
582   elsif (! defined $left) {
583     return undef;
584   }
585   else {
586     return { -and => [ $left, $right ] };
587   }
588 }
589
590 =head2 search_literal
591
592 =over 4
593
594 =item Arguments: $sql_fragment, @bind_values
595
596 =item Return Value: $resultset (scalar context) || @row_objs (list context)
597
598 =back
599
600   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
601   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
602
603 Pass a literal chunk of SQL to be added to the conditional part of the
604 resultset query.
605
606 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
607 only be used in that context. C<search_literal> is a convenience method.
608 It is equivalent to calling $schema->search(\[]), but if you want to ensure
609 columns are bound correctly, use C<search>.
610
611 Example of how to use C<search> instead of C<search_literal>
612
613   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
614   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
615
616
617 See L<DBIx::Class::Manual::Cookbook/Searching> and
618 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
619 require C<search_literal>.
620
621 =cut
622
623 sub search_literal {
624   my ($self, $sql, @bind) = @_;
625   my $attr;
626   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
627     $attr = pop @bind;
628   }
629   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
630 }
631
632 =head2 find
633
634 =over 4
635
636 =item Arguments: \%columns_values | @pk_values, \%attrs?
637
638 =item Return Value: $row_object | undef
639
640 =back
641
642 Finds and returns a single row based on supplied criteria. Takes either a
643 hashref with the same format as L</create> (including inference of foreign
644 keys from related objects), or a list of primary key values in the same
645 order as the L<primary columns|DBIx::Class::ResultSource/primary_columns>
646 declaration on the L</result_source>.
647
648 In either case an attempt is made to combine conditions already existing on
649 the resultset with the condition passed to this method.
650
651 To aid with preparing the correct query for the storage you may supply the
652 C<key> attribute, which is the name of a
653 L<unique constraint|DBIx::Class::ResultSource/add_unique_constraint> (the
654 unique constraint corresponding to the
655 L<primary columns|DBIx::Class::ResultSource/primary_columns> is always named
656 C<primary>). If the C<key> attribute has been supplied, and DBIC is unable
657 to construct a query that satisfies the named unique constraint fully (
658 non-NULL values for each column member of the constraint) an exception is
659 thrown.
660
661 If no C<key> is specified, the search is carried over all unique constraints
662 which are fully defined by the available condition.
663
664 If no such constraint is found, C<find> currently defaults to a simple
665 C<< search->(\%column_values) >> which may or may not do what you expect.
666 Note that this fallback behavior may be deprecated in further versions. If
667 you need to search with arbitrary conditions - use L</search>. If the query
668 resulting from this fallback produces more than one row, a warning to the
669 effect is issued, though only the first row is constructed and returned as
670 C<$row_object>.
671
672 In addition to C<key>, L</find> recognizes and applies standard
673 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
674
675 Note that if you have extra concerns about the correctness of the resulting
676 query you need to specify the C<key> attribute and supply the entire condition
677 as an argument to find (since it is not always possible to perform the
678 combination of the resultset condition with the supplied one, especially if
679 the resultset condition contains literal sql).
680
681 For example, to find a row by its primary key:
682
683   my $cd = $schema->resultset('CD')->find(5);
684
685 You can also find a row by a specific unique constraint:
686
687   my $cd = $schema->resultset('CD')->find(
688     {
689       artist => 'Massive Attack',
690       title  => 'Mezzanine',
691     },
692     { key => 'cd_artist_title' }
693   );
694
695 See also L</find_or_create> and L</update_or_create>.
696
697 =cut
698
699 sub find {
700   my $self = shift;
701   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
702
703   my $rsrc = $self->result_source;
704
705   my $constraint_name;
706   if (exists $attrs->{key}) {
707     $constraint_name = defined $attrs->{key}
708       ? $attrs->{key}
709       : $self->throw_exception("An undefined 'key' resultset attribute makes no sense")
710     ;
711   }
712
713   # Parse out the condition from input
714   my $call_cond;
715
716   if (ref $_[0] eq 'HASH') {
717     $call_cond = { %{$_[0]} };
718   }
719   else {
720     # if only values are supplied we need to default to 'primary'
721     $constraint_name = 'primary' unless defined $constraint_name;
722
723     my @c_cols = $rsrc->unique_constraint_columns($constraint_name);
724
725     $self->throw_exception(
726       "No constraint columns, maybe a malformed '$constraint_name' constraint?"
727     ) unless @c_cols;
728
729     $self->throw_exception (
730       'find() expects either a column/value hashref, or a list of values '
731     . "corresponding to the columns of the specified unique constraint '$constraint_name'"
732     ) unless @c_cols == @_;
733
734     $call_cond = {};
735     @{$call_cond}{@c_cols} = @_;
736   }
737
738   my %related;
739   for my $key (keys %$call_cond) {
740     if (
741       my $keyref = ref($call_cond->{$key})
742         and
743       my $relinfo = $rsrc->relationship_info($key)
744     ) {
745       my $val = delete $call_cond->{$key};
746
747       next if $keyref eq 'ARRAY'; # has_many for multi_create
748
749       my $rel_q = $rsrc->_resolve_condition(
750         $relinfo->{cond}, $val, $key, $key
751       );
752       die "Can't handle complex relationship conditions in find" if ref($rel_q) ne 'HASH';
753       @related{keys %$rel_q} = values %$rel_q;
754     }
755   }
756
757   # relationship conditions take precedence (?)
758   @{$call_cond}{keys %related} = values %related;
759
760   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
761   my $final_cond;
762   if (defined $constraint_name) {
763     $final_cond = $self->_qualify_cond_columns (
764
765       $self->_build_unique_cond (
766         $constraint_name,
767         $call_cond,
768       ),
769
770       $alias,
771     );
772   }
773   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
774     # This means that we got here after a merger of relationship conditions
775     # in ::Relationship::Base::search_related (the row method), and furthermore
776     # the relationship is of the 'single' type. This means that the condition
777     # provided by the relationship (already attached to $self) is sufficient,
778     # as there can be only one row in the database that would satisfy the
779     # relationship
780   }
781   else {
782     # no key was specified - fall down to heuristics mode:
783     # run through all unique queries registered on the resultset, and
784     # 'OR' all qualifying queries together
785     my (@unique_queries, %seen_column_combinations);
786     for my $c_name ($rsrc->unique_constraint_names) {
787       next if $seen_column_combinations{
788         join "\x00", sort $rsrc->unique_constraint_columns($c_name)
789       }++;
790
791       push @unique_queries, try {
792         $self->_build_unique_cond ($c_name, $call_cond, 'croak_on_nulls')
793       } || ();
794     }
795
796     $final_cond = @unique_queries
797       ? [ map { $self->_qualify_cond_columns($_, $alias) } @unique_queries ]
798       : $self->_non_unique_find_fallback ($call_cond, $attrs)
799     ;
800   }
801
802   # Run the query, passing the result_class since it should propagate for find
803   my $rs = $self->search ($final_cond, {result_class => $self->result_class, %$attrs});
804   if (keys %{$rs->_resolved_attrs->{collapse}}) {
805     my $row = $rs->next;
806     carp "Query returned more than one row" if $rs->next;
807     return $row;
808   }
809   else {
810     return $rs->single;
811   }
812 }
813
814 # This is a stop-gap method as agreed during the discussion on find() cleanup:
815 # http://lists.scsys.co.uk/pipermail/dbix-class/2010-October/009535.html
816 #
817 # It is invoked when find() is called in legacy-mode with insufficiently-unique
818 # condition. It is provided for overrides until a saner way forward is devised
819 #
820 # *NOTE* This is not a public method, and it's *GUARANTEED* to disappear down
821 # the road. Please adjust your tests accordingly to catch this situation early
822 # DBIx::Class::ResultSet->can('_non_unique_find_fallback') is reasonable
823 #
824 # The method will not be removed without an adequately complete replacement
825 # for strict-mode enforcement
826 sub _non_unique_find_fallback {
827   my ($self, $cond, $attrs) = @_;
828
829   return $self->_qualify_cond_columns(
830     $cond,
831     exists $attrs->{alias}
832       ? $attrs->{alias}
833       : $self->{attrs}{alias}
834   );
835 }
836
837
838 sub _qualify_cond_columns {
839   my ($self, $cond, $alias) = @_;
840
841   my %aliased = %$cond;
842   for (keys %aliased) {
843     $aliased{"$alias.$_"} = delete $aliased{$_}
844       if $_ !~ /\./;
845   }
846
847   return \%aliased;
848 }
849
850 sub _build_unique_cond {
851   my ($self, $constraint_name, $extra_cond, $croak_on_null) = @_;
852
853   my @c_cols = $self->result_source->unique_constraint_columns($constraint_name);
854
855   # combination may fail if $self->{cond} is non-trivial
856   my ($final_cond) = try {
857     $self->_merge_with_rscond ($extra_cond)
858   } catch {
859     +{ %$extra_cond }
860   };
861
862   # trim out everything not in $columns
863   $final_cond = { map {
864     exists $final_cond->{$_}
865       ? ( $_ => $final_cond->{$_} )
866       : ()
867   } @c_cols };
868
869   if (my @missing = grep
870     { ! ($croak_on_null ? defined $final_cond->{$_} : exists $final_cond->{$_}) }
871     (@c_cols)
872   ) {
873     $self->throw_exception( sprintf ( "Unable to satisfy requested constraint '%s', no values for column(s): %s",
874       $constraint_name,
875       join (', ', map { "'$_'" } @missing),
876     ) );
877   }
878
879   if (
880     !$croak_on_null
881       and
882     !$ENV{DBIC_NULLABLE_KEY_NOWARN}
883       and
884     my @undefs = grep { ! defined $final_cond->{$_} } (keys %$final_cond)
885   ) {
886     carp_unique ( sprintf (
887       "NULL/undef values supplied for requested unique constraint '%s' (NULL "
888     . 'values in column(s): %s). This is almost certainly not what you wanted, '
889     . 'though you can set DBIC_NULLABLE_KEY_NOWARN to disable this warning.',
890       $constraint_name,
891       join (', ', map { "'$_'" } @undefs),
892     ));
893   }
894
895   return $final_cond;
896 }
897
898 =head2 search_related
899
900 =over 4
901
902 =item Arguments: $rel, $cond, \%attrs?
903
904 =item Return Value: $new_resultset (scalar context) || @row_objs (list context)
905
906 =back
907
908   $new_rs = $cd_rs->search_related('artist', {
909     name => 'Emo-R-Us',
910   });
911
912 Searches the specified relationship, optionally specifying a condition and
913 attributes for matching records. See L</ATTRIBUTES> for more information.
914
915 In list context, C<< ->all() >> is called implicitly on the resultset, thus
916 returning a list of row objects instead. To avoid that, use L</search_related_rs>.
917
918 See also L</search_related_rs>.
919
920 =cut
921
922 sub search_related {
923   return shift->related_resultset(shift)->search(@_);
924 }
925
926 =head2 search_related_rs
927
928 This method works exactly the same as search_related, except that
929 it guarantees a resultset, even in list context.
930
931 =cut
932
933 sub search_related_rs {
934   return shift->related_resultset(shift)->search_rs(@_);
935 }
936
937 =head2 cursor
938
939 =over 4
940
941 =item Arguments: none
942
943 =item Return Value: $cursor
944
945 =back
946
947 Returns a storage-driven cursor to the given resultset. See
948 L<DBIx::Class::Cursor> for more information.
949
950 =cut
951
952 sub cursor {
953   my ($self) = @_;
954
955   my $attrs = $self->_resolved_attrs_copy;
956
957   return $self->{cursor}
958     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
959           $attrs->{where},$attrs);
960 }
961
962 =head2 single
963
964 =over 4
965
966 =item Arguments: $cond?
967
968 =item Return Value: $row_object | undef
969
970 =back
971
972   my $cd = $schema->resultset('CD')->single({ year => 2001 });
973
974 Inflates the first result without creating a cursor if the resultset has
975 any records in it; if not returns C<undef>. Used by L</find> as a lean version
976 of L</search>.
977
978 While this method can take an optional search condition (just like L</search>)
979 being a fast-code-path it does not recognize search attributes. If you need to
980 add extra joins or similar, call L</search> and then chain-call L</single> on the
981 L<DBIx::Class::ResultSet> returned.
982
983 =over
984
985 =item B<Note>
986
987 As of 0.08100, this method enforces the assumption that the preceding
988 query returns only one row. If more than one row is returned, you will receive
989 a warning:
990
991   Query returned more than one row
992
993 In this case, you should be using L</next> or L</find> instead, or if you really
994 know what you are doing, use the L</rows> attribute to explicitly limit the size
995 of the resultset.
996
997 This method will also throw an exception if it is called on a resultset prefetching
998 has_many, as such a prefetch implies fetching multiple rows from the database in
999 order to assemble the resulting object.
1000
1001 =back
1002
1003 =cut
1004
1005 sub single {
1006   my ($self, $where) = @_;
1007   if(@_ > 2) {
1008       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
1009   }
1010
1011   my $attrs = $self->_resolved_attrs_copy;
1012
1013   if (keys %{$attrs->{collapse}}) {
1014     $self->throw_exception(
1015       'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
1016     );
1017   }
1018
1019   if ($where) {
1020     if (defined $attrs->{where}) {
1021       $attrs->{where} = {
1022         '-and' =>
1023             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
1024                $where, delete $attrs->{where} ]
1025       };
1026     } else {
1027       $attrs->{where} = $where;
1028     }
1029   }
1030
1031   my @data = $self->result_source->storage->select_single(
1032     $attrs->{from}, $attrs->{select},
1033     $attrs->{where}, $attrs
1034   );
1035
1036   return (@data ? ($self->_construct_object(@data))[0] : undef);
1037 }
1038
1039
1040 # _collapse_query
1041 #
1042 # Recursively collapse the query, accumulating values for each column.
1043
1044 sub _collapse_query {
1045   my ($self, $query, $collapsed) = @_;
1046
1047   $collapsed ||= {};
1048
1049   if (ref $query eq 'ARRAY') {
1050     foreach my $subquery (@$query) {
1051       next unless ref $subquery;  # -or
1052       $collapsed = $self->_collapse_query($subquery, $collapsed);
1053     }
1054   }
1055   elsif (ref $query eq 'HASH') {
1056     if (keys %$query and (keys %$query)[0] eq '-and') {
1057       foreach my $subquery (@{$query->{-and}}) {
1058         $collapsed = $self->_collapse_query($subquery, $collapsed);
1059       }
1060     }
1061     else {
1062       foreach my $col (keys %$query) {
1063         my $value = $query->{$col};
1064         $collapsed->{$col}{$value}++;
1065       }
1066     }
1067   }
1068
1069   return $collapsed;
1070 }
1071
1072 =head2 get_column
1073
1074 =over 4
1075
1076 =item Arguments: $cond?
1077
1078 =item Return Value: $resultsetcolumn
1079
1080 =back
1081
1082   my $max_length = $rs->get_column('length')->max;
1083
1084 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
1085
1086 =cut
1087
1088 sub get_column {
1089   my ($self, $column) = @_;
1090   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
1091   return $new;
1092 }
1093
1094 =head2 search_like
1095
1096 =over 4
1097
1098 =item Arguments: $cond, \%attrs?
1099
1100 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1101
1102 =back
1103
1104   # WHERE title LIKE '%blue%'
1105   $cd_rs = $rs->search_like({ title => '%blue%'});
1106
1107 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
1108 that this is simply a convenience method retained for ex Class::DBI users.
1109 You most likely want to use L</search> with specific operators.
1110
1111 For more information, see L<DBIx::Class::Manual::Cookbook>.
1112
1113 This method is deprecated and will be removed in 0.09. Use L</search()>
1114 instead. An example conversion is:
1115
1116   ->search_like({ foo => 'bar' });
1117
1118   # Becomes
1119
1120   ->search({ foo => { like => 'bar' } });
1121
1122 =cut
1123
1124 sub search_like {
1125   my $class = shift;
1126   carp_unique (
1127     'search_like() is deprecated and will be removed in DBIC version 0.09.'
1128    .' Instead use ->search({ x => { -like => "y%" } })'
1129    .' (note the outer pair of {}s - they are important!)'
1130   );
1131   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1132   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
1133   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
1134   return $class->search($query, { %$attrs });
1135 }
1136
1137 =head2 slice
1138
1139 =over 4
1140
1141 =item Arguments: $first, $last
1142
1143 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1144
1145 =back
1146
1147 Returns a resultset or object list representing a subset of elements from the
1148 resultset slice is called on. Indexes are from 0, i.e., to get the first
1149 three records, call:
1150
1151   my ($one, $two, $three) = $rs->slice(0, 2);
1152
1153 =cut
1154
1155 sub slice {
1156   my ($self, $min, $max) = @_;
1157   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
1158   $attrs->{offset} = $self->{attrs}{offset} || 0;
1159   $attrs->{offset} += $min;
1160   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
1161   return $self->search(undef, $attrs);
1162   #my $slice = (ref $self)->new($self->result_source, $attrs);
1163   #return (wantarray ? $slice->all : $slice);
1164 }
1165
1166 =head2 next
1167
1168 =over 4
1169
1170 =item Arguments: none
1171
1172 =item Return Value: $result | undef
1173
1174 =back
1175
1176 Returns the next element in the resultset (C<undef> is there is none).
1177
1178 Can be used to efficiently iterate over records in the resultset:
1179
1180   my $rs = $schema->resultset('CD')->search;
1181   while (my $cd = $rs->next) {
1182     print $cd->title;
1183   }
1184
1185 Note that you need to store the resultset object, and call C<next> on it.
1186 Calling C<< resultset('Table')->next >> repeatedly will always return the
1187 first record from the resultset.
1188
1189 =cut
1190
1191 sub next {
1192   my ($self) = @_;
1193   if (my $cache = $self->get_cache) {
1194     $self->{all_cache_position} ||= 0;
1195     return $cache->[$self->{all_cache_position}++];
1196   }
1197   if ($self->{attrs}{cache}) {
1198     delete $self->{pager};
1199     $self->{all_cache_position} = 1;
1200     return ($self->all)[0];
1201   }
1202   if ($self->{stashed_objects}) {
1203     my $obj = shift(@{$self->{stashed_objects}});
1204     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
1205     return $obj;
1206   }
1207   my @row = (
1208     exists $self->{stashed_row}
1209       ? @{delete $self->{stashed_row}}
1210       : $self->cursor->next
1211   );
1212   return undef unless (@row);
1213   my ($row, @more) = $self->_construct_object(@row);
1214   $self->{stashed_objects} = \@more if @more;
1215   return $row;
1216 }
1217
1218 sub _construct_object {
1219   my ($self, @row) = @_;
1220
1221   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row)
1222     or return ();
1223   my @new = $self->result_class->inflate_result($self->result_source, @$info);
1224   @new = $self->{_attrs}{record_filter}->(@new)
1225     if exists $self->{_attrs}{record_filter};
1226   return @new;
1227 }
1228
1229 sub _collapse_result {
1230   my ($self, $as_proto, $row) = @_;
1231
1232   my @copy = @$row;
1233
1234   # 'foo'         => [ undef, 'foo' ]
1235   # 'foo.bar'     => [ 'foo', 'bar' ]
1236   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
1237
1238   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
1239
1240   my %collapse = %{$self->{_attrs}{collapse}||{}};
1241
1242   my @pri_index;
1243
1244   # if we're doing collapsing (has_many prefetch) we need to grab records
1245   # until the PK changes, so fill @pri_index. if not, we leave it empty so
1246   # we know we don't have to bother.
1247
1248   # the reason for not using the collapse stuff directly is because if you
1249   # had for e.g. two artists in a row with no cds, the collapse info for
1250   # both would be NULL (undef) so you'd lose the second artist
1251
1252   # store just the index so we can check the array positions from the row
1253   # without having to contruct the full hash
1254
1255   if (keys %collapse) {
1256     my %pri = map { ($_ => 1) } $self->result_source->_pri_cols;
1257     foreach my $i (0 .. $#construct_as) {
1258       next if defined($construct_as[$i][0]); # only self table
1259       if (delete $pri{$construct_as[$i][1]}) {
1260         push(@pri_index, $i);
1261       }
1262       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
1263     }
1264   }
1265
1266   # no need to do an if, it'll be empty if @pri_index is empty anyway
1267
1268   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
1269
1270   my @const_rows;
1271
1272   do { # no need to check anything at the front, we always want the first row
1273
1274     my %const;
1275
1276     foreach my $this_as (@construct_as) {
1277       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
1278     }
1279
1280     push(@const_rows, \%const);
1281
1282   } until ( # no pri_index => no collapse => drop straight out
1283       !@pri_index
1284     or
1285       do { # get another row, stash it, drop out if different PK
1286
1287         @copy = $self->cursor->next;
1288         $self->{stashed_row} = \@copy;
1289
1290         # last thing in do block, counts as true if anything doesn't match
1291
1292         # check xor defined first for NULL vs. NOT NULL then if one is
1293         # defined the other must be so check string equality
1294
1295         grep {
1296           (defined $pri_vals{$_} ^ defined $copy[$_])
1297           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1298         } @pri_index;
1299       }
1300   );
1301
1302   my $alias = $self->{attrs}{alias};
1303   my $info = [];
1304
1305   my %collapse_pos;
1306
1307   my @const_keys;
1308
1309   foreach my $const (@const_rows) {
1310     scalar @const_keys or do {
1311       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1312     };
1313     foreach my $key (@const_keys) {
1314       if (length $key) {
1315         my $target = $info;
1316         my @parts = split(/\./, $key);
1317         my $cur = '';
1318         my $data = $const->{$key};
1319         foreach my $p (@parts) {
1320           $target = $target->[1]->{$p} ||= [];
1321           $cur .= ".${p}";
1322           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) {
1323             # collapsing at this point and on final part
1324             my $pos = $collapse_pos{$cur};
1325             CK: foreach my $ck (@ckey) {
1326               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1327                 $collapse_pos{$cur} = $data;
1328                 delete @collapse_pos{ # clear all positioning for sub-entries
1329                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1330                 };
1331                 push(@$target, []);
1332                 last CK;
1333               }
1334             }
1335           }
1336           if (exists $collapse{$cur}) {
1337             $target = $target->[-1];
1338           }
1339         }
1340         $target->[0] = $data;
1341       } else {
1342         $info->[0] = $const->{$key};
1343       }
1344     }
1345   }
1346
1347   return $info;
1348 }
1349
1350 =head2 result_source
1351
1352 =over 4
1353
1354 =item Arguments: $result_source?
1355
1356 =item Return Value: $result_source
1357
1358 =back
1359
1360 An accessor for the primary ResultSource object from which this ResultSet
1361 is derived.
1362
1363 =head2 result_class
1364
1365 =over 4
1366
1367 =item Arguments: $result_class?
1368
1369 =item Return Value: $result_class
1370
1371 =back
1372
1373 An accessor for the class to use when creating row objects. Defaults to
1374 C<< result_source->result_class >> - which in most cases is the name of the
1375 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1376
1377 Note that changing the result_class will also remove any components
1378 that were originally loaded in the source class via
1379 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1380 in the original source class will not run.
1381
1382 =cut
1383
1384 sub result_class {
1385   my ($self, $result_class) = @_;
1386   if ($result_class) {
1387     unless (ref $result_class) { # don't fire this for an object
1388       $self->ensure_class_loaded($result_class);
1389     }
1390     $self->_result_class($result_class);
1391     # THIS LINE WOULD BE A BUG - this accessor specifically exists to
1392     # permit the user to set result class on one result set only; it only
1393     # chains if provided to search()
1394     #$self->{attrs}{result_class} = $result_class if ref $self;
1395   }
1396   $self->_result_class;
1397 }
1398
1399 =head2 count
1400
1401 =over 4
1402
1403 =item Arguments: $cond, \%attrs??
1404
1405 =item Return Value: $count
1406
1407 =back
1408
1409 Performs an SQL C<COUNT> with the same query as the resultset was built
1410 with to find the number of elements. Passing arguments is equivalent to
1411 C<< $rs->search ($cond, \%attrs)->count >>
1412
1413 =cut
1414
1415 sub count {
1416   my $self = shift;
1417   return $self->search(@_)->count if @_ and defined $_[0];
1418   return scalar @{ $self->get_cache } if $self->get_cache;
1419
1420   my $attrs = $self->_resolved_attrs_copy;
1421
1422   # this is a little optimization - it is faster to do the limit
1423   # adjustments in software, instead of a subquery
1424   my $rows = delete $attrs->{rows};
1425   my $offset = delete $attrs->{offset};
1426
1427   my $crs;
1428   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1429     $crs = $self->_count_subq_rs ($attrs);
1430   }
1431   else {
1432     $crs = $self->_count_rs ($attrs);
1433   }
1434   my $count = $crs->next;
1435
1436   $count -= $offset if $offset;
1437   $count = $rows if $rows and $rows < $count;
1438   $count = 0 if ($count < 0);
1439
1440   return $count;
1441 }
1442
1443 =head2 count_rs
1444
1445 =over 4
1446
1447 =item Arguments: $cond, \%attrs??
1448
1449 =item Return Value: $count_rs
1450
1451 =back
1452
1453 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1454 This can be very handy for subqueries:
1455
1456   ->search( { amount => $some_rs->count_rs->as_query } )
1457
1458 As with regular resultsets the SQL query will be executed only after
1459 the resultset is accessed via L</next> or L</all>. That would return
1460 the same single value obtainable via L</count>.
1461
1462 =cut
1463
1464 sub count_rs {
1465   my $self = shift;
1466   return $self->search(@_)->count_rs if @_;
1467
1468   # this may look like a lack of abstraction (count() does about the same)
1469   # but in fact an _rs *must* use a subquery for the limits, as the
1470   # software based limiting can not be ported if this $rs is to be used
1471   # in a subquery itself (i.e. ->as_query)
1472   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1473     return $self->_count_subq_rs;
1474   }
1475   else {
1476     return $self->_count_rs;
1477   }
1478 }
1479
1480 #
1481 # returns a ResultSetColumn object tied to the count query
1482 #
1483 sub _count_rs {
1484   my ($self, $attrs) = @_;
1485
1486   my $rsrc = $self->result_source;
1487   $attrs ||= $self->_resolved_attrs;
1488
1489   my $tmp_attrs = { %$attrs };
1490   # take off any limits, record_filter is cdbi, and no point of ordering nor locking a count
1491   delete @{$tmp_attrs}{qw/rows offset order_by record_filter for/};
1492
1493   # overwrite the selector (supplied by the storage)
1494   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $attrs);
1495   $tmp_attrs->{as} = 'count';
1496   delete @{$tmp_attrs}{qw/columns/};
1497
1498   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1499
1500   return $tmp_rs;
1501 }
1502
1503 #
1504 # same as above but uses a subquery
1505 #
1506 sub _count_subq_rs {
1507   my ($self, $attrs) = @_;
1508
1509   my $rsrc = $self->result_source;
1510   $attrs ||= $self->_resolved_attrs;
1511
1512   my $sub_attrs = { %$attrs };
1513   # extra selectors do not go in the subquery and there is no point of ordering it, nor locking it
1514   delete @{$sub_attrs}{qw/collapse columns as select _prefetch_selector_range order_by for/};
1515
1516   # if we multi-prefetch we group_by primary keys only as this is what we would
1517   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
1518   if ( keys %{$attrs->{collapse}}  ) {
1519     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } ($rsrc->_pri_cols) ]
1520   }
1521
1522   # Calculate subquery selector
1523   if (my $g = $sub_attrs->{group_by}) {
1524
1525     my $sql_maker = $rsrc->storage->sql_maker;
1526
1527     # necessary as the group_by may refer to aliased functions
1528     my $sel_index;
1529     for my $sel (@{$attrs->{select}}) {
1530       $sel_index->{$sel->{-as}} = $sel
1531         if (ref $sel eq 'HASH' and $sel->{-as});
1532     }
1533
1534     # anything from the original select mentioned on the group-by needs to make it to the inner selector
1535     # also look for named aggregates referred in the having clause
1536     # having often contains scalarrefs - thus parse it out entirely
1537     my @parts = @$g;
1538     if ($attrs->{having}) {
1539       local $sql_maker->{having_bind};
1540       local $sql_maker->{quote_char} = $sql_maker->{quote_char};
1541       local $sql_maker->{name_sep} = $sql_maker->{name_sep};
1542       unless (defined $sql_maker->{quote_char} and length $sql_maker->{quote_char}) {
1543         $sql_maker->{quote_char} = [ "\x00", "\xFF" ];
1544         # if we don't unset it we screw up retarded but unfortunately working
1545         # 'MAX(foo.bar)' => { '>', 3 }
1546         $sql_maker->{name_sep} = '';
1547       }
1548
1549       my ($lquote, $rquote, $sep) = map { quotemeta $_ } ($sql_maker->_quote_chars, $sql_maker->name_sep);
1550
1551       my $sql = $sql_maker->_parse_rs_attrs ({ having => $attrs->{having} });
1552
1553       # search for both a proper quoted qualified string, for a naive unquoted scalarref
1554       # and if all fails for an utterly naive quoted scalar-with-function
1555       while ($sql =~ /
1556         $rquote $sep $lquote (.+?) $rquote
1557           |
1558         [\s,] \w+ \. (\w+) [\s,]
1559           |
1560         [\s,] $lquote (.+?) $rquote [\s,]
1561       /gx) {
1562         push @parts, ($1 || $2 || $3);  # one of them matched if we got here
1563       }
1564     }
1565
1566     for (@parts) {
1567       my $colpiece = $sel_index->{$_} || $_;
1568
1569       # unqualify join-based group_by's. Arcane but possible query
1570       # also horrible horrible hack to alias a column (not a func.)
1571       # (probably need to introduce SQLA syntax)
1572       if ($colpiece =~ /\./ && $colpiece !~ /^$attrs->{alias}\./) {
1573         my $as = $colpiece;
1574         $as =~ s/\./__/;
1575         $colpiece = \ sprintf ('%s AS %s', map { $sql_maker->_quote ($_) } ($colpiece, $as) );
1576       }
1577       push @{$sub_attrs->{select}}, $colpiece;
1578     }
1579   }
1580   else {
1581     my @pcols = map { "$attrs->{alias}.$_" } ($rsrc->primary_columns);
1582     $sub_attrs->{select} = @pcols ? \@pcols : [ 1 ];
1583   }
1584
1585   return $rsrc->resultset_class
1586                ->new ($rsrc, $sub_attrs)
1587                 ->as_subselect_rs
1588                  ->search ({}, { columns => { count => $rsrc->storage->_count_select ($rsrc, $attrs) } })
1589                   ->get_column ('count');
1590 }
1591
1592 sub _bool {
1593   return 1;
1594 }
1595
1596 =head2 count_literal
1597
1598 =over 4
1599
1600 =item Arguments: $sql_fragment, @bind_values
1601
1602 =item Return Value: $count
1603
1604 =back
1605
1606 Counts the results in a literal query. Equivalent to calling L</search_literal>
1607 with the passed arguments, then L</count>.
1608
1609 =cut
1610
1611 sub count_literal { shift->search_literal(@_)->count; }
1612
1613 =head2 all
1614
1615 =over 4
1616
1617 =item Arguments: none
1618
1619 =item Return Value: @objects
1620
1621 =back
1622
1623 Returns all elements in the resultset.
1624
1625 =cut
1626
1627 sub all {
1628   my $self = shift;
1629   if(@_) {
1630       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1631   }
1632
1633   return @{ $self->get_cache } if $self->get_cache;
1634
1635   my @obj;
1636
1637   if (keys %{$self->_resolved_attrs->{collapse}}) {
1638     # Using $self->cursor->all is really just an optimisation.
1639     # If we're collapsing has_many prefetches it probably makes
1640     # very little difference, and this is cleaner than hacking
1641     # _construct_object to survive the approach
1642     $self->cursor->reset;
1643     my @row = $self->cursor->next;
1644     while (@row) {
1645       push(@obj, $self->_construct_object(@row));
1646       @row = (exists $self->{stashed_row}
1647                ? @{delete $self->{stashed_row}}
1648                : $self->cursor->next);
1649     }
1650   } else {
1651     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1652   }
1653
1654   $self->set_cache(\@obj) if $self->{attrs}{cache};
1655
1656   return @obj;
1657 }
1658
1659 =head2 reset
1660
1661 =over 4
1662
1663 =item Arguments: none
1664
1665 =item Return Value: $self
1666
1667 =back
1668
1669 Resets the resultset's cursor, so you can iterate through the elements again.
1670 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1671 another query.
1672
1673 =cut
1674
1675 sub reset {
1676   my ($self) = @_;
1677   delete $self->{_attrs} if exists $self->{_attrs};
1678   $self->{all_cache_position} = 0;
1679   $self->cursor->reset;
1680   return $self;
1681 }
1682
1683 =head2 first
1684
1685 =over 4
1686
1687 =item Arguments: none
1688
1689 =item Return Value: $object | undef
1690
1691 =back
1692
1693 Resets the resultset and returns an object for the first result (or C<undef>
1694 if the resultset is empty).
1695
1696 =cut
1697
1698 sub first {
1699   return $_[0]->reset->next;
1700 }
1701
1702
1703 # _rs_update_delete
1704 #
1705 # Determines whether and what type of subquery is required for the $rs operation.
1706 # If grouping is necessary either supplies its own, or verifies the current one
1707 # After all is done delegates to the proper storage method.
1708
1709 sub _rs_update_delete {
1710   my ($self, $op, $values) = @_;
1711
1712   my $rsrc = $self->result_source;
1713
1714   my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
1715   my $needs_subq = $needs_group_by_subq || $self->_has_resolved_attr(qw/rows offset/);
1716
1717   if ($needs_group_by_subq or $needs_subq) {
1718
1719     # make a new $rs selecting only the PKs (that's all we really need)
1720     my $attrs = $self->_resolved_attrs_copy;
1721
1722
1723     delete $attrs->{$_} for qw/collapse _collapse_order_by select _prefetch_selector_range as/;
1724     $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->_pri_cols) ];
1725
1726     if ($needs_group_by_subq) {
1727       # make sure no group_by was supplied, or if there is one - make sure it matches
1728       # the columns compiled above perfectly. Anything else can not be sanely executed
1729       # on most databases so croak right then and there
1730
1731       if (my $g = $attrs->{group_by}) {
1732         my @current_group_by = map
1733           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1734           @$g
1735         ;
1736
1737         if (
1738           join ("\x00", sort @current_group_by)
1739             ne
1740           join ("\x00", sort @{$attrs->{columns}} )
1741         ) {
1742           $self->throw_exception (
1743             "You have just attempted a $op operation on a resultset which does group_by"
1744             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1745             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1746             . ' kind of queries. Please retry the operation with a modified group_by or'
1747             . ' without using one at all.'
1748           );
1749         }
1750       }
1751       else {
1752         $attrs->{group_by} = $attrs->{columns};
1753       }
1754     }
1755
1756     my $subrs = (ref $self)->new($rsrc, $attrs);
1757     return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
1758   }
1759   else {
1760     # Most databases do not allow aliasing of tables in UPDATE/DELETE. Thus
1761     # a condition containing 'me' or other table prefixes will not work
1762     # at all. What this code tries to do (badly) is to generate a condition
1763     # with the qualifiers removed, by exploiting the quote mechanism of sqla
1764     #
1765     # this is atrocious and should be replaced by normal sqla introspection
1766     # one sunny day
1767     my ($sql, @bind) = do {
1768       my $sqla = $rsrc->storage->sql_maker;
1769       local $sqla->{_dequalify_idents} = 1;
1770       $sqla->_recurse_where($self->{cond});
1771     } if $self->{cond};
1772
1773     return $rsrc->storage->$op(
1774       $rsrc,
1775       $op eq 'update' ? $values : (),
1776       $self->{cond} ? \[$sql, @bind] : (),
1777     );
1778   }
1779 }
1780
1781 =head2 update
1782
1783 =over 4
1784
1785 =item Arguments: \%values
1786
1787 =item Return Value: $storage_rv
1788
1789 =back
1790
1791 Sets the specified columns in the resultset to the supplied values in a
1792 single query. Note that this will not run any accessor/set_column/update
1793 triggers, nor will it update any row object instances derived from this
1794 resultset (this includes the contents of the L<resultset cache|/set_cache>
1795 if any). See L</update_all> if you need to execute any on-update
1796 triggers or cascades defined either by you or a
1797 L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
1798
1799 The return value is a pass through of what the underlying
1800 storage backend returned, and may vary. See L<DBI/execute> for the most
1801 common case.
1802
1803 =head3 CAVEAT
1804
1805 Note that L</update> does not process/deflate any of the values passed in.
1806 This is unlike the corresponding L<DBIx::Class::Row/update>. The user must
1807 ensure manually that any value passed to this method will stringify to
1808 something the RDBMS knows how to deal with. A notable example is the
1809 handling of L<DateTime> objects, for more info see:
1810 L<DBIx::Class::Manual::Cookbook/Formatting_DateTime_objects_in_queries>.
1811
1812 =cut
1813
1814 sub update {
1815   my ($self, $values) = @_;
1816   $self->throw_exception('Values for update must be a hash')
1817     unless ref $values eq 'HASH';
1818
1819   return $self->_rs_update_delete ('update', $values);
1820 }
1821
1822 =head2 update_all
1823
1824 =over 4
1825
1826 =item Arguments: \%values
1827
1828 =item Return Value: 1
1829
1830 =back
1831
1832 Fetches all objects and updates them one at a time via
1833 L<DBIx::Class::Row/update>. Note that C<update_all> will run DBIC defined
1834 triggers, while L</update> will not.
1835
1836 =cut
1837
1838 sub update_all {
1839   my ($self, $values) = @_;
1840   $self->throw_exception('Values for update_all must be a hash')
1841     unless ref $values eq 'HASH';
1842
1843   my $guard = $self->result_source->schema->txn_scope_guard;
1844   $_->update({%$values}) for $self->all;  # shallow copy - update will mangle it
1845   $guard->commit;
1846   return 1;
1847 }
1848
1849 =head2 delete
1850
1851 =over 4
1852
1853 =item Arguments: none
1854
1855 =item Return Value: $storage_rv
1856
1857 =back
1858
1859 Deletes the rows matching this resultset in a single query. Note that this
1860 will not run any delete triggers, nor will it alter the
1861 L<in_storage|DBIx::Class::Row/in_storage> status of any row object instances
1862 derived from this resultset (this includes the contents of the
1863 L<resultset cache|/set_cache> if any). See L</delete_all> if you need to
1864 execute any on-delete triggers or cascades defined either by you or a
1865 L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
1866
1867 The return value is a pass through of what the underlying storage backend
1868 returned, and may vary. See L<DBI/execute> for the most common case.
1869
1870 =cut
1871
1872 sub delete {
1873   my $self = shift;
1874   $self->throw_exception('delete does not accept any arguments')
1875     if @_;
1876
1877   return $self->_rs_update_delete ('delete');
1878 }
1879
1880 =head2 delete_all
1881
1882 =over 4
1883
1884 =item Arguments: none
1885
1886 =item Return Value: 1
1887
1888 =back
1889
1890 Fetches all objects and deletes them one at a time via
1891 L<DBIx::Class::Row/delete>. Note that C<delete_all> will run DBIC defined
1892 triggers, while L</delete> will not.
1893
1894 =cut
1895
1896 sub delete_all {
1897   my $self = shift;
1898   $self->throw_exception('delete_all does not accept any arguments')
1899     if @_;
1900
1901   my $guard = $self->result_source->schema->txn_scope_guard;
1902   $_->delete for $self->all;
1903   $guard->commit;
1904   return 1;
1905 }
1906
1907 =head2 populate
1908
1909 =over 4
1910
1911 =item Arguments: \@data;
1912
1913 =back
1914
1915 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1916 For the arrayref of hashrefs style each hashref should be a structure suitable
1917 for submitting to a $resultset->create(...) method.
1918
1919 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1920 to insert the data, as this is a faster method.
1921
1922 Otherwise, each set of data is inserted into the database using
1923 L<DBIx::Class::ResultSet/create>, and the resulting objects are
1924 accumulated into an array. The array itself, or an array reference
1925 is returned depending on scalar or list context.
1926
1927 Example:  Assuming an Artist Class that has many CDs Classes relating:
1928
1929   my $Artist_rs = $schema->resultset("Artist");
1930
1931   ## Void Context Example
1932   $Artist_rs->populate([
1933      { artistid => 4, name => 'Manufactured Crap', cds => [
1934         { title => 'My First CD', year => 2006 },
1935         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1936       ],
1937      },
1938      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1939         { title => 'My parents sold me to a record company', year => 2005 },
1940         { title => 'Why Am I So Ugly?', year => 2006 },
1941         { title => 'I Got Surgery and am now Popular', year => 2007 }
1942       ],
1943      },
1944   ]);
1945
1946   ## Array Context Example
1947   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1948     { name => "Artist One"},
1949     { name => "Artist Two"},
1950     { name => "Artist Three", cds=> [
1951     { title => "First CD", year => 2007},
1952     { title => "Second CD", year => 2008},
1953   ]}
1954   ]);
1955
1956   print $ArtistOne->name; ## response is 'Artist One'
1957   print $ArtistThree->cds->count ## reponse is '2'
1958
1959 For the arrayref of arrayrefs style,  the first element should be a list of the
1960 fieldsnames to which the remaining elements are rows being inserted.  For
1961 example:
1962
1963   $Arstist_rs->populate([
1964     [qw/artistid name/],
1965     [100, 'A Formally Unknown Singer'],
1966     [101, 'A singer that jumped the shark two albums ago'],
1967     [102, 'An actually cool singer'],
1968   ]);
1969
1970 Please note an important effect on your data when choosing between void and
1971 wantarray context. Since void context goes straight to C<insert_bulk> in
1972 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1973 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
1974 create primary keys for you, you will find that your PKs are empty.  In this
1975 case you will have to use the wantarray context in order to create those
1976 values.
1977
1978 =cut
1979
1980 sub populate {
1981   my $self = shift;
1982
1983   # cruft placed in standalone method
1984   my $data = $self->_normalize_populate_args(@_);
1985
1986   return unless @$data;
1987
1988   if(defined wantarray) {
1989     my @created;
1990     foreach my $item (@$data) {
1991       push(@created, $self->create($item));
1992     }
1993     return wantarray ? @created : \@created;
1994   } 
1995   else {
1996     my $first = $data->[0];
1997
1998     # if a column is a registered relationship, and is a non-blessed hash/array, consider
1999     # it relationship data
2000     my (@rels, @columns);
2001     my $rsrc = $self->result_source;
2002     my $rels = { map { $_ => $rsrc->relationship_info($_) } $rsrc->relationships };
2003     for (keys %$first) {
2004       my $ref = ref $first->{$_};
2005       $rels->{$_} && ($ref eq 'ARRAY' or $ref eq 'HASH')
2006         ? push @rels, $_
2007         : push @columns, $_
2008       ;
2009     }
2010
2011     my @pks = $rsrc->primary_columns;
2012
2013     ## do the belongs_to relationships
2014     foreach my $index (0..$#$data) {
2015
2016       # delegate to create() for any dataset without primary keys with specified relationships
2017       if (grep { !defined $data->[$index]->{$_} } @pks ) {
2018         for my $r (@rels) {
2019           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
2020             my @ret = $self->populate($data);
2021             return;
2022           }
2023         }
2024       }
2025
2026       foreach my $rel (@rels) {
2027         next unless ref $data->[$index]->{$rel} eq "HASH";
2028         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
2029         my ($reverse_relname, $reverse_relinfo) = %{$rsrc->reverse_relationship_info($rel)};
2030         my $related = $result->result_source->_resolve_condition(
2031           $reverse_relinfo->{cond},
2032           $self,
2033           $result,
2034           $rel,
2035         );
2036
2037         delete $data->[$index]->{$rel};
2038         $data->[$index] = {%{$data->[$index]}, %$related};
2039
2040         push @columns, keys %$related if $index == 0;
2041       }
2042     }
2043
2044     ## inherit the data locked in the conditions of the resultset
2045     my ($rs_data) = $self->_merge_with_rscond({});
2046     delete @{$rs_data}{@columns};
2047     my @inherit_cols = keys %$rs_data;
2048     my @inherit_data = values %$rs_data;
2049
2050     ## do bulk insert on current row
2051     $rsrc->storage->insert_bulk(
2052       $rsrc,
2053       [@columns, @inherit_cols],
2054       [ map { [ @$_{@columns}, @inherit_data ] } @$data ],
2055     );
2056
2057     ## do the has_many relationships
2058     foreach my $item (@$data) {
2059
2060       my $main_row;
2061
2062       foreach my $rel (@rels) {
2063         next unless ref $item->{$rel} eq "ARRAY" && @{ $item->{$rel} };
2064
2065         $main_row ||= $self->new_result({map { $_ => $item->{$_} } @pks});
2066
2067         my $child = $main_row->$rel;
2068
2069         my $related = $child->result_source->_resolve_condition(
2070           $rels->{$rel}{cond},
2071           $child,
2072           $main_row,
2073           $rel,
2074         );
2075
2076         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
2077         my @populate = map { {%$_, %$related} } @rows_to_add;
2078
2079         $child->populate( \@populate );
2080       }
2081     }
2082   }
2083 }
2084
2085
2086 # populate() argumnets went over several incarnations
2087 # What we ultimately support is AoH
2088 sub _normalize_populate_args {
2089   my ($self, $arg) = @_;
2090
2091   if (ref $arg eq 'ARRAY') {
2092     if (!@$arg) {
2093       return [];
2094     }
2095     elsif (ref $arg->[0] eq 'HASH') {
2096       return $arg;
2097     }
2098     elsif (ref $arg->[0] eq 'ARRAY') {
2099       my @ret;
2100       my @colnames = @{$arg->[0]};
2101       foreach my $values (@{$arg}[1 .. $#$arg]) {
2102         push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
2103       }
2104       return \@ret;
2105     }
2106   }
2107
2108   $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
2109 }
2110
2111 =head2 pager
2112
2113 =over 4
2114
2115 =item Arguments: none
2116
2117 =item Return Value: $pager
2118
2119 =back
2120
2121 Return Value a L<Data::Page> object for the current resultset. Only makes
2122 sense for queries with a C<page> attribute.
2123
2124 To get the full count of entries for a paged resultset, call
2125 C<total_entries> on the L<Data::Page> object.
2126
2127 =cut
2128
2129 # make a wizard good for both a scalar and a hashref
2130 my $mk_lazy_count_wizard = sub {
2131   require Variable::Magic;
2132
2133   my $stash = { total_rs => shift };
2134   my $slot = shift; # only used by the hashref magic
2135
2136   my $magic = Variable::Magic::wizard (
2137     data => sub { $stash },
2138
2139     (!$slot)
2140     ? (
2141       # the scalar magic
2142       get => sub {
2143         # set value lazily, and dispell for good
2144         ${$_[0]} = $_[1]{total_rs}->count;
2145         Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref});
2146         return 1;
2147       },
2148       set => sub {
2149         # an explicit set implies dispell as well
2150         # the unless() is to work around "fun and giggles" below
2151         Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref})
2152           unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
2153         return 1;
2154       },
2155     )
2156     : (
2157       # the uvar magic
2158       fetch => sub {
2159         if ($_[2] eq $slot and !$_[1]{inactive}) {
2160           my $cnt = $_[1]{total_rs}->count;
2161           $_[0]->{$slot} = $cnt;
2162
2163           # attempting to dispell in a fetch handle (works in store), seems
2164           # to invariable segfault on 5.10, 5.12, 5.13 :(
2165           # so use an inactivator instead
2166           #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
2167           $_[1]{inactive}++;
2168         }
2169         return 1;
2170       },
2171       store => sub {
2172         if (! $_[1]{inactive} and $_[2] eq $slot) {
2173           #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
2174           $_[1]{inactive}++
2175             unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
2176         }
2177         return 1;
2178       },
2179     ),
2180   );
2181
2182   $stash->{magic_selfref} = $magic;
2183   weaken ($stash->{magic_selfref}); # this fails on 5.8.1
2184
2185   return $magic;
2186 };
2187
2188 # the tie class for 5.8.1
2189 {
2190   package # hide from pause
2191     DBIx::Class::__DBIC_LAZY_RS_COUNT__;
2192   use base qw/Tie::Hash/;
2193
2194   sub FIRSTKEY { my $dummy = scalar keys %{$_[0]{data}}; each %{$_[0]{data}} }
2195   sub NEXTKEY  { each %{$_[0]{data}} }
2196   sub EXISTS   { exists $_[0]{data}{$_[1]} }
2197   sub DELETE   { delete $_[0]{data}{$_[1]} }
2198   sub CLEAR    { %{$_[0]{data}} = () }
2199   sub SCALAR   { scalar %{$_[0]{data}} }
2200
2201   sub TIEHASH {
2202     $_[1]{data} = {%{$_[1]{selfref}}};
2203     %{$_[1]{selfref}} = ();
2204     Scalar::Util::weaken ($_[1]{selfref});
2205     return bless ($_[1], $_[0]);
2206   };
2207
2208   sub FETCH {
2209     if ($_[1] eq $_[0]{slot}) {
2210       my $cnt = $_[0]{data}{$_[1]} = $_[0]{total_rs}->count;
2211       untie %{$_[0]{selfref}};
2212       %{$_[0]{selfref}} = %{$_[0]{data}};
2213       return $cnt;
2214     }
2215     else {
2216       $_[0]{data}{$_[1]};
2217     }
2218   }
2219
2220   sub STORE {
2221     $_[0]{data}{$_[1]} = $_[2];
2222     if ($_[1] eq $_[0]{slot}) {
2223       untie %{$_[0]{selfref}};
2224       %{$_[0]{selfref}} = %{$_[0]{data}};
2225     }
2226     $_[2];
2227   }
2228 }
2229
2230 sub pager {
2231   my ($self) = @_;
2232
2233   return $self->{pager} if $self->{pager};
2234
2235   my $attrs = $self->{attrs};
2236   if (!defined $attrs->{page}) {
2237     $self->throw_exception("Can't create pager for non-paged rs");
2238   }
2239   elsif ($attrs->{page} <= 0) {
2240     $self->throw_exception('Invalid page number (page-numbers are 1-based)');
2241   }
2242   $attrs->{rows} ||= 10;
2243
2244   # throw away the paging flags and re-run the count (possibly
2245   # with a subselect) to get the real total count
2246   my $count_attrs = { %$attrs };
2247   delete $count_attrs->{$_} for qw/rows offset page pager/;
2248   my $total_rs = (ref $self)->new($self->result_source, $count_attrs);
2249
2250
2251 ### the following may seem awkward and dirty, but it's a thought-experiment
2252 ### necessary for future development of DBIx::DS. Do *NOT* change this code
2253 ### before talking to ribasushi/mst
2254
2255   require Data::Page;
2256   my $pager = Data::Page->new(
2257     0,  #start with an empty set
2258     $attrs->{rows},
2259     $self->{attrs}{page},
2260   );
2261
2262   my $data_slot = 'total_entries';
2263
2264   # Since we are interested in a cached value (once it's set - it's set), every
2265   # technique will detach from the magic-host once the time comes to fire the
2266   # ->count (or in the segfaulting case of >= 5.10 it will deactivate itself)
2267
2268   if ($] < 5.008003) {
2269     # 5.8.1 throws 'Modification of a read-only value attempted' when one tries
2270     # to weakref the magic container :(
2271     # tested on 5.8.1
2272     tie (%$pager, 'DBIx::Class::__DBIC_LAZY_RS_COUNT__',
2273       { slot => $data_slot, total_rs => $total_rs, selfref => $pager }
2274     );
2275   }
2276   elsif ($] < 5.010) {
2277     # We can use magic on the hash value slot. It's interesting that the magic is
2278     # attached to the hash-slot, and does *not* stop working once I do the dummy
2279     # assignments after the cast()
2280     # tested on 5.8.3 and 5.8.9
2281     my $magic = $mk_lazy_count_wizard->($total_rs);
2282     Variable::Magic::cast ( $pager->{$data_slot}, $magic );
2283
2284     # this is for fun and giggles
2285     $pager->{$data_slot} = -1;
2286     $pager->{$data_slot} = 0;
2287
2288     # this does not work for scalars, but works with
2289     # uvar magic below
2290     #my %vals = %$pager;
2291     #%$pager = ();
2292     #%{$pager} = %vals;
2293   }
2294   else {
2295     # And the uvar magic
2296     # works on 5.10.1, 5.12.1 and 5.13.4 in its current form,
2297     # however see the wizard maker for more notes
2298     my $magic = $mk_lazy_count_wizard->($total_rs, $data_slot);
2299     Variable::Magic::cast ( %$pager, $magic );
2300
2301     # still works
2302     $pager->{$data_slot} = -1;
2303     $pager->{$data_slot} = 0;
2304
2305     # this now works
2306     my %vals = %$pager;
2307     %$pager = ();
2308     %{$pager} = %vals;
2309   }
2310
2311   return $self->{pager} = $pager;
2312 }
2313
2314 =head2 page
2315
2316 =over 4
2317
2318 =item Arguments: $page_number
2319
2320 =item Return Value: $rs
2321
2322 =back
2323
2324 Returns a resultset for the $page_number page of the resultset on which page
2325 is called, where each page contains a number of rows equal to the 'rows'
2326 attribute set on the resultset (10 by default).
2327
2328 =cut
2329
2330 sub page {
2331   my ($self, $page) = @_;
2332   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
2333 }
2334
2335 =head2 new_result
2336
2337 =over 4
2338
2339 =item Arguments: \%vals
2340
2341 =item Return Value: $rowobject
2342
2343 =back
2344
2345 Creates a new row object in the resultset's result class and returns
2346 it. The row is not inserted into the database at this point, call
2347 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
2348 will tell you whether the row object has been inserted or not.
2349
2350 Passes the hashref of input on to L<DBIx::Class::Row/new>.
2351
2352 =cut
2353
2354 sub new_result {
2355   my ($self, $values) = @_;
2356   $self->throw_exception( "new_result needs a hash" )
2357     unless (ref $values eq 'HASH');
2358
2359   my ($merged_cond, $cols_from_relations) = $self->_merge_with_rscond($values);
2360
2361   my %new = (
2362     %$merged_cond,
2363     @$cols_from_relations
2364       ? (-cols_from_relations => $cols_from_relations)
2365       : (),
2366     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
2367   );
2368
2369   return $self->result_class->new(\%new);
2370 }
2371
2372 # _merge_with_rscond
2373 #
2374 # Takes a simple hash of K/V data and returns its copy merged with the
2375 # condition already present on the resultset. Additionally returns an
2376 # arrayref of value/condition names, which were inferred from related
2377 # objects (this is needed for in-memory related objects)
2378 sub _merge_with_rscond {
2379   my ($self, $data) = @_;
2380
2381   my (%new_data, @cols_from_relations);
2382
2383   my $alias = $self->{attrs}{alias};
2384
2385   if (! defined $self->{cond}) {
2386     # just massage $data below
2387   }
2388   elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
2389     %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
2390     @cols_from_relations = keys %new_data;
2391   }
2392   elsif (ref $self->{cond} ne 'HASH') {
2393     $self->throw_exception(
2394       "Can't abstract implicit construct, resultset condition not a hash"
2395     );
2396   }
2397   else {
2398     # precendence must be given to passed values over values inherited from
2399     # the cond, so the order here is important.
2400     my $collapsed_cond = $self->_collapse_cond($self->{cond});
2401     my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
2402
2403     while ( my($col, $value) = each %implied ) {
2404       my $vref = ref $value;
2405       if (
2406         $vref eq 'HASH'
2407           and
2408         keys(%$value) == 1
2409           and
2410         (keys %$value)[0] eq '='
2411       ) {
2412         $new_data{$col} = $value->{'='};
2413       }
2414       elsif( !$vref or $vref eq 'SCALAR' or blessed($value) ) {
2415         $new_data{$col} = $value;
2416       }
2417     }
2418   }
2419
2420   %new_data = (
2421     %new_data,
2422     %{ $self->_remove_alias($data, $alias) },
2423   );
2424
2425   return (\%new_data, \@cols_from_relations);
2426 }
2427
2428 # _has_resolved_attr
2429 #
2430 # determines if the resultset defines at least one
2431 # of the attributes supplied
2432 #
2433 # used to determine if a subquery is neccessary
2434 #
2435 # supports some virtual attributes:
2436 #   -join
2437 #     This will scan for any joins being present on the resultset.
2438 #     It is not a mere key-search but a deep inspection of {from}
2439 #
2440
2441 sub _has_resolved_attr {
2442   my ($self, @attr_names) = @_;
2443
2444   my $attrs = $self->_resolved_attrs;
2445
2446   my %extra_checks;
2447
2448   for my $n (@attr_names) {
2449     if (grep { $n eq $_ } (qw/-join/) ) {
2450       $extra_checks{$n}++;
2451       next;
2452     }
2453
2454     my $attr =  $attrs->{$n};
2455
2456     next if not defined $attr;
2457
2458     if (ref $attr eq 'HASH') {
2459       return 1 if keys %$attr;
2460     }
2461     elsif (ref $attr eq 'ARRAY') {
2462       return 1 if @$attr;
2463     }
2464     else {
2465       return 1 if $attr;
2466     }
2467   }
2468
2469   # a resolved join is expressed as a multi-level from
2470   return 1 if (
2471     $extra_checks{-join}
2472       and
2473     ref $attrs->{from} eq 'ARRAY'
2474       and
2475     @{$attrs->{from}} > 1
2476   );
2477
2478   return 0;
2479 }
2480
2481 # _collapse_cond
2482 #
2483 # Recursively collapse the condition.
2484
2485 sub _collapse_cond {
2486   my ($self, $cond, $collapsed) = @_;
2487
2488   $collapsed ||= {};
2489
2490   if (ref $cond eq 'ARRAY') {
2491     foreach my $subcond (@$cond) {
2492       next unless ref $subcond;  # -or
2493       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2494     }
2495   }
2496   elsif (ref $cond eq 'HASH') {
2497     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2498       foreach my $subcond (@{$cond->{-and}}) {
2499         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2500       }
2501     }
2502     else {
2503       foreach my $col (keys %$cond) {
2504         my $value = $cond->{$col};
2505         $collapsed->{$col} = $value;
2506       }
2507     }
2508   }
2509
2510   return $collapsed;
2511 }
2512
2513 # _remove_alias
2514 #
2515 # Remove the specified alias from the specified query hash. A copy is made so
2516 # the original query is not modified.
2517
2518 sub _remove_alias {
2519   my ($self, $query, $alias) = @_;
2520
2521   my %orig = %{ $query || {} };
2522   my %unaliased;
2523
2524   foreach my $key (keys %orig) {
2525     if ($key !~ /\./) {
2526       $unaliased{$key} = $orig{$key};
2527       next;
2528     }
2529     $unaliased{$1} = $orig{$key}
2530       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2531   }
2532
2533   return \%unaliased;
2534 }
2535
2536 =head2 as_query
2537
2538 =over 4
2539
2540 =item Arguments: none
2541
2542 =item Return Value: \[ $sql, @bind ]
2543
2544 =back
2545
2546 Returns the SQL query and bind vars associated with the invocant.
2547
2548 This is generally used as the RHS for a subquery.
2549
2550 =cut
2551
2552 sub as_query {
2553   my $self = shift;
2554
2555   my $attrs = $self->_resolved_attrs_copy;
2556
2557   # For future use:
2558   #
2559   # in list ctx:
2560   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2561   # $sql also has no wrapping parenthesis in list ctx
2562   #
2563   my $sqlbind = $self->result_source->storage
2564     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2565
2566   return $sqlbind;
2567 }
2568
2569 =head2 find_or_new
2570
2571 =over 4
2572
2573 =item Arguments: \%vals, \%attrs?
2574
2575 =item Return Value: $rowobject
2576
2577 =back
2578
2579   my $artist = $schema->resultset('Artist')->find_or_new(
2580     { artist => 'fred' }, { key => 'artists' });
2581
2582   $cd->cd_to_producer->find_or_new({ producer => $producer },
2583                                    { key => 'primary });
2584
2585 Find an existing record from this resultset using L</find>. if none exists,
2586 instantiate a new result object and return it. The object will not be saved
2587 into your storage until you call L<DBIx::Class::Row/insert> on it.
2588
2589 You most likely want this method when looking for existing rows using a unique
2590 constraint that is not the primary key, or looking for related rows.
2591
2592 If you want objects to be saved immediately, use L</find_or_create> instead.
2593
2594 B<Note>: Make sure to read the documentation of L</find> and understand the
2595 significance of the C<key> attribute, as its lack may skew your search, and
2596 subsequently result in spurious new objects.
2597
2598 B<Note>: Take care when using C<find_or_new> with a table having
2599 columns with default values that you intend to be automatically
2600 supplied by the database (e.g. an auto_increment primary key column).
2601 In normal usage, the value of such columns should NOT be included at
2602 all in the call to C<find_or_new>, even when set to C<undef>.
2603
2604 =cut
2605
2606 sub find_or_new {
2607   my $self     = shift;
2608   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2609   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2610   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2611     return $row;
2612   }
2613   return $self->new_result($hash);
2614 }
2615
2616 =head2 create
2617
2618 =over 4
2619
2620 =item Arguments: \%vals
2621
2622 =item Return Value: a L<DBIx::Class::Row> $object
2623
2624 =back
2625
2626 Attempt to create a single new row or a row with multiple related rows
2627 in the table represented by the resultset (and related tables). This
2628 will not check for duplicate rows before inserting, use
2629 L</find_or_create> to do that.
2630
2631 To create one row for this resultset, pass a hashref of key/value
2632 pairs representing the columns of the table and the values you wish to
2633 store. If the appropriate relationships are set up, foreign key fields
2634 can also be passed an object representing the foreign row, and the
2635 value will be set to its primary key.
2636
2637 To create related objects, pass a hashref of related-object column values
2638 B<keyed on the relationship name>. If the relationship is of type C<multi>
2639 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2640 The process will correctly identify columns holding foreign keys, and will
2641 transparently populate them from the keys of the corresponding relation.
2642 This can be applied recursively, and will work correctly for a structure
2643 with an arbitrary depth and width, as long as the relationships actually
2644 exists and the correct column data has been supplied.
2645
2646
2647 Instead of hashrefs of plain related data (key/value pairs), you may
2648 also pass new or inserted objects. New objects (not inserted yet, see
2649 L</new>), will be inserted into their appropriate tables.
2650
2651 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2652
2653 Example of creating a new row.
2654
2655   $person_rs->create({
2656     name=>"Some Person",
2657     email=>"somebody@someplace.com"
2658   });
2659
2660 Example of creating a new row and also creating rows in a related C<has_many>
2661 or C<has_one> resultset.  Note Arrayref.
2662
2663   $artist_rs->create(
2664      { artistid => 4, name => 'Manufactured Crap', cds => [
2665         { title => 'My First CD', year => 2006 },
2666         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2667       ],
2668      },
2669   );
2670
2671 Example of creating a new row and also creating a row in a related
2672 C<belongs_to> resultset. Note Hashref.
2673
2674   $cd_rs->create({
2675     title=>"Music for Silly Walks",
2676     year=>2000,
2677     artist => {
2678       name=>"Silly Musician",
2679     }
2680   });
2681
2682 =over
2683
2684 =item WARNING
2685
2686 When subclassing ResultSet never attempt to override this method. Since
2687 it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
2688 lot of the internals simply never call it, so your override will be
2689 bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
2690 or L<insert|DBIx::Class::Row/insert> depending on how early in the
2691 L</create> process you need to intervene.
2692
2693 =back
2694
2695 =cut
2696
2697 sub create {
2698   my ($self, $attrs) = @_;
2699   $self->throw_exception( "create needs a hashref" )
2700     unless ref $attrs eq 'HASH';
2701   return $self->new_result($attrs)->insert;
2702 }
2703
2704 =head2 find_or_create
2705
2706 =over 4
2707
2708 =item Arguments: \%vals, \%attrs?
2709
2710 =item Return Value: $rowobject
2711
2712 =back
2713
2714   $cd->cd_to_producer->find_or_create({ producer => $producer },
2715                                       { key => 'primary' });
2716
2717 Tries to find a record based on its primary key or unique constraints; if none
2718 is found, creates one and returns that instead.
2719
2720   my $cd = $schema->resultset('CD')->find_or_create({
2721     cdid   => 5,
2722     artist => 'Massive Attack',
2723     title  => 'Mezzanine',
2724     year   => 2005,
2725   });
2726
2727 Also takes an optional C<key> attribute, to search by a specific key or unique
2728 constraint. For example:
2729
2730   my $cd = $schema->resultset('CD')->find_or_create(
2731     {
2732       artist => 'Massive Attack',
2733       title  => 'Mezzanine',
2734     },
2735     { key => 'cd_artist_title' }
2736   );
2737
2738 B<Note>: Make sure to read the documentation of L</find> and understand the
2739 significance of the C<key> attribute, as its lack may skew your search, and
2740 subsequently result in spurious row creation.
2741
2742 B<Note>: Because find_or_create() reads from the database and then
2743 possibly inserts based on the result, this method is subject to a race
2744 condition. Another process could create a record in the table after
2745 the find has completed and before the create has started. To avoid
2746 this problem, use find_or_create() inside a transaction.
2747
2748 B<Note>: Take care when using C<find_or_create> with a table having
2749 columns with default values that you intend to be automatically
2750 supplied by the database (e.g. an auto_increment primary key column).
2751 In normal usage, the value of such columns should NOT be included at
2752 all in the call to C<find_or_create>, even when set to C<undef>.
2753
2754 See also L</find> and L</update_or_create>. For information on how to declare
2755 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2756
2757 =cut
2758
2759 sub find_or_create {
2760   my $self     = shift;
2761   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2762   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2763   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2764     return $row;
2765   }
2766   return $self->create($hash);
2767 }
2768
2769 =head2 update_or_create
2770
2771 =over 4
2772
2773 =item Arguments: \%col_values, { key => $unique_constraint }?
2774
2775 =item Return Value: $row_object
2776
2777 =back
2778
2779   $resultset->update_or_create({ col => $val, ... });
2780
2781 Like L</find_or_create>, but if a row is found it is immediately updated via
2782 C<< $found_row->update (\%col_values) >>.
2783
2784
2785 Takes an optional C<key> attribute to search on a specific unique constraint.
2786 For example:
2787
2788   # In your application
2789   my $cd = $schema->resultset('CD')->update_or_create(
2790     {
2791       artist => 'Massive Attack',
2792       title  => 'Mezzanine',
2793       year   => 1998,
2794     },
2795     { key => 'cd_artist_title' }
2796   );
2797
2798   $cd->cd_to_producer->update_or_create({
2799     producer => $producer,
2800     name => 'harry',
2801   }, {
2802     key => 'primary',
2803   });
2804
2805 B<Note>: Make sure to read the documentation of L</find> and understand the
2806 significance of the C<key> attribute, as its lack may skew your search, and
2807 subsequently result in spurious row creation.
2808
2809 B<Note>: Take care when using C<update_or_create> with a table having
2810 columns with default values that you intend to be automatically
2811 supplied by the database (e.g. an auto_increment primary key column).
2812 In normal usage, the value of such columns should NOT be included at
2813 all in the call to C<update_or_create>, even when set to C<undef>.
2814
2815 See also L</find> and L</find_or_create>. For information on how to declare
2816 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2817
2818 =cut
2819
2820 sub update_or_create {
2821   my $self = shift;
2822   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2823   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2824
2825   my $row = $self->find($cond, $attrs);
2826   if (defined $row) {
2827     $row->update($cond);
2828     return $row;
2829   }
2830
2831   return $self->create($cond);
2832 }
2833
2834 =head2 update_or_new
2835
2836 =over 4
2837
2838 =item Arguments: \%col_values, { key => $unique_constraint }?
2839
2840 =item Return Value: $rowobject
2841
2842 =back
2843
2844   $resultset->update_or_new({ col => $val, ... });
2845
2846 Like L</find_or_new> but if a row is found it is immediately updated via
2847 C<< $found_row->update (\%col_values) >>.
2848
2849 For example:
2850
2851   # In your application
2852   my $cd = $schema->resultset('CD')->update_or_new(
2853     {
2854       artist => 'Massive Attack',
2855       title  => 'Mezzanine',
2856       year   => 1998,
2857     },
2858     { key => 'cd_artist_title' }
2859   );
2860
2861   if ($cd->in_storage) {
2862       # the cd was updated
2863   }
2864   else {
2865       # the cd is not yet in the database, let's insert it
2866       $cd->insert;
2867   }
2868
2869 B<Note>: Make sure to read the documentation of L</find> and understand the
2870 significance of the C<key> attribute, as its lack may skew your search, and
2871 subsequently result in spurious new objects.
2872
2873 B<Note>: Take care when using C<update_or_new> with a table having
2874 columns with default values that you intend to be automatically
2875 supplied by the database (e.g. an auto_increment primary key column).
2876 In normal usage, the value of such columns should NOT be included at
2877 all in the call to C<update_or_new>, even when set to C<undef>.
2878
2879 See also L</find>, L</find_or_create> and L</find_or_new>. 
2880
2881 =cut
2882
2883 sub update_or_new {
2884     my $self  = shift;
2885     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2886     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2887
2888     my $row = $self->find( $cond, $attrs );
2889     if ( defined $row ) {
2890         $row->update($cond);
2891         return $row;
2892     }
2893
2894     return $self->new_result($cond);
2895 }
2896
2897 =head2 get_cache
2898
2899 =over 4
2900
2901 =item Arguments: none
2902
2903 =item Return Value: \@cache_objects | undef
2904
2905 =back
2906
2907 Gets the contents of the cache for the resultset, if the cache is set.
2908
2909 The cache is populated either by using the L</prefetch> attribute to
2910 L</search> or by calling L</set_cache>.
2911
2912 =cut
2913
2914 sub get_cache {
2915   shift->{all_cache};
2916 }
2917
2918 =head2 set_cache
2919
2920 =over 4
2921
2922 =item Arguments: \@cache_objects
2923
2924 =item Return Value: \@cache_objects
2925
2926 =back
2927
2928 Sets the contents of the cache for the resultset. Expects an arrayref
2929 of objects of the same class as those produced by the resultset. Note that
2930 if the cache is set the resultset will return the cached objects rather
2931 than re-querying the database even if the cache attr is not set.
2932
2933 The contents of the cache can also be populated by using the
2934 L</prefetch> attribute to L</search>.
2935
2936 =cut
2937
2938 sub set_cache {
2939   my ( $self, $data ) = @_;
2940   $self->throw_exception("set_cache requires an arrayref")
2941       if defined($data) && (ref $data ne 'ARRAY');
2942   $self->{all_cache} = $data;
2943 }
2944
2945 =head2 clear_cache
2946
2947 =over 4
2948
2949 =item Arguments: none
2950
2951 =item Return Value: undef
2952
2953 =back
2954
2955 Clears the cache for the resultset.
2956
2957 =cut
2958
2959 sub clear_cache {
2960   shift->set_cache(undef);
2961 }
2962
2963 =head2 is_paged
2964
2965 =over 4
2966
2967 =item Arguments: none
2968
2969 =item Return Value: true, if the resultset has been paginated
2970
2971 =back
2972
2973 =cut
2974
2975 sub is_paged {
2976   my ($self) = @_;
2977   return !!$self->{attrs}{page};
2978 }
2979
2980 =head2 is_ordered
2981
2982 =over 4
2983
2984 =item Arguments: none
2985
2986 =item Return Value: true, if the resultset has been ordered with C<order_by>.
2987
2988 =back
2989
2990 =cut
2991
2992 sub is_ordered {
2993   my ($self) = @_;
2994   return scalar $self->result_source->storage->_extract_order_criteria($self->{attrs}{order_by});
2995 }
2996
2997 =head2 related_resultset
2998
2999 =over 4
3000
3001 =item Arguments: $relationship_name
3002
3003 =item Return Value: $resultset
3004
3005 =back
3006
3007 Returns a related resultset for the supplied relationship name.
3008
3009   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
3010
3011 =cut
3012
3013 sub related_resultset {
3014   my ($self, $rel) = @_;
3015
3016   $self->{related_resultsets} ||= {};
3017   return $self->{related_resultsets}{$rel} ||= do {
3018     my $rsrc = $self->result_source;
3019     my $rel_info = $rsrc->relationship_info($rel);
3020
3021     $self->throw_exception(
3022       "search_related: result source '" . $rsrc->source_name .
3023         "' has no such relationship $rel")
3024       unless $rel_info;
3025
3026     my $attrs = $self->_chain_relationship($rel);
3027
3028     my $join_count = $attrs->{seen_join}{$rel};
3029
3030     my $alias = $self->result_source->storage
3031         ->relname_to_table_alias($rel, $join_count);
3032
3033     # since this is search_related, and we already slid the select window inwards
3034     # (the select/as attrs were deleted in the beginning), we need to flip all
3035     # left joins to inner, so we get the expected results
3036     # read the comment on top of the actual function to see what this does
3037     $attrs->{from} = $rsrc->schema->storage->_inner_join_to_node ($attrs->{from}, $alias);
3038
3039
3040     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
3041     delete @{$attrs}{qw(result_class alias)};
3042
3043     my $new_cache;
3044
3045     if (my $cache = $self->get_cache) {
3046       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
3047         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
3048                         @$cache ];
3049       }
3050     }
3051
3052     my $rel_source = $rsrc->related_source($rel);
3053
3054     my $new = do {
3055
3056       # The reason we do this now instead of passing the alias to the
3057       # search_rs below is that if you wrap/overload resultset on the
3058       # source you need to know what alias it's -going- to have for things
3059       # to work sanely (e.g. RestrictWithObject wants to be able to add
3060       # extra query restrictions, and these may need to be $alias.)
3061
3062       my $rel_attrs = $rel_source->resultset_attributes;
3063       local $rel_attrs->{alias} = $alias;
3064
3065       $rel_source->resultset
3066                  ->search_rs(
3067                      undef, {
3068                        %$attrs,
3069                        where => $attrs->{where},
3070                    });
3071     };
3072     $new->set_cache($new_cache) if $new_cache;
3073     $new;
3074   };
3075 }
3076
3077 =head2 current_source_alias
3078
3079 =over 4
3080
3081 =item Arguments: none
3082
3083 =item Return Value: $source_alias
3084
3085 =back
3086
3087 Returns the current table alias for the result source this resultset is built
3088 on, that will be used in the SQL query. Usually it is C<me>.
3089
3090 Currently the source alias that refers to the result set returned by a
3091 L</search>/L</find> family method depends on how you got to the resultset: it's
3092 C<me> by default, but eg. L</search_related> aliases it to the related result
3093 source name (and keeps C<me> referring to the original result set). The long
3094 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
3095 (and make this method unnecessary).
3096
3097 Thus it's currently necessary to use this method in predefined queries (see
3098 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
3099 source alias of the current result set:
3100
3101   # in a result set class
3102   sub modified_by {
3103     my ($self, $user) = @_;
3104
3105     my $me = $self->current_source_alias;
3106
3107     return $self->search(
3108       "$me.modified" => $user->id,
3109     );
3110   }
3111
3112 =cut
3113
3114 sub current_source_alias {
3115   my ($self) = @_;
3116
3117   return ($self->{attrs} || {})->{alias} || 'me';
3118 }
3119
3120 =head2 as_subselect_rs
3121
3122 =over 4
3123
3124 =item Arguments: none
3125
3126 =item Return Value: $resultset
3127
3128 =back
3129
3130 Act as a barrier to SQL symbols.  The resultset provided will be made into a
3131 "virtual view" by including it as a subquery within the from clause.  From this
3132 point on, any joined tables are inaccessible to ->search on the resultset (as if
3133 it were simply where-filtered without joins).  For example:
3134
3135  my $rs = $schema->resultset('Bar')->search({'x.name' => 'abc'},{ join => 'x' });
3136
3137  # 'x' now pollutes the query namespace
3138
3139  # So the following works as expected
3140  my $ok_rs = $rs->search({'x.other' => 1});
3141
3142  # But this doesn't: instead of finding a 'Bar' related to two x rows (abc and
3143  # def) we look for one row with contradictory terms and join in another table
3144  # (aliased 'x_2') which we never use
3145  my $broken_rs = $rs->search({'x.name' => 'def'});
3146
3147  my $rs2 = $rs->as_subselect_rs;
3148
3149  # doesn't work - 'x' is no longer accessible in $rs2, having been sealed away
3150  my $not_joined_rs = $rs2->search({'x.other' => 1});
3151
3152  # works as expected: finds a 'table' row related to two x rows (abc and def)
3153  my $correctly_joined_rs = $rs2->search({'x.name' => 'def'});
3154
3155 Another example of when one might use this would be to select a subset of
3156 columns in a group by clause:
3157
3158  my $rs = $schema->resultset('Bar')->search(undef, {
3159    group_by => [qw{ id foo_id baz_id }],
3160  })->as_subselect_rs->search(undef, {
3161    columns => [qw{ id foo_id }]
3162  });
3163
3164 In the above example normally columns would have to be equal to the group by,
3165 but because we isolated the group by into a subselect the above works.
3166
3167 =cut
3168
3169 sub as_subselect_rs {
3170   my $self = shift;
3171
3172   my $attrs = $self->_resolved_attrs;
3173
3174   my $fresh_rs = (ref $self)->new (
3175     $self->result_source
3176   );
3177
3178   # these pieces will be locked in the subquery
3179   delete $fresh_rs->{cond};
3180   delete @{$fresh_rs->{attrs}}{qw/where bind/};
3181
3182   return $fresh_rs->search( {}, {
3183     from => [{
3184       $attrs->{alias} => $self->as_query,
3185       -alias  => $attrs->{alias},
3186       -rsrc   => $self->result_source,
3187     }],
3188     alias => $attrs->{alias},
3189   });
3190 }
3191
3192 # This code is called by search_related, and makes sure there
3193 # is clear separation between the joins before, during, and
3194 # after the relationship. This information is needed later
3195 # in order to properly resolve prefetch aliases (any alias
3196 # with a relation_chain_depth less than the depth of the
3197 # current prefetch is not considered)
3198 #
3199 # The increments happen twice per join. An even number means a
3200 # relationship specified via a search_related, whereas an odd
3201 # number indicates a join/prefetch added via attributes
3202 #
3203 # Also this code will wrap the current resultset (the one we
3204 # chain to) in a subselect IFF it contains limiting attributes
3205 sub _chain_relationship {
3206   my ($self, $rel) = @_;
3207   my $source = $self->result_source;
3208   my $attrs = { %{$self->{attrs}||{}} };
3209
3210   # we need to take the prefetch the attrs into account before we
3211   # ->_resolve_join as otherwise they get lost - captainL
3212   my $join = $self->_merge_joinpref_attr( $attrs->{join}, $attrs->{prefetch} );
3213
3214   delete @{$attrs}{qw/join prefetch collapse group_by distinct select as columns +select +as +columns/};
3215
3216   my $seen = { %{ (delete $attrs->{seen_join}) || {} } };
3217
3218   my $from;
3219   my @force_subq_attrs = qw/offset rows group_by having/;
3220
3221   if (
3222     ($attrs->{from} && ref $attrs->{from} ne 'ARRAY')
3223       ||
3224     $self->_has_resolved_attr (@force_subq_attrs)
3225   ) {
3226     # Nuke the prefetch (if any) before the new $rs attrs
3227     # are resolved (prefetch is useless - we are wrapping
3228     # a subquery anyway).
3229     my $rs_copy = $self->search;
3230     $rs_copy->{attrs}{join} = $self->_merge_joinpref_attr (
3231       $rs_copy->{attrs}{join},
3232       delete $rs_copy->{attrs}{prefetch},
3233     );
3234
3235     $from = [{
3236       -rsrc   => $source,
3237       -alias  => $attrs->{alias},
3238       $attrs->{alias} => $rs_copy->as_query,
3239     }];
3240     delete @{$attrs}{@force_subq_attrs, qw/where bind/};
3241     $seen->{-relation_chain_depth} = 0;
3242   }
3243   elsif ($attrs->{from}) {  #shallow copy suffices
3244     $from = [ @{$attrs->{from}} ];
3245   }
3246   else {
3247     $from = [{
3248       -rsrc  => $source,
3249       -alias => $attrs->{alias},
3250       $attrs->{alias} => $source->from,
3251     }];
3252   }
3253
3254   my $jpath = ($seen->{-relation_chain_depth})
3255     ? $from->[-1][0]{-join_path}
3256     : [];
3257
3258   my @requested_joins = $source->_resolve_join(
3259     $join,
3260     $attrs->{alias},
3261     $seen,
3262     $jpath,
3263   );
3264
3265   push @$from, @requested_joins;
3266
3267   $seen->{-relation_chain_depth}++;
3268
3269   # if $self already had a join/prefetch specified on it, the requested
3270   # $rel might very well be already included. What we do in this case
3271   # is effectively a no-op (except that we bump up the chain_depth on
3272   # the join in question so we could tell it *is* the search_related)
3273   my $already_joined;
3274
3275   # we consider the last one thus reverse
3276   for my $j (reverse @requested_joins) {
3277     my ($last_j) = keys %{$j->[0]{-join_path}[-1]};
3278     if ($rel eq $last_j) {
3279       $j->[0]{-relation_chain_depth}++;
3280       $already_joined++;
3281       last;
3282     }
3283   }
3284
3285   unless ($already_joined) {
3286     push @$from, $source->_resolve_join(
3287       $rel,
3288       $attrs->{alias},
3289       $seen,
3290       $jpath,
3291     );
3292   }
3293
3294   $seen->{-relation_chain_depth}++;
3295
3296   return {%$attrs, from => $from, seen_join => $seen};
3297 }
3298
3299 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
3300 sub _resolved_attrs_copy {
3301   my $self = shift;
3302   return { %{$self->_resolved_attrs (@_)} };
3303 }
3304
3305 sub _resolved_attrs {
3306   my $self = shift;
3307   return $self->{_attrs} if $self->{_attrs};
3308
3309   my $attrs  = { %{ $self->{attrs} || {} } };
3310   my $source = $self->result_source;
3311   my $alias  = $attrs->{alias};
3312
3313   # default selection list
3314   $attrs->{columns} = [ $source->columns ]
3315     unless List::Util::first { exists $attrs->{$_} } qw/columns cols select as/;
3316
3317   # merge selectors together
3318   for (qw/columns select as/) {
3319     $attrs->{$_} = $self->_merge_attr($attrs->{$_}, delete $attrs->{"+$_"})
3320       if $attrs->{$_} or $attrs->{"+$_"};
3321   }
3322
3323   # disassemble columns
3324   my (@sel, @as);
3325   if (my $cols = delete $attrs->{columns}) {
3326     for my $c (ref $cols eq 'ARRAY' ? @$cols : $cols) {
3327       if (ref $c eq 'HASH') {
3328         for my $as (keys %$c) {
3329           push @sel, $c->{$as};
3330           push @as, $as;
3331         }
3332       }
3333       else {
3334         push @sel, $c;
3335         push @as, $c;
3336       }
3337     }
3338   }
3339
3340   # when trying to weed off duplicates later do not go past this point -
3341   # everything added from here on is unbalanced "anyone's guess" stuff
3342   my $dedup_stop_idx = $#as;
3343
3344   push @as, @{ ref $attrs->{as} eq 'ARRAY' ? $attrs->{as} : [ $attrs->{as} ] }
3345     if $attrs->{as};
3346   push @sel, @{ ref $attrs->{select} eq 'ARRAY' ? $attrs->{select} : [ $attrs->{select} ] }
3347     if $attrs->{select};
3348
3349   # assume all unqualified selectors to apply to the current alias (legacy stuff)
3350   for (@sel) {
3351     $_ = (ref $_ or $_ =~ /\./) ? $_ : "$alias.$_";
3352   }
3353
3354   # disqualify all $alias.col as-bits (collapser mandated)
3355   for (@as) {
3356     $_ = ($_ =~ /^\Q$alias.\E(.+)$/) ? $1 : $_;
3357   }
3358
3359   # de-duplicate the result (remove *identical* select/as pairs)
3360   # and also die on duplicate {as} pointing to different {select}s
3361   # not using a c-style for as the condition is prone to shrinkage
3362   my $seen;
3363   my $i = 0;
3364   while ($i <= $dedup_stop_idx) {
3365     if ($seen->{"$sel[$i] \x00\x00 $as[$i]"}++) {
3366       splice @sel, $i, 1;
3367       splice @as, $i, 1;
3368       $dedup_stop_idx--;
3369     }
3370     elsif ($seen->{$as[$i]}++) {
3371       $self->throw_exception(
3372         "inflate_result() alias '$as[$i]' specified twice with different SQL-side {select}-ors"
3373       );
3374     }
3375     else {
3376       $i++;
3377     }
3378   }
3379
3380   $attrs->{select} = \@sel;
3381   $attrs->{as} = \@as;
3382
3383   $attrs->{from} ||= [{
3384     -rsrc   => $source,
3385     -alias  => $self->{attrs}{alias},
3386     $self->{attrs}{alias} => $source->from,
3387   }];
3388
3389   if ( $attrs->{join} || $attrs->{prefetch} ) {
3390
3391     $self->throw_exception ('join/prefetch can not be used with a custom {from}')
3392       if ref $attrs->{from} ne 'ARRAY';
3393
3394     my $join = (delete $attrs->{join}) || {};
3395
3396     if ( defined $attrs->{prefetch} ) {
3397       $join = $self->_merge_joinpref_attr( $join, $attrs->{prefetch} );
3398     }
3399
3400     $attrs->{from} =    # have to copy here to avoid corrupting the original
3401       [
3402         @{ $attrs->{from} },
3403         $source->_resolve_join(
3404           $join,
3405           $alias,
3406           { %{ $attrs->{seen_join} || {} } },
3407           ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
3408             ? $attrs->{from}[-1][0]{-join_path}
3409             : []
3410           ,
3411         )
3412       ];
3413   }
3414
3415   if ( defined $attrs->{order_by} ) {
3416     $attrs->{order_by} = (
3417       ref( $attrs->{order_by} ) eq 'ARRAY'
3418       ? [ @{ $attrs->{order_by} } ]
3419       : [ $attrs->{order_by} || () ]
3420     );
3421   }
3422
3423   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
3424     $attrs->{group_by} = [ $attrs->{group_by} ];
3425   }
3426
3427   # generate the distinct induced group_by early, as prefetch will be carried via a
3428   # subquery (since a group_by is present)
3429   if (delete $attrs->{distinct}) {
3430     if ($attrs->{group_by}) {
3431       carp_unique ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
3432     }
3433     else {
3434       # distinct affects only the main selection part, not what prefetch may
3435       # add below.
3436       $attrs->{group_by} = $source->storage->_group_over_selection (
3437         $attrs->{from},
3438         $attrs->{select},
3439         $attrs->{order_by},
3440       );
3441     }
3442   }
3443
3444   $attrs->{collapse} ||= {};
3445   if ($attrs->{prefetch}) {
3446
3447     $self->throw_exception("Unable to prefetch, resultset contains an unnamed selector $attrs->{_dark_selector}{string}")
3448       if $attrs->{_dark_selector};
3449
3450     my $prefetch = $self->_merge_joinpref_attr( {}, delete $attrs->{prefetch} );
3451
3452     my $prefetch_ordering = [];
3453
3454     # this is a separate structure (we don't look in {from} directly)
3455     # as the resolver needs to shift things off the lists to work
3456     # properly (identical-prefetches on different branches)
3457     my $join_map = {};
3458     if (ref $attrs->{from} eq 'ARRAY') {
3459
3460       my $start_depth = $attrs->{seen_join}{-relation_chain_depth} || 0;
3461
3462       for my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
3463         next unless $j->[0]{-alias};
3464         next unless $j->[0]{-join_path};
3465         next if ($j->[0]{-relation_chain_depth} || 0) < $start_depth;
3466
3467         my @jpath = map { keys %$_ } @{$j->[0]{-join_path}};
3468
3469         my $p = $join_map;
3470         $p = $p->{$_} ||= {} for @jpath[ ($start_depth/2) .. $#jpath]; #only even depths are actual jpath boundaries
3471         push @{$p->{-join_aliases} }, $j->[0]{-alias};
3472       }
3473     }
3474
3475     my @prefetch =
3476       $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
3477
3478     # we need to somehow mark which columns came from prefetch
3479     if (@prefetch) {
3480       my $sel_end = $#{$attrs->{select}};
3481       $attrs->{_prefetch_selector_range} = [ $sel_end + 1, $sel_end + @prefetch ];
3482     }
3483
3484     push @{ $attrs->{select} }, (map { $_->[0] } @prefetch);
3485     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
3486
3487     push( @{$attrs->{order_by}}, @$prefetch_ordering );
3488     $attrs->{_collapse_order_by} = \@$prefetch_ordering;
3489   }
3490
3491
3492   # if both page and offset are specified, produce a combined offset
3493   # even though it doesn't make much sense, this is what pre 081xx has
3494   # been doing
3495   if (my $page = delete $attrs->{page}) {
3496     $attrs->{offset} =
3497       ($attrs->{rows} * ($page - 1))
3498             +
3499       ($attrs->{offset} || 0)
3500     ;
3501   }
3502
3503   return $self->{_attrs} = $attrs;
3504 }
3505
3506 sub _rollout_attr {
3507   my ($self, $attr) = @_;
3508
3509   if (ref $attr eq 'HASH') {
3510     return $self->_rollout_hash($attr);
3511   } elsif (ref $attr eq 'ARRAY') {
3512     return $self->_rollout_array($attr);
3513   } else {
3514     return [$attr];
3515   }
3516 }
3517
3518 sub _rollout_array {
3519   my ($self, $attr) = @_;
3520
3521   my @rolled_array;
3522   foreach my $element (@{$attr}) {
3523     if (ref $element eq 'HASH') {
3524       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3525     } elsif (ref $element eq 'ARRAY') {
3526       #  XXX - should probably recurse here
3527       push( @rolled_array, @{$self->_rollout_array($element)} );
3528     } else {
3529       push( @rolled_array, $element );
3530     }
3531   }
3532   return \@rolled_array;
3533 }
3534
3535 sub _rollout_hash {
3536   my ($self, $attr) = @_;
3537
3538   my @rolled_array;
3539   foreach my $key (keys %{$attr}) {
3540     push( @rolled_array, { $key => $attr->{$key} } );
3541   }
3542   return \@rolled_array;
3543 }
3544
3545 sub _calculate_score {
3546   my ($self, $a, $b) = @_;
3547
3548   if (defined $a xor defined $b) {
3549     return 0;
3550   }
3551   elsif (not defined $a) {
3552     return 1;
3553   }
3554
3555   if (ref $b eq 'HASH') {
3556     my ($b_key) = keys %{$b};
3557     if (ref $a eq 'HASH') {
3558       my ($a_key) = keys %{$a};
3559       if ($a_key eq $b_key) {
3560         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3561       } else {
3562         return 0;
3563       }
3564     } else {
3565       return ($a eq $b_key) ? 1 : 0;
3566     }
3567   } else {
3568     if (ref $a eq 'HASH') {
3569       my ($a_key) = keys %{$a};
3570       return ($b eq $a_key) ? 1 : 0;
3571     } else {
3572       return ($b eq $a) ? 1 : 0;
3573     }
3574   }
3575 }
3576
3577 sub _merge_joinpref_attr {
3578   my ($self, $orig, $import) = @_;
3579
3580   return $import unless defined($orig);
3581   return $orig unless defined($import);
3582
3583   $orig = $self->_rollout_attr($orig);
3584   $import = $self->_rollout_attr($import);
3585
3586   my $seen_keys;
3587   foreach my $import_element ( @{$import} ) {
3588     # find best candidate from $orig to merge $b_element into
3589     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3590     foreach my $orig_element ( @{$orig} ) {
3591       my $score = $self->_calculate_score( $orig_element, $import_element );
3592       if ($score > $best_candidate->{score}) {
3593         $best_candidate->{position} = $position;
3594         $best_candidate->{score} = $score;
3595       }
3596       $position++;
3597     }
3598     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3599
3600     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3601       push( @{$orig}, $import_element );
3602     } else {
3603       my $orig_best = $orig->[$best_candidate->{position}];
3604       # merge orig_best and b_element together and replace original with merged
3605       if (ref $orig_best ne 'HASH') {
3606         $orig->[$best_candidate->{position}] = $import_element;
3607       } elsif (ref $import_element eq 'HASH') {
3608         my ($key) = keys %{$orig_best};
3609         $orig->[$best_candidate->{position}] = { $key => $self->_merge_joinpref_attr($orig_best->{$key}, $import_element->{$key}) };
3610       }
3611     }
3612     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3613   }
3614
3615   return $orig;
3616 }
3617
3618 {
3619   my $hm;
3620
3621   sub _merge_attr {
3622     $hm ||= do {
3623       require Hash::Merge;
3624       my $hm = Hash::Merge->new;
3625
3626       $hm->specify_behavior({
3627         SCALAR => {
3628           SCALAR => sub {
3629             my ($defl, $defr) = map { defined $_ } (@_[0,1]);
3630
3631             if ($defl xor $defr) {
3632               return [ $defl ? $_[0] : $_[1] ];
3633             }
3634             elsif (! $defl) {
3635               return [];
3636             }
3637             elsif (__HM_DEDUP and $_[0] eq $_[1]) {
3638               return [ $_[0] ];
3639             }
3640             else {
3641               return [$_[0], $_[1]];
3642             }
3643           },
3644           ARRAY => sub {
3645             return $_[1] if !defined $_[0];
3646             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3647             return [$_[0], @{$_[1]}]
3648           },
3649           HASH  => sub {
3650             return [] if !defined $_[0] and !keys %{$_[1]};
3651             return [ $_[1] ] if !defined $_[0];
3652             return [ $_[0] ] if !keys %{$_[1]};
3653             return [$_[0], $_[1]]
3654           },
3655         },
3656         ARRAY => {
3657           SCALAR => sub {
3658             return $_[0] if !defined $_[1];
3659             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3660             return [@{$_[0]}, $_[1]]
3661           },
3662           ARRAY => sub {
3663             my @ret = @{$_[0]} or return $_[1];
3664             return [ @ret, @{$_[1]} ] unless __HM_DEDUP;
3665             my %idx = map { $_ => 1 } @ret;
3666             push @ret, grep { ! defined $idx{$_} } (@{$_[1]});
3667             \@ret;
3668           },
3669           HASH => sub {
3670             return [ $_[1] ] if ! @{$_[0]};
3671             return $_[0] if !keys %{$_[1]};
3672             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3673             return [ @{$_[0]}, $_[1] ];
3674           },
3675         },
3676         HASH => {
3677           SCALAR => sub {
3678             return [] if !keys %{$_[0]} and !defined $_[1];
3679             return [ $_[0] ] if !defined $_[1];
3680             return [ $_[1] ] if !keys %{$_[0]};
3681             return [$_[0], $_[1]]
3682           },
3683           ARRAY => sub {
3684             return [] if !keys %{$_[0]} and !@{$_[1]};
3685             return [ $_[0] ] if !@{$_[1]};
3686             return $_[1] if !keys %{$_[0]};
3687             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3688             return [ $_[0], @{$_[1]} ];
3689           },
3690           HASH => sub {
3691             return [] if !keys %{$_[0]} and !keys %{$_[1]};
3692             return [ $_[0] ] if !keys %{$_[1]};
3693             return [ $_[1] ] if !keys %{$_[0]};
3694             return [ $_[0] ] if $_[0] eq $_[1];
3695             return [ $_[0], $_[1] ];
3696           },
3697         }
3698       } => 'DBIC_RS_ATTR_MERGER');
3699       $hm;
3700     };
3701
3702     return $hm->merge ($_[1], $_[2]);
3703   }
3704 }
3705
3706 sub STORABLE_freeze {
3707   my ($self, $cloning) = @_;
3708   my $to_serialize = { %$self };
3709
3710   # A cursor in progress can't be serialized (and would make little sense anyway)
3711   delete $to_serialize->{cursor};
3712
3713   Storable::nfreeze($to_serialize);
3714 }
3715
3716 # need this hook for symmetry
3717 sub STORABLE_thaw {
3718   my ($self, $cloning, $serialized) = @_;
3719
3720   %$self = %{ Storable::thaw($serialized) };
3721
3722   $self;
3723 }
3724
3725
3726 =head2 throw_exception
3727
3728 See L<DBIx::Class::Schema/throw_exception> for details.
3729
3730 =cut
3731
3732 sub throw_exception {
3733   my $self=shift;
3734
3735   if (ref $self and my $rsrc = $self->result_source) {
3736     $rsrc->throw_exception(@_)
3737   }
3738   else {
3739     DBIx::Class::Exception->throw(@_);
3740   }
3741 }
3742
3743 # XXX: FIXME: Attributes docs need clearing up
3744
3745 =head1 ATTRIBUTES
3746
3747 Attributes are used to refine a ResultSet in various ways when
3748 searching for data. They can be passed to any method which takes an
3749 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3750 L</count>.
3751
3752 These are in no particular order:
3753
3754 =head2 order_by
3755
3756 =over 4
3757
3758 =item Value: ( $order_by | \@order_by | \%order_by )
3759
3760 =back
3761
3762 Which column(s) to order the results by.
3763
3764 [The full list of suitable values is documented in
3765 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3766 common options.]
3767
3768 If a single column name, or an arrayref of names is supplied, the
3769 argument is passed through directly to SQL. The hashref syntax allows
3770 for connection-agnostic specification of ordering direction:
3771
3772  For descending order:
3773
3774   order_by => { -desc => [qw/col1 col2 col3/] }
3775
3776  For explicit ascending order:
3777
3778   order_by => { -asc => 'col' }
3779
3780 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3781 supported, although you are strongly encouraged to use the hashref
3782 syntax as outlined above.
3783
3784 =head2 columns
3785
3786 =over 4
3787
3788 =item Value: \@columns
3789
3790 =back
3791
3792 Shortcut to request a particular set of columns to be retrieved. Each
3793 column spec may be a string (a table column name), or a hash (in which
3794 case the key is the C<as> value, and the value is used as the C<select>
3795 expression). Adds C<me.> onto the start of any column without a C<.> in
3796 it and sets C<select> from that, then auto-populates C<as> from
3797 C<select> as normal. (You may also use the C<cols> attribute, as in
3798 earlier versions of DBIC.)
3799
3800 Essentially C<columns> does the same as L</select> and L</as>.
3801
3802     columns => [ 'foo', { bar => 'baz' } ]
3803
3804 is the same as
3805
3806     select => [qw/foo baz/],
3807     as => [qw/foo bar/]
3808
3809 =head2 +columns
3810
3811 =over 4
3812
3813 =item Value: \@columns
3814
3815 =back
3816
3817 Indicates additional columns to be selected from storage. Works the same
3818 as L</columns> but adds columns to the selection. (You may also use the
3819 C<include_columns> attribute, as in earlier versions of DBIC). For
3820 example:-
3821
3822   $schema->resultset('CD')->search(undef, {
3823     '+columns' => ['artist.name'],
3824     join => ['artist']
3825   });
3826
3827 would return all CDs and include a 'name' column to the information
3828 passed to object inflation. Note that the 'artist' is the name of the
3829 column (or relationship) accessor, and 'name' is the name of the column
3830 accessor in the related table.
3831
3832 B<NOTE:> You need to explicitly quote '+columns' when defining the attribute.
3833 Not doing so causes Perl to incorrectly interpret +columns as a bareword with a
3834 unary plus operator before it.
3835
3836 =head2 include_columns
3837
3838 =over 4
3839
3840 =item Value: \@columns
3841
3842 =back
3843
3844 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3845
3846 =head2 select
3847
3848 =over 4
3849
3850 =item Value: \@select_columns
3851
3852 =back
3853
3854 Indicates which columns should be selected from the storage. You can use
3855 column names, or in the case of RDBMS back ends, function or stored procedure
3856 names:
3857
3858   $rs = $schema->resultset('Employee')->search(undef, {
3859     select => [
3860       'name',
3861       { count => 'employeeid' },
3862       { max => { length => 'name' }, -as => 'longest_name' }
3863     ]
3864   });
3865
3866   # Equivalent SQL
3867   SELECT name, COUNT( employeeid ), MAX( LENGTH( name ) ) AS longest_name FROM employee
3868
3869 B<NOTE:> You will almost always need a corresponding L</as> attribute when you
3870 use L</select>, to instruct DBIx::Class how to store the result of the column.
3871 Also note that the L</as> attribute has nothing to do with the SQL-side 'AS'
3872 identifier aliasing. You can however alias a function, so you can use it in
3873 e.g. an C<ORDER BY> clause. This is done via the C<-as> B<select function
3874 attribute> supplied as shown in the example above.
3875
3876 B<NOTE:> You need to explicitly quote '+select'/'+as' when defining the attributes.
3877 Not doing so causes Perl to incorrectly interpret them as a bareword with a
3878 unary plus operator before it.
3879
3880 =head2 +select
3881
3882 =over 4
3883
3884 Indicates additional columns to be selected from storage.  Works the same as
3885 L</select> but adds columns to the default selection, instead of specifying
3886 an explicit list.
3887
3888 =back
3889
3890 =head2 +as
3891
3892 =over 4
3893
3894 Indicates additional column names for those added via L</+select>. See L</as>.
3895
3896 =back
3897
3898 =head2 as
3899
3900 =over 4
3901
3902 =item Value: \@inflation_names
3903
3904 =back
3905
3906 Indicates column names for object inflation. That is L</as> indicates the
3907 slot name in which the column value will be stored within the
3908 L<Row|DBIx::Class::Row> object. The value will then be accessible via this
3909 identifier by the C<get_column> method (or via the object accessor B<if one
3910 with the same name already exists>) as shown below. The L</as> attribute has
3911 B<nothing to do> with the SQL-side C<AS>. See L</select> for details.
3912
3913   $rs = $schema->resultset('Employee')->search(undef, {
3914     select => [
3915       'name',
3916       { count => 'employeeid' },
3917       { max => { length => 'name' }, -as => 'longest_name' }
3918     ],
3919     as => [qw/
3920       name
3921       employee_count
3922       max_name_length
3923     /],
3924   });
3925
3926 If the object against which the search is performed already has an accessor
3927 matching a column name specified in C<as>, the value can be retrieved using
3928 the accessor as normal:
3929
3930   my $name = $employee->name();
3931
3932 If on the other hand an accessor does not exist in the object, you need to
3933 use C<get_column> instead:
3934
3935   my $employee_count = $employee->get_column('employee_count');
3936
3937 You can create your own accessors if required - see
3938 L<DBIx::Class::Manual::Cookbook> for details.
3939
3940 =head2 join
3941
3942 =over 4
3943
3944 =item Value: ($rel_name | \@rel_names | \%rel_names)
3945
3946 =back
3947
3948 Contains a list of relationships that should be joined for this query.  For
3949 example:
3950
3951   # Get CDs by Nine Inch Nails
3952   my $rs = $schema->resultset('CD')->search(
3953     { 'artist.name' => 'Nine Inch Nails' },
3954     { join => 'artist' }
3955   );
3956
3957 Can also contain a hash reference to refer to the other relation's relations.
3958 For example:
3959
3960   package MyApp::Schema::Track;
3961   use base qw/DBIx::Class/;
3962   __PACKAGE__->table('track');
3963   __PACKAGE__->add_columns(qw/trackid cd position title/);
3964   __PACKAGE__->set_primary_key('trackid');
3965   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3966   1;
3967
3968   # In your application
3969   my $rs = $schema->resultset('Artist')->search(
3970     { 'track.title' => 'Teardrop' },
3971     {
3972       join     => { cd => 'track' },
3973       order_by => 'artist.name',
3974     }
3975   );
3976
3977 You need to use the relationship (not the table) name in  conditions,
3978 because they are aliased as such. The current table is aliased as "me", so
3979 you need to use me.column_name in order to avoid ambiguity. For example:
3980
3981   # Get CDs from 1984 with a 'Foo' track
3982   my $rs = $schema->resultset('CD')->search(
3983     {
3984       'me.year' => 1984,
3985       'tracks.name' => 'Foo'
3986     },
3987     { join => 'tracks' }
3988   );
3989
3990 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3991 similarly for a third time). For e.g.
3992
3993   my $rs = $schema->resultset('Artist')->search({
3994     'cds.title'   => 'Down to Earth',
3995     'cds_2.title' => 'Popular',
3996   }, {
3997     join => [ qw/cds cds/ ],
3998   });
3999
4000 will return a set of all artists that have both a cd with title 'Down
4001 to Earth' and a cd with title 'Popular'.
4002
4003 If you want to fetch related objects from other tables as well, see C<prefetch>
4004 below.
4005
4006 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
4007
4008 =head2 prefetch
4009
4010 =over 4
4011
4012 =item Value: ($rel_name | \@rel_names | \%rel_names)
4013
4014 =back
4015
4016 Contains one or more relationships that should be fetched along with
4017 the main query (when they are accessed afterwards the data will
4018 already be available, without extra queries to the database).  This is
4019 useful for when you know you will need the related objects, because it
4020 saves at least one query:
4021
4022   my $rs = $schema->resultset('Tag')->search(
4023     undef,
4024     {
4025       prefetch => {
4026         cd => 'artist'
4027       }
4028     }
4029   );
4030
4031 The initial search results in SQL like the following:
4032
4033   SELECT tag.*, cd.*, artist.* FROM tag
4034   JOIN cd ON tag.cd = cd.cdid
4035   JOIN artist ON cd.artist = artist.artistid
4036
4037 L<DBIx::Class> has no need to go back to the database when we access the
4038 C<cd> or C<artist> relationships, which saves us two SQL statements in this
4039 case.
4040
4041 Simple prefetches will be joined automatically, so there is no need
4042 for a C<join> attribute in the above search.
4043
4044 L</prefetch> can be used with the any of the relationship types and
4045 multiple prefetches can be specified together. Below is a more complex
4046 example that prefetches a CD's artist, its liner notes (if present),
4047 the cover image, the tracks on that cd, and the guests on those
4048 tracks.
4049
4050  # Assuming:
4051  My::Schema::CD->belongs_to( artist      => 'My::Schema::Artist'     );
4052  My::Schema::CD->might_have( liner_note  => 'My::Schema::LinerNotes' );
4053  My::Schema::CD->has_one(    cover_image => 'My::Schema::Artwork'    );
4054  My::Schema::CD->has_many(   tracks      => 'My::Schema::Track'      );
4055
4056  My::Schema::Artist->belongs_to( record_label => 'My::Schema::RecordLabel' );
4057
4058  My::Schema::Track->has_many( guests => 'My::Schema::Guest' );
4059
4060
4061  my $rs = $schema->resultset('CD')->search(
4062    undef,
4063    {
4064      prefetch => [
4065        { artist => 'record_label'},  # belongs_to => belongs_to
4066        'liner_note',                 # might_have
4067        'cover_image',                # has_one
4068        { tracks => 'guests' },       # has_many => has_many
4069      ]
4070    }
4071  );
4072
4073 This will produce SQL like the following:
4074
4075  SELECT cd.*, artist.*, record_label.*, liner_note.*, cover_image.*,
4076         tracks.*, guests.*
4077    FROM cd me
4078    JOIN artist artist
4079      ON artist.artistid = me.artistid
4080    JOIN record_label record_label
4081      ON record_label.labelid = artist.labelid
4082    LEFT JOIN track tracks
4083      ON tracks.cdid = me.cdid
4084    LEFT JOIN guest guests
4085      ON guests.trackid = track.trackid
4086    LEFT JOIN liner_notes liner_note
4087      ON liner_note.cdid = me.cdid
4088    JOIN cd_artwork cover_image
4089      ON cover_image.cdid = me.cdid
4090  ORDER BY tracks.cd
4091
4092 Now the C<artist>, C<record_label>, C<liner_note>, C<cover_image>,
4093 C<tracks>, and C<guests> of the CD will all be available through the
4094 relationship accessors without the need for additional queries to the
4095 database.
4096
4097 However, there is one caveat to be observed: it can be dangerous to
4098 prefetch more than one L<has_many|DBIx::Class::Relationship/has_many>
4099 relationship on a given level. e.g.:
4100
4101  my $rs = $schema->resultset('CD')->search(
4102    undef,
4103    {
4104      prefetch => [
4105        'tracks',                         # has_many
4106        { cd_to_producer => 'producer' }, # has_many => belongs_to (i.e. m2m)
4107      ]
4108    }
4109  );
4110
4111 In fact, C<DBIx::Class> will emit the following warning:
4112
4113  Prefetching multiple has_many rels tracks and cd_to_producer at top
4114  level will explode the number of row objects retrievable via ->next
4115  or ->all. Use at your own risk.
4116
4117 The collapser currently can't identify duplicate tuples for multiple
4118 L<has_many|DBIx::Class::Relationship/has_many> relationships and as a
4119 result the second L<has_many|DBIx::Class::Relationship/has_many>
4120 relation could contain redundant objects.
4121
4122 =head3 Using L</prefetch> with L</join>
4123
4124 L</prefetch> implies a L</join> with the equivalent argument, and is
4125 properly merged with any existing L</join> specification. So the
4126 following:
4127
4128   my $rs = $schema->resultset('CD')->search(
4129    {'record_label.name' => 'Music Product Ltd.'},
4130    {
4131      join     => {artist => 'record_label'},
4132      prefetch => 'artist',
4133    }
4134  );
4135
4136 ... will work, searching on the record label's name, but only
4137 prefetching the C<artist>.
4138
4139 =head3 Using L</prefetch> with L</select> / L</+select> / L</as> / L</+as>
4140
4141 L</prefetch> implies a L</+select>/L</+as> with the fields of the
4142 prefetched relations.  So given:
4143
4144   my $rs = $schema->resultset('CD')->search(
4145    undef,
4146    {
4147      select   => ['cd.title'],
4148      as       => ['cd_title'],
4149      prefetch => 'artist',
4150    }
4151  );
4152
4153 The L</select> becomes: C<'cd.title', 'artist.*'> and the L</as>
4154 becomes: C<'cd_title', 'artist.*'>.
4155
4156 =head3 CAVEATS
4157
4158 Prefetch does a lot of deep magic. As such, it may not behave exactly
4159 as you might expect.
4160
4161 =over 4
4162
4163 =item *
4164
4165 Prefetch uses the L</cache> to populate the prefetched relationships. This
4166 may or may not be what you want.
4167
4168 =item *
4169
4170 If you specify a condition on a prefetched relationship, ONLY those
4171 rows that match the prefetched condition will be fetched into that relationship.
4172 This means that adding prefetch to a search() B<may alter> what is returned by
4173 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
4174
4175   my $artist_rs = $schema->resultset('Artist')->search({
4176       'cds.year' => 2008,
4177   }, {
4178       join => 'cds',
4179   });
4180
4181   my $count = $artist_rs->first->cds->count;
4182
4183   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
4184
4185   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
4186
4187   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
4188
4189 that cmp_ok() may or may not pass depending on the datasets involved. This
4190 behavior may or may not survive the 0.09 transition.
4191
4192 =back
4193
4194 =head2 page
4195
4196 =over 4
4197
4198 =item Value: $page
4199
4200 =back
4201
4202 Makes the resultset paged and specifies the page to retrieve. Effectively
4203 identical to creating a non-pages resultset and then calling ->page($page)
4204 on it.
4205
4206 If L</rows> attribute is not specified it defaults to 10 rows per page.
4207
4208 When you have a paged resultset, L</count> will only return the number
4209 of rows in the page. To get the total, use the L</pager> and call
4210 C<total_entries> on it.
4211
4212 =head2 rows
4213
4214 =over 4
4215
4216 =item Value: $rows
4217
4218 =back
4219
4220 Specifies the maximum number of rows for direct retrieval or the number of
4221 rows per page if the page attribute or method is used.
4222
4223 =head2 offset
4224
4225 =over 4
4226
4227 =item Value: $offset
4228
4229 =back
4230
4231 Specifies the (zero-based) row number for the  first row to be returned, or the
4232 of the first row of the first page if paging is used.
4233
4234 =head2 group_by
4235
4236 =over 4
4237
4238 =item Value: \@columns
4239
4240 =back
4241
4242 A arrayref of columns to group by. Can include columns of joined tables.
4243
4244   group_by => [qw/ column1 column2 ... /]
4245
4246 =head2 having
4247
4248 =over 4
4249
4250 =item Value: $condition
4251
4252 =back
4253
4254 HAVING is a select statement attribute that is applied between GROUP BY and
4255 ORDER BY. It is applied to the after the grouping calculations have been
4256 done.
4257
4258   having => { 'count_employee' => { '>=', 100 } }
4259
4260 or with an in-place function in which case literal SQL is required:
4261
4262   having => \[ 'count(employee) >= ?', [ count => 100 ] ]
4263
4264 =head2 distinct
4265
4266 =over 4
4267
4268 =item Value: (0 | 1)
4269
4270 =back
4271
4272 Set to 1 to group by all columns. If the resultset already has a group_by
4273 attribute, this setting is ignored and an appropriate warning is issued.
4274
4275 =head2 where
4276
4277 =over 4
4278
4279 Adds to the WHERE clause.
4280
4281   # only return rows WHERE deleted IS NULL for all searches
4282   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
4283
4284 Can be overridden by passing C<< { where => undef } >> as an attribute
4285 to a resultset.
4286
4287 =back
4288
4289 =head2 cache
4290
4291 Set to 1 to cache search results. This prevents extra SQL queries if you
4292 revisit rows in your ResultSet:
4293
4294   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
4295
4296   while( my $artist = $resultset->next ) {
4297     ... do stuff ...
4298   }
4299
4300   $rs->first; # without cache, this would issue a query
4301
4302 By default, searches are not cached.
4303
4304 For more examples of using these attributes, see
4305 L<DBIx::Class::Manual::Cookbook>.
4306
4307 =head2 for
4308
4309 =over 4
4310
4311 =item Value: ( 'update' | 'shared' )
4312
4313 =back
4314
4315 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
4316 ... FOR SHARED.
4317
4318 =cut
4319
4320 1;