Initial full test pass - all fetches are eager for now
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use base qw/DBIx::Class/;
6 use DBIx::Class::Carp;
7 use DBIx::Class::Exception;
8 use DBIx::Class::ResultSetColumn;
9 use Scalar::Util qw/blessed weaken/;
10 use Try::Tiny;
11 use Data::Compare (); # no imports!!! guard against insane architecture
12
13 # not importing first() as it will clash with our own method
14 use List::Util ();
15
16 BEGIN {
17   # De-duplication in _merge_attr() is disabled, but left in for reference
18   # (the merger is used for other things that ought not to be de-duped)
19   *__HM_DEDUP = sub () { 0 };
20 }
21
22 use namespace::clean;
23
24 use overload
25         '0+'     => "count",
26         'bool'   => "_bool",
27         fallback => 1;
28
29 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class result_source/);
30
31 =head1 NAME
32
33 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
34
35 =head1 SYNOPSIS
36
37   my $users_rs   = $schema->resultset('User');
38   while( $user = $users_rs->next) {
39     print $user->username;
40   }
41
42   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
43   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
44
45 =head1 DESCRIPTION
46
47 A ResultSet is an object which stores a set of conditions representing
48 a query. It is the backbone of DBIx::Class (i.e. the really
49 important/useful bit).
50
51 No SQL is executed on the database when a ResultSet is created, it
52 just stores all the conditions needed to create the query.
53
54 A basic ResultSet representing the data of an entire table is returned
55 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
56 L<Source|DBIx::Class::Manual::Glossary/Source> name.
57
58   my $users_rs = $schema->resultset('User');
59
60 A new ResultSet is returned from calling L</search> on an existing
61 ResultSet. The new one will contain all the conditions of the
62 original, plus any new conditions added in the C<search> call.
63
64 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
65 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
66 represents.
67
68 The query that the ResultSet represents is B<only> executed against
69 the database when these methods are called:
70 L</find>, L</next>, L</all>, L</first>, L</single>, L</count>.
71
72 If a resultset is used in a numeric context it returns the L</count>.
73 However, if it is used in a boolean context it is B<always> true.  So if
74 you want to check if a resultset has any results, you must use C<if $rs
75 != 0>.
76
77 =head1 CUSTOM ResultSet CLASSES THAT USE Moose
78
79 If you want to make your custom ResultSet classes with L<Moose>, use a template
80 similar to:
81
82     package MyApp::Schema::ResultSet::User;
83
84     use Moose;
85     use namespace::autoclean;
86     use MooseX::NonMoose;
87     extends 'DBIx::Class::ResultSet';
88
89     sub BUILDARGS { $_[2] }
90
91     ...your code...
92
93     __PACKAGE__->meta->make_immutable;
94
95     1;
96
97 The L<MooseX::NonMoose> is necessary so that the L<Moose> constructor does not
98 clash with the regular ResultSet constructor. Alternatively, you can use:
99
100     __PACKAGE__->meta->make_immutable(inline_constructor => 0);
101
102 The L<BUILDARGS|Moose::Manual::Construction/BUILDARGS> is necessary because the
103 signature of the ResultSet C<new> is C<< ->new($source, \%args) >>.
104
105 =head1 EXAMPLES
106
107 =head2 Chaining resultsets
108
109 Let's say you've got a query that needs to be run to return some data
110 to the user. But, you have an authorization system in place that
111 prevents certain users from seeing certain information. So, you want
112 to construct the basic query in one method, but add constraints to it in
113 another.
114
115   sub get_data {
116     my $self = shift;
117     my $request = $self->get_request; # Get a request object somehow.
118     my $schema = $self->result_source->schema;
119
120     my $cd_rs = $schema->resultset('CD')->search({
121       title => $request->param('title'),
122       year => $request->param('year'),
123     });
124
125     $cd_rs = $self->apply_security_policy( $cd_rs );
126
127     return $cd_rs->all();
128   }
129
130   sub apply_security_policy {
131     my $self = shift;
132     my ($rs) = @_;
133
134     return $rs->search({
135       subversive => 0,
136     });
137   }
138
139 =head3 Resolving conditions and attributes
140
141 When a resultset is chained from another resultset, conditions and
142 attributes with the same keys need resolving.
143
144 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
145 into the existing ones from the original resultset.
146
147 The L</where> and L</having> attributes, and any search conditions, are
148 merged with an SQL C<AND> to the existing condition from the original
149 resultset.
150
151 All other attributes are overridden by any new ones supplied in the
152 search attributes.
153
154 =head2 Multiple queries
155
156 Since a resultset just defines a query, you can do all sorts of
157 things with it with the same object.
158
159   # Don't hit the DB yet.
160   my $cd_rs = $schema->resultset('CD')->search({
161     title => 'something',
162     year => 2009,
163   });
164
165   # Each of these hits the DB individually.
166   my $count = $cd_rs->count;
167   my $most_recent = $cd_rs->get_column('date_released')->max();
168   my @records = $cd_rs->all;
169
170 And it's not just limited to SELECT statements.
171
172   $cd_rs->delete();
173
174 This is even cooler:
175
176   $cd_rs->create({ artist => 'Fred' });
177
178 Which is the same as:
179
180   $schema->resultset('CD')->create({
181     title => 'something',
182     year => 2009,
183     artist => 'Fred'
184   });
185
186 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
187
188 =head1 METHODS
189
190 =head2 new
191
192 =over 4
193
194 =item Arguments: $source, \%$attrs
195
196 =item Return Value: $rs
197
198 =back
199
200 The resultset constructor. Takes a source object (usually a
201 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
202 L</ATTRIBUTES> below).  Does not perform any queries -- these are
203 executed as needed by the other methods.
204
205 Generally you won't need to construct a resultset manually.  You'll
206 automatically get one from e.g. a L</search> called in scalar context:
207
208   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
209
210 IMPORTANT: If called on an object, proxies to new_result instead so
211
212   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
213
214 will return a CD object, not a ResultSet.
215
216 =cut
217
218 sub new {
219   my $class = shift;
220   return $class->new_result(@_) if ref $class;
221
222   my ($source, $attrs) = @_;
223   $source = $source->resolve
224     if $source->isa('DBIx::Class::ResultSourceHandle');
225   $attrs = { %{$attrs||{}} };
226
227   if ($attrs->{page}) {
228     $attrs->{rows} ||= 10;
229   }
230
231   $attrs->{alias} ||= 'me';
232
233   my $self = bless {
234     result_source => $source,
235     cond => $attrs->{where},
236     pager => undef,
237     attrs => $attrs,
238   }, $class;
239
240   # if there is a dark selector, this means we are already in a
241   # chain and the cleanup/sanification was taken care of by
242   # _search_rs already
243   $self->_normalize_selection($attrs)
244     unless $attrs->{_dark_selector};
245
246   $self->result_class(
247     $attrs->{result_class} || $source->result_class
248   );
249
250   $self;
251 }
252
253 =head2 search
254
255 =over 4
256
257 =item Arguments: $cond, \%attrs?
258
259 =item Return Value: $resultset (scalar context) ||  @row_objs (list context)
260
261 =back
262
263   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
264   my $new_rs = $cd_rs->search({ year => 2005 });
265
266   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
267                  # year = 2005 OR year = 2004
268
269 In list context, C<< ->all() >> is called implicitly on the resultset, thus
270 returning a list of row objects instead. To avoid that, use L</search_rs>.
271
272 If you need to pass in additional attributes but no additional condition,
273 call it as C<search(undef, \%attrs)>.
274
275   # "SELECT name, artistid FROM $artist_table"
276   my @all_artists = $schema->resultset('Artist')->search(undef, {
277     columns => [qw/name artistid/],
278   });
279
280 For a list of attributes that can be passed to C<search>, see
281 L</ATTRIBUTES>. For more examples of using this function, see
282 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
283 documentation for the first argument, see L<SQL::Abstract>
284 and its extension L<DBIx::Class::SQLMaker>.
285
286 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
287
288 =head3 CAVEAT
289
290 Note that L</search> does not process/deflate any of the values passed in the
291 L<SQL::Abstract>-compatible search condition structure. This is unlike other
292 condition-bound methods L</new>, L</create> and L</find>. The user must ensure
293 manually that any value passed to this method will stringify to something the
294 RDBMS knows how to deal with. A notable example is the handling of L<DateTime>
295 objects, for more info see:
296 L<DBIx::Class::Manual::Cookbook/Formatting DateTime objects in queries>.
297
298 =cut
299
300 sub search {
301   my $self = shift;
302   my $rs = $self->search_rs( @_ );
303
304   if (wantarray) {
305     return $rs->all;
306   }
307   elsif (defined wantarray) {
308     return $rs;
309   }
310   else {
311     # we can be called by a relationship helper, which in
312     # turn may be called in void context due to some braindead
313     # overload or whatever else the user decided to be clever
314     # at this particular day. Thus limit the exception to
315     # external code calls only
316     $self->throw_exception ('->search is *not* a mutator, calling it in void context makes no sense')
317       if (caller)[0] !~ /^\QDBIx::Class::/;
318
319     return ();
320   }
321 }
322
323 =head2 search_rs
324
325 =over 4
326
327 =item Arguments: $cond, \%attrs?
328
329 =item Return Value: $resultset
330
331 =back
332
333 This method does the same exact thing as search() except it will
334 always return a resultset, even in list context.
335
336 =cut
337
338 sub search_rs {
339   my $self = shift;
340
341   # Special-case handling for (undef, undef).
342   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
343     @_ = ();
344   }
345
346   my $call_attrs = {};
347   if (@_ > 1) {
348     if (ref $_[-1] eq 'HASH') {
349       # copy for _normalize_selection
350       $call_attrs = { %{ pop @_ } };
351     }
352     elsif (! defined $_[-1] ) {
353       pop @_;   # search({}, undef)
354     }
355   }
356
357   # see if we can keep the cache (no $rs changes)
358   my $cache;
359   my %safe = (alias => 1, cache => 1);
360   if ( ! List::Util::first { !$safe{$_} } keys %$call_attrs and (
361     ! defined $_[0]
362       or
363     ref $_[0] eq 'HASH' && ! keys %{$_[0]}
364       or
365     ref $_[0] eq 'ARRAY' && ! @{$_[0]}
366   )) {
367     $cache = $self->get_cache;
368   }
369
370   my $rsrc = $self->result_source;
371
372   my $old_attrs = { %{$self->{attrs}} };
373   my $old_having = delete $old_attrs->{having};
374   my $old_where = delete $old_attrs->{where};
375
376   my $new_attrs = { %$old_attrs };
377
378   # take care of call attrs (only if anything is changing)
379   if (keys %$call_attrs) {
380
381     my @selector_attrs = qw/select as columns cols +select +as +columns include_columns/;
382
383     # reset the current selector list if new selectors are supplied
384     if (List::Util::first { exists $call_attrs->{$_} } qw/columns cols select as/) {
385       delete @{$old_attrs}{(@selector_attrs, '_dark_selector')};
386     }
387
388     # Normalize the new selector list (operates on the passed-in attr structure)
389     # Need to do it on every chain instead of only once on _resolved_attrs, in
390     # order to allow detection of empty vs partial 'as'
391     $call_attrs->{_dark_selector} = $old_attrs->{_dark_selector}
392       if $old_attrs->{_dark_selector};
393     $self->_normalize_selection ($call_attrs);
394
395     # start with blind overwriting merge, exclude selector attrs
396     $new_attrs = { %{$old_attrs}, %{$call_attrs} };
397     delete @{$new_attrs}{@selector_attrs};
398
399     for (@selector_attrs) {
400       $new_attrs->{$_} = $self->_merge_attr($old_attrs->{$_}, $call_attrs->{$_})
401         if ( exists $old_attrs->{$_} or exists $call_attrs->{$_} );
402     }
403
404     # older deprecated name, use only if {columns} is not there
405     if (my $c = delete $new_attrs->{cols}) {
406       if ($new_attrs->{columns}) {
407         carp "Resultset specifies both the 'columns' and the legacy 'cols' attributes - ignoring 'cols'";
408       }
409       else {
410         $new_attrs->{columns} = $c;
411       }
412     }
413
414
415     # join/prefetch use their own crazy merging heuristics
416     foreach my $key (qw/join prefetch/) {
417       $new_attrs->{$key} = $self->_merge_joinpref_attr($old_attrs->{$key}, $call_attrs->{$key})
418         if exists $call_attrs->{$key};
419     }
420
421     # stack binds together
422     $new_attrs->{bind} = [ @{ $old_attrs->{bind} || [] }, @{ $call_attrs->{bind} || [] } ];
423   }
424
425
426   # rip apart the rest of @_, parse a condition
427   my $call_cond = do {
428
429     if (ref $_[0] eq 'HASH') {
430       (keys %{$_[0]}) ? $_[0] : undef
431     }
432     elsif (@_ == 1) {
433       $_[0]
434     }
435     elsif (@_ % 2) {
436       $self->throw_exception('Odd number of arguments to search')
437     }
438     else {
439       +{ @_ }
440     }
441
442   } if @_;
443
444   if( @_ > 1 and ! $rsrc->result_class->isa('DBIx::Class::CDBICompat') ) {
445     carp_unique 'search( %condition ) is deprecated, use search( \%condition ) instead';
446   }
447
448   for ($old_where, $call_cond) {
449     if (defined $_) {
450       $new_attrs->{where} = $self->_stack_cond (
451         $_, $new_attrs->{where}
452       );
453     }
454   }
455
456   if (defined $old_having) {
457     $new_attrs->{having} = $self->_stack_cond (
458       $old_having, $new_attrs->{having}
459     )
460   }
461
462   my $rs = (ref $self)->new($rsrc, $new_attrs);
463
464   $rs->set_cache($cache) if ($cache);
465
466   return $rs;
467 }
468
469 my $dark_sel_dumper;
470 sub _normalize_selection {
471   my ($self, $attrs) = @_;
472
473   # legacy syntax
474   $attrs->{'+columns'} = $self->_merge_attr($attrs->{'+columns'}, delete $attrs->{include_columns})
475     if exists $attrs->{include_columns};
476
477   # columns are always placed first, however
478
479   # Keep the X vs +X separation until _resolved_attrs time - this allows to
480   # delay the decision on whether to use a default select list ($rsrc->columns)
481   # allowing stuff like the remove_columns helper to work
482   #
483   # select/as +select/+as pairs need special handling - the amount of select/as
484   # elements in each pair does *not* have to be equal (think multicolumn
485   # selectors like distinct(foo, bar) ). If the selector is bare (no 'as'
486   # supplied at all) - try to infer the alias, either from the -as parameter
487   # of the selector spec, or use the parameter whole if it looks like a column
488   # name (ugly legacy heuristic). If all fails - leave the selector bare (which
489   # is ok as well), but make sure no more additions to the 'as' chain take place
490   for my $pref ('', '+') {
491
492     my ($sel, $as) = map {
493       my $key = "${pref}${_}";
494
495       my $val = [ ref $attrs->{$key} eq 'ARRAY'
496         ? @{$attrs->{$key}}
497         : $attrs->{$key} || ()
498       ];
499       delete $attrs->{$key};
500       $val;
501     } qw/select as/;
502
503     if (! @$as and ! @$sel ) {
504       next;
505     }
506     elsif (@$as and ! @$sel) {
507       $self->throw_exception(
508         "Unable to handle ${pref}as specification (@$as) without a corresponding ${pref}select"
509       );
510     }
511     elsif( ! @$as ) {
512       # no as part supplied at all - try to deduce (unless explicit end of named selection is declared)
513       # if any @$as has been supplied we assume the user knows what (s)he is doing
514       # and blindly keep stacking up pieces
515       unless ($attrs->{_dark_selector}) {
516         SELECTOR:
517         for (@$sel) {
518           if ( ref $_ eq 'HASH' and exists $_->{-as} ) {
519             push @$as, $_->{-as};
520           }
521           # assume any plain no-space, no-parenthesis string to be a column spec
522           # FIXME - this is retarded but is necessary to support shit like 'count(foo)'
523           elsif ( ! ref $_ and $_ =~ /^ [^\s\(\)]+ $/x) {
524             push @$as, $_;
525           }
526           # if all else fails - raise a flag that no more aliasing will be allowed
527           else {
528             $attrs->{_dark_selector} = {
529               plus_stage => $pref,
530               string => ($dark_sel_dumper ||= do {
531                   require Data::Dumper::Concise;
532                   Data::Dumper::Concise::DumperObject()->Indent(0);
533                 })->Values([$_])->Dump
534               ,
535             };
536             last SELECTOR;
537           }
538         }
539       }
540     }
541     elsif (@$as < @$sel) {
542       $self->throw_exception(
543         "Unable to handle an ${pref}as specification (@$as) with less elements than the corresponding ${pref}select"
544       );
545     }
546     elsif ($pref and $attrs->{_dark_selector}) {
547       $self->throw_exception(
548         "Unable to process named '+select', resultset contains an unnamed selector $attrs->{_dark_selector}{string}"
549       );
550     }
551
552
553     # merge result
554     $attrs->{"${pref}select"} = $self->_merge_attr($attrs->{"${pref}select"}, $sel);
555     $attrs->{"${pref}as"} = $self->_merge_attr($attrs->{"${pref}as"}, $as);
556   }
557 }
558
559 sub _stack_cond {
560   my ($self, $left, $right) = @_;
561
562   # collapse single element top-level conditions
563   # (single pass only, unlikely to need recursion)
564   for ($left, $right) {
565     if (ref $_ eq 'ARRAY') {
566       if (@$_ == 0) {
567         $_ = undef;
568       }
569       elsif (@$_ == 1) {
570         $_ = $_->[0];
571       }
572     }
573     elsif (ref $_ eq 'HASH') {
574       my ($first, $more) = keys %$_;
575
576       # empty hash
577       if (! defined $first) {
578         $_ = undef;
579       }
580       # one element hash
581       elsif (! defined $more) {
582         if ($first eq '-and' and ref $_->{'-and'} eq 'HASH') {
583           $_ = $_->{'-and'};
584         }
585         elsif ($first eq '-or' and ref $_->{'-or'} eq 'ARRAY') {
586           $_ = $_->{'-or'};
587         }
588       }
589     }
590   }
591
592   # merge hashes with weeding out of duplicates (simple cases only)
593   if (ref $left eq 'HASH' and ref $right eq 'HASH') {
594
595     # shallow copy to destroy
596     $right = { %$right };
597     for (grep { exists $right->{$_} } keys %$left) {
598       # the use of eq_deeply here is justified - the rhs of an
599       # expression can contain a lot of twisted weird stuff
600       delete $right->{$_} if Data::Compare::Compare( $left->{$_}, $right->{$_} );
601     }
602
603     $right = undef unless keys %$right;
604   }
605
606
607   if (defined $left xor defined $right) {
608     return defined $left ? $left : $right;
609   }
610   elsif (! defined $left) {
611     return undef;
612   }
613   else {
614     return { -and => [ $left, $right ] };
615   }
616 }
617
618 =head2 search_literal
619
620 =over 4
621
622 =item Arguments: $sql_fragment, @bind_values
623
624 =item Return Value: $resultset (scalar context) || @row_objs (list context)
625
626 =back
627
628   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
629   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
630
631 Pass a literal chunk of SQL to be added to the conditional part of the
632 resultset query.
633
634 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
635 only be used in that context. C<search_literal> is a convenience method.
636 It is equivalent to calling $schema->search(\[]), but if you want to ensure
637 columns are bound correctly, use C<search>.
638
639 Example of how to use C<search> instead of C<search_literal>
640
641   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
642   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
643
644
645 See L<DBIx::Class::Manual::Cookbook/Searching> and
646 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
647 require C<search_literal>.
648
649 =cut
650
651 sub search_literal {
652   my ($self, $sql, @bind) = @_;
653   my $attr;
654   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
655     $attr = pop @bind;
656   }
657   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
658 }
659
660 =head2 find
661
662 =over 4
663
664 =item Arguments: \%columns_values | @pk_values, \%attrs?
665
666 =item Return Value: $row_object | undef
667
668 =back
669
670 Finds and returns a single row based on supplied criteria. Takes either a
671 hashref with the same format as L</create> (including inference of foreign
672 keys from related objects), or a list of primary key values in the same
673 order as the L<primary columns|DBIx::Class::ResultSource/primary_columns>
674 declaration on the L</result_source>.
675
676 In either case an attempt is made to combine conditions already existing on
677 the resultset with the condition passed to this method.
678
679 To aid with preparing the correct query for the storage you may supply the
680 C<key> attribute, which is the name of a
681 L<unique constraint|DBIx::Class::ResultSource/add_unique_constraint> (the
682 unique constraint corresponding to the
683 L<primary columns|DBIx::Class::ResultSource/primary_columns> is always named
684 C<primary>). If the C<key> attribute has been supplied, and DBIC is unable
685 to construct a query that satisfies the named unique constraint fully (
686 non-NULL values for each column member of the constraint) an exception is
687 thrown.
688
689 If no C<key> is specified, the search is carried over all unique constraints
690 which are fully defined by the available condition.
691
692 If no such constraint is found, C<find> currently defaults to a simple
693 C<< search->(\%column_values) >> which may or may not do what you expect.
694 Note that this fallback behavior may be deprecated in further versions. If
695 you need to search with arbitrary conditions - use L</search>. If the query
696 resulting from this fallback produces more than one row, a warning to the
697 effect is issued, though only the first row is constructed and returned as
698 C<$row_object>.
699
700 In addition to C<key>, L</find> recognizes and applies standard
701 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
702
703 Note that if you have extra concerns about the correctness of the resulting
704 query you need to specify the C<key> attribute and supply the entire condition
705 as an argument to find (since it is not always possible to perform the
706 combination of the resultset condition with the supplied one, especially if
707 the resultset condition contains literal sql).
708
709 For example, to find a row by its primary key:
710
711   my $cd = $schema->resultset('CD')->find(5);
712
713 You can also find a row by a specific unique constraint:
714
715   my $cd = $schema->resultset('CD')->find(
716     {
717       artist => 'Massive Attack',
718       title  => 'Mezzanine',
719     },
720     { key => 'cd_artist_title' }
721   );
722
723 See also L</find_or_create> and L</update_or_create>.
724
725 =cut
726
727 sub find {
728   my $self = shift;
729   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
730
731   my $rsrc = $self->result_source;
732
733   my $constraint_name;
734   if (exists $attrs->{key}) {
735     $constraint_name = defined $attrs->{key}
736       ? $attrs->{key}
737       : $self->throw_exception("An undefined 'key' resultset attribute makes no sense")
738     ;
739   }
740
741   # Parse out the condition from input
742   my $call_cond;
743
744   if (ref $_[0] eq 'HASH') {
745     $call_cond = { %{$_[0]} };
746   }
747   else {
748     # if only values are supplied we need to default to 'primary'
749     $constraint_name = 'primary' unless defined $constraint_name;
750
751     my @c_cols = $rsrc->unique_constraint_columns($constraint_name);
752
753     $self->throw_exception(
754       "No constraint columns, maybe a malformed '$constraint_name' constraint?"
755     ) unless @c_cols;
756
757     $self->throw_exception (
758       'find() expects either a column/value hashref, or a list of values '
759     . "corresponding to the columns of the specified unique constraint '$constraint_name'"
760     ) unless @c_cols == @_;
761
762     $call_cond = {};
763     @{$call_cond}{@c_cols} = @_;
764   }
765
766   my %related;
767   for my $key (keys %$call_cond) {
768     if (
769       my $keyref = ref($call_cond->{$key})
770         and
771       my $relinfo = $rsrc->relationship_info($key)
772     ) {
773       my $val = delete $call_cond->{$key};
774
775       next if $keyref eq 'ARRAY'; # has_many for multi_create
776
777       my $rel_q = $rsrc->_resolve_condition(
778         $relinfo->{cond}, $val, $key, $key
779       );
780       die "Can't handle complex relationship conditions in find" if ref($rel_q) ne 'HASH';
781       @related{keys %$rel_q} = values %$rel_q;
782     }
783   }
784
785   # relationship conditions take precedence (?)
786   @{$call_cond}{keys %related} = values %related;
787
788   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
789   my $final_cond;
790   if (defined $constraint_name) {
791     $final_cond = $self->_qualify_cond_columns (
792
793       $self->_build_unique_cond (
794         $constraint_name,
795         $call_cond,
796       ),
797
798       $alias,
799     );
800   }
801   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
802     # This means that we got here after a merger of relationship conditions
803     # in ::Relationship::Base::search_related (the row method), and furthermore
804     # the relationship is of the 'single' type. This means that the condition
805     # provided by the relationship (already attached to $self) is sufficient,
806     # as there can be only one row in the database that would satisfy the
807     # relationship
808   }
809   else {
810     # no key was specified - fall down to heuristics mode:
811     # run through all unique queries registered on the resultset, and
812     # 'OR' all qualifying queries together
813     my (@unique_queries, %seen_column_combinations);
814     for my $c_name ($rsrc->unique_constraint_names) {
815       next if $seen_column_combinations{
816         join "\x00", sort $rsrc->unique_constraint_columns($c_name)
817       }++;
818
819       push @unique_queries, try {
820         $self->_build_unique_cond ($c_name, $call_cond, 'croak_on_nulls')
821       } || ();
822     }
823
824     $final_cond = @unique_queries
825       ? [ map { $self->_qualify_cond_columns($_, $alias) } @unique_queries ]
826       : $self->_non_unique_find_fallback ($call_cond, $attrs)
827     ;
828   }
829
830   # Run the query, passing the result_class since it should propagate for find
831   my $rs = $self->search ($final_cond, {result_class => $self->result_class, %$attrs});
832   if ($rs->_resolved_attrs->{collapse}) {
833     my $row = $rs->next;
834     carp "Query returned more than one row" if $rs->next;
835     return $row;
836   }
837   else {
838     return $rs->single;
839   }
840 }
841
842 # This is a stop-gap method as agreed during the discussion on find() cleanup:
843 # http://lists.scsys.co.uk/pipermail/dbix-class/2010-October/009535.html
844 #
845 # It is invoked when find() is called in legacy-mode with insufficiently-unique
846 # condition. It is provided for overrides until a saner way forward is devised
847 #
848 # *NOTE* This is not a public method, and it's *GUARANTEED* to disappear down
849 # the road. Please adjust your tests accordingly to catch this situation early
850 # DBIx::Class::ResultSet->can('_non_unique_find_fallback') is reasonable
851 #
852 # The method will not be removed without an adequately complete replacement
853 # for strict-mode enforcement
854 sub _non_unique_find_fallback {
855   my ($self, $cond, $attrs) = @_;
856
857   return $self->_qualify_cond_columns(
858     $cond,
859     exists $attrs->{alias}
860       ? $attrs->{alias}
861       : $self->{attrs}{alias}
862   );
863 }
864
865
866 sub _qualify_cond_columns {
867   my ($self, $cond, $alias) = @_;
868
869   my %aliased = %$cond;
870   for (keys %aliased) {
871     $aliased{"$alias.$_"} = delete $aliased{$_}
872       if $_ !~ /\./;
873   }
874
875   return \%aliased;
876 }
877
878 sub _build_unique_cond {
879   my ($self, $constraint_name, $extra_cond, $croak_on_null) = @_;
880
881   my @c_cols = $self->result_source->unique_constraint_columns($constraint_name);
882
883   # combination may fail if $self->{cond} is non-trivial
884   my ($final_cond) = try {
885     $self->_merge_with_rscond ($extra_cond)
886   } catch {
887     +{ %$extra_cond }
888   };
889
890   # trim out everything not in $columns
891   $final_cond = { map {
892     exists $final_cond->{$_}
893       ? ( $_ => $final_cond->{$_} )
894       : ()
895   } @c_cols };
896
897   if (my @missing = grep
898     { ! ($croak_on_null ? defined $final_cond->{$_} : exists $final_cond->{$_}) }
899     (@c_cols)
900   ) {
901     $self->throw_exception( sprintf ( "Unable to satisfy requested constraint '%s', no values for column(s): %s",
902       $constraint_name,
903       join (', ', map { "'$_'" } @missing),
904     ) );
905   }
906
907   if (
908     !$croak_on_null
909       and
910     !$ENV{DBIC_NULLABLE_KEY_NOWARN}
911       and
912     my @undefs = grep { ! defined $final_cond->{$_} } (keys %$final_cond)
913   ) {
914     carp_unique ( sprintf (
915       "NULL/undef values supplied for requested unique constraint '%s' (NULL "
916     . 'values in column(s): %s). This is almost certainly not what you wanted, '
917     . 'though you can set DBIC_NULLABLE_KEY_NOWARN to disable this warning.',
918       $constraint_name,
919       join (', ', map { "'$_'" } @undefs),
920     ));
921   }
922
923   return $final_cond;
924 }
925
926 =head2 search_related
927
928 =over 4
929
930 =item Arguments: $rel, $cond?, \%attrs?
931
932 =item Return Value: $new_resultset (scalar context) || @row_objs (list context)
933
934 =back
935
936   $new_rs = $cd_rs->search_related('artist', {
937     name => 'Emo-R-Us',
938   });
939
940 Searches the specified relationship, optionally specifying a condition and
941 attributes for matching records. See L</ATTRIBUTES> for more information.
942
943 In list context, C<< ->all() >> is called implicitly on the resultset, thus
944 returning a list of row objects instead. To avoid that, use L</search_related_rs>.
945
946 See also L</search_related_rs>.
947
948 =cut
949
950 sub search_related {
951   return shift->related_resultset(shift)->search(@_);
952 }
953
954 =head2 search_related_rs
955
956 This method works exactly the same as search_related, except that
957 it guarantees a resultset, even in list context.
958
959 =cut
960
961 sub search_related_rs {
962   return shift->related_resultset(shift)->search_rs(@_);
963 }
964
965 =head2 cursor
966
967 =over 4
968
969 =item Arguments: none
970
971 =item Return Value: $cursor
972
973 =back
974
975 Returns a storage-driven cursor to the given resultset. See
976 L<DBIx::Class::Cursor> for more information.
977
978 =cut
979
980 sub cursor {
981   my ($self) = @_;
982
983   my $attrs = $self->_resolved_attrs_copy;
984
985   return $self->{cursor}
986     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
987           $attrs->{where},$attrs);
988 }
989
990 =head2 single
991
992 =over 4
993
994 =item Arguments: $cond?
995
996 =item Return Value: $row_object | undef
997
998 =back
999
1000   my $cd = $schema->resultset('CD')->single({ year => 2001 });
1001
1002 Inflates the first result without creating a cursor if the resultset has
1003 any records in it; if not returns C<undef>. Used by L</find> as a lean version
1004 of L</search>.
1005
1006 While this method can take an optional search condition (just like L</search>)
1007 being a fast-code-path it does not recognize search attributes. If you need to
1008 add extra joins or similar, call L</search> and then chain-call L</single> on the
1009 L<DBIx::Class::ResultSet> returned.
1010
1011 =over
1012
1013 =item B<Note>
1014
1015 As of 0.08100, this method enforces the assumption that the preceding
1016 query returns only one row. If more than one row is returned, you will receive
1017 a warning:
1018
1019   Query returned more than one row
1020
1021 In this case, you should be using L</next> or L</find> instead, or if you really
1022 know what you are doing, use the L</rows> attribute to explicitly limit the size
1023 of the resultset.
1024
1025 This method will also throw an exception if it is called on a resultset prefetching
1026 has_many, as such a prefetch implies fetching multiple rows from the database in
1027 order to assemble the resulting object.
1028
1029 =back
1030
1031 =cut
1032
1033 sub single {
1034   my ($self, $where) = @_;
1035   if(@_ > 2) {
1036       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
1037   }
1038
1039   my $attrs = $self->_resolved_attrs_copy;
1040
1041   $self->throw_exception(
1042     'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
1043   ) if $attrs->{collapse};
1044
1045   if ($where) {
1046     if (defined $attrs->{where}) {
1047       $attrs->{where} = {
1048         '-and' =>
1049             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
1050                $where, delete $attrs->{where} ]
1051       };
1052     } else {
1053       $attrs->{where} = $where;
1054     }
1055   }
1056
1057   my $data = [ $self->result_source->storage->select_single(
1058     $attrs->{from}, $attrs->{select},
1059     $attrs->{where}, $attrs
1060   )];
1061
1062   return @$data ? $self->_construct_objects($data)->[0] : undef;
1063 }
1064
1065
1066 # _collapse_query
1067 #
1068 # Recursively collapse the query, accumulating values for each column.
1069
1070 sub _collapse_query {
1071   my ($self, $query, $collapsed) = @_;
1072
1073   $collapsed ||= {};
1074
1075   if (ref $query eq 'ARRAY') {
1076     foreach my $subquery (@$query) {
1077       next unless ref $subquery;  # -or
1078       $collapsed = $self->_collapse_query($subquery, $collapsed);
1079     }
1080   }
1081   elsif (ref $query eq 'HASH') {
1082     if (keys %$query and (keys %$query)[0] eq '-and') {
1083       foreach my $subquery (@{$query->{-and}}) {
1084         $collapsed = $self->_collapse_query($subquery, $collapsed);
1085       }
1086     }
1087     else {
1088       foreach my $col (keys %$query) {
1089         my $value = $query->{$col};
1090         $collapsed->{$col}{$value}++;
1091       }
1092     }
1093   }
1094
1095   return $collapsed;
1096 }
1097
1098 =head2 get_column
1099
1100 =over 4
1101
1102 =item Arguments: $cond?
1103
1104 =item Return Value: $resultsetcolumn
1105
1106 =back
1107
1108   my $max_length = $rs->get_column('length')->max;
1109
1110 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
1111
1112 =cut
1113
1114 sub get_column {
1115   my ($self, $column) = @_;
1116   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
1117   return $new;
1118 }
1119
1120 =head2 search_like
1121
1122 =over 4
1123
1124 =item Arguments: $cond, \%attrs?
1125
1126 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1127
1128 =back
1129
1130   # WHERE title LIKE '%blue%'
1131   $cd_rs = $rs->search_like({ title => '%blue%'});
1132
1133 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
1134 that this is simply a convenience method retained for ex Class::DBI users.
1135 You most likely want to use L</search> with specific operators.
1136
1137 For more information, see L<DBIx::Class::Manual::Cookbook>.
1138
1139 This method is deprecated and will be removed in 0.09. Use L</search()>
1140 instead. An example conversion is:
1141
1142   ->search_like({ foo => 'bar' });
1143
1144   # Becomes
1145
1146   ->search({ foo => { like => 'bar' } });
1147
1148 =cut
1149
1150 sub search_like {
1151   my $class = shift;
1152   carp_unique (
1153     'search_like() is deprecated and will be removed in DBIC version 0.09.'
1154    .' Instead use ->search({ x => { -like => "y%" } })'
1155    .' (note the outer pair of {}s - they are important!)'
1156   );
1157   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1158   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
1159   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
1160   return $class->search($query, { %$attrs });
1161 }
1162
1163 =head2 slice
1164
1165 =over 4
1166
1167 =item Arguments: $first, $last
1168
1169 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1170
1171 =back
1172
1173 Returns a resultset or object list representing a subset of elements from the
1174 resultset slice is called on. Indexes are from 0, i.e., to get the first
1175 three records, call:
1176
1177   my ($one, $two, $three) = $rs->slice(0, 2);
1178
1179 =cut
1180
1181 sub slice {
1182   my ($self, $min, $max) = @_;
1183   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
1184   $attrs->{offset} = $self->{attrs}{offset} || 0;
1185   $attrs->{offset} += $min;
1186   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
1187   return $self->search(undef, $attrs);
1188   #my $slice = (ref $self)->new($self->result_source, $attrs);
1189   #return (wantarray ? $slice->all : $slice);
1190 }
1191
1192 =head2 next
1193
1194 =over 4
1195
1196 =item Arguments: none
1197
1198 =item Return Value: $result | undef
1199
1200 =back
1201
1202 Returns the next element in the resultset (C<undef> is there is none).
1203
1204 Can be used to efficiently iterate over records in the resultset:
1205
1206   my $rs = $schema->resultset('CD')->search;
1207   while (my $cd = $rs->next) {
1208     print $cd->title;
1209   }
1210
1211 Note that you need to store the resultset object, and call C<next> on it.
1212 Calling C<< resultset('Table')->next >> repeatedly will always return the
1213 first record from the resultset.
1214
1215 =cut
1216
1217 sub next {
1218   my ($self) = @_;
1219
1220   if (my $cache = $self->get_cache) {
1221     $self->{all_cache_position} ||= 0;
1222     return $cache->[$self->{all_cache_position}++];
1223   }
1224
1225   if ($self->{attrs}{cache}) {
1226     delete $self->{pager};
1227     $self->{all_cache_position} = 1;
1228     return ($self->all)[0];
1229   }
1230
1231   return shift(@{$self->{stashed_objects}}) if @{ $self->{stashed_objects}||[] };
1232
1233   $self->{stashed_objects} = $self->_construct_objects
1234     or return undef;
1235
1236   return shift @{$self->{stashed_objects}};
1237 }
1238
1239 # takes a single DBI-row of data and coinstructs as many objects
1240 # as the resultset attributes call for.
1241 # This can be a bit of an action at a distance - it takes as an argument
1242 # the *current* cursor-row (already taken off the $sth), but if
1243 # collapsing is requested it will keep advancing the cursor either
1244 # until the current row-object is assembled (the collapser was able to
1245 # order the result sensibly) OR until the cursor is exhausted (an
1246 # unordered collapsing resultset effectively triggers ->all)
1247 sub _construct_objects {
1248   my ($self, $fetched_row, $fetch_all) = @_;
1249
1250   my $attrs = $self->_resolved_attrs;
1251   my $unordered = 0;  # will deal with this later
1252
1253   # this will be used as both initial raw-row collector AND as a RV of
1254   # _construct_objects. Not regrowing the   # array twice matters a lot...
1255   # a suprising amount actually
1256   my $rows;
1257
1258   # $fetch_all implies all() which means all stashes have been cleared
1259   # and the cursor reset
1260   if ($fetch_all) {
1261     # FIXME - we can do better, cursor->all (well a diff. method) should return a ref
1262     $rows = [ $self->cursor->all ];
1263   }
1264   elsif ($unordered) {
1265     $rows = [
1266       $fetched_row||(),
1267       @{ delete $self->{stashed_rows} || []},
1268       $self->cursor->all,
1269     ];
1270   }
1271   else {  # simple single object
1272     $rows = [ $fetched_row || ( @{$self->{stashed_rows}||[]} ? shift @{$self->{stashed_rows}} : [$self->cursor->next] ) ];
1273   }
1274
1275   return undef unless @{$rows->[0]||[]};
1276
1277   my $rsrc = $self->result_source;
1278   my $res_class = $self->result_class;
1279   my $inflator = $res_class->can ('inflate_result')
1280     or $self->throw_exception("Inflator $res_class does not provide an inflate_result() method");
1281
1282   # construct a much simpler array->hash folder for the one-table cases right here
1283   if ($attrs->{_single_object_inflation} and ! $attrs->{collapse}) {
1284     # FIXME this is a very very very hot spot
1285     # while rather optimal we can *still* do much better, by
1286     # building a smarter [Row|HRI]::inflate_result(), and
1287     # switch to feeding it data via some leaner interface
1288     #
1289     my $infmap = $attrs->{as};
1290     my @as_idx = 0..$#$infmap;
1291     for my $r (@$rows) {
1292       $r = [{ map { $infmap->[$_] => $r->[$_] } @as_idx }]
1293     }
1294
1295     # FIXME - this seems to be faster than the hashmapper aove, especially
1296     # on more rows, but need a better bench-environment to confirm
1297     #eval sprintf (
1298     #  '$_ = [{ %s }] for @$rows',
1299     #  join (', ', map { "\$infmap->[$_] => \$_->[$_]" } 0..$#$infmap )
1300     #);
1301   }
1302   else {
1303     push @$rows, @{$self->{stashed_rows}||[]};
1304
1305     $rsrc->_mk_row_parser({
1306       inflate_map => $attrs->{as},
1307       selection => $attrs->{select},
1308       collapse => $attrs->{collapse},
1309       unordered => $unordered,
1310     })->(
1311       $rows,  # modify in-place, shrinking/extending as necessary
1312       ($attrs->{collapse} and ! $fetch_all and ! $unordered)
1313         ? (
1314             sub { my @r = $self->cursor->next or return undef; \@r },
1315             ($self->{stashed_rows} = []), # this is where we empty things and prepare for leftovers
1316           )
1317         : ()
1318       ,
1319     );
1320   }
1321
1322   $_ = $res_class->$inflator($rsrc, @$_) for @$rows;
1323
1324   # CDBI compat stuff
1325   if ($attrs->{record_filter}) {
1326     $_ = $attrs->{record_filter}->($_) for @$rows;
1327   }
1328
1329   return $rows;
1330 }
1331
1332 =head2 result_source
1333
1334 =over 4
1335
1336 =item Arguments: $result_source?
1337
1338 =item Return Value: $result_source
1339
1340 =back
1341
1342 An accessor for the primary ResultSource object from which this ResultSet
1343 is derived.
1344
1345 =head2 result_class
1346
1347 =over 4
1348
1349 =item Arguments: $result_class?
1350
1351 =item Return Value: $result_class
1352
1353 =back
1354
1355 An accessor for the class to use when creating row objects. Defaults to
1356 C<< result_source->result_class >> - which in most cases is the name of the
1357 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1358
1359 Note that changing the result_class will also remove any components
1360 that were originally loaded in the source class via
1361 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1362 in the original source class will not run.
1363
1364 =cut
1365
1366 sub result_class {
1367   my ($self, $result_class) = @_;
1368   if ($result_class) {
1369     unless (ref $result_class) { # don't fire this for an object
1370       $self->ensure_class_loaded($result_class);
1371     }
1372     $self->_result_class($result_class);
1373     # THIS LINE WOULD BE A BUG - this accessor specifically exists to
1374     # permit the user to set result class on one result set only; it only
1375     # chains if provided to search()
1376     #$self->{attrs}{result_class} = $result_class if ref $self;
1377   }
1378   $self->_result_class;
1379 }
1380
1381 =head2 count
1382
1383 =over 4
1384
1385 =item Arguments: $cond, \%attrs??
1386
1387 =item Return Value: $count
1388
1389 =back
1390
1391 Performs an SQL C<COUNT> with the same query as the resultset was built
1392 with to find the number of elements. Passing arguments is equivalent to
1393 C<< $rs->search ($cond, \%attrs)->count >>
1394
1395 =cut
1396
1397 sub count {
1398   my $self = shift;
1399   return $self->search(@_)->count if @_ and defined $_[0];
1400   return scalar @{ $self->get_cache } if $self->get_cache;
1401
1402   my $attrs = $self->_resolved_attrs_copy;
1403
1404   # this is a little optimization - it is faster to do the limit
1405   # adjustments in software, instead of a subquery
1406   my $rows = delete $attrs->{rows};
1407   my $offset = delete $attrs->{offset};
1408
1409   my $crs;
1410   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1411     $crs = $self->_count_subq_rs ($attrs);
1412   }
1413   else {
1414     $crs = $self->_count_rs ($attrs);
1415   }
1416   my $count = $crs->next;
1417
1418   $count -= $offset if $offset;
1419   $count = $rows if $rows and $rows < $count;
1420   $count = 0 if ($count < 0);
1421
1422   return $count;
1423 }
1424
1425 =head2 count_rs
1426
1427 =over 4
1428
1429 =item Arguments: $cond, \%attrs??
1430
1431 =item Return Value: $count_rs
1432
1433 =back
1434
1435 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1436 This can be very handy for subqueries:
1437
1438   ->search( { amount => $some_rs->count_rs->as_query } )
1439
1440 As with regular resultsets the SQL query will be executed only after
1441 the resultset is accessed via L</next> or L</all>. That would return
1442 the same single value obtainable via L</count>.
1443
1444 =cut
1445
1446 sub count_rs {
1447   my $self = shift;
1448   return $self->search(@_)->count_rs if @_;
1449
1450   # this may look like a lack of abstraction (count() does about the same)
1451   # but in fact an _rs *must* use a subquery for the limits, as the
1452   # software based limiting can not be ported if this $rs is to be used
1453   # in a subquery itself (i.e. ->as_query)
1454   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1455     return $self->_count_subq_rs;
1456   }
1457   else {
1458     return $self->_count_rs;
1459   }
1460 }
1461
1462 #
1463 # returns a ResultSetColumn object tied to the count query
1464 #
1465 sub _count_rs {
1466   my ($self, $attrs) = @_;
1467
1468   my $rsrc = $self->result_source;
1469   $attrs ||= $self->_resolved_attrs;
1470
1471   my $tmp_attrs = { %$attrs };
1472   # take off any limits, record_filter is cdbi, and no point of ordering nor locking a count
1473   delete @{$tmp_attrs}{qw/rows offset order_by record_filter for/};
1474
1475   # overwrite the selector (supplied by the storage)
1476   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $attrs);
1477   $tmp_attrs->{as} = 'count';
1478   delete @{$tmp_attrs}{qw/columns/};
1479
1480   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1481
1482   return $tmp_rs;
1483 }
1484
1485 #
1486 # same as above but uses a subquery
1487 #
1488 sub _count_subq_rs {
1489   my ($self, $attrs) = @_;
1490
1491   my $rsrc = $self->result_source;
1492   $attrs ||= $self->_resolved_attrs;
1493
1494   my $sub_attrs = { %$attrs };
1495   # extra selectors do not go in the subquery and there is no point of ordering it, nor locking it
1496   delete @{$sub_attrs}{qw/collapse columns as select _prefetch_selector_range order_by for/};
1497
1498   # if we multi-prefetch we group_by something unique, as this is what we would
1499   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
1500   if ( $attrs->{collapse}  ) {
1501     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } @{
1502       $rsrc->_identifying_column_set || $self->throw_exception(
1503         'Unable to construct a unique group_by criteria properly collapsing the '
1504       . 'has_many prefetch before count()'
1505       );
1506     } ]
1507   }
1508
1509   # Calculate subquery selector
1510   if (my $g = $sub_attrs->{group_by}) {
1511
1512     my $sql_maker = $rsrc->storage->sql_maker;
1513
1514     # necessary as the group_by may refer to aliased functions
1515     my $sel_index;
1516     for my $sel (@{$attrs->{select}}) {
1517       $sel_index->{$sel->{-as}} = $sel
1518         if (ref $sel eq 'HASH' and $sel->{-as});
1519     }
1520
1521     # anything from the original select mentioned on the group-by needs to make it to the inner selector
1522     # also look for named aggregates referred in the having clause
1523     # having often contains scalarrefs - thus parse it out entirely
1524     my @parts = @$g;
1525     if ($attrs->{having}) {
1526       local $sql_maker->{having_bind};
1527       local $sql_maker->{quote_char} = $sql_maker->{quote_char};
1528       local $sql_maker->{name_sep} = $sql_maker->{name_sep};
1529       unless (defined $sql_maker->{quote_char} and length $sql_maker->{quote_char}) {
1530         $sql_maker->{quote_char} = [ "\x00", "\xFF" ];
1531         # if we don't unset it we screw up retarded but unfortunately working
1532         # 'MAX(foo.bar)' => { '>', 3 }
1533         $sql_maker->{name_sep} = '';
1534       }
1535
1536       my ($lquote, $rquote, $sep) = map { quotemeta $_ } ($sql_maker->_quote_chars, $sql_maker->name_sep);
1537
1538       my $sql = $sql_maker->_parse_rs_attrs ({ having => $attrs->{having} });
1539
1540       # search for both a proper quoted qualified string, for a naive unquoted scalarref
1541       # and if all fails for an utterly naive quoted scalar-with-function
1542       while ($sql =~ /
1543         $rquote $sep $lquote (.+?) $rquote
1544           |
1545         [\s,] \w+ \. (\w+) [\s,]
1546           |
1547         [\s,] $lquote (.+?) $rquote [\s,]
1548       /gx) {
1549         push @parts, ($1 || $2 || $3);  # one of them matched if we got here
1550       }
1551     }
1552
1553     for (@parts) {
1554       my $colpiece = $sel_index->{$_} || $_;
1555
1556       # unqualify join-based group_by's. Arcane but possible query
1557       # also horrible horrible hack to alias a column (not a func.)
1558       # (probably need to introduce SQLA syntax)
1559       if ($colpiece =~ /\./ && $colpiece !~ /^$attrs->{alias}\./) {
1560         my $as = $colpiece;
1561         $as =~ s/\./__/;
1562         $colpiece = \ sprintf ('%s AS %s', map { $sql_maker->_quote ($_) } ($colpiece, $as) );
1563       }
1564       push @{$sub_attrs->{select}}, $colpiece;
1565     }
1566   }
1567   else {
1568     my @pcols = map { "$attrs->{alias}.$_" } ($rsrc->primary_columns);
1569     $sub_attrs->{select} = @pcols ? \@pcols : [ 1 ];
1570   }
1571
1572   return $rsrc->resultset_class
1573                ->new ($rsrc, $sub_attrs)
1574                 ->as_subselect_rs
1575                  ->search ({}, { columns => { count => $rsrc->storage->_count_select ($rsrc, $attrs) } })
1576                   ->get_column ('count');
1577 }
1578
1579 sub _bool {
1580   return 1;
1581 }
1582
1583 =head2 count_literal
1584
1585 =over 4
1586
1587 =item Arguments: $sql_fragment, @bind_values
1588
1589 =item Return Value: $count
1590
1591 =back
1592
1593 Counts the results in a literal query. Equivalent to calling L</search_literal>
1594 with the passed arguments, then L</count>.
1595
1596 =cut
1597
1598 sub count_literal { shift->search_literal(@_)->count; }
1599
1600 =head2 all
1601
1602 =over 4
1603
1604 =item Arguments: none
1605
1606 =item Return Value: @objects
1607
1608 =back
1609
1610 Returns all elements in the resultset.
1611
1612 =cut
1613
1614 sub all {
1615   my $self = shift;
1616   if(@_) {
1617     $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1618   }
1619
1620   delete $self->{stashed_rows};
1621   delete $self->{stashed_objects};
1622
1623   if (my $c = $self->get_cache) {
1624     return @$c;
1625   }
1626
1627   $self->cursor->reset;
1628
1629   my $objs = $self->_construct_objects(undef, 'fetch_all') || [];
1630
1631   $self->set_cache($objs) if $self->{attrs}{cache};
1632
1633   return @$objs;
1634 }
1635
1636 =head2 reset
1637
1638 =over 4
1639
1640 =item Arguments: none
1641
1642 =item Return Value: $self
1643
1644 =back
1645
1646 Resets the resultset's cursor, so you can iterate through the elements again.
1647 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1648 another query.
1649
1650 =cut
1651
1652 sub reset {
1653   my ($self) = @_;
1654   delete $self->{_attrs};
1655   delete $self->{stashed_rows};
1656   delete $self->{stashed_objects};
1657
1658   $self->{all_cache_position} = 0;
1659   $self->cursor->reset;
1660   return $self;
1661 }
1662
1663 =head2 first
1664
1665 =over 4
1666
1667 =item Arguments: none
1668
1669 =item Return Value: $object | undef
1670
1671 =back
1672
1673 Resets the resultset and returns an object for the first result (or C<undef>
1674 if the resultset is empty).
1675
1676 =cut
1677
1678 sub first {
1679   return $_[0]->reset->next;
1680 }
1681
1682
1683 # _rs_update_delete
1684 #
1685 # Determines whether and what type of subquery is required for the $rs operation.
1686 # If grouping is necessary either supplies its own, or verifies the current one
1687 # After all is done delegates to the proper storage method.
1688
1689 sub _rs_update_delete {
1690   my ($self, $op, $values) = @_;
1691
1692   my $cond = $self->{cond};
1693   my $rsrc = $self->result_source;
1694   my $storage = $rsrc->schema->storage;
1695
1696   my $attrs = { %{$self->_resolved_attrs} };
1697
1698   # "needs" is a strong word here - if the subquery is part of an IN clause - no point of
1699   # even adding the group_by. It will really be used only when composing a poor-man's
1700   # multicolumn-IN equivalent OR set
1701   my $needs_group_by_subq = defined $attrs->{group_by};
1702
1703   # simplify the joinmap and maybe decide if a grouping (and thus subquery) is necessary
1704   my $relation_classifications;
1705   if (ref($attrs->{from}) eq 'ARRAY') {
1706     $attrs->{from} = $storage->_prune_unused_joins ($attrs->{from}, $attrs->{select}, $cond, $attrs);
1707
1708     $relation_classifications = $storage->_resolve_aliastypes_from_select_args (
1709       [ @{$attrs->{from}}[1 .. $#{$attrs->{from}}] ],
1710       $attrs->{select},
1711       $cond,
1712       $attrs
1713     ) unless $needs_group_by_subq;  # we already know we need a group, no point of resolving them
1714   }
1715   else {
1716     $needs_group_by_subq ||= 1; # if {from} is unparseable assume the worst
1717   }
1718
1719   $needs_group_by_subq ||= exists $relation_classifications->{multiplying};
1720
1721   # if no subquery - life is easy-ish
1722   unless (
1723     $needs_group_by_subq
1724       or
1725     keys %$relation_classifications # if any joins at all - need to wrap a subq
1726       or
1727     $self->_has_resolved_attr(qw/rows offset/) # limits call for a subq
1728   ) {
1729     # Most databases do not allow aliasing of tables in UPDATE/DELETE. Thus
1730     # a condition containing 'me' or other table prefixes will not work
1731     # at all. What this code tries to do (badly) is to generate a condition
1732     # with the qualifiers removed, by exploiting the quote mechanism of sqla
1733     #
1734     # this is atrocious and should be replaced by normal sqla introspection
1735     # one sunny day
1736     my ($sql, @bind) = do {
1737       my $sqla = $rsrc->storage->sql_maker;
1738       local $sqla->{_dequalify_idents} = 1;
1739       $sqla->_recurse_where($self->{cond});
1740     } if $self->{cond};
1741
1742     return $rsrc->storage->$op(
1743       $rsrc,
1744       $op eq 'update' ? $values : (),
1745       $self->{cond} ? \[$sql, @bind] : (),
1746     );
1747   }
1748
1749   # we got this far - means it is time to wrap a subquery
1750   my $idcols = $rsrc->_identifying_column_set || $self->throw_exception(
1751     sprintf(
1752       "Unable to perform complex resultset %s() without an identifying set of columns on source '%s'",
1753       $op,
1754       $rsrc->source_name,
1755     )
1756   );
1757   my $existing_group_by = delete $attrs->{group_by};
1758
1759   # make a new $rs selecting only the PKs (that's all we really need for the subq)
1760   delete $attrs->{$_} for qw/collapse select _prefetch_selector_range as/;
1761   $attrs->{columns} = [ map { "$attrs->{alias}.$_" } @$idcols ];
1762   $attrs->{group_by} = \ '';  # FIXME - this is an evil hack, it causes the optimiser to kick in and throw away the LEFT joins
1763   my $subrs = (ref $self)->new($rsrc, $attrs);
1764
1765   if (@$idcols == 1) {
1766     return $storage->$op (
1767       $rsrc,
1768       $op eq 'update' ? $values : (),
1769       { $idcols->[0] => { -in => $subrs->as_query } },
1770     );
1771   }
1772   elsif ($storage->_use_multicolumn_in) {
1773     # This is hideously ugly, but SQLA does not understand multicol IN expressions
1774     my $sql_maker = $storage->sql_maker;
1775     my ($sql, @bind) = @${$subrs->as_query};
1776     $sql = sprintf ('(%s) IN %s', # the as_query already comes with a set of parenthesis
1777       join (', ', map { $sql_maker->_quote ($_) } @$idcols),
1778       $sql,
1779     );
1780
1781     return $storage->$op (
1782       $rsrc,
1783       $op eq 'update' ? $values : (),
1784       \[$sql, @bind],
1785     );
1786   }
1787   else {
1788     # if all else fails - get all primary keys and operate over a ORed set
1789     # wrap in a transaction for consistency
1790     # this is where the group_by starts to matter
1791     my $subq_group_by;
1792     if ($needs_group_by_subq) {
1793       $subq_group_by = $attrs->{columns};
1794
1795       # make sure if there is a supplied group_by it matches the columns compiled above
1796       # perfectly. Anything else can not be sanely executed on most databases so croak
1797       # right then and there
1798       if ($existing_group_by) {
1799         my @current_group_by = map
1800           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1801           @$existing_group_by
1802         ;
1803
1804         if (
1805           join ("\x00", sort @current_group_by)
1806             ne
1807           join ("\x00", sort @$subq_group_by )
1808         ) {
1809           $self->throw_exception (
1810             "You have just attempted a $op operation on a resultset which does group_by"
1811             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1812             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1813             . ' kind of queries. Please retry the operation with a modified group_by or'
1814             . ' without using one at all.'
1815           );
1816         }
1817       }
1818     }
1819
1820     my $guard = $storage->txn_scope_guard;
1821
1822     my @op_condition;
1823     for my $row ($subrs->search({}, { group_by => $subq_group_by })->cursor->all) {
1824       push @op_condition, { map
1825         { $idcols->[$_] => $row->[$_] }
1826         (0 .. $#$idcols)
1827       };
1828     }
1829
1830     my $res = $storage->$op (
1831       $rsrc,
1832       $op eq 'update' ? $values : (),
1833       \@op_condition,
1834     );
1835
1836     $guard->commit;
1837
1838     return $res;
1839   }
1840 }
1841
1842 =head2 update
1843
1844 =over 4
1845
1846 =item Arguments: \%values
1847
1848 =item Return Value: $storage_rv
1849
1850 =back
1851
1852 Sets the specified columns in the resultset to the supplied values in a
1853 single query. Note that this will not run any accessor/set_column/update
1854 triggers, nor will it update any row object instances derived from this
1855 resultset (this includes the contents of the L<resultset cache|/set_cache>
1856 if any). See L</update_all> if you need to execute any on-update
1857 triggers or cascades defined either by you or a
1858 L<result component|DBIx::Class::Manual::Component/WHAT IS A COMPONENT>.
1859
1860 The return value is a pass through of what the underlying
1861 storage backend returned, and may vary. See L<DBI/execute> for the most
1862 common case.
1863
1864 =head3 CAVEAT
1865
1866 Note that L</update> does not process/deflate any of the values passed in.
1867 This is unlike the corresponding L<DBIx::Class::Row/update>. The user must
1868 ensure manually that any value passed to this method will stringify to
1869 something the RDBMS knows how to deal with. A notable example is the
1870 handling of L<DateTime> objects, for more info see:
1871 L<DBIx::Class::Manual::Cookbook/Formatting DateTime objects in queries>.
1872
1873 =cut
1874
1875 sub update {
1876   my ($self, $values) = @_;
1877   $self->throw_exception('Values for update must be a hash')
1878     unless ref $values eq 'HASH';
1879
1880   return $self->_rs_update_delete ('update', $values);
1881 }
1882
1883 =head2 update_all
1884
1885 =over 4
1886
1887 =item Arguments: \%values
1888
1889 =item Return Value: 1
1890
1891 =back
1892
1893 Fetches all objects and updates them one at a time via
1894 L<DBIx::Class::Row/update>. Note that C<update_all> will run DBIC defined
1895 triggers, while L</update> will not.
1896
1897 =cut
1898
1899 sub update_all {
1900   my ($self, $values) = @_;
1901   $self->throw_exception('Values for update_all must be a hash')
1902     unless ref $values eq 'HASH';
1903
1904   my $guard = $self->result_source->schema->txn_scope_guard;
1905   $_->update({%$values}) for $self->all;  # shallow copy - update will mangle it
1906   $guard->commit;
1907   return 1;
1908 }
1909
1910 =head2 delete
1911
1912 =over 4
1913
1914 =item Arguments: none
1915
1916 =item Return Value: $storage_rv
1917
1918 =back
1919
1920 Deletes the rows matching this resultset in a single query. Note that this
1921 will not run any delete triggers, nor will it alter the
1922 L<in_storage|DBIx::Class::Row/in_storage> status of any row object instances
1923 derived from this resultset (this includes the contents of the
1924 L<resultset cache|/set_cache> if any). See L</delete_all> if you need to
1925 execute any on-delete triggers or cascades defined either by you or a
1926 L<result component|DBIx::Class::Manual::Component/WHAT IS A COMPONENT>.
1927
1928 The return value is a pass through of what the underlying storage backend
1929 returned, and may vary. See L<DBI/execute> for the most common case.
1930
1931 =cut
1932
1933 sub delete {
1934   my $self = shift;
1935   $self->throw_exception('delete does not accept any arguments')
1936     if @_;
1937
1938   return $self->_rs_update_delete ('delete');
1939 }
1940
1941 =head2 delete_all
1942
1943 =over 4
1944
1945 =item Arguments: none
1946
1947 =item Return Value: 1
1948
1949 =back
1950
1951 Fetches all objects and deletes them one at a time via
1952 L<DBIx::Class::Row/delete>. Note that C<delete_all> will run DBIC defined
1953 triggers, while L</delete> will not.
1954
1955 =cut
1956
1957 sub delete_all {
1958   my $self = shift;
1959   $self->throw_exception('delete_all does not accept any arguments')
1960     if @_;
1961
1962   my $guard = $self->result_source->schema->txn_scope_guard;
1963   $_->delete for $self->all;
1964   $guard->commit;
1965   return 1;
1966 }
1967
1968 =head2 populate
1969
1970 =over 4
1971
1972 =item Arguments: \@data;
1973
1974 =back
1975
1976 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1977 For the arrayref of hashrefs style each hashref should be a structure suitable
1978 for submitting to a $resultset->create(...) method.
1979
1980 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1981 to insert the data, as this is a faster method.
1982
1983 Otherwise, each set of data is inserted into the database using
1984 L<DBIx::Class::ResultSet/create>, and the resulting objects are
1985 accumulated into an array. The array itself, or an array reference
1986 is returned depending on scalar or list context.
1987
1988 Example:  Assuming an Artist Class that has many CDs Classes relating:
1989
1990   my $Artist_rs = $schema->resultset("Artist");
1991
1992   ## Void Context Example
1993   $Artist_rs->populate([
1994      { artistid => 4, name => 'Manufactured Crap', cds => [
1995         { title => 'My First CD', year => 2006 },
1996         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1997       ],
1998      },
1999      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
2000         { title => 'My parents sold me to a record company', year => 2005 },
2001         { title => 'Why Am I So Ugly?', year => 2006 },
2002         { title => 'I Got Surgery and am now Popular', year => 2007 }
2003       ],
2004      },
2005   ]);
2006
2007   ## Array Context Example
2008   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
2009     { name => "Artist One"},
2010     { name => "Artist Two"},
2011     { name => "Artist Three", cds=> [
2012     { title => "First CD", year => 2007},
2013     { title => "Second CD", year => 2008},
2014   ]}
2015   ]);
2016
2017   print $ArtistOne->name; ## response is 'Artist One'
2018   print $ArtistThree->cds->count ## reponse is '2'
2019
2020 For the arrayref of arrayrefs style,  the first element should be a list of the
2021 fieldsnames to which the remaining elements are rows being inserted.  For
2022 example:
2023
2024   $Arstist_rs->populate([
2025     [qw/artistid name/],
2026     [100, 'A Formally Unknown Singer'],
2027     [101, 'A singer that jumped the shark two albums ago'],
2028     [102, 'An actually cool singer'],
2029   ]);
2030
2031 Please note an important effect on your data when choosing between void and
2032 wantarray context. Since void context goes straight to C<insert_bulk> in
2033 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
2034 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
2035 create primary keys for you, you will find that your PKs are empty.  In this
2036 case you will have to use the wantarray context in order to create those
2037 values.
2038
2039 =cut
2040
2041 sub populate {
2042   my $self = shift;
2043
2044   # cruft placed in standalone method
2045   my $data = $self->_normalize_populate_args(@_);
2046
2047   return unless @$data;
2048
2049   if(defined wantarray) {
2050     my @created;
2051     foreach my $item (@$data) {
2052       push(@created, $self->create($item));
2053     }
2054     return wantarray ? @created : \@created;
2055   }
2056   else {
2057     my $first = $data->[0];
2058
2059     # if a column is a registered relationship, and is a non-blessed hash/array, consider
2060     # it relationship data
2061     my (@rels, @columns);
2062     my $rsrc = $self->result_source;
2063     my $rels = { map { $_ => $rsrc->relationship_info($_) } $rsrc->relationships };
2064     for (keys %$first) {
2065       my $ref = ref $first->{$_};
2066       $rels->{$_} && ($ref eq 'ARRAY' or $ref eq 'HASH')
2067         ? push @rels, $_
2068         : push @columns, $_
2069       ;
2070     }
2071
2072     my @pks = $rsrc->primary_columns;
2073
2074     ## do the belongs_to relationships
2075     foreach my $index (0..$#$data) {
2076
2077       # delegate to create() for any dataset without primary keys with specified relationships
2078       if (grep { !defined $data->[$index]->{$_} } @pks ) {
2079         for my $r (@rels) {
2080           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
2081             my @ret = $self->populate($data);
2082             return;
2083           }
2084         }
2085       }
2086
2087       foreach my $rel (@rels) {
2088         next unless ref $data->[$index]->{$rel} eq "HASH";
2089         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
2090         my ($reverse_relname, $reverse_relinfo) = %{$rsrc->reverse_relationship_info($rel)};
2091         my $related = $result->result_source->_resolve_condition(
2092           $reverse_relinfo->{cond},
2093           $self,
2094           $result,
2095           $rel,
2096         );
2097
2098         delete $data->[$index]->{$rel};
2099         $data->[$index] = {%{$data->[$index]}, %$related};
2100
2101         push @columns, keys %$related if $index == 0;
2102       }
2103     }
2104
2105     ## inherit the data locked in the conditions of the resultset
2106     my ($rs_data) = $self->_merge_with_rscond({});
2107     delete @{$rs_data}{@columns};
2108     my @inherit_cols = keys %$rs_data;
2109     my @inherit_data = values %$rs_data;
2110
2111     ## do bulk insert on current row
2112     $rsrc->storage->insert_bulk(
2113       $rsrc,
2114       [@columns, @inherit_cols],
2115       [ map { [ @$_{@columns}, @inherit_data ] } @$data ],
2116     );
2117
2118     ## do the has_many relationships
2119     foreach my $item (@$data) {
2120
2121       my $main_row;
2122
2123       foreach my $rel (@rels) {
2124         next unless ref $item->{$rel} eq "ARRAY" && @{ $item->{$rel} };
2125
2126         $main_row ||= $self->new_result({map { $_ => $item->{$_} } @pks});
2127
2128         my $child = $main_row->$rel;
2129
2130         my $related = $child->result_source->_resolve_condition(
2131           $rels->{$rel}{cond},
2132           $child,
2133           $main_row,
2134           $rel,
2135         );
2136
2137         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
2138         my @populate = map { {%$_, %$related} } @rows_to_add;
2139
2140         $child->populate( \@populate );
2141       }
2142     }
2143   }
2144 }
2145
2146
2147 # populate() argumnets went over several incarnations
2148 # What we ultimately support is AoH
2149 sub _normalize_populate_args {
2150   my ($self, $arg) = @_;
2151
2152   if (ref $arg eq 'ARRAY') {
2153     if (!@$arg) {
2154       return [];
2155     }
2156     elsif (ref $arg->[0] eq 'HASH') {
2157       return $arg;
2158     }
2159     elsif (ref $arg->[0] eq 'ARRAY') {
2160       my @ret;
2161       my @colnames = @{$arg->[0]};
2162       foreach my $values (@{$arg}[1 .. $#$arg]) {
2163         push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
2164       }
2165       return \@ret;
2166     }
2167   }
2168
2169   $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
2170 }
2171
2172 =head2 pager
2173
2174 =over 4
2175
2176 =item Arguments: none
2177
2178 =item Return Value: $pager
2179
2180 =back
2181
2182 Return Value a L<Data::Page> object for the current resultset. Only makes
2183 sense for queries with a C<page> attribute.
2184
2185 To get the full count of entries for a paged resultset, call
2186 C<total_entries> on the L<Data::Page> object.
2187
2188 =cut
2189
2190 sub pager {
2191   my ($self) = @_;
2192
2193   return $self->{pager} if $self->{pager};
2194
2195   my $attrs = $self->{attrs};
2196   if (!defined $attrs->{page}) {
2197     $self->throw_exception("Can't create pager for non-paged rs");
2198   }
2199   elsif ($attrs->{page} <= 0) {
2200     $self->throw_exception('Invalid page number (page-numbers are 1-based)');
2201   }
2202   $attrs->{rows} ||= 10;
2203
2204   # throw away the paging flags and re-run the count (possibly
2205   # with a subselect) to get the real total count
2206   my $count_attrs = { %$attrs };
2207   delete $count_attrs->{$_} for qw/rows offset page pager/;
2208
2209   my $total_rs = (ref $self)->new($self->result_source, $count_attrs);
2210
2211   require DBIx::Class::ResultSet::Pager;
2212   return $self->{pager} = DBIx::Class::ResultSet::Pager->new(
2213     sub { $total_rs->count },  #lazy-get the total
2214     $attrs->{rows},
2215     $self->{attrs}{page},
2216   );
2217 }
2218
2219 =head2 page
2220
2221 =over 4
2222
2223 =item Arguments: $page_number
2224
2225 =item Return Value: $rs
2226
2227 =back
2228
2229 Returns a resultset for the $page_number page of the resultset on which page
2230 is called, where each page contains a number of rows equal to the 'rows'
2231 attribute set on the resultset (10 by default).
2232
2233 =cut
2234
2235 sub page {
2236   my ($self, $page) = @_;
2237   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
2238 }
2239
2240 =head2 new_result
2241
2242 =over 4
2243
2244 =item Arguments: \%vals
2245
2246 =item Return Value: $rowobject
2247
2248 =back
2249
2250 Creates a new row object in the resultset's result class and returns
2251 it. The row is not inserted into the database at this point, call
2252 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
2253 will tell you whether the row object has been inserted or not.
2254
2255 Passes the hashref of input on to L<DBIx::Class::Row/new>.
2256
2257 =cut
2258
2259 sub new_result {
2260   my ($self, $values) = @_;
2261   $self->throw_exception( "new_result needs a hash" )
2262     unless (ref $values eq 'HASH');
2263
2264   my ($merged_cond, $cols_from_relations) = $self->_merge_with_rscond($values);
2265
2266   my %new = (
2267     %$merged_cond,
2268     @$cols_from_relations
2269       ? (-cols_from_relations => $cols_from_relations)
2270       : (),
2271     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
2272   );
2273
2274   return $self->result_class->new(\%new);
2275 }
2276
2277 # _merge_with_rscond
2278 #
2279 # Takes a simple hash of K/V data and returns its copy merged with the
2280 # condition already present on the resultset. Additionally returns an
2281 # arrayref of value/condition names, which were inferred from related
2282 # objects (this is needed for in-memory related objects)
2283 sub _merge_with_rscond {
2284   my ($self, $data) = @_;
2285
2286   my (%new_data, @cols_from_relations);
2287
2288   my $alias = $self->{attrs}{alias};
2289
2290   if (! defined $self->{cond}) {
2291     # just massage $data below
2292   }
2293   elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
2294     %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
2295     @cols_from_relations = keys %new_data;
2296   }
2297   elsif (ref $self->{cond} ne 'HASH') {
2298     $self->throw_exception(
2299       "Can't abstract implicit construct, resultset condition not a hash"
2300     );
2301   }
2302   else {
2303     # precendence must be given to passed values over values inherited from
2304     # the cond, so the order here is important.
2305     my $collapsed_cond = $self->_collapse_cond($self->{cond});
2306     my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
2307
2308     while ( my($col, $value) = each %implied ) {
2309       my $vref = ref $value;
2310       if (
2311         $vref eq 'HASH'
2312           and
2313         keys(%$value) == 1
2314           and
2315         (keys %$value)[0] eq '='
2316       ) {
2317         $new_data{$col} = $value->{'='};
2318       }
2319       elsif( !$vref or $vref eq 'SCALAR' or blessed($value) ) {
2320         $new_data{$col} = $value;
2321       }
2322     }
2323   }
2324
2325   %new_data = (
2326     %new_data,
2327     %{ $self->_remove_alias($data, $alias) },
2328   );
2329
2330   return (\%new_data, \@cols_from_relations);
2331 }
2332
2333 # _has_resolved_attr
2334 #
2335 # determines if the resultset defines at least one
2336 # of the attributes supplied
2337 #
2338 # used to determine if a subquery is neccessary
2339 #
2340 # supports some virtual attributes:
2341 #   -join
2342 #     This will scan for any joins being present on the resultset.
2343 #     It is not a mere key-search but a deep inspection of {from}
2344 #
2345
2346 sub _has_resolved_attr {
2347   my ($self, @attr_names) = @_;
2348
2349   my $attrs = $self->_resolved_attrs;
2350
2351   my %extra_checks;
2352
2353   for my $n (@attr_names) {
2354     if (grep { $n eq $_ } (qw/-join/) ) {
2355       $extra_checks{$n}++;
2356       next;
2357     }
2358
2359     my $attr =  $attrs->{$n};
2360
2361     next if not defined $attr;
2362
2363     if (ref $attr eq 'HASH') {
2364       return 1 if keys %$attr;
2365     }
2366     elsif (ref $attr eq 'ARRAY') {
2367       return 1 if @$attr;
2368     }
2369     else {
2370       return 1 if $attr;
2371     }
2372   }
2373
2374   # a resolved join is expressed as a multi-level from
2375   return 1 if (
2376     $extra_checks{-join}
2377       and
2378     ref $attrs->{from} eq 'ARRAY'
2379       and
2380     @{$attrs->{from}} > 1
2381   );
2382
2383   return 0;
2384 }
2385
2386 # _collapse_cond
2387 #
2388 # Recursively collapse the condition.
2389
2390 sub _collapse_cond {
2391   my ($self, $cond, $collapsed) = @_;
2392
2393   $collapsed ||= {};
2394
2395   if (ref $cond eq 'ARRAY') {
2396     foreach my $subcond (@$cond) {
2397       next unless ref $subcond;  # -or
2398       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2399     }
2400   }
2401   elsif (ref $cond eq 'HASH') {
2402     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2403       foreach my $subcond (@{$cond->{-and}}) {
2404         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2405       }
2406     }
2407     else {
2408       foreach my $col (keys %$cond) {
2409         my $value = $cond->{$col};
2410         $collapsed->{$col} = $value;
2411       }
2412     }
2413   }
2414
2415   return $collapsed;
2416 }
2417
2418 # _remove_alias
2419 #
2420 # Remove the specified alias from the specified query hash. A copy is made so
2421 # the original query is not modified.
2422
2423 sub _remove_alias {
2424   my ($self, $query, $alias) = @_;
2425
2426   my %orig = %{ $query || {} };
2427   my %unaliased;
2428
2429   foreach my $key (keys %orig) {
2430     if ($key !~ /\./) {
2431       $unaliased{$key} = $orig{$key};
2432       next;
2433     }
2434     $unaliased{$1} = $orig{$key}
2435       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2436   }
2437
2438   return \%unaliased;
2439 }
2440
2441 =head2 as_query
2442
2443 =over 4
2444
2445 =item Arguments: none
2446
2447 =item Return Value: \[ $sql, @bind ]
2448
2449 =back
2450
2451 Returns the SQL query and bind vars associated with the invocant.
2452
2453 This is generally used as the RHS for a subquery.
2454
2455 =cut
2456
2457 sub as_query {
2458   my $self = shift;
2459
2460   my $attrs = $self->_resolved_attrs_copy;
2461
2462   # For future use:
2463   #
2464   # in list ctx:
2465   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2466   # $sql also has no wrapping parenthesis in list ctx
2467   #
2468   my $sqlbind = $self->result_source->storage
2469     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2470
2471   return $sqlbind;
2472 }
2473
2474 =head2 find_or_new
2475
2476 =over 4
2477
2478 =item Arguments: \%vals, \%attrs?
2479
2480 =item Return Value: $rowobject
2481
2482 =back
2483
2484   my $artist = $schema->resultset('Artist')->find_or_new(
2485     { artist => 'fred' }, { key => 'artists' });
2486
2487   $cd->cd_to_producer->find_or_new({ producer => $producer },
2488                                    { key => 'primary });
2489
2490 Find an existing record from this resultset using L</find>. if none exists,
2491 instantiate a new result object and return it. The object will not be saved
2492 into your storage until you call L<DBIx::Class::Row/insert> on it.
2493
2494 You most likely want this method when looking for existing rows using a unique
2495 constraint that is not the primary key, or looking for related rows.
2496
2497 If you want objects to be saved immediately, use L</find_or_create> instead.
2498
2499 B<Note>: Make sure to read the documentation of L</find> and understand the
2500 significance of the C<key> attribute, as its lack may skew your search, and
2501 subsequently result in spurious new objects.
2502
2503 B<Note>: Take care when using C<find_or_new> with a table having
2504 columns with default values that you intend to be automatically
2505 supplied by the database (e.g. an auto_increment primary key column).
2506 In normal usage, the value of such columns should NOT be included at
2507 all in the call to C<find_or_new>, even when set to C<undef>.
2508
2509 =cut
2510
2511 sub find_or_new {
2512   my $self     = shift;
2513   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2514   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2515   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2516     return $row;
2517   }
2518   return $self->new_result($hash);
2519 }
2520
2521 =head2 create
2522
2523 =over 4
2524
2525 =item Arguments: \%vals
2526
2527 =item Return Value: a L<DBIx::Class::Row> $object
2528
2529 =back
2530
2531 Attempt to create a single new row or a row with multiple related rows
2532 in the table represented by the resultset (and related tables). This
2533 will not check for duplicate rows before inserting, use
2534 L</find_or_create> to do that.
2535
2536 To create one row for this resultset, pass a hashref of key/value
2537 pairs representing the columns of the table and the values you wish to
2538 store. If the appropriate relationships are set up, foreign key fields
2539 can also be passed an object representing the foreign row, and the
2540 value will be set to its primary key.
2541
2542 To create related objects, pass a hashref of related-object column values
2543 B<keyed on the relationship name>. If the relationship is of type C<multi>
2544 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2545 The process will correctly identify columns holding foreign keys, and will
2546 transparently populate them from the keys of the corresponding relation.
2547 This can be applied recursively, and will work correctly for a structure
2548 with an arbitrary depth and width, as long as the relationships actually
2549 exists and the correct column data has been supplied.
2550
2551
2552 Instead of hashrefs of plain related data (key/value pairs), you may
2553 also pass new or inserted objects. New objects (not inserted yet, see
2554 L</new>), will be inserted into their appropriate tables.
2555
2556 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2557
2558 Example of creating a new row.
2559
2560   $person_rs->create({
2561     name=>"Some Person",
2562     email=>"somebody@someplace.com"
2563   });
2564
2565 Example of creating a new row and also creating rows in a related C<has_many>
2566 or C<has_one> resultset.  Note Arrayref.
2567
2568   $artist_rs->create(
2569      { artistid => 4, name => 'Manufactured Crap', cds => [
2570         { title => 'My First CD', year => 2006 },
2571         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2572       ],
2573      },
2574   );
2575
2576 Example of creating a new row and also creating a row in a related
2577 C<belongs_to> resultset. Note Hashref.
2578
2579   $cd_rs->create({
2580     title=>"Music for Silly Walks",
2581     year=>2000,
2582     artist => {
2583       name=>"Silly Musician",
2584     }
2585   });
2586
2587 =over
2588
2589 =item WARNING
2590
2591 When subclassing ResultSet never attempt to override this method. Since
2592 it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
2593 lot of the internals simply never call it, so your override will be
2594 bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
2595 or L<insert|DBIx::Class::Row/insert> depending on how early in the
2596 L</create> process you need to intervene.
2597
2598 =back
2599
2600 =cut
2601
2602 sub create {
2603   my ($self, $attrs) = @_;
2604   $self->throw_exception( "create needs a hashref" )
2605     unless ref $attrs eq 'HASH';
2606   return $self->new_result($attrs)->insert;
2607 }
2608
2609 =head2 find_or_create
2610
2611 =over 4
2612
2613 =item Arguments: \%vals, \%attrs?
2614
2615 =item Return Value: $rowobject
2616
2617 =back
2618
2619   $cd->cd_to_producer->find_or_create({ producer => $producer },
2620                                       { key => 'primary' });
2621
2622 Tries to find a record based on its primary key or unique constraints; if none
2623 is found, creates one and returns that instead.
2624
2625   my $cd = $schema->resultset('CD')->find_or_create({
2626     cdid   => 5,
2627     artist => 'Massive Attack',
2628     title  => 'Mezzanine',
2629     year   => 2005,
2630   });
2631
2632 Also takes an optional C<key> attribute, to search by a specific key or unique
2633 constraint. For example:
2634
2635   my $cd = $schema->resultset('CD')->find_or_create(
2636     {
2637       artist => 'Massive Attack',
2638       title  => 'Mezzanine',
2639     },
2640     { key => 'cd_artist_title' }
2641   );
2642
2643 B<Note>: Make sure to read the documentation of L</find> and understand the
2644 significance of the C<key> attribute, as its lack may skew your search, and
2645 subsequently result in spurious row creation.
2646
2647 B<Note>: Because find_or_create() reads from the database and then
2648 possibly inserts based on the result, this method is subject to a race
2649 condition. Another process could create a record in the table after
2650 the find has completed and before the create has started. To avoid
2651 this problem, use find_or_create() inside a transaction.
2652
2653 B<Note>: Take care when using C<find_or_create> with a table having
2654 columns with default values that you intend to be automatically
2655 supplied by the database (e.g. an auto_increment primary key column).
2656 In normal usage, the value of such columns should NOT be included at
2657 all in the call to C<find_or_create>, even when set to C<undef>.
2658
2659 See also L</find> and L</update_or_create>. For information on how to declare
2660 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2661
2662 If you need to know if an existing row was found or a new one created use
2663 L</find_or_new> and L<DBIx::Class::Row/in_storage> instead. Don't forget
2664 to call L<DBIx::Class::Row/insert> to save the newly created row to the
2665 database!
2666
2667   my $cd = $schema->resultset('CD')->find_or_new({
2668     cdid   => 5,
2669     artist => 'Massive Attack',
2670     title  => 'Mezzanine',
2671     year   => 2005,
2672   });
2673
2674   if( $cd->in_storage ) {
2675       # do some stuff
2676       $cd->insert;
2677   }
2678
2679 =cut
2680
2681 sub find_or_create {
2682   my $self     = shift;
2683   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2684   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2685   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2686     return $row;
2687   }
2688   return $self->create($hash);
2689 }
2690
2691 =head2 update_or_create
2692
2693 =over 4
2694
2695 =item Arguments: \%col_values, { key => $unique_constraint }?
2696
2697 =item Return Value: $row_object
2698
2699 =back
2700
2701   $resultset->update_or_create({ col => $val, ... });
2702
2703 Like L</find_or_create>, but if a row is found it is immediately updated via
2704 C<< $found_row->update (\%col_values) >>.
2705
2706
2707 Takes an optional C<key> attribute to search on a specific unique constraint.
2708 For example:
2709
2710   # In your application
2711   my $cd = $schema->resultset('CD')->update_or_create(
2712     {
2713       artist => 'Massive Attack',
2714       title  => 'Mezzanine',
2715       year   => 1998,
2716     },
2717     { key => 'cd_artist_title' }
2718   );
2719
2720   $cd->cd_to_producer->update_or_create({
2721     producer => $producer,
2722     name => 'harry',
2723   }, {
2724     key => 'primary',
2725   });
2726
2727 B<Note>: Make sure to read the documentation of L</find> and understand the
2728 significance of the C<key> attribute, as its lack may skew your search, and
2729 subsequently result in spurious row creation.
2730
2731 B<Note>: Take care when using C<update_or_create> with a table having
2732 columns with default values that you intend to be automatically
2733 supplied by the database (e.g. an auto_increment primary key column).
2734 In normal usage, the value of such columns should NOT be included at
2735 all in the call to C<update_or_create>, even when set to C<undef>.
2736
2737 See also L</find> and L</find_or_create>. For information on how to declare
2738 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2739
2740 If you need to know if an existing row was updated or a new one created use
2741 L</update_or_new> and L<DBIx::Class::Row/in_storage> instead. Don't forget
2742 to call L<DBIx::Class::Row/insert> to save the newly created row to the
2743 database!
2744
2745   my $cd = $schema->resultset('CD')->update_or_new(
2746     {
2747       artist => 'Massive Attack',
2748       title  => 'Mezzanine',
2749       year   => 1998,
2750     },
2751     { key => 'cd_artist_title' }
2752   );
2753
2754   if( $cd->in_storage ) {
2755       # do some stuff
2756       $cd->insert;
2757   }
2758
2759 =cut
2760
2761 sub update_or_create {
2762   my $self = shift;
2763   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2764   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2765
2766   my $row = $self->find($cond, $attrs);
2767   if (defined $row) {
2768     $row->update($cond);
2769     return $row;
2770   }
2771
2772   return $self->create($cond);
2773 }
2774
2775 =head2 update_or_new
2776
2777 =over 4
2778
2779 =item Arguments: \%col_values, { key => $unique_constraint }?
2780
2781 =item Return Value: $rowobject
2782
2783 =back
2784
2785   $resultset->update_or_new({ col => $val, ... });
2786
2787 Like L</find_or_new> but if a row is found it is immediately updated via
2788 C<< $found_row->update (\%col_values) >>.
2789
2790 For example:
2791
2792   # In your application
2793   my $cd = $schema->resultset('CD')->update_or_new(
2794     {
2795       artist => 'Massive Attack',
2796       title  => 'Mezzanine',
2797       year   => 1998,
2798     },
2799     { key => 'cd_artist_title' }
2800   );
2801
2802   if ($cd->in_storage) {
2803       # the cd was updated
2804   }
2805   else {
2806       # the cd is not yet in the database, let's insert it
2807       $cd->insert;
2808   }
2809
2810 B<Note>: Make sure to read the documentation of L</find> and understand the
2811 significance of the C<key> attribute, as its lack may skew your search, and
2812 subsequently result in spurious new objects.
2813
2814 B<Note>: Take care when using C<update_or_new> with a table having
2815 columns with default values that you intend to be automatically
2816 supplied by the database (e.g. an auto_increment primary key column).
2817 In normal usage, the value of such columns should NOT be included at
2818 all in the call to C<update_or_new>, even when set to C<undef>.
2819
2820 See also L</find>, L</find_or_create> and L</find_or_new>.
2821
2822 =cut
2823
2824 sub update_or_new {
2825     my $self  = shift;
2826     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2827     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2828
2829     my $row = $self->find( $cond, $attrs );
2830     if ( defined $row ) {
2831         $row->update($cond);
2832         return $row;
2833     }
2834
2835     return $self->new_result($cond);
2836 }
2837
2838 =head2 get_cache
2839
2840 =over 4
2841
2842 =item Arguments: none
2843
2844 =item Return Value: \@cache_objects | undef
2845
2846 =back
2847
2848 Gets the contents of the cache for the resultset, if the cache is set.
2849
2850 The cache is populated either by using the L</prefetch> attribute to
2851 L</search> or by calling L</set_cache>.
2852
2853 =cut
2854
2855 sub get_cache {
2856   shift->{all_cache};
2857 }
2858
2859 =head2 set_cache
2860
2861 =over 4
2862
2863 =item Arguments: \@cache_objects
2864
2865 =item Return Value: \@cache_objects
2866
2867 =back
2868
2869 Sets the contents of the cache for the resultset. Expects an arrayref
2870 of objects of the same class as those produced by the resultset. Note that
2871 if the cache is set the resultset will return the cached objects rather
2872 than re-querying the database even if the cache attr is not set.
2873
2874 The contents of the cache can also be populated by using the
2875 L</prefetch> attribute to L</search>.
2876
2877 =cut
2878
2879 sub set_cache {
2880   my ( $self, $data ) = @_;
2881   $self->throw_exception("set_cache requires an arrayref")
2882       if defined($data) && (ref $data ne 'ARRAY');
2883   $self->{all_cache} = $data;
2884 }
2885
2886 =head2 clear_cache
2887
2888 =over 4
2889
2890 =item Arguments: none
2891
2892 =item Return Value: undef
2893
2894 =back
2895
2896 Clears the cache for the resultset.
2897
2898 =cut
2899
2900 sub clear_cache {
2901   shift->set_cache(undef);
2902 }
2903
2904 =head2 is_paged
2905
2906 =over 4
2907
2908 =item Arguments: none
2909
2910 =item Return Value: true, if the resultset has been paginated
2911
2912 =back
2913
2914 =cut
2915
2916 sub is_paged {
2917   my ($self) = @_;
2918   return !!$self->{attrs}{page};
2919 }
2920
2921 =head2 is_ordered
2922
2923 =over 4
2924
2925 =item Arguments: none
2926
2927 =item Return Value: true, if the resultset has been ordered with C<order_by>.
2928
2929 =back
2930
2931 =cut
2932
2933 sub is_ordered {
2934   my ($self) = @_;
2935   return scalar $self->result_source->storage->_extract_order_criteria($self->{attrs}{order_by});
2936 }
2937
2938 =head2 related_resultset
2939
2940 =over 4
2941
2942 =item Arguments: $relationship_name
2943
2944 =item Return Value: $resultset
2945
2946 =back
2947
2948 Returns a related resultset for the supplied relationship name.
2949
2950   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2951
2952 =cut
2953
2954 sub related_resultset {
2955   my ($self, $rel) = @_;
2956
2957   $self->{related_resultsets} ||= {};
2958   return $self->{related_resultsets}{$rel} ||= do {
2959     my $rsrc = $self->result_source;
2960     my $rel_info = $rsrc->relationship_info($rel);
2961
2962     $self->throw_exception(
2963       "search_related: result source '" . $rsrc->source_name .
2964         "' has no such relationship $rel")
2965       unless $rel_info;
2966
2967     my $attrs = $self->_chain_relationship($rel);
2968
2969     my $join_count = $attrs->{seen_join}{$rel};
2970
2971     my $alias = $self->result_source->storage
2972         ->relname_to_table_alias($rel, $join_count);
2973
2974     # since this is search_related, and we already slid the select window inwards
2975     # (the select/as attrs were deleted in the beginning), we need to flip all
2976     # left joins to inner, so we get the expected results
2977     # read the comment on top of the actual function to see what this does
2978     $attrs->{from} = $rsrc->schema->storage->_inner_join_to_node ($attrs->{from}, $alias);
2979
2980
2981     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2982     delete @{$attrs}{qw(result_class alias)};
2983
2984     my $new_cache;
2985
2986     if (my $cache = $self->get_cache) {
2987       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2988         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache||[]} }
2989                         @$cache ];
2990       }
2991     }
2992
2993     my $rel_source = $rsrc->related_source($rel);
2994
2995     my $new = do {
2996
2997       # The reason we do this now instead of passing the alias to the
2998       # search_rs below is that if you wrap/overload resultset on the
2999       # source you need to know what alias it's -going- to have for things
3000       # to work sanely (e.g. RestrictWithObject wants to be able to add
3001       # extra query restrictions, and these may need to be $alias.)
3002
3003       my $rel_attrs = $rel_source->resultset_attributes;
3004       local $rel_attrs->{alias} = $alias;
3005
3006       $rel_source->resultset
3007                  ->search_rs(
3008                      undef, {
3009                        %$attrs,
3010                        where => $attrs->{where},
3011                    });
3012     };
3013     $new->set_cache($new_cache) if $new_cache;
3014     $new;
3015   };
3016 }
3017
3018 =head2 current_source_alias
3019
3020 =over 4
3021
3022 =item Arguments: none
3023
3024 =item Return Value: $source_alias
3025
3026 =back
3027
3028 Returns the current table alias for the result source this resultset is built
3029 on, that will be used in the SQL query. Usually it is C<me>.
3030
3031 Currently the source alias that refers to the result set returned by a
3032 L</search>/L</find> family method depends on how you got to the resultset: it's
3033 C<me> by default, but eg. L</search_related> aliases it to the related result
3034 source name (and keeps C<me> referring to the original result set). The long
3035 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
3036 (and make this method unnecessary).
3037
3038 Thus it's currently necessary to use this method in predefined queries (see
3039 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
3040 source alias of the current result set:
3041
3042   # in a result set class
3043   sub modified_by {
3044     my ($self, $user) = @_;
3045
3046     my $me = $self->current_source_alias;
3047
3048     return $self->search({
3049       "$me.modified" => $user->id,
3050     });
3051   }
3052
3053 =cut
3054
3055 sub current_source_alias {
3056   my ($self) = @_;
3057
3058   return ($self->{attrs} || {})->{alias} || 'me';
3059 }
3060
3061 =head2 as_subselect_rs
3062
3063 =over 4
3064
3065 =item Arguments: none
3066
3067 =item Return Value: $resultset
3068
3069 =back
3070
3071 Act as a barrier to SQL symbols.  The resultset provided will be made into a
3072 "virtual view" by including it as a subquery within the from clause.  From this
3073 point on, any joined tables are inaccessible to ->search on the resultset (as if
3074 it were simply where-filtered without joins).  For example:
3075
3076  my $rs = $schema->resultset('Bar')->search({'x.name' => 'abc'},{ join => 'x' });
3077
3078  # 'x' now pollutes the query namespace
3079
3080  # So the following works as expected
3081  my $ok_rs = $rs->search({'x.other' => 1});
3082
3083  # But this doesn't: instead of finding a 'Bar' related to two x rows (abc and
3084  # def) we look for one row with contradictory terms and join in another table
3085  # (aliased 'x_2') which we never use
3086  my $broken_rs = $rs->search({'x.name' => 'def'});
3087
3088  my $rs2 = $rs->as_subselect_rs;
3089
3090  # doesn't work - 'x' is no longer accessible in $rs2, having been sealed away
3091  my $not_joined_rs = $rs2->search({'x.other' => 1});
3092
3093  # works as expected: finds a 'table' row related to two x rows (abc and def)
3094  my $correctly_joined_rs = $rs2->search({'x.name' => 'def'});
3095
3096 Another example of when one might use this would be to select a subset of
3097 columns in a group by clause:
3098
3099  my $rs = $schema->resultset('Bar')->search(undef, {
3100    group_by => [qw{ id foo_id baz_id }],
3101  })->as_subselect_rs->search(undef, {
3102    columns => [qw{ id foo_id }]
3103  });
3104
3105 In the above example normally columns would have to be equal to the group by,
3106 but because we isolated the group by into a subselect the above works.
3107
3108 =cut
3109
3110 sub as_subselect_rs {
3111   my $self = shift;
3112
3113   my $attrs = $self->_resolved_attrs;
3114
3115   my $fresh_rs = (ref $self)->new (
3116     $self->result_source
3117   );
3118
3119   # these pieces will be locked in the subquery
3120   delete $fresh_rs->{cond};
3121   delete @{$fresh_rs->{attrs}}{qw/where bind/};
3122
3123   return $fresh_rs->search( {}, {
3124     from => [{
3125       $attrs->{alias} => $self->as_query,
3126       -alias  => $attrs->{alias},
3127       -rsrc   => $self->result_source,
3128     }],
3129     alias => $attrs->{alias},
3130   });
3131 }
3132
3133 # This code is called by search_related, and makes sure there
3134 # is clear separation between the joins before, during, and
3135 # after the relationship. This information is needed later
3136 # in order to properly resolve prefetch aliases (any alias
3137 # with a relation_chain_depth less than the depth of the
3138 # current prefetch is not considered)
3139 #
3140 # The increments happen twice per join. An even number means a
3141 # relationship specified via a search_related, whereas an odd
3142 # number indicates a join/prefetch added via attributes
3143 #
3144 # Also this code will wrap the current resultset (the one we
3145 # chain to) in a subselect IFF it contains limiting attributes
3146 sub _chain_relationship {
3147   my ($self, $rel) = @_;
3148   my $source = $self->result_source;
3149   my $attrs = { %{$self->{attrs}||{}} };
3150
3151   # we need to take the prefetch the attrs into account before we
3152   # ->_resolve_join as otherwise they get lost - captainL
3153   my $join = $self->_merge_joinpref_attr( $attrs->{join}, $attrs->{prefetch} );
3154
3155   delete @{$attrs}{qw/join prefetch collapse group_by distinct select as columns +select +as +columns/};
3156
3157   my $seen = { %{ (delete $attrs->{seen_join}) || {} } };
3158
3159   my $from;
3160   my @force_subq_attrs = qw/offset rows group_by having/;
3161
3162   if (
3163     ($attrs->{from} && ref $attrs->{from} ne 'ARRAY')
3164       ||
3165     $self->_has_resolved_attr (@force_subq_attrs)
3166   ) {
3167     # Nuke the prefetch (if any) before the new $rs attrs
3168     # are resolved (prefetch is useless - we are wrapping
3169     # a subquery anyway).
3170     my $rs_copy = $self->search;
3171     $rs_copy->{attrs}{join} = $self->_merge_joinpref_attr (
3172       $rs_copy->{attrs}{join},
3173       delete $rs_copy->{attrs}{prefetch},
3174     );
3175
3176     $from = [{
3177       -rsrc   => $source,
3178       -alias  => $attrs->{alias},
3179       $attrs->{alias} => $rs_copy->as_query,
3180     }];
3181     delete @{$attrs}{@force_subq_attrs, qw/where bind/};
3182     $seen->{-relation_chain_depth} = 0;
3183   }
3184   elsif ($attrs->{from}) {  #shallow copy suffices
3185     $from = [ @{$attrs->{from}} ];
3186   }
3187   else {
3188     $from = [{
3189       -rsrc  => $source,
3190       -alias => $attrs->{alias},
3191       $attrs->{alias} => $source->from,
3192     }];
3193   }
3194
3195   my $jpath = ($seen->{-relation_chain_depth})
3196     ? $from->[-1][0]{-join_path}
3197     : [];
3198
3199   my @requested_joins = $source->_resolve_join(
3200     $join,
3201     $attrs->{alias},
3202     $seen,
3203     $jpath,
3204   );
3205
3206   push @$from, @requested_joins;
3207
3208   $seen->{-relation_chain_depth}++;
3209
3210   # if $self already had a join/prefetch specified on it, the requested
3211   # $rel might very well be already included. What we do in this case
3212   # is effectively a no-op (except that we bump up the chain_depth on
3213   # the join in question so we could tell it *is* the search_related)
3214   my $already_joined;
3215
3216   # we consider the last one thus reverse
3217   for my $j (reverse @requested_joins) {
3218     my ($last_j) = keys %{$j->[0]{-join_path}[-1]};
3219     if ($rel eq $last_j) {
3220       $j->[0]{-relation_chain_depth}++;
3221       $already_joined++;
3222       last;
3223     }
3224   }
3225
3226   unless ($already_joined) {
3227     push @$from, $source->_resolve_join(
3228       $rel,
3229       $attrs->{alias},
3230       $seen,
3231       $jpath,
3232     );
3233   }
3234
3235   $seen->{-relation_chain_depth}++;
3236
3237   return {%$attrs, from => $from, seen_join => $seen};
3238 }
3239
3240 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
3241 sub _resolved_attrs_copy {
3242   my $self = shift;
3243   return { %{$self->_resolved_attrs (@_)} };
3244 }
3245
3246 sub _resolved_attrs {
3247   my $self = shift;
3248   return $self->{_attrs} if $self->{_attrs};
3249
3250   my $attrs  = { %{ $self->{attrs} || {} } };
3251   my $source = $self->result_source;
3252   my $alias  = $attrs->{alias};
3253
3254   # default selection list
3255   $attrs->{columns} = [ $source->columns ]
3256     unless List::Util::first { exists $attrs->{$_} } qw/columns cols select as/;
3257
3258   # merge selectors together
3259   for (qw/columns select as/) {
3260     $attrs->{$_} = $self->_merge_attr($attrs->{$_}, delete $attrs->{"+$_"})
3261       if $attrs->{$_} or $attrs->{"+$_"};
3262   }
3263
3264   # disassemble columns
3265   my (@sel, @as);
3266   if (my $cols = delete $attrs->{columns}) {
3267     for my $c (ref $cols eq 'ARRAY' ? @$cols : $cols) {
3268       if (ref $c eq 'HASH') {
3269         for my $as (keys %$c) {
3270           push @sel, $c->{$as};
3271           push @as, $as;
3272         }
3273       }
3274       else {
3275         push @sel, $c;
3276         push @as, $c;
3277       }
3278     }
3279   }
3280
3281   # when trying to weed off duplicates later do not go past this point -
3282   # everything added from here on is unbalanced "anyone's guess" stuff
3283   my $dedup_stop_idx = $#as;
3284
3285   push @as, @{ ref $attrs->{as} eq 'ARRAY' ? $attrs->{as} : [ $attrs->{as} ] }
3286     if $attrs->{as};
3287   push @sel, @{ ref $attrs->{select} eq 'ARRAY' ? $attrs->{select} : [ $attrs->{select} ] }
3288     if $attrs->{select};
3289
3290   # assume all unqualified selectors to apply to the current alias (legacy stuff)
3291   $_ = (ref $_ or $_ =~ /\./) ? $_ : "$alias.$_" for @sel;
3292
3293   # disqualify all $alias.col as-bits (inflate-map mandated)
3294   $_ = ($_ =~ /^\Q$alias.\E(.+)$/) ? $1 : $_ for @as;
3295
3296   # de-duplicate the result (remove *identical* select/as pairs)
3297   # and also die on duplicate {as} pointing to different {select}s
3298   # not using a c-style for as the condition is prone to shrinkage
3299   my $seen;
3300   my $i = 0;
3301   while ($i <= $dedup_stop_idx) {
3302     if ($seen->{"$sel[$i] \x00\x00 $as[$i]"}++) {
3303       splice @sel, $i, 1;
3304       splice @as, $i, 1;
3305       $dedup_stop_idx--;
3306     }
3307     elsif ($seen->{$as[$i]}++) {
3308       $self->throw_exception(
3309         "inflate_result() alias '$as[$i]' specified twice with different SQL-side {select}-ors"
3310       );
3311     }
3312     else {
3313       $i++;
3314     }
3315   }
3316
3317   $attrs->{select} = \@sel;
3318   $attrs->{as} = \@as;
3319
3320   $attrs->{from} ||= [{
3321     -rsrc   => $source,
3322     -alias  => $self->{attrs}{alias},
3323     $self->{attrs}{alias} => $source->from,
3324   }];
3325
3326   if ( $attrs->{join} || $attrs->{prefetch} ) {
3327
3328     $self->throw_exception ('join/prefetch can not be used with a custom {from}')
3329       if ref $attrs->{from} ne 'ARRAY';
3330
3331     my $join = (delete $attrs->{join}) || {};
3332
3333     if ( defined $attrs->{prefetch} ) {
3334       $join = $self->_merge_joinpref_attr( $join, $attrs->{prefetch} );
3335     }
3336
3337     $attrs->{from} =    # have to copy here to avoid corrupting the original
3338       [
3339         @{ $attrs->{from} },
3340         $source->_resolve_join(
3341           $join,
3342           $alias,
3343           { %{ $attrs->{seen_join} || {} } },
3344           ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
3345             ? $attrs->{from}[-1][0]{-join_path}
3346             : []
3347           ,
3348         )
3349       ];
3350   }
3351
3352   if ( defined $attrs->{order_by} ) {
3353     $attrs->{order_by} = (
3354       ref( $attrs->{order_by} ) eq 'ARRAY'
3355       ? [ @{ $attrs->{order_by} } ]
3356       : [ $attrs->{order_by} || () ]
3357     );
3358   }
3359
3360   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
3361     $attrs->{group_by} = [ $attrs->{group_by} ];
3362   }
3363
3364   # generate the distinct induced group_by early, as prefetch will be carried via a
3365   # subquery (since a group_by is present)
3366   if (delete $attrs->{distinct}) {
3367     if ($attrs->{group_by}) {
3368       carp_unique ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
3369     }
3370     else {
3371       # distinct affects only the main selection part, not what prefetch may
3372       # add below.
3373       $attrs->{group_by} = $source->storage->_group_over_selection (
3374         $attrs->{from},
3375         $attrs->{select},
3376         $attrs->{order_by},
3377       );
3378     }
3379   }
3380
3381   # generate selections based on the prefetch helper
3382   my $prefetch;
3383   $prefetch = $self->_merge_joinpref_attr( {}, delete $attrs->{prefetch} )
3384     if defined $attrs->{prefetch};
3385
3386   if ($prefetch) {
3387
3388     $self->throw_exception("Unable to prefetch, resultset contains an unnamed selector $attrs->{_dark_selector}{string}")
3389       if $attrs->{_dark_selector};
3390
3391     $attrs->{collapse} = 1;
3392
3393     # this is a separate structure (we don't look in {from} directly)
3394     # as the resolver needs to shift things off the lists to work
3395     # properly (identical-prefetches on different branches)
3396     my $join_map = {};
3397     if (ref $attrs->{from} eq 'ARRAY') {
3398
3399       my $start_depth = $attrs->{seen_join}{-relation_chain_depth} || 0;
3400
3401       for my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
3402         next unless $j->[0]{-alias};
3403         next unless $j->[0]{-join_path};
3404         next if ($j->[0]{-relation_chain_depth} || 0) < $start_depth;
3405
3406         my @jpath = map { keys %$_ } @{$j->[0]{-join_path}};
3407
3408         my $p = $join_map;
3409         $p = $p->{$_} ||= {} for @jpath[ ($start_depth/2) .. $#jpath]; #only even depths are actual jpath boundaries
3410         push @{$p->{-join_aliases} }, $j->[0]{-alias};
3411       }
3412     }
3413
3414     my @prefetch = $source->_resolve_prefetch( $prefetch, $alias, $join_map );
3415
3416     # we need to somehow mark which columns came from prefetch
3417     if (@prefetch) {
3418       my $sel_end = $#{$attrs->{select}};
3419       $attrs->{_prefetch_selector_range} = [ $sel_end + 1, $sel_end + @prefetch ];
3420     }
3421
3422     push @{ $attrs->{select} }, (map { $_->[0] } @prefetch);
3423     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
3424   }
3425
3426   # run through the resulting joinstructure (starting from our current slot)
3427   # and unset collapse if proven unnesessary
3428   if ($attrs->{collapse} && ref $attrs->{from} eq 'ARRAY') {
3429
3430     if (@{$attrs->{from}} > 1) {
3431
3432       # find where our table-spec starts and consider only things after us
3433       my @fromlist = @{$attrs->{from}};
3434       while (@fromlist) {
3435         my $t = shift @fromlist;
3436         $t = $t->[0] if ref $t eq 'ARRAY';  #me vs join from-spec mismatch
3437         last if ($t->{-alias} && $t->{-alias} eq $alias);
3438       }
3439
3440       for (@fromlist) {
3441         $attrs->{collapse} = ! $_->[0]{-is_single}
3442           and last;
3443       }
3444     }
3445     else {
3446       # no joins - no collapse
3447       $attrs->{collapse} = 0;
3448     }
3449   }
3450
3451   $attrs->{_single_object_inflation} = ! List::Util::first { $_ =~ /\./ } @{$attrs->{as}};
3452
3453   # if both page and offset are specified, produce a combined offset
3454   # even though it doesn't make much sense, this is what pre 081xx has
3455   # been doing
3456   if (my $page = delete $attrs->{page}) {
3457     $attrs->{offset} =
3458       ($attrs->{rows} * ($page - 1))
3459             +
3460       ($attrs->{offset} || 0)
3461     ;
3462   }
3463
3464   return $self->{_attrs} = $attrs;
3465 }
3466
3467 sub _rollout_attr {
3468   my ($self, $attr) = @_;
3469
3470   if (ref $attr eq 'HASH') {
3471     return $self->_rollout_hash($attr);
3472   } elsif (ref $attr eq 'ARRAY') {
3473     return $self->_rollout_array($attr);
3474   } else {
3475     return [$attr];
3476   }
3477 }
3478
3479 sub _rollout_array {
3480   my ($self, $attr) = @_;
3481
3482   my @rolled_array;
3483   foreach my $element (@{$attr}) {
3484     if (ref $element eq 'HASH') {
3485       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3486     } elsif (ref $element eq 'ARRAY') {
3487       #  XXX - should probably recurse here
3488       push( @rolled_array, @{$self->_rollout_array($element)} );
3489     } else {
3490       push( @rolled_array, $element );
3491     }
3492   }
3493   return \@rolled_array;
3494 }
3495
3496 sub _rollout_hash {
3497   my ($self, $attr) = @_;
3498
3499   my @rolled_array;
3500   foreach my $key (keys %{$attr}) {
3501     push( @rolled_array, { $key => $attr->{$key} } );
3502   }
3503   return \@rolled_array;
3504 }
3505
3506 sub _calculate_score {
3507   my ($self, $a, $b) = @_;
3508
3509   if (defined $a xor defined $b) {
3510     return 0;
3511   }
3512   elsif (not defined $a) {
3513     return 1;
3514   }
3515
3516   if (ref $b eq 'HASH') {
3517     my ($b_key) = keys %{$b};
3518     if (ref $a eq 'HASH') {
3519       my ($a_key) = keys %{$a};
3520       if ($a_key eq $b_key) {
3521         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3522       } else {
3523         return 0;
3524       }
3525     } else {
3526       return ($a eq $b_key) ? 1 : 0;
3527     }
3528   } else {
3529     if (ref $a eq 'HASH') {
3530       my ($a_key) = keys %{$a};
3531       return ($b eq $a_key) ? 1 : 0;
3532     } else {
3533       return ($b eq $a) ? 1 : 0;
3534     }
3535   }
3536 }
3537
3538 sub _merge_joinpref_attr {
3539   my ($self, $orig, $import) = @_;
3540
3541   return $import unless defined($orig);
3542   return $orig unless defined($import);
3543
3544   $orig = $self->_rollout_attr($orig);
3545   $import = $self->_rollout_attr($import);
3546
3547   my $seen_keys;
3548   foreach my $import_element ( @{$import} ) {
3549     # find best candidate from $orig to merge $b_element into
3550     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3551     foreach my $orig_element ( @{$orig} ) {
3552       my $score = $self->_calculate_score( $orig_element, $import_element );
3553       if ($score > $best_candidate->{score}) {
3554         $best_candidate->{position} = $position;
3555         $best_candidate->{score} = $score;
3556       }
3557       $position++;
3558     }
3559     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3560     $import_key = '' if not defined $import_key;
3561
3562     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3563       push( @{$orig}, $import_element );
3564     } else {
3565       my $orig_best = $orig->[$best_candidate->{position}];
3566       # merge orig_best and b_element together and replace original with merged
3567       if (ref $orig_best ne 'HASH') {
3568         $orig->[$best_candidate->{position}] = $import_element;
3569       } elsif (ref $import_element eq 'HASH') {
3570         my ($key) = keys %{$orig_best};
3571         $orig->[$best_candidate->{position}] = { $key => $self->_merge_joinpref_attr($orig_best->{$key}, $import_element->{$key}) };
3572       }
3573     }
3574     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3575   }
3576
3577   return $orig;
3578 }
3579
3580 {
3581   my $hm;
3582
3583   sub _merge_attr {
3584     $hm ||= do {
3585       require Hash::Merge;
3586       my $hm = Hash::Merge->new;
3587
3588       $hm->specify_behavior({
3589         SCALAR => {
3590           SCALAR => sub {
3591             my ($defl, $defr) = map { defined $_ } (@_[0,1]);
3592
3593             if ($defl xor $defr) {
3594               return [ $defl ? $_[0] : $_[1] ];
3595             }
3596             elsif (! $defl) {
3597               return [];
3598             }
3599             elsif (__HM_DEDUP and $_[0] eq $_[1]) {
3600               return [ $_[0] ];
3601             }
3602             else {
3603               return [$_[0], $_[1]];
3604             }
3605           },
3606           ARRAY => sub {
3607             return $_[1] if !defined $_[0];
3608             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3609             return [$_[0], @{$_[1]}]
3610           },
3611           HASH  => sub {
3612             return [] if !defined $_[0] and !keys %{$_[1]};
3613             return [ $_[1] ] if !defined $_[0];
3614             return [ $_[0] ] if !keys %{$_[1]};
3615             return [$_[0], $_[1]]
3616           },
3617         },
3618         ARRAY => {
3619           SCALAR => sub {
3620             return $_[0] if !defined $_[1];
3621             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3622             return [@{$_[0]}, $_[1]]
3623           },
3624           ARRAY => sub {
3625             my @ret = @{$_[0]} or return $_[1];
3626             return [ @ret, @{$_[1]} ] unless __HM_DEDUP;
3627             my %idx = map { $_ => 1 } @ret;
3628             push @ret, grep { ! defined $idx{$_} } (@{$_[1]});
3629             \@ret;
3630           },
3631           HASH => sub {
3632             return [ $_[1] ] if ! @{$_[0]};
3633             return $_[0] if !keys %{$_[1]};
3634             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3635             return [ @{$_[0]}, $_[1] ];
3636           },
3637         },
3638         HASH => {
3639           SCALAR => sub {
3640             return [] if !keys %{$_[0]} and !defined $_[1];
3641             return [ $_[0] ] if !defined $_[1];
3642             return [ $_[1] ] if !keys %{$_[0]};
3643             return [$_[0], $_[1]]
3644           },
3645           ARRAY => sub {
3646             return [] if !keys %{$_[0]} and !@{$_[1]};
3647             return [ $_[0] ] if !@{$_[1]};
3648             return $_[1] if !keys %{$_[0]};
3649             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3650             return [ $_[0], @{$_[1]} ];
3651           },
3652           HASH => sub {
3653             return [] if !keys %{$_[0]} and !keys %{$_[1]};
3654             return [ $_[0] ] if !keys %{$_[1]};
3655             return [ $_[1] ] if !keys %{$_[0]};
3656             return [ $_[0] ] if $_[0] eq $_[1];
3657             return [ $_[0], $_[1] ];
3658           },
3659         }
3660       } => 'DBIC_RS_ATTR_MERGER');
3661       $hm;
3662     };
3663
3664     return $hm->merge ($_[1], $_[2]);
3665   }
3666 }
3667
3668 sub STORABLE_freeze {
3669   my ($self, $cloning) = @_;
3670   my $to_serialize = { %$self };
3671
3672   # A cursor in progress can't be serialized (and would make little sense anyway)
3673   delete $to_serialize->{cursor};
3674
3675   # nor is it sensical to store a not-yet-fired-count pager
3676   if ($to_serialize->{pager} and ref $to_serialize->{pager}{total_entries} eq 'CODE') {
3677     delete $to_serialize->{pager};
3678   }
3679
3680   Storable::nfreeze($to_serialize);
3681 }
3682
3683 # need this hook for symmetry
3684 sub STORABLE_thaw {
3685   my ($self, $cloning, $serialized) = @_;
3686
3687   %$self = %{ Storable::thaw($serialized) };
3688
3689   $self;
3690 }
3691
3692
3693 =head2 throw_exception
3694
3695 See L<DBIx::Class::Schema/throw_exception> for details.
3696
3697 =cut
3698
3699 sub throw_exception {
3700   my $self=shift;
3701
3702   if (ref $self and my $rsrc = $self->result_source) {
3703     $rsrc->throw_exception(@_)
3704   }
3705   else {
3706     DBIx::Class::Exception->throw(@_);
3707   }
3708 }
3709
3710 # XXX: FIXME: Attributes docs need clearing up
3711
3712 =head1 ATTRIBUTES
3713
3714 Attributes are used to refine a ResultSet in various ways when
3715 searching for data. They can be passed to any method which takes an
3716 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3717 L</count>.
3718
3719 These are in no particular order:
3720
3721 =head2 order_by
3722
3723 =over 4
3724
3725 =item Value: ( $order_by | \@order_by | \%order_by )
3726
3727 =back
3728
3729 Which column(s) to order the results by.
3730
3731 [The full list of suitable values is documented in
3732 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3733 common options.]
3734
3735 If a single column name, or an arrayref of names is supplied, the
3736 argument is passed through directly to SQL. The hashref syntax allows
3737 for connection-agnostic specification of ordering direction:
3738
3739  For descending order:
3740
3741   order_by => { -desc => [qw/col1 col2 col3/] }
3742
3743  For explicit ascending order:
3744
3745   order_by => { -asc => 'col' }
3746
3747 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3748 supported, although you are strongly encouraged to use the hashref
3749 syntax as outlined above.
3750
3751 =head2 columns
3752
3753 =over 4
3754
3755 =item Value: \@columns
3756
3757 =back
3758
3759 Shortcut to request a particular set of columns to be retrieved. Each
3760 column spec may be a string (a table column name), or a hash (in which
3761 case the key is the C<as> value, and the value is used as the C<select>
3762 expression). Adds C<me.> onto the start of any column without a C<.> in
3763 it and sets C<select> from that, then auto-populates C<as> from
3764 C<select> as normal. (You may also use the C<cols> attribute, as in
3765 earlier versions of DBIC.)
3766
3767 Essentially C<columns> does the same as L</select> and L</as>.
3768
3769     columns => [ 'foo', { bar => 'baz' } ]
3770
3771 is the same as
3772
3773     select => [qw/foo baz/],
3774     as => [qw/foo bar/]
3775
3776 =head2 +columns
3777
3778 =over 4
3779
3780 =item Value: \@columns
3781
3782 =back
3783
3784 Indicates additional columns to be selected from storage. Works the same
3785 as L</columns> but adds columns to the selection. (You may also use the
3786 C<include_columns> attribute, as in earlier versions of DBIC). For
3787 example:-
3788
3789   $schema->resultset('CD')->search(undef, {
3790     '+columns' => ['artist.name'],
3791     join => ['artist']
3792   });
3793
3794 would return all CDs and include a 'name' column to the information
3795 passed to object inflation. Note that the 'artist' is the name of the
3796 column (or relationship) accessor, and 'name' is the name of the column
3797 accessor in the related table.
3798
3799 B<NOTE:> You need to explicitly quote '+columns' when defining the attribute.
3800 Not doing so causes Perl to incorrectly interpret +columns as a bareword with a
3801 unary plus operator before it.
3802
3803 =head2 include_columns
3804
3805 =over 4
3806
3807 =item Value: \@columns
3808
3809 =back
3810
3811 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3812
3813 =head2 select
3814
3815 =over 4
3816
3817 =item Value: \@select_columns
3818
3819 =back
3820
3821 Indicates which columns should be selected from the storage. You can use
3822 column names, or in the case of RDBMS back ends, function or stored procedure
3823 names:
3824
3825   $rs = $schema->resultset('Employee')->search(undef, {
3826     select => [
3827       'name',
3828       { count => 'employeeid' },
3829       { max => { length => 'name' }, -as => 'longest_name' }
3830     ]
3831   });
3832
3833   # Equivalent SQL
3834   SELECT name, COUNT( employeeid ), MAX( LENGTH( name ) ) AS longest_name FROM employee
3835
3836 B<NOTE:> You will almost always need a corresponding L</as> attribute when you
3837 use L</select>, to instruct DBIx::Class how to store the result of the column.
3838 Also note that the L</as> attribute has nothing to do with the SQL-side 'AS'
3839 identifier aliasing. You can however alias a function, so you can use it in
3840 e.g. an C<ORDER BY> clause. This is done via the C<-as> B<select function
3841 attribute> supplied as shown in the example above.
3842
3843 B<NOTE:> You need to explicitly quote '+select'/'+as' when defining the attributes.
3844 Not doing so causes Perl to incorrectly interpret them as a bareword with a
3845 unary plus operator before it.
3846
3847 =head2 +select
3848
3849 =over 4
3850
3851 Indicates additional columns to be selected from storage.  Works the same as
3852 L</select> but adds columns to the default selection, instead of specifying
3853 an explicit list.
3854
3855 =back
3856
3857 =head2 +as
3858
3859 =over 4
3860
3861 Indicates additional column names for those added via L</+select>. See L</as>.
3862
3863 =back
3864
3865 =head2 as
3866
3867 =over 4
3868
3869 =item Value: \@inflation_names
3870
3871 =back
3872
3873 Indicates column names for object inflation. That is L</as> indicates the
3874 slot name in which the column value will be stored within the
3875 L<Row|DBIx::Class::Row> object. The value will then be accessible via this
3876 identifier by the C<get_column> method (or via the object accessor B<if one
3877 with the same name already exists>) as shown below. The L</as> attribute has
3878 B<nothing to do> with the SQL-side C<AS>. See L</select> for details.
3879
3880   $rs = $schema->resultset('Employee')->search(undef, {
3881     select => [
3882       'name',
3883       { count => 'employeeid' },
3884       { max => { length => 'name' }, -as => 'longest_name' }
3885     ],
3886     as => [qw/
3887       name
3888       employee_count
3889       max_name_length
3890     /],
3891   });
3892
3893 If the object against which the search is performed already has an accessor
3894 matching a column name specified in C<as>, the value can be retrieved using
3895 the accessor as normal:
3896
3897   my $name = $employee->name();
3898
3899 If on the other hand an accessor does not exist in the object, you need to
3900 use C<get_column> instead:
3901
3902   my $employee_count = $employee->get_column('employee_count');
3903
3904 You can create your own accessors if required - see
3905 L<DBIx::Class::Manual::Cookbook> for details.
3906
3907 =head2 join
3908
3909 =over 4
3910
3911 =item Value: ($rel_name | \@rel_names | \%rel_names)
3912
3913 =back
3914
3915 Contains a list of relationships that should be joined for this query.  For
3916 example:
3917
3918   # Get CDs by Nine Inch Nails
3919   my $rs = $schema->resultset('CD')->search(
3920     { 'artist.name' => 'Nine Inch Nails' },
3921     { join => 'artist' }
3922   );
3923
3924 Can also contain a hash reference to refer to the other relation's relations.
3925 For example:
3926
3927   package MyApp::Schema::Track;
3928   use base qw/DBIx::Class/;
3929   __PACKAGE__->table('track');
3930   __PACKAGE__->add_columns(qw/trackid cd position title/);
3931   __PACKAGE__->set_primary_key('trackid');
3932   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3933   1;
3934
3935   # In your application
3936   my $rs = $schema->resultset('Artist')->search(
3937     { 'track.title' => 'Teardrop' },
3938     {
3939       join     => { cd => 'track' },
3940       order_by => 'artist.name',
3941     }
3942   );
3943
3944 You need to use the relationship (not the table) name in  conditions,
3945 because they are aliased as such. The current table is aliased as "me", so
3946 you need to use me.column_name in order to avoid ambiguity. For example:
3947
3948   # Get CDs from 1984 with a 'Foo' track
3949   my $rs = $schema->resultset('CD')->search(
3950     {
3951       'me.year' => 1984,
3952       'tracks.name' => 'Foo'
3953     },
3954     { join => 'tracks' }
3955   );
3956
3957 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3958 similarly for a third time). For e.g.
3959
3960   my $rs = $schema->resultset('Artist')->search({
3961     'cds.title'   => 'Down to Earth',
3962     'cds_2.title' => 'Popular',
3963   }, {
3964     join => [ qw/cds cds/ ],
3965   });
3966
3967 will return a set of all artists that have both a cd with title 'Down
3968 to Earth' and a cd with title 'Popular'.
3969
3970 If you want to fetch related objects from other tables as well, see C<prefetch>
3971 below.
3972
3973 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
3974
3975 =head2 prefetch
3976
3977 =over 4
3978
3979 =item Value: ($rel_name | \@rel_names | \%rel_names)
3980
3981 =back
3982
3983 Contains one or more relationships that should be fetched along with
3984 the main query (when they are accessed afterwards the data will
3985 already be available, without extra queries to the database).  This is
3986 useful for when you know you will need the related objects, because it
3987 saves at least one query:
3988
3989   my $rs = $schema->resultset('Tag')->search(
3990     undef,
3991     {
3992       prefetch => {
3993         cd => 'artist'
3994       }
3995     }
3996   );
3997
3998 The initial search results in SQL like the following:
3999
4000   SELECT tag.*, cd.*, artist.* FROM tag
4001   JOIN cd ON tag.cd = cd.cdid
4002   JOIN artist ON cd.artist = artist.artistid
4003
4004 L<DBIx::Class> has no need to go back to the database when we access the
4005 C<cd> or C<artist> relationships, which saves us two SQL statements in this
4006 case.
4007
4008 Simple prefetches will be joined automatically, so there is no need
4009 for a C<join> attribute in the above search.
4010
4011 L</prefetch> can be used with the any of the relationship types and
4012 multiple prefetches can be specified together. Below is a more complex
4013 example that prefetches a CD's artist, its liner notes (if present),
4014 the cover image, the tracks on that cd, and the guests on those
4015 tracks.
4016
4017  # Assuming:
4018  My::Schema::CD->belongs_to( artist      => 'My::Schema::Artist'     );
4019  My::Schema::CD->might_have( liner_note  => 'My::Schema::LinerNotes' );
4020  My::Schema::CD->has_one(    cover_image => 'My::Schema::Artwork'    );
4021  My::Schema::CD->has_many(   tracks      => 'My::Schema::Track'      );
4022
4023  My::Schema::Artist->belongs_to( record_label => 'My::Schema::RecordLabel' );
4024
4025  My::Schema::Track->has_many( guests => 'My::Schema::Guest' );
4026
4027
4028  my $rs = $schema->resultset('CD')->search(
4029    undef,
4030    {
4031      prefetch => [
4032        { artist => 'record_label'},  # belongs_to => belongs_to
4033        'liner_note',                 # might_have
4034        'cover_image',                # has_one
4035        { tracks => 'guests' },       # has_many => has_many
4036      ]
4037    }
4038  );
4039
4040 This will produce SQL like the following:
4041
4042  SELECT cd.*, artist.*, record_label.*, liner_note.*, cover_image.*,
4043         tracks.*, guests.*
4044    FROM cd me
4045    JOIN artist artist
4046      ON artist.artistid = me.artistid
4047    JOIN record_label record_label
4048      ON record_label.labelid = artist.labelid
4049    LEFT JOIN track tracks
4050      ON tracks.cdid = me.cdid
4051    LEFT JOIN guest guests
4052      ON guests.trackid = track.trackid
4053    LEFT JOIN liner_notes liner_note
4054      ON liner_note.cdid = me.cdid
4055    JOIN cd_artwork cover_image
4056      ON cover_image.cdid = me.cdid
4057  ORDER BY tracks.cd
4058
4059 Now the C<artist>, C<record_label>, C<liner_note>, C<cover_image>,
4060 C<tracks>, and C<guests> of the CD will all be available through the
4061 relationship accessors without the need for additional queries to the
4062 database.
4063
4064 However, there is one caveat to be observed: it can be dangerous to
4065 prefetch more than one L<has_many|DBIx::Class::Relationship/has_many>
4066 relationship on a given level. e.g.:
4067
4068  my $rs = $schema->resultset('CD')->search(
4069    undef,
4070    {
4071      prefetch => [
4072        'tracks',                         # has_many
4073        { cd_to_producer => 'producer' }, # has_many => belongs_to (i.e. m2m)
4074      ]
4075    }
4076  );
4077
4078 In fact, C<DBIx::Class> will emit the following warning:
4079
4080  Prefetching multiple has_many rels tracks and cd_to_producer at top
4081  level will explode the number of row objects retrievable via ->next
4082  or ->all. Use at your own risk.
4083
4084 The collapser currently can't identify duplicate tuples for multiple
4085 L<has_many|DBIx::Class::Relationship/has_many> relationships and as a
4086 result the second L<has_many|DBIx::Class::Relationship/has_many>
4087 relation could contain redundant objects.
4088
4089 =head3 Using L</prefetch> with L</join>
4090
4091 L</prefetch> implies a L</join> with the equivalent argument, and is
4092 properly merged with any existing L</join> specification. So the
4093 following:
4094
4095   my $rs = $schema->resultset('CD')->search(
4096    {'record_label.name' => 'Music Product Ltd.'},
4097    {
4098      join     => {artist => 'record_label'},
4099      prefetch => 'artist',
4100    }
4101  );
4102
4103 ... will work, searching on the record label's name, but only
4104 prefetching the C<artist>.
4105
4106 =head3 Using L</prefetch> with L</select> / L</+select> / L</as> / L</+as>
4107
4108 L</prefetch> implies a L</+select>/L</+as> with the fields of the
4109 prefetched relations.  So given:
4110
4111   my $rs = $schema->resultset('CD')->search(
4112    undef,
4113    {
4114      select   => ['cd.title'],
4115      as       => ['cd_title'],
4116      prefetch => 'artist',
4117    }
4118  );
4119
4120 The L</select> becomes: C<'cd.title', 'artist.*'> and the L</as>
4121 becomes: C<'cd_title', 'artist.*'>.
4122
4123 =head3 CAVEATS
4124
4125 Prefetch does a lot of deep magic. As such, it may not behave exactly
4126 as you might expect.
4127
4128 =over 4
4129
4130 =item *
4131
4132 Prefetch uses the L</cache> to populate the prefetched relationships. This
4133 may or may not be what you want.
4134
4135 =item *
4136
4137 If you specify a condition on a prefetched relationship, ONLY those
4138 rows that match the prefetched condition will be fetched into that relationship.
4139 This means that adding prefetch to a search() B<may alter> what is returned by
4140 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
4141
4142   my $artist_rs = $schema->resultset('Artist')->search({
4143       'cds.year' => 2008,
4144   }, {
4145       join => 'cds',
4146   });
4147
4148   my $count = $artist_rs->first->cds->count;
4149
4150   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
4151
4152   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
4153
4154   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
4155
4156 that cmp_ok() may or may not pass depending on the datasets involved. This
4157 behavior may or may not survive the 0.09 transition.
4158
4159 =back
4160
4161 =head2 page
4162
4163 =over 4
4164
4165 =item Value: $page
4166
4167 =back
4168
4169 Makes the resultset paged and specifies the page to retrieve. Effectively
4170 identical to creating a non-pages resultset and then calling ->page($page)
4171 on it.
4172
4173 If L</rows> attribute is not specified it defaults to 10 rows per page.
4174
4175 When you have a paged resultset, L</count> will only return the number
4176 of rows in the page. To get the total, use the L</pager> and call
4177 C<total_entries> on it.
4178
4179 =head2 rows
4180
4181 =over 4
4182
4183 =item Value: $rows
4184
4185 =back
4186
4187 Specifies the maximum number of rows for direct retrieval or the number of
4188 rows per page if the page attribute or method is used.
4189
4190 =head2 offset
4191
4192 =over 4
4193
4194 =item Value: $offset
4195
4196 =back
4197
4198 Specifies the (zero-based) row number for the  first row to be returned, or the
4199 of the first row of the first page if paging is used.
4200
4201 =head2 software_limit
4202
4203 =over 4
4204
4205 =item Value: (0 | 1)
4206
4207 =back
4208
4209 When combined with L</rows> and/or L</offset> the generated SQL will not
4210 include any limit dialect stanzas. Instead the entire result will be selected
4211 as if no limits were specified, and DBIC will perform the limit locally, by
4212 artificially advancing and finishing the resulting L</cursor>.
4213
4214 This is the recommended way of performing resultset limiting when no sane RDBMS
4215 implementation is available (e.g.
4216 L<Sybase ASE|DBIx::Class::Storage::DBI::Sybase::ASE> using the
4217 L<Generic Sub Query|DBIx::Class::SQLMaker::LimitDialects/GenericSubQ> hack)
4218
4219 =head2 group_by
4220
4221 =over 4
4222
4223 =item Value: \@columns
4224
4225 =back
4226
4227 A arrayref of columns to group by. Can include columns of joined tables.
4228
4229   group_by => [qw/ column1 column2 ... /]
4230
4231 =head2 having
4232
4233 =over 4
4234
4235 =item Value: $condition
4236
4237 =back
4238
4239 HAVING is a select statement attribute that is applied between GROUP BY and
4240 ORDER BY. It is applied to the after the grouping calculations have been
4241 done.
4242
4243   having => { 'count_employee' => { '>=', 100 } }
4244
4245 or with an in-place function in which case literal SQL is required:
4246
4247   having => \[ 'count(employee) >= ?', [ count => 100 ] ]
4248
4249 =head2 distinct
4250
4251 =over 4
4252
4253 =item Value: (0 | 1)
4254
4255 =back
4256
4257 Set to 1 to group by all columns. If the resultset already has a group_by
4258 attribute, this setting is ignored and an appropriate warning is issued.
4259
4260 =head2 where
4261
4262 =over 4
4263
4264 Adds to the WHERE clause.
4265
4266   # only return rows WHERE deleted IS NULL for all searches
4267   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
4268
4269 Can be overridden by passing C<< { where => undef } >> as an attribute
4270 to a resultset.
4271
4272 For more complicated where clauses see L<SQL::Abstract/WHERE CLAUSES>.
4273
4274 =back
4275
4276 =head2 cache
4277
4278 Set to 1 to cache search results. This prevents extra SQL queries if you
4279 revisit rows in your ResultSet:
4280
4281   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
4282
4283   while( my $artist = $resultset->next ) {
4284     ... do stuff ...
4285   }
4286
4287   $rs->first; # without cache, this would issue a query
4288
4289 By default, searches are not cached.
4290
4291 For more examples of using these attributes, see
4292 L<DBIx::Class::Manual::Cookbook>.
4293
4294 =head2 for
4295
4296 =over 4
4297
4298 =item Value: ( 'update' | 'shared' )
4299
4300 =back
4301
4302 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
4303 ... FOR SHARED.
4304
4305 =cut
4306
4307 1;