Fix stupid oversight in update_all
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use base qw/DBIx::Class/;
6 use DBIx::Class::Carp;
7 use DBIx::Class::Exception;
8 use DBIx::Class::ResultSetColumn;
9 use Scalar::Util qw/blessed weaken/;
10 use Try::Tiny;
11 use Test::Deep::NoTest 'eq_deeply';
12
13 # not importing first() as it will clash with our own method
14 use List::Util ();
15
16 BEGIN {
17   # De-duplication in _merge_attr() is disabled, but left in for reference
18   # (the merger is used for other things that ought not to be de-duped)
19   *__HM_DEDUP = sub () { 0 };
20 }
21
22 use namespace::clean;
23
24 use overload
25         '0+'     => "count",
26         'bool'   => "_bool",
27         fallback => 1;
28
29 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class result_source/);
30
31 =head1 NAME
32
33 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
34
35 =head1 SYNOPSIS
36
37   my $users_rs   = $schema->resultset('User');
38   while( $user = $users_rs->next) {
39     print $user->username;
40   }
41
42   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
43   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
44
45 =head1 DESCRIPTION
46
47 A ResultSet is an object which stores a set of conditions representing
48 a query. It is the backbone of DBIx::Class (i.e. the really
49 important/useful bit).
50
51 No SQL is executed on the database when a ResultSet is created, it
52 just stores all the conditions needed to create the query.
53
54 A basic ResultSet representing the data of an entire table is returned
55 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
56 L<Source|DBIx::Class::Manual::Glossary/Source> name.
57
58   my $users_rs = $schema->resultset('User');
59
60 A new ResultSet is returned from calling L</search> on an existing
61 ResultSet. The new one will contain all the conditions of the
62 original, plus any new conditions added in the C<search> call.
63
64 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
65 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
66 represents.
67
68 The query that the ResultSet represents is B<only> executed against
69 the database when these methods are called:
70 L</find>, L</next>, L</all>, L</first>, L</single>, L</count>.
71
72 If a resultset is used in a numeric context it returns the L</count>.
73 However, if it is used in a boolean context it is B<always> true.  So if
74 you want to check if a resultset has any results, you must use C<if $rs
75 != 0>.
76
77 =head1 EXAMPLES
78
79 =head2 Chaining resultsets
80
81 Let's say you've got a query that needs to be run to return some data
82 to the user. But, you have an authorization system in place that
83 prevents certain users from seeing certain information. So, you want
84 to construct the basic query in one method, but add constraints to it in
85 another.
86
87   sub get_data {
88     my $self = shift;
89     my $request = $self->get_request; # Get a request object somehow.
90     my $schema = $self->result_source->schema;
91
92     my $cd_rs = $schema->resultset('CD')->search({
93       title => $request->param('title'),
94       year => $request->param('year'),
95     });
96
97     $cd_rs = $self->apply_security_policy( $cd_rs );
98
99     return $cd_rs->all();
100   }
101
102   sub apply_security_policy {
103     my $self = shift;
104     my ($rs) = @_;
105
106     return $rs->search({
107       subversive => 0,
108     });
109   }
110
111 =head3 Resolving conditions and attributes
112
113 When a resultset is chained from another resultset, conditions and
114 attributes with the same keys need resolving.
115
116 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
117 into the existing ones from the original resultset.
118
119 The L</where> and L</having> attributes, and any search conditions, are
120 merged with an SQL C<AND> to the existing condition from the original
121 resultset.
122
123 All other attributes are overridden by any new ones supplied in the
124 search attributes.
125
126 =head2 Multiple queries
127
128 Since a resultset just defines a query, you can do all sorts of
129 things with it with the same object.
130
131   # Don't hit the DB yet.
132   my $cd_rs = $schema->resultset('CD')->search({
133     title => 'something',
134     year => 2009,
135   });
136
137   # Each of these hits the DB individually.
138   my $count = $cd_rs->count;
139   my $most_recent = $cd_rs->get_column('date_released')->max();
140   my @records = $cd_rs->all;
141
142 And it's not just limited to SELECT statements.
143
144   $cd_rs->delete();
145
146 This is even cooler:
147
148   $cd_rs->create({ artist => 'Fred' });
149
150 Which is the same as:
151
152   $schema->resultset('CD')->create({
153     title => 'something',
154     year => 2009,
155     artist => 'Fred'
156   });
157
158 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
159
160 =head1 METHODS
161
162 =head2 new
163
164 =over 4
165
166 =item Arguments: $source, \%$attrs
167
168 =item Return Value: $rs
169
170 =back
171
172 The resultset constructor. Takes a source object (usually a
173 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
174 L</ATTRIBUTES> below).  Does not perform any queries -- these are
175 executed as needed by the other methods.
176
177 Generally you won't need to construct a resultset manually.  You'll
178 automatically get one from e.g. a L</search> called in scalar context:
179
180   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
181
182 IMPORTANT: If called on an object, proxies to new_result instead so
183
184   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
185
186 will return a CD object, not a ResultSet.
187
188 =cut
189
190 sub new {
191   my $class = shift;
192   return $class->new_result(@_) if ref $class;
193
194   my ($source, $attrs) = @_;
195   $source = $source->resolve
196     if $source->isa('DBIx::Class::ResultSourceHandle');
197   $attrs = { %{$attrs||{}} };
198
199   if ($attrs->{page}) {
200     $attrs->{rows} ||= 10;
201   }
202
203   $attrs->{alias} ||= 'me';
204
205   my $self = bless {
206     result_source => $source,
207     cond => $attrs->{where},
208     pager => undef,
209     attrs => $attrs,
210   }, $class;
211
212   # if there is a dark selector, this means we are already in a
213   # chain and the cleanup/sanification was taken care of by
214   # _search_rs already
215   $self->_normalize_selection($attrs)
216     unless $attrs->{_dark_selector};
217
218   $self->result_class(
219     $attrs->{result_class} || $source->result_class
220   );
221
222   $self;
223 }
224
225 =head2 search
226
227 =over 4
228
229 =item Arguments: $cond, \%attrs?
230
231 =item Return Value: $resultset (scalar context) ||  @row_objs (list context)
232
233 =back
234
235   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
236   my $new_rs = $cd_rs->search({ year => 2005 });
237
238   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
239                  # year = 2005 OR year = 2004
240
241 In list context, C<< ->all() >> is called implicitly on the resultset, thus
242 returning a list of row objects instead. To avoid that, use L</search_rs>.
243
244 If you need to pass in additional attributes but no additional condition,
245 call it as C<search(undef, \%attrs)>.
246
247   # "SELECT name, artistid FROM $artist_table"
248   my @all_artists = $schema->resultset('Artist')->search(undef, {
249     columns => [qw/name artistid/],
250   });
251
252 For a list of attributes that can be passed to C<search>, see
253 L</ATTRIBUTES>. For more examples of using this function, see
254 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
255 documentation for the first argument, see L<SQL::Abstract>
256 and its extension L<DBIx::Class::SQLMaker>.
257
258 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
259
260 =head3 CAVEAT
261
262 Note that L</search> does not process/deflate any of the values passed in the
263 L<SQL::Abstract>-compatible search condition structure. This is unlike other
264 condition-bound methods L</new>, L</create> and L</find>. The user must ensure
265 manually that any value passed to this method will stringify to something the
266 RDBMS knows how to deal with. A notable example is the handling of L<DateTime>
267 objects, for more info see:
268 L<DBIx::Class::Manual::Cookbook/Formatting_DateTime_objects_in_queries>.
269
270 =cut
271
272 sub search {
273   my $self = shift;
274   my $rs = $self->search_rs( @_ );
275
276   if (wantarray) {
277     return $rs->all;
278   }
279   elsif (defined wantarray) {
280     return $rs;
281   }
282   else {
283     # we can be called by a relationship helper, which in
284     # turn may be called in void context due to some braindead
285     # overload or whatever else the user decided to be clever
286     # at this particular day. Thus limit the exception to
287     # external code calls only
288     $self->throw_exception ('->search is *not* a mutator, calling it in void context makes no sense')
289       if (caller)[0] !~ /^\QDBIx::Class::/;
290
291     return ();
292   }
293 }
294
295 =head2 search_rs
296
297 =over 4
298
299 =item Arguments: $cond, \%attrs?
300
301 =item Return Value: $resultset
302
303 =back
304
305 This method does the same exact thing as search() except it will
306 always return a resultset, even in list context.
307
308 =cut
309
310 sub search_rs {
311   my $self = shift;
312
313   # Special-case handling for (undef, undef).
314   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
315     @_ = ();
316   }
317
318   my $call_attrs = {};
319   if (@_ > 1) {
320     if (ref $_[-1] eq 'HASH') {
321       # copy for _normalize_selection
322       $call_attrs = { %{ pop @_ } };
323     }
324     elsif (! defined $_[-1] ) {
325       pop @_;   # search({}, undef)
326     }
327   }
328
329   # see if we can keep the cache (no $rs changes)
330   my $cache;
331   my %safe = (alias => 1, cache => 1);
332   if ( ! List::Util::first { !$safe{$_} } keys %$call_attrs and (
333     ! defined $_[0]
334       or
335     ref $_[0] eq 'HASH' && ! keys %{$_[0]}
336       or
337     ref $_[0] eq 'ARRAY' && ! @{$_[0]}
338   )) {
339     $cache = $self->get_cache;
340   }
341
342   my $rsrc = $self->result_source;
343
344   my $old_attrs = { %{$self->{attrs}} };
345   my $old_having = delete $old_attrs->{having};
346   my $old_where = delete $old_attrs->{where};
347
348   my $new_attrs = { %$old_attrs };
349
350   # take care of call attrs (only if anything is changing)
351   if (keys %$call_attrs) {
352
353     my @selector_attrs = qw/select as columns cols +select +as +columns include_columns/;
354
355     # reset the current selector list if new selectors are supplied
356     if (List::Util::first { exists $call_attrs->{$_} } qw/columns cols select as/) {
357       delete @{$old_attrs}{(@selector_attrs, '_dark_selector')};
358     }
359
360     # Normalize the new selector list (operates on the passed-in attr structure)
361     # Need to do it on every chain instead of only once on _resolved_attrs, in
362     # order to allow detection of empty vs partial 'as'
363     $call_attrs->{_dark_selector} = $old_attrs->{_dark_selector}
364       if $old_attrs->{_dark_selector};
365     $self->_normalize_selection ($call_attrs);
366
367     # start with blind overwriting merge, exclude selector attrs
368     $new_attrs = { %{$old_attrs}, %{$call_attrs} };
369     delete @{$new_attrs}{@selector_attrs};
370
371     for (@selector_attrs) {
372       $new_attrs->{$_} = $self->_merge_attr($old_attrs->{$_}, $call_attrs->{$_})
373         if ( exists $old_attrs->{$_} or exists $call_attrs->{$_} );
374     }
375
376     # older deprecated name, use only if {columns} is not there
377     if (my $c = delete $new_attrs->{cols}) {
378       if ($new_attrs->{columns}) {
379         carp "Resultset specifies both the 'columns' and the legacy 'cols' attributes - ignoring 'cols'";
380       }
381       else {
382         $new_attrs->{columns} = $c;
383       }
384     }
385
386
387     # join/prefetch use their own crazy merging heuristics
388     foreach my $key (qw/join prefetch/) {
389       $new_attrs->{$key} = $self->_merge_joinpref_attr($old_attrs->{$key}, $call_attrs->{$key})
390         if exists $call_attrs->{$key};
391     }
392
393     # stack binds together
394     $new_attrs->{bind} = [ @{ $old_attrs->{bind} || [] }, @{ $call_attrs->{bind} || [] } ];
395   }
396
397
398   # rip apart the rest of @_, parse a condition
399   my $call_cond = do {
400
401     if (ref $_[0] eq 'HASH') {
402       (keys %{$_[0]}) ? $_[0] : undef
403     }
404     elsif (@_ == 1) {
405       $_[0]
406     }
407     elsif (@_ % 2) {
408       $self->throw_exception('Odd number of arguments to search')
409     }
410     else {
411       +{ @_ }
412     }
413
414   } if @_;
415
416   if( @_ > 1 and ! $rsrc->result_class->isa('DBIx::Class::CDBICompat') ) {
417     carp_unique 'search( %condition ) is deprecated, use search( \%condition ) instead';
418   }
419
420   for ($old_where, $call_cond) {
421     if (defined $_) {
422       $new_attrs->{where} = $self->_stack_cond (
423         $_, $new_attrs->{where}
424       );
425     }
426   }
427
428   if (defined $old_having) {
429     $new_attrs->{having} = $self->_stack_cond (
430       $old_having, $new_attrs->{having}
431     )
432   }
433
434   my $rs = (ref $self)->new($rsrc, $new_attrs);
435
436   $rs->set_cache($cache) if ($cache);
437
438   return $rs;
439 }
440
441 my $dark_sel_dumper;
442 sub _normalize_selection {
443   my ($self, $attrs) = @_;
444
445   # legacy syntax
446   $attrs->{'+columns'} = $self->_merge_attr($attrs->{'+columns'}, delete $attrs->{include_columns})
447     if exists $attrs->{include_columns};
448
449   # columns are always placed first, however 
450
451   # Keep the X vs +X separation until _resolved_attrs time - this allows to
452   # delay the decision on whether to use a default select list ($rsrc->columns)
453   # allowing stuff like the remove_columns helper to work
454   #
455   # select/as +select/+as pairs need special handling - the amount of select/as
456   # elements in each pair does *not* have to be equal (think multicolumn
457   # selectors like distinct(foo, bar) ). If the selector is bare (no 'as'
458   # supplied at all) - try to infer the alias, either from the -as parameter
459   # of the selector spec, or use the parameter whole if it looks like a column
460   # name (ugly legacy heuristic). If all fails - leave the selector bare (which
461   # is ok as well), but make sure no more additions to the 'as' chain take place
462   for my $pref ('', '+') {
463
464     my ($sel, $as) = map {
465       my $key = "${pref}${_}";
466
467       my $val = [ ref $attrs->{$key} eq 'ARRAY'
468         ? @{$attrs->{$key}}
469         : $attrs->{$key} || ()
470       ];
471       delete $attrs->{$key};
472       $val;
473     } qw/select as/;
474
475     if (! @$as and ! @$sel ) {
476       next;
477     }
478     elsif (@$as and ! @$sel) {
479       $self->throw_exception(
480         "Unable to handle ${pref}as specification (@$as) without a corresponding ${pref}select"
481       );
482     }
483     elsif( ! @$as ) {
484       # no as part supplied at all - try to deduce (unless explicit end of named selection is declared)
485       # if any @$as has been supplied we assume the user knows what (s)he is doing
486       # and blindly keep stacking up pieces
487       unless ($attrs->{_dark_selector}) {
488         SELECTOR:
489         for (@$sel) {
490           if ( ref $_ eq 'HASH' and exists $_->{-as} ) {
491             push @$as, $_->{-as};
492           }
493           # assume any plain no-space, no-parenthesis string to be a column spec
494           # FIXME - this is retarded but is necessary to support shit like 'count(foo)'
495           elsif ( ! ref $_ and $_ =~ /^ [^\s\(\)]+ $/x) {
496             push @$as, $_;
497           }
498           # if all else fails - raise a flag that no more aliasing will be allowed
499           else {
500             $attrs->{_dark_selector} = {
501               plus_stage => $pref,
502               string => ($dark_sel_dumper ||= do {
503                   require Data::Dumper::Concise;
504                   Data::Dumper::Concise::DumperObject()->Indent(0);
505                 })->Values([$_])->Dump
506               ,
507             };
508             last SELECTOR;
509           }
510         }
511       }
512     }
513     elsif (@$as < @$sel) {
514       $self->throw_exception(
515         "Unable to handle an ${pref}as specification (@$as) with less elements than the corresponding ${pref}select"
516       );
517     }
518     elsif ($pref and $attrs->{_dark_selector}) {
519       $self->throw_exception(
520         "Unable to process named '+select', resultset contains an unnamed selector $attrs->{_dark_selector}{string}"
521       );
522     }
523
524
525     # merge result
526     $attrs->{"${pref}select"} = $self->_merge_attr($attrs->{"${pref}select"}, $sel);
527     $attrs->{"${pref}as"} = $self->_merge_attr($attrs->{"${pref}as"}, $as);
528   }
529 }
530
531 sub _stack_cond {
532   my ($self, $left, $right) = @_;
533
534   # collapse single element top-level conditions
535   # (single pass only, unlikely to need recursion)
536   for ($left, $right) {
537     if (ref $_ eq 'ARRAY') {
538       if (@$_ == 0) {
539         $_ = undef;
540       }
541       elsif (@$_ == 1) {
542         $_ = $_->[0];
543       }
544     }
545     elsif (ref $_ eq 'HASH') {
546       my ($first, $more) = keys %$_;
547
548       # empty hash
549       if (! defined $first) {
550         $_ = undef;
551       }
552       # one element hash
553       elsif (! defined $more) {
554         if ($first eq '-and' and ref $_->{'-and'} eq 'HASH') {
555           $_ = $_->{'-and'};
556         }
557         elsif ($first eq '-or' and ref $_->{'-or'} eq 'ARRAY') {
558           $_ = $_->{'-or'};
559         }
560       }
561     }
562   }
563
564   # merge hashes with weeding out of duplicates (simple cases only)
565   if (ref $left eq 'HASH' and ref $right eq 'HASH') {
566
567     # shallow copy to destroy
568     $right = { %$right };
569     for (grep { exists $right->{$_} } keys %$left) {
570       # the use of eq_deeply here is justified - the rhs of an
571       # expression can contain a lot of twisted weird stuff
572       delete $right->{$_} if eq_deeply( $left->{$_}, $right->{$_} );
573     }
574
575     $right = undef unless keys %$right;
576   }
577
578
579   if (defined $left xor defined $right) {
580     return defined $left ? $left : $right;
581   }
582   elsif (! defined $left) {
583     return undef;
584   }
585   else {
586     return { -and => [ $left, $right ] };
587   }
588 }
589
590 =head2 search_literal
591
592 =over 4
593
594 =item Arguments: $sql_fragment, @bind_values
595
596 =item Return Value: $resultset (scalar context) || @row_objs (list context)
597
598 =back
599
600   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
601   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
602
603 Pass a literal chunk of SQL to be added to the conditional part of the
604 resultset query.
605
606 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
607 only be used in that context. C<search_literal> is a convenience method.
608 It is equivalent to calling $schema->search(\[]), but if you want to ensure
609 columns are bound correctly, use C<search>.
610
611 Example of how to use C<search> instead of C<search_literal>
612
613   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
614   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
615
616
617 See L<DBIx::Class::Manual::Cookbook/Searching> and
618 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
619 require C<search_literal>.
620
621 =cut
622
623 sub search_literal {
624   my ($self, $sql, @bind) = @_;
625   my $attr;
626   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
627     $attr = pop @bind;
628   }
629   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
630 }
631
632 =head2 find
633
634 =over 4
635
636 =item Arguments: \%columns_values | @pk_values, \%attrs?
637
638 =item Return Value: $row_object | undef
639
640 =back
641
642 Finds and returns a single row based on supplied criteria. Takes either a
643 hashref with the same format as L</create> (including inference of foreign
644 keys from related objects), or a list of primary key values in the same
645 order as the L<primary columns|DBIx::Class::ResultSource/primary_columns>
646 declaration on the L</result_source>.
647
648 In either case an attempt is made to combine conditions already existing on
649 the resultset with the condition passed to this method.
650
651 To aid with preparing the correct query for the storage you may supply the
652 C<key> attribute, which is the name of a
653 L<unique constraint|DBIx::Class::ResultSource/add_unique_constraint> (the
654 unique constraint corresponding to the
655 L<primary columns|DBIx::Class::ResultSource/primary_columns> is always named
656 C<primary>). If the C<key> attribute has been supplied, and DBIC is unable
657 to construct a query that satisfies the named unique constraint fully (
658 non-NULL values for each column member of the constraint) an exception is
659 thrown.
660
661 If no C<key> is specified, the search is carried over all unique constraints
662 which are fully defined by the available condition.
663
664 If no such constraint is found, C<find> currently defaults to a simple
665 C<< search->(\%column_values) >> which may or may not do what you expect.
666 Note that this fallback behavior may be deprecated in further versions. If
667 you need to search with arbitrary conditions - use L</search>. If the query
668 resulting from this fallback produces more than one row, a warning to the
669 effect is issued, though only the first row is constructed and returned as
670 C<$row_object>.
671
672 In addition to C<key>, L</find> recognizes and applies standard
673 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
674
675 Note that if you have extra concerns about the correctness of the resulting
676 query you need to specify the C<key> attribute and supply the entire condition
677 as an argument to find (since it is not always possible to perform the
678 combination of the resultset condition with the supplied one, especially if
679 the resultset condition contains literal sql).
680
681 For example, to find a row by its primary key:
682
683   my $cd = $schema->resultset('CD')->find(5);
684
685 You can also find a row by a specific unique constraint:
686
687   my $cd = $schema->resultset('CD')->find(
688     {
689       artist => 'Massive Attack',
690       title  => 'Mezzanine',
691     },
692     { key => 'cd_artist_title' }
693   );
694
695 See also L</find_or_create> and L</update_or_create>.
696
697 =cut
698
699 sub find {
700   my $self = shift;
701   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
702
703   my $rsrc = $self->result_source;
704
705   # Parse out the condition from input
706   my $call_cond;
707   if (ref $_[0] eq 'HASH') {
708     $call_cond = { %{$_[0]} };
709   }
710   else {
711     my $constraint = exists $attrs->{key} ? $attrs->{key} : 'primary';
712     my @c_cols = $rsrc->unique_constraint_columns($constraint);
713
714     $self->throw_exception(
715       "No constraint columns, maybe a malformed '$constraint' constraint?"
716     ) unless @c_cols;
717
718     $self->throw_exception (
719       'find() expects either a column/value hashref, or a list of values '
720     . "corresponding to the columns of the specified unique constraint '$constraint'"
721     ) unless @c_cols == @_;
722
723     $call_cond = {};
724     @{$call_cond}{@c_cols} = @_;
725   }
726
727   my %related;
728   for my $key (keys %$call_cond) {
729     if (
730       my $keyref = ref($call_cond->{$key})
731         and
732       my $relinfo = $rsrc->relationship_info($key)
733     ) {
734       my $val = delete $call_cond->{$key};
735
736       next if $keyref eq 'ARRAY'; # has_many for multi_create
737
738       my $rel_q = $rsrc->_resolve_condition(
739         $relinfo->{cond}, $val, $key, $key
740       );
741       die "Can't handle complex relationship conditions in find" if ref($rel_q) ne 'HASH';
742       @related{keys %$rel_q} = values %$rel_q;
743     }
744   }
745
746   # relationship conditions take precedence (?)
747   @{$call_cond}{keys %related} = values %related;
748
749   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
750   my $final_cond;
751   if (exists $attrs->{key}) {
752     $final_cond = $self->_qualify_cond_columns (
753
754       $self->_build_unique_cond (
755         $attrs->{key},
756         $call_cond,
757       ),
758
759       $alias,
760     );
761   }
762   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
763     # This means that we got here after a merger of relationship conditions
764     # in ::Relationship::Base::search_related (the row method), and furthermore
765     # the relationship is of the 'single' type. This means that the condition
766     # provided by the relationship (already attached to $self) is sufficient,
767     # as there can be only one row in the database that would satisfy the
768     # relationship
769   }
770   else {
771     # no key was specified - fall down to heuristics mode:
772     # run through all unique queries registered on the resultset, and
773     # 'OR' all qualifying queries together
774     my (@unique_queries, %seen_column_combinations);
775     for my $c_name ($rsrc->unique_constraint_names) {
776       next if $seen_column_combinations{
777         join "\x00", sort $rsrc->unique_constraint_columns($c_name)
778       }++;
779
780       push @unique_queries, try {
781         $self->_build_unique_cond ($c_name, $call_cond, 'croak_on_nulls')
782       } || ();
783     }
784
785     $final_cond = @unique_queries
786       ? [ map { $self->_qualify_cond_columns($_, $alias) } @unique_queries ]
787       : $self->_non_unique_find_fallback ($call_cond, $attrs)
788     ;
789   }
790
791   # Run the query, passing the result_class since it should propagate for find
792   my $rs = $self->search ($final_cond, {result_class => $self->result_class, %$attrs});
793   if (keys %{$rs->_resolved_attrs->{collapse}}) {
794     my $row = $rs->next;
795     carp "Query returned more than one row" if $rs->next;
796     return $row;
797   }
798   else {
799     return $rs->single;
800   }
801 }
802
803 # This is a stop-gap method as agreed during the discussion on find() cleanup:
804 # http://lists.scsys.co.uk/pipermail/dbix-class/2010-October/009535.html
805 #
806 # It is invoked when find() is called in legacy-mode with insufficiently-unique
807 # condition. It is provided for overrides until a saner way forward is devised
808 #
809 # *NOTE* This is not a public method, and it's *GUARANTEED* to disappear down
810 # the road. Please adjust your tests accordingly to catch this situation early
811 # DBIx::Class::ResultSet->can('_non_unique_find_fallback') is reasonable
812 #
813 # The method will not be removed without an adequately complete replacement
814 # for strict-mode enforcement
815 sub _non_unique_find_fallback {
816   my ($self, $cond, $attrs) = @_;
817
818   return $self->_qualify_cond_columns(
819     $cond,
820     exists $attrs->{alias}
821       ? $attrs->{alias}
822       : $self->{attrs}{alias}
823   );
824 }
825
826
827 sub _qualify_cond_columns {
828   my ($self, $cond, $alias) = @_;
829
830   my %aliased = %$cond;
831   for (keys %aliased) {
832     $aliased{"$alias.$_"} = delete $aliased{$_}
833       if $_ !~ /\./;
834   }
835
836   return \%aliased;
837 }
838
839 sub _build_unique_cond {
840   my ($self, $constraint_name, $extra_cond, $croak_on_null) = @_;
841
842   my @c_cols = $self->result_source->unique_constraint_columns($constraint_name);
843
844   # combination may fail if $self->{cond} is non-trivial
845   my ($final_cond) = try {
846     $self->_merge_with_rscond ($extra_cond)
847   } catch {
848     +{ %$extra_cond }
849   };
850
851   # trim out everything not in $columns
852   $final_cond = { map {
853     exists $final_cond->{$_}
854       ? ( $_ => $final_cond->{$_} )
855       : ()
856   } @c_cols };
857
858   if (my @missing = grep
859     { ! ($croak_on_null ? defined $final_cond->{$_} : exists $final_cond->{$_}) }
860     (@c_cols)
861   ) {
862     $self->throw_exception( sprintf ( "Unable to satisfy requested constraint '%s', no values for column(s): %s",
863       $constraint_name,
864       join (', ', map { "'$_'" } @missing),
865     ) );
866   }
867
868   if (
869     !$croak_on_null
870       and
871     !$ENV{DBIC_NULLABLE_KEY_NOWARN}
872       and
873     my @undefs = grep { ! defined $final_cond->{$_} } (keys %$final_cond)
874   ) {
875     carp_unique ( sprintf (
876       "NULL/undef values supplied for requested unique constraint '%s' (NULL "
877     . 'values in column(s): %s). This is almost certainly not what you wanted, '
878     . 'though you can set DBIC_NULLABLE_KEY_NOWARN to disable this warning.',
879       $constraint_name,
880       join (', ', map { "'$_'" } @undefs),
881     ));
882   }
883
884   return $final_cond;
885 }
886
887 =head2 search_related
888
889 =over 4
890
891 =item Arguments: $rel, $cond, \%attrs?
892
893 =item Return Value: $new_resultset (scalar context) || @row_objs (list context)
894
895 =back
896
897   $new_rs = $cd_rs->search_related('artist', {
898     name => 'Emo-R-Us',
899   });
900
901 Searches the specified relationship, optionally specifying a condition and
902 attributes for matching records. See L</ATTRIBUTES> for more information.
903
904 In list context, C<< ->all() >> is called implicitly on the resultset, thus
905 returning a list of row objects instead. To avoid that, use L</search_related_rs>.
906
907 See also L</search_related_rs>.
908
909 =cut
910
911 sub search_related {
912   return shift->related_resultset(shift)->search(@_);
913 }
914
915 =head2 search_related_rs
916
917 This method works exactly the same as search_related, except that
918 it guarantees a resultset, even in list context.
919
920 =cut
921
922 sub search_related_rs {
923   return shift->related_resultset(shift)->search_rs(@_);
924 }
925
926 =head2 cursor
927
928 =over 4
929
930 =item Arguments: none
931
932 =item Return Value: $cursor
933
934 =back
935
936 Returns a storage-driven cursor to the given resultset. See
937 L<DBIx::Class::Cursor> for more information.
938
939 =cut
940
941 sub cursor {
942   my ($self) = @_;
943
944   my $attrs = $self->_resolved_attrs_copy;
945
946   return $self->{cursor}
947     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
948           $attrs->{where},$attrs);
949 }
950
951 =head2 single
952
953 =over 4
954
955 =item Arguments: $cond?
956
957 =item Return Value: $row_object | undef
958
959 =back
960
961   my $cd = $schema->resultset('CD')->single({ year => 2001 });
962
963 Inflates the first result without creating a cursor if the resultset has
964 any records in it; if not returns C<undef>. Used by L</find> as a lean version
965 of L</search>.
966
967 While this method can take an optional search condition (just like L</search>)
968 being a fast-code-path it does not recognize search attributes. If you need to
969 add extra joins or similar, call L</search> and then chain-call L</single> on the
970 L<DBIx::Class::ResultSet> returned.
971
972 =over
973
974 =item B<Note>
975
976 As of 0.08100, this method enforces the assumption that the preceding
977 query returns only one row. If more than one row is returned, you will receive
978 a warning:
979
980   Query returned more than one row
981
982 In this case, you should be using L</next> or L</find> instead, or if you really
983 know what you are doing, use the L</rows> attribute to explicitly limit the size
984 of the resultset.
985
986 This method will also throw an exception if it is called on a resultset prefetching
987 has_many, as such a prefetch implies fetching multiple rows from the database in
988 order to assemble the resulting object.
989
990 =back
991
992 =cut
993
994 sub single {
995   my ($self, $where) = @_;
996   if(@_ > 2) {
997       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
998   }
999
1000   my $attrs = $self->_resolved_attrs_copy;
1001
1002   if (keys %{$attrs->{collapse}}) {
1003     $self->throw_exception(
1004       'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
1005     );
1006   }
1007
1008   if ($where) {
1009     if (defined $attrs->{where}) {
1010       $attrs->{where} = {
1011         '-and' =>
1012             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
1013                $where, delete $attrs->{where} ]
1014       };
1015     } else {
1016       $attrs->{where} = $where;
1017     }
1018   }
1019
1020   my @data = $self->result_source->storage->select_single(
1021     $attrs->{from}, $attrs->{select},
1022     $attrs->{where}, $attrs
1023   );
1024
1025   return (@data ? ($self->_construct_object(@data))[0] : undef);
1026 }
1027
1028
1029 # _collapse_query
1030 #
1031 # Recursively collapse the query, accumulating values for each column.
1032
1033 sub _collapse_query {
1034   my ($self, $query, $collapsed) = @_;
1035
1036   $collapsed ||= {};
1037
1038   if (ref $query eq 'ARRAY') {
1039     foreach my $subquery (@$query) {
1040       next unless ref $subquery;  # -or
1041       $collapsed = $self->_collapse_query($subquery, $collapsed);
1042     }
1043   }
1044   elsif (ref $query eq 'HASH') {
1045     if (keys %$query and (keys %$query)[0] eq '-and') {
1046       foreach my $subquery (@{$query->{-and}}) {
1047         $collapsed = $self->_collapse_query($subquery, $collapsed);
1048       }
1049     }
1050     else {
1051       foreach my $col (keys %$query) {
1052         my $value = $query->{$col};
1053         $collapsed->{$col}{$value}++;
1054       }
1055     }
1056   }
1057
1058   return $collapsed;
1059 }
1060
1061 =head2 get_column
1062
1063 =over 4
1064
1065 =item Arguments: $cond?
1066
1067 =item Return Value: $resultsetcolumn
1068
1069 =back
1070
1071   my $max_length = $rs->get_column('length')->max;
1072
1073 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
1074
1075 =cut
1076
1077 sub get_column {
1078   my ($self, $column) = @_;
1079   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
1080   return $new;
1081 }
1082
1083 =head2 search_like
1084
1085 =over 4
1086
1087 =item Arguments: $cond, \%attrs?
1088
1089 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1090
1091 =back
1092
1093   # WHERE title LIKE '%blue%'
1094   $cd_rs = $rs->search_like({ title => '%blue%'});
1095
1096 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
1097 that this is simply a convenience method retained for ex Class::DBI users.
1098 You most likely want to use L</search> with specific operators.
1099
1100 For more information, see L<DBIx::Class::Manual::Cookbook>.
1101
1102 This method is deprecated and will be removed in 0.09. Use L</search()>
1103 instead. An example conversion is:
1104
1105   ->search_like({ foo => 'bar' });
1106
1107   # Becomes
1108
1109   ->search({ foo => { like => 'bar' } });
1110
1111 =cut
1112
1113 sub search_like {
1114   my $class = shift;
1115   carp_unique (
1116     'search_like() is deprecated and will be removed in DBIC version 0.09.'
1117    .' Instead use ->search({ x => { -like => "y%" } })'
1118    .' (note the outer pair of {}s - they are important!)'
1119   );
1120   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1121   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
1122   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
1123   return $class->search($query, { %$attrs });
1124 }
1125
1126 =head2 slice
1127
1128 =over 4
1129
1130 =item Arguments: $first, $last
1131
1132 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1133
1134 =back
1135
1136 Returns a resultset or object list representing a subset of elements from the
1137 resultset slice is called on. Indexes are from 0, i.e., to get the first
1138 three records, call:
1139
1140   my ($one, $two, $three) = $rs->slice(0, 2);
1141
1142 =cut
1143
1144 sub slice {
1145   my ($self, $min, $max) = @_;
1146   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
1147   $attrs->{offset} = $self->{attrs}{offset} || 0;
1148   $attrs->{offset} += $min;
1149   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
1150   return $self->search(undef, $attrs);
1151   #my $slice = (ref $self)->new($self->result_source, $attrs);
1152   #return (wantarray ? $slice->all : $slice);
1153 }
1154
1155 =head2 next
1156
1157 =over 4
1158
1159 =item Arguments: none
1160
1161 =item Return Value: $result | undef
1162
1163 =back
1164
1165 Returns the next element in the resultset (C<undef> is there is none).
1166
1167 Can be used to efficiently iterate over records in the resultset:
1168
1169   my $rs = $schema->resultset('CD')->search;
1170   while (my $cd = $rs->next) {
1171     print $cd->title;
1172   }
1173
1174 Note that you need to store the resultset object, and call C<next> on it.
1175 Calling C<< resultset('Table')->next >> repeatedly will always return the
1176 first record from the resultset.
1177
1178 =cut
1179
1180 sub next {
1181   my ($self) = @_;
1182   if (my $cache = $self->get_cache) {
1183     $self->{all_cache_position} ||= 0;
1184     return $cache->[$self->{all_cache_position}++];
1185   }
1186   if ($self->{attrs}{cache}) {
1187     delete $self->{pager};
1188     $self->{all_cache_position} = 1;
1189     return ($self->all)[0];
1190   }
1191   if ($self->{stashed_objects}) {
1192     my $obj = shift(@{$self->{stashed_objects}});
1193     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
1194     return $obj;
1195   }
1196   my @row = (
1197     exists $self->{stashed_row}
1198       ? @{delete $self->{stashed_row}}
1199       : $self->cursor->next
1200   );
1201   return undef unless (@row);
1202   my ($row, @more) = $self->_construct_object(@row);
1203   $self->{stashed_objects} = \@more if @more;
1204   return $row;
1205 }
1206
1207 sub _construct_object {
1208   my ($self, @row) = @_;
1209
1210   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row)
1211     or return ();
1212   my @new = $self->result_class->inflate_result($self->result_source, @$info);
1213   @new = $self->{_attrs}{record_filter}->(@new)
1214     if exists $self->{_attrs}{record_filter};
1215   return @new;
1216 }
1217
1218 sub _collapse_result {
1219   my ($self, $as_proto, $row) = @_;
1220
1221   my @copy = @$row;
1222
1223   # 'foo'         => [ undef, 'foo' ]
1224   # 'foo.bar'     => [ 'foo', 'bar' ]
1225   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
1226
1227   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
1228
1229   my %collapse = %{$self->{_attrs}{collapse}||{}};
1230
1231   my @pri_index;
1232
1233   # if we're doing collapsing (has_many prefetch) we need to grab records
1234   # until the PK changes, so fill @pri_index. if not, we leave it empty so
1235   # we know we don't have to bother.
1236
1237   # the reason for not using the collapse stuff directly is because if you
1238   # had for e.g. two artists in a row with no cds, the collapse info for
1239   # both would be NULL (undef) so you'd lose the second artist
1240
1241   # store just the index so we can check the array positions from the row
1242   # without having to contruct the full hash
1243
1244   if (keys %collapse) {
1245     my %pri = map { ($_ => 1) } $self->result_source->_pri_cols;
1246     foreach my $i (0 .. $#construct_as) {
1247       next if defined($construct_as[$i][0]); # only self table
1248       if (delete $pri{$construct_as[$i][1]}) {
1249         push(@pri_index, $i);
1250       }
1251       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
1252     }
1253   }
1254
1255   # no need to do an if, it'll be empty if @pri_index is empty anyway
1256
1257   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
1258
1259   my @const_rows;
1260
1261   do { # no need to check anything at the front, we always want the first row
1262
1263     my %const;
1264
1265     foreach my $this_as (@construct_as) {
1266       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
1267     }
1268
1269     push(@const_rows, \%const);
1270
1271   } until ( # no pri_index => no collapse => drop straight out
1272       !@pri_index
1273     or
1274       do { # get another row, stash it, drop out if different PK
1275
1276         @copy = $self->cursor->next;
1277         $self->{stashed_row} = \@copy;
1278
1279         # last thing in do block, counts as true if anything doesn't match
1280
1281         # check xor defined first for NULL vs. NOT NULL then if one is
1282         # defined the other must be so check string equality
1283
1284         grep {
1285           (defined $pri_vals{$_} ^ defined $copy[$_])
1286           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1287         } @pri_index;
1288       }
1289   );
1290
1291   my $alias = $self->{attrs}{alias};
1292   my $info = [];
1293
1294   my %collapse_pos;
1295
1296   my @const_keys;
1297
1298   foreach my $const (@const_rows) {
1299     scalar @const_keys or do {
1300       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1301     };
1302     foreach my $key (@const_keys) {
1303       if (length $key) {
1304         my $target = $info;
1305         my @parts = split(/\./, $key);
1306         my $cur = '';
1307         my $data = $const->{$key};
1308         foreach my $p (@parts) {
1309           $target = $target->[1]->{$p} ||= [];
1310           $cur .= ".${p}";
1311           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) {
1312             # collapsing at this point and on final part
1313             my $pos = $collapse_pos{$cur};
1314             CK: foreach my $ck (@ckey) {
1315               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1316                 $collapse_pos{$cur} = $data;
1317                 delete @collapse_pos{ # clear all positioning for sub-entries
1318                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1319                 };
1320                 push(@$target, []);
1321                 last CK;
1322               }
1323             }
1324           }
1325           if (exists $collapse{$cur}) {
1326             $target = $target->[-1];
1327           }
1328         }
1329         $target->[0] = $data;
1330       } else {
1331         $info->[0] = $const->{$key};
1332       }
1333     }
1334   }
1335
1336   return $info;
1337 }
1338
1339 =head2 result_source
1340
1341 =over 4
1342
1343 =item Arguments: $result_source?
1344
1345 =item Return Value: $result_source
1346
1347 =back
1348
1349 An accessor for the primary ResultSource object from which this ResultSet
1350 is derived.
1351
1352 =head2 result_class
1353
1354 =over 4
1355
1356 =item Arguments: $result_class?
1357
1358 =item Return Value: $result_class
1359
1360 =back
1361
1362 An accessor for the class to use when creating row objects. Defaults to
1363 C<< result_source->result_class >> - which in most cases is the name of the
1364 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1365
1366 Note that changing the result_class will also remove any components
1367 that were originally loaded in the source class via
1368 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1369 in the original source class will not run.
1370
1371 =cut
1372
1373 sub result_class {
1374   my ($self, $result_class) = @_;
1375   if ($result_class) {
1376     unless (ref $result_class) { # don't fire this for an object
1377       $self->ensure_class_loaded($result_class);
1378     }
1379     $self->_result_class($result_class);
1380     # THIS LINE WOULD BE A BUG - this accessor specifically exists to
1381     # permit the user to set result class on one result set only; it only
1382     # chains if provided to search()
1383     #$self->{attrs}{result_class} = $result_class if ref $self;
1384   }
1385   $self->_result_class;
1386 }
1387
1388 =head2 count
1389
1390 =over 4
1391
1392 =item Arguments: $cond, \%attrs??
1393
1394 =item Return Value: $count
1395
1396 =back
1397
1398 Performs an SQL C<COUNT> with the same query as the resultset was built
1399 with to find the number of elements. Passing arguments is equivalent to
1400 C<< $rs->search ($cond, \%attrs)->count >>
1401
1402 =cut
1403
1404 sub count {
1405   my $self = shift;
1406   return $self->search(@_)->count if @_ and defined $_[0];
1407   return scalar @{ $self->get_cache } if $self->get_cache;
1408
1409   my $attrs = $self->_resolved_attrs_copy;
1410
1411   # this is a little optimization - it is faster to do the limit
1412   # adjustments in software, instead of a subquery
1413   my $rows = delete $attrs->{rows};
1414   my $offset = delete $attrs->{offset};
1415
1416   my $crs;
1417   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1418     $crs = $self->_count_subq_rs ($attrs);
1419   }
1420   else {
1421     $crs = $self->_count_rs ($attrs);
1422   }
1423   my $count = $crs->next;
1424
1425   $count -= $offset if $offset;
1426   $count = $rows if $rows and $rows < $count;
1427   $count = 0 if ($count < 0);
1428
1429   return $count;
1430 }
1431
1432 =head2 count_rs
1433
1434 =over 4
1435
1436 =item Arguments: $cond, \%attrs??
1437
1438 =item Return Value: $count_rs
1439
1440 =back
1441
1442 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1443 This can be very handy for subqueries:
1444
1445   ->search( { amount => $some_rs->count_rs->as_query } )
1446
1447 As with regular resultsets the SQL query will be executed only after
1448 the resultset is accessed via L</next> or L</all>. That would return
1449 the same single value obtainable via L</count>.
1450
1451 =cut
1452
1453 sub count_rs {
1454   my $self = shift;
1455   return $self->search(@_)->count_rs if @_;
1456
1457   # this may look like a lack of abstraction (count() does about the same)
1458   # but in fact an _rs *must* use a subquery for the limits, as the
1459   # software based limiting can not be ported if this $rs is to be used
1460   # in a subquery itself (i.e. ->as_query)
1461   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1462     return $self->_count_subq_rs;
1463   }
1464   else {
1465     return $self->_count_rs;
1466   }
1467 }
1468
1469 #
1470 # returns a ResultSetColumn object tied to the count query
1471 #
1472 sub _count_rs {
1473   my ($self, $attrs) = @_;
1474
1475   my $rsrc = $self->result_source;
1476   $attrs ||= $self->_resolved_attrs;
1477
1478   my $tmp_attrs = { %$attrs };
1479   # take off any limits, record_filter is cdbi, and no point of ordering nor locking a count
1480   delete @{$tmp_attrs}{qw/rows offset order_by record_filter for/};
1481
1482   # overwrite the selector (supplied by the storage)
1483   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $attrs);
1484   $tmp_attrs->{as} = 'count';
1485   delete @{$tmp_attrs}{qw/columns/};
1486
1487   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1488
1489   return $tmp_rs;
1490 }
1491
1492 #
1493 # same as above but uses a subquery
1494 #
1495 sub _count_subq_rs {
1496   my ($self, $attrs) = @_;
1497
1498   my $rsrc = $self->result_source;
1499   $attrs ||= $self->_resolved_attrs;
1500
1501   my $sub_attrs = { %$attrs };
1502   # extra selectors do not go in the subquery and there is no point of ordering it, nor locking it
1503   delete @{$sub_attrs}{qw/collapse columns as select _prefetch_selector_range order_by for/};
1504
1505   # if we multi-prefetch we group_by primary keys only as this is what we would
1506   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
1507   if ( keys %{$attrs->{collapse}}  ) {
1508     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } ($rsrc->_pri_cols) ]
1509   }
1510
1511   # Calculate subquery selector
1512   if (my $g = $sub_attrs->{group_by}) {
1513
1514     my $sql_maker = $rsrc->storage->sql_maker;
1515
1516     # necessary as the group_by may refer to aliased functions
1517     my $sel_index;
1518     for my $sel (@{$attrs->{select}}) {
1519       $sel_index->{$sel->{-as}} = $sel
1520         if (ref $sel eq 'HASH' and $sel->{-as});
1521     }
1522
1523     # anything from the original select mentioned on the group-by needs to make it to the inner selector
1524     # also look for named aggregates referred in the having clause
1525     # having often contains scalarrefs - thus parse it out entirely
1526     my @parts = @$g;
1527     if ($attrs->{having}) {
1528       local $sql_maker->{having_bind};
1529       local $sql_maker->{quote_char} = $sql_maker->{quote_char};
1530       local $sql_maker->{name_sep} = $sql_maker->{name_sep};
1531       unless (defined $sql_maker->{quote_char} and length $sql_maker->{quote_char}) {
1532         $sql_maker->{quote_char} = [ "\x00", "\xFF" ];
1533         # if we don't unset it we screw up retarded but unfortunately working
1534         # 'MAX(foo.bar)' => { '>', 3 }
1535         $sql_maker->{name_sep} = '';
1536       }
1537
1538       my ($lquote, $rquote, $sep) = map { quotemeta $_ } ($sql_maker->_quote_chars, $sql_maker->name_sep);
1539
1540       my $sql = $sql_maker->_parse_rs_attrs ({ having => $attrs->{having} });
1541
1542       # search for both a proper quoted qualified string, for a naive unquoted scalarref
1543       # and if all fails for an utterly naive quoted scalar-with-function
1544       while ($sql =~ /
1545         $rquote $sep $lquote (.+?) $rquote
1546           |
1547         [\s,] \w+ \. (\w+) [\s,]
1548           |
1549         [\s,] $lquote (.+?) $rquote [\s,]
1550       /gx) {
1551         push @parts, ($1 || $2 || $3);  # one of them matched if we got here
1552       }
1553     }
1554
1555     for (@parts) {
1556       my $colpiece = $sel_index->{$_} || $_;
1557
1558       # unqualify join-based group_by's. Arcane but possible query
1559       # also horrible horrible hack to alias a column (not a func.)
1560       # (probably need to introduce SQLA syntax)
1561       if ($colpiece =~ /\./ && $colpiece !~ /^$attrs->{alias}\./) {
1562         my $as = $colpiece;
1563         $as =~ s/\./__/;
1564         $colpiece = \ sprintf ('%s AS %s', map { $sql_maker->_quote ($_) } ($colpiece, $as) );
1565       }
1566       push @{$sub_attrs->{select}}, $colpiece;
1567     }
1568   }
1569   else {
1570     my @pcols = map { "$attrs->{alias}.$_" } ($rsrc->primary_columns);
1571     $sub_attrs->{select} = @pcols ? \@pcols : [ 1 ];
1572   }
1573
1574   return $rsrc->resultset_class
1575                ->new ($rsrc, $sub_attrs)
1576                 ->as_subselect_rs
1577                  ->search ({}, { columns => { count => $rsrc->storage->_count_select ($rsrc, $attrs) } })
1578                   ->get_column ('count');
1579 }
1580
1581 sub _bool {
1582   return 1;
1583 }
1584
1585 =head2 count_literal
1586
1587 =over 4
1588
1589 =item Arguments: $sql_fragment, @bind_values
1590
1591 =item Return Value: $count
1592
1593 =back
1594
1595 Counts the results in a literal query. Equivalent to calling L</search_literal>
1596 with the passed arguments, then L</count>.
1597
1598 =cut
1599
1600 sub count_literal { shift->search_literal(@_)->count; }
1601
1602 =head2 all
1603
1604 =over 4
1605
1606 =item Arguments: none
1607
1608 =item Return Value: @objects
1609
1610 =back
1611
1612 Returns all elements in the resultset.
1613
1614 =cut
1615
1616 sub all {
1617   my $self = shift;
1618   if(@_) {
1619       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1620   }
1621
1622   return @{ $self->get_cache } if $self->get_cache;
1623
1624   my @obj;
1625
1626   if (keys %{$self->_resolved_attrs->{collapse}}) {
1627     # Using $self->cursor->all is really just an optimisation.
1628     # If we're collapsing has_many prefetches it probably makes
1629     # very little difference, and this is cleaner than hacking
1630     # _construct_object to survive the approach
1631     $self->cursor->reset;
1632     my @row = $self->cursor->next;
1633     while (@row) {
1634       push(@obj, $self->_construct_object(@row));
1635       @row = (exists $self->{stashed_row}
1636                ? @{delete $self->{stashed_row}}
1637                : $self->cursor->next);
1638     }
1639   } else {
1640     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1641   }
1642
1643   $self->set_cache(\@obj) if $self->{attrs}{cache};
1644
1645   return @obj;
1646 }
1647
1648 =head2 reset
1649
1650 =over 4
1651
1652 =item Arguments: none
1653
1654 =item Return Value: $self
1655
1656 =back
1657
1658 Resets the resultset's cursor, so you can iterate through the elements again.
1659 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1660 another query.
1661
1662 =cut
1663
1664 sub reset {
1665   my ($self) = @_;
1666   delete $self->{_attrs} if exists $self->{_attrs};
1667   $self->{all_cache_position} = 0;
1668   $self->cursor->reset;
1669   return $self;
1670 }
1671
1672 =head2 first
1673
1674 =over 4
1675
1676 =item Arguments: none
1677
1678 =item Return Value: $object | undef
1679
1680 =back
1681
1682 Resets the resultset and returns an object for the first result (or C<undef>
1683 if the resultset is empty).
1684
1685 =cut
1686
1687 sub first {
1688   return $_[0]->reset->next;
1689 }
1690
1691
1692 # _rs_update_delete
1693 #
1694 # Determines whether and what type of subquery is required for the $rs operation.
1695 # If grouping is necessary either supplies its own, or verifies the current one
1696 # After all is done delegates to the proper storage method.
1697
1698 sub _rs_update_delete {
1699   my ($self, $op, $values) = @_;
1700
1701   my $rsrc = $self->result_source;
1702
1703   my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
1704   my $needs_subq = $needs_group_by_subq || $self->_has_resolved_attr(qw/rows offset/);
1705
1706   if ($needs_group_by_subq or $needs_subq) {
1707
1708     # make a new $rs selecting only the PKs (that's all we really need)
1709     my $attrs = $self->_resolved_attrs_copy;
1710
1711
1712     delete $attrs->{$_} for qw/collapse _collapse_order_by select _prefetch_selector_range as/;
1713     $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->_pri_cols) ];
1714
1715     if ($needs_group_by_subq) {
1716       # make sure no group_by was supplied, or if there is one - make sure it matches
1717       # the columns compiled above perfectly. Anything else can not be sanely executed
1718       # on most databases so croak right then and there
1719
1720       if (my $g = $attrs->{group_by}) {
1721         my @current_group_by = map
1722           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1723           @$g
1724         ;
1725
1726         if (
1727           join ("\x00", sort @current_group_by)
1728             ne
1729           join ("\x00", sort @{$attrs->{columns}} )
1730         ) {
1731           $self->throw_exception (
1732             "You have just attempted a $op operation on a resultset which does group_by"
1733             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1734             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1735             . ' kind of queries. Please retry the operation with a modified group_by or'
1736             . ' without using one at all.'
1737           );
1738         }
1739       }
1740       else {
1741         $attrs->{group_by} = $attrs->{columns};
1742       }
1743     }
1744
1745     my $subrs = (ref $self)->new($rsrc, $attrs);
1746     return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
1747   }
1748   else {
1749     # Most databases do not allow aliasing of tables in UPDATE/DELETE. Thus
1750     # a condition containing 'me' or other table prefixes will not work
1751     # at all. What this code tries to do (badly) is to generate a condition
1752     # with the qualifiers removed, by exploiting the quote mechanism of sqla
1753     #
1754     # this is atrocious and should be replaced by normal sqla introspection
1755     # one sunny day
1756     my ($sql, @bind) = do {
1757       my $sqla = $rsrc->storage->sql_maker;
1758       local $sqla->{_dequalify_idents} = 1;
1759       $sqla->_recurse_where($self->{cond});
1760     } if $self->{cond};
1761
1762     return $rsrc->storage->$op(
1763       $rsrc,
1764       $op eq 'update' ? $values : (),
1765       $self->{cond} ? \[$sql, @bind] : (),
1766     );
1767   }
1768 }
1769
1770 =head2 update
1771
1772 =over 4
1773
1774 =item Arguments: \%values
1775
1776 =item Return Value: $storage_rv
1777
1778 =back
1779
1780 Sets the specified columns in the resultset to the supplied values in a
1781 single query. Note that this will not run any accessor/set_column/update
1782 triggers, nor will it update any row object instances derived from this
1783 resultset (this includes the contents of the L<resultset cache|/set_cache>
1784 if any). See L</update_all> if you need to execute any on-update
1785 triggers or cascades defined either by you or a
1786 L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
1787
1788 The return value is a pass through of what the underlying
1789 storage backend returned, and may vary. See L<DBI/execute> for the most
1790 common case.
1791
1792 =head3 CAVEAT
1793
1794 Note that L</update> does not process/deflate any of the values passed in.
1795 This is unlike the corresponding L<DBIx::Class::Row/update>. The user must
1796 ensure manually that any value passed to this method will stringify to
1797 something the RDBMS knows how to deal with. A notable example is the
1798 handling of L<DateTime> objects, for more info see:
1799 L<DBIx::Class::Manual::Cookbook/Formatting_DateTime_objects_in_queries>.
1800
1801 =cut
1802
1803 sub update {
1804   my ($self, $values) = @_;
1805   $self->throw_exception('Values for update must be a hash')
1806     unless ref $values eq 'HASH';
1807
1808   return $self->_rs_update_delete ('update', $values);
1809 }
1810
1811 =head2 update_all
1812
1813 =over 4
1814
1815 =item Arguments: \%values
1816
1817 =item Return Value: 1
1818
1819 =back
1820
1821 Fetches all objects and updates them one at a time via
1822 L<DBIx::Class::Row/update>. Note that C<update_all> will run DBIC defined
1823 triggers, while L</update> will not.
1824
1825 =cut
1826
1827 sub update_all {
1828   my ($self, $values) = @_;
1829   $self->throw_exception('Values for update_all must be a hash')
1830     unless ref $values eq 'HASH';
1831
1832   my $guard = $self->result_source->schema->txn_scope_guard;
1833   $_->update({%$values}) for $self->all;  # shallow copy - update will mangle it
1834   $guard->commit;
1835   return 1;
1836 }
1837
1838 =head2 delete
1839
1840 =over 4
1841
1842 =item Arguments: none
1843
1844 =item Return Value: $storage_rv
1845
1846 =back
1847
1848 Deletes the rows matching this resultset in a single query. Note that this
1849 will not run any delete triggers, nor will it alter the
1850 L<in_storage|DBIx::Class::Row/in_storage> status of any row object instances
1851 derived from this resultset (this includes the contents of the
1852 L<resultset cache|/set_cache> if any). See L</delete_all> if you need to
1853 execute any on-delete triggers or cascades defined either by you or a
1854 L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
1855
1856 The return value is a pass through of what the underlying storage backend
1857 returned, and may vary. See L<DBI/execute> for the most common case.
1858
1859 =cut
1860
1861 sub delete {
1862   my $self = shift;
1863   $self->throw_exception('delete does not accept any arguments')
1864     if @_;
1865
1866   return $self->_rs_update_delete ('delete');
1867 }
1868
1869 =head2 delete_all
1870
1871 =over 4
1872
1873 =item Arguments: none
1874
1875 =item Return Value: 1
1876
1877 =back
1878
1879 Fetches all objects and deletes them one at a time via
1880 L<DBIx::Class::Row/delete>. Note that C<delete_all> will run DBIC defined
1881 triggers, while L</delete> will not.
1882
1883 =cut
1884
1885 sub delete_all {
1886   my $self = shift;
1887   $self->throw_exception('delete_all does not accept any arguments')
1888     if @_;
1889
1890   my $guard = $self->result_source->schema->txn_scope_guard;
1891   $_->delete for $self->all;
1892   $guard->commit;
1893   return 1;
1894 }
1895
1896 =head2 populate
1897
1898 =over 4
1899
1900 =item Arguments: \@data;
1901
1902 =back
1903
1904 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1905 For the arrayref of hashrefs style each hashref should be a structure suitable
1906 for submitting to a $resultset->create(...) method.
1907
1908 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1909 to insert the data, as this is a faster method.
1910
1911 Otherwise, each set of data is inserted into the database using
1912 L<DBIx::Class::ResultSet/create>, and the resulting objects are
1913 accumulated into an array. The array itself, or an array reference
1914 is returned depending on scalar or list context.
1915
1916 Example:  Assuming an Artist Class that has many CDs Classes relating:
1917
1918   my $Artist_rs = $schema->resultset("Artist");
1919
1920   ## Void Context Example
1921   $Artist_rs->populate([
1922      { artistid => 4, name => 'Manufactured Crap', cds => [
1923         { title => 'My First CD', year => 2006 },
1924         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1925       ],
1926      },
1927      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1928         { title => 'My parents sold me to a record company', year => 2005 },
1929         { title => 'Why Am I So Ugly?', year => 2006 },
1930         { title => 'I Got Surgery and am now Popular', year => 2007 }
1931       ],
1932      },
1933   ]);
1934
1935   ## Array Context Example
1936   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1937     { name => "Artist One"},
1938     { name => "Artist Two"},
1939     { name => "Artist Three", cds=> [
1940     { title => "First CD", year => 2007},
1941     { title => "Second CD", year => 2008},
1942   ]}
1943   ]);
1944
1945   print $ArtistOne->name; ## response is 'Artist One'
1946   print $ArtistThree->cds->count ## reponse is '2'
1947
1948 For the arrayref of arrayrefs style,  the first element should be a list of the
1949 fieldsnames to which the remaining elements are rows being inserted.  For
1950 example:
1951
1952   $Arstist_rs->populate([
1953     [qw/artistid name/],
1954     [100, 'A Formally Unknown Singer'],
1955     [101, 'A singer that jumped the shark two albums ago'],
1956     [102, 'An actually cool singer'],
1957   ]);
1958
1959 Please note an important effect on your data when choosing between void and
1960 wantarray context. Since void context goes straight to C<insert_bulk> in
1961 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1962 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
1963 create primary keys for you, you will find that your PKs are empty.  In this
1964 case you will have to use the wantarray context in order to create those
1965 values.
1966
1967 =cut
1968
1969 sub populate {
1970   my $self = shift;
1971
1972   # cruft placed in standalone method
1973   my $data = $self->_normalize_populate_args(@_);
1974
1975   if(defined wantarray) {
1976     my @created;
1977     foreach my $item (@$data) {
1978       push(@created, $self->create($item));
1979     }
1980     return wantarray ? @created : \@created;
1981   } 
1982   else {
1983     my $first = $data->[0];
1984
1985     # if a column is a registered relationship, and is a non-blessed hash/array, consider
1986     # it relationship data
1987     my (@rels, @columns);
1988     my $rsrc = $self->result_source;
1989     my $rels = { map { $_ => $rsrc->relationship_info($_) } $rsrc->relationships };
1990     for (keys %$first) {
1991       my $ref = ref $first->{$_};
1992       $rels->{$_} && ($ref eq 'ARRAY' or $ref eq 'HASH')
1993         ? push @rels, $_
1994         : push @columns, $_
1995       ;
1996     }
1997
1998     my @pks = $rsrc->primary_columns;
1999
2000     ## do the belongs_to relationships
2001     foreach my $index (0..$#$data) {
2002
2003       # delegate to create() for any dataset without primary keys with specified relationships
2004       if (grep { !defined $data->[$index]->{$_} } @pks ) {
2005         for my $r (@rels) {
2006           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
2007             my @ret = $self->populate($data);
2008             return;
2009           }
2010         }
2011       }
2012
2013       foreach my $rel (@rels) {
2014         next unless ref $data->[$index]->{$rel} eq "HASH";
2015         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
2016         my ($reverse_relname, $reverse_relinfo) = %{$rsrc->reverse_relationship_info($rel)};
2017         my $related = $result->result_source->_resolve_condition(
2018           $reverse_relinfo->{cond},
2019           $self,
2020           $result,
2021           $rel,
2022         );
2023
2024         delete $data->[$index]->{$rel};
2025         $data->[$index] = {%{$data->[$index]}, %$related};
2026
2027         push @columns, keys %$related if $index == 0;
2028       }
2029     }
2030
2031     ## inherit the data locked in the conditions of the resultset
2032     my ($rs_data) = $self->_merge_with_rscond({});
2033     delete @{$rs_data}{@columns};
2034     my @inherit_cols = keys %$rs_data;
2035     my @inherit_data = values %$rs_data;
2036
2037     ## do bulk insert on current row
2038     $rsrc->storage->insert_bulk(
2039       $rsrc,
2040       [@columns, @inherit_cols],
2041       [ map { [ @$_{@columns}, @inherit_data ] } @$data ],
2042     );
2043
2044     ## do the has_many relationships
2045     foreach my $item (@$data) {
2046
2047       my $main_row;
2048
2049       foreach my $rel (@rels) {
2050         next unless ref $item->{$rel} eq "ARRAY" && @{ $item->{$rel} };
2051
2052         $main_row ||= $self->new_result({map { $_ => $item->{$_} } @pks});
2053
2054         my $child = $main_row->$rel;
2055
2056         my $related = $child->result_source->_resolve_condition(
2057           $rels->{$rel}{cond},
2058           $child,
2059           $main_row,
2060           $rel,
2061         );
2062
2063         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
2064         my @populate = map { {%$_, %$related} } @rows_to_add;
2065
2066         $child->populate( \@populate );
2067       }
2068     }
2069   }
2070 }
2071
2072
2073 # populate() argumnets went over several incarnations
2074 # What we ultimately support is AoH
2075 sub _normalize_populate_args {
2076   my ($self, $arg) = @_;
2077
2078   if (ref $arg eq 'ARRAY') {
2079     if (ref $arg->[0] eq 'HASH') {
2080       return $arg;
2081     }
2082     elsif (ref $arg->[0] eq 'ARRAY') {
2083       my @ret;
2084       my @colnames = @{$arg->[0]};
2085       foreach my $values (@{$arg}[1 .. $#$arg]) {
2086         push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
2087       }
2088       return \@ret;
2089     }
2090   }
2091
2092   $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
2093 }
2094
2095 =head2 pager
2096
2097 =over 4
2098
2099 =item Arguments: none
2100
2101 =item Return Value: $pager
2102
2103 =back
2104
2105 Return Value a L<Data::Page> object for the current resultset. Only makes
2106 sense for queries with a C<page> attribute.
2107
2108 To get the full count of entries for a paged resultset, call
2109 C<total_entries> on the L<Data::Page> object.
2110
2111 =cut
2112
2113 # make a wizard good for both a scalar and a hashref
2114 my $mk_lazy_count_wizard = sub {
2115   require Variable::Magic;
2116
2117   my $stash = { total_rs => shift };
2118   my $slot = shift; # only used by the hashref magic
2119
2120   my $magic = Variable::Magic::wizard (
2121     data => sub { $stash },
2122
2123     (!$slot)
2124     ? (
2125       # the scalar magic
2126       get => sub {
2127         # set value lazily, and dispell for good
2128         ${$_[0]} = $_[1]{total_rs}->count;
2129         Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref});
2130         return 1;
2131       },
2132       set => sub {
2133         # an explicit set implies dispell as well
2134         # the unless() is to work around "fun and giggles" below
2135         Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref})
2136           unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
2137         return 1;
2138       },
2139     )
2140     : (
2141       # the uvar magic
2142       fetch => sub {
2143         if ($_[2] eq $slot and !$_[1]{inactive}) {
2144           my $cnt = $_[1]{total_rs}->count;
2145           $_[0]->{$slot} = $cnt;
2146
2147           # attempting to dispell in a fetch handle (works in store), seems
2148           # to invariable segfault on 5.10, 5.12, 5.13 :(
2149           # so use an inactivator instead
2150           #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
2151           $_[1]{inactive}++;
2152         }
2153         return 1;
2154       },
2155       store => sub {
2156         if (! $_[1]{inactive} and $_[2] eq $slot) {
2157           #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
2158           $_[1]{inactive}++
2159             unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
2160         }
2161         return 1;
2162       },
2163     ),
2164   );
2165
2166   $stash->{magic_selfref} = $magic;
2167   weaken ($stash->{magic_selfref}); # this fails on 5.8.1
2168
2169   return $magic;
2170 };
2171
2172 # the tie class for 5.8.1
2173 {
2174   package # hide from pause
2175     DBIx::Class::__DBIC_LAZY_RS_COUNT__;
2176   use base qw/Tie::Hash/;
2177
2178   sub FIRSTKEY { my $dummy = scalar keys %{$_[0]{data}}; each %{$_[0]{data}} }
2179   sub NEXTKEY  { each %{$_[0]{data}} }
2180   sub EXISTS   { exists $_[0]{data}{$_[1]} }
2181   sub DELETE   { delete $_[0]{data}{$_[1]} }
2182   sub CLEAR    { %{$_[0]{data}} = () }
2183   sub SCALAR   { scalar %{$_[0]{data}} }
2184
2185   sub TIEHASH {
2186     $_[1]{data} = {%{$_[1]{selfref}}};
2187     %{$_[1]{selfref}} = ();
2188     Scalar::Util::weaken ($_[1]{selfref});
2189     return bless ($_[1], $_[0]);
2190   };
2191
2192   sub FETCH {
2193     if ($_[1] eq $_[0]{slot}) {
2194       my $cnt = $_[0]{data}{$_[1]} = $_[0]{total_rs}->count;
2195       untie %{$_[0]{selfref}};
2196       %{$_[0]{selfref}} = %{$_[0]{data}};
2197       return $cnt;
2198     }
2199     else {
2200       $_[0]{data}{$_[1]};
2201     }
2202   }
2203
2204   sub STORE {
2205     $_[0]{data}{$_[1]} = $_[2];
2206     if ($_[1] eq $_[0]{slot}) {
2207       untie %{$_[0]{selfref}};
2208       %{$_[0]{selfref}} = %{$_[0]{data}};
2209     }
2210     $_[2];
2211   }
2212 }
2213
2214 sub pager {
2215   my ($self) = @_;
2216
2217   return $self->{pager} if $self->{pager};
2218
2219   my $attrs = $self->{attrs};
2220   if (!defined $attrs->{page}) {
2221     $self->throw_exception("Can't create pager for non-paged rs");
2222   }
2223   elsif ($attrs->{page} <= 0) {
2224     $self->throw_exception('Invalid page number (page-numbers are 1-based)');
2225   }
2226   $attrs->{rows} ||= 10;
2227
2228   # throw away the paging flags and re-run the count (possibly
2229   # with a subselect) to get the real total count
2230   my $count_attrs = { %$attrs };
2231   delete $count_attrs->{$_} for qw/rows offset page pager/;
2232   my $total_rs = (ref $self)->new($self->result_source, $count_attrs);
2233
2234
2235 ### the following may seem awkward and dirty, but it's a thought-experiment
2236 ### necessary for future development of DBIx::DS. Do *NOT* change this code
2237 ### before talking to ribasushi/mst
2238
2239   require Data::Page;
2240   my $pager = Data::Page->new(
2241     0,  #start with an empty set
2242     $attrs->{rows},
2243     $self->{attrs}{page},
2244   );
2245
2246   my $data_slot = 'total_entries';
2247
2248   # Since we are interested in a cached value (once it's set - it's set), every
2249   # technique will detach from the magic-host once the time comes to fire the
2250   # ->count (or in the segfaulting case of >= 5.10 it will deactivate itself)
2251
2252   if ($] < 5.008003) {
2253     # 5.8.1 throws 'Modification of a read-only value attempted' when one tries
2254     # to weakref the magic container :(
2255     # tested on 5.8.1
2256     tie (%$pager, 'DBIx::Class::__DBIC_LAZY_RS_COUNT__',
2257       { slot => $data_slot, total_rs => $total_rs, selfref => $pager }
2258     );
2259   }
2260   elsif ($] < 5.010) {
2261     # We can use magic on the hash value slot. It's interesting that the magic is
2262     # attached to the hash-slot, and does *not* stop working once I do the dummy
2263     # assignments after the cast()
2264     # tested on 5.8.3 and 5.8.9
2265     my $magic = $mk_lazy_count_wizard->($total_rs);
2266     Variable::Magic::cast ( $pager->{$data_slot}, $magic );
2267
2268     # this is for fun and giggles
2269     $pager->{$data_slot} = -1;
2270     $pager->{$data_slot} = 0;
2271
2272     # this does not work for scalars, but works with
2273     # uvar magic below
2274     #my %vals = %$pager;
2275     #%$pager = ();
2276     #%{$pager} = %vals;
2277   }
2278   else {
2279     # And the uvar magic
2280     # works on 5.10.1, 5.12.1 and 5.13.4 in its current form,
2281     # however see the wizard maker for more notes
2282     my $magic = $mk_lazy_count_wizard->($total_rs, $data_slot);
2283     Variable::Magic::cast ( %$pager, $magic );
2284
2285     # still works
2286     $pager->{$data_slot} = -1;
2287     $pager->{$data_slot} = 0;
2288
2289     # this now works
2290     my %vals = %$pager;
2291     %$pager = ();
2292     %{$pager} = %vals;
2293   }
2294
2295   return $self->{pager} = $pager;
2296 }
2297
2298 =head2 page
2299
2300 =over 4
2301
2302 =item Arguments: $page_number
2303
2304 =item Return Value: $rs
2305
2306 =back
2307
2308 Returns a resultset for the $page_number page of the resultset on which page
2309 is called, where each page contains a number of rows equal to the 'rows'
2310 attribute set on the resultset (10 by default).
2311
2312 =cut
2313
2314 sub page {
2315   my ($self, $page) = @_;
2316   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
2317 }
2318
2319 =head2 new_result
2320
2321 =over 4
2322
2323 =item Arguments: \%vals
2324
2325 =item Return Value: $rowobject
2326
2327 =back
2328
2329 Creates a new row object in the resultset's result class and returns
2330 it. The row is not inserted into the database at this point, call
2331 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
2332 will tell you whether the row object has been inserted or not.
2333
2334 Passes the hashref of input on to L<DBIx::Class::Row/new>.
2335
2336 =cut
2337
2338 sub new_result {
2339   my ($self, $values) = @_;
2340   $self->throw_exception( "new_result needs a hash" )
2341     unless (ref $values eq 'HASH');
2342
2343   my ($merged_cond, $cols_from_relations) = $self->_merge_with_rscond($values);
2344
2345   my %new = (
2346     %$merged_cond,
2347     @$cols_from_relations
2348       ? (-cols_from_relations => $cols_from_relations)
2349       : (),
2350     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
2351   );
2352
2353   return $self->result_class->new(\%new);
2354 }
2355
2356 # _merge_with_rscond
2357 #
2358 # Takes a simple hash of K/V data and returns its copy merged with the
2359 # condition already present on the resultset. Additionally returns an
2360 # arrayref of value/condition names, which were inferred from related
2361 # objects (this is needed for in-memory related objects)
2362 sub _merge_with_rscond {
2363   my ($self, $data) = @_;
2364
2365   my (%new_data, @cols_from_relations);
2366
2367   my $alias = $self->{attrs}{alias};
2368
2369   if (! defined $self->{cond}) {
2370     # just massage $data below
2371   }
2372   elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
2373     %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
2374     @cols_from_relations = keys %new_data;
2375   }
2376   elsif (ref $self->{cond} ne 'HASH') {
2377     $self->throw_exception(
2378       "Can't abstract implicit construct, resultset condition not a hash"
2379     );
2380   }
2381   else {
2382     # precendence must be given to passed values over values inherited from
2383     # the cond, so the order here is important.
2384     my $collapsed_cond = $self->_collapse_cond($self->{cond});
2385     my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
2386
2387     while ( my($col, $value) = each %implied ) {
2388       my $vref = ref $value;
2389       if (
2390         $vref eq 'HASH'
2391           and
2392         keys(%$value) == 1
2393           and
2394         (keys %$value)[0] eq '='
2395       ) {
2396         $new_data{$col} = $value->{'='};
2397       }
2398       elsif( !$vref or $vref eq 'SCALAR' or blessed($value) ) {
2399         $new_data{$col} = $value;
2400       }
2401     }
2402   }
2403
2404   %new_data = (
2405     %new_data,
2406     %{ $self->_remove_alias($data, $alias) },
2407   );
2408
2409   return (\%new_data, \@cols_from_relations);
2410 }
2411
2412 # _has_resolved_attr
2413 #
2414 # determines if the resultset defines at least one
2415 # of the attributes supplied
2416 #
2417 # used to determine if a subquery is neccessary
2418 #
2419 # supports some virtual attributes:
2420 #   -join
2421 #     This will scan for any joins being present on the resultset.
2422 #     It is not a mere key-search but a deep inspection of {from}
2423 #
2424
2425 sub _has_resolved_attr {
2426   my ($self, @attr_names) = @_;
2427
2428   my $attrs = $self->_resolved_attrs;
2429
2430   my %extra_checks;
2431
2432   for my $n (@attr_names) {
2433     if (grep { $n eq $_ } (qw/-join/) ) {
2434       $extra_checks{$n}++;
2435       next;
2436     }
2437
2438     my $attr =  $attrs->{$n};
2439
2440     next if not defined $attr;
2441
2442     if (ref $attr eq 'HASH') {
2443       return 1 if keys %$attr;
2444     }
2445     elsif (ref $attr eq 'ARRAY') {
2446       return 1 if @$attr;
2447     }
2448     else {
2449       return 1 if $attr;
2450     }
2451   }
2452
2453   # a resolved join is expressed as a multi-level from
2454   return 1 if (
2455     $extra_checks{-join}
2456       and
2457     ref $attrs->{from} eq 'ARRAY'
2458       and
2459     @{$attrs->{from}} > 1
2460   );
2461
2462   return 0;
2463 }
2464
2465 # _collapse_cond
2466 #
2467 # Recursively collapse the condition.
2468
2469 sub _collapse_cond {
2470   my ($self, $cond, $collapsed) = @_;
2471
2472   $collapsed ||= {};
2473
2474   if (ref $cond eq 'ARRAY') {
2475     foreach my $subcond (@$cond) {
2476       next unless ref $subcond;  # -or
2477       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2478     }
2479   }
2480   elsif (ref $cond eq 'HASH') {
2481     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2482       foreach my $subcond (@{$cond->{-and}}) {
2483         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2484       }
2485     }
2486     else {
2487       foreach my $col (keys %$cond) {
2488         my $value = $cond->{$col};
2489         $collapsed->{$col} = $value;
2490       }
2491     }
2492   }
2493
2494   return $collapsed;
2495 }
2496
2497 # _remove_alias
2498 #
2499 # Remove the specified alias from the specified query hash. A copy is made so
2500 # the original query is not modified.
2501
2502 sub _remove_alias {
2503   my ($self, $query, $alias) = @_;
2504
2505   my %orig = %{ $query || {} };
2506   my %unaliased;
2507
2508   foreach my $key (keys %orig) {
2509     if ($key !~ /\./) {
2510       $unaliased{$key} = $orig{$key};
2511       next;
2512     }
2513     $unaliased{$1} = $orig{$key}
2514       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2515   }
2516
2517   return \%unaliased;
2518 }
2519
2520 =head2 as_query
2521
2522 =over 4
2523
2524 =item Arguments: none
2525
2526 =item Return Value: \[ $sql, @bind ]
2527
2528 =back
2529
2530 Returns the SQL query and bind vars associated with the invocant.
2531
2532 This is generally used as the RHS for a subquery.
2533
2534 =cut
2535
2536 sub as_query {
2537   my $self = shift;
2538
2539   my $attrs = $self->_resolved_attrs_copy;
2540
2541   # For future use:
2542   #
2543   # in list ctx:
2544   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2545   # $sql also has no wrapping parenthesis in list ctx
2546   #
2547   my $sqlbind = $self->result_source->storage
2548     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2549
2550   return $sqlbind;
2551 }
2552
2553 =head2 find_or_new
2554
2555 =over 4
2556
2557 =item Arguments: \%vals, \%attrs?
2558
2559 =item Return Value: $rowobject
2560
2561 =back
2562
2563   my $artist = $schema->resultset('Artist')->find_or_new(
2564     { artist => 'fred' }, { key => 'artists' });
2565
2566   $cd->cd_to_producer->find_or_new({ producer => $producer },
2567                                    { key => 'primary });
2568
2569 Find an existing record from this resultset using L</find>. if none exists,
2570 instantiate a new result object and return it. The object will not be saved
2571 into your storage until you call L<DBIx::Class::Row/insert> on it.
2572
2573 You most likely want this method when looking for existing rows using a unique
2574 constraint that is not the primary key, or looking for related rows.
2575
2576 If you want objects to be saved immediately, use L</find_or_create> instead.
2577
2578 B<Note>: Make sure to read the documentation of L</find> and understand the
2579 significance of the C<key> attribute, as its lack may skew your search, and
2580 subsequently result in spurious new objects.
2581
2582 B<Note>: Take care when using C<find_or_new> with a table having
2583 columns with default values that you intend to be automatically
2584 supplied by the database (e.g. an auto_increment primary key column).
2585 In normal usage, the value of such columns should NOT be included at
2586 all in the call to C<find_or_new>, even when set to C<undef>.
2587
2588 =cut
2589
2590 sub find_or_new {
2591   my $self     = shift;
2592   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2593   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2594   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2595     return $row;
2596   }
2597   return $self->new_result($hash);
2598 }
2599
2600 =head2 create
2601
2602 =over 4
2603
2604 =item Arguments: \%vals
2605
2606 =item Return Value: a L<DBIx::Class::Row> $object
2607
2608 =back
2609
2610 Attempt to create a single new row or a row with multiple related rows
2611 in the table represented by the resultset (and related tables). This
2612 will not check for duplicate rows before inserting, use
2613 L</find_or_create> to do that.
2614
2615 To create one row for this resultset, pass a hashref of key/value
2616 pairs representing the columns of the table and the values you wish to
2617 store. If the appropriate relationships are set up, foreign key fields
2618 can also be passed an object representing the foreign row, and the
2619 value will be set to its primary key.
2620
2621 To create related objects, pass a hashref of related-object column values
2622 B<keyed on the relationship name>. If the relationship is of type C<multi>
2623 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2624 The process will correctly identify columns holding foreign keys, and will
2625 transparently populate them from the keys of the corresponding relation.
2626 This can be applied recursively, and will work correctly for a structure
2627 with an arbitrary depth and width, as long as the relationships actually
2628 exists and the correct column data has been supplied.
2629
2630
2631 Instead of hashrefs of plain related data (key/value pairs), you may
2632 also pass new or inserted objects. New objects (not inserted yet, see
2633 L</new>), will be inserted into their appropriate tables.
2634
2635 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2636
2637 Example of creating a new row.
2638
2639   $person_rs->create({
2640     name=>"Some Person",
2641     email=>"somebody@someplace.com"
2642   });
2643
2644 Example of creating a new row and also creating rows in a related C<has_many>
2645 or C<has_one> resultset.  Note Arrayref.
2646
2647   $artist_rs->create(
2648      { artistid => 4, name => 'Manufactured Crap', cds => [
2649         { title => 'My First CD', year => 2006 },
2650         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2651       ],
2652      },
2653   );
2654
2655 Example of creating a new row and also creating a row in a related
2656 C<belongs_to> resultset. Note Hashref.
2657
2658   $cd_rs->create({
2659     title=>"Music for Silly Walks",
2660     year=>2000,
2661     artist => {
2662       name=>"Silly Musician",
2663     }
2664   });
2665
2666 =over
2667
2668 =item WARNING
2669
2670 When subclassing ResultSet never attempt to override this method. Since
2671 it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
2672 lot of the internals simply never call it, so your override will be
2673 bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
2674 or L<insert|DBIx::Class::Row/insert> depending on how early in the
2675 L</create> process you need to intervene.
2676
2677 =back
2678
2679 =cut
2680
2681 sub create {
2682   my ($self, $attrs) = @_;
2683   $self->throw_exception( "create needs a hashref" )
2684     unless ref $attrs eq 'HASH';
2685   return $self->new_result($attrs)->insert;
2686 }
2687
2688 =head2 find_or_create
2689
2690 =over 4
2691
2692 =item Arguments: \%vals, \%attrs?
2693
2694 =item Return Value: $rowobject
2695
2696 =back
2697
2698   $cd->cd_to_producer->find_or_create({ producer => $producer },
2699                                       { key => 'primary' });
2700
2701 Tries to find a record based on its primary key or unique constraints; if none
2702 is found, creates one and returns that instead.
2703
2704   my $cd = $schema->resultset('CD')->find_or_create({
2705     cdid   => 5,
2706     artist => 'Massive Attack',
2707     title  => 'Mezzanine',
2708     year   => 2005,
2709   });
2710
2711 Also takes an optional C<key> attribute, to search by a specific key or unique
2712 constraint. For example:
2713
2714   my $cd = $schema->resultset('CD')->find_or_create(
2715     {
2716       artist => 'Massive Attack',
2717       title  => 'Mezzanine',
2718     },
2719     { key => 'cd_artist_title' }
2720   );
2721
2722 B<Note>: Make sure to read the documentation of L</find> and understand the
2723 significance of the C<key> attribute, as its lack may skew your search, and
2724 subsequently result in spurious row creation.
2725
2726 B<Note>: Because find_or_create() reads from the database and then
2727 possibly inserts based on the result, this method is subject to a race
2728 condition. Another process could create a record in the table after
2729 the find has completed and before the create has started. To avoid
2730 this problem, use find_or_create() inside a transaction.
2731
2732 B<Note>: Take care when using C<find_or_create> with a table having
2733 columns with default values that you intend to be automatically
2734 supplied by the database (e.g. an auto_increment primary key column).
2735 In normal usage, the value of such columns should NOT be included at
2736 all in the call to C<find_or_create>, even when set to C<undef>.
2737
2738 See also L</find> and L</update_or_create>. For information on how to declare
2739 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2740
2741 =cut
2742
2743 sub find_or_create {
2744   my $self     = shift;
2745   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2746   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2747   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2748     return $row;
2749   }
2750   return $self->create($hash);
2751 }
2752
2753 =head2 update_or_create
2754
2755 =over 4
2756
2757 =item Arguments: \%col_values, { key => $unique_constraint }?
2758
2759 =item Return Value: $row_object
2760
2761 =back
2762
2763   $resultset->update_or_create({ col => $val, ... });
2764
2765 Like L</find_or_create>, but if a row is found it is immediately updated via
2766 C<< $found_row->update (\%col_values) >>.
2767
2768
2769 Takes an optional C<key> attribute to search on a specific unique constraint.
2770 For example:
2771
2772   # In your application
2773   my $cd = $schema->resultset('CD')->update_or_create(
2774     {
2775       artist => 'Massive Attack',
2776       title  => 'Mezzanine',
2777       year   => 1998,
2778     },
2779     { key => 'cd_artist_title' }
2780   );
2781
2782   $cd->cd_to_producer->update_or_create({
2783     producer => $producer,
2784     name => 'harry',
2785   }, {
2786     key => 'primary',
2787   });
2788
2789 B<Note>: Make sure to read the documentation of L</find> and understand the
2790 significance of the C<key> attribute, as its lack may skew your search, and
2791 subsequently result in spurious row creation.
2792
2793 B<Note>: Take care when using C<update_or_create> with a table having
2794 columns with default values that you intend to be automatically
2795 supplied by the database (e.g. an auto_increment primary key column).
2796 In normal usage, the value of such columns should NOT be included at
2797 all in the call to C<update_or_create>, even when set to C<undef>.
2798
2799 See also L</find> and L</find_or_create>. For information on how to declare
2800 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2801
2802 =cut
2803
2804 sub update_or_create {
2805   my $self = shift;
2806   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2807   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2808
2809   my $row = $self->find($cond, $attrs);
2810   if (defined $row) {
2811     $row->update($cond);
2812     return $row;
2813   }
2814
2815   return $self->create($cond);
2816 }
2817
2818 =head2 update_or_new
2819
2820 =over 4
2821
2822 =item Arguments: \%col_values, { key => $unique_constraint }?
2823
2824 =item Return Value: $rowobject
2825
2826 =back
2827
2828   $resultset->update_or_new({ col => $val, ... });
2829
2830 Like L</find_or_new> but if a row is found it is immediately updated via
2831 C<< $found_row->update (\%col_values) >>.
2832
2833 For example:
2834
2835   # In your application
2836   my $cd = $schema->resultset('CD')->update_or_new(
2837     {
2838       artist => 'Massive Attack',
2839       title  => 'Mezzanine',
2840       year   => 1998,
2841     },
2842     { key => 'cd_artist_title' }
2843   );
2844
2845   if ($cd->in_storage) {
2846       # the cd was updated
2847   }
2848   else {
2849       # the cd is not yet in the database, let's insert it
2850       $cd->insert;
2851   }
2852
2853 B<Note>: Make sure to read the documentation of L</find> and understand the
2854 significance of the C<key> attribute, as its lack may skew your search, and
2855 subsequently result in spurious new objects.
2856
2857 B<Note>: Take care when using C<update_or_new> with a table having
2858 columns with default values that you intend to be automatically
2859 supplied by the database (e.g. an auto_increment primary key column).
2860 In normal usage, the value of such columns should NOT be included at
2861 all in the call to C<update_or_new>, even when set to C<undef>.
2862
2863 See also L</find>, L</find_or_create> and L</find_or_new>. 
2864
2865 =cut
2866
2867 sub update_or_new {
2868     my $self  = shift;
2869     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2870     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2871
2872     my $row = $self->find( $cond, $attrs );
2873     if ( defined $row ) {
2874         $row->update($cond);
2875         return $row;
2876     }
2877
2878     return $self->new_result($cond);
2879 }
2880
2881 =head2 get_cache
2882
2883 =over 4
2884
2885 =item Arguments: none
2886
2887 =item Return Value: \@cache_objects | undef
2888
2889 =back
2890
2891 Gets the contents of the cache for the resultset, if the cache is set.
2892
2893 The cache is populated either by using the L</prefetch> attribute to
2894 L</search> or by calling L</set_cache>.
2895
2896 =cut
2897
2898 sub get_cache {
2899   shift->{all_cache};
2900 }
2901
2902 =head2 set_cache
2903
2904 =over 4
2905
2906 =item Arguments: \@cache_objects
2907
2908 =item Return Value: \@cache_objects
2909
2910 =back
2911
2912 Sets the contents of the cache for the resultset. Expects an arrayref
2913 of objects of the same class as those produced by the resultset. Note that
2914 if the cache is set the resultset will return the cached objects rather
2915 than re-querying the database even if the cache attr is not set.
2916
2917 The contents of the cache can also be populated by using the
2918 L</prefetch> attribute to L</search>.
2919
2920 =cut
2921
2922 sub set_cache {
2923   my ( $self, $data ) = @_;
2924   $self->throw_exception("set_cache requires an arrayref")
2925       if defined($data) && (ref $data ne 'ARRAY');
2926   $self->{all_cache} = $data;
2927 }
2928
2929 =head2 clear_cache
2930
2931 =over 4
2932
2933 =item Arguments: none
2934
2935 =item Return Value: undef
2936
2937 =back
2938
2939 Clears the cache for the resultset.
2940
2941 =cut
2942
2943 sub clear_cache {
2944   shift->set_cache(undef);
2945 }
2946
2947 =head2 is_paged
2948
2949 =over 4
2950
2951 =item Arguments: none
2952
2953 =item Return Value: true, if the resultset has been paginated
2954
2955 =back
2956
2957 =cut
2958
2959 sub is_paged {
2960   my ($self) = @_;
2961   return !!$self->{attrs}{page};
2962 }
2963
2964 =head2 is_ordered
2965
2966 =over 4
2967
2968 =item Arguments: none
2969
2970 =item Return Value: true, if the resultset has been ordered with C<order_by>.
2971
2972 =back
2973
2974 =cut
2975
2976 sub is_ordered {
2977   my ($self) = @_;
2978   return scalar $self->result_source->storage->_extract_order_criteria($self->{attrs}{order_by});
2979 }
2980
2981 =head2 related_resultset
2982
2983 =over 4
2984
2985 =item Arguments: $relationship_name
2986
2987 =item Return Value: $resultset
2988
2989 =back
2990
2991 Returns a related resultset for the supplied relationship name.
2992
2993   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2994
2995 =cut
2996
2997 sub related_resultset {
2998   my ($self, $rel) = @_;
2999
3000   $self->{related_resultsets} ||= {};
3001   return $self->{related_resultsets}{$rel} ||= do {
3002     my $rsrc = $self->result_source;
3003     my $rel_info = $rsrc->relationship_info($rel);
3004
3005     $self->throw_exception(
3006       "search_related: result source '" . $rsrc->source_name .
3007         "' has no such relationship $rel")
3008       unless $rel_info;
3009
3010     my $attrs = $self->_chain_relationship($rel);
3011
3012     my $join_count = $attrs->{seen_join}{$rel};
3013
3014     my $alias = $self->result_source->storage
3015         ->relname_to_table_alias($rel, $join_count);
3016
3017     # since this is search_related, and we already slid the select window inwards
3018     # (the select/as attrs were deleted in the beginning), we need to flip all
3019     # left joins to inner, so we get the expected results
3020     # read the comment on top of the actual function to see what this does
3021     $attrs->{from} = $rsrc->schema->storage->_inner_join_to_node ($attrs->{from}, $alias);
3022
3023
3024     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
3025     delete @{$attrs}{qw(result_class alias)};
3026
3027     my $new_cache;
3028
3029     if (my $cache = $self->get_cache) {
3030       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
3031         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
3032                         @$cache ];
3033       }
3034     }
3035
3036     my $rel_source = $rsrc->related_source($rel);
3037
3038     my $new = do {
3039
3040       # The reason we do this now instead of passing the alias to the
3041       # search_rs below is that if you wrap/overload resultset on the
3042       # source you need to know what alias it's -going- to have for things
3043       # to work sanely (e.g. RestrictWithObject wants to be able to add
3044       # extra query restrictions, and these may need to be $alias.)
3045
3046       my $rel_attrs = $rel_source->resultset_attributes;
3047       local $rel_attrs->{alias} = $alias;
3048
3049       $rel_source->resultset
3050                  ->search_rs(
3051                      undef, {
3052                        %$attrs,
3053                        where => $attrs->{where},
3054                    });
3055     };
3056     $new->set_cache($new_cache) if $new_cache;
3057     $new;
3058   };
3059 }
3060
3061 =head2 current_source_alias
3062
3063 =over 4
3064
3065 =item Arguments: none
3066
3067 =item Return Value: $source_alias
3068
3069 =back
3070
3071 Returns the current table alias for the result source this resultset is built
3072 on, that will be used in the SQL query. Usually it is C<me>.
3073
3074 Currently the source alias that refers to the result set returned by a
3075 L</search>/L</find> family method depends on how you got to the resultset: it's
3076 C<me> by default, but eg. L</search_related> aliases it to the related result
3077 source name (and keeps C<me> referring to the original result set). The long
3078 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
3079 (and make this method unnecessary).
3080
3081 Thus it's currently necessary to use this method in predefined queries (see
3082 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
3083 source alias of the current result set:
3084
3085   # in a result set class
3086   sub modified_by {
3087     my ($self, $user) = @_;
3088
3089     my $me = $self->current_source_alias;
3090
3091     return $self->search(
3092       "$me.modified" => $user->id,
3093     );
3094   }
3095
3096 =cut
3097
3098 sub current_source_alias {
3099   my ($self) = @_;
3100
3101   return ($self->{attrs} || {})->{alias} || 'me';
3102 }
3103
3104 =head2 as_subselect_rs
3105
3106 =over 4
3107
3108 =item Arguments: none
3109
3110 =item Return Value: $resultset
3111
3112 =back
3113
3114 Act as a barrier to SQL symbols.  The resultset provided will be made into a
3115 "virtual view" by including it as a subquery within the from clause.  From this
3116 point on, any joined tables are inaccessible to ->search on the resultset (as if
3117 it were simply where-filtered without joins).  For example:
3118
3119  my $rs = $schema->resultset('Bar')->search({'x.name' => 'abc'},{ join => 'x' });
3120
3121  # 'x' now pollutes the query namespace
3122
3123  # So the following works as expected
3124  my $ok_rs = $rs->search({'x.other' => 1});
3125
3126  # But this doesn't: instead of finding a 'Bar' related to two x rows (abc and
3127  # def) we look for one row with contradictory terms and join in another table
3128  # (aliased 'x_2') which we never use
3129  my $broken_rs = $rs->search({'x.name' => 'def'});
3130
3131  my $rs2 = $rs->as_subselect_rs;
3132
3133  # doesn't work - 'x' is no longer accessible in $rs2, having been sealed away
3134  my $not_joined_rs = $rs2->search({'x.other' => 1});
3135
3136  # works as expected: finds a 'table' row related to two x rows (abc and def)
3137  my $correctly_joined_rs = $rs2->search({'x.name' => 'def'});
3138
3139 Another example of when one might use this would be to select a subset of
3140 columns in a group by clause:
3141
3142  my $rs = $schema->resultset('Bar')->search(undef, {
3143    group_by => [qw{ id foo_id baz_id }],
3144  })->as_subselect_rs->search(undef, {
3145    columns => [qw{ id foo_id }]
3146  });
3147
3148 In the above example normally columns would have to be equal to the group by,
3149 but because we isolated the group by into a subselect the above works.
3150
3151 =cut
3152
3153 sub as_subselect_rs {
3154   my $self = shift;
3155
3156   my $attrs = $self->_resolved_attrs;
3157
3158   my $fresh_rs = (ref $self)->new (
3159     $self->result_source
3160   );
3161
3162   # these pieces will be locked in the subquery
3163   delete $fresh_rs->{cond};
3164   delete @{$fresh_rs->{attrs}}{qw/where bind/};
3165
3166   return $fresh_rs->search( {}, {
3167     from => [{
3168       $attrs->{alias} => $self->as_query,
3169       -alias  => $attrs->{alias},
3170       -rsrc   => $self->result_source,
3171     }],
3172     alias => $attrs->{alias},
3173   });
3174 }
3175
3176 # This code is called by search_related, and makes sure there
3177 # is clear separation between the joins before, during, and
3178 # after the relationship. This information is needed later
3179 # in order to properly resolve prefetch aliases (any alias
3180 # with a relation_chain_depth less than the depth of the
3181 # current prefetch is not considered)
3182 #
3183 # The increments happen twice per join. An even number means a
3184 # relationship specified via a search_related, whereas an odd
3185 # number indicates a join/prefetch added via attributes
3186 #
3187 # Also this code will wrap the current resultset (the one we
3188 # chain to) in a subselect IFF it contains limiting attributes
3189 sub _chain_relationship {
3190   my ($self, $rel) = @_;
3191   my $source = $self->result_source;
3192   my $attrs = { %{$self->{attrs}||{}} };
3193
3194   # we need to take the prefetch the attrs into account before we
3195   # ->_resolve_join as otherwise they get lost - captainL
3196   my $join = $self->_merge_joinpref_attr( $attrs->{join}, $attrs->{prefetch} );
3197
3198   delete @{$attrs}{qw/join prefetch collapse group_by distinct select as columns +select +as +columns/};
3199
3200   my $seen = { %{ (delete $attrs->{seen_join}) || {} } };
3201
3202   my $from;
3203   my @force_subq_attrs = qw/offset rows group_by having/;
3204
3205   if (
3206     ($attrs->{from} && ref $attrs->{from} ne 'ARRAY')
3207       ||
3208     $self->_has_resolved_attr (@force_subq_attrs)
3209   ) {
3210     # Nuke the prefetch (if any) before the new $rs attrs
3211     # are resolved (prefetch is useless - we are wrapping
3212     # a subquery anyway).
3213     my $rs_copy = $self->search;
3214     $rs_copy->{attrs}{join} = $self->_merge_joinpref_attr (
3215       $rs_copy->{attrs}{join},
3216       delete $rs_copy->{attrs}{prefetch},
3217     );
3218
3219     $from = [{
3220       -rsrc   => $source,
3221       -alias  => $attrs->{alias},
3222       $attrs->{alias} => $rs_copy->as_query,
3223     }];
3224     delete @{$attrs}{@force_subq_attrs, qw/where bind/};
3225     $seen->{-relation_chain_depth} = 0;
3226   }
3227   elsif ($attrs->{from}) {  #shallow copy suffices
3228     $from = [ @{$attrs->{from}} ];
3229   }
3230   else {
3231     $from = [{
3232       -rsrc  => $source,
3233       -alias => $attrs->{alias},
3234       $attrs->{alias} => $source->from,
3235     }];
3236   }
3237
3238   my $jpath = ($seen->{-relation_chain_depth})
3239     ? $from->[-1][0]{-join_path}
3240     : [];
3241
3242   my @requested_joins = $source->_resolve_join(
3243     $join,
3244     $attrs->{alias},
3245     $seen,
3246     $jpath,
3247   );
3248
3249   push @$from, @requested_joins;
3250
3251   $seen->{-relation_chain_depth}++;
3252
3253   # if $self already had a join/prefetch specified on it, the requested
3254   # $rel might very well be already included. What we do in this case
3255   # is effectively a no-op (except that we bump up the chain_depth on
3256   # the join in question so we could tell it *is* the search_related)
3257   my $already_joined;
3258
3259   # we consider the last one thus reverse
3260   for my $j (reverse @requested_joins) {
3261     my ($last_j) = keys %{$j->[0]{-join_path}[-1]};
3262     if ($rel eq $last_j) {
3263       $j->[0]{-relation_chain_depth}++;
3264       $already_joined++;
3265       last;
3266     }
3267   }
3268
3269   unless ($already_joined) {
3270     push @$from, $source->_resolve_join(
3271       $rel,
3272       $attrs->{alias},
3273       $seen,
3274       $jpath,
3275     );
3276   }
3277
3278   $seen->{-relation_chain_depth}++;
3279
3280   return {%$attrs, from => $from, seen_join => $seen};
3281 }
3282
3283 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
3284 sub _resolved_attrs_copy {
3285   my $self = shift;
3286   return { %{$self->_resolved_attrs (@_)} };
3287 }
3288
3289 sub _resolved_attrs {
3290   my $self = shift;
3291   return $self->{_attrs} if $self->{_attrs};
3292
3293   my $attrs  = { %{ $self->{attrs} || {} } };
3294   my $source = $self->result_source;
3295   my $alias  = $attrs->{alias};
3296
3297   # default selection list
3298   $attrs->{columns} = [ $source->columns ]
3299     unless List::Util::first { exists $attrs->{$_} } qw/columns cols select as/;
3300
3301   # merge selectors together
3302   for (qw/columns select as/) {
3303     $attrs->{$_} = $self->_merge_attr($attrs->{$_}, delete $attrs->{"+$_"})
3304       if $attrs->{$_} or $attrs->{"+$_"};
3305   }
3306
3307   # disassemble columns
3308   my (@sel, @as);
3309   if (my $cols = delete $attrs->{columns}) {
3310     for my $c (ref $cols eq 'ARRAY' ? @$cols : $cols) {
3311       if (ref $c eq 'HASH') {
3312         for my $as (keys %$c) {
3313           push @sel, $c->{$as};
3314           push @as, $as;
3315         }
3316       }
3317       else {
3318         push @sel, $c;
3319         push @as, $c;
3320       }
3321     }
3322   }
3323
3324   # when trying to weed off duplicates later do not go past this point -
3325   # everything added from here on is unbalanced "anyone's guess" stuff
3326   my $dedup_stop_idx = $#as;
3327
3328   push @as, @{ ref $attrs->{as} eq 'ARRAY' ? $attrs->{as} : [ $attrs->{as} ] }
3329     if $attrs->{as};
3330   push @sel, @{ ref $attrs->{select} eq 'ARRAY' ? $attrs->{select} : [ $attrs->{select} ] }
3331     if $attrs->{select};
3332
3333   # assume all unqualified selectors to apply to the current alias (legacy stuff)
3334   for (@sel) {
3335     $_ = (ref $_ or $_ =~ /\./) ? $_ : "$alias.$_";
3336   }
3337
3338   # disqualify all $alias.col as-bits (collapser mandated)
3339   for (@as) {
3340     $_ = ($_ =~ /^\Q$alias.\E(.+)$/) ? $1 : $_;
3341   }
3342
3343   # de-duplicate the result (remove *identical* select/as pairs)
3344   # and also die on duplicate {as} pointing to different {select}s
3345   # not using a c-style for as the condition is prone to shrinkage
3346   my $seen;
3347   my $i = 0;
3348   while ($i <= $dedup_stop_idx) {
3349     if ($seen->{"$sel[$i] \x00\x00 $as[$i]"}++) {
3350       splice @sel, $i, 1;
3351       splice @as, $i, 1;
3352       $dedup_stop_idx--;
3353     }
3354     elsif ($seen->{$as[$i]}++) {
3355       $self->throw_exception(
3356         "inflate_result() alias '$as[$i]' specified twice with different SQL-side {select}-ors"
3357       );
3358     }
3359     else {
3360       $i++;
3361     }
3362   }
3363
3364   $attrs->{select} = \@sel;
3365   $attrs->{as} = \@as;
3366
3367   $attrs->{from} ||= [{
3368     -rsrc   => $source,
3369     -alias  => $self->{attrs}{alias},
3370     $self->{attrs}{alias} => $source->from,
3371   }];
3372
3373   if ( $attrs->{join} || $attrs->{prefetch} ) {
3374
3375     $self->throw_exception ('join/prefetch can not be used with a custom {from}')
3376       if ref $attrs->{from} ne 'ARRAY';
3377
3378     my $join = (delete $attrs->{join}) || {};
3379
3380     if ( defined $attrs->{prefetch} ) {
3381       $join = $self->_merge_joinpref_attr( $join, $attrs->{prefetch} );
3382     }
3383
3384     $attrs->{from} =    # have to copy here to avoid corrupting the original
3385       [
3386         @{ $attrs->{from} },
3387         $source->_resolve_join(
3388           $join,
3389           $alias,
3390           { %{ $attrs->{seen_join} || {} } },
3391           ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
3392             ? $attrs->{from}[-1][0]{-join_path}
3393             : []
3394           ,
3395         )
3396       ];
3397   }
3398
3399   if ( defined $attrs->{order_by} ) {
3400     $attrs->{order_by} = (
3401       ref( $attrs->{order_by} ) eq 'ARRAY'
3402       ? [ @{ $attrs->{order_by} } ]
3403       : [ $attrs->{order_by} || () ]
3404     );
3405   }
3406
3407   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
3408     $attrs->{group_by} = [ $attrs->{group_by} ];
3409   }
3410
3411   # generate the distinct induced group_by early, as prefetch will be carried via a
3412   # subquery (since a group_by is present)
3413   if (delete $attrs->{distinct}) {
3414     if ($attrs->{group_by}) {
3415       carp_unique ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
3416     }
3417     else {
3418       # distinct affects only the main selection part, not what prefetch may
3419       # add below.
3420       $attrs->{group_by} = $source->storage->_group_over_selection (
3421         $attrs->{from},
3422         $attrs->{select},
3423         $attrs->{order_by},
3424       );
3425     }
3426   }
3427
3428   $attrs->{collapse} ||= {};
3429   if ($attrs->{prefetch}) {
3430
3431     $self->throw_exception("Unable to prefetch, resultset contains an unnamed selector $attrs->{_dark_selector}{string}")
3432       if $attrs->{_dark_selector};
3433
3434     my $prefetch = $self->_merge_joinpref_attr( {}, delete $attrs->{prefetch} );
3435
3436     my $prefetch_ordering = [];
3437
3438     # this is a separate structure (we don't look in {from} directly)
3439     # as the resolver needs to shift things off the lists to work
3440     # properly (identical-prefetches on different branches)
3441     my $join_map = {};
3442     if (ref $attrs->{from} eq 'ARRAY') {
3443
3444       my $start_depth = $attrs->{seen_join}{-relation_chain_depth} || 0;
3445
3446       for my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
3447         next unless $j->[0]{-alias};
3448         next unless $j->[0]{-join_path};
3449         next if ($j->[0]{-relation_chain_depth} || 0) < $start_depth;
3450
3451         my @jpath = map { keys %$_ } @{$j->[0]{-join_path}};
3452
3453         my $p = $join_map;
3454         $p = $p->{$_} ||= {} for @jpath[ ($start_depth/2) .. $#jpath]; #only even depths are actual jpath boundaries
3455         push @{$p->{-join_aliases} }, $j->[0]{-alias};
3456       }
3457     }
3458
3459     my @prefetch =
3460       $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
3461
3462     # we need to somehow mark which columns came from prefetch
3463     if (@prefetch) {
3464       my $sel_end = $#{$attrs->{select}};
3465       $attrs->{_prefetch_selector_range} = [ $sel_end + 1, $sel_end + @prefetch ];
3466     }
3467
3468     push @{ $attrs->{select} }, (map { $_->[0] } @prefetch);
3469     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
3470
3471     push( @{$attrs->{order_by}}, @$prefetch_ordering );
3472     $attrs->{_collapse_order_by} = \@$prefetch_ordering;
3473   }
3474
3475
3476   # if both page and offset are specified, produce a combined offset
3477   # even though it doesn't make much sense, this is what pre 081xx has
3478   # been doing
3479   if (my $page = delete $attrs->{page}) {
3480     $attrs->{offset} =
3481       ($attrs->{rows} * ($page - 1))
3482             +
3483       ($attrs->{offset} || 0)
3484     ;
3485   }
3486
3487   return $self->{_attrs} = $attrs;
3488 }
3489
3490 sub _rollout_attr {
3491   my ($self, $attr) = @_;
3492
3493   if (ref $attr eq 'HASH') {
3494     return $self->_rollout_hash($attr);
3495   } elsif (ref $attr eq 'ARRAY') {
3496     return $self->_rollout_array($attr);
3497   } else {
3498     return [$attr];
3499   }
3500 }
3501
3502 sub _rollout_array {
3503   my ($self, $attr) = @_;
3504
3505   my @rolled_array;
3506   foreach my $element (@{$attr}) {
3507     if (ref $element eq 'HASH') {
3508       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3509     } elsif (ref $element eq 'ARRAY') {
3510       #  XXX - should probably recurse here
3511       push( @rolled_array, @{$self->_rollout_array($element)} );
3512     } else {
3513       push( @rolled_array, $element );
3514     }
3515   }
3516   return \@rolled_array;
3517 }
3518
3519 sub _rollout_hash {
3520   my ($self, $attr) = @_;
3521
3522   my @rolled_array;
3523   foreach my $key (keys %{$attr}) {
3524     push( @rolled_array, { $key => $attr->{$key} } );
3525   }
3526   return \@rolled_array;
3527 }
3528
3529 sub _calculate_score {
3530   my ($self, $a, $b) = @_;
3531
3532   if (defined $a xor defined $b) {
3533     return 0;
3534   }
3535   elsif (not defined $a) {
3536     return 1;
3537   }
3538
3539   if (ref $b eq 'HASH') {
3540     my ($b_key) = keys %{$b};
3541     if (ref $a eq 'HASH') {
3542       my ($a_key) = keys %{$a};
3543       if ($a_key eq $b_key) {
3544         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3545       } else {
3546         return 0;
3547       }
3548     } else {
3549       return ($a eq $b_key) ? 1 : 0;
3550     }
3551   } else {
3552     if (ref $a eq 'HASH') {
3553       my ($a_key) = keys %{$a};
3554       return ($b eq $a_key) ? 1 : 0;
3555     } else {
3556       return ($b eq $a) ? 1 : 0;
3557     }
3558   }
3559 }
3560
3561 sub _merge_joinpref_attr {
3562   my ($self, $orig, $import) = @_;
3563
3564   return $import unless defined($orig);
3565   return $orig unless defined($import);
3566
3567   $orig = $self->_rollout_attr($orig);
3568   $import = $self->_rollout_attr($import);
3569
3570   my $seen_keys;
3571   foreach my $import_element ( @{$import} ) {
3572     # find best candidate from $orig to merge $b_element into
3573     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3574     foreach my $orig_element ( @{$orig} ) {
3575       my $score = $self->_calculate_score( $orig_element, $import_element );
3576       if ($score > $best_candidate->{score}) {
3577         $best_candidate->{position} = $position;
3578         $best_candidate->{score} = $score;
3579       }
3580       $position++;
3581     }
3582     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3583
3584     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3585       push( @{$orig}, $import_element );
3586     } else {
3587       my $orig_best = $orig->[$best_candidate->{position}];
3588       # merge orig_best and b_element together and replace original with merged
3589       if (ref $orig_best ne 'HASH') {
3590         $orig->[$best_candidate->{position}] = $import_element;
3591       } elsif (ref $import_element eq 'HASH') {
3592         my ($key) = keys %{$orig_best};
3593         $orig->[$best_candidate->{position}] = { $key => $self->_merge_joinpref_attr($orig_best->{$key}, $import_element->{$key}) };
3594       }
3595     }
3596     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3597   }
3598
3599   return $orig;
3600 }
3601
3602 {
3603   my $hm;
3604
3605   sub _merge_attr {
3606     $hm ||= do {
3607       require Hash::Merge;
3608       my $hm = Hash::Merge->new;
3609
3610       $hm->specify_behavior({
3611         SCALAR => {
3612           SCALAR => sub {
3613             my ($defl, $defr) = map { defined $_ } (@_[0,1]);
3614
3615             if ($defl xor $defr) {
3616               return [ $defl ? $_[0] : $_[1] ];
3617             }
3618             elsif (! $defl) {
3619               return [];
3620             }
3621             elsif (__HM_DEDUP and $_[0] eq $_[1]) {
3622               return [ $_[0] ];
3623             }
3624             else {
3625               return [$_[0], $_[1]];
3626             }
3627           },
3628           ARRAY => sub {
3629             return $_[1] if !defined $_[0];
3630             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3631             return [$_[0], @{$_[1]}]
3632           },
3633           HASH  => sub {
3634             return [] if !defined $_[0] and !keys %{$_[1]};
3635             return [ $_[1] ] if !defined $_[0];
3636             return [ $_[0] ] if !keys %{$_[1]};
3637             return [$_[0], $_[1]]
3638           },
3639         },
3640         ARRAY => {
3641           SCALAR => sub {
3642             return $_[0] if !defined $_[1];
3643             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3644             return [@{$_[0]}, $_[1]]
3645           },
3646           ARRAY => sub {
3647             my @ret = @{$_[0]} or return $_[1];
3648             return [ @ret, @{$_[1]} ] unless __HM_DEDUP;
3649             my %idx = map { $_ => 1 } @ret;
3650             push @ret, grep { ! defined $idx{$_} } (@{$_[1]});
3651             \@ret;
3652           },
3653           HASH => sub {
3654             return [ $_[1] ] if ! @{$_[0]};
3655             return $_[0] if !keys %{$_[1]};
3656             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3657             return [ @{$_[0]}, $_[1] ];
3658           },
3659         },
3660         HASH => {
3661           SCALAR => sub {
3662             return [] if !keys %{$_[0]} and !defined $_[1];
3663             return [ $_[0] ] if !defined $_[1];
3664             return [ $_[1] ] if !keys %{$_[0]};
3665             return [$_[0], $_[1]]
3666           },
3667           ARRAY => sub {
3668             return [] if !keys %{$_[0]} and !@{$_[1]};
3669             return [ $_[0] ] if !@{$_[1]};
3670             return $_[1] if !keys %{$_[0]};
3671             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3672             return [ $_[0], @{$_[1]} ];
3673           },
3674           HASH => sub {
3675             return [] if !keys %{$_[0]} and !keys %{$_[1]};
3676             return [ $_[0] ] if !keys %{$_[1]};
3677             return [ $_[1] ] if !keys %{$_[0]};
3678             return [ $_[0] ] if $_[0] eq $_[1];
3679             return [ $_[0], $_[1] ];
3680           },
3681         }
3682       } => 'DBIC_RS_ATTR_MERGER');
3683       $hm;
3684     };
3685
3686     return $hm->merge ($_[1], $_[2]);
3687   }
3688 }
3689
3690 sub STORABLE_freeze {
3691   my ($self, $cloning) = @_;
3692   my $to_serialize = { %$self };
3693
3694   # A cursor in progress can't be serialized (and would make little sense anyway)
3695   delete $to_serialize->{cursor};
3696
3697   Storable::nfreeze($to_serialize);
3698 }
3699
3700 # need this hook for symmetry
3701 sub STORABLE_thaw {
3702   my ($self, $cloning, $serialized) = @_;
3703
3704   %$self = %{ Storable::thaw($serialized) };
3705
3706   $self;
3707 }
3708
3709
3710 =head2 throw_exception
3711
3712 See L<DBIx::Class::Schema/throw_exception> for details.
3713
3714 =cut
3715
3716 sub throw_exception {
3717   my $self=shift;
3718
3719   if (ref $self and my $rsrc = $self->result_source) {
3720     $rsrc->throw_exception(@_)
3721   }
3722   else {
3723     DBIx::Class::Exception->throw(@_);
3724   }
3725 }
3726
3727 # XXX: FIXME: Attributes docs need clearing up
3728
3729 =head1 ATTRIBUTES
3730
3731 Attributes are used to refine a ResultSet in various ways when
3732 searching for data. They can be passed to any method which takes an
3733 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3734 L</count>.
3735
3736 These are in no particular order:
3737
3738 =head2 order_by
3739
3740 =over 4
3741
3742 =item Value: ( $order_by | \@order_by | \%order_by )
3743
3744 =back
3745
3746 Which column(s) to order the results by.
3747
3748 [The full list of suitable values is documented in
3749 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3750 common options.]
3751
3752 If a single column name, or an arrayref of names is supplied, the
3753 argument is passed through directly to SQL. The hashref syntax allows
3754 for connection-agnostic specification of ordering direction:
3755
3756  For descending order:
3757
3758   order_by => { -desc => [qw/col1 col2 col3/] }
3759
3760  For explicit ascending order:
3761
3762   order_by => { -asc => 'col' }
3763
3764 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3765 supported, although you are strongly encouraged to use the hashref
3766 syntax as outlined above.
3767
3768 =head2 columns
3769
3770 =over 4
3771
3772 =item Value: \@columns
3773
3774 =back
3775
3776 Shortcut to request a particular set of columns to be retrieved. Each
3777 column spec may be a string (a table column name), or a hash (in which
3778 case the key is the C<as> value, and the value is used as the C<select>
3779 expression). Adds C<me.> onto the start of any column without a C<.> in
3780 it and sets C<select> from that, then auto-populates C<as> from
3781 C<select> as normal. (You may also use the C<cols> attribute, as in
3782 earlier versions of DBIC.)
3783
3784 Essentially C<columns> does the same as L</select> and L</as>.
3785
3786     columns => [ 'foo', { bar => 'baz' } ]
3787
3788 is the same as
3789
3790     select => [qw/foo baz/],
3791     as => [qw/foo bar/]
3792
3793 =head2 +columns
3794
3795 =over 4
3796
3797 =item Value: \@columns
3798
3799 =back
3800
3801 Indicates additional columns to be selected from storage. Works the same
3802 as L</columns> but adds columns to the selection. (You may also use the
3803 C<include_columns> attribute, as in earlier versions of DBIC). For
3804 example:-
3805
3806   $schema->resultset('CD')->search(undef, {
3807     '+columns' => ['artist.name'],
3808     join => ['artist']
3809   });
3810
3811 would return all CDs and include a 'name' column to the information
3812 passed to object inflation. Note that the 'artist' is the name of the
3813 column (or relationship) accessor, and 'name' is the name of the column
3814 accessor in the related table.
3815
3816 B<NOTE:> You need to explicitly quote '+columns' when defining the attribute.
3817 Not doing so causes Perl to incorrectly interpret +columns as a bareword with a
3818 unary plus operator before it.
3819
3820 =head2 include_columns
3821
3822 =over 4
3823
3824 =item Value: \@columns
3825
3826 =back
3827
3828 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3829
3830 =head2 select
3831
3832 =over 4
3833
3834 =item Value: \@select_columns
3835
3836 =back
3837
3838 Indicates which columns should be selected from the storage. You can use
3839 column names, or in the case of RDBMS back ends, function or stored procedure
3840 names:
3841
3842   $rs = $schema->resultset('Employee')->search(undef, {
3843     select => [
3844       'name',
3845       { count => 'employeeid' },
3846       { max => { length => 'name' }, -as => 'longest_name' }
3847     ]
3848   });
3849
3850   # Equivalent SQL
3851   SELECT name, COUNT( employeeid ), MAX( LENGTH( name ) ) AS longest_name FROM employee
3852
3853 B<NOTE:> You will almost always need a corresponding L</as> attribute when you
3854 use L</select>, to instruct DBIx::Class how to store the result of the column.
3855 Also note that the L</as> attribute has nothing to do with the SQL-side 'AS'
3856 identifier aliasing. You can however alias a function, so you can use it in
3857 e.g. an C<ORDER BY> clause. This is done via the C<-as> B<select function
3858 attribute> supplied as shown in the example above.
3859
3860 B<NOTE:> You need to explicitly quote '+select'/'+as' when defining the attributes.
3861 Not doing so causes Perl to incorrectly interpret them as a bareword with a
3862 unary plus operator before it.
3863
3864 =head2 +select
3865
3866 =over 4
3867
3868 Indicates additional columns to be selected from storage.  Works the same as
3869 L</select> but adds columns to the default selection, instead of specifying
3870 an explicit list.
3871
3872 =back
3873
3874 =head2 +as
3875
3876 =over 4
3877
3878 Indicates additional column names for those added via L</+select>. See L</as>.
3879
3880 =back
3881
3882 =head2 as
3883
3884 =over 4
3885
3886 =item Value: \@inflation_names
3887
3888 =back
3889
3890 Indicates column names for object inflation. That is L</as> indicates the
3891 slot name in which the column value will be stored within the
3892 L<Row|DBIx::Class::Row> object. The value will then be accessible via this
3893 identifier by the C<get_column> method (or via the object accessor B<if one
3894 with the same name already exists>) as shown below. The L</as> attribute has
3895 B<nothing to do> with the SQL-side C<AS>. See L</select> for details.
3896
3897   $rs = $schema->resultset('Employee')->search(undef, {
3898     select => [
3899       'name',
3900       { count => 'employeeid' },
3901       { max => { length => 'name' }, -as => 'longest_name' }
3902     ],
3903     as => [qw/
3904       name
3905       employee_count
3906       max_name_length
3907     /],
3908   });
3909
3910 If the object against which the search is performed already has an accessor
3911 matching a column name specified in C<as>, the value can be retrieved using
3912 the accessor as normal:
3913
3914   my $name = $employee->name();
3915
3916 If on the other hand an accessor does not exist in the object, you need to
3917 use C<get_column> instead:
3918
3919   my $employee_count = $employee->get_column('employee_count');
3920
3921 You can create your own accessors if required - see
3922 L<DBIx::Class::Manual::Cookbook> for details.
3923
3924 =head2 join
3925
3926 =over 4
3927
3928 =item Value: ($rel_name | \@rel_names | \%rel_names)
3929
3930 =back
3931
3932 Contains a list of relationships that should be joined for this query.  For
3933 example:
3934
3935   # Get CDs by Nine Inch Nails
3936   my $rs = $schema->resultset('CD')->search(
3937     { 'artist.name' => 'Nine Inch Nails' },
3938     { join => 'artist' }
3939   );
3940
3941 Can also contain a hash reference to refer to the other relation's relations.
3942 For example:
3943
3944   package MyApp::Schema::Track;
3945   use base qw/DBIx::Class/;
3946   __PACKAGE__->table('track');
3947   __PACKAGE__->add_columns(qw/trackid cd position title/);
3948   __PACKAGE__->set_primary_key('trackid');
3949   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3950   1;
3951
3952   # In your application
3953   my $rs = $schema->resultset('Artist')->search(
3954     { 'track.title' => 'Teardrop' },
3955     {
3956       join     => { cd => 'track' },
3957       order_by => 'artist.name',
3958     }
3959   );
3960
3961 You need to use the relationship (not the table) name in  conditions,
3962 because they are aliased as such. The current table is aliased as "me", so
3963 you need to use me.column_name in order to avoid ambiguity. For example:
3964
3965   # Get CDs from 1984 with a 'Foo' track
3966   my $rs = $schema->resultset('CD')->search(
3967     {
3968       'me.year' => 1984,
3969       'tracks.name' => 'Foo'
3970     },
3971     { join => 'tracks' }
3972   );
3973
3974 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3975 similarly for a third time). For e.g.
3976
3977   my $rs = $schema->resultset('Artist')->search({
3978     'cds.title'   => 'Down to Earth',
3979     'cds_2.title' => 'Popular',
3980   }, {
3981     join => [ qw/cds cds/ ],
3982   });
3983
3984 will return a set of all artists that have both a cd with title 'Down
3985 to Earth' and a cd with title 'Popular'.
3986
3987 If you want to fetch related objects from other tables as well, see C<prefetch>
3988 below.
3989
3990 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
3991
3992 =head2 prefetch
3993
3994 =over 4
3995
3996 =item Value: ($rel_name | \@rel_names | \%rel_names)
3997
3998 =back
3999
4000 Contains one or more relationships that should be fetched along with
4001 the main query (when they are accessed afterwards the data will
4002 already be available, without extra queries to the database).  This is
4003 useful for when you know you will need the related objects, because it
4004 saves at least one query:
4005
4006   my $rs = $schema->resultset('Tag')->search(
4007     undef,
4008     {
4009       prefetch => {
4010         cd => 'artist'
4011       }
4012     }
4013   );
4014
4015 The initial search results in SQL like the following:
4016
4017   SELECT tag.*, cd.*, artist.* FROM tag
4018   JOIN cd ON tag.cd = cd.cdid
4019   JOIN artist ON cd.artist = artist.artistid
4020
4021 L<DBIx::Class> has no need to go back to the database when we access the
4022 C<cd> or C<artist> relationships, which saves us two SQL statements in this
4023 case.
4024
4025 Simple prefetches will be joined automatically, so there is no need
4026 for a C<join> attribute in the above search.
4027
4028 L</prefetch> can be used with the any of the relationship types and
4029 multiple prefetches can be specified together. Below is a more complex
4030 example that prefetches a CD's artist, its liner notes (if present),
4031 the cover image, the tracks on that cd, and the guests on those
4032 tracks.
4033
4034  # Assuming:
4035  My::Schema::CD->belongs_to( artist      => 'My::Schema::Artist'     );
4036  My::Schema::CD->might_have( liner_note  => 'My::Schema::LinerNotes' );
4037  My::Schema::CD->has_one(    cover_image => 'My::Schema::Artwork'    );
4038  My::Schema::CD->has_many(   tracks      => 'My::Schema::Track'      );
4039
4040  My::Schema::Artist->belongs_to( record_label => 'My::Schema::RecordLabel' );
4041
4042  My::Schema::Track->has_many( guests => 'My::Schema::Guest' );
4043
4044
4045  my $rs = $schema->resultset('CD')->search(
4046    undef,
4047    {
4048      prefetch => [
4049        { artist => 'record_label'},  # belongs_to => belongs_to
4050        'liner_note',                 # might_have
4051        'cover_image',                # has_one
4052        { tracks => 'guests' },       # has_many => has_many
4053      ]
4054    }
4055  );
4056
4057 This will produce SQL like the following:
4058
4059  SELECT cd.*, artist.*, record_label.*, liner_note.*, cover_image.*,
4060         tracks.*, guests.*
4061    FROM cd me
4062    JOIN artist artist
4063      ON artist.artistid = me.artistid
4064    JOIN record_label record_label
4065      ON record_label.labelid = artist.labelid
4066    LEFT JOIN track tracks
4067      ON tracks.cdid = me.cdid
4068    LEFT JOIN guest guests
4069      ON guests.trackid = track.trackid
4070    LEFT JOIN liner_notes liner_note
4071      ON liner_note.cdid = me.cdid
4072    JOIN cd_artwork cover_image
4073      ON cover_image.cdid = me.cdid
4074  ORDER BY tracks.cd
4075
4076 Now the C<artist>, C<record_label>, C<liner_note>, C<cover_image>,
4077 C<tracks>, and C<guests> of the CD will all be available through the
4078 relationship accessors without the need for additional queries to the
4079 database.
4080
4081 However, there is one caveat to be observed: it can be dangerous to
4082 prefetch more than one L<has_many|DBIx::Class::Relationship/has_many>
4083 relationship on a given level. e.g.:
4084
4085  my $rs = $schema->resultset('CD')->search(
4086    undef,
4087    {
4088      prefetch => [
4089        'tracks',                         # has_many
4090        { cd_to_producer => 'producer' }, # has_many => belongs_to (i.e. m2m)
4091      ]
4092    }
4093  );
4094
4095 In fact, C<DBIx::Class> will emit the following warning:
4096
4097  Prefetching multiple has_many rels tracks and cd_to_producer at top
4098  level will explode the number of row objects retrievable via ->next
4099  or ->all. Use at your own risk.
4100
4101 The collapser currently can't identify duplicate tuples for multiple
4102 L<has_many|DBIx::Class::Relationship/has_many> relationships and as a
4103 result the second L<has_many|DBIx::Class::Relationship/has_many>
4104 relation could contain redundant objects.
4105
4106 =head3 Using L</prefetch> with L</join>
4107
4108 L</prefetch> implies a L</join> with the equivalent argument, and is
4109 properly merged with any existing L</join> specification. So the
4110 following:
4111
4112   my $rs = $schema->resultset('CD')->search(
4113    {'record_label.name' => 'Music Product Ltd.'},
4114    {
4115      join     => {artist => 'record_label'},
4116      prefetch => 'artist',
4117    }
4118  );
4119
4120 ... will work, searching on the record label's name, but only
4121 prefetching the C<artist>.
4122
4123 =head3 Using L</prefetch> with L</select> / L</+select> / L</as> / L</+as>
4124
4125 L</prefetch> implies a L</+select>/L</+as> with the fields of the
4126 prefetched relations.  So given:
4127
4128   my $rs = $schema->resultset('CD')->search(
4129    undef,
4130    {
4131      select   => ['cd.title'],
4132      as       => ['cd_title'],
4133      prefetch => 'artist',
4134    }
4135  );
4136
4137 The L</select> becomes: C<'cd.title', 'artist.*'> and the L</as>
4138 becomes: C<'cd_title', 'artist.*'>.
4139
4140 =head3 CAVEATS
4141
4142 Prefetch does a lot of deep magic. As such, it may not behave exactly
4143 as you might expect.
4144
4145 =over 4
4146
4147 =item *
4148
4149 Prefetch uses the L</cache> to populate the prefetched relationships. This
4150 may or may not be what you want.
4151
4152 =item *
4153
4154 If you specify a condition on a prefetched relationship, ONLY those
4155 rows that match the prefetched condition will be fetched into that relationship.
4156 This means that adding prefetch to a search() B<may alter> what is returned by
4157 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
4158
4159   my $artist_rs = $schema->resultset('Artist')->search({
4160       'cds.year' => 2008,
4161   }, {
4162       join => 'cds',
4163   });
4164
4165   my $count = $artist_rs->first->cds->count;
4166
4167   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
4168
4169   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
4170
4171   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
4172
4173 that cmp_ok() may or may not pass depending on the datasets involved. This
4174 behavior may or may not survive the 0.09 transition.
4175
4176 =back
4177
4178 =head2 page
4179
4180 =over 4
4181
4182 =item Value: $page
4183
4184 =back
4185
4186 Makes the resultset paged and specifies the page to retrieve. Effectively
4187 identical to creating a non-pages resultset and then calling ->page($page)
4188 on it.
4189
4190 If L</rows> attribute is not specified it defaults to 10 rows per page.
4191
4192 When you have a paged resultset, L</count> will only return the number
4193 of rows in the page. To get the total, use the L</pager> and call
4194 C<total_entries> on it.
4195
4196 =head2 rows
4197
4198 =over 4
4199
4200 =item Value: $rows
4201
4202 =back
4203
4204 Specifies the maximum number of rows for direct retrieval or the number of
4205 rows per page if the page attribute or method is used.
4206
4207 =head2 offset
4208
4209 =over 4
4210
4211 =item Value: $offset
4212
4213 =back
4214
4215 Specifies the (zero-based) row number for the  first row to be returned, or the
4216 of the first row of the first page if paging is used.
4217
4218 =head2 group_by
4219
4220 =over 4
4221
4222 =item Value: \@columns
4223
4224 =back
4225
4226 A arrayref of columns to group by. Can include columns of joined tables.
4227
4228   group_by => [qw/ column1 column2 ... /]
4229
4230 =head2 having
4231
4232 =over 4
4233
4234 =item Value: $condition
4235
4236 =back
4237
4238 HAVING is a select statement attribute that is applied between GROUP BY and
4239 ORDER BY. It is applied to the after the grouping calculations have been
4240 done.
4241
4242   having => { 'count_employee' => { '>=', 100 } }
4243
4244 or with an in-place function in which case literal SQL is required:
4245
4246   having => \[ 'count(employee) >= ?', [ count => 100 ] ]
4247
4248 =head2 distinct
4249
4250 =over 4
4251
4252 =item Value: (0 | 1)
4253
4254 =back
4255
4256 Set to 1 to group by all columns. If the resultset already has a group_by
4257 attribute, this setting is ignored and an appropriate warning is issued.
4258
4259 =head2 where
4260
4261 =over 4
4262
4263 Adds to the WHERE clause.
4264
4265   # only return rows WHERE deleted IS NULL for all searches
4266   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
4267
4268 Can be overridden by passing C<< { where => undef } >> as an attribute
4269 to a resultset.
4270
4271 =back
4272
4273 =head2 cache
4274
4275 Set to 1 to cache search results. This prevents extra SQL queries if you
4276 revisit rows in your ResultSet:
4277
4278   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
4279
4280   while( my $artist = $resultset->next ) {
4281     ... do stuff ...
4282   }
4283
4284   $rs->first; # without cache, this would issue a query
4285
4286 By default, searches are not cached.
4287
4288 For more examples of using these attributes, see
4289 L<DBIx::Class::Manual::Cookbook>.
4290
4291 =head2 for
4292
4293 =over 4
4294
4295 =item Value: ( 'update' | 'shared' )
4296
4297 =back
4298
4299 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
4300 ... FOR SHARED.
4301
4302 =cut
4303
4304 1;