Stop Data::Compare from loading plugins at will
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use base qw/DBIx::Class/;
6 use DBIx::Class::Carp;
7 use DBIx::Class::Exception;
8 use DBIx::Class::ResultSetColumn;
9 use Scalar::Util qw/blessed weaken/;
10 use Try::Tiny;
11 use Data::Compare (); # no imports!!! guard against insane architecture
12
13 # not importing first() as it will clash with our own method
14 use List::Util ();
15
16 BEGIN {
17   # De-duplication in _merge_attr() is disabled, but left in for reference
18   # (the merger is used for other things that ought not to be de-duped)
19   *__HM_DEDUP = sub () { 0 };
20 }
21
22 use namespace::clean;
23
24 use overload
25         '0+'     => "count",
26         'bool'   => "_bool",
27         fallback => 1;
28
29 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class result_source/);
30
31 =head1 NAME
32
33 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
34
35 =head1 SYNOPSIS
36
37   my $users_rs   = $schema->resultset('User');
38   while( $user = $users_rs->next) {
39     print $user->username;
40   }
41
42   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
43   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
44
45 =head1 DESCRIPTION
46
47 A ResultSet is an object which stores a set of conditions representing
48 a query. It is the backbone of DBIx::Class (i.e. the really
49 important/useful bit).
50
51 No SQL is executed on the database when a ResultSet is created, it
52 just stores all the conditions needed to create the query.
53
54 A basic ResultSet representing the data of an entire table is returned
55 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
56 L<Source|DBIx::Class::Manual::Glossary/Source> name.
57
58   my $users_rs = $schema->resultset('User');
59
60 A new ResultSet is returned from calling L</search> on an existing
61 ResultSet. The new one will contain all the conditions of the
62 original, plus any new conditions added in the C<search> call.
63
64 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
65 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
66 represents.
67
68 The query that the ResultSet represents is B<only> executed against
69 the database when these methods are called:
70 L</find>, L</next>, L</all>, L</first>, L</single>, L</count>.
71
72 If a resultset is used in a numeric context it returns the L</count>.
73 However, if it is used in a boolean context it is B<always> true.  So if
74 you want to check if a resultset has any results, you must use C<if $rs
75 != 0>.
76
77 =head1 EXAMPLES
78
79 =head2 Chaining resultsets
80
81 Let's say you've got a query that needs to be run to return some data
82 to the user. But, you have an authorization system in place that
83 prevents certain users from seeing certain information. So, you want
84 to construct the basic query in one method, but add constraints to it in
85 another.
86
87   sub get_data {
88     my $self = shift;
89     my $request = $self->get_request; # Get a request object somehow.
90     my $schema = $self->result_source->schema;
91
92     my $cd_rs = $schema->resultset('CD')->search({
93       title => $request->param('title'),
94       year => $request->param('year'),
95     });
96
97     $cd_rs = $self->apply_security_policy( $cd_rs );
98
99     return $cd_rs->all();
100   }
101
102   sub apply_security_policy {
103     my $self = shift;
104     my ($rs) = @_;
105
106     return $rs->search({
107       subversive => 0,
108     });
109   }
110
111 =head3 Resolving conditions and attributes
112
113 When a resultset is chained from another resultset, conditions and
114 attributes with the same keys need resolving.
115
116 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
117 into the existing ones from the original resultset.
118
119 The L</where> and L</having> attributes, and any search conditions, are
120 merged with an SQL C<AND> to the existing condition from the original
121 resultset.
122
123 All other attributes are overridden by any new ones supplied in the
124 search attributes.
125
126 =head2 Multiple queries
127
128 Since a resultset just defines a query, you can do all sorts of
129 things with it with the same object.
130
131   # Don't hit the DB yet.
132   my $cd_rs = $schema->resultset('CD')->search({
133     title => 'something',
134     year => 2009,
135   });
136
137   # Each of these hits the DB individually.
138   my $count = $cd_rs->count;
139   my $most_recent = $cd_rs->get_column('date_released')->max();
140   my @records = $cd_rs->all;
141
142 And it's not just limited to SELECT statements.
143
144   $cd_rs->delete();
145
146 This is even cooler:
147
148   $cd_rs->create({ artist => 'Fred' });
149
150 Which is the same as:
151
152   $schema->resultset('CD')->create({
153     title => 'something',
154     year => 2009,
155     artist => 'Fred'
156   });
157
158 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
159
160 =head1 METHODS
161
162 =head2 new
163
164 =over 4
165
166 =item Arguments: $source, \%$attrs
167
168 =item Return Value: $rs
169
170 =back
171
172 The resultset constructor. Takes a source object (usually a
173 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
174 L</ATTRIBUTES> below).  Does not perform any queries -- these are
175 executed as needed by the other methods.
176
177 Generally you won't need to construct a resultset manually.  You'll
178 automatically get one from e.g. a L</search> called in scalar context:
179
180   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
181
182 IMPORTANT: If called on an object, proxies to new_result instead so
183
184   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
185
186 will return a CD object, not a ResultSet.
187
188 =cut
189
190 sub new {
191   my $class = shift;
192   return $class->new_result(@_) if ref $class;
193
194   my ($source, $attrs) = @_;
195   $source = $source->resolve
196     if $source->isa('DBIx::Class::ResultSourceHandle');
197   $attrs = { %{$attrs||{}} };
198
199   if ($attrs->{page}) {
200     $attrs->{rows} ||= 10;
201   }
202
203   $attrs->{alias} ||= 'me';
204
205   my $self = bless {
206     result_source => $source,
207     cond => $attrs->{where},
208     pager => undef,
209     attrs => $attrs,
210   }, $class;
211
212   # if there is a dark selector, this means we are already in a
213   # chain and the cleanup/sanification was taken care of by
214   # _search_rs already
215   $self->_normalize_selection($attrs)
216     unless $attrs->{_dark_selector};
217
218   $self->result_class(
219     $attrs->{result_class} || $source->result_class
220   );
221
222   $self;
223 }
224
225 =head2 search
226
227 =over 4
228
229 =item Arguments: $cond, \%attrs?
230
231 =item Return Value: $resultset (scalar context) ||  @row_objs (list context)
232
233 =back
234
235   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
236   my $new_rs = $cd_rs->search({ year => 2005 });
237
238   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
239                  # year = 2005 OR year = 2004
240
241 In list context, C<< ->all() >> is called implicitly on the resultset, thus
242 returning a list of row objects instead. To avoid that, use L</search_rs>.
243
244 If you need to pass in additional attributes but no additional condition,
245 call it as C<search(undef, \%attrs)>.
246
247   # "SELECT name, artistid FROM $artist_table"
248   my @all_artists = $schema->resultset('Artist')->search(undef, {
249     columns => [qw/name artistid/],
250   });
251
252 For a list of attributes that can be passed to C<search>, see
253 L</ATTRIBUTES>. For more examples of using this function, see
254 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
255 documentation for the first argument, see L<SQL::Abstract>
256 and its extension L<DBIx::Class::SQLMaker>.
257
258 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
259
260 =head3 CAVEAT
261
262 Note that L</search> does not process/deflate any of the values passed in the
263 L<SQL::Abstract>-compatible search condition structure. This is unlike other
264 condition-bound methods L</new>, L</create> and L</find>. The user must ensure
265 manually that any value passed to this method will stringify to something the
266 RDBMS knows how to deal with. A notable example is the handling of L<DateTime>
267 objects, for more info see:
268 L<DBIx::Class::Manual::Cookbook/Formatting_DateTime_objects_in_queries>.
269
270 =cut
271
272 sub search {
273   my $self = shift;
274   my $rs = $self->search_rs( @_ );
275
276   if (wantarray) {
277     return $rs->all;
278   }
279   elsif (defined wantarray) {
280     return $rs;
281   }
282   else {
283     # we can be called by a relationship helper, which in
284     # turn may be called in void context due to some braindead
285     # overload or whatever else the user decided to be clever
286     # at this particular day. Thus limit the exception to
287     # external code calls only
288     $self->throw_exception ('->search is *not* a mutator, calling it in void context makes no sense')
289       if (caller)[0] !~ /^\QDBIx::Class::/;
290
291     return ();
292   }
293 }
294
295 =head2 search_rs
296
297 =over 4
298
299 =item Arguments: $cond, \%attrs?
300
301 =item Return Value: $resultset
302
303 =back
304
305 This method does the same exact thing as search() except it will
306 always return a resultset, even in list context.
307
308 =cut
309
310 sub search_rs {
311   my $self = shift;
312
313   # Special-case handling for (undef, undef).
314   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
315     @_ = ();
316   }
317
318   my $call_attrs = {};
319   if (@_ > 1) {
320     if (ref $_[-1] eq 'HASH') {
321       # copy for _normalize_selection
322       $call_attrs = { %{ pop @_ } };
323     }
324     elsif (! defined $_[-1] ) {
325       pop @_;   # search({}, undef)
326     }
327   }
328
329   # see if we can keep the cache (no $rs changes)
330   my $cache;
331   my %safe = (alias => 1, cache => 1);
332   if ( ! List::Util::first { !$safe{$_} } keys %$call_attrs and (
333     ! defined $_[0]
334       or
335     ref $_[0] eq 'HASH' && ! keys %{$_[0]}
336       or
337     ref $_[0] eq 'ARRAY' && ! @{$_[0]}
338   )) {
339     $cache = $self->get_cache;
340   }
341
342   my $rsrc = $self->result_source;
343
344   my $old_attrs = { %{$self->{attrs}} };
345   my $old_having = delete $old_attrs->{having};
346   my $old_where = delete $old_attrs->{where};
347
348   my $new_attrs = { %$old_attrs };
349
350   # take care of call attrs (only if anything is changing)
351   if (keys %$call_attrs) {
352
353     my @selector_attrs = qw/select as columns cols +select +as +columns include_columns/;
354
355     # reset the current selector list if new selectors are supplied
356     if (List::Util::first { exists $call_attrs->{$_} } qw/columns cols select as/) {
357       delete @{$old_attrs}{(@selector_attrs, '_dark_selector')};
358     }
359
360     # Normalize the new selector list (operates on the passed-in attr structure)
361     # Need to do it on every chain instead of only once on _resolved_attrs, in
362     # order to allow detection of empty vs partial 'as'
363     $call_attrs->{_dark_selector} = $old_attrs->{_dark_selector}
364       if $old_attrs->{_dark_selector};
365     $self->_normalize_selection ($call_attrs);
366
367     # start with blind overwriting merge, exclude selector attrs
368     $new_attrs = { %{$old_attrs}, %{$call_attrs} };
369     delete @{$new_attrs}{@selector_attrs};
370
371     for (@selector_attrs) {
372       $new_attrs->{$_} = $self->_merge_attr($old_attrs->{$_}, $call_attrs->{$_})
373         if ( exists $old_attrs->{$_} or exists $call_attrs->{$_} );
374     }
375
376     # older deprecated name, use only if {columns} is not there
377     if (my $c = delete $new_attrs->{cols}) {
378       if ($new_attrs->{columns}) {
379         carp "Resultset specifies both the 'columns' and the legacy 'cols' attributes - ignoring 'cols'";
380       }
381       else {
382         $new_attrs->{columns} = $c;
383       }
384     }
385
386
387     # join/prefetch use their own crazy merging heuristics
388     foreach my $key (qw/join prefetch/) {
389       $new_attrs->{$key} = $self->_merge_joinpref_attr($old_attrs->{$key}, $call_attrs->{$key})
390         if exists $call_attrs->{$key};
391     }
392
393     # stack binds together
394     $new_attrs->{bind} = [ @{ $old_attrs->{bind} || [] }, @{ $call_attrs->{bind} || [] } ];
395   }
396
397
398   # rip apart the rest of @_, parse a condition
399   my $call_cond = do {
400
401     if (ref $_[0] eq 'HASH') {
402       (keys %{$_[0]}) ? $_[0] : undef
403     }
404     elsif (@_ == 1) {
405       $_[0]
406     }
407     elsif (@_ % 2) {
408       $self->throw_exception('Odd number of arguments to search')
409     }
410     else {
411       +{ @_ }
412     }
413
414   } if @_;
415
416   if( @_ > 1 and ! $rsrc->result_class->isa('DBIx::Class::CDBICompat') ) {
417     carp_unique 'search( %condition ) is deprecated, use search( \%condition ) instead';
418   }
419
420   for ($old_where, $call_cond) {
421     if (defined $_) {
422       $new_attrs->{where} = $self->_stack_cond (
423         $_, $new_attrs->{where}
424       );
425     }
426   }
427
428   if (defined $old_having) {
429     $new_attrs->{having} = $self->_stack_cond (
430       $old_having, $new_attrs->{having}
431     )
432   }
433
434   my $rs = (ref $self)->new($rsrc, $new_attrs);
435
436   $rs->set_cache($cache) if ($cache);
437
438   return $rs;
439 }
440
441 my $dark_sel_dumper;
442 sub _normalize_selection {
443   my ($self, $attrs) = @_;
444
445   # legacy syntax
446   $attrs->{'+columns'} = $self->_merge_attr($attrs->{'+columns'}, delete $attrs->{include_columns})
447     if exists $attrs->{include_columns};
448
449   # columns are always placed first, however 
450
451   # Keep the X vs +X separation until _resolved_attrs time - this allows to
452   # delay the decision on whether to use a default select list ($rsrc->columns)
453   # allowing stuff like the remove_columns helper to work
454   #
455   # select/as +select/+as pairs need special handling - the amount of select/as
456   # elements in each pair does *not* have to be equal (think multicolumn
457   # selectors like distinct(foo, bar) ). If the selector is bare (no 'as'
458   # supplied at all) - try to infer the alias, either from the -as parameter
459   # of the selector spec, or use the parameter whole if it looks like a column
460   # name (ugly legacy heuristic). If all fails - leave the selector bare (which
461   # is ok as well), but make sure no more additions to the 'as' chain take place
462   for my $pref ('', '+') {
463
464     my ($sel, $as) = map {
465       my $key = "${pref}${_}";
466
467       my $val = [ ref $attrs->{$key} eq 'ARRAY'
468         ? @{$attrs->{$key}}
469         : $attrs->{$key} || ()
470       ];
471       delete $attrs->{$key};
472       $val;
473     } qw/select as/;
474
475     if (! @$as and ! @$sel ) {
476       next;
477     }
478     elsif (@$as and ! @$sel) {
479       $self->throw_exception(
480         "Unable to handle ${pref}as specification (@$as) without a corresponding ${pref}select"
481       );
482     }
483     elsif( ! @$as ) {
484       # no as part supplied at all - try to deduce (unless explicit end of named selection is declared)
485       # if any @$as has been supplied we assume the user knows what (s)he is doing
486       # and blindly keep stacking up pieces
487       unless ($attrs->{_dark_selector}) {
488         SELECTOR:
489         for (@$sel) {
490           if ( ref $_ eq 'HASH' and exists $_->{-as} ) {
491             push @$as, $_->{-as};
492           }
493           # assume any plain no-space, no-parenthesis string to be a column spec
494           # FIXME - this is retarded but is necessary to support shit like 'count(foo)'
495           elsif ( ! ref $_ and $_ =~ /^ [^\s\(\)]+ $/x) {
496             push @$as, $_;
497           }
498           # if all else fails - raise a flag that no more aliasing will be allowed
499           else {
500             $attrs->{_dark_selector} = {
501               plus_stage => $pref,
502               string => ($dark_sel_dumper ||= do {
503                   require Data::Dumper::Concise;
504                   Data::Dumper::Concise::DumperObject()->Indent(0);
505                 })->Values([$_])->Dump
506               ,
507             };
508             last SELECTOR;
509           }
510         }
511       }
512     }
513     elsif (@$as < @$sel) {
514       $self->throw_exception(
515         "Unable to handle an ${pref}as specification (@$as) with less elements than the corresponding ${pref}select"
516       );
517     }
518     elsif ($pref and $attrs->{_dark_selector}) {
519       $self->throw_exception(
520         "Unable to process named '+select', resultset contains an unnamed selector $attrs->{_dark_selector}{string}"
521       );
522     }
523
524
525     # merge result
526     $attrs->{"${pref}select"} = $self->_merge_attr($attrs->{"${pref}select"}, $sel);
527     $attrs->{"${pref}as"} = $self->_merge_attr($attrs->{"${pref}as"}, $as);
528   }
529 }
530
531 sub _stack_cond {
532   my ($self, $left, $right) = @_;
533
534   # collapse single element top-level conditions
535   # (single pass only, unlikely to need recursion)
536   for ($left, $right) {
537     if (ref $_ eq 'ARRAY') {
538       if (@$_ == 0) {
539         $_ = undef;
540       }
541       elsif (@$_ == 1) {
542         $_ = $_->[0];
543       }
544     }
545     elsif (ref $_ eq 'HASH') {
546       my ($first, $more) = keys %$_;
547
548       # empty hash
549       if (! defined $first) {
550         $_ = undef;
551       }
552       # one element hash
553       elsif (! defined $more) {
554         if ($first eq '-and' and ref $_->{'-and'} eq 'HASH') {
555           $_ = $_->{'-and'};
556         }
557         elsif ($first eq '-or' and ref $_->{'-or'} eq 'ARRAY') {
558           $_ = $_->{'-or'};
559         }
560       }
561     }
562   }
563
564   # merge hashes with weeding out of duplicates (simple cases only)
565   if (ref $left eq 'HASH' and ref $right eq 'HASH') {
566
567     # shallow copy to destroy
568     $right = { %$right };
569     for (grep { exists $right->{$_} } keys %$left) {
570       # the use of eq_deeply here is justified - the rhs of an
571       # expression can contain a lot of twisted weird stuff
572       delete $right->{$_} if Data::Compare::Compare( $left->{$_}, $right->{$_} );
573     }
574
575     $right = undef unless keys %$right;
576   }
577
578
579   if (defined $left xor defined $right) {
580     return defined $left ? $left : $right;
581   }
582   elsif (! defined $left) {
583     return undef;
584   }
585   else {
586     return { -and => [ $left, $right ] };
587   }
588 }
589
590 =head2 search_literal
591
592 =over 4
593
594 =item Arguments: $sql_fragment, @bind_values
595
596 =item Return Value: $resultset (scalar context) || @row_objs (list context)
597
598 =back
599
600   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
601   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
602
603 Pass a literal chunk of SQL to be added to the conditional part of the
604 resultset query.
605
606 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
607 only be used in that context. C<search_literal> is a convenience method.
608 It is equivalent to calling $schema->search(\[]), but if you want to ensure
609 columns are bound correctly, use C<search>.
610
611 Example of how to use C<search> instead of C<search_literal>
612
613   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
614   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
615
616
617 See L<DBIx::Class::Manual::Cookbook/Searching> and
618 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
619 require C<search_literal>.
620
621 =cut
622
623 sub search_literal {
624   my ($self, $sql, @bind) = @_;
625   my $attr;
626   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
627     $attr = pop @bind;
628   }
629   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
630 }
631
632 =head2 find
633
634 =over 4
635
636 =item Arguments: \%columns_values | @pk_values, \%attrs?
637
638 =item Return Value: $row_object | undef
639
640 =back
641
642 Finds and returns a single row based on supplied criteria. Takes either a
643 hashref with the same format as L</create> (including inference of foreign
644 keys from related objects), or a list of primary key values in the same
645 order as the L<primary columns|DBIx::Class::ResultSource/primary_columns>
646 declaration on the L</result_source>.
647
648 In either case an attempt is made to combine conditions already existing on
649 the resultset with the condition passed to this method.
650
651 To aid with preparing the correct query for the storage you may supply the
652 C<key> attribute, which is the name of a
653 L<unique constraint|DBIx::Class::ResultSource/add_unique_constraint> (the
654 unique constraint corresponding to the
655 L<primary columns|DBIx::Class::ResultSource/primary_columns> is always named
656 C<primary>). If the C<key> attribute has been supplied, and DBIC is unable
657 to construct a query that satisfies the named unique constraint fully (
658 non-NULL values for each column member of the constraint) an exception is
659 thrown.
660
661 If no C<key> is specified, the search is carried over all unique constraints
662 which are fully defined by the available condition.
663
664 If no such constraint is found, C<find> currently defaults to a simple
665 C<< search->(\%column_values) >> which may or may not do what you expect.
666 Note that this fallback behavior may be deprecated in further versions. If
667 you need to search with arbitrary conditions - use L</search>. If the query
668 resulting from this fallback produces more than one row, a warning to the
669 effect is issued, though only the first row is constructed and returned as
670 C<$row_object>.
671
672 In addition to C<key>, L</find> recognizes and applies standard
673 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
674
675 Note that if you have extra concerns about the correctness of the resulting
676 query you need to specify the C<key> attribute and supply the entire condition
677 as an argument to find (since it is not always possible to perform the
678 combination of the resultset condition with the supplied one, especially if
679 the resultset condition contains literal sql).
680
681 For example, to find a row by its primary key:
682
683   my $cd = $schema->resultset('CD')->find(5);
684
685 You can also find a row by a specific unique constraint:
686
687   my $cd = $schema->resultset('CD')->find(
688     {
689       artist => 'Massive Attack',
690       title  => 'Mezzanine',
691     },
692     { key => 'cd_artist_title' }
693   );
694
695 See also L</find_or_create> and L</update_or_create>.
696
697 =cut
698
699 sub find {
700   my $self = shift;
701   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
702
703   my $rsrc = $self->result_source;
704
705   # Parse out the condition from input
706   my $call_cond;
707   if (ref $_[0] eq 'HASH') {
708     $call_cond = { %{$_[0]} };
709   }
710   else {
711     my $constraint = exists $attrs->{key} ? $attrs->{key} : 'primary';
712     my @c_cols = $rsrc->unique_constraint_columns($constraint);
713
714     $self->throw_exception(
715       "No constraint columns, maybe a malformed '$constraint' constraint?"
716     ) unless @c_cols;
717
718     $self->throw_exception (
719       'find() expects either a column/value hashref, or a list of values '
720     . "corresponding to the columns of the specified unique constraint '$constraint'"
721     ) unless @c_cols == @_;
722
723     $call_cond = {};
724     @{$call_cond}{@c_cols} = @_;
725   }
726
727   my %related;
728   for my $key (keys %$call_cond) {
729     if (
730       my $keyref = ref($call_cond->{$key})
731         and
732       my $relinfo = $rsrc->relationship_info($key)
733     ) {
734       my $val = delete $call_cond->{$key};
735
736       next if $keyref eq 'ARRAY'; # has_many for multi_create
737
738       my $rel_q = $rsrc->_resolve_condition(
739         $relinfo->{cond}, $val, $key, $key
740       );
741       die "Can't handle complex relationship conditions in find" if ref($rel_q) ne 'HASH';
742       @related{keys %$rel_q} = values %$rel_q;
743     }
744   }
745
746   # relationship conditions take precedence (?)
747   @{$call_cond}{keys %related} = values %related;
748
749   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
750   my $final_cond;
751   if (exists $attrs->{key}) {
752     $final_cond = $self->_qualify_cond_columns (
753
754       $self->_build_unique_cond (
755         $attrs->{key},
756         $call_cond,
757       ),
758
759       $alias,
760     );
761   }
762   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
763     # This means that we got here after a merger of relationship conditions
764     # in ::Relationship::Base::search_related (the row method), and furthermore
765     # the relationship is of the 'single' type. This means that the condition
766     # provided by the relationship (already attached to $self) is sufficient,
767     # as there can be only one row in the database that would satisfy the
768     # relationship
769   }
770   else {
771     # no key was specified - fall down to heuristics mode:
772     # run through all unique queries registered on the resultset, and
773     # 'OR' all qualifying queries together
774     my (@unique_queries, %seen_column_combinations);
775     for my $c_name ($rsrc->unique_constraint_names) {
776       next if $seen_column_combinations{
777         join "\x00", sort $rsrc->unique_constraint_columns($c_name)
778       }++;
779
780       push @unique_queries, try {
781         $self->_build_unique_cond ($c_name, $call_cond, 'croak_on_nulls')
782       } || ();
783     }
784
785     $final_cond = @unique_queries
786       ? [ map { $self->_qualify_cond_columns($_, $alias) } @unique_queries ]
787       : $self->_non_unique_find_fallback ($call_cond, $attrs)
788     ;
789   }
790
791   # Run the query, passing the result_class since it should propagate for find
792   my $rs = $self->search ($final_cond, {result_class => $self->result_class, %$attrs});
793   if (keys %{$rs->_resolved_attrs->{collapse}}) {
794     my $row = $rs->next;
795     carp "Query returned more than one row" if $rs->next;
796     return $row;
797   }
798   else {
799     return $rs->single;
800   }
801 }
802
803 # This is a stop-gap method as agreed during the discussion on find() cleanup:
804 # http://lists.scsys.co.uk/pipermail/dbix-class/2010-October/009535.html
805 #
806 # It is invoked when find() is called in legacy-mode with insufficiently-unique
807 # condition. It is provided for overrides until a saner way forward is devised
808 #
809 # *NOTE* This is not a public method, and it's *GUARANTEED* to disappear down
810 # the road. Please adjust your tests accordingly to catch this situation early
811 # DBIx::Class::ResultSet->can('_non_unique_find_fallback') is reasonable
812 #
813 # The method will not be removed without an adequately complete replacement
814 # for strict-mode enforcement
815 sub _non_unique_find_fallback {
816   my ($self, $cond, $attrs) = @_;
817
818   return $self->_qualify_cond_columns(
819     $cond,
820     exists $attrs->{alias}
821       ? $attrs->{alias}
822       : $self->{attrs}{alias}
823   );
824 }
825
826
827 sub _qualify_cond_columns {
828   my ($self, $cond, $alias) = @_;
829
830   my %aliased = %$cond;
831   for (keys %aliased) {
832     $aliased{"$alias.$_"} = delete $aliased{$_}
833       if $_ !~ /\./;
834   }
835
836   return \%aliased;
837 }
838
839 sub _build_unique_cond {
840   my ($self, $constraint_name, $extra_cond, $croak_on_null) = @_;
841
842   my @c_cols = $self->result_source->unique_constraint_columns($constraint_name);
843
844   # combination may fail if $self->{cond} is non-trivial
845   my ($final_cond) = try {
846     $self->_merge_with_rscond ($extra_cond)
847   } catch {
848     +{ %$extra_cond }
849   };
850
851   # trim out everything not in $columns
852   $final_cond = { map {
853     exists $final_cond->{$_}
854       ? ( $_ => $final_cond->{$_} )
855       : ()
856   } @c_cols };
857
858   if (my @missing = grep
859     { ! ($croak_on_null ? defined $final_cond->{$_} : exists $final_cond->{$_}) }
860     (@c_cols)
861   ) {
862     $self->throw_exception( sprintf ( "Unable to satisfy requested constraint '%s', no values for column(s): %s",
863       $constraint_name,
864       join (', ', map { "'$_'" } @missing),
865     ) );
866   }
867
868   if (
869     !$croak_on_null
870       and
871     !$ENV{DBIC_NULLABLE_KEY_NOWARN}
872       and
873     my @undefs = grep { ! defined $final_cond->{$_} } (keys %$final_cond)
874   ) {
875     carp_unique ( sprintf (
876       "NULL/undef values supplied for requested unique constraint '%s' (NULL "
877     . 'values in column(s): %s). This is almost certainly not what you wanted, '
878     . 'though you can set DBIC_NULLABLE_KEY_NOWARN to disable this warning.',
879       $constraint_name,
880       join (', ', map { "'$_'" } @undefs),
881     ));
882   }
883
884   return $final_cond;
885 }
886
887 =head2 search_related
888
889 =over 4
890
891 =item Arguments: $rel, $cond, \%attrs?
892
893 =item Return Value: $new_resultset (scalar context) || @row_objs (list context)
894
895 =back
896
897   $new_rs = $cd_rs->search_related('artist', {
898     name => 'Emo-R-Us',
899   });
900
901 Searches the specified relationship, optionally specifying a condition and
902 attributes for matching records. See L</ATTRIBUTES> for more information.
903
904 In list context, C<< ->all() >> is called implicitly on the resultset, thus
905 returning a list of row objects instead. To avoid that, use L</search_related_rs>.
906
907 See also L</search_related_rs>.
908
909 =cut
910
911 sub search_related {
912   return shift->related_resultset(shift)->search(@_);
913 }
914
915 =head2 search_related_rs
916
917 This method works exactly the same as search_related, except that
918 it guarantees a resultset, even in list context.
919
920 =cut
921
922 sub search_related_rs {
923   return shift->related_resultset(shift)->search_rs(@_);
924 }
925
926 =head2 cursor
927
928 =over 4
929
930 =item Arguments: none
931
932 =item Return Value: $cursor
933
934 =back
935
936 Returns a storage-driven cursor to the given resultset. See
937 L<DBIx::Class::Cursor> for more information.
938
939 =cut
940
941 sub cursor {
942   my ($self) = @_;
943
944   my $attrs = $self->_resolved_attrs_copy;
945
946   return $self->{cursor}
947     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
948           $attrs->{where},$attrs);
949 }
950
951 =head2 single
952
953 =over 4
954
955 =item Arguments: $cond?
956
957 =item Return Value: $row_object | undef
958
959 =back
960
961   my $cd = $schema->resultset('CD')->single({ year => 2001 });
962
963 Inflates the first result without creating a cursor if the resultset has
964 any records in it; if not returns C<undef>. Used by L</find> as a lean version
965 of L</search>.
966
967 While this method can take an optional search condition (just like L</search>)
968 being a fast-code-path it does not recognize search attributes. If you need to
969 add extra joins or similar, call L</search> and then chain-call L</single> on the
970 L<DBIx::Class::ResultSet> returned.
971
972 =over
973
974 =item B<Note>
975
976 As of 0.08100, this method enforces the assumption that the preceding
977 query returns only one row. If more than one row is returned, you will receive
978 a warning:
979
980   Query returned more than one row
981
982 In this case, you should be using L</next> or L</find> instead, or if you really
983 know what you are doing, use the L</rows> attribute to explicitly limit the size
984 of the resultset.
985
986 This method will also throw an exception if it is called on a resultset prefetching
987 has_many, as such a prefetch implies fetching multiple rows from the database in
988 order to assemble the resulting object.
989
990 =back
991
992 =cut
993
994 sub single {
995   my ($self, $where) = @_;
996   if(@_ > 2) {
997       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
998   }
999
1000   my $attrs = $self->_resolved_attrs_copy;
1001
1002   if (keys %{$attrs->{collapse}}) {
1003     $self->throw_exception(
1004       'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
1005     );
1006   }
1007
1008   if ($where) {
1009     if (defined $attrs->{where}) {
1010       $attrs->{where} = {
1011         '-and' =>
1012             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
1013                $where, delete $attrs->{where} ]
1014       };
1015     } else {
1016       $attrs->{where} = $where;
1017     }
1018   }
1019
1020   my @data = $self->result_source->storage->select_single(
1021     $attrs->{from}, $attrs->{select},
1022     $attrs->{where}, $attrs
1023   );
1024
1025   return (@data ? ($self->_construct_object(@data))[0] : undef);
1026 }
1027
1028
1029 # _collapse_query
1030 #
1031 # Recursively collapse the query, accumulating values for each column.
1032
1033 sub _collapse_query {
1034   my ($self, $query, $collapsed) = @_;
1035
1036   $collapsed ||= {};
1037
1038   if (ref $query eq 'ARRAY') {
1039     foreach my $subquery (@$query) {
1040       next unless ref $subquery;  # -or
1041       $collapsed = $self->_collapse_query($subquery, $collapsed);
1042     }
1043   }
1044   elsif (ref $query eq 'HASH') {
1045     if (keys %$query and (keys %$query)[0] eq '-and') {
1046       foreach my $subquery (@{$query->{-and}}) {
1047         $collapsed = $self->_collapse_query($subquery, $collapsed);
1048       }
1049     }
1050     else {
1051       foreach my $col (keys %$query) {
1052         my $value = $query->{$col};
1053         $collapsed->{$col}{$value}++;
1054       }
1055     }
1056   }
1057
1058   return $collapsed;
1059 }
1060
1061 =head2 get_column
1062
1063 =over 4
1064
1065 =item Arguments: $cond?
1066
1067 =item Return Value: $resultsetcolumn
1068
1069 =back
1070
1071   my $max_length = $rs->get_column('length')->max;
1072
1073 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
1074
1075 =cut
1076
1077 sub get_column {
1078   my ($self, $column) = @_;
1079   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
1080   return $new;
1081 }
1082
1083 =head2 search_like
1084
1085 =over 4
1086
1087 =item Arguments: $cond, \%attrs?
1088
1089 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1090
1091 =back
1092
1093   # WHERE title LIKE '%blue%'
1094   $cd_rs = $rs->search_like({ title => '%blue%'});
1095
1096 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
1097 that this is simply a convenience method retained for ex Class::DBI users.
1098 You most likely want to use L</search> with specific operators.
1099
1100 For more information, see L<DBIx::Class::Manual::Cookbook>.
1101
1102 This method is deprecated and will be removed in 0.09. Use L</search()>
1103 instead. An example conversion is:
1104
1105   ->search_like({ foo => 'bar' });
1106
1107   # Becomes
1108
1109   ->search({ foo => { like => 'bar' } });
1110
1111 =cut
1112
1113 sub search_like {
1114   my $class = shift;
1115   carp_unique (
1116     'search_like() is deprecated and will be removed in DBIC version 0.09.'
1117    .' Instead use ->search({ x => { -like => "y%" } })'
1118    .' (note the outer pair of {}s - they are important!)'
1119   );
1120   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1121   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
1122   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
1123   return $class->search($query, { %$attrs });
1124 }
1125
1126 =head2 slice
1127
1128 =over 4
1129
1130 =item Arguments: $first, $last
1131
1132 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1133
1134 =back
1135
1136 Returns a resultset or object list representing a subset of elements from the
1137 resultset slice is called on. Indexes are from 0, i.e., to get the first
1138 three records, call:
1139
1140   my ($one, $two, $three) = $rs->slice(0, 2);
1141
1142 =cut
1143
1144 sub slice {
1145   my ($self, $min, $max) = @_;
1146   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
1147   $attrs->{offset} = $self->{attrs}{offset} || 0;
1148   $attrs->{offset} += $min;
1149   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
1150   return $self->search(undef, $attrs);
1151   #my $slice = (ref $self)->new($self->result_source, $attrs);
1152   #return (wantarray ? $slice->all : $slice);
1153 }
1154
1155 =head2 next
1156
1157 =over 4
1158
1159 =item Arguments: none
1160
1161 =item Return Value: $result | undef
1162
1163 =back
1164
1165 Returns the next element in the resultset (C<undef> is there is none).
1166
1167 Can be used to efficiently iterate over records in the resultset:
1168
1169   my $rs = $schema->resultset('CD')->search;
1170   while (my $cd = $rs->next) {
1171     print $cd->title;
1172   }
1173
1174 Note that you need to store the resultset object, and call C<next> on it.
1175 Calling C<< resultset('Table')->next >> repeatedly will always return the
1176 first record from the resultset.
1177
1178 =cut
1179
1180 sub next {
1181   my ($self) = @_;
1182   if (my $cache = $self->get_cache) {
1183     $self->{all_cache_position} ||= 0;
1184     return $cache->[$self->{all_cache_position}++];
1185   }
1186   if ($self->{attrs}{cache}) {
1187     delete $self->{pager};
1188     $self->{all_cache_position} = 1;
1189     return ($self->all)[0];
1190   }
1191   if ($self->{stashed_objects}) {
1192     my $obj = shift(@{$self->{stashed_objects}});
1193     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
1194     return $obj;
1195   }
1196   my @row = (
1197     exists $self->{stashed_row}
1198       ? @{delete $self->{stashed_row}}
1199       : $self->cursor->next
1200   );
1201   return undef unless (@row);
1202   my ($row, @more) = $self->_construct_object(@row);
1203   $self->{stashed_objects} = \@more if @more;
1204   return $row;
1205 }
1206
1207 sub _construct_object {
1208   my ($self, @row) = @_;
1209
1210   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row)
1211     or return ();
1212   my @new = $self->result_class->inflate_result($self->result_source, @$info);
1213   @new = $self->{_attrs}{record_filter}->(@new)
1214     if exists $self->{_attrs}{record_filter};
1215   return @new;
1216 }
1217
1218 sub _collapse_result {
1219   my ($self, $as_proto, $row) = @_;
1220
1221   my @copy = @$row;
1222
1223   # 'foo'         => [ undef, 'foo' ]
1224   # 'foo.bar'     => [ 'foo', 'bar' ]
1225   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
1226
1227   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
1228
1229   my %collapse = %{$self->{_attrs}{collapse}||{}};
1230
1231   my @pri_index;
1232
1233   # if we're doing collapsing (has_many prefetch) we need to grab records
1234   # until the PK changes, so fill @pri_index. if not, we leave it empty so
1235   # we know we don't have to bother.
1236
1237   # the reason for not using the collapse stuff directly is because if you
1238   # had for e.g. two artists in a row with no cds, the collapse info for
1239   # both would be NULL (undef) so you'd lose the second artist
1240
1241   # store just the index so we can check the array positions from the row
1242   # without having to contruct the full hash
1243
1244   if (keys %collapse) {
1245     my %pri = map { ($_ => 1) } $self->result_source->_pri_cols;
1246     foreach my $i (0 .. $#construct_as) {
1247       next if defined($construct_as[$i][0]); # only self table
1248       if (delete $pri{$construct_as[$i][1]}) {
1249         push(@pri_index, $i);
1250       }
1251       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
1252     }
1253   }
1254
1255   # no need to do an if, it'll be empty if @pri_index is empty anyway
1256
1257   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
1258
1259   my @const_rows;
1260
1261   do { # no need to check anything at the front, we always want the first row
1262
1263     my %const;
1264
1265     foreach my $this_as (@construct_as) {
1266       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
1267     }
1268
1269     push(@const_rows, \%const);
1270
1271   } until ( # no pri_index => no collapse => drop straight out
1272       !@pri_index
1273     or
1274       do { # get another row, stash it, drop out if different PK
1275
1276         @copy = $self->cursor->next;
1277         $self->{stashed_row} = \@copy;
1278
1279         # last thing in do block, counts as true if anything doesn't match
1280
1281         # check xor defined first for NULL vs. NOT NULL then if one is
1282         # defined the other must be so check string equality
1283
1284         grep {
1285           (defined $pri_vals{$_} ^ defined $copy[$_])
1286           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1287         } @pri_index;
1288       }
1289   );
1290
1291   my $alias = $self->{attrs}{alias};
1292   my $info = [];
1293
1294   my %collapse_pos;
1295
1296   my @const_keys;
1297
1298   foreach my $const (@const_rows) {
1299     scalar @const_keys or do {
1300       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1301     };
1302     foreach my $key (@const_keys) {
1303       if (length $key) {
1304         my $target = $info;
1305         my @parts = split(/\./, $key);
1306         my $cur = '';
1307         my $data = $const->{$key};
1308         foreach my $p (@parts) {
1309           $target = $target->[1]->{$p} ||= [];
1310           $cur .= ".${p}";
1311           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) {
1312             # collapsing at this point and on final part
1313             my $pos = $collapse_pos{$cur};
1314             CK: foreach my $ck (@ckey) {
1315               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1316                 $collapse_pos{$cur} = $data;
1317                 delete @collapse_pos{ # clear all positioning for sub-entries
1318                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1319                 };
1320                 push(@$target, []);
1321                 last CK;
1322               }
1323             }
1324           }
1325           if (exists $collapse{$cur}) {
1326             $target = $target->[-1];
1327           }
1328         }
1329         $target->[0] = $data;
1330       } else {
1331         $info->[0] = $const->{$key};
1332       }
1333     }
1334   }
1335
1336   return $info;
1337 }
1338
1339 =head2 result_source
1340
1341 =over 4
1342
1343 =item Arguments: $result_source?
1344
1345 =item Return Value: $result_source
1346
1347 =back
1348
1349 An accessor for the primary ResultSource object from which this ResultSet
1350 is derived.
1351
1352 =head2 result_class
1353
1354 =over 4
1355
1356 =item Arguments: $result_class?
1357
1358 =item Return Value: $result_class
1359
1360 =back
1361
1362 An accessor for the class to use when creating row objects. Defaults to
1363 C<< result_source->result_class >> - which in most cases is the name of the
1364 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1365
1366 Note that changing the result_class will also remove any components
1367 that were originally loaded in the source class via
1368 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1369 in the original source class will not run.
1370
1371 =cut
1372
1373 sub result_class {
1374   my ($self, $result_class) = @_;
1375   if ($result_class) {
1376     unless (ref $result_class) { # don't fire this for an object
1377       $self->ensure_class_loaded($result_class);
1378     }
1379     $self->_result_class($result_class);
1380     # THIS LINE WOULD BE A BUG - this accessor specifically exists to
1381     # permit the user to set result class on one result set only; it only
1382     # chains if provided to search()
1383     #$self->{attrs}{result_class} = $result_class if ref $self;
1384   }
1385   $self->_result_class;
1386 }
1387
1388 =head2 count
1389
1390 =over 4
1391
1392 =item Arguments: $cond, \%attrs??
1393
1394 =item Return Value: $count
1395
1396 =back
1397
1398 Performs an SQL C<COUNT> with the same query as the resultset was built
1399 with to find the number of elements. Passing arguments is equivalent to
1400 C<< $rs->search ($cond, \%attrs)->count >>
1401
1402 =cut
1403
1404 sub count {
1405   my $self = shift;
1406   return $self->search(@_)->count if @_ and defined $_[0];
1407   return scalar @{ $self->get_cache } if $self->get_cache;
1408
1409   my $attrs = $self->_resolved_attrs_copy;
1410
1411   # this is a little optimization - it is faster to do the limit
1412   # adjustments in software, instead of a subquery
1413   my $rows = delete $attrs->{rows};
1414   my $offset = delete $attrs->{offset};
1415
1416   my $crs;
1417   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1418     $crs = $self->_count_subq_rs ($attrs);
1419   }
1420   else {
1421     $crs = $self->_count_rs ($attrs);
1422   }
1423   my $count = $crs->next;
1424
1425   $count -= $offset if $offset;
1426   $count = $rows if $rows and $rows < $count;
1427   $count = 0 if ($count < 0);
1428
1429   return $count;
1430 }
1431
1432 =head2 count_rs
1433
1434 =over 4
1435
1436 =item Arguments: $cond, \%attrs??
1437
1438 =item Return Value: $count_rs
1439
1440 =back
1441
1442 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1443 This can be very handy for subqueries:
1444
1445   ->search( { amount => $some_rs->count_rs->as_query } )
1446
1447 As with regular resultsets the SQL query will be executed only after
1448 the resultset is accessed via L</next> or L</all>. That would return
1449 the same single value obtainable via L</count>.
1450
1451 =cut
1452
1453 sub count_rs {
1454   my $self = shift;
1455   return $self->search(@_)->count_rs if @_;
1456
1457   # this may look like a lack of abstraction (count() does about the same)
1458   # but in fact an _rs *must* use a subquery for the limits, as the
1459   # software based limiting can not be ported if this $rs is to be used
1460   # in a subquery itself (i.e. ->as_query)
1461   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1462     return $self->_count_subq_rs;
1463   }
1464   else {
1465     return $self->_count_rs;
1466   }
1467 }
1468
1469 #
1470 # returns a ResultSetColumn object tied to the count query
1471 #
1472 sub _count_rs {
1473   my ($self, $attrs) = @_;
1474
1475   my $rsrc = $self->result_source;
1476   $attrs ||= $self->_resolved_attrs;
1477
1478   my $tmp_attrs = { %$attrs };
1479   # take off any limits, record_filter is cdbi, and no point of ordering nor locking a count
1480   delete @{$tmp_attrs}{qw/rows offset order_by record_filter for/};
1481
1482   # overwrite the selector (supplied by the storage)
1483   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $attrs);
1484   $tmp_attrs->{as} = 'count';
1485   delete @{$tmp_attrs}{qw/columns/};
1486
1487   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1488
1489   return $tmp_rs;
1490 }
1491
1492 #
1493 # same as above but uses a subquery
1494 #
1495 sub _count_subq_rs {
1496   my ($self, $attrs) = @_;
1497
1498   my $rsrc = $self->result_source;
1499   $attrs ||= $self->_resolved_attrs;
1500
1501   my $sub_attrs = { %$attrs };
1502   # extra selectors do not go in the subquery and there is no point of ordering it, nor locking it
1503   delete @{$sub_attrs}{qw/collapse columns as select _prefetch_selector_range order_by for/};
1504
1505   # if we multi-prefetch we group_by primary keys only as this is what we would
1506   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
1507   if ( keys %{$attrs->{collapse}}  ) {
1508     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } ($rsrc->_pri_cols) ]
1509   }
1510
1511   # Calculate subquery selector
1512   if (my $g = $sub_attrs->{group_by}) {
1513
1514     my $sql_maker = $rsrc->storage->sql_maker;
1515
1516     # necessary as the group_by may refer to aliased functions
1517     my $sel_index;
1518     for my $sel (@{$attrs->{select}}) {
1519       $sel_index->{$sel->{-as}} = $sel
1520         if (ref $sel eq 'HASH' and $sel->{-as});
1521     }
1522
1523     # anything from the original select mentioned on the group-by needs to make it to the inner selector
1524     # also look for named aggregates referred in the having clause
1525     # having often contains scalarrefs - thus parse it out entirely
1526     my @parts = @$g;
1527     if ($attrs->{having}) {
1528       local $sql_maker->{having_bind};
1529       local $sql_maker->{quote_char} = $sql_maker->{quote_char};
1530       local $sql_maker->{name_sep} = $sql_maker->{name_sep};
1531       unless (defined $sql_maker->{quote_char} and length $sql_maker->{quote_char}) {
1532         $sql_maker->{quote_char} = [ "\x00", "\xFF" ];
1533         # if we don't unset it we screw up retarded but unfortunately working
1534         # 'MAX(foo.bar)' => { '>', 3 }
1535         $sql_maker->{name_sep} = '';
1536       }
1537
1538       my ($lquote, $rquote, $sep) = map { quotemeta $_ } ($sql_maker->_quote_chars, $sql_maker->name_sep);
1539
1540       my $sql = $sql_maker->_parse_rs_attrs ({ having => $attrs->{having} });
1541
1542       # search for both a proper quoted qualified string, for a naive unquoted scalarref
1543       # and if all fails for an utterly naive quoted scalar-with-function
1544       while ($sql =~ /
1545         $rquote $sep $lquote (.+?) $rquote
1546           |
1547         [\s,] \w+ \. (\w+) [\s,]
1548           |
1549         [\s,] $lquote (.+?) $rquote [\s,]
1550       /gx) {
1551         push @parts, ($1 || $2 || $3);  # one of them matched if we got here
1552       }
1553     }
1554
1555     for (@parts) {
1556       my $colpiece = $sel_index->{$_} || $_;
1557
1558       # unqualify join-based group_by's. Arcane but possible query
1559       # also horrible horrible hack to alias a column (not a func.)
1560       # (probably need to introduce SQLA syntax)
1561       if ($colpiece =~ /\./ && $colpiece !~ /^$attrs->{alias}\./) {
1562         my $as = $colpiece;
1563         $as =~ s/\./__/;
1564         $colpiece = \ sprintf ('%s AS %s', map { $sql_maker->_quote ($_) } ($colpiece, $as) );
1565       }
1566       push @{$sub_attrs->{select}}, $colpiece;
1567     }
1568   }
1569   else {
1570     my @pcols = map { "$attrs->{alias}.$_" } ($rsrc->primary_columns);
1571     $sub_attrs->{select} = @pcols ? \@pcols : [ 1 ];
1572   }
1573
1574   return $rsrc->resultset_class
1575                ->new ($rsrc, $sub_attrs)
1576                 ->as_subselect_rs
1577                  ->search ({}, { columns => { count => $rsrc->storage->_count_select ($rsrc, $attrs) } })
1578                   ->get_column ('count');
1579 }
1580
1581 sub _bool {
1582   return 1;
1583 }
1584
1585 =head2 count_literal
1586
1587 =over 4
1588
1589 =item Arguments: $sql_fragment, @bind_values
1590
1591 =item Return Value: $count
1592
1593 =back
1594
1595 Counts the results in a literal query. Equivalent to calling L</search_literal>
1596 with the passed arguments, then L</count>.
1597
1598 =cut
1599
1600 sub count_literal { shift->search_literal(@_)->count; }
1601
1602 =head2 all
1603
1604 =over 4
1605
1606 =item Arguments: none
1607
1608 =item Return Value: @objects
1609
1610 =back
1611
1612 Returns all elements in the resultset.
1613
1614 =cut
1615
1616 sub all {
1617   my $self = shift;
1618   if(@_) {
1619       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1620   }
1621
1622   return @{ $self->get_cache } if $self->get_cache;
1623
1624   my @obj;
1625
1626   if (keys %{$self->_resolved_attrs->{collapse}}) {
1627     # Using $self->cursor->all is really just an optimisation.
1628     # If we're collapsing has_many prefetches it probably makes
1629     # very little difference, and this is cleaner than hacking
1630     # _construct_object to survive the approach
1631     $self->cursor->reset;
1632     my @row = $self->cursor->next;
1633     while (@row) {
1634       push(@obj, $self->_construct_object(@row));
1635       @row = (exists $self->{stashed_row}
1636                ? @{delete $self->{stashed_row}}
1637                : $self->cursor->next);
1638     }
1639   } else {
1640     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1641   }
1642
1643   $self->set_cache(\@obj) if $self->{attrs}{cache};
1644
1645   return @obj;
1646 }
1647
1648 =head2 reset
1649
1650 =over 4
1651
1652 =item Arguments: none
1653
1654 =item Return Value: $self
1655
1656 =back
1657
1658 Resets the resultset's cursor, so you can iterate through the elements again.
1659 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1660 another query.
1661
1662 =cut
1663
1664 sub reset {
1665   my ($self) = @_;
1666   delete $self->{_attrs} if exists $self->{_attrs};
1667   $self->{all_cache_position} = 0;
1668   $self->cursor->reset;
1669   return $self;
1670 }
1671
1672 =head2 first
1673
1674 =over 4
1675
1676 =item Arguments: none
1677
1678 =item Return Value: $object | undef
1679
1680 =back
1681
1682 Resets the resultset and returns an object for the first result (or C<undef>
1683 if the resultset is empty).
1684
1685 =cut
1686
1687 sub first {
1688   return $_[0]->reset->next;
1689 }
1690
1691
1692 # _rs_update_delete
1693 #
1694 # Determines whether and what type of subquery is required for the $rs operation.
1695 # If grouping is necessary either supplies its own, or verifies the current one
1696 # After all is done delegates to the proper storage method.
1697
1698 sub _rs_update_delete {
1699   my ($self, $op, $values) = @_;
1700
1701   my $rsrc = $self->result_source;
1702
1703   my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
1704   my $needs_subq = $needs_group_by_subq || $self->_has_resolved_attr(qw/rows offset/);
1705
1706   if ($needs_group_by_subq or $needs_subq) {
1707
1708     # make a new $rs selecting only the PKs (that's all we really need)
1709     my $attrs = $self->_resolved_attrs_copy;
1710
1711
1712     delete $attrs->{$_} for qw/collapse _collapse_order_by select _prefetch_selector_range as/;
1713     $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->_pri_cols) ];
1714
1715     if ($needs_group_by_subq) {
1716       # make sure no group_by was supplied, or if there is one - make sure it matches
1717       # the columns compiled above perfectly. Anything else can not be sanely executed
1718       # on most databases so croak right then and there
1719
1720       if (my $g = $attrs->{group_by}) {
1721         my @current_group_by = map
1722           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1723           @$g
1724         ;
1725
1726         if (
1727           join ("\x00", sort @current_group_by)
1728             ne
1729           join ("\x00", sort @{$attrs->{columns}} )
1730         ) {
1731           $self->throw_exception (
1732             "You have just attempted a $op operation on a resultset which does group_by"
1733             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1734             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1735             . ' kind of queries. Please retry the operation with a modified group_by or'
1736             . ' without using one at all.'
1737           );
1738         }
1739       }
1740       else {
1741         $attrs->{group_by} = $attrs->{columns};
1742       }
1743     }
1744
1745     my $subrs = (ref $self)->new($rsrc, $attrs);
1746     return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
1747   }
1748   else {
1749     # Most databases do not allow aliasing of tables in UPDATE/DELETE. Thus
1750     # a condition containing 'me' or other table prefixes will not work
1751     # at all. What this code tries to do (badly) is to generate a condition
1752     # with the qualifiers removed, by exploiting the quote mechanism of sqla
1753     #
1754     # this is atrocious and should be replaced by normal sqla introspection
1755     # one sunny day
1756     my ($sql, @bind) = do {
1757       my $sqla = $rsrc->storage->sql_maker;
1758       local $sqla->{_dequalify_idents} = 1;
1759       $sqla->_recurse_where($self->{cond});
1760     } if $self->{cond};
1761
1762     return $rsrc->storage->$op(
1763       $rsrc,
1764       $op eq 'update' ? $values : (),
1765       $self->{cond} ? \[$sql, @bind] : (),
1766     );
1767   }
1768 }
1769
1770 =head2 update
1771
1772 =over 4
1773
1774 =item Arguments: \%values
1775
1776 =item Return Value: $storage_rv
1777
1778 =back
1779
1780 Sets the specified columns in the resultset to the supplied values in a
1781 single query. Note that this will not run any accessor/set_column/update
1782 triggers, nor will it update any row object instances derived from this
1783 resultset (this includes the contents of the L<resultset cache|/set_cache>
1784 if any). See L</update_all> if you need to execute any on-update
1785 triggers or cascades defined either by you or a
1786 L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
1787
1788 The return value is a pass through of what the underlying
1789 storage backend returned, and may vary. See L<DBI/execute> for the most
1790 common case.
1791
1792 =head3 CAVEAT
1793
1794 Note that L</update> does not process/deflate any of the values passed in.
1795 This is unlike the corresponding L<DBIx::Class::Row/update>. The user must
1796 ensure manually that any value passed to this method will stringify to
1797 something the RDBMS knows how to deal with. A notable example is the
1798 handling of L<DateTime> objects, for more info see:
1799 L<DBIx::Class::Manual::Cookbook/Formatting_DateTime_objects_in_queries>.
1800
1801 =cut
1802
1803 sub update {
1804   my ($self, $values) = @_;
1805   $self->throw_exception('Values for update must be a hash')
1806     unless ref $values eq 'HASH';
1807
1808   return $self->_rs_update_delete ('update', $values);
1809 }
1810
1811 =head2 update_all
1812
1813 =over 4
1814
1815 =item Arguments: \%values
1816
1817 =item Return Value: 1
1818
1819 =back
1820
1821 Fetches all objects and updates them one at a time via
1822 L<DBIx::Class::Row/update>. Note that C<update_all> will run DBIC defined
1823 triggers, while L</update> will not.
1824
1825 =cut
1826
1827 sub update_all {
1828   my ($self, $values) = @_;
1829   $self->throw_exception('Values for update_all must be a hash')
1830     unless ref $values eq 'HASH';
1831
1832   my $guard = $self->result_source->schema->txn_scope_guard;
1833   $_->update({%$values}) for $self->all;  # shallow copy - update will mangle it
1834   $guard->commit;
1835   return 1;
1836 }
1837
1838 =head2 delete
1839
1840 =over 4
1841
1842 =item Arguments: none
1843
1844 =item Return Value: $storage_rv
1845
1846 =back
1847
1848 Deletes the rows matching this resultset in a single query. Note that this
1849 will not run any delete triggers, nor will it alter the
1850 L<in_storage|DBIx::Class::Row/in_storage> status of any row object instances
1851 derived from this resultset (this includes the contents of the
1852 L<resultset cache|/set_cache> if any). See L</delete_all> if you need to
1853 execute any on-delete triggers or cascades defined either by you or a
1854 L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
1855
1856 The return value is a pass through of what the underlying storage backend
1857 returned, and may vary. See L<DBI/execute> for the most common case.
1858
1859 =cut
1860
1861 sub delete {
1862   my $self = shift;
1863   $self->throw_exception('delete does not accept any arguments')
1864     if @_;
1865
1866   return $self->_rs_update_delete ('delete');
1867 }
1868
1869 =head2 delete_all
1870
1871 =over 4
1872
1873 =item Arguments: none
1874
1875 =item Return Value: 1
1876
1877 =back
1878
1879 Fetches all objects and deletes them one at a time via
1880 L<DBIx::Class::Row/delete>. Note that C<delete_all> will run DBIC defined
1881 triggers, while L</delete> will not.
1882
1883 =cut
1884
1885 sub delete_all {
1886   my $self = shift;
1887   $self->throw_exception('delete_all does not accept any arguments')
1888     if @_;
1889
1890   my $guard = $self->result_source->schema->txn_scope_guard;
1891   $_->delete for $self->all;
1892   $guard->commit;
1893   return 1;
1894 }
1895
1896 =head2 populate
1897
1898 =over 4
1899
1900 =item Arguments: \@data;
1901
1902 =back
1903
1904 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1905 For the arrayref of hashrefs style each hashref should be a structure suitable
1906 for submitting to a $resultset->create(...) method.
1907
1908 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1909 to insert the data, as this is a faster method.
1910
1911 Otherwise, each set of data is inserted into the database using
1912 L<DBIx::Class::ResultSet/create>, and the resulting objects are
1913 accumulated into an array. The array itself, or an array reference
1914 is returned depending on scalar or list context.
1915
1916 Example:  Assuming an Artist Class that has many CDs Classes relating:
1917
1918   my $Artist_rs = $schema->resultset("Artist");
1919
1920   ## Void Context Example
1921   $Artist_rs->populate([
1922      { artistid => 4, name => 'Manufactured Crap', cds => [
1923         { title => 'My First CD', year => 2006 },
1924         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1925       ],
1926      },
1927      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1928         { title => 'My parents sold me to a record company', year => 2005 },
1929         { title => 'Why Am I So Ugly?', year => 2006 },
1930         { title => 'I Got Surgery and am now Popular', year => 2007 }
1931       ],
1932      },
1933   ]);
1934
1935   ## Array Context Example
1936   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1937     { name => "Artist One"},
1938     { name => "Artist Two"},
1939     { name => "Artist Three", cds=> [
1940     { title => "First CD", year => 2007},
1941     { title => "Second CD", year => 2008},
1942   ]}
1943   ]);
1944
1945   print $ArtistOne->name; ## response is 'Artist One'
1946   print $ArtistThree->cds->count ## reponse is '2'
1947
1948 For the arrayref of arrayrefs style,  the first element should be a list of the
1949 fieldsnames to which the remaining elements are rows being inserted.  For
1950 example:
1951
1952   $Arstist_rs->populate([
1953     [qw/artistid name/],
1954     [100, 'A Formally Unknown Singer'],
1955     [101, 'A singer that jumped the shark two albums ago'],
1956     [102, 'An actually cool singer'],
1957   ]);
1958
1959 Please note an important effect on your data when choosing between void and
1960 wantarray context. Since void context goes straight to C<insert_bulk> in
1961 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1962 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
1963 create primary keys for you, you will find that your PKs are empty.  In this
1964 case you will have to use the wantarray context in order to create those
1965 values.
1966
1967 =cut
1968
1969 sub populate {
1970   my $self = shift;
1971
1972   # cruft placed in standalone method
1973   my $data = $self->_normalize_populate_args(@_);
1974
1975   return unless @$data;
1976
1977   if(defined wantarray) {
1978     my @created;
1979     foreach my $item (@$data) {
1980       push(@created, $self->create($item));
1981     }
1982     return wantarray ? @created : \@created;
1983   } 
1984   else {
1985     my $first = $data->[0];
1986
1987     # if a column is a registered relationship, and is a non-blessed hash/array, consider
1988     # it relationship data
1989     my (@rels, @columns);
1990     my $rsrc = $self->result_source;
1991     my $rels = { map { $_ => $rsrc->relationship_info($_) } $rsrc->relationships };
1992     for (keys %$first) {
1993       my $ref = ref $first->{$_};
1994       $rels->{$_} && ($ref eq 'ARRAY' or $ref eq 'HASH')
1995         ? push @rels, $_
1996         : push @columns, $_
1997       ;
1998     }
1999
2000     my @pks = $rsrc->primary_columns;
2001
2002     ## do the belongs_to relationships
2003     foreach my $index (0..$#$data) {
2004
2005       # delegate to create() for any dataset without primary keys with specified relationships
2006       if (grep { !defined $data->[$index]->{$_} } @pks ) {
2007         for my $r (@rels) {
2008           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
2009             my @ret = $self->populate($data);
2010             return;
2011           }
2012         }
2013       }
2014
2015       foreach my $rel (@rels) {
2016         next unless ref $data->[$index]->{$rel} eq "HASH";
2017         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
2018         my ($reverse_relname, $reverse_relinfo) = %{$rsrc->reverse_relationship_info($rel)};
2019         my $related = $result->result_source->_resolve_condition(
2020           $reverse_relinfo->{cond},
2021           $self,
2022           $result,
2023           $rel,
2024         );
2025
2026         delete $data->[$index]->{$rel};
2027         $data->[$index] = {%{$data->[$index]}, %$related};
2028
2029         push @columns, keys %$related if $index == 0;
2030       }
2031     }
2032
2033     ## inherit the data locked in the conditions of the resultset
2034     my ($rs_data) = $self->_merge_with_rscond({});
2035     delete @{$rs_data}{@columns};
2036     my @inherit_cols = keys %$rs_data;
2037     my @inherit_data = values %$rs_data;
2038
2039     ## do bulk insert on current row
2040     $rsrc->storage->insert_bulk(
2041       $rsrc,
2042       [@columns, @inherit_cols],
2043       [ map { [ @$_{@columns}, @inherit_data ] } @$data ],
2044     );
2045
2046     ## do the has_many relationships
2047     foreach my $item (@$data) {
2048
2049       my $main_row;
2050
2051       foreach my $rel (@rels) {
2052         next unless ref $item->{$rel} eq "ARRAY" && @{ $item->{$rel} };
2053
2054         $main_row ||= $self->new_result({map { $_ => $item->{$_} } @pks});
2055
2056         my $child = $main_row->$rel;
2057
2058         my $related = $child->result_source->_resolve_condition(
2059           $rels->{$rel}{cond},
2060           $child,
2061           $main_row,
2062           $rel,
2063         );
2064
2065         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
2066         my @populate = map { {%$_, %$related} } @rows_to_add;
2067
2068         $child->populate( \@populate );
2069       }
2070     }
2071   }
2072 }
2073
2074
2075 # populate() argumnets went over several incarnations
2076 # What we ultimately support is AoH
2077 sub _normalize_populate_args {
2078   my ($self, $arg) = @_;
2079
2080   if (ref $arg eq 'ARRAY') {
2081     if (!@$arg) {
2082       return [];
2083     }
2084     elsif (ref $arg->[0] eq 'HASH') {
2085       return $arg;
2086     }
2087     elsif (ref $arg->[0] eq 'ARRAY') {
2088       my @ret;
2089       my @colnames = @{$arg->[0]};
2090       foreach my $values (@{$arg}[1 .. $#$arg]) {
2091         push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
2092       }
2093       return \@ret;
2094     }
2095   }
2096
2097   $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
2098 }
2099
2100 =head2 pager
2101
2102 =over 4
2103
2104 =item Arguments: none
2105
2106 =item Return Value: $pager
2107
2108 =back
2109
2110 Return Value a L<Data::Page> object for the current resultset. Only makes
2111 sense for queries with a C<page> attribute.
2112
2113 To get the full count of entries for a paged resultset, call
2114 C<total_entries> on the L<Data::Page> object.
2115
2116 =cut
2117
2118 # make a wizard good for both a scalar and a hashref
2119 my $mk_lazy_count_wizard = sub {
2120   require Variable::Magic;
2121
2122   my $stash = { total_rs => shift };
2123   my $slot = shift; # only used by the hashref magic
2124
2125   my $magic = Variable::Magic::wizard (
2126     data => sub { $stash },
2127
2128     (!$slot)
2129     ? (
2130       # the scalar magic
2131       get => sub {
2132         # set value lazily, and dispell for good
2133         ${$_[0]} = $_[1]{total_rs}->count;
2134         Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref});
2135         return 1;
2136       },
2137       set => sub {
2138         # an explicit set implies dispell as well
2139         # the unless() is to work around "fun and giggles" below
2140         Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref})
2141           unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
2142         return 1;
2143       },
2144     )
2145     : (
2146       # the uvar magic
2147       fetch => sub {
2148         if ($_[2] eq $slot and !$_[1]{inactive}) {
2149           my $cnt = $_[1]{total_rs}->count;
2150           $_[0]->{$slot} = $cnt;
2151
2152           # attempting to dispell in a fetch handle (works in store), seems
2153           # to invariable segfault on 5.10, 5.12, 5.13 :(
2154           # so use an inactivator instead
2155           #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
2156           $_[1]{inactive}++;
2157         }
2158         return 1;
2159       },
2160       store => sub {
2161         if (! $_[1]{inactive} and $_[2] eq $slot) {
2162           #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
2163           $_[1]{inactive}++
2164             unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
2165         }
2166         return 1;
2167       },
2168     ),
2169   );
2170
2171   $stash->{magic_selfref} = $magic;
2172   weaken ($stash->{magic_selfref}); # this fails on 5.8.1
2173
2174   return $magic;
2175 };
2176
2177 # the tie class for 5.8.1
2178 {
2179   package # hide from pause
2180     DBIx::Class::__DBIC_LAZY_RS_COUNT__;
2181   use base qw/Tie::Hash/;
2182
2183   sub FIRSTKEY { my $dummy = scalar keys %{$_[0]{data}}; each %{$_[0]{data}} }
2184   sub NEXTKEY  { each %{$_[0]{data}} }
2185   sub EXISTS   { exists $_[0]{data}{$_[1]} }
2186   sub DELETE   { delete $_[0]{data}{$_[1]} }
2187   sub CLEAR    { %{$_[0]{data}} = () }
2188   sub SCALAR   { scalar %{$_[0]{data}} }
2189
2190   sub TIEHASH {
2191     $_[1]{data} = {%{$_[1]{selfref}}};
2192     %{$_[1]{selfref}} = ();
2193     Scalar::Util::weaken ($_[1]{selfref});
2194     return bless ($_[1], $_[0]);
2195   };
2196
2197   sub FETCH {
2198     if ($_[1] eq $_[0]{slot}) {
2199       my $cnt = $_[0]{data}{$_[1]} = $_[0]{total_rs}->count;
2200       untie %{$_[0]{selfref}};
2201       %{$_[0]{selfref}} = %{$_[0]{data}};
2202       return $cnt;
2203     }
2204     else {
2205       $_[0]{data}{$_[1]};
2206     }
2207   }
2208
2209   sub STORE {
2210     $_[0]{data}{$_[1]} = $_[2];
2211     if ($_[1] eq $_[0]{slot}) {
2212       untie %{$_[0]{selfref}};
2213       %{$_[0]{selfref}} = %{$_[0]{data}};
2214     }
2215     $_[2];
2216   }
2217 }
2218
2219 sub pager {
2220   my ($self) = @_;
2221
2222   return $self->{pager} if $self->{pager};
2223
2224   my $attrs = $self->{attrs};
2225   if (!defined $attrs->{page}) {
2226     $self->throw_exception("Can't create pager for non-paged rs");
2227   }
2228   elsif ($attrs->{page} <= 0) {
2229     $self->throw_exception('Invalid page number (page-numbers are 1-based)');
2230   }
2231   $attrs->{rows} ||= 10;
2232
2233   # throw away the paging flags and re-run the count (possibly
2234   # with a subselect) to get the real total count
2235   my $count_attrs = { %$attrs };
2236   delete $count_attrs->{$_} for qw/rows offset page pager/;
2237   my $total_rs = (ref $self)->new($self->result_source, $count_attrs);
2238
2239
2240 ### the following may seem awkward and dirty, but it's a thought-experiment
2241 ### necessary for future development of DBIx::DS. Do *NOT* change this code
2242 ### before talking to ribasushi/mst
2243
2244   require Data::Page;
2245   my $pager = Data::Page->new(
2246     0,  #start with an empty set
2247     $attrs->{rows},
2248     $self->{attrs}{page},
2249   );
2250
2251   my $data_slot = 'total_entries';
2252
2253   # Since we are interested in a cached value (once it's set - it's set), every
2254   # technique will detach from the magic-host once the time comes to fire the
2255   # ->count (or in the segfaulting case of >= 5.10 it will deactivate itself)
2256
2257   if ($] < 5.008003) {
2258     # 5.8.1 throws 'Modification of a read-only value attempted' when one tries
2259     # to weakref the magic container :(
2260     # tested on 5.8.1
2261     tie (%$pager, 'DBIx::Class::__DBIC_LAZY_RS_COUNT__',
2262       { slot => $data_slot, total_rs => $total_rs, selfref => $pager }
2263     );
2264   }
2265   elsif ($] < 5.010) {
2266     # We can use magic on the hash value slot. It's interesting that the magic is
2267     # attached to the hash-slot, and does *not* stop working once I do the dummy
2268     # assignments after the cast()
2269     # tested on 5.8.3 and 5.8.9
2270     my $magic = $mk_lazy_count_wizard->($total_rs);
2271     Variable::Magic::cast ( $pager->{$data_slot}, $magic );
2272
2273     # this is for fun and giggles
2274     $pager->{$data_slot} = -1;
2275     $pager->{$data_slot} = 0;
2276
2277     # this does not work for scalars, but works with
2278     # uvar magic below
2279     #my %vals = %$pager;
2280     #%$pager = ();
2281     #%{$pager} = %vals;
2282   }
2283   else {
2284     # And the uvar magic
2285     # works on 5.10.1, 5.12.1 and 5.13.4 in its current form,
2286     # however see the wizard maker for more notes
2287     my $magic = $mk_lazy_count_wizard->($total_rs, $data_slot);
2288     Variable::Magic::cast ( %$pager, $magic );
2289
2290     # still works
2291     $pager->{$data_slot} = -1;
2292     $pager->{$data_slot} = 0;
2293
2294     # this now works
2295     my %vals = %$pager;
2296     %$pager = ();
2297     %{$pager} = %vals;
2298   }
2299
2300   return $self->{pager} = $pager;
2301 }
2302
2303 =head2 page
2304
2305 =over 4
2306
2307 =item Arguments: $page_number
2308
2309 =item Return Value: $rs
2310
2311 =back
2312
2313 Returns a resultset for the $page_number page of the resultset on which page
2314 is called, where each page contains a number of rows equal to the 'rows'
2315 attribute set on the resultset (10 by default).
2316
2317 =cut
2318
2319 sub page {
2320   my ($self, $page) = @_;
2321   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
2322 }
2323
2324 =head2 new_result
2325
2326 =over 4
2327
2328 =item Arguments: \%vals
2329
2330 =item Return Value: $rowobject
2331
2332 =back
2333
2334 Creates a new row object in the resultset's result class and returns
2335 it. The row is not inserted into the database at this point, call
2336 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
2337 will tell you whether the row object has been inserted or not.
2338
2339 Passes the hashref of input on to L<DBIx::Class::Row/new>.
2340
2341 =cut
2342
2343 sub new_result {
2344   my ($self, $values) = @_;
2345   $self->throw_exception( "new_result needs a hash" )
2346     unless (ref $values eq 'HASH');
2347
2348   my ($merged_cond, $cols_from_relations) = $self->_merge_with_rscond($values);
2349
2350   my %new = (
2351     %$merged_cond,
2352     @$cols_from_relations
2353       ? (-cols_from_relations => $cols_from_relations)
2354       : (),
2355     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
2356   );
2357
2358   return $self->result_class->new(\%new);
2359 }
2360
2361 # _merge_with_rscond
2362 #
2363 # Takes a simple hash of K/V data and returns its copy merged with the
2364 # condition already present on the resultset. Additionally returns an
2365 # arrayref of value/condition names, which were inferred from related
2366 # objects (this is needed for in-memory related objects)
2367 sub _merge_with_rscond {
2368   my ($self, $data) = @_;
2369
2370   my (%new_data, @cols_from_relations);
2371
2372   my $alias = $self->{attrs}{alias};
2373
2374   if (! defined $self->{cond}) {
2375     # just massage $data below
2376   }
2377   elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
2378     %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
2379     @cols_from_relations = keys %new_data;
2380   }
2381   elsif (ref $self->{cond} ne 'HASH') {
2382     $self->throw_exception(
2383       "Can't abstract implicit construct, resultset condition not a hash"
2384     );
2385   }
2386   else {
2387     # precendence must be given to passed values over values inherited from
2388     # the cond, so the order here is important.
2389     my $collapsed_cond = $self->_collapse_cond($self->{cond});
2390     my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
2391
2392     while ( my($col, $value) = each %implied ) {
2393       my $vref = ref $value;
2394       if (
2395         $vref eq 'HASH'
2396           and
2397         keys(%$value) == 1
2398           and
2399         (keys %$value)[0] eq '='
2400       ) {
2401         $new_data{$col} = $value->{'='};
2402       }
2403       elsif( !$vref or $vref eq 'SCALAR' or blessed($value) ) {
2404         $new_data{$col} = $value;
2405       }
2406     }
2407   }
2408
2409   %new_data = (
2410     %new_data,
2411     %{ $self->_remove_alias($data, $alias) },
2412   );
2413
2414   return (\%new_data, \@cols_from_relations);
2415 }
2416
2417 # _has_resolved_attr
2418 #
2419 # determines if the resultset defines at least one
2420 # of the attributes supplied
2421 #
2422 # used to determine if a subquery is neccessary
2423 #
2424 # supports some virtual attributes:
2425 #   -join
2426 #     This will scan for any joins being present on the resultset.
2427 #     It is not a mere key-search but a deep inspection of {from}
2428 #
2429
2430 sub _has_resolved_attr {
2431   my ($self, @attr_names) = @_;
2432
2433   my $attrs = $self->_resolved_attrs;
2434
2435   my %extra_checks;
2436
2437   for my $n (@attr_names) {
2438     if (grep { $n eq $_ } (qw/-join/) ) {
2439       $extra_checks{$n}++;
2440       next;
2441     }
2442
2443     my $attr =  $attrs->{$n};
2444
2445     next if not defined $attr;
2446
2447     if (ref $attr eq 'HASH') {
2448       return 1 if keys %$attr;
2449     }
2450     elsif (ref $attr eq 'ARRAY') {
2451       return 1 if @$attr;
2452     }
2453     else {
2454       return 1 if $attr;
2455     }
2456   }
2457
2458   # a resolved join is expressed as a multi-level from
2459   return 1 if (
2460     $extra_checks{-join}
2461       and
2462     ref $attrs->{from} eq 'ARRAY'
2463       and
2464     @{$attrs->{from}} > 1
2465   );
2466
2467   return 0;
2468 }
2469
2470 # _collapse_cond
2471 #
2472 # Recursively collapse the condition.
2473
2474 sub _collapse_cond {
2475   my ($self, $cond, $collapsed) = @_;
2476
2477   $collapsed ||= {};
2478
2479   if (ref $cond eq 'ARRAY') {
2480     foreach my $subcond (@$cond) {
2481       next unless ref $subcond;  # -or
2482       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2483     }
2484   }
2485   elsif (ref $cond eq 'HASH') {
2486     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2487       foreach my $subcond (@{$cond->{-and}}) {
2488         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2489       }
2490     }
2491     else {
2492       foreach my $col (keys %$cond) {
2493         my $value = $cond->{$col};
2494         $collapsed->{$col} = $value;
2495       }
2496     }
2497   }
2498
2499   return $collapsed;
2500 }
2501
2502 # _remove_alias
2503 #
2504 # Remove the specified alias from the specified query hash. A copy is made so
2505 # the original query is not modified.
2506
2507 sub _remove_alias {
2508   my ($self, $query, $alias) = @_;
2509
2510   my %orig = %{ $query || {} };
2511   my %unaliased;
2512
2513   foreach my $key (keys %orig) {
2514     if ($key !~ /\./) {
2515       $unaliased{$key} = $orig{$key};
2516       next;
2517     }
2518     $unaliased{$1} = $orig{$key}
2519       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2520   }
2521
2522   return \%unaliased;
2523 }
2524
2525 =head2 as_query
2526
2527 =over 4
2528
2529 =item Arguments: none
2530
2531 =item Return Value: \[ $sql, @bind ]
2532
2533 =back
2534
2535 Returns the SQL query and bind vars associated with the invocant.
2536
2537 This is generally used as the RHS for a subquery.
2538
2539 =cut
2540
2541 sub as_query {
2542   my $self = shift;
2543
2544   my $attrs = $self->_resolved_attrs_copy;
2545
2546   # For future use:
2547   #
2548   # in list ctx:
2549   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2550   # $sql also has no wrapping parenthesis in list ctx
2551   #
2552   my $sqlbind = $self->result_source->storage
2553     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2554
2555   return $sqlbind;
2556 }
2557
2558 =head2 find_or_new
2559
2560 =over 4
2561
2562 =item Arguments: \%vals, \%attrs?
2563
2564 =item Return Value: $rowobject
2565
2566 =back
2567
2568   my $artist = $schema->resultset('Artist')->find_or_new(
2569     { artist => 'fred' }, { key => 'artists' });
2570
2571   $cd->cd_to_producer->find_or_new({ producer => $producer },
2572                                    { key => 'primary });
2573
2574 Find an existing record from this resultset using L</find>. if none exists,
2575 instantiate a new result object and return it. The object will not be saved
2576 into your storage until you call L<DBIx::Class::Row/insert> on it.
2577
2578 You most likely want this method when looking for existing rows using a unique
2579 constraint that is not the primary key, or looking for related rows.
2580
2581 If you want objects to be saved immediately, use L</find_or_create> instead.
2582
2583 B<Note>: Make sure to read the documentation of L</find> and understand the
2584 significance of the C<key> attribute, as its lack may skew your search, and
2585 subsequently result in spurious new objects.
2586
2587 B<Note>: Take care when using C<find_or_new> with a table having
2588 columns with default values that you intend to be automatically
2589 supplied by the database (e.g. an auto_increment primary key column).
2590 In normal usage, the value of such columns should NOT be included at
2591 all in the call to C<find_or_new>, even when set to C<undef>.
2592
2593 =cut
2594
2595 sub find_or_new {
2596   my $self     = shift;
2597   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2598   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2599   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2600     return $row;
2601   }
2602   return $self->new_result($hash);
2603 }
2604
2605 =head2 create
2606
2607 =over 4
2608
2609 =item Arguments: \%vals
2610
2611 =item Return Value: a L<DBIx::Class::Row> $object
2612
2613 =back
2614
2615 Attempt to create a single new row or a row with multiple related rows
2616 in the table represented by the resultset (and related tables). This
2617 will not check for duplicate rows before inserting, use
2618 L</find_or_create> to do that.
2619
2620 To create one row for this resultset, pass a hashref of key/value
2621 pairs representing the columns of the table and the values you wish to
2622 store. If the appropriate relationships are set up, foreign key fields
2623 can also be passed an object representing the foreign row, and the
2624 value will be set to its primary key.
2625
2626 To create related objects, pass a hashref of related-object column values
2627 B<keyed on the relationship name>. If the relationship is of type C<multi>
2628 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2629 The process will correctly identify columns holding foreign keys, and will
2630 transparently populate them from the keys of the corresponding relation.
2631 This can be applied recursively, and will work correctly for a structure
2632 with an arbitrary depth and width, as long as the relationships actually
2633 exists and the correct column data has been supplied.
2634
2635
2636 Instead of hashrefs of plain related data (key/value pairs), you may
2637 also pass new or inserted objects. New objects (not inserted yet, see
2638 L</new>), will be inserted into their appropriate tables.
2639
2640 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2641
2642 Example of creating a new row.
2643
2644   $person_rs->create({
2645     name=>"Some Person",
2646     email=>"somebody@someplace.com"
2647   });
2648
2649 Example of creating a new row and also creating rows in a related C<has_many>
2650 or C<has_one> resultset.  Note Arrayref.
2651
2652   $artist_rs->create(
2653      { artistid => 4, name => 'Manufactured Crap', cds => [
2654         { title => 'My First CD', year => 2006 },
2655         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2656       ],
2657      },
2658   );
2659
2660 Example of creating a new row and also creating a row in a related
2661 C<belongs_to> resultset. Note Hashref.
2662
2663   $cd_rs->create({
2664     title=>"Music for Silly Walks",
2665     year=>2000,
2666     artist => {
2667       name=>"Silly Musician",
2668     }
2669   });
2670
2671 =over
2672
2673 =item WARNING
2674
2675 When subclassing ResultSet never attempt to override this method. Since
2676 it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
2677 lot of the internals simply never call it, so your override will be
2678 bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
2679 or L<insert|DBIx::Class::Row/insert> depending on how early in the
2680 L</create> process you need to intervene.
2681
2682 =back
2683
2684 =cut
2685
2686 sub create {
2687   my ($self, $attrs) = @_;
2688   $self->throw_exception( "create needs a hashref" )
2689     unless ref $attrs eq 'HASH';
2690   return $self->new_result($attrs)->insert;
2691 }
2692
2693 =head2 find_or_create
2694
2695 =over 4
2696
2697 =item Arguments: \%vals, \%attrs?
2698
2699 =item Return Value: $rowobject
2700
2701 =back
2702
2703   $cd->cd_to_producer->find_or_create({ producer => $producer },
2704                                       { key => 'primary' });
2705
2706 Tries to find a record based on its primary key or unique constraints; if none
2707 is found, creates one and returns that instead.
2708
2709   my $cd = $schema->resultset('CD')->find_or_create({
2710     cdid   => 5,
2711     artist => 'Massive Attack',
2712     title  => 'Mezzanine',
2713     year   => 2005,
2714   });
2715
2716 Also takes an optional C<key> attribute, to search by a specific key or unique
2717 constraint. For example:
2718
2719   my $cd = $schema->resultset('CD')->find_or_create(
2720     {
2721       artist => 'Massive Attack',
2722       title  => 'Mezzanine',
2723     },
2724     { key => 'cd_artist_title' }
2725   );
2726
2727 B<Note>: Make sure to read the documentation of L</find> and understand the
2728 significance of the C<key> attribute, as its lack may skew your search, and
2729 subsequently result in spurious row creation.
2730
2731 B<Note>: Because find_or_create() reads from the database and then
2732 possibly inserts based on the result, this method is subject to a race
2733 condition. Another process could create a record in the table after
2734 the find has completed and before the create has started. To avoid
2735 this problem, use find_or_create() inside a transaction.
2736
2737 B<Note>: Take care when using C<find_or_create> with a table having
2738 columns with default values that you intend to be automatically
2739 supplied by the database (e.g. an auto_increment primary key column).
2740 In normal usage, the value of such columns should NOT be included at
2741 all in the call to C<find_or_create>, even when set to C<undef>.
2742
2743 See also L</find> and L</update_or_create>. For information on how to declare
2744 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2745
2746 =cut
2747
2748 sub find_or_create {
2749   my $self     = shift;
2750   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2751   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2752   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2753     return $row;
2754   }
2755   return $self->create($hash);
2756 }
2757
2758 =head2 update_or_create
2759
2760 =over 4
2761
2762 =item Arguments: \%col_values, { key => $unique_constraint }?
2763
2764 =item Return Value: $row_object
2765
2766 =back
2767
2768   $resultset->update_or_create({ col => $val, ... });
2769
2770 Like L</find_or_create>, but if a row is found it is immediately updated via
2771 C<< $found_row->update (\%col_values) >>.
2772
2773
2774 Takes an optional C<key> attribute to search on a specific unique constraint.
2775 For example:
2776
2777   # In your application
2778   my $cd = $schema->resultset('CD')->update_or_create(
2779     {
2780       artist => 'Massive Attack',
2781       title  => 'Mezzanine',
2782       year   => 1998,
2783     },
2784     { key => 'cd_artist_title' }
2785   );
2786
2787   $cd->cd_to_producer->update_or_create({
2788     producer => $producer,
2789     name => 'harry',
2790   }, {
2791     key => 'primary',
2792   });
2793
2794 B<Note>: Make sure to read the documentation of L</find> and understand the
2795 significance of the C<key> attribute, as its lack may skew your search, and
2796 subsequently result in spurious row creation.
2797
2798 B<Note>: Take care when using C<update_or_create> with a table having
2799 columns with default values that you intend to be automatically
2800 supplied by the database (e.g. an auto_increment primary key column).
2801 In normal usage, the value of such columns should NOT be included at
2802 all in the call to C<update_or_create>, even when set to C<undef>.
2803
2804 See also L</find> and L</find_or_create>. For information on how to declare
2805 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2806
2807 =cut
2808
2809 sub update_or_create {
2810   my $self = shift;
2811   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2812   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2813
2814   my $row = $self->find($cond, $attrs);
2815   if (defined $row) {
2816     $row->update($cond);
2817     return $row;
2818   }
2819
2820   return $self->create($cond);
2821 }
2822
2823 =head2 update_or_new
2824
2825 =over 4
2826
2827 =item Arguments: \%col_values, { key => $unique_constraint }?
2828
2829 =item Return Value: $rowobject
2830
2831 =back
2832
2833   $resultset->update_or_new({ col => $val, ... });
2834
2835 Like L</find_or_new> but if a row is found it is immediately updated via
2836 C<< $found_row->update (\%col_values) >>.
2837
2838 For example:
2839
2840   # In your application
2841   my $cd = $schema->resultset('CD')->update_or_new(
2842     {
2843       artist => 'Massive Attack',
2844       title  => 'Mezzanine',
2845       year   => 1998,
2846     },
2847     { key => 'cd_artist_title' }
2848   );
2849
2850   if ($cd->in_storage) {
2851       # the cd was updated
2852   }
2853   else {
2854       # the cd is not yet in the database, let's insert it
2855       $cd->insert;
2856   }
2857
2858 B<Note>: Make sure to read the documentation of L</find> and understand the
2859 significance of the C<key> attribute, as its lack may skew your search, and
2860 subsequently result in spurious new objects.
2861
2862 B<Note>: Take care when using C<update_or_new> with a table having
2863 columns with default values that you intend to be automatically
2864 supplied by the database (e.g. an auto_increment primary key column).
2865 In normal usage, the value of such columns should NOT be included at
2866 all in the call to C<update_or_new>, even when set to C<undef>.
2867
2868 See also L</find>, L</find_or_create> and L</find_or_new>. 
2869
2870 =cut
2871
2872 sub update_or_new {
2873     my $self  = shift;
2874     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2875     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2876
2877     my $row = $self->find( $cond, $attrs );
2878     if ( defined $row ) {
2879         $row->update($cond);
2880         return $row;
2881     }
2882
2883     return $self->new_result($cond);
2884 }
2885
2886 =head2 get_cache
2887
2888 =over 4
2889
2890 =item Arguments: none
2891
2892 =item Return Value: \@cache_objects | undef
2893
2894 =back
2895
2896 Gets the contents of the cache for the resultset, if the cache is set.
2897
2898 The cache is populated either by using the L</prefetch> attribute to
2899 L</search> or by calling L</set_cache>.
2900
2901 =cut
2902
2903 sub get_cache {
2904   shift->{all_cache};
2905 }
2906
2907 =head2 set_cache
2908
2909 =over 4
2910
2911 =item Arguments: \@cache_objects
2912
2913 =item Return Value: \@cache_objects
2914
2915 =back
2916
2917 Sets the contents of the cache for the resultset. Expects an arrayref
2918 of objects of the same class as those produced by the resultset. Note that
2919 if the cache is set the resultset will return the cached objects rather
2920 than re-querying the database even if the cache attr is not set.
2921
2922 The contents of the cache can also be populated by using the
2923 L</prefetch> attribute to L</search>.
2924
2925 =cut
2926
2927 sub set_cache {
2928   my ( $self, $data ) = @_;
2929   $self->throw_exception("set_cache requires an arrayref")
2930       if defined($data) && (ref $data ne 'ARRAY');
2931   $self->{all_cache} = $data;
2932 }
2933
2934 =head2 clear_cache
2935
2936 =over 4
2937
2938 =item Arguments: none
2939
2940 =item Return Value: undef
2941
2942 =back
2943
2944 Clears the cache for the resultset.
2945
2946 =cut
2947
2948 sub clear_cache {
2949   shift->set_cache(undef);
2950 }
2951
2952 =head2 is_paged
2953
2954 =over 4
2955
2956 =item Arguments: none
2957
2958 =item Return Value: true, if the resultset has been paginated
2959
2960 =back
2961
2962 =cut
2963
2964 sub is_paged {
2965   my ($self) = @_;
2966   return !!$self->{attrs}{page};
2967 }
2968
2969 =head2 is_ordered
2970
2971 =over 4
2972
2973 =item Arguments: none
2974
2975 =item Return Value: true, if the resultset has been ordered with C<order_by>.
2976
2977 =back
2978
2979 =cut
2980
2981 sub is_ordered {
2982   my ($self) = @_;
2983   return scalar $self->result_source->storage->_extract_order_criteria($self->{attrs}{order_by});
2984 }
2985
2986 =head2 related_resultset
2987
2988 =over 4
2989
2990 =item Arguments: $relationship_name
2991
2992 =item Return Value: $resultset
2993
2994 =back
2995
2996 Returns a related resultset for the supplied relationship name.
2997
2998   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2999
3000 =cut
3001
3002 sub related_resultset {
3003   my ($self, $rel) = @_;
3004
3005   $self->{related_resultsets} ||= {};
3006   return $self->{related_resultsets}{$rel} ||= do {
3007     my $rsrc = $self->result_source;
3008     my $rel_info = $rsrc->relationship_info($rel);
3009
3010     $self->throw_exception(
3011       "search_related: result source '" . $rsrc->source_name .
3012         "' has no such relationship $rel")
3013       unless $rel_info;
3014
3015     my $attrs = $self->_chain_relationship($rel);
3016
3017     my $join_count = $attrs->{seen_join}{$rel};
3018
3019     my $alias = $self->result_source->storage
3020         ->relname_to_table_alias($rel, $join_count);
3021
3022     # since this is search_related, and we already slid the select window inwards
3023     # (the select/as attrs were deleted in the beginning), we need to flip all
3024     # left joins to inner, so we get the expected results
3025     # read the comment on top of the actual function to see what this does
3026     $attrs->{from} = $rsrc->schema->storage->_inner_join_to_node ($attrs->{from}, $alias);
3027
3028
3029     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
3030     delete @{$attrs}{qw(result_class alias)};
3031
3032     my $new_cache;
3033
3034     if (my $cache = $self->get_cache) {
3035       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
3036         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
3037                         @$cache ];
3038       }
3039     }
3040
3041     my $rel_source = $rsrc->related_source($rel);
3042
3043     my $new = do {
3044
3045       # The reason we do this now instead of passing the alias to the
3046       # search_rs below is that if you wrap/overload resultset on the
3047       # source you need to know what alias it's -going- to have for things
3048       # to work sanely (e.g. RestrictWithObject wants to be able to add
3049       # extra query restrictions, and these may need to be $alias.)
3050
3051       my $rel_attrs = $rel_source->resultset_attributes;
3052       local $rel_attrs->{alias} = $alias;
3053
3054       $rel_source->resultset
3055                  ->search_rs(
3056                      undef, {
3057                        %$attrs,
3058                        where => $attrs->{where},
3059                    });
3060     };
3061     $new->set_cache($new_cache) if $new_cache;
3062     $new;
3063   };
3064 }
3065
3066 =head2 current_source_alias
3067
3068 =over 4
3069
3070 =item Arguments: none
3071
3072 =item Return Value: $source_alias
3073
3074 =back
3075
3076 Returns the current table alias for the result source this resultset is built
3077 on, that will be used in the SQL query. Usually it is C<me>.
3078
3079 Currently the source alias that refers to the result set returned by a
3080 L</search>/L</find> family method depends on how you got to the resultset: it's
3081 C<me> by default, but eg. L</search_related> aliases it to the related result
3082 source name (and keeps C<me> referring to the original result set). The long
3083 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
3084 (and make this method unnecessary).
3085
3086 Thus it's currently necessary to use this method in predefined queries (see
3087 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
3088 source alias of the current result set:
3089
3090   # in a result set class
3091   sub modified_by {
3092     my ($self, $user) = @_;
3093
3094     my $me = $self->current_source_alias;
3095
3096     return $self->search(
3097       "$me.modified" => $user->id,
3098     );
3099   }
3100
3101 =cut
3102
3103 sub current_source_alias {
3104   my ($self) = @_;
3105
3106   return ($self->{attrs} || {})->{alias} || 'me';
3107 }
3108
3109 =head2 as_subselect_rs
3110
3111 =over 4
3112
3113 =item Arguments: none
3114
3115 =item Return Value: $resultset
3116
3117 =back
3118
3119 Act as a barrier to SQL symbols.  The resultset provided will be made into a
3120 "virtual view" by including it as a subquery within the from clause.  From this
3121 point on, any joined tables are inaccessible to ->search on the resultset (as if
3122 it were simply where-filtered without joins).  For example:
3123
3124  my $rs = $schema->resultset('Bar')->search({'x.name' => 'abc'},{ join => 'x' });
3125
3126  # 'x' now pollutes the query namespace
3127
3128  # So the following works as expected
3129  my $ok_rs = $rs->search({'x.other' => 1});
3130
3131  # But this doesn't: instead of finding a 'Bar' related to two x rows (abc and
3132  # def) we look for one row with contradictory terms and join in another table
3133  # (aliased 'x_2') which we never use
3134  my $broken_rs = $rs->search({'x.name' => 'def'});
3135
3136  my $rs2 = $rs->as_subselect_rs;
3137
3138  # doesn't work - 'x' is no longer accessible in $rs2, having been sealed away
3139  my $not_joined_rs = $rs2->search({'x.other' => 1});
3140
3141  # works as expected: finds a 'table' row related to two x rows (abc and def)
3142  my $correctly_joined_rs = $rs2->search({'x.name' => 'def'});
3143
3144 Another example of when one might use this would be to select a subset of
3145 columns in a group by clause:
3146
3147  my $rs = $schema->resultset('Bar')->search(undef, {
3148    group_by => [qw{ id foo_id baz_id }],
3149  })->as_subselect_rs->search(undef, {
3150    columns => [qw{ id foo_id }]
3151  });
3152
3153 In the above example normally columns would have to be equal to the group by,
3154 but because we isolated the group by into a subselect the above works.
3155
3156 =cut
3157
3158 sub as_subselect_rs {
3159   my $self = shift;
3160
3161   my $attrs = $self->_resolved_attrs;
3162
3163   my $fresh_rs = (ref $self)->new (
3164     $self->result_source
3165   );
3166
3167   # these pieces will be locked in the subquery
3168   delete $fresh_rs->{cond};
3169   delete @{$fresh_rs->{attrs}}{qw/where bind/};
3170
3171   return $fresh_rs->search( {}, {
3172     from => [{
3173       $attrs->{alias} => $self->as_query,
3174       -alias  => $attrs->{alias},
3175       -rsrc   => $self->result_source,
3176     }],
3177     alias => $attrs->{alias},
3178   });
3179 }
3180
3181 # This code is called by search_related, and makes sure there
3182 # is clear separation between the joins before, during, and
3183 # after the relationship. This information is needed later
3184 # in order to properly resolve prefetch aliases (any alias
3185 # with a relation_chain_depth less than the depth of the
3186 # current prefetch is not considered)
3187 #
3188 # The increments happen twice per join. An even number means a
3189 # relationship specified via a search_related, whereas an odd
3190 # number indicates a join/prefetch added via attributes
3191 #
3192 # Also this code will wrap the current resultset (the one we
3193 # chain to) in a subselect IFF it contains limiting attributes
3194 sub _chain_relationship {
3195   my ($self, $rel) = @_;
3196   my $source = $self->result_source;
3197   my $attrs = { %{$self->{attrs}||{}} };
3198
3199   # we need to take the prefetch the attrs into account before we
3200   # ->_resolve_join as otherwise they get lost - captainL
3201   my $join = $self->_merge_joinpref_attr( $attrs->{join}, $attrs->{prefetch} );
3202
3203   delete @{$attrs}{qw/join prefetch collapse group_by distinct select as columns +select +as +columns/};
3204
3205   my $seen = { %{ (delete $attrs->{seen_join}) || {} } };
3206
3207   my $from;
3208   my @force_subq_attrs = qw/offset rows group_by having/;
3209
3210   if (
3211     ($attrs->{from} && ref $attrs->{from} ne 'ARRAY')
3212       ||
3213     $self->_has_resolved_attr (@force_subq_attrs)
3214   ) {
3215     # Nuke the prefetch (if any) before the new $rs attrs
3216     # are resolved (prefetch is useless - we are wrapping
3217     # a subquery anyway).
3218     my $rs_copy = $self->search;
3219     $rs_copy->{attrs}{join} = $self->_merge_joinpref_attr (
3220       $rs_copy->{attrs}{join},
3221       delete $rs_copy->{attrs}{prefetch},
3222     );
3223
3224     $from = [{
3225       -rsrc   => $source,
3226       -alias  => $attrs->{alias},
3227       $attrs->{alias} => $rs_copy->as_query,
3228     }];
3229     delete @{$attrs}{@force_subq_attrs, qw/where bind/};
3230     $seen->{-relation_chain_depth} = 0;
3231   }
3232   elsif ($attrs->{from}) {  #shallow copy suffices
3233     $from = [ @{$attrs->{from}} ];
3234   }
3235   else {
3236     $from = [{
3237       -rsrc  => $source,
3238       -alias => $attrs->{alias},
3239       $attrs->{alias} => $source->from,
3240     }];
3241   }
3242
3243   my $jpath = ($seen->{-relation_chain_depth})
3244     ? $from->[-1][0]{-join_path}
3245     : [];
3246
3247   my @requested_joins = $source->_resolve_join(
3248     $join,
3249     $attrs->{alias},
3250     $seen,
3251     $jpath,
3252   );
3253
3254   push @$from, @requested_joins;
3255
3256   $seen->{-relation_chain_depth}++;
3257
3258   # if $self already had a join/prefetch specified on it, the requested
3259   # $rel might very well be already included. What we do in this case
3260   # is effectively a no-op (except that we bump up the chain_depth on
3261   # the join in question so we could tell it *is* the search_related)
3262   my $already_joined;
3263
3264   # we consider the last one thus reverse
3265   for my $j (reverse @requested_joins) {
3266     my ($last_j) = keys %{$j->[0]{-join_path}[-1]};
3267     if ($rel eq $last_j) {
3268       $j->[0]{-relation_chain_depth}++;
3269       $already_joined++;
3270       last;
3271     }
3272   }
3273
3274   unless ($already_joined) {
3275     push @$from, $source->_resolve_join(
3276       $rel,
3277       $attrs->{alias},
3278       $seen,
3279       $jpath,
3280     );
3281   }
3282
3283   $seen->{-relation_chain_depth}++;
3284
3285   return {%$attrs, from => $from, seen_join => $seen};
3286 }
3287
3288 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
3289 sub _resolved_attrs_copy {
3290   my $self = shift;
3291   return { %{$self->_resolved_attrs (@_)} };
3292 }
3293
3294 sub _resolved_attrs {
3295   my $self = shift;
3296   return $self->{_attrs} if $self->{_attrs};
3297
3298   my $attrs  = { %{ $self->{attrs} || {} } };
3299   my $source = $self->result_source;
3300   my $alias  = $attrs->{alias};
3301
3302   # default selection list
3303   $attrs->{columns} = [ $source->columns ]
3304     unless List::Util::first { exists $attrs->{$_} } qw/columns cols select as/;
3305
3306   # merge selectors together
3307   for (qw/columns select as/) {
3308     $attrs->{$_} = $self->_merge_attr($attrs->{$_}, delete $attrs->{"+$_"})
3309       if $attrs->{$_} or $attrs->{"+$_"};
3310   }
3311
3312   # disassemble columns
3313   my (@sel, @as);
3314   if (my $cols = delete $attrs->{columns}) {
3315     for my $c (ref $cols eq 'ARRAY' ? @$cols : $cols) {
3316       if (ref $c eq 'HASH') {
3317         for my $as (keys %$c) {
3318           push @sel, $c->{$as};
3319           push @as, $as;
3320         }
3321       }
3322       else {
3323         push @sel, $c;
3324         push @as, $c;
3325       }
3326     }
3327   }
3328
3329   # when trying to weed off duplicates later do not go past this point -
3330   # everything added from here on is unbalanced "anyone's guess" stuff
3331   my $dedup_stop_idx = $#as;
3332
3333   push @as, @{ ref $attrs->{as} eq 'ARRAY' ? $attrs->{as} : [ $attrs->{as} ] }
3334     if $attrs->{as};
3335   push @sel, @{ ref $attrs->{select} eq 'ARRAY' ? $attrs->{select} : [ $attrs->{select} ] }
3336     if $attrs->{select};
3337
3338   # assume all unqualified selectors to apply to the current alias (legacy stuff)
3339   for (@sel) {
3340     $_ = (ref $_ or $_ =~ /\./) ? $_ : "$alias.$_";
3341   }
3342
3343   # disqualify all $alias.col as-bits (collapser mandated)
3344   for (@as) {
3345     $_ = ($_ =~ /^\Q$alias.\E(.+)$/) ? $1 : $_;
3346   }
3347
3348   # de-duplicate the result (remove *identical* select/as pairs)
3349   # and also die on duplicate {as} pointing to different {select}s
3350   # not using a c-style for as the condition is prone to shrinkage
3351   my $seen;
3352   my $i = 0;
3353   while ($i <= $dedup_stop_idx) {
3354     if ($seen->{"$sel[$i] \x00\x00 $as[$i]"}++) {
3355       splice @sel, $i, 1;
3356       splice @as, $i, 1;
3357       $dedup_stop_idx--;
3358     }
3359     elsif ($seen->{$as[$i]}++) {
3360       $self->throw_exception(
3361         "inflate_result() alias '$as[$i]' specified twice with different SQL-side {select}-ors"
3362       );
3363     }
3364     else {
3365       $i++;
3366     }
3367   }
3368
3369   $attrs->{select} = \@sel;
3370   $attrs->{as} = \@as;
3371
3372   $attrs->{from} ||= [{
3373     -rsrc   => $source,
3374     -alias  => $self->{attrs}{alias},
3375     $self->{attrs}{alias} => $source->from,
3376   }];
3377
3378   if ( $attrs->{join} || $attrs->{prefetch} ) {
3379
3380     $self->throw_exception ('join/prefetch can not be used with a custom {from}')
3381       if ref $attrs->{from} ne 'ARRAY';
3382
3383     my $join = (delete $attrs->{join}) || {};
3384
3385     if ( defined $attrs->{prefetch} ) {
3386       $join = $self->_merge_joinpref_attr( $join, $attrs->{prefetch} );
3387     }
3388
3389     $attrs->{from} =    # have to copy here to avoid corrupting the original
3390       [
3391         @{ $attrs->{from} },
3392         $source->_resolve_join(
3393           $join,
3394           $alias,
3395           { %{ $attrs->{seen_join} || {} } },
3396           ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
3397             ? $attrs->{from}[-1][0]{-join_path}
3398             : []
3399           ,
3400         )
3401       ];
3402   }
3403
3404   if ( defined $attrs->{order_by} ) {
3405     $attrs->{order_by} = (
3406       ref( $attrs->{order_by} ) eq 'ARRAY'
3407       ? [ @{ $attrs->{order_by} } ]
3408       : [ $attrs->{order_by} || () ]
3409     );
3410   }
3411
3412   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
3413     $attrs->{group_by} = [ $attrs->{group_by} ];
3414   }
3415
3416   # generate the distinct induced group_by early, as prefetch will be carried via a
3417   # subquery (since a group_by is present)
3418   if (delete $attrs->{distinct}) {
3419     if ($attrs->{group_by}) {
3420       carp_unique ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
3421     }
3422     else {
3423       # distinct affects only the main selection part, not what prefetch may
3424       # add below.
3425       $attrs->{group_by} = $source->storage->_group_over_selection (
3426         $attrs->{from},
3427         $attrs->{select},
3428         $attrs->{order_by},
3429       );
3430     }
3431   }
3432
3433   $attrs->{collapse} ||= {};
3434   if ($attrs->{prefetch}) {
3435
3436     $self->throw_exception("Unable to prefetch, resultset contains an unnamed selector $attrs->{_dark_selector}{string}")
3437       if $attrs->{_dark_selector};
3438
3439     my $prefetch = $self->_merge_joinpref_attr( {}, delete $attrs->{prefetch} );
3440
3441     my $prefetch_ordering = [];
3442
3443     # this is a separate structure (we don't look in {from} directly)
3444     # as the resolver needs to shift things off the lists to work
3445     # properly (identical-prefetches on different branches)
3446     my $join_map = {};
3447     if (ref $attrs->{from} eq 'ARRAY') {
3448
3449       my $start_depth = $attrs->{seen_join}{-relation_chain_depth} || 0;
3450
3451       for my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
3452         next unless $j->[0]{-alias};
3453         next unless $j->[0]{-join_path};
3454         next if ($j->[0]{-relation_chain_depth} || 0) < $start_depth;
3455
3456         my @jpath = map { keys %$_ } @{$j->[0]{-join_path}};
3457
3458         my $p = $join_map;
3459         $p = $p->{$_} ||= {} for @jpath[ ($start_depth/2) .. $#jpath]; #only even depths are actual jpath boundaries
3460         push @{$p->{-join_aliases} }, $j->[0]{-alias};
3461       }
3462     }
3463
3464     my @prefetch =
3465       $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
3466
3467     # we need to somehow mark which columns came from prefetch
3468     if (@prefetch) {
3469       my $sel_end = $#{$attrs->{select}};
3470       $attrs->{_prefetch_selector_range} = [ $sel_end + 1, $sel_end + @prefetch ];
3471     }
3472
3473     push @{ $attrs->{select} }, (map { $_->[0] } @prefetch);
3474     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
3475
3476     push( @{$attrs->{order_by}}, @$prefetch_ordering );
3477     $attrs->{_collapse_order_by} = \@$prefetch_ordering;
3478   }
3479
3480
3481   # if both page and offset are specified, produce a combined offset
3482   # even though it doesn't make much sense, this is what pre 081xx has
3483   # been doing
3484   if (my $page = delete $attrs->{page}) {
3485     $attrs->{offset} =
3486       ($attrs->{rows} * ($page - 1))
3487             +
3488       ($attrs->{offset} || 0)
3489     ;
3490   }
3491
3492   return $self->{_attrs} = $attrs;
3493 }
3494
3495 sub _rollout_attr {
3496   my ($self, $attr) = @_;
3497
3498   if (ref $attr eq 'HASH') {
3499     return $self->_rollout_hash($attr);
3500   } elsif (ref $attr eq 'ARRAY') {
3501     return $self->_rollout_array($attr);
3502   } else {
3503     return [$attr];
3504   }
3505 }
3506
3507 sub _rollout_array {
3508   my ($self, $attr) = @_;
3509
3510   my @rolled_array;
3511   foreach my $element (@{$attr}) {
3512     if (ref $element eq 'HASH') {
3513       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3514     } elsif (ref $element eq 'ARRAY') {
3515       #  XXX - should probably recurse here
3516       push( @rolled_array, @{$self->_rollout_array($element)} );
3517     } else {
3518       push( @rolled_array, $element );
3519     }
3520   }
3521   return \@rolled_array;
3522 }
3523
3524 sub _rollout_hash {
3525   my ($self, $attr) = @_;
3526
3527   my @rolled_array;
3528   foreach my $key (keys %{$attr}) {
3529     push( @rolled_array, { $key => $attr->{$key} } );
3530   }
3531   return \@rolled_array;
3532 }
3533
3534 sub _calculate_score {
3535   my ($self, $a, $b) = @_;
3536
3537   if (defined $a xor defined $b) {
3538     return 0;
3539   }
3540   elsif (not defined $a) {
3541     return 1;
3542   }
3543
3544   if (ref $b eq 'HASH') {
3545     my ($b_key) = keys %{$b};
3546     if (ref $a eq 'HASH') {
3547       my ($a_key) = keys %{$a};
3548       if ($a_key eq $b_key) {
3549         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3550       } else {
3551         return 0;
3552       }
3553     } else {
3554       return ($a eq $b_key) ? 1 : 0;
3555     }
3556   } else {
3557     if (ref $a eq 'HASH') {
3558       my ($a_key) = keys %{$a};
3559       return ($b eq $a_key) ? 1 : 0;
3560     } else {
3561       return ($b eq $a) ? 1 : 0;
3562     }
3563   }
3564 }
3565
3566 sub _merge_joinpref_attr {
3567   my ($self, $orig, $import) = @_;
3568
3569   return $import unless defined($orig);
3570   return $orig unless defined($import);
3571
3572   $orig = $self->_rollout_attr($orig);
3573   $import = $self->_rollout_attr($import);
3574
3575   my $seen_keys;
3576   foreach my $import_element ( @{$import} ) {
3577     # find best candidate from $orig to merge $b_element into
3578     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3579     foreach my $orig_element ( @{$orig} ) {
3580       my $score = $self->_calculate_score( $orig_element, $import_element );
3581       if ($score > $best_candidate->{score}) {
3582         $best_candidate->{position} = $position;
3583         $best_candidate->{score} = $score;
3584       }
3585       $position++;
3586     }
3587     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3588
3589     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3590       push( @{$orig}, $import_element );
3591     } else {
3592       my $orig_best = $orig->[$best_candidate->{position}];
3593       # merge orig_best and b_element together and replace original with merged
3594       if (ref $orig_best ne 'HASH') {
3595         $orig->[$best_candidate->{position}] = $import_element;
3596       } elsif (ref $import_element eq 'HASH') {
3597         my ($key) = keys %{$orig_best};
3598         $orig->[$best_candidate->{position}] = { $key => $self->_merge_joinpref_attr($orig_best->{$key}, $import_element->{$key}) };
3599       }
3600     }
3601     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3602   }
3603
3604   return $orig;
3605 }
3606
3607 {
3608   my $hm;
3609
3610   sub _merge_attr {
3611     $hm ||= do {
3612       require Hash::Merge;
3613       my $hm = Hash::Merge->new;
3614
3615       $hm->specify_behavior({
3616         SCALAR => {
3617           SCALAR => sub {
3618             my ($defl, $defr) = map { defined $_ } (@_[0,1]);
3619
3620             if ($defl xor $defr) {
3621               return [ $defl ? $_[0] : $_[1] ];
3622             }
3623             elsif (! $defl) {
3624               return [];
3625             }
3626             elsif (__HM_DEDUP and $_[0] eq $_[1]) {
3627               return [ $_[0] ];
3628             }
3629             else {
3630               return [$_[0], $_[1]];
3631             }
3632           },
3633           ARRAY => sub {
3634             return $_[1] if !defined $_[0];
3635             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3636             return [$_[0], @{$_[1]}]
3637           },
3638           HASH  => sub {
3639             return [] if !defined $_[0] and !keys %{$_[1]};
3640             return [ $_[1] ] if !defined $_[0];
3641             return [ $_[0] ] if !keys %{$_[1]};
3642             return [$_[0], $_[1]]
3643           },
3644         },
3645         ARRAY => {
3646           SCALAR => sub {
3647             return $_[0] if !defined $_[1];
3648             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3649             return [@{$_[0]}, $_[1]]
3650           },
3651           ARRAY => sub {
3652             my @ret = @{$_[0]} or return $_[1];
3653             return [ @ret, @{$_[1]} ] unless __HM_DEDUP;
3654             my %idx = map { $_ => 1 } @ret;
3655             push @ret, grep { ! defined $idx{$_} } (@{$_[1]});
3656             \@ret;
3657           },
3658           HASH => sub {
3659             return [ $_[1] ] if ! @{$_[0]};
3660             return $_[0] if !keys %{$_[1]};
3661             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3662             return [ @{$_[0]}, $_[1] ];
3663           },
3664         },
3665         HASH => {
3666           SCALAR => sub {
3667             return [] if !keys %{$_[0]} and !defined $_[1];
3668             return [ $_[0] ] if !defined $_[1];
3669             return [ $_[1] ] if !keys %{$_[0]};
3670             return [$_[0], $_[1]]
3671           },
3672           ARRAY => sub {
3673             return [] if !keys %{$_[0]} and !@{$_[1]};
3674             return [ $_[0] ] if !@{$_[1]};
3675             return $_[1] if !keys %{$_[0]};
3676             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3677             return [ $_[0], @{$_[1]} ];
3678           },
3679           HASH => sub {
3680             return [] if !keys %{$_[0]} and !keys %{$_[1]};
3681             return [ $_[0] ] if !keys %{$_[1]};
3682             return [ $_[1] ] if !keys %{$_[0]};
3683             return [ $_[0] ] if $_[0] eq $_[1];
3684             return [ $_[0], $_[1] ];
3685           },
3686         }
3687       } => 'DBIC_RS_ATTR_MERGER');
3688       $hm;
3689     };
3690
3691     return $hm->merge ($_[1], $_[2]);
3692   }
3693 }
3694
3695 sub STORABLE_freeze {
3696   my ($self, $cloning) = @_;
3697   my $to_serialize = { %$self };
3698
3699   # A cursor in progress can't be serialized (and would make little sense anyway)
3700   delete $to_serialize->{cursor};
3701
3702   Storable::nfreeze($to_serialize);
3703 }
3704
3705 # need this hook for symmetry
3706 sub STORABLE_thaw {
3707   my ($self, $cloning, $serialized) = @_;
3708
3709   %$self = %{ Storable::thaw($serialized) };
3710
3711   $self;
3712 }
3713
3714
3715 =head2 throw_exception
3716
3717 See L<DBIx::Class::Schema/throw_exception> for details.
3718
3719 =cut
3720
3721 sub throw_exception {
3722   my $self=shift;
3723
3724   if (ref $self and my $rsrc = $self->result_source) {
3725     $rsrc->throw_exception(@_)
3726   }
3727   else {
3728     DBIx::Class::Exception->throw(@_);
3729   }
3730 }
3731
3732 # XXX: FIXME: Attributes docs need clearing up
3733
3734 =head1 ATTRIBUTES
3735
3736 Attributes are used to refine a ResultSet in various ways when
3737 searching for data. They can be passed to any method which takes an
3738 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3739 L</count>.
3740
3741 These are in no particular order:
3742
3743 =head2 order_by
3744
3745 =over 4
3746
3747 =item Value: ( $order_by | \@order_by | \%order_by )
3748
3749 =back
3750
3751 Which column(s) to order the results by.
3752
3753 [The full list of suitable values is documented in
3754 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3755 common options.]
3756
3757 If a single column name, or an arrayref of names is supplied, the
3758 argument is passed through directly to SQL. The hashref syntax allows
3759 for connection-agnostic specification of ordering direction:
3760
3761  For descending order:
3762
3763   order_by => { -desc => [qw/col1 col2 col3/] }
3764
3765  For explicit ascending order:
3766
3767   order_by => { -asc => 'col' }
3768
3769 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3770 supported, although you are strongly encouraged to use the hashref
3771 syntax as outlined above.
3772
3773 =head2 columns
3774
3775 =over 4
3776
3777 =item Value: \@columns
3778
3779 =back
3780
3781 Shortcut to request a particular set of columns to be retrieved. Each
3782 column spec may be a string (a table column name), or a hash (in which
3783 case the key is the C<as> value, and the value is used as the C<select>
3784 expression). Adds C<me.> onto the start of any column without a C<.> in
3785 it and sets C<select> from that, then auto-populates C<as> from
3786 C<select> as normal. (You may also use the C<cols> attribute, as in
3787 earlier versions of DBIC.)
3788
3789 Essentially C<columns> does the same as L</select> and L</as>.
3790
3791     columns => [ 'foo', { bar => 'baz' } ]
3792
3793 is the same as
3794
3795     select => [qw/foo baz/],
3796     as => [qw/foo bar/]
3797
3798 =head2 +columns
3799
3800 =over 4
3801
3802 =item Value: \@columns
3803
3804 =back
3805
3806 Indicates additional columns to be selected from storage. Works the same
3807 as L</columns> but adds columns to the selection. (You may also use the
3808 C<include_columns> attribute, as in earlier versions of DBIC). For
3809 example:-
3810
3811   $schema->resultset('CD')->search(undef, {
3812     '+columns' => ['artist.name'],
3813     join => ['artist']
3814   });
3815
3816 would return all CDs and include a 'name' column to the information
3817 passed to object inflation. Note that the 'artist' is the name of the
3818 column (or relationship) accessor, and 'name' is the name of the column
3819 accessor in the related table.
3820
3821 B<NOTE:> You need to explicitly quote '+columns' when defining the attribute.
3822 Not doing so causes Perl to incorrectly interpret +columns as a bareword with a
3823 unary plus operator before it.
3824
3825 =head2 include_columns
3826
3827 =over 4
3828
3829 =item Value: \@columns
3830
3831 =back
3832
3833 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3834
3835 =head2 select
3836
3837 =over 4
3838
3839 =item Value: \@select_columns
3840
3841 =back
3842
3843 Indicates which columns should be selected from the storage. You can use
3844 column names, or in the case of RDBMS back ends, function or stored procedure
3845 names:
3846
3847   $rs = $schema->resultset('Employee')->search(undef, {
3848     select => [
3849       'name',
3850       { count => 'employeeid' },
3851       { max => { length => 'name' }, -as => 'longest_name' }
3852     ]
3853   });
3854
3855   # Equivalent SQL
3856   SELECT name, COUNT( employeeid ), MAX( LENGTH( name ) ) AS longest_name FROM employee
3857
3858 B<NOTE:> You will almost always need a corresponding L</as> attribute when you
3859 use L</select>, to instruct DBIx::Class how to store the result of the column.
3860 Also note that the L</as> attribute has nothing to do with the SQL-side 'AS'
3861 identifier aliasing. You can however alias a function, so you can use it in
3862 e.g. an C<ORDER BY> clause. This is done via the C<-as> B<select function
3863 attribute> supplied as shown in the example above.
3864
3865 B<NOTE:> You need to explicitly quote '+select'/'+as' when defining the attributes.
3866 Not doing so causes Perl to incorrectly interpret them as a bareword with a
3867 unary plus operator before it.
3868
3869 =head2 +select
3870
3871 =over 4
3872
3873 Indicates additional columns to be selected from storage.  Works the same as
3874 L</select> but adds columns to the default selection, instead of specifying
3875 an explicit list.
3876
3877 =back
3878
3879 =head2 +as
3880
3881 =over 4
3882
3883 Indicates additional column names for those added via L</+select>. See L</as>.
3884
3885 =back
3886
3887 =head2 as
3888
3889 =over 4
3890
3891 =item Value: \@inflation_names
3892
3893 =back
3894
3895 Indicates column names for object inflation. That is L</as> indicates the
3896 slot name in which the column value will be stored within the
3897 L<Row|DBIx::Class::Row> object. The value will then be accessible via this
3898 identifier by the C<get_column> method (or via the object accessor B<if one
3899 with the same name already exists>) as shown below. The L</as> attribute has
3900 B<nothing to do> with the SQL-side C<AS>. See L</select> for details.
3901
3902   $rs = $schema->resultset('Employee')->search(undef, {
3903     select => [
3904       'name',
3905       { count => 'employeeid' },
3906       { max => { length => 'name' }, -as => 'longest_name' }
3907     ],
3908     as => [qw/
3909       name
3910       employee_count
3911       max_name_length
3912     /],
3913   });
3914
3915 If the object against which the search is performed already has an accessor
3916 matching a column name specified in C<as>, the value can be retrieved using
3917 the accessor as normal:
3918
3919   my $name = $employee->name();
3920
3921 If on the other hand an accessor does not exist in the object, you need to
3922 use C<get_column> instead:
3923
3924   my $employee_count = $employee->get_column('employee_count');
3925
3926 You can create your own accessors if required - see
3927 L<DBIx::Class::Manual::Cookbook> for details.
3928
3929 =head2 join
3930
3931 =over 4
3932
3933 =item Value: ($rel_name | \@rel_names | \%rel_names)
3934
3935 =back
3936
3937 Contains a list of relationships that should be joined for this query.  For
3938 example:
3939
3940   # Get CDs by Nine Inch Nails
3941   my $rs = $schema->resultset('CD')->search(
3942     { 'artist.name' => 'Nine Inch Nails' },
3943     { join => 'artist' }
3944   );
3945
3946 Can also contain a hash reference to refer to the other relation's relations.
3947 For example:
3948
3949   package MyApp::Schema::Track;
3950   use base qw/DBIx::Class/;
3951   __PACKAGE__->table('track');
3952   __PACKAGE__->add_columns(qw/trackid cd position title/);
3953   __PACKAGE__->set_primary_key('trackid');
3954   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3955   1;
3956
3957   # In your application
3958   my $rs = $schema->resultset('Artist')->search(
3959     { 'track.title' => 'Teardrop' },
3960     {
3961       join     => { cd => 'track' },
3962       order_by => 'artist.name',
3963     }
3964   );
3965
3966 You need to use the relationship (not the table) name in  conditions,
3967 because they are aliased as such. The current table is aliased as "me", so
3968 you need to use me.column_name in order to avoid ambiguity. For example:
3969
3970   # Get CDs from 1984 with a 'Foo' track
3971   my $rs = $schema->resultset('CD')->search(
3972     {
3973       'me.year' => 1984,
3974       'tracks.name' => 'Foo'
3975     },
3976     { join => 'tracks' }
3977   );
3978
3979 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3980 similarly for a third time). For e.g.
3981
3982   my $rs = $schema->resultset('Artist')->search({
3983     'cds.title'   => 'Down to Earth',
3984     'cds_2.title' => 'Popular',
3985   }, {
3986     join => [ qw/cds cds/ ],
3987   });
3988
3989 will return a set of all artists that have both a cd with title 'Down
3990 to Earth' and a cd with title 'Popular'.
3991
3992 If you want to fetch related objects from other tables as well, see C<prefetch>
3993 below.
3994
3995 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
3996
3997 =head2 prefetch
3998
3999 =over 4
4000
4001 =item Value: ($rel_name | \@rel_names | \%rel_names)
4002
4003 =back
4004
4005 Contains one or more relationships that should be fetched along with
4006 the main query (when they are accessed afterwards the data will
4007 already be available, without extra queries to the database).  This is
4008 useful for when you know you will need the related objects, because it
4009 saves at least one query:
4010
4011   my $rs = $schema->resultset('Tag')->search(
4012     undef,
4013     {
4014       prefetch => {
4015         cd => 'artist'
4016       }
4017     }
4018   );
4019
4020 The initial search results in SQL like the following:
4021
4022   SELECT tag.*, cd.*, artist.* FROM tag
4023   JOIN cd ON tag.cd = cd.cdid
4024   JOIN artist ON cd.artist = artist.artistid
4025
4026 L<DBIx::Class> has no need to go back to the database when we access the
4027 C<cd> or C<artist> relationships, which saves us two SQL statements in this
4028 case.
4029
4030 Simple prefetches will be joined automatically, so there is no need
4031 for a C<join> attribute in the above search.
4032
4033 L</prefetch> can be used with the any of the relationship types and
4034 multiple prefetches can be specified together. Below is a more complex
4035 example that prefetches a CD's artist, its liner notes (if present),
4036 the cover image, the tracks on that cd, and the guests on those
4037 tracks.
4038
4039  # Assuming:
4040  My::Schema::CD->belongs_to( artist      => 'My::Schema::Artist'     );
4041  My::Schema::CD->might_have( liner_note  => 'My::Schema::LinerNotes' );
4042  My::Schema::CD->has_one(    cover_image => 'My::Schema::Artwork'    );
4043  My::Schema::CD->has_many(   tracks      => 'My::Schema::Track'      );
4044
4045  My::Schema::Artist->belongs_to( record_label => 'My::Schema::RecordLabel' );
4046
4047  My::Schema::Track->has_many( guests => 'My::Schema::Guest' );
4048
4049
4050  my $rs = $schema->resultset('CD')->search(
4051    undef,
4052    {
4053      prefetch => [
4054        { artist => 'record_label'},  # belongs_to => belongs_to
4055        'liner_note',                 # might_have
4056        'cover_image',                # has_one
4057        { tracks => 'guests' },       # has_many => has_many
4058      ]
4059    }
4060  );
4061
4062 This will produce SQL like the following:
4063
4064  SELECT cd.*, artist.*, record_label.*, liner_note.*, cover_image.*,
4065         tracks.*, guests.*
4066    FROM cd me
4067    JOIN artist artist
4068      ON artist.artistid = me.artistid
4069    JOIN record_label record_label
4070      ON record_label.labelid = artist.labelid
4071    LEFT JOIN track tracks
4072      ON tracks.cdid = me.cdid
4073    LEFT JOIN guest guests
4074      ON guests.trackid = track.trackid
4075    LEFT JOIN liner_notes liner_note
4076      ON liner_note.cdid = me.cdid
4077    JOIN cd_artwork cover_image
4078      ON cover_image.cdid = me.cdid
4079  ORDER BY tracks.cd
4080
4081 Now the C<artist>, C<record_label>, C<liner_note>, C<cover_image>,
4082 C<tracks>, and C<guests> of the CD will all be available through the
4083 relationship accessors without the need for additional queries to the
4084 database.
4085
4086 However, there is one caveat to be observed: it can be dangerous to
4087 prefetch more than one L<has_many|DBIx::Class::Relationship/has_many>
4088 relationship on a given level. e.g.:
4089
4090  my $rs = $schema->resultset('CD')->search(
4091    undef,
4092    {
4093      prefetch => [
4094        'tracks',                         # has_many
4095        { cd_to_producer => 'producer' }, # has_many => belongs_to (i.e. m2m)
4096      ]
4097    }
4098  );
4099
4100 In fact, C<DBIx::Class> will emit the following warning:
4101
4102  Prefetching multiple has_many rels tracks and cd_to_producer at top
4103  level will explode the number of row objects retrievable via ->next
4104  or ->all. Use at your own risk.
4105
4106 The collapser currently can't identify duplicate tuples for multiple
4107 L<has_many|DBIx::Class::Relationship/has_many> relationships and as a
4108 result the second L<has_many|DBIx::Class::Relationship/has_many>
4109 relation could contain redundant objects.
4110
4111 =head3 Using L</prefetch> with L</join>
4112
4113 L</prefetch> implies a L</join> with the equivalent argument, and is
4114 properly merged with any existing L</join> specification. So the
4115 following:
4116
4117   my $rs = $schema->resultset('CD')->search(
4118    {'record_label.name' => 'Music Product Ltd.'},
4119    {
4120      join     => {artist => 'record_label'},
4121      prefetch => 'artist',
4122    }
4123  );
4124
4125 ... will work, searching on the record label's name, but only
4126 prefetching the C<artist>.
4127
4128 =head3 Using L</prefetch> with L</select> / L</+select> / L</as> / L</+as>
4129
4130 L</prefetch> implies a L</+select>/L</+as> with the fields of the
4131 prefetched relations.  So given:
4132
4133   my $rs = $schema->resultset('CD')->search(
4134    undef,
4135    {
4136      select   => ['cd.title'],
4137      as       => ['cd_title'],
4138      prefetch => 'artist',
4139    }
4140  );
4141
4142 The L</select> becomes: C<'cd.title', 'artist.*'> and the L</as>
4143 becomes: C<'cd_title', 'artist.*'>.
4144
4145 =head3 CAVEATS
4146
4147 Prefetch does a lot of deep magic. As such, it may not behave exactly
4148 as you might expect.
4149
4150 =over 4
4151
4152 =item *
4153
4154 Prefetch uses the L</cache> to populate the prefetched relationships. This
4155 may or may not be what you want.
4156
4157 =item *
4158
4159 If you specify a condition on a prefetched relationship, ONLY those
4160 rows that match the prefetched condition will be fetched into that relationship.
4161 This means that adding prefetch to a search() B<may alter> what is returned by
4162 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
4163
4164   my $artist_rs = $schema->resultset('Artist')->search({
4165       'cds.year' => 2008,
4166   }, {
4167       join => 'cds',
4168   });
4169
4170   my $count = $artist_rs->first->cds->count;
4171
4172   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
4173
4174   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
4175
4176   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
4177
4178 that cmp_ok() may or may not pass depending on the datasets involved. This
4179 behavior may or may not survive the 0.09 transition.
4180
4181 =back
4182
4183 =head2 page
4184
4185 =over 4
4186
4187 =item Value: $page
4188
4189 =back
4190
4191 Makes the resultset paged and specifies the page to retrieve. Effectively
4192 identical to creating a non-pages resultset and then calling ->page($page)
4193 on it.
4194
4195 If L</rows> attribute is not specified it defaults to 10 rows per page.
4196
4197 When you have a paged resultset, L</count> will only return the number
4198 of rows in the page. To get the total, use the L</pager> and call
4199 C<total_entries> on it.
4200
4201 =head2 rows
4202
4203 =over 4
4204
4205 =item Value: $rows
4206
4207 =back
4208
4209 Specifies the maximum number of rows for direct retrieval or the number of
4210 rows per page if the page attribute or method is used.
4211
4212 =head2 offset
4213
4214 =over 4
4215
4216 =item Value: $offset
4217
4218 =back
4219
4220 Specifies the (zero-based) row number for the  first row to be returned, or the
4221 of the first row of the first page if paging is used.
4222
4223 =head2 group_by
4224
4225 =over 4
4226
4227 =item Value: \@columns
4228
4229 =back
4230
4231 A arrayref of columns to group by. Can include columns of joined tables.
4232
4233   group_by => [qw/ column1 column2 ... /]
4234
4235 =head2 having
4236
4237 =over 4
4238
4239 =item Value: $condition
4240
4241 =back
4242
4243 HAVING is a select statement attribute that is applied between GROUP BY and
4244 ORDER BY. It is applied to the after the grouping calculations have been
4245 done.
4246
4247   having => { 'count_employee' => { '>=', 100 } }
4248
4249 or with an in-place function in which case literal SQL is required:
4250
4251   having => \[ 'count(employee) >= ?', [ count => 100 ] ]
4252
4253 =head2 distinct
4254
4255 =over 4
4256
4257 =item Value: (0 | 1)
4258
4259 =back
4260
4261 Set to 1 to group by all columns. If the resultset already has a group_by
4262 attribute, this setting is ignored and an appropriate warning is issued.
4263
4264 =head2 where
4265
4266 =over 4
4267
4268 Adds to the WHERE clause.
4269
4270   # only return rows WHERE deleted IS NULL for all searches
4271   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
4272
4273 Can be overridden by passing C<< { where => undef } >> as an attribute
4274 to a resultset.
4275
4276 =back
4277
4278 =head2 cache
4279
4280 Set to 1 to cache search results. This prevents extra SQL queries if you
4281 revisit rows in your ResultSet:
4282
4283   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
4284
4285   while( my $artist = $resultset->next ) {
4286     ... do stuff ...
4287   }
4288
4289   $rs->first; # without cache, this would issue a query
4290
4291 By default, searches are not cached.
4292
4293 For more examples of using these attributes, see
4294 L<DBIx::Class::Manual::Cookbook>.
4295
4296 =head2 for
4297
4298 =over 4
4299
4300 =item Value: ( 'update' | 'shared' )
4301
4302 =back
4303
4304 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
4305 ... FOR SHARED.
4306
4307 =cut
4308
4309 1;