[doc] replace pseudocode with real code
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use base qw/DBIx::Class/;
6 use DBIx::Class::Carp;
7 use DBIx::Class::Exception;
8 use DBIx::Class::ResultSetColumn;
9 use Scalar::Util qw/blessed weaken/;
10 use Try::Tiny;
11
12 # not importing first() as it will clash with our own method
13 use List::Util ();
14
15 BEGIN {
16   # De-duplication in _merge_attr() is disabled, but left in for reference
17   # (the merger is used for other things that ought not to be de-duped)
18   *__HM_DEDUP = sub () { 0 };
19 }
20
21 use namespace::clean;
22
23 use overload
24         '0+'     => "count",
25         'bool'   => "_bool",
26         fallback => 1;
27
28 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class result_source/);
29
30 =head1 NAME
31
32 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
33
34 =head1 SYNOPSIS
35
36   my $users_rs   = $schema->resultset('User');
37   while( $user = $users_rs->next) {
38     print $user->username;
39   }
40
41   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
42   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
43
44 =head1 DESCRIPTION
45
46 A ResultSet is an object which stores a set of conditions representing
47 a query. It is the backbone of DBIx::Class (i.e. the really
48 important/useful bit).
49
50 No SQL is executed on the database when a ResultSet is created, it
51 just stores all the conditions needed to create the query.
52
53 A basic ResultSet representing the data of an entire table is returned
54 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
55 L<Source|DBIx::Class::Manual::Glossary/Source> name.
56
57   my $users_rs = $schema->resultset('User');
58
59 A new ResultSet is returned from calling L</search> on an existing
60 ResultSet. The new one will contain all the conditions of the
61 original, plus any new conditions added in the C<search> call.
62
63 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
64 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
65 represents.
66
67 The query that the ResultSet represents is B<only> executed against
68 the database when these methods are called:
69 L</find>, L</next>, L</all>, L</first>, L</single>, L</count>.
70
71 If a resultset is used in a numeric context it returns the L</count>.
72 However, if it is used in a boolean context it is B<always> true.  So if
73 you want to check if a resultset has any results, you must use C<if $rs
74 != 0>.
75
76 =head1 EXAMPLES
77
78 =head2 Chaining resultsets
79
80 Let's say you've got a query that needs to be run to return some data
81 to the user. But, you have an authorization system in place that
82 prevents certain users from seeing certain information. So, you want
83 to construct the basic query in one method, but add constraints to it in
84 another.
85
86   sub get_data {
87     my $self = shift;
88     my $request = $self->get_request; # Get a request object somehow.
89     my $schema = $self->result_source->schema;
90
91     my $cd_rs = $schema->resultset('CD')->search({
92       title => $request->param('title'),
93       year => $request->param('year'),
94     });
95
96     $cd_rs = $self->apply_security_policy( $cd_rs );
97
98     return $cd_rs->all();
99   }
100
101   sub apply_security_policy {
102     my $self = shift;
103     my ($rs) = @_;
104
105     return $rs->search({
106       subversive => 0,
107     });
108   }
109
110 =head3 Resolving conditions and attributes
111
112 When a resultset is chained from another resultset, conditions and
113 attributes with the same keys need resolving.
114
115 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
116 into the existing ones from the original resultset.
117
118 The L</where> and L</having> attributes, and any search conditions, are
119 merged with an SQL C<AND> to the existing condition from the original
120 resultset.
121
122 All other attributes are overridden by any new ones supplied in the
123 search attributes.
124
125 =head2 Multiple queries
126
127 Since a resultset just defines a query, you can do all sorts of
128 things with it with the same object.
129
130   # Don't hit the DB yet.
131   my $cd_rs = $schema->resultset('CD')->search({
132     title => 'something',
133     year => 2009,
134   });
135
136   # Each of these hits the DB individually.
137   my $count = $cd_rs->count;
138   my $most_recent = $cd_rs->get_column('date_released')->max();
139   my @records = $cd_rs->all;
140
141 And it's not just limited to SELECT statements.
142
143   $cd_rs->delete();
144
145 This is even cooler:
146
147   $cd_rs->create({ artist => 'Fred' });
148
149 Which is the same as:
150
151   $schema->resultset('CD')->create({
152     title => 'something',
153     year => 2009,
154     artist => 'Fred'
155   });
156
157 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
158
159 =head1 METHODS
160
161 =head2 new
162
163 =over 4
164
165 =item Arguments: $source, \%$attrs
166
167 =item Return Value: $rs
168
169 =back
170
171 The resultset constructor. Takes a source object (usually a
172 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
173 L</ATTRIBUTES> below).  Does not perform any queries -- these are
174 executed as needed by the other methods.
175
176 Generally you won't need to construct a resultset manually.  You'll
177 automatically get one from e.g. a L</search> called in scalar context:
178
179   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
180
181 IMPORTANT: If called on an object, proxies to new_result instead so
182
183   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
184
185 will return a CD object, not a ResultSet.
186
187 =cut
188
189 sub new {
190   my $class = shift;
191   return $class->new_result(@_) if ref $class;
192
193   my ($source, $attrs) = @_;
194   $source = $source->resolve
195     if $source->isa('DBIx::Class::ResultSourceHandle');
196   $attrs = { %{$attrs||{}} };
197
198   if ($attrs->{page}) {
199     $attrs->{rows} ||= 10;
200   }
201
202   $attrs->{alias} ||= 'me';
203
204   my $self = bless {
205     result_source => $source,
206     cond => $attrs->{where},
207     pager => undef,
208     attrs => $attrs,
209   }, $class;
210
211   # if there is a dark selector, this means we are already in a
212   # chain and the cleanup/sanification was taken care of by
213   # _search_rs already
214   $self->_normalize_selection($attrs)
215     unless $attrs->{_dark_selector};
216
217   $self->result_class(
218     $attrs->{result_class} || $source->result_class
219   );
220
221   $self;
222 }
223
224 =head2 search
225
226 =over 4
227
228 =item Arguments: $cond, \%attrs?
229
230 =item Return Value: $resultset (scalar context) ||  @row_objs (list context)
231
232 =back
233
234   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
235   my $new_rs = $cd_rs->search({ year => 2005 });
236
237   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
238                  # year = 2005 OR year = 2004
239
240 In list context, C<< ->all() >> is called implicitly on the resultset, thus
241 returning a list of row objects instead. To avoid that, use L</search_rs>.
242
243 If you need to pass in additional attributes but no additional condition,
244 call it as C<search(undef, \%attrs)>.
245
246   # "SELECT name, artistid FROM $artist_table"
247   my @all_artists = $schema->resultset('Artist')->search(undef, {
248     columns => [qw/name artistid/],
249   });
250
251 For a list of attributes that can be passed to C<search>, see
252 L</ATTRIBUTES>. For more examples of using this function, see
253 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
254 documentation for the first argument, see L<SQL::Abstract>
255 and its extension L<DBIx::Class::SQLMaker>.
256
257 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
258
259 =head3 CAVEAT
260
261 Note that L</search> does not process/deflate any of the values passed in the
262 L<SQL::Abstract>-compatible search condition structure. This is unlike other
263 condition-bound methods L</new>, L</create> and L</find>. The user must ensure
264 manually that any value passed to this method will stringify to something the
265 RDBMS knows how to deal with. A notable example is the handling of L<DateTime>
266 objects, for more info see:
267 L<DBIx::Class::Manual::Cookbook/Formatting_DateTime_objects_in_queries>.
268
269 =cut
270
271 sub search {
272   my $self = shift;
273   my $rs = $self->search_rs( @_ );
274
275   if (wantarray) {
276     return $rs->all;
277   }
278   elsif (defined wantarray) {
279     return $rs;
280   }
281   else {
282     # we can be called by a relationship helper, which in
283     # turn may be called in void context due to some braindead
284     # overload or whatever else the user decided to be clever
285     # at this particular day. Thus limit the exception to
286     # external code calls only
287     $self->throw_exception ('->search is *not* a mutator, calling it in void context makes no sense')
288       if (caller)[0] !~ /^\QDBIx::Class::/;
289
290     return ();
291   }
292 }
293
294 =head2 search_rs
295
296 =over 4
297
298 =item Arguments: $cond, \%attrs?
299
300 =item Return Value: $resultset
301
302 =back
303
304 This method does the same exact thing as search() except it will
305 always return a resultset, even in list context.
306
307 =cut
308
309 sub search_rs {
310   my $self = shift;
311
312   # Special-case handling for (undef, undef).
313   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
314     @_ = ();
315   }
316
317   my $call_attrs = {};
318   if (@_ > 1) {
319     if (ref $_[-1] eq 'HASH') {
320       # copy for _normalize_selection
321       $call_attrs = { %{ pop @_ } };
322     }
323     elsif (! defined $_[-1] ) {
324       pop @_;   # search({}, undef)
325     }
326   }
327
328   # see if we can keep the cache (no $rs changes)
329   my $cache;
330   my %safe = (alias => 1, cache => 1);
331   if ( ! List::Util::first { !$safe{$_} } keys %$call_attrs and (
332     ! defined $_[0]
333       or
334     ref $_[0] eq 'HASH' && ! keys %{$_[0]}
335       or
336     ref $_[0] eq 'ARRAY' && ! @{$_[0]}
337   )) {
338     $cache = $self->get_cache;
339   }
340
341   my $rsrc = $self->result_source;
342
343   my $old_attrs = { %{$self->{attrs}} };
344   my $old_having = delete $old_attrs->{having};
345   my $old_where = delete $old_attrs->{where};
346
347   my $new_attrs = { %$old_attrs };
348
349   # take care of call attrs (only if anything is changing)
350   if (keys %$call_attrs) {
351
352     my @selector_attrs = qw/select as columns cols +select +as +columns include_columns/;
353
354     # reset the current selector list if new selectors are supplied
355     if (List::Util::first { exists $call_attrs->{$_} } qw/columns cols select as/) {
356       delete @{$old_attrs}{(@selector_attrs, '_dark_selector')};
357     }
358
359     # Normalize the new selector list (operates on the passed-in attr structure)
360     # Need to do it on every chain instead of only once on _resolved_attrs, in
361     # order to allow detection of empty vs partial 'as'
362     $call_attrs->{_dark_selector} = $old_attrs->{_dark_selector}
363       if $old_attrs->{_dark_selector};
364     $self->_normalize_selection ($call_attrs);
365
366     # start with blind overwriting merge, exclude selector attrs
367     $new_attrs = { %{$old_attrs}, %{$call_attrs} };
368     delete @{$new_attrs}{@selector_attrs};
369
370     for (@selector_attrs) {
371       $new_attrs->{$_} = $self->_merge_attr($old_attrs->{$_}, $call_attrs->{$_})
372         if ( exists $old_attrs->{$_} or exists $call_attrs->{$_} );
373     }
374
375     # older deprecated name, use only if {columns} is not there
376     if (my $c = delete $new_attrs->{cols}) {
377       if ($new_attrs->{columns}) {
378         carp "Resultset specifies both the 'columns' and the legacy 'cols' attributes - ignoring 'cols'";
379       }
380       else {
381         $new_attrs->{columns} = $c;
382       }
383     }
384
385
386     # join/prefetch use their own crazy merging heuristics
387     foreach my $key (qw/join prefetch/) {
388       $new_attrs->{$key} = $self->_merge_joinpref_attr($old_attrs->{$key}, $call_attrs->{$key})
389         if exists $call_attrs->{$key};
390     }
391
392     # stack binds together
393     $new_attrs->{bind} = [ @{ $old_attrs->{bind} || [] }, @{ $call_attrs->{bind} || [] } ];
394   }
395
396
397   # rip apart the rest of @_, parse a condition
398   my $call_cond = do {
399
400     if (ref $_[0] eq 'HASH') {
401       (keys %{$_[0]}) ? $_[0] : undef
402     }
403     elsif (@_ == 1) {
404       $_[0]
405     }
406     elsif (@_ % 2) {
407       $self->throw_exception('Odd number of arguments to search')
408     }
409     else {
410       +{ @_ }
411     }
412
413   } if @_;
414
415   if( @_ > 1 and ! $rsrc->result_class->isa('DBIx::Class::CDBICompat') ) {
416     carp_unique 'search( %condition ) is deprecated, use search( \%condition ) instead';
417   }
418
419   for ($old_where, $call_cond) {
420     if (defined $_) {
421       $new_attrs->{where} = $self->_stack_cond (
422         $_, $new_attrs->{where}
423       );
424     }
425   }
426
427   if (defined $old_having) {
428     $new_attrs->{having} = $self->_stack_cond (
429       $old_having, $new_attrs->{having}
430     )
431   }
432
433   my $rs = (ref $self)->new($rsrc, $new_attrs);
434
435   $rs->set_cache($cache) if ($cache);
436
437   return $rs;
438 }
439
440 my $dark_sel_dumper;
441 sub _normalize_selection {
442   my ($self, $attrs) = @_;
443
444   # legacy syntax
445   $attrs->{'+columns'} = $self->_merge_attr($attrs->{'+columns'}, delete $attrs->{include_columns})
446     if exists $attrs->{include_columns};
447
448   # columns are always placed first, however 
449
450   # Keep the X vs +X separation until _resolved_attrs time - this allows to
451   # delay the decision on whether to use a default select list ($rsrc->columns)
452   # allowing stuff like the remove_columns helper to work
453   #
454   # select/as +select/+as pairs need special handling - the amount of select/as
455   # elements in each pair does *not* have to be equal (think multicolumn
456   # selectors like distinct(foo, bar) ). If the selector is bare (no 'as'
457   # supplied at all) - try to infer the alias, either from the -as parameter
458   # of the selector spec, or use the parameter whole if it looks like a column
459   # name (ugly legacy heuristic). If all fails - leave the selector bare (which
460   # is ok as well), but make sure no more additions to the 'as' chain take place
461   for my $pref ('', '+') {
462
463     my ($sel, $as) = map {
464       my $key = "${pref}${_}";
465
466       my $val = [ ref $attrs->{$key} eq 'ARRAY'
467         ? @{$attrs->{$key}}
468         : $attrs->{$key} || ()
469       ];
470       delete $attrs->{$key};
471       $val;
472     } qw/select as/;
473
474     if (! @$as and ! @$sel ) {
475       next;
476     }
477     elsif (@$as and ! @$sel) {
478       $self->throw_exception(
479         "Unable to handle ${pref}as specification (@$as) without a corresponding ${pref}select"
480       );
481     }
482     elsif( ! @$as ) {
483       # no as part supplied at all - try to deduce (unless explicit end of named selection is declared)
484       # if any @$as has been supplied we assume the user knows what (s)he is doing
485       # and blindly keep stacking up pieces
486       unless ($attrs->{_dark_selector}) {
487         SELECTOR:
488         for (@$sel) {
489           if ( ref $_ eq 'HASH' and exists $_->{-as} ) {
490             push @$as, $_->{-as};
491           }
492           # assume any plain no-space, no-parenthesis string to be a column spec
493           # FIXME - this is retarded but is necessary to support shit like 'count(foo)'
494           elsif ( ! ref $_ and $_ =~ /^ [^\s\(\)]+ $/x) {
495             push @$as, $_;
496           }
497           # if all else fails - raise a flag that no more aliasing will be allowed
498           else {
499             $attrs->{_dark_selector} = {
500               plus_stage => $pref,
501               string => ($dark_sel_dumper ||= do {
502                   require Data::Dumper::Concise;
503                   Data::Dumper::Concise::DumperObject()->Indent(0);
504                 })->Values([$_])->Dump
505               ,
506             };
507             last SELECTOR;
508           }
509         }
510       }
511     }
512     elsif (@$as < @$sel) {
513       $self->throw_exception(
514         "Unable to handle an ${pref}as specification (@$as) with less elements than the corresponding ${pref}select"
515       );
516     }
517     elsif ($pref and $attrs->{_dark_selector}) {
518       $self->throw_exception(
519         "Unable to process named '+select', resultset contains an unnamed selector $attrs->{_dark_selector}{string}"
520       );
521     }
522
523
524     # merge result
525     $attrs->{"${pref}select"} = $self->_merge_attr($attrs->{"${pref}select"}, $sel);
526     $attrs->{"${pref}as"} = $self->_merge_attr($attrs->{"${pref}as"}, $as);
527   }
528 }
529
530 sub _stack_cond {
531   my ($self, $left, $right) = @_;
532   if (defined $left xor defined $right) {
533     return defined $left ? $left : $right;
534   }
535   elsif (defined $left) {
536     return { -and => [ map
537       { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
538       ($left, $right)
539     ]};
540   }
541
542   return undef;
543 }
544
545 =head2 search_literal
546
547 =over 4
548
549 =item Arguments: $sql_fragment, @bind_values
550
551 =item Return Value: $resultset (scalar context) || @row_objs (list context)
552
553 =back
554
555   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
556   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
557
558 Pass a literal chunk of SQL to be added to the conditional part of the
559 resultset query.
560
561 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
562 only be used in that context. C<search_literal> is a convenience method.
563 It is equivalent to calling $schema->search(\[]), but if you want to ensure
564 columns are bound correctly, use C<search>.
565
566 Example of how to use C<search> instead of C<search_literal>
567
568   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
569   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
570
571
572 See L<DBIx::Class::Manual::Cookbook/Searching> and
573 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
574 require C<search_literal>.
575
576 =cut
577
578 sub search_literal {
579   my ($self, $sql, @bind) = @_;
580   my $attr;
581   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
582     $attr = pop @bind;
583   }
584   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
585 }
586
587 =head2 find
588
589 =over 4
590
591 =item Arguments: \%columns_values | @pk_values, \%attrs?
592
593 =item Return Value: $row_object | undef
594
595 =back
596
597 Finds and returns a single row based on supplied criteria. Takes either a
598 hashref with the same format as L</create> (including inference of foreign
599 keys from related objects), or a list of primary key values in the same
600 order as the L<primary columns|DBIx::Class::ResultSource/primary_columns>
601 declaration on the L</result_source>.
602
603 In either case an attempt is made to combine conditions already existing on
604 the resultset with the condition passed to this method.
605
606 To aid with preparing the correct query for the storage you may supply the
607 C<key> attribute, which is the name of a
608 L<unique constraint|DBIx::Class::ResultSource/add_unique_constraint> (the
609 unique constraint corresponding to the
610 L<primary columns|DBIx::Class::ResultSource/primary_columns> is always named
611 C<primary>). If the C<key> attribute has been supplied, and DBIC is unable
612 to construct a query that satisfies the named unique constraint fully (
613 non-NULL values for each column member of the constraint) an exception is
614 thrown.
615
616 If no C<key> is specified, the search is carried over all unique constraints
617 which are fully defined by the available condition.
618
619 If no such constraint is found, C<find> currently defaults to a simple
620 C<< search->(\%column_values) >> which may or may not do what you expect.
621 Note that this fallback behavior may be deprecated in further versions. If
622 you need to search with arbitrary conditions - use L</search>. If the query
623 resulting from this fallback produces more than one row, a warning to the
624 effect is issued, though only the first row is constructed and returned as
625 C<$row_object>.
626
627 In addition to C<key>, L</find> recognizes and applies standard
628 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
629
630 Note that if you have extra concerns about the correctness of the resulting
631 query you need to specify the C<key> attribute and supply the entire condition
632 as an argument to find (since it is not always possible to perform the
633 combination of the resultset condition with the supplied one, especially if
634 the resultset condition contains literal sql).
635
636 For example, to find a row by its primary key:
637
638   my $cd = $schema->resultset('CD')->find(5);
639
640 You can also find a row by a specific unique constraint:
641
642   my $cd = $schema->resultset('CD')->find(
643     {
644       artist => 'Massive Attack',
645       title  => 'Mezzanine',
646     },
647     { key => 'cd_artist_title' }
648   );
649
650 See also L</find_or_create> and L</update_or_create>.
651
652 =cut
653
654 sub find {
655   my $self = shift;
656   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
657
658   my $rsrc = $self->result_source;
659
660   # Parse out the condition from input
661   my $call_cond;
662   if (ref $_[0] eq 'HASH') {
663     $call_cond = { %{$_[0]} };
664   }
665   else {
666     my $constraint = exists $attrs->{key} ? $attrs->{key} : 'primary';
667     my @c_cols = $rsrc->unique_constraint_columns($constraint);
668
669     $self->throw_exception(
670       "No constraint columns, maybe a malformed '$constraint' constraint?"
671     ) unless @c_cols;
672
673     $self->throw_exception (
674       'find() expects either a column/value hashref, or a list of values '
675     . "corresponding to the columns of the specified unique constraint '$constraint'"
676     ) unless @c_cols == @_;
677
678     $call_cond = {};
679     @{$call_cond}{@c_cols} = @_;
680   }
681
682   my %related;
683   for my $key (keys %$call_cond) {
684     if (
685       my $keyref = ref($call_cond->{$key})
686         and
687       my $relinfo = $rsrc->relationship_info($key)
688     ) {
689       my $val = delete $call_cond->{$key};
690
691       next if $keyref eq 'ARRAY'; # has_many for multi_create
692
693       my $rel_q = $rsrc->_resolve_condition(
694         $relinfo->{cond}, $val, $key, $key
695       );
696       die "Can't handle complex relationship conditions in find" if ref($rel_q) ne 'HASH';
697       @related{keys %$rel_q} = values %$rel_q;
698     }
699   }
700
701   # relationship conditions take precedence (?)
702   @{$call_cond}{keys %related} = values %related;
703
704   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
705   my $final_cond;
706   if (exists $attrs->{key}) {
707     $final_cond = $self->_qualify_cond_columns (
708
709       $self->_build_unique_cond (
710         $attrs->{key},
711         $call_cond,
712       ),
713
714       $alias,
715     );
716   }
717   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
718     # This means that we got here after a merger of relationship conditions
719     # in ::Relationship::Base::search_related (the row method), and furthermore
720     # the relationship is of the 'single' type. This means that the condition
721     # provided by the relationship (already attached to $self) is sufficient,
722     # as there can be only one row in the database that would satisfy the
723     # relationship
724   }
725   else {
726     # no key was specified - fall down to heuristics mode:
727     # run through all unique queries registered on the resultset, and
728     # 'OR' all qualifying queries together
729     my (@unique_queries, %seen_column_combinations);
730     for my $c_name ($rsrc->unique_constraint_names) {
731       next if $seen_column_combinations{
732         join "\x00", sort $rsrc->unique_constraint_columns($c_name)
733       }++;
734
735       push @unique_queries, try {
736         $self->_build_unique_cond ($c_name, $call_cond, 'croak_on_nulls')
737       } || ();
738     }
739
740     $final_cond = @unique_queries
741       ? [ map { $self->_qualify_cond_columns($_, $alias) } @unique_queries ]
742       : $self->_non_unique_find_fallback ($call_cond, $attrs)
743     ;
744   }
745
746   # Run the query, passing the result_class since it should propagate for find
747   my $rs = $self->search ($final_cond, {result_class => $self->result_class, %$attrs});
748   if (keys %{$rs->_resolved_attrs->{collapse}}) {
749     my $row = $rs->next;
750     carp "Query returned more than one row" if $rs->next;
751     return $row;
752   }
753   else {
754     return $rs->single;
755   }
756 }
757
758 # This is a stop-gap method as agreed during the discussion on find() cleanup:
759 # http://lists.scsys.co.uk/pipermail/dbix-class/2010-October/009535.html
760 #
761 # It is invoked when find() is called in legacy-mode with insufficiently-unique
762 # condition. It is provided for overrides until a saner way forward is devised
763 #
764 # *NOTE* This is not a public method, and it's *GUARANTEED* to disappear down
765 # the road. Please adjust your tests accordingly to catch this situation early
766 # DBIx::Class::ResultSet->can('_non_unique_find_fallback') is reasonable
767 #
768 # The method will not be removed without an adequately complete replacement
769 # for strict-mode enforcement
770 sub _non_unique_find_fallback {
771   my ($self, $cond, $attrs) = @_;
772
773   return $self->_qualify_cond_columns(
774     $cond,
775     exists $attrs->{alias}
776       ? $attrs->{alias}
777       : $self->{attrs}{alias}
778   );
779 }
780
781
782 sub _qualify_cond_columns {
783   my ($self, $cond, $alias) = @_;
784
785   my %aliased = %$cond;
786   for (keys %aliased) {
787     $aliased{"$alias.$_"} = delete $aliased{$_}
788       if $_ !~ /\./;
789   }
790
791   return \%aliased;
792 }
793
794 sub _build_unique_cond {
795   my ($self, $constraint_name, $extra_cond, $croak_on_null) = @_;
796
797   my @c_cols = $self->result_source->unique_constraint_columns($constraint_name);
798
799   # combination may fail if $self->{cond} is non-trivial
800   my ($final_cond) = try {
801     $self->_merge_with_rscond ($extra_cond)
802   } catch {
803     +{ %$extra_cond }
804   };
805
806   # trim out everything not in $columns
807   $final_cond = { map {
808     exists $final_cond->{$_}
809       ? ( $_ => $final_cond->{$_} )
810       : ()
811   } @c_cols };
812
813   if (my @missing = grep
814     { ! ($croak_on_null ? defined $final_cond->{$_} : exists $final_cond->{$_}) }
815     (@c_cols)
816   ) {
817     $self->throw_exception( sprintf ( "Unable to satisfy requested constraint '%s', no values for column(s): %s",
818       $constraint_name,
819       join (', ', map { "'$_'" } @missing),
820     ) );
821   }
822
823   if (
824     !$croak_on_null
825       and
826     !$ENV{DBIC_NULLABLE_KEY_NOWARN}
827       and
828     my @undefs = grep { ! defined $final_cond->{$_} } (keys %$final_cond)
829   ) {
830     carp_unique ( sprintf (
831       "NULL/undef values supplied for requested unique constraint '%s' (NULL "
832     . 'values in column(s): %s). This is almost certainly not what you wanted, '
833     . 'though you can set DBIC_NULLABLE_KEY_NOWARN to disable this warning.',
834       $constraint_name,
835       join (', ', map { "'$_'" } @undefs),
836     ));
837   }
838
839   return $final_cond;
840 }
841
842 =head2 search_related
843
844 =over 4
845
846 =item Arguments: $rel, $cond, \%attrs?
847
848 =item Return Value: $new_resultset (scalar context) || @row_objs (list context)
849
850 =back
851
852   $new_rs = $cd_rs->search_related('artist', {
853     name => 'Emo-R-Us',
854   });
855
856 Searches the specified relationship, optionally specifying a condition and
857 attributes for matching records. See L</ATTRIBUTES> for more information.
858
859 In list context, C<< ->all() >> is called implicitly on the resultset, thus
860 returning a list of row objects instead. To avoid that, use L</search_related_rs>.
861
862 See also L</search_related_rs>.
863
864 =cut
865
866 sub search_related {
867   return shift->related_resultset(shift)->search(@_);
868 }
869
870 =head2 search_related_rs
871
872 This method works exactly the same as search_related, except that
873 it guarantees a resultset, even in list context.
874
875 =cut
876
877 sub search_related_rs {
878   return shift->related_resultset(shift)->search_rs(@_);
879 }
880
881 =head2 cursor
882
883 =over 4
884
885 =item Arguments: none
886
887 =item Return Value: $cursor
888
889 =back
890
891 Returns a storage-driven cursor to the given resultset. See
892 L<DBIx::Class::Cursor> for more information.
893
894 =cut
895
896 sub cursor {
897   my ($self) = @_;
898
899   my $attrs = $self->_resolved_attrs_copy;
900
901   return $self->{cursor}
902     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
903           $attrs->{where},$attrs);
904 }
905
906 =head2 single
907
908 =over 4
909
910 =item Arguments: $cond?
911
912 =item Return Value: $row_object | undef
913
914 =back
915
916   my $cd = $schema->resultset('CD')->single({ year => 2001 });
917
918 Inflates the first result without creating a cursor if the resultset has
919 any records in it; if not returns C<undef>. Used by L</find> as a lean version
920 of L</search>.
921
922 While this method can take an optional search condition (just like L</search>)
923 being a fast-code-path it does not recognize search attributes. If you need to
924 add extra joins or similar, call L</search> and then chain-call L</single> on the
925 L<DBIx::Class::ResultSet> returned.
926
927 =over
928
929 =item B<Note>
930
931 As of 0.08100, this method enforces the assumption that the preceding
932 query returns only one row. If more than one row is returned, you will receive
933 a warning:
934
935   Query returned more than one row
936
937 In this case, you should be using L</next> or L</find> instead, or if you really
938 know what you are doing, use the L</rows> attribute to explicitly limit the size
939 of the resultset.
940
941 This method will also throw an exception if it is called on a resultset prefetching
942 has_many, as such a prefetch implies fetching multiple rows from the database in
943 order to assemble the resulting object.
944
945 =back
946
947 =cut
948
949 sub single {
950   my ($self, $where) = @_;
951   if(@_ > 2) {
952       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
953   }
954
955   my $attrs = $self->_resolved_attrs_copy;
956
957   if (keys %{$attrs->{collapse}}) {
958     $self->throw_exception(
959       'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
960     );
961   }
962
963   if ($where) {
964     if (defined $attrs->{where}) {
965       $attrs->{where} = {
966         '-and' =>
967             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
968                $where, delete $attrs->{where} ]
969       };
970     } else {
971       $attrs->{where} = $where;
972     }
973   }
974
975   my @data = $self->result_source->storage->select_single(
976     $attrs->{from}, $attrs->{select},
977     $attrs->{where}, $attrs
978   );
979
980   return (@data ? ($self->_construct_object(@data))[0] : undef);
981 }
982
983
984 # _collapse_query
985 #
986 # Recursively collapse the query, accumulating values for each column.
987
988 sub _collapse_query {
989   my ($self, $query, $collapsed) = @_;
990
991   $collapsed ||= {};
992
993   if (ref $query eq 'ARRAY') {
994     foreach my $subquery (@$query) {
995       next unless ref $subquery;  # -or
996       $collapsed = $self->_collapse_query($subquery, $collapsed);
997     }
998   }
999   elsif (ref $query eq 'HASH') {
1000     if (keys %$query and (keys %$query)[0] eq '-and') {
1001       foreach my $subquery (@{$query->{-and}}) {
1002         $collapsed = $self->_collapse_query($subquery, $collapsed);
1003       }
1004     }
1005     else {
1006       foreach my $col (keys %$query) {
1007         my $value = $query->{$col};
1008         $collapsed->{$col}{$value}++;
1009       }
1010     }
1011   }
1012
1013   return $collapsed;
1014 }
1015
1016 =head2 get_column
1017
1018 =over 4
1019
1020 =item Arguments: $cond?
1021
1022 =item Return Value: $resultsetcolumn
1023
1024 =back
1025
1026   my $max_length = $rs->get_column('length')->max;
1027
1028 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
1029
1030 =cut
1031
1032 sub get_column {
1033   my ($self, $column) = @_;
1034   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
1035   return $new;
1036 }
1037
1038 =head2 search_like
1039
1040 =over 4
1041
1042 =item Arguments: $cond, \%attrs?
1043
1044 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1045
1046 =back
1047
1048   # WHERE title LIKE '%blue%'
1049   $cd_rs = $rs->search_like({ title => '%blue%'});
1050
1051 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
1052 that this is simply a convenience method retained for ex Class::DBI users.
1053 You most likely want to use L</search> with specific operators.
1054
1055 For more information, see L<DBIx::Class::Manual::Cookbook>.
1056
1057 This method is deprecated and will be removed in 0.09. Use L</search()>
1058 instead. An example conversion is:
1059
1060   ->search_like({ foo => 'bar' });
1061
1062   # Becomes
1063
1064   ->search({ foo => { like => 'bar' } });
1065
1066 =cut
1067
1068 sub search_like {
1069   my $class = shift;
1070   carp_unique (
1071     'search_like() is deprecated and will be removed in DBIC version 0.09.'
1072    .' Instead use ->search({ x => { -like => "y%" } })'
1073    .' (note the outer pair of {}s - they are important!)'
1074   );
1075   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1076   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
1077   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
1078   return $class->search($query, { %$attrs });
1079 }
1080
1081 =head2 slice
1082
1083 =over 4
1084
1085 =item Arguments: $first, $last
1086
1087 =item Return Value: $resultset (scalar context) || @row_objs (list context)
1088
1089 =back
1090
1091 Returns a resultset or object list representing a subset of elements from the
1092 resultset slice is called on. Indexes are from 0, i.e., to get the first
1093 three records, call:
1094
1095   my ($one, $two, $three) = $rs->slice(0, 2);
1096
1097 =cut
1098
1099 sub slice {
1100   my ($self, $min, $max) = @_;
1101   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
1102   $attrs->{offset} = $self->{attrs}{offset} || 0;
1103   $attrs->{offset} += $min;
1104   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
1105   return $self->search(undef, $attrs);
1106   #my $slice = (ref $self)->new($self->result_source, $attrs);
1107   #return (wantarray ? $slice->all : $slice);
1108 }
1109
1110 =head2 next
1111
1112 =over 4
1113
1114 =item Arguments: none
1115
1116 =item Return Value: $result | undef
1117
1118 =back
1119
1120 Returns the next element in the resultset (C<undef> is there is none).
1121
1122 Can be used to efficiently iterate over records in the resultset:
1123
1124   my $rs = $schema->resultset('CD')->search;
1125   while (my $cd = $rs->next) {
1126     print $cd->title;
1127   }
1128
1129 Note that you need to store the resultset object, and call C<next> on it.
1130 Calling C<< resultset('Table')->next >> repeatedly will always return the
1131 first record from the resultset.
1132
1133 =cut
1134
1135 sub next {
1136   my ($self) = @_;
1137   if (my $cache = $self->get_cache) {
1138     $self->{all_cache_position} ||= 0;
1139     return $cache->[$self->{all_cache_position}++];
1140   }
1141   if ($self->{attrs}{cache}) {
1142     delete $self->{pager};
1143     $self->{all_cache_position} = 1;
1144     return ($self->all)[0];
1145   }
1146   if ($self->{stashed_objects}) {
1147     my $obj = shift(@{$self->{stashed_objects}});
1148     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
1149     return $obj;
1150   }
1151   my @row = (
1152     exists $self->{stashed_row}
1153       ? @{delete $self->{stashed_row}}
1154       : $self->cursor->next
1155   );
1156   return undef unless (@row);
1157   my ($row, @more) = $self->_construct_object(@row);
1158   $self->{stashed_objects} = \@more if @more;
1159   return $row;
1160 }
1161
1162 sub _construct_object {
1163   my ($self, @row) = @_;
1164
1165   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row)
1166     or return ();
1167   my @new = $self->result_class->inflate_result($self->result_source, @$info);
1168   @new = $self->{_attrs}{record_filter}->(@new)
1169     if exists $self->{_attrs}{record_filter};
1170   return @new;
1171 }
1172
1173 sub _collapse_result {
1174   my ($self, $as_proto, $row) = @_;
1175
1176   my @copy = @$row;
1177
1178   # 'foo'         => [ undef, 'foo' ]
1179   # 'foo.bar'     => [ 'foo', 'bar' ]
1180   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
1181
1182   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
1183
1184   my %collapse = %{$self->{_attrs}{collapse}||{}};
1185
1186   my @pri_index;
1187
1188   # if we're doing collapsing (has_many prefetch) we need to grab records
1189   # until the PK changes, so fill @pri_index. if not, we leave it empty so
1190   # we know we don't have to bother.
1191
1192   # the reason for not using the collapse stuff directly is because if you
1193   # had for e.g. two artists in a row with no cds, the collapse info for
1194   # both would be NULL (undef) so you'd lose the second artist
1195
1196   # store just the index so we can check the array positions from the row
1197   # without having to contruct the full hash
1198
1199   if (keys %collapse) {
1200     my %pri = map { ($_ => 1) } $self->result_source->_pri_cols;
1201     foreach my $i (0 .. $#construct_as) {
1202       next if defined($construct_as[$i][0]); # only self table
1203       if (delete $pri{$construct_as[$i][1]}) {
1204         push(@pri_index, $i);
1205       }
1206       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
1207     }
1208   }
1209
1210   # no need to do an if, it'll be empty if @pri_index is empty anyway
1211
1212   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
1213
1214   my @const_rows;
1215
1216   do { # no need to check anything at the front, we always want the first row
1217
1218     my %const;
1219
1220     foreach my $this_as (@construct_as) {
1221       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
1222     }
1223
1224     push(@const_rows, \%const);
1225
1226   } until ( # no pri_index => no collapse => drop straight out
1227       !@pri_index
1228     or
1229       do { # get another row, stash it, drop out if different PK
1230
1231         @copy = $self->cursor->next;
1232         $self->{stashed_row} = \@copy;
1233
1234         # last thing in do block, counts as true if anything doesn't match
1235
1236         # check xor defined first for NULL vs. NOT NULL then if one is
1237         # defined the other must be so check string equality
1238
1239         grep {
1240           (defined $pri_vals{$_} ^ defined $copy[$_])
1241           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1242         } @pri_index;
1243       }
1244   );
1245
1246   my $alias = $self->{attrs}{alias};
1247   my $info = [];
1248
1249   my %collapse_pos;
1250
1251   my @const_keys;
1252
1253   foreach my $const (@const_rows) {
1254     scalar @const_keys or do {
1255       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1256     };
1257     foreach my $key (@const_keys) {
1258       if (length $key) {
1259         my $target = $info;
1260         my @parts = split(/\./, $key);
1261         my $cur = '';
1262         my $data = $const->{$key};
1263         foreach my $p (@parts) {
1264           $target = $target->[1]->{$p} ||= [];
1265           $cur .= ".${p}";
1266           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) {
1267             # collapsing at this point and on final part
1268             my $pos = $collapse_pos{$cur};
1269             CK: foreach my $ck (@ckey) {
1270               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1271                 $collapse_pos{$cur} = $data;
1272                 delete @collapse_pos{ # clear all positioning for sub-entries
1273                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1274                 };
1275                 push(@$target, []);
1276                 last CK;
1277               }
1278             }
1279           }
1280           if (exists $collapse{$cur}) {
1281             $target = $target->[-1];
1282           }
1283         }
1284         $target->[0] = $data;
1285       } else {
1286         $info->[0] = $const->{$key};
1287       }
1288     }
1289   }
1290
1291   return $info;
1292 }
1293
1294 =head2 result_source
1295
1296 =over 4
1297
1298 =item Arguments: $result_source?
1299
1300 =item Return Value: $result_source
1301
1302 =back
1303
1304 An accessor for the primary ResultSource object from which this ResultSet
1305 is derived.
1306
1307 =head2 result_class
1308
1309 =over 4
1310
1311 =item Arguments: $result_class?
1312
1313 =item Return Value: $result_class
1314
1315 =back
1316
1317 An accessor for the class to use when creating row objects. Defaults to
1318 C<< result_source->result_class >> - which in most cases is the name of the
1319 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1320
1321 Note that changing the result_class will also remove any components
1322 that were originally loaded in the source class via
1323 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1324 in the original source class will not run.
1325
1326 =cut
1327
1328 sub result_class {
1329   my ($self, $result_class) = @_;
1330   if ($result_class) {
1331     unless (ref $result_class) { # don't fire this for an object
1332       $self->ensure_class_loaded($result_class);
1333     }
1334     $self->_result_class($result_class);
1335     # THIS LINE WOULD BE A BUG - this accessor specifically exists to
1336     # permit the user to set result class on one result set only; it only
1337     # chains if provided to search()
1338     #$self->{attrs}{result_class} = $result_class if ref $self;
1339   }
1340   $self->_result_class;
1341 }
1342
1343 =head2 count
1344
1345 =over 4
1346
1347 =item Arguments: $cond, \%attrs??
1348
1349 =item Return Value: $count
1350
1351 =back
1352
1353 Performs an SQL C<COUNT> with the same query as the resultset was built
1354 with to find the number of elements. Passing arguments is equivalent to
1355 C<< $rs->search ($cond, \%attrs)->count >>
1356
1357 =cut
1358
1359 sub count {
1360   my $self = shift;
1361   return $self->search(@_)->count if @_ and defined $_[0];
1362   return scalar @{ $self->get_cache } if $self->get_cache;
1363
1364   my $attrs = $self->_resolved_attrs_copy;
1365
1366   # this is a little optimization - it is faster to do the limit
1367   # adjustments in software, instead of a subquery
1368   my $rows = delete $attrs->{rows};
1369   my $offset = delete $attrs->{offset};
1370
1371   my $crs;
1372   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1373     $crs = $self->_count_subq_rs ($attrs);
1374   }
1375   else {
1376     $crs = $self->_count_rs ($attrs);
1377   }
1378   my $count = $crs->next;
1379
1380   $count -= $offset if $offset;
1381   $count = $rows if $rows and $rows < $count;
1382   $count = 0 if ($count < 0);
1383
1384   return $count;
1385 }
1386
1387 =head2 count_rs
1388
1389 =over 4
1390
1391 =item Arguments: $cond, \%attrs??
1392
1393 =item Return Value: $count_rs
1394
1395 =back
1396
1397 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1398 This can be very handy for subqueries:
1399
1400   ->search( { amount => $some_rs->count_rs->as_query } )
1401
1402 As with regular resultsets the SQL query will be executed only after
1403 the resultset is accessed via L</next> or L</all>. That would return
1404 the same single value obtainable via L</count>.
1405
1406 =cut
1407
1408 sub count_rs {
1409   my $self = shift;
1410   return $self->search(@_)->count_rs if @_;
1411
1412   # this may look like a lack of abstraction (count() does about the same)
1413   # but in fact an _rs *must* use a subquery for the limits, as the
1414   # software based limiting can not be ported if this $rs is to be used
1415   # in a subquery itself (i.e. ->as_query)
1416   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1417     return $self->_count_subq_rs;
1418   }
1419   else {
1420     return $self->_count_rs;
1421   }
1422 }
1423
1424 #
1425 # returns a ResultSetColumn object tied to the count query
1426 #
1427 sub _count_rs {
1428   my ($self, $attrs) = @_;
1429
1430   my $rsrc = $self->result_source;
1431   $attrs ||= $self->_resolved_attrs;
1432
1433   my $tmp_attrs = { %$attrs };
1434   # take off any limits, record_filter is cdbi, and no point of ordering nor locking a count
1435   delete @{$tmp_attrs}{qw/rows offset order_by record_filter for/};
1436
1437   # overwrite the selector (supplied by the storage)
1438   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $attrs);
1439   $tmp_attrs->{as} = 'count';
1440   delete @{$tmp_attrs}{qw/columns/};
1441
1442   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1443
1444   return $tmp_rs;
1445 }
1446
1447 #
1448 # same as above but uses a subquery
1449 #
1450 sub _count_subq_rs {
1451   my ($self, $attrs) = @_;
1452
1453   my $rsrc = $self->result_source;
1454   $attrs ||= $self->_resolved_attrs;
1455
1456   my $sub_attrs = { %$attrs };
1457   # extra selectors do not go in the subquery and there is no point of ordering it, nor locking it
1458   delete @{$sub_attrs}{qw/collapse columns as select _prefetch_selector_range order_by for/};
1459
1460   # if we multi-prefetch we group_by primary keys only as this is what we would
1461   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
1462   if ( keys %{$attrs->{collapse}}  ) {
1463     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } ($rsrc->_pri_cols) ]
1464   }
1465
1466   # Calculate subquery selector
1467   if (my $g = $sub_attrs->{group_by}) {
1468
1469     my $sql_maker = $rsrc->storage->sql_maker;
1470
1471     # necessary as the group_by may refer to aliased functions
1472     my $sel_index;
1473     for my $sel (@{$attrs->{select}}) {
1474       $sel_index->{$sel->{-as}} = $sel
1475         if (ref $sel eq 'HASH' and $sel->{-as});
1476     }
1477
1478     # anything from the original select mentioned on the group-by needs to make it to the inner selector
1479     # also look for named aggregates referred in the having clause
1480     # having often contains scalarrefs - thus parse it out entirely
1481     my @parts = @$g;
1482     if ($attrs->{having}) {
1483       local $sql_maker->{having_bind};
1484       local $sql_maker->{quote_char} = $sql_maker->{quote_char};
1485       local $sql_maker->{name_sep} = $sql_maker->{name_sep};
1486       unless (defined $sql_maker->{quote_char} and length $sql_maker->{quote_char}) {
1487         $sql_maker->{quote_char} = [ "\x00", "\xFF" ];
1488         # if we don't unset it we screw up retarded but unfortunately working
1489         # 'MAX(foo.bar)' => { '>', 3 }
1490         $sql_maker->{name_sep} = '';
1491       }
1492
1493       my ($lquote, $rquote, $sep) = map { quotemeta $_ } ($sql_maker->_quote_chars, $sql_maker->name_sep);
1494
1495       my $sql = $sql_maker->_parse_rs_attrs ({ having => $attrs->{having} });
1496
1497       # search for both a proper quoted qualified string, for a naive unquoted scalarref
1498       # and if all fails for an utterly naive quoted scalar-with-function
1499       while ($sql =~ /
1500         $rquote $sep $lquote (.+?) $rquote
1501           |
1502         [\s,] \w+ \. (\w+) [\s,]
1503           |
1504         [\s,] $lquote (.+?) $rquote [\s,]
1505       /gx) {
1506         push @parts, ($1 || $2 || $3);  # one of them matched if we got here
1507       }
1508     }
1509
1510     for (@parts) {
1511       my $colpiece = $sel_index->{$_} || $_;
1512
1513       # unqualify join-based group_by's. Arcane but possible query
1514       # also horrible horrible hack to alias a column (not a func.)
1515       # (probably need to introduce SQLA syntax)
1516       if ($colpiece =~ /\./ && $colpiece !~ /^$attrs->{alias}\./) {
1517         my $as = $colpiece;
1518         $as =~ s/\./__/;
1519         $colpiece = \ sprintf ('%s AS %s', map { $sql_maker->_quote ($_) } ($colpiece, $as) );
1520       }
1521       push @{$sub_attrs->{select}}, $colpiece;
1522     }
1523   }
1524   else {
1525     my @pcols = map { "$attrs->{alias}.$_" } ($rsrc->primary_columns);
1526     $sub_attrs->{select} = @pcols ? \@pcols : [ 1 ];
1527   }
1528
1529   return $rsrc->resultset_class
1530                ->new ($rsrc, $sub_attrs)
1531                 ->as_subselect_rs
1532                  ->search ({}, { columns => { count => $rsrc->storage->_count_select ($rsrc, $attrs) } })
1533                   ->get_column ('count');
1534 }
1535
1536 sub _bool {
1537   return 1;
1538 }
1539
1540 =head2 count_literal
1541
1542 =over 4
1543
1544 =item Arguments: $sql_fragment, @bind_values
1545
1546 =item Return Value: $count
1547
1548 =back
1549
1550 Counts the results in a literal query. Equivalent to calling L</search_literal>
1551 with the passed arguments, then L</count>.
1552
1553 =cut
1554
1555 sub count_literal { shift->search_literal(@_)->count; }
1556
1557 =head2 all
1558
1559 =over 4
1560
1561 =item Arguments: none
1562
1563 =item Return Value: @objects
1564
1565 =back
1566
1567 Returns all elements in the resultset.
1568
1569 =cut
1570
1571 sub all {
1572   my $self = shift;
1573   if(@_) {
1574       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1575   }
1576
1577   return @{ $self->get_cache } if $self->get_cache;
1578
1579   my @obj;
1580
1581   if (keys %{$self->_resolved_attrs->{collapse}}) {
1582     # Using $self->cursor->all is really just an optimisation.
1583     # If we're collapsing has_many prefetches it probably makes
1584     # very little difference, and this is cleaner than hacking
1585     # _construct_object to survive the approach
1586     $self->cursor->reset;
1587     my @row = $self->cursor->next;
1588     while (@row) {
1589       push(@obj, $self->_construct_object(@row));
1590       @row = (exists $self->{stashed_row}
1591                ? @{delete $self->{stashed_row}}
1592                : $self->cursor->next);
1593     }
1594   } else {
1595     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1596   }
1597
1598   $self->set_cache(\@obj) if $self->{attrs}{cache};
1599
1600   return @obj;
1601 }
1602
1603 =head2 reset
1604
1605 =over 4
1606
1607 =item Arguments: none
1608
1609 =item Return Value: $self
1610
1611 =back
1612
1613 Resets the resultset's cursor, so you can iterate through the elements again.
1614 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1615 another query.
1616
1617 =cut
1618
1619 sub reset {
1620   my ($self) = @_;
1621   delete $self->{_attrs} if exists $self->{_attrs};
1622   $self->{all_cache_position} = 0;
1623   $self->cursor->reset;
1624   return $self;
1625 }
1626
1627 =head2 first
1628
1629 =over 4
1630
1631 =item Arguments: none
1632
1633 =item Return Value: $object | undef
1634
1635 =back
1636
1637 Resets the resultset and returns an object for the first result (or C<undef>
1638 if the resultset is empty).
1639
1640 =cut
1641
1642 sub first {
1643   return $_[0]->reset->next;
1644 }
1645
1646
1647 # _rs_update_delete
1648 #
1649 # Determines whether and what type of subquery is required for the $rs operation.
1650 # If grouping is necessary either supplies its own, or verifies the current one
1651 # After all is done delegates to the proper storage method.
1652
1653 sub _rs_update_delete {
1654   my ($self, $op, $values) = @_;
1655
1656   my $rsrc = $self->result_source;
1657
1658   my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
1659   my $needs_subq = $needs_group_by_subq || $self->_has_resolved_attr(qw/rows offset/);
1660
1661   if ($needs_group_by_subq or $needs_subq) {
1662
1663     # make a new $rs selecting only the PKs (that's all we really need)
1664     my $attrs = $self->_resolved_attrs_copy;
1665
1666
1667     delete $attrs->{$_} for qw/collapse _collapse_order_by select _prefetch_selector_range as/;
1668     $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->_pri_cols) ];
1669
1670     if ($needs_group_by_subq) {
1671       # make sure no group_by was supplied, or if there is one - make sure it matches
1672       # the columns compiled above perfectly. Anything else can not be sanely executed
1673       # on most databases so croak right then and there
1674
1675       if (my $g = $attrs->{group_by}) {
1676         my @current_group_by = map
1677           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1678           @$g
1679         ;
1680
1681         if (
1682           join ("\x00", sort @current_group_by)
1683             ne
1684           join ("\x00", sort @{$attrs->{columns}} )
1685         ) {
1686           $self->throw_exception (
1687             "You have just attempted a $op operation on a resultset which does group_by"
1688             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1689             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1690             . ' kind of queries. Please retry the operation with a modified group_by or'
1691             . ' without using one at all.'
1692           );
1693         }
1694       }
1695       else {
1696         $attrs->{group_by} = $attrs->{columns};
1697       }
1698     }
1699
1700     my $subrs = (ref $self)->new($rsrc, $attrs);
1701     return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
1702   }
1703   else {
1704     # Most databases do not allow aliasing of tables in UPDATE/DELETE. Thus
1705     # a condition containing 'me' or other table prefixes will not work
1706     # at all. What this code tries to do (badly) is to generate a condition
1707     # with the qualifiers removed, by exploiting the quote mechanism of sqla
1708     #
1709     # this is atrocious and should be replaced by normal sqla introspection
1710     # one sunny day
1711     my ($sql, @bind) = do {
1712       my $sqla = $rsrc->storage->sql_maker;
1713       local $sqla->{_dequalify_idents} = 1;
1714       $sqla->_recurse_where($self->{cond});
1715     } if $self->{cond};
1716
1717     return $rsrc->storage->$op(
1718       $rsrc,
1719       $op eq 'update' ? $values : (),
1720       $self->{cond} ? \[$sql, @bind] : (),
1721     );
1722   }
1723 }
1724
1725 =head2 update
1726
1727 =over 4
1728
1729 =item Arguments: \%values
1730
1731 =item Return Value: $storage_rv
1732
1733 =back
1734
1735 Sets the specified columns in the resultset to the supplied values in a
1736 single query. Note that this will not run any accessor/set_column/update
1737 triggers, nor will it update any row object instances derived from this
1738 resultset (this includes the contents of the L<resultset cache|/set_cache>
1739 if any). See L</update_all> if you need to execute any on-update
1740 triggers or cascades defined either by you or a
1741 L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
1742
1743 The return value is a pass through of what the underlying
1744 storage backend returned, and may vary. See L<DBI/execute> for the most
1745 common case.
1746
1747 =head3 CAVEAT
1748
1749 Note that L</update> does not process/deflate any of the values passed in.
1750 This is unlike the corresponding L<DBIx::Class::Row/update>. The user must
1751 ensure manually that any value passed to this method will stringify to
1752 something the RDBMS knows how to deal with. A notable example is the
1753 handling of L<DateTime> objects, for more info see:
1754 L<DBIx::Class::Manual::Cookbook/Formatting_DateTime_objects_in_queries>.
1755
1756 =cut
1757
1758 sub update {
1759   my ($self, $values) = @_;
1760   $self->throw_exception('Values for update must be a hash')
1761     unless ref $values eq 'HASH';
1762
1763   return $self->_rs_update_delete ('update', $values);
1764 }
1765
1766 =head2 update_all
1767
1768 =over 4
1769
1770 =item Arguments: \%values
1771
1772 =item Return Value: 1
1773
1774 =back
1775
1776 Fetches all objects and updates them one at a time via
1777 L<DBIx::Class::Row/update>. Note that C<update_all> will run DBIC defined
1778 triggers, while L</update> will not.
1779
1780 =cut
1781
1782 sub update_all {
1783   my ($self, $values) = @_;
1784   $self->throw_exception('Values for update_all must be a hash')
1785     unless ref $values eq 'HASH';
1786
1787   my $guard = $self->result_source->schema->txn_scope_guard;
1788   $_->update($values) for $self->all;
1789   $guard->commit;
1790   return 1;
1791 }
1792
1793 =head2 delete
1794
1795 =over 4
1796
1797 =item Arguments: none
1798
1799 =item Return Value: $storage_rv
1800
1801 =back
1802
1803 Deletes the rows matching this resultset in a single query. Note that this
1804 will not run any delete triggers, nor will it alter the
1805 L<in_storage|DBIx::Class::Row/in_storage> status of any row object instances
1806 derived from this resultset (this includes the contents of the
1807 L<resultset cache|/set_cache> if any). See L</delete_all> if you need to
1808 execute any on-delete triggers or cascades defined either by you or a
1809 L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
1810
1811 The return value is a pass through of what the underlying storage backend
1812 returned, and may vary. See L<DBI/execute> for the most common case.
1813
1814 =cut
1815
1816 sub delete {
1817   my $self = shift;
1818   $self->throw_exception('delete does not accept any arguments')
1819     if @_;
1820
1821   return $self->_rs_update_delete ('delete');
1822 }
1823
1824 =head2 delete_all
1825
1826 =over 4
1827
1828 =item Arguments: none
1829
1830 =item Return Value: 1
1831
1832 =back
1833
1834 Fetches all objects and deletes them one at a time via
1835 L<DBIx::Class::Row/delete>. Note that C<delete_all> will run DBIC defined
1836 triggers, while L</delete> will not.
1837
1838 =cut
1839
1840 sub delete_all {
1841   my $self = shift;
1842   $self->throw_exception('delete_all does not accept any arguments')
1843     if @_;
1844
1845   my $guard = $self->result_source->schema->txn_scope_guard;
1846   $_->delete for $self->all;
1847   $guard->commit;
1848   return 1;
1849 }
1850
1851 =head2 populate
1852
1853 =over 4
1854
1855 =item Arguments: \@data;
1856
1857 =back
1858
1859 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1860 For the arrayref of hashrefs style each hashref should be a structure suitable
1861 for submitting to a $resultset->create(...) method.
1862
1863 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1864 to insert the data, as this is a faster method.
1865
1866 Otherwise, each set of data is inserted into the database using
1867 L<DBIx::Class::ResultSet/create>, and the resulting objects are
1868 accumulated into an array. The array itself, or an array reference
1869 is returned depending on scalar or list context.
1870
1871 Example:  Assuming an Artist Class that has many CDs Classes relating:
1872
1873   my $Artist_rs = $schema->resultset("Artist");
1874
1875   ## Void Context Example
1876   $Artist_rs->populate([
1877      { artistid => 4, name => 'Manufactured Crap', cds => [
1878         { title => 'My First CD', year => 2006 },
1879         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1880       ],
1881      },
1882      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1883         { title => 'My parents sold me to a record company', year => 2005 },
1884         { title => 'Why Am I So Ugly?', year => 2006 },
1885         { title => 'I Got Surgery and am now Popular', year => 2007 }
1886       ],
1887      },
1888   ]);
1889
1890   ## Array Context Example
1891   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1892     { name => "Artist One"},
1893     { name => "Artist Two"},
1894     { name => "Artist Three", cds=> [
1895     { title => "First CD", year => 2007},
1896     { title => "Second CD", year => 2008},
1897   ]}
1898   ]);
1899
1900   print $ArtistOne->name; ## response is 'Artist One'
1901   print $ArtistThree->cds->count ## reponse is '2'
1902
1903 For the arrayref of arrayrefs style,  the first element should be a list of the
1904 fieldsnames to which the remaining elements are rows being inserted.  For
1905 example:
1906
1907   $Arstist_rs->populate([
1908     [qw/artistid name/],
1909     [100, 'A Formally Unknown Singer'],
1910     [101, 'A singer that jumped the shark two albums ago'],
1911     [102, 'An actually cool singer'],
1912   ]);
1913
1914 Please note an important effect on your data when choosing between void and
1915 wantarray context. Since void context goes straight to C<insert_bulk> in
1916 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1917 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
1918 create primary keys for you, you will find that your PKs are empty.  In this
1919 case you will have to use the wantarray context in order to create those
1920 values.
1921
1922 =cut
1923
1924 sub populate {
1925   my $self = shift;
1926
1927   # cruft placed in standalone method
1928   my $data = $self->_normalize_populate_args(@_);
1929
1930   if(defined wantarray) {
1931     my @created;
1932     foreach my $item (@$data) {
1933       push(@created, $self->create($item));
1934     }
1935     return wantarray ? @created : \@created;
1936   } 
1937   else {
1938     my $first = $data->[0];
1939
1940     # if a column is a registered relationship, and is a non-blessed hash/array, consider
1941     # it relationship data
1942     my (@rels, @columns);
1943     my $rsrc = $self->result_source;
1944     my $rels = { map { $_ => $rsrc->relationship_info($_) } $rsrc->relationships };
1945     for (keys %$first) {
1946       my $ref = ref $first->{$_};
1947       $rels->{$_} && ($ref eq 'ARRAY' or $ref eq 'HASH')
1948         ? push @rels, $_
1949         : push @columns, $_
1950       ;
1951     }
1952
1953     my @pks = $rsrc->primary_columns;
1954
1955     ## do the belongs_to relationships
1956     foreach my $index (0..$#$data) {
1957
1958       # delegate to create() for any dataset without primary keys with specified relationships
1959       if (grep { !defined $data->[$index]->{$_} } @pks ) {
1960         for my $r (@rels) {
1961           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
1962             my @ret = $self->populate($data);
1963             return;
1964           }
1965         }
1966       }
1967
1968       foreach my $rel (@rels) {
1969         next unless ref $data->[$index]->{$rel} eq "HASH";
1970         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1971         my ($reverse_relname, $reverse_relinfo) = %{$rsrc->reverse_relationship_info($rel)};
1972         my $related = $result->result_source->_resolve_condition(
1973           $reverse_relinfo->{cond},
1974           $self,
1975           $result,
1976           $rel,
1977         );
1978
1979         delete $data->[$index]->{$rel};
1980         $data->[$index] = {%{$data->[$index]}, %$related};
1981
1982         push @columns, keys %$related if $index == 0;
1983       }
1984     }
1985
1986     ## inherit the data locked in the conditions of the resultset
1987     my ($rs_data) = $self->_merge_with_rscond({});
1988     delete @{$rs_data}{@columns};
1989     my @inherit_cols = keys %$rs_data;
1990     my @inherit_data = values %$rs_data;
1991
1992     ## do bulk insert on current row
1993     $rsrc->storage->insert_bulk(
1994       $rsrc,
1995       [@columns, @inherit_cols],
1996       [ map { [ @$_{@columns}, @inherit_data ] } @$data ],
1997     );
1998
1999     ## do the has_many relationships
2000     foreach my $item (@$data) {
2001
2002       my $main_row;
2003
2004       foreach my $rel (@rels) {
2005         next unless ref $item->{$rel} eq "ARRAY" && @{ $item->{$rel} };
2006
2007         $main_row ||= $self->new_result({map { $_ => $item->{$_} } @pks});
2008
2009         my $child = $main_row->$rel;
2010
2011         my $related = $child->result_source->_resolve_condition(
2012           $rels->{$rel}{cond},
2013           $child,
2014           $main_row,
2015           $rel,
2016         );
2017
2018         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
2019         my @populate = map { {%$_, %$related} } @rows_to_add;
2020
2021         $child->populate( \@populate );
2022       }
2023     }
2024   }
2025 }
2026
2027
2028 # populate() argumnets went over several incarnations
2029 # What we ultimately support is AoH
2030 sub _normalize_populate_args {
2031   my ($self, $arg) = @_;
2032
2033   if (ref $arg eq 'ARRAY') {
2034     if (ref $arg->[0] eq 'HASH') {
2035       return $arg;
2036     }
2037     elsif (ref $arg->[0] eq 'ARRAY') {
2038       my @ret;
2039       my @colnames = @{$arg->[0]};
2040       foreach my $values (@{$arg}[1 .. $#$arg]) {
2041         push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
2042       }
2043       return \@ret;
2044     }
2045   }
2046
2047   $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
2048 }
2049
2050 =head2 pager
2051
2052 =over 4
2053
2054 =item Arguments: none
2055
2056 =item Return Value: $pager
2057
2058 =back
2059
2060 Return Value a L<Data::Page> object for the current resultset. Only makes
2061 sense for queries with a C<page> attribute.
2062
2063 To get the full count of entries for a paged resultset, call
2064 C<total_entries> on the L<Data::Page> object.
2065
2066 =cut
2067
2068 # make a wizard good for both a scalar and a hashref
2069 my $mk_lazy_count_wizard = sub {
2070   require Variable::Magic;
2071
2072   my $stash = { total_rs => shift };
2073   my $slot = shift; # only used by the hashref magic
2074
2075   my $magic = Variable::Magic::wizard (
2076     data => sub { $stash },
2077
2078     (!$slot)
2079     ? (
2080       # the scalar magic
2081       get => sub {
2082         # set value lazily, and dispell for good
2083         ${$_[0]} = $_[1]{total_rs}->count;
2084         Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref});
2085         return 1;
2086       },
2087       set => sub {
2088         # an explicit set implies dispell as well
2089         # the unless() is to work around "fun and giggles" below
2090         Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref})
2091           unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
2092         return 1;
2093       },
2094     )
2095     : (
2096       # the uvar magic
2097       fetch => sub {
2098         if ($_[2] eq $slot and !$_[1]{inactive}) {
2099           my $cnt = $_[1]{total_rs}->count;
2100           $_[0]->{$slot} = $cnt;
2101
2102           # attempting to dispell in a fetch handle (works in store), seems
2103           # to invariable segfault on 5.10, 5.12, 5.13 :(
2104           # so use an inactivator instead
2105           #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
2106           $_[1]{inactive}++;
2107         }
2108         return 1;
2109       },
2110       store => sub {
2111         if (! $_[1]{inactive} and $_[2] eq $slot) {
2112           #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
2113           $_[1]{inactive}++
2114             unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
2115         }
2116         return 1;
2117       },
2118     ),
2119   );
2120
2121   $stash->{magic_selfref} = $magic;
2122   weaken ($stash->{magic_selfref}); # this fails on 5.8.1
2123
2124   return $magic;
2125 };
2126
2127 # the tie class for 5.8.1
2128 {
2129   package # hide from pause
2130     DBIx::Class::__DBIC_LAZY_RS_COUNT__;
2131   use base qw/Tie::Hash/;
2132
2133   sub FIRSTKEY { my $dummy = scalar keys %{$_[0]{data}}; each %{$_[0]{data}} }
2134   sub NEXTKEY  { each %{$_[0]{data}} }
2135   sub EXISTS   { exists $_[0]{data}{$_[1]} }
2136   sub DELETE   { delete $_[0]{data}{$_[1]} }
2137   sub CLEAR    { %{$_[0]{data}} = () }
2138   sub SCALAR   { scalar %{$_[0]{data}} }
2139
2140   sub TIEHASH {
2141     $_[1]{data} = {%{$_[1]{selfref}}};
2142     %{$_[1]{selfref}} = ();
2143     Scalar::Util::weaken ($_[1]{selfref});
2144     return bless ($_[1], $_[0]);
2145   };
2146
2147   sub FETCH {
2148     if ($_[1] eq $_[0]{slot}) {
2149       my $cnt = $_[0]{data}{$_[1]} = $_[0]{total_rs}->count;
2150       untie %{$_[0]{selfref}};
2151       %{$_[0]{selfref}} = %{$_[0]{data}};
2152       return $cnt;
2153     }
2154     else {
2155       $_[0]{data}{$_[1]};
2156     }
2157   }
2158
2159   sub STORE {
2160     $_[0]{data}{$_[1]} = $_[2];
2161     if ($_[1] eq $_[0]{slot}) {
2162       untie %{$_[0]{selfref}};
2163       %{$_[0]{selfref}} = %{$_[0]{data}};
2164     }
2165     $_[2];
2166   }
2167 }
2168
2169 sub pager {
2170   my ($self) = @_;
2171
2172   return $self->{pager} if $self->{pager};
2173
2174   my $attrs = $self->{attrs};
2175   if (!defined $attrs->{page}) {
2176     $self->throw_exception("Can't create pager for non-paged rs");
2177   }
2178   elsif ($attrs->{page} <= 0) {
2179     $self->throw_exception('Invalid page number (page-numbers are 1-based)');
2180   }
2181   $attrs->{rows} ||= 10;
2182
2183   # throw away the paging flags and re-run the count (possibly
2184   # with a subselect) to get the real total count
2185   my $count_attrs = { %$attrs };
2186   delete $count_attrs->{$_} for qw/rows offset page pager/;
2187   my $total_rs = (ref $self)->new($self->result_source, $count_attrs);
2188
2189
2190 ### the following may seem awkward and dirty, but it's a thought-experiment
2191 ### necessary for future development of DBIx::DS. Do *NOT* change this code
2192 ### before talking to ribasushi/mst
2193
2194   require Data::Page;
2195   my $pager = Data::Page->new(
2196     0,  #start with an empty set
2197     $attrs->{rows},
2198     $self->{attrs}{page},
2199   );
2200
2201   my $data_slot = 'total_entries';
2202
2203   # Since we are interested in a cached value (once it's set - it's set), every
2204   # technique will detach from the magic-host once the time comes to fire the
2205   # ->count (or in the segfaulting case of >= 5.10 it will deactivate itself)
2206
2207   if ($] < 5.008003) {
2208     # 5.8.1 throws 'Modification of a read-only value attempted' when one tries
2209     # to weakref the magic container :(
2210     # tested on 5.8.1
2211     tie (%$pager, 'DBIx::Class::__DBIC_LAZY_RS_COUNT__',
2212       { slot => $data_slot, total_rs => $total_rs, selfref => $pager }
2213     );
2214   }
2215   elsif ($] < 5.010) {
2216     # We can use magic on the hash value slot. It's interesting that the magic is
2217     # attached to the hash-slot, and does *not* stop working once I do the dummy
2218     # assignments after the cast()
2219     # tested on 5.8.3 and 5.8.9
2220     my $magic = $mk_lazy_count_wizard->($total_rs);
2221     Variable::Magic::cast ( $pager->{$data_slot}, $magic );
2222
2223     # this is for fun and giggles
2224     $pager->{$data_slot} = -1;
2225     $pager->{$data_slot} = 0;
2226
2227     # this does not work for scalars, but works with
2228     # uvar magic below
2229     #my %vals = %$pager;
2230     #%$pager = ();
2231     #%{$pager} = %vals;
2232   }
2233   else {
2234     # And the uvar magic
2235     # works on 5.10.1, 5.12.1 and 5.13.4 in its current form,
2236     # however see the wizard maker for more notes
2237     my $magic = $mk_lazy_count_wizard->($total_rs, $data_slot);
2238     Variable::Magic::cast ( %$pager, $magic );
2239
2240     # still works
2241     $pager->{$data_slot} = -1;
2242     $pager->{$data_slot} = 0;
2243
2244     # this now works
2245     my %vals = %$pager;
2246     %$pager = ();
2247     %{$pager} = %vals;
2248   }
2249
2250   return $self->{pager} = $pager;
2251 }
2252
2253 =head2 page
2254
2255 =over 4
2256
2257 =item Arguments: $page_number
2258
2259 =item Return Value: $rs
2260
2261 =back
2262
2263 Returns a resultset for the $page_number page of the resultset on which page
2264 is called, where each page contains a number of rows equal to the 'rows'
2265 attribute set on the resultset (10 by default).
2266
2267 =cut
2268
2269 sub page {
2270   my ($self, $page) = @_;
2271   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
2272 }
2273
2274 =head2 new_result
2275
2276 =over 4
2277
2278 =item Arguments: \%vals
2279
2280 =item Return Value: $rowobject
2281
2282 =back
2283
2284 Creates a new row object in the resultset's result class and returns
2285 it. The row is not inserted into the database at this point, call
2286 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
2287 will tell you whether the row object has been inserted or not.
2288
2289 Passes the hashref of input on to L<DBIx::Class::Row/new>.
2290
2291 =cut
2292
2293 sub new_result {
2294   my ($self, $values) = @_;
2295   $self->throw_exception( "new_result needs a hash" )
2296     unless (ref $values eq 'HASH');
2297
2298   my ($merged_cond, $cols_from_relations) = $self->_merge_with_rscond($values);
2299
2300   my %new = (
2301     %$merged_cond,
2302     @$cols_from_relations
2303       ? (-cols_from_relations => $cols_from_relations)
2304       : (),
2305     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
2306   );
2307
2308   return $self->result_class->new(\%new);
2309 }
2310
2311 # _merge_with_rscond
2312 #
2313 # Takes a simple hash of K/V data and returns its copy merged with the
2314 # condition already present on the resultset. Additionally returns an
2315 # arrayref of value/condition names, which were inferred from related
2316 # objects (this is needed for in-memory related objects)
2317 sub _merge_with_rscond {
2318   my ($self, $data) = @_;
2319
2320   my (%new_data, @cols_from_relations);
2321
2322   my $alias = $self->{attrs}{alias};
2323
2324   if (! defined $self->{cond}) {
2325     # just massage $data below
2326   }
2327   elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
2328     %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
2329     @cols_from_relations = keys %new_data;
2330   }
2331   elsif (ref $self->{cond} ne 'HASH') {
2332     $self->throw_exception(
2333       "Can't abstract implicit construct, resultset condition not a hash"
2334     );
2335   }
2336   else {
2337     # precendence must be given to passed values over values inherited from
2338     # the cond, so the order here is important.
2339     my $collapsed_cond = $self->_collapse_cond($self->{cond});
2340     my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
2341
2342     while ( my($col, $value) = each %implied ) {
2343       my $vref = ref $value;
2344       if (
2345         $vref eq 'HASH'
2346           and
2347         keys(%$value) == 1
2348           and
2349         (keys %$value)[0] eq '='
2350       ) {
2351         $new_data{$col} = $value->{'='};
2352       }
2353       elsif( !$vref or $vref eq 'SCALAR' or blessed($value) ) {
2354         $new_data{$col} = $value;
2355       }
2356     }
2357   }
2358
2359   %new_data = (
2360     %new_data,
2361     %{ $self->_remove_alias($data, $alias) },
2362   );
2363
2364   return (\%new_data, \@cols_from_relations);
2365 }
2366
2367 # _has_resolved_attr
2368 #
2369 # determines if the resultset defines at least one
2370 # of the attributes supplied
2371 #
2372 # used to determine if a subquery is neccessary
2373 #
2374 # supports some virtual attributes:
2375 #   -join
2376 #     This will scan for any joins being present on the resultset.
2377 #     It is not a mere key-search but a deep inspection of {from}
2378 #
2379
2380 sub _has_resolved_attr {
2381   my ($self, @attr_names) = @_;
2382
2383   my $attrs = $self->_resolved_attrs;
2384
2385   my %extra_checks;
2386
2387   for my $n (@attr_names) {
2388     if (grep { $n eq $_ } (qw/-join/) ) {
2389       $extra_checks{$n}++;
2390       next;
2391     }
2392
2393     my $attr =  $attrs->{$n};
2394
2395     next if not defined $attr;
2396
2397     if (ref $attr eq 'HASH') {
2398       return 1 if keys %$attr;
2399     }
2400     elsif (ref $attr eq 'ARRAY') {
2401       return 1 if @$attr;
2402     }
2403     else {
2404       return 1 if $attr;
2405     }
2406   }
2407
2408   # a resolved join is expressed as a multi-level from
2409   return 1 if (
2410     $extra_checks{-join}
2411       and
2412     ref $attrs->{from} eq 'ARRAY'
2413       and
2414     @{$attrs->{from}} > 1
2415   );
2416
2417   return 0;
2418 }
2419
2420 # _collapse_cond
2421 #
2422 # Recursively collapse the condition.
2423
2424 sub _collapse_cond {
2425   my ($self, $cond, $collapsed) = @_;
2426
2427   $collapsed ||= {};
2428
2429   if (ref $cond eq 'ARRAY') {
2430     foreach my $subcond (@$cond) {
2431       next unless ref $subcond;  # -or
2432       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2433     }
2434   }
2435   elsif (ref $cond eq 'HASH') {
2436     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2437       foreach my $subcond (@{$cond->{-and}}) {
2438         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2439       }
2440     }
2441     else {
2442       foreach my $col (keys %$cond) {
2443         my $value = $cond->{$col};
2444         $collapsed->{$col} = $value;
2445       }
2446     }
2447   }
2448
2449   return $collapsed;
2450 }
2451
2452 # _remove_alias
2453 #
2454 # Remove the specified alias from the specified query hash. A copy is made so
2455 # the original query is not modified.
2456
2457 sub _remove_alias {
2458   my ($self, $query, $alias) = @_;
2459
2460   my %orig = %{ $query || {} };
2461   my %unaliased;
2462
2463   foreach my $key (keys %orig) {
2464     if ($key !~ /\./) {
2465       $unaliased{$key} = $orig{$key};
2466       next;
2467     }
2468     $unaliased{$1} = $orig{$key}
2469       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2470   }
2471
2472   return \%unaliased;
2473 }
2474
2475 =head2 as_query
2476
2477 =over 4
2478
2479 =item Arguments: none
2480
2481 =item Return Value: \[ $sql, @bind ]
2482
2483 =back
2484
2485 Returns the SQL query and bind vars associated with the invocant.
2486
2487 This is generally used as the RHS for a subquery.
2488
2489 =cut
2490
2491 sub as_query {
2492   my $self = shift;
2493
2494   my $attrs = $self->_resolved_attrs_copy;
2495
2496   # For future use:
2497   #
2498   # in list ctx:
2499   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2500   # $sql also has no wrapping parenthesis in list ctx
2501   #
2502   my $sqlbind = $self->result_source->storage
2503     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2504
2505   return $sqlbind;
2506 }
2507
2508 =head2 find_or_new
2509
2510 =over 4
2511
2512 =item Arguments: \%vals, \%attrs?
2513
2514 =item Return Value: $rowobject
2515
2516 =back
2517
2518   my $artist = $schema->resultset('Artist')->find_or_new(
2519     { artist => 'fred' }, { key => 'artists' });
2520
2521   $cd->cd_to_producer->find_or_new({ producer => $producer },
2522                                    { key => 'primary });
2523
2524 Find an existing record from this resultset using L</find>. if none exists,
2525 instantiate a new result object and return it. The object will not be saved
2526 into your storage until you call L<DBIx::Class::Row/insert> on it.
2527
2528 You most likely want this method when looking for existing rows using a unique
2529 constraint that is not the primary key, or looking for related rows.
2530
2531 If you want objects to be saved immediately, use L</find_or_create> instead.
2532
2533 B<Note>: Make sure to read the documentation of L</find> and understand the
2534 significance of the C<key> attribute, as its lack may skew your search, and
2535 subsequently result in spurious new objects.
2536
2537 B<Note>: Take care when using C<find_or_new> with a table having
2538 columns with default values that you intend to be automatically
2539 supplied by the database (e.g. an auto_increment primary key column).
2540 In normal usage, the value of such columns should NOT be included at
2541 all in the call to C<find_or_new>, even when set to C<undef>.
2542
2543 =cut
2544
2545 sub find_or_new {
2546   my $self     = shift;
2547   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2548   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2549   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2550     return $row;
2551   }
2552   return $self->new_result($hash);
2553 }
2554
2555 =head2 create
2556
2557 =over 4
2558
2559 =item Arguments: \%vals
2560
2561 =item Return Value: a L<DBIx::Class::Row> $object
2562
2563 =back
2564
2565 Attempt to create a single new row or a row with multiple related rows
2566 in the table represented by the resultset (and related tables). This
2567 will not check for duplicate rows before inserting, use
2568 L</find_or_create> to do that.
2569
2570 To create one row for this resultset, pass a hashref of key/value
2571 pairs representing the columns of the table and the values you wish to
2572 store. If the appropriate relationships are set up, foreign key fields
2573 can also be passed an object representing the foreign row, and the
2574 value will be set to its primary key.
2575
2576 To create related objects, pass a hashref of related-object column values
2577 B<keyed on the relationship name>. If the relationship is of type C<multi>
2578 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2579 The process will correctly identify columns holding foreign keys, and will
2580 transparently populate them from the keys of the corresponding relation.
2581 This can be applied recursively, and will work correctly for a structure
2582 with an arbitrary depth and width, as long as the relationships actually
2583 exists and the correct column data has been supplied.
2584
2585
2586 Instead of hashrefs of plain related data (key/value pairs), you may
2587 also pass new or inserted objects. New objects (not inserted yet, see
2588 L</new>), will be inserted into their appropriate tables.
2589
2590 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2591
2592 Example of creating a new row.
2593
2594   $person_rs->create({
2595     name=>"Some Person",
2596     email=>"somebody@someplace.com"
2597   });
2598
2599 Example of creating a new row and also creating rows in a related C<has_many>
2600 or C<has_one> resultset.  Note Arrayref.
2601
2602   $artist_rs->create(
2603      { artistid => 4, name => 'Manufactured Crap', cds => [
2604         { title => 'My First CD', year => 2006 },
2605         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2606       ],
2607      },
2608   );
2609
2610 Example of creating a new row and also creating a row in a related
2611 C<belongs_to> resultset. Note Hashref.
2612
2613   $cd_rs->create({
2614     title=>"Music for Silly Walks",
2615     year=>2000,
2616     artist => {
2617       name=>"Silly Musician",
2618     }
2619   });
2620
2621 =over
2622
2623 =item WARNING
2624
2625 When subclassing ResultSet never attempt to override this method. Since
2626 it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
2627 lot of the internals simply never call it, so your override will be
2628 bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
2629 or L<insert|DBIx::Class::Row/insert> depending on how early in the
2630 L</create> process you need to intervene.
2631
2632 =back
2633
2634 =cut
2635
2636 sub create {
2637   my ($self, $attrs) = @_;
2638   $self->throw_exception( "create needs a hashref" )
2639     unless ref $attrs eq 'HASH';
2640   return $self->new_result($attrs)->insert;
2641 }
2642
2643 =head2 find_or_create
2644
2645 =over 4
2646
2647 =item Arguments: \%vals, \%attrs?
2648
2649 =item Return Value: $rowobject
2650
2651 =back
2652
2653   $cd->cd_to_producer->find_or_create({ producer => $producer },
2654                                       { key => 'primary' });
2655
2656 Tries to find a record based on its primary key or unique constraints; if none
2657 is found, creates one and returns that instead.
2658
2659   my $cd = $schema->resultset('CD')->find_or_create({
2660     cdid   => 5,
2661     artist => 'Massive Attack',
2662     title  => 'Mezzanine',
2663     year   => 2005,
2664   });
2665
2666 Also takes an optional C<key> attribute, to search by a specific key or unique
2667 constraint. For example:
2668
2669   my $cd = $schema->resultset('CD')->find_or_create(
2670     {
2671       artist => 'Massive Attack',
2672       title  => 'Mezzanine',
2673     },
2674     { key => 'cd_artist_title' }
2675   );
2676
2677 B<Note>: Make sure to read the documentation of L</find> and understand the
2678 significance of the C<key> attribute, as its lack may skew your search, and
2679 subsequently result in spurious row creation.
2680
2681 B<Note>: Because find_or_create() reads from the database and then
2682 possibly inserts based on the result, this method is subject to a race
2683 condition. Another process could create a record in the table after
2684 the find has completed and before the create has started. To avoid
2685 this problem, use find_or_create() inside a transaction.
2686
2687 B<Note>: Take care when using C<find_or_create> with a table having
2688 columns with default values that you intend to be automatically
2689 supplied by the database (e.g. an auto_increment primary key column).
2690 In normal usage, the value of such columns should NOT be included at
2691 all in the call to C<find_or_create>, even when set to C<undef>.
2692
2693 See also L</find> and L</update_or_create>. For information on how to declare
2694 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2695
2696 =cut
2697
2698 sub find_or_create {
2699   my $self     = shift;
2700   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2701   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2702   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2703     return $row;
2704   }
2705   return $self->create($hash);
2706 }
2707
2708 =head2 update_or_create
2709
2710 =over 4
2711
2712 =item Arguments: \%col_values, { key => $unique_constraint }?
2713
2714 =item Return Value: $row_object
2715
2716 =back
2717
2718   $resultset->update_or_create({ col => $val, ... });
2719
2720 Like L</find_or_create>, but if a row is found it is immediately updated via
2721 C<< $found_row->update (\%col_values) >>.
2722
2723
2724 Takes an optional C<key> attribute to search on a specific unique constraint.
2725 For example:
2726
2727   # In your application
2728   my $cd = $schema->resultset('CD')->update_or_create(
2729     {
2730       artist => 'Massive Attack',
2731       title  => 'Mezzanine',
2732       year   => 1998,
2733     },
2734     { key => 'cd_artist_title' }
2735   );
2736
2737   $cd->cd_to_producer->update_or_create({
2738     producer => $producer,
2739     name => 'harry',
2740   }, {
2741     key => 'primary',
2742   });
2743
2744 B<Note>: Make sure to read the documentation of L</find> and understand the
2745 significance of the C<key> attribute, as its lack may skew your search, and
2746 subsequently result in spurious row creation.
2747
2748 B<Note>: Take care when using C<update_or_create> with a table having
2749 columns with default values that you intend to be automatically
2750 supplied by the database (e.g. an auto_increment primary key column).
2751 In normal usage, the value of such columns should NOT be included at
2752 all in the call to C<update_or_create>, even when set to C<undef>.
2753
2754 See also L</find> and L</find_or_create>. For information on how to declare
2755 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2756
2757 =cut
2758
2759 sub update_or_create {
2760   my $self = shift;
2761   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2762   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2763
2764   my $row = $self->find($cond, $attrs);
2765   if (defined $row) {
2766     $row->update($cond);
2767     return $row;
2768   }
2769
2770   return $self->create($cond);
2771 }
2772
2773 =head2 update_or_new
2774
2775 =over 4
2776
2777 =item Arguments: \%col_values, { key => $unique_constraint }?
2778
2779 =item Return Value: $rowobject
2780
2781 =back
2782
2783   $resultset->update_or_new({ col => $val, ... });
2784
2785 Like L</find_or_new> but if a row is found it is immediately updated via
2786 C<< $found_row->update (\%col_values) >>.
2787
2788 For example:
2789
2790   # In your application
2791   my $cd = $schema->resultset('CD')->update_or_new(
2792     {
2793       artist => 'Massive Attack',
2794       title  => 'Mezzanine',
2795       year   => 1998,
2796     },
2797     { key => 'cd_artist_title' }
2798   );
2799
2800   if ($cd->in_storage) {
2801       # the cd was updated
2802   }
2803   else {
2804       # the cd is not yet in the database, let's insert it
2805       $cd->insert;
2806   }
2807
2808 B<Note>: Make sure to read the documentation of L</find> and understand the
2809 significance of the C<key> attribute, as its lack may skew your search, and
2810 subsequently result in spurious new objects.
2811
2812 B<Note>: Take care when using C<update_or_new> with a table having
2813 columns with default values that you intend to be automatically
2814 supplied by the database (e.g. an auto_increment primary key column).
2815 In normal usage, the value of such columns should NOT be included at
2816 all in the call to C<update_or_new>, even when set to C<undef>.
2817
2818 See also L</find>, L</find_or_create> and L</find_or_new>. 
2819
2820 =cut
2821
2822 sub update_or_new {
2823     my $self  = shift;
2824     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2825     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2826
2827     my $row = $self->find( $cond, $attrs );
2828     if ( defined $row ) {
2829         $row->update($cond);
2830         return $row;
2831     }
2832
2833     return $self->new_result($cond);
2834 }
2835
2836 =head2 get_cache
2837
2838 =over 4
2839
2840 =item Arguments: none
2841
2842 =item Return Value: \@cache_objects | undef
2843
2844 =back
2845
2846 Gets the contents of the cache for the resultset, if the cache is set.
2847
2848 The cache is populated either by using the L</prefetch> attribute to
2849 L</search> or by calling L</set_cache>.
2850
2851 =cut
2852
2853 sub get_cache {
2854   shift->{all_cache};
2855 }
2856
2857 =head2 set_cache
2858
2859 =over 4
2860
2861 =item Arguments: \@cache_objects
2862
2863 =item Return Value: \@cache_objects
2864
2865 =back
2866
2867 Sets the contents of the cache for the resultset. Expects an arrayref
2868 of objects of the same class as those produced by the resultset. Note that
2869 if the cache is set the resultset will return the cached objects rather
2870 than re-querying the database even if the cache attr is not set.
2871
2872 The contents of the cache can also be populated by using the
2873 L</prefetch> attribute to L</search>.
2874
2875 =cut
2876
2877 sub set_cache {
2878   my ( $self, $data ) = @_;
2879   $self->throw_exception("set_cache requires an arrayref")
2880       if defined($data) && (ref $data ne 'ARRAY');
2881   $self->{all_cache} = $data;
2882 }
2883
2884 =head2 clear_cache
2885
2886 =over 4
2887
2888 =item Arguments: none
2889
2890 =item Return Value: undef
2891
2892 =back
2893
2894 Clears the cache for the resultset.
2895
2896 =cut
2897
2898 sub clear_cache {
2899   shift->set_cache(undef);
2900 }
2901
2902 =head2 is_paged
2903
2904 =over 4
2905
2906 =item Arguments: none
2907
2908 =item Return Value: true, if the resultset has been paginated
2909
2910 =back
2911
2912 =cut
2913
2914 sub is_paged {
2915   my ($self) = @_;
2916   return !!$self->{attrs}{page};
2917 }
2918
2919 =head2 is_ordered
2920
2921 =over 4
2922
2923 =item Arguments: none
2924
2925 =item Return Value: true, if the resultset has been ordered with C<order_by>.
2926
2927 =back
2928
2929 =cut
2930
2931 sub is_ordered {
2932   my ($self) = @_;
2933   return scalar $self->result_source->storage->_extract_order_criteria($self->{attrs}{order_by});
2934 }
2935
2936 =head2 related_resultset
2937
2938 =over 4
2939
2940 =item Arguments: $relationship_name
2941
2942 =item Return Value: $resultset
2943
2944 =back
2945
2946 Returns a related resultset for the supplied relationship name.
2947
2948   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2949
2950 =cut
2951
2952 sub related_resultset {
2953   my ($self, $rel) = @_;
2954
2955   $self->{related_resultsets} ||= {};
2956   return $self->{related_resultsets}{$rel} ||= do {
2957     my $rsrc = $self->result_source;
2958     my $rel_info = $rsrc->relationship_info($rel);
2959
2960     $self->throw_exception(
2961       "search_related: result source '" . $rsrc->source_name .
2962         "' has no such relationship $rel")
2963       unless $rel_info;
2964
2965     my $attrs = $self->_chain_relationship($rel);
2966
2967     my $join_count = $attrs->{seen_join}{$rel};
2968
2969     my $alias = $self->result_source->storage
2970         ->relname_to_table_alias($rel, $join_count);
2971
2972     # since this is search_related, and we already slid the select window inwards
2973     # (the select/as attrs were deleted in the beginning), we need to flip all
2974     # left joins to inner, so we get the expected results
2975     # read the comment on top of the actual function to see what this does
2976     $attrs->{from} = $rsrc->schema->storage->_inner_join_to_node ($attrs->{from}, $alias);
2977
2978
2979     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2980     delete @{$attrs}{qw(result_class alias)};
2981
2982     my $new_cache;
2983
2984     if (my $cache = $self->get_cache) {
2985       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2986         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2987                         @$cache ];
2988       }
2989     }
2990
2991     my $rel_source = $rsrc->related_source($rel);
2992
2993     my $new = do {
2994
2995       # The reason we do this now instead of passing the alias to the
2996       # search_rs below is that if you wrap/overload resultset on the
2997       # source you need to know what alias it's -going- to have for things
2998       # to work sanely (e.g. RestrictWithObject wants to be able to add
2999       # extra query restrictions, and these may need to be $alias.)
3000
3001       my $rel_attrs = $rel_source->resultset_attributes;
3002       local $rel_attrs->{alias} = $alias;
3003
3004       $rel_source->resultset
3005                  ->search_rs(
3006                      undef, {
3007                        %$attrs,
3008                        where => $attrs->{where},
3009                    });
3010     };
3011     $new->set_cache($new_cache) if $new_cache;
3012     $new;
3013   };
3014 }
3015
3016 =head2 current_source_alias
3017
3018 =over 4
3019
3020 =item Arguments: none
3021
3022 =item Return Value: $source_alias
3023
3024 =back
3025
3026 Returns the current table alias for the result source this resultset is built
3027 on, that will be used in the SQL query. Usually it is C<me>.
3028
3029 Currently the source alias that refers to the result set returned by a
3030 L</search>/L</find> family method depends on how you got to the resultset: it's
3031 C<me> by default, but eg. L</search_related> aliases it to the related result
3032 source name (and keeps C<me> referring to the original result set). The long
3033 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
3034 (and make this method unnecessary).
3035
3036 Thus it's currently necessary to use this method in predefined queries (see
3037 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
3038 source alias of the current result set:
3039
3040   # in a result set class
3041   sub modified_by {
3042     my ($self, $user) = @_;
3043
3044     my $me = $self->current_source_alias;
3045
3046     return $self->search(
3047       "$me.modified" => $user->id,
3048     );
3049   }
3050
3051 =cut
3052
3053 sub current_source_alias {
3054   my ($self) = @_;
3055
3056   return ($self->{attrs} || {})->{alias} || 'me';
3057 }
3058
3059 =head2 as_subselect_rs
3060
3061 =over 4
3062
3063 =item Arguments: none
3064
3065 =item Return Value: $resultset
3066
3067 =back
3068
3069 Act as a barrier to SQL symbols.  The resultset provided will be made into a
3070 "virtual view" by including it as a subquery within the from clause.  From this
3071 point on, any joined tables are inaccessible to ->search on the resultset (as if
3072 it were simply where-filtered without joins).  For example:
3073
3074  my $rs = $schema->resultset('Bar')->search({'x.name' => 'abc'},{ join => 'x' });
3075
3076  # 'x' now pollutes the query namespace
3077
3078  # So the following works as expected
3079  my $ok_rs = $rs->search({'x.other' => 1});
3080
3081  # But this doesn't: instead of finding a 'Bar' related to two x rows (abc and
3082  # def) we look for one row with contradictory terms and join in another table
3083  # (aliased 'x_2') which we never use
3084  my $broken_rs = $rs->search({'x.name' => 'def'});
3085
3086  my $rs2 = $rs->as_subselect_rs;
3087
3088  # doesn't work - 'x' is no longer accessible in $rs2, having been sealed away
3089  my $not_joined_rs = $rs2->search({'x.other' => 1});
3090
3091  # works as expected: finds a 'table' row related to two x rows (abc and def)
3092  my $correctly_joined_rs = $rs2->search({'x.name' => 'def'});
3093
3094 Another example of when one might use this would be to select a subset of
3095 columns in a group by clause:
3096
3097  my $rs = $schema->resultset('Bar')->search(undef, {
3098    group_by => [qw{ id foo_id baz_id }],
3099  })->as_subselect_rs->search(undef, {
3100    columns => [qw{ id foo_id }]
3101  });
3102
3103 In the above example normally columns would have to be equal to the group by,
3104 but because we isolated the group by into a subselect the above works.
3105
3106 =cut
3107
3108 sub as_subselect_rs {
3109   my $self = shift;
3110
3111   my $attrs = $self->_resolved_attrs;
3112
3113   my $fresh_rs = (ref $self)->new (
3114     $self->result_source
3115   );
3116
3117   # these pieces will be locked in the subquery
3118   delete $fresh_rs->{cond};
3119   delete @{$fresh_rs->{attrs}}{qw/where bind/};
3120
3121   return $fresh_rs->search( {}, {
3122     from => [{
3123       $attrs->{alias} => $self->as_query,
3124       -alias  => $attrs->{alias},
3125       -rsrc   => $self->result_source,
3126     }],
3127     alias => $attrs->{alias},
3128   });
3129 }
3130
3131 # This code is called by search_related, and makes sure there
3132 # is clear separation between the joins before, during, and
3133 # after the relationship. This information is needed later
3134 # in order to properly resolve prefetch aliases (any alias
3135 # with a relation_chain_depth less than the depth of the
3136 # current prefetch is not considered)
3137 #
3138 # The increments happen twice per join. An even number means a
3139 # relationship specified via a search_related, whereas an odd
3140 # number indicates a join/prefetch added via attributes
3141 #
3142 # Also this code will wrap the current resultset (the one we
3143 # chain to) in a subselect IFF it contains limiting attributes
3144 sub _chain_relationship {
3145   my ($self, $rel) = @_;
3146   my $source = $self->result_source;
3147   my $attrs = { %{$self->{attrs}||{}} };
3148
3149   # we need to take the prefetch the attrs into account before we
3150   # ->_resolve_join as otherwise they get lost - captainL
3151   my $join = $self->_merge_joinpref_attr( $attrs->{join}, $attrs->{prefetch} );
3152
3153   delete @{$attrs}{qw/join prefetch collapse group_by distinct select as columns +select +as +columns/};
3154
3155   my $seen = { %{ (delete $attrs->{seen_join}) || {} } };
3156
3157   my $from;
3158   my @force_subq_attrs = qw/offset rows group_by having/;
3159
3160   if (
3161     ($attrs->{from} && ref $attrs->{from} ne 'ARRAY')
3162       ||
3163     $self->_has_resolved_attr (@force_subq_attrs)
3164   ) {
3165     # Nuke the prefetch (if any) before the new $rs attrs
3166     # are resolved (prefetch is useless - we are wrapping
3167     # a subquery anyway).
3168     my $rs_copy = $self->search;
3169     $rs_copy->{attrs}{join} = $self->_merge_joinpref_attr (
3170       $rs_copy->{attrs}{join},
3171       delete $rs_copy->{attrs}{prefetch},
3172     );
3173
3174     $from = [{
3175       -rsrc   => $source,
3176       -alias  => $attrs->{alias},
3177       $attrs->{alias} => $rs_copy->as_query,
3178     }];
3179     delete @{$attrs}{@force_subq_attrs, qw/where bind/};
3180     $seen->{-relation_chain_depth} = 0;
3181   }
3182   elsif ($attrs->{from}) {  #shallow copy suffices
3183     $from = [ @{$attrs->{from}} ];
3184   }
3185   else {
3186     $from = [{
3187       -rsrc  => $source,
3188       -alias => $attrs->{alias},
3189       $attrs->{alias} => $source->from,
3190     }];
3191   }
3192
3193   my $jpath = ($seen->{-relation_chain_depth})
3194     ? $from->[-1][0]{-join_path}
3195     : [];
3196
3197   my @requested_joins = $source->_resolve_join(
3198     $join,
3199     $attrs->{alias},
3200     $seen,
3201     $jpath,
3202   );
3203
3204   push @$from, @requested_joins;
3205
3206   $seen->{-relation_chain_depth}++;
3207
3208   # if $self already had a join/prefetch specified on it, the requested
3209   # $rel might very well be already included. What we do in this case
3210   # is effectively a no-op (except that we bump up the chain_depth on
3211   # the join in question so we could tell it *is* the search_related)
3212   my $already_joined;
3213
3214   # we consider the last one thus reverse
3215   for my $j (reverse @requested_joins) {
3216     my ($last_j) = keys %{$j->[0]{-join_path}[-1]};
3217     if ($rel eq $last_j) {
3218       $j->[0]{-relation_chain_depth}++;
3219       $already_joined++;
3220       last;
3221     }
3222   }
3223
3224   unless ($already_joined) {
3225     push @$from, $source->_resolve_join(
3226       $rel,
3227       $attrs->{alias},
3228       $seen,
3229       $jpath,
3230     );
3231   }
3232
3233   $seen->{-relation_chain_depth}++;
3234
3235   return {%$attrs, from => $from, seen_join => $seen};
3236 }
3237
3238 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
3239 sub _resolved_attrs_copy {
3240   my $self = shift;
3241   return { %{$self->_resolved_attrs (@_)} };
3242 }
3243
3244 sub _resolved_attrs {
3245   my $self = shift;
3246   return $self->{_attrs} if $self->{_attrs};
3247
3248   my $attrs  = { %{ $self->{attrs} || {} } };
3249   my $source = $self->result_source;
3250   my $alias  = $attrs->{alias};
3251
3252   # default selection list
3253   $attrs->{columns} = [ $source->columns ]
3254     unless List::Util::first { exists $attrs->{$_} } qw/columns cols select as/;
3255
3256   # merge selectors together
3257   for (qw/columns select as/) {
3258     $attrs->{$_} = $self->_merge_attr($attrs->{$_}, delete $attrs->{"+$_"})
3259       if $attrs->{$_} or $attrs->{"+$_"};
3260   }
3261
3262   # disassemble columns
3263   my (@sel, @as);
3264   if (my $cols = delete $attrs->{columns}) {
3265     for my $c (ref $cols eq 'ARRAY' ? @$cols : $cols) {
3266       if (ref $c eq 'HASH') {
3267         for my $as (keys %$c) {
3268           push @sel, $c->{$as};
3269           push @as, $as;
3270         }
3271       }
3272       else {
3273         push @sel, $c;
3274         push @as, $c;
3275       }
3276     }
3277   }
3278
3279   # when trying to weed off duplicates later do not go past this point -
3280   # everything added from here on is unbalanced "anyone's guess" stuff
3281   my $dedup_stop_idx = $#as;
3282
3283   push @as, @{ ref $attrs->{as} eq 'ARRAY' ? $attrs->{as} : [ $attrs->{as} ] }
3284     if $attrs->{as};
3285   push @sel, @{ ref $attrs->{select} eq 'ARRAY' ? $attrs->{select} : [ $attrs->{select} ] }
3286     if $attrs->{select};
3287
3288   # assume all unqualified selectors to apply to the current alias (legacy stuff)
3289   for (@sel) {
3290     $_ = (ref $_ or $_ =~ /\./) ? $_ : "$alias.$_";
3291   }
3292
3293   # disqualify all $alias.col as-bits (collapser mandated)
3294   for (@as) {
3295     $_ = ($_ =~ /^\Q$alias.\E(.+)$/) ? $1 : $_;
3296   }
3297
3298   # de-duplicate the result (remove *identical* select/as pairs)
3299   # and also die on duplicate {as} pointing to different {select}s
3300   # not using a c-style for as the condition is prone to shrinkage
3301   my $seen;
3302   my $i = 0;
3303   while ($i <= $dedup_stop_idx) {
3304     if ($seen->{"$sel[$i] \x00\x00 $as[$i]"}++) {
3305       splice @sel, $i, 1;
3306       splice @as, $i, 1;
3307       $dedup_stop_idx--;
3308     }
3309     elsif ($seen->{$as[$i]}++) {
3310       $self->throw_exception(
3311         "inflate_result() alias '$as[$i]' specified twice with different SQL-side {select}-ors"
3312       );
3313     }
3314     else {
3315       $i++;
3316     }
3317   }
3318
3319   $attrs->{select} = \@sel;
3320   $attrs->{as} = \@as;
3321
3322   $attrs->{from} ||= [{
3323     -rsrc   => $source,
3324     -alias  => $self->{attrs}{alias},
3325     $self->{attrs}{alias} => $source->from,
3326   }];
3327
3328   if ( $attrs->{join} || $attrs->{prefetch} ) {
3329
3330     $self->throw_exception ('join/prefetch can not be used with a custom {from}')
3331       if ref $attrs->{from} ne 'ARRAY';
3332
3333     my $join = (delete $attrs->{join}) || {};
3334
3335     if ( defined $attrs->{prefetch} ) {
3336       $join = $self->_merge_joinpref_attr( $join, $attrs->{prefetch} );
3337     }
3338
3339     $attrs->{from} =    # have to copy here to avoid corrupting the original
3340       [
3341         @{ $attrs->{from} },
3342         $source->_resolve_join(
3343           $join,
3344           $alias,
3345           { %{ $attrs->{seen_join} || {} } },
3346           ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
3347             ? $attrs->{from}[-1][0]{-join_path}
3348             : []
3349           ,
3350         )
3351       ];
3352   }
3353
3354   if ( defined $attrs->{order_by} ) {
3355     $attrs->{order_by} = (
3356       ref( $attrs->{order_by} ) eq 'ARRAY'
3357       ? [ @{ $attrs->{order_by} } ]
3358       : [ $attrs->{order_by} || () ]
3359     );
3360   }
3361
3362   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
3363     $attrs->{group_by} = [ $attrs->{group_by} ];
3364   }
3365
3366   # generate the distinct induced group_by early, as prefetch will be carried via a
3367   # subquery (since a group_by is present)
3368   if (delete $attrs->{distinct}) {
3369     if ($attrs->{group_by}) {
3370       carp_unique ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
3371     }
3372     else {
3373       # distinct affects only the main selection part, not what prefetch may
3374       # add below.
3375       $attrs->{group_by} = $source->storage->_group_over_selection (
3376         $attrs->{from},
3377         $attrs->{select},
3378         $attrs->{order_by},
3379       );
3380     }
3381   }
3382
3383   $attrs->{collapse} ||= {};
3384   if ($attrs->{prefetch}) {
3385
3386     $self->throw_exception("Unable to prefetch, resultset contains an unnamed selector $attrs->{_dark_selector}{string}")
3387       if $attrs->{_dark_selector};
3388
3389     my $prefetch = $self->_merge_joinpref_attr( {}, delete $attrs->{prefetch} );
3390
3391     my $prefetch_ordering = [];
3392
3393     # this is a separate structure (we don't look in {from} directly)
3394     # as the resolver needs to shift things off the lists to work
3395     # properly (identical-prefetches on different branches)
3396     my $join_map = {};
3397     if (ref $attrs->{from} eq 'ARRAY') {
3398
3399       my $start_depth = $attrs->{seen_join}{-relation_chain_depth} || 0;
3400
3401       for my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
3402         next unless $j->[0]{-alias};
3403         next unless $j->[0]{-join_path};
3404         next if ($j->[0]{-relation_chain_depth} || 0) < $start_depth;
3405
3406         my @jpath = map { keys %$_ } @{$j->[0]{-join_path}};
3407
3408         my $p = $join_map;
3409         $p = $p->{$_} ||= {} for @jpath[ ($start_depth/2) .. $#jpath]; #only even depths are actual jpath boundaries
3410         push @{$p->{-join_aliases} }, $j->[0]{-alias};
3411       }
3412     }
3413
3414     my @prefetch =
3415       $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
3416
3417     # we need to somehow mark which columns came from prefetch
3418     if (@prefetch) {
3419       my $sel_end = $#{$attrs->{select}};
3420       $attrs->{_prefetch_selector_range} = [ $sel_end + 1, $sel_end + @prefetch ];
3421     }
3422
3423     push @{ $attrs->{select} }, (map { $_->[0] } @prefetch);
3424     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
3425
3426     push( @{$attrs->{order_by}}, @$prefetch_ordering );
3427     $attrs->{_collapse_order_by} = \@$prefetch_ordering;
3428   }
3429
3430
3431   # if both page and offset are specified, produce a combined offset
3432   # even though it doesn't make much sense, this is what pre 081xx has
3433   # been doing
3434   if (my $page = delete $attrs->{page}) {
3435     $attrs->{offset} =
3436       ($attrs->{rows} * ($page - 1))
3437             +
3438       ($attrs->{offset} || 0)
3439     ;
3440   }
3441
3442   return $self->{_attrs} = $attrs;
3443 }
3444
3445 sub _rollout_attr {
3446   my ($self, $attr) = @_;
3447
3448   if (ref $attr eq 'HASH') {
3449     return $self->_rollout_hash($attr);
3450   } elsif (ref $attr eq 'ARRAY') {
3451     return $self->_rollout_array($attr);
3452   } else {
3453     return [$attr];
3454   }
3455 }
3456
3457 sub _rollout_array {
3458   my ($self, $attr) = @_;
3459
3460   my @rolled_array;
3461   foreach my $element (@{$attr}) {
3462     if (ref $element eq 'HASH') {
3463       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3464     } elsif (ref $element eq 'ARRAY') {
3465       #  XXX - should probably recurse here
3466       push( @rolled_array, @{$self->_rollout_array($element)} );
3467     } else {
3468       push( @rolled_array, $element );
3469     }
3470   }
3471   return \@rolled_array;
3472 }
3473
3474 sub _rollout_hash {
3475   my ($self, $attr) = @_;
3476
3477   my @rolled_array;
3478   foreach my $key (keys %{$attr}) {
3479     push( @rolled_array, { $key => $attr->{$key} } );
3480   }
3481   return \@rolled_array;
3482 }
3483
3484 sub _calculate_score {
3485   my ($self, $a, $b) = @_;
3486
3487   if (defined $a xor defined $b) {
3488     return 0;
3489   }
3490   elsif (not defined $a) {
3491     return 1;
3492   }
3493
3494   if (ref $b eq 'HASH') {
3495     my ($b_key) = keys %{$b};
3496     if (ref $a eq 'HASH') {
3497       my ($a_key) = keys %{$a};
3498       if ($a_key eq $b_key) {
3499         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3500       } else {
3501         return 0;
3502       }
3503     } else {
3504       return ($a eq $b_key) ? 1 : 0;
3505     }
3506   } else {
3507     if (ref $a eq 'HASH') {
3508       my ($a_key) = keys %{$a};
3509       return ($b eq $a_key) ? 1 : 0;
3510     } else {
3511       return ($b eq $a) ? 1 : 0;
3512     }
3513   }
3514 }
3515
3516 sub _merge_joinpref_attr {
3517   my ($self, $orig, $import) = @_;
3518
3519   return $import unless defined($orig);
3520   return $orig unless defined($import);
3521
3522   $orig = $self->_rollout_attr($orig);
3523   $import = $self->_rollout_attr($import);
3524
3525   my $seen_keys;
3526   foreach my $import_element ( @{$import} ) {
3527     # find best candidate from $orig to merge $b_element into
3528     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3529     foreach my $orig_element ( @{$orig} ) {
3530       my $score = $self->_calculate_score( $orig_element, $import_element );
3531       if ($score > $best_candidate->{score}) {
3532         $best_candidate->{position} = $position;
3533         $best_candidate->{score} = $score;
3534       }
3535       $position++;
3536     }
3537     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3538
3539     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3540       push( @{$orig}, $import_element );
3541     } else {
3542       my $orig_best = $orig->[$best_candidate->{position}];
3543       # merge orig_best and b_element together and replace original with merged
3544       if (ref $orig_best ne 'HASH') {
3545         $orig->[$best_candidate->{position}] = $import_element;
3546       } elsif (ref $import_element eq 'HASH') {
3547         my ($key) = keys %{$orig_best};
3548         $orig->[$best_candidate->{position}] = { $key => $self->_merge_joinpref_attr($orig_best->{$key}, $import_element->{$key}) };
3549       }
3550     }
3551     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3552   }
3553
3554   return $orig;
3555 }
3556
3557 {
3558   my $hm;
3559
3560   sub _merge_attr {
3561     $hm ||= do {
3562       require Hash::Merge;
3563       my $hm = Hash::Merge->new;
3564
3565       $hm->specify_behavior({
3566         SCALAR => {
3567           SCALAR => sub {
3568             my ($defl, $defr) = map { defined $_ } (@_[0,1]);
3569
3570             if ($defl xor $defr) {
3571               return [ $defl ? $_[0] : $_[1] ];
3572             }
3573             elsif (! $defl) {
3574               return [];
3575             }
3576             elsif (__HM_DEDUP and $_[0] eq $_[1]) {
3577               return [ $_[0] ];
3578             }
3579             else {
3580               return [$_[0], $_[1]];
3581             }
3582           },
3583           ARRAY => sub {
3584             return $_[1] if !defined $_[0];
3585             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3586             return [$_[0], @{$_[1]}]
3587           },
3588           HASH  => sub {
3589             return [] if !defined $_[0] and !keys %{$_[1]};
3590             return [ $_[1] ] if !defined $_[0];
3591             return [ $_[0] ] if !keys %{$_[1]};
3592             return [$_[0], $_[1]]
3593           },
3594         },
3595         ARRAY => {
3596           SCALAR => sub {
3597             return $_[0] if !defined $_[1];
3598             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3599             return [@{$_[0]}, $_[1]]
3600           },
3601           ARRAY => sub {
3602             my @ret = @{$_[0]} or return $_[1];
3603             return [ @ret, @{$_[1]} ] unless __HM_DEDUP;
3604             my %idx = map { $_ => 1 } @ret;
3605             push @ret, grep { ! defined $idx{$_} } (@{$_[1]});
3606             \@ret;
3607           },
3608           HASH => sub {
3609             return [ $_[1] ] if ! @{$_[0]};
3610             return $_[0] if !keys %{$_[1]};
3611             return $_[0] if __HM_DEDUP and List::Util::first { $_ eq $_[1] } @{$_[0]};
3612             return [ @{$_[0]}, $_[1] ];
3613           },
3614         },
3615         HASH => {
3616           SCALAR => sub {
3617             return [] if !keys %{$_[0]} and !defined $_[1];
3618             return [ $_[0] ] if !defined $_[1];
3619             return [ $_[1] ] if !keys %{$_[0]};
3620             return [$_[0], $_[1]]
3621           },
3622           ARRAY => sub {
3623             return [] if !keys %{$_[0]} and !@{$_[1]};
3624             return [ $_[0] ] if !@{$_[1]};
3625             return $_[1] if !keys %{$_[0]};
3626             return $_[1] if __HM_DEDUP and List::Util::first { $_ eq $_[0] } @{$_[1]};
3627             return [ $_[0], @{$_[1]} ];
3628           },
3629           HASH => sub {
3630             return [] if !keys %{$_[0]} and !keys %{$_[1]};
3631             return [ $_[0] ] if !keys %{$_[1]};
3632             return [ $_[1] ] if !keys %{$_[0]};
3633             return [ $_[0] ] if $_[0] eq $_[1];
3634             return [ $_[0], $_[1] ];
3635           },
3636         }
3637       } => 'DBIC_RS_ATTR_MERGER');
3638       $hm;
3639     };
3640
3641     return $hm->merge ($_[1], $_[2]);
3642   }
3643 }
3644
3645 sub STORABLE_freeze {
3646   my ($self, $cloning) = @_;
3647   my $to_serialize = { %$self };
3648
3649   # A cursor in progress can't be serialized (and would make little sense anyway)
3650   delete $to_serialize->{cursor};
3651
3652   Storable::nfreeze($to_serialize);
3653 }
3654
3655 # need this hook for symmetry
3656 sub STORABLE_thaw {
3657   my ($self, $cloning, $serialized) = @_;
3658
3659   %$self = %{ Storable::thaw($serialized) };
3660
3661   $self;
3662 }
3663
3664
3665 =head2 throw_exception
3666
3667 See L<DBIx::Class::Schema/throw_exception> for details.
3668
3669 =cut
3670
3671 sub throw_exception {
3672   my $self=shift;
3673
3674   if (ref $self and my $rsrc = $self->result_source) {
3675     $rsrc->throw_exception(@_)
3676   }
3677   else {
3678     DBIx::Class::Exception->throw(@_);
3679   }
3680 }
3681
3682 # XXX: FIXME: Attributes docs need clearing up
3683
3684 =head1 ATTRIBUTES
3685
3686 Attributes are used to refine a ResultSet in various ways when
3687 searching for data. They can be passed to any method which takes an
3688 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3689 L</count>.
3690
3691 These are in no particular order:
3692
3693 =head2 order_by
3694
3695 =over 4
3696
3697 =item Value: ( $order_by | \@order_by | \%order_by )
3698
3699 =back
3700
3701 Which column(s) to order the results by.
3702
3703 [The full list of suitable values is documented in
3704 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3705 common options.]
3706
3707 If a single column name, or an arrayref of names is supplied, the
3708 argument is passed through directly to SQL. The hashref syntax allows
3709 for connection-agnostic specification of ordering direction:
3710
3711  For descending order:
3712
3713   order_by => { -desc => [qw/col1 col2 col3/] }
3714
3715  For explicit ascending order:
3716
3717   order_by => { -asc => 'col' }
3718
3719 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3720 supported, although you are strongly encouraged to use the hashref
3721 syntax as outlined above.
3722
3723 =head2 columns
3724
3725 =over 4
3726
3727 =item Value: \@columns
3728
3729 =back
3730
3731 Shortcut to request a particular set of columns to be retrieved. Each
3732 column spec may be a string (a table column name), or a hash (in which
3733 case the key is the C<as> value, and the value is used as the C<select>
3734 expression). Adds C<me.> onto the start of any column without a C<.> in
3735 it and sets C<select> from that, then auto-populates C<as> from
3736 C<select> as normal. (You may also use the C<cols> attribute, as in
3737 earlier versions of DBIC.)
3738
3739 Essentially C<columns> does the same as L</select> and L</as>.
3740
3741     columns => [ 'foo', { bar => 'baz' } ]
3742
3743 is the same as
3744
3745     select => [qw/foo baz/],
3746     as => [qw/foo bar/]
3747
3748 =head2 +columns
3749
3750 =over 4
3751
3752 =item Value: \@columns
3753
3754 =back
3755
3756 Indicates additional columns to be selected from storage. Works the same
3757 as L</columns> but adds columns to the selection. (You may also use the
3758 C<include_columns> attribute, as in earlier versions of DBIC). For
3759 example:-
3760
3761   $schema->resultset('CD')->search(undef, {
3762     '+columns' => ['artist.name'],
3763     join => ['artist']
3764   });
3765
3766 would return all CDs and include a 'name' column to the information
3767 passed to object inflation. Note that the 'artist' is the name of the
3768 column (or relationship) accessor, and 'name' is the name of the column
3769 accessor in the related table.
3770
3771 B<NOTE:> You need to explicitly quote '+columns' when defining the attribute.
3772 Not doing so causes Perl to incorrectly interpret +columns as a bareword with a
3773 unary plus operator before it.
3774
3775 =head2 include_columns
3776
3777 =over 4
3778
3779 =item Value: \@columns
3780
3781 =back
3782
3783 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3784
3785 =head2 select
3786
3787 =over 4
3788
3789 =item Value: \@select_columns
3790
3791 =back
3792
3793 Indicates which columns should be selected from the storage. You can use
3794 column names, or in the case of RDBMS back ends, function or stored procedure
3795 names:
3796
3797   $rs = $schema->resultset('Employee')->search(undef, {
3798     select => [
3799       'name',
3800       { count => 'employeeid' },
3801       { max => { length => 'name' }, -as => 'longest_name' }
3802     ]
3803   });
3804
3805   # Equivalent SQL
3806   SELECT name, COUNT( employeeid ), MAX( LENGTH( name ) ) AS longest_name FROM employee
3807
3808 B<NOTE:> You will almost always need a corresponding L</as> attribute when you
3809 use L</select>, to instruct DBIx::Class how to store the result of the column.
3810 Also note that the L</as> attribute has nothing to do with the SQL-side 'AS'
3811 identifier aliasing. You can however alias a function, so you can use it in
3812 e.g. an C<ORDER BY> clause. This is done via the C<-as> B<select function
3813 attribute> supplied as shown in the example above.
3814
3815 B<NOTE:> You need to explicitly quote '+select'/'+as' when defining the attributes.
3816 Not doing so causes Perl to incorrectly interpret them as a bareword with a
3817 unary plus operator before it.
3818
3819 =head2 +select
3820
3821 =over 4
3822
3823 Indicates additional columns to be selected from storage.  Works the same as
3824 L</select> but adds columns to the default selection, instead of specifying
3825 an explicit list.
3826
3827 =back
3828
3829 =head2 +as
3830
3831 =over 4
3832
3833 Indicates additional column names for those added via L</+select>. See L</as>.
3834
3835 =back
3836
3837 =head2 as
3838
3839 =over 4
3840
3841 =item Value: \@inflation_names
3842
3843 =back
3844
3845 Indicates column names for object inflation. That is L</as> indicates the
3846 slot name in which the column value will be stored within the
3847 L<Row|DBIx::Class::Row> object. The value will then be accessible via this
3848 identifier by the C<get_column> method (or via the object accessor B<if one
3849 with the same name already exists>) as shown below. The L</as> attribute has
3850 B<nothing to do> with the SQL-side C<AS>. See L</select> for details.
3851
3852   $rs = $schema->resultset('Employee')->search(undef, {
3853     select => [
3854       'name',
3855       { count => 'employeeid' },
3856       { max => { length => 'name' }, -as => 'longest_name' }
3857     ],
3858     as => [qw/
3859       name
3860       employee_count
3861       max_name_length
3862     /],
3863   });
3864
3865 If the object against which the search is performed already has an accessor
3866 matching a column name specified in C<as>, the value can be retrieved using
3867 the accessor as normal:
3868
3869   my $name = $employee->name();
3870
3871 If on the other hand an accessor does not exist in the object, you need to
3872 use C<get_column> instead:
3873
3874   my $employee_count = $employee->get_column('employee_count');
3875
3876 You can create your own accessors if required - see
3877 L<DBIx::Class::Manual::Cookbook> for details.
3878
3879 =head2 join
3880
3881 =over 4
3882
3883 =item Value: ($rel_name | \@rel_names | \%rel_names)
3884
3885 =back
3886
3887 Contains a list of relationships that should be joined for this query.  For
3888 example:
3889
3890   # Get CDs by Nine Inch Nails
3891   my $rs = $schema->resultset('CD')->search(
3892     { 'artist.name' => 'Nine Inch Nails' },
3893     { join => 'artist' }
3894   );
3895
3896 Can also contain a hash reference to refer to the other relation's relations.
3897 For example:
3898
3899   package MyApp::Schema::Track;
3900   use base qw/DBIx::Class/;
3901   __PACKAGE__->table('track');
3902   __PACKAGE__->add_columns(qw/trackid cd position title/);
3903   __PACKAGE__->set_primary_key('trackid');
3904   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3905   1;
3906
3907   # In your application
3908   my $rs = $schema->resultset('Artist')->search(
3909     { 'track.title' => 'Teardrop' },
3910     {
3911       join     => { cd => 'track' },
3912       order_by => 'artist.name',
3913     }
3914   );
3915
3916 You need to use the relationship (not the table) name in  conditions,
3917 because they are aliased as such. The current table is aliased as "me", so
3918 you need to use me.column_name in order to avoid ambiguity. For example:
3919
3920   # Get CDs from 1984 with a 'Foo' track
3921   my $rs = $schema->resultset('CD')->search(
3922     {
3923       'me.year' => 1984,
3924       'tracks.name' => 'Foo'
3925     },
3926     { join => 'tracks' }
3927   );
3928
3929 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3930 similarly for a third time). For e.g.
3931
3932   my $rs = $schema->resultset('Artist')->search({
3933     'cds.title'   => 'Down to Earth',
3934     'cds_2.title' => 'Popular',
3935   }, {
3936     join => [ qw/cds cds/ ],
3937   });
3938
3939 will return a set of all artists that have both a cd with title 'Down
3940 to Earth' and a cd with title 'Popular'.
3941
3942 If you want to fetch related objects from other tables as well, see C<prefetch>
3943 below.
3944
3945 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
3946
3947 =head2 prefetch
3948
3949 =over 4
3950
3951 =item Value: ($rel_name | \@rel_names | \%rel_names)
3952
3953 =back
3954
3955 Contains one or more relationships that should be fetched along with
3956 the main query (when they are accessed afterwards the data will
3957 already be available, without extra queries to the database).  This is
3958 useful for when you know you will need the related objects, because it
3959 saves at least one query:
3960
3961   my $rs = $schema->resultset('Tag')->search(
3962     undef,
3963     {
3964       prefetch => {
3965         cd => 'artist'
3966       }
3967     }
3968   );
3969
3970 The initial search results in SQL like the following:
3971
3972   SELECT tag.*, cd.*, artist.* FROM tag
3973   JOIN cd ON tag.cd = cd.cdid
3974   JOIN artist ON cd.artist = artist.artistid
3975
3976 L<DBIx::Class> has no need to go back to the database when we access the
3977 C<cd> or C<artist> relationships, which saves us two SQL statements in this
3978 case.
3979
3980 Simple prefetches will be joined automatically, so there is no need
3981 for a C<join> attribute in the above search.
3982
3983 L</prefetch> can be used with the any of the relationship types and
3984 multiple prefetches can be specified together. Below is a more complex
3985 example that prefetches a CD's artist, its liner notes (if present),
3986 the cover image, the tracks on that cd, and the guests on those
3987 tracks.
3988
3989  # Assuming:
3990  My::Schema::CD->belongs_to( artist      => 'My::Schema::Artist'     );
3991  My::Schema::CD->might_have( liner_note  => 'My::Schema::LinerNotes' );
3992  My::Schema::CD->has_one(    cover_image => 'My::Schema::Artwork'    );
3993  My::Schema::CD->has_many(   tracks      => 'My::Schema::Track'      );
3994
3995  My::Schema::Artist->belongs_to( record_label => 'My::Schema::RecordLabel' );
3996
3997  My::Schema::Track->has_many( guests => 'My::Schema::Guest' );
3998
3999
4000  my $rs = $schema->resultset('CD')->search(
4001    undef,
4002    {
4003      prefetch => [
4004        { artist => 'record_label'},  # belongs_to => belongs_to
4005        'liner_note',                 # might_have
4006        'cover_image',                # has_one
4007        { tracks => 'guests' },       # has_many => has_many
4008      ]
4009    }
4010  );
4011
4012 This will produce SQL like the following:
4013
4014  SELECT cd.*, artist.*, record_label.*, liner_note.*, cover_image.*,
4015         tracks.*, guests.*
4016    FROM cd me
4017    JOIN artist artist
4018      ON artist.artistid = me.artistid
4019    JOIN record_label record_label
4020      ON record_label.labelid = artist.labelid
4021    LEFT JOIN track tracks
4022      ON tracks.cdid = me.cdid
4023    LEFT JOIN guest guests
4024      ON guests.trackid = track.trackid
4025    LEFT JOIN liner_notes liner_note
4026      ON liner_note.cdid = me.cdid
4027    JOIN cd_artwork cover_image
4028      ON cover_image.cdid = me.cdid
4029  ORDER BY tracks.cd
4030
4031 Now the C<artist>, C<record_label>, C<liner_note>, C<cover_image>,
4032 C<tracks>, and C<guests> of the CD will all be available through the
4033 relationship accessors without the need for additional queries to the
4034 database.
4035
4036 However, there is one caveat to be observed: it can be dangerous to
4037 prefetch more than one L<has_many|DBIx::Class::Relationship/has_many>
4038 relationship on a given level. e.g.:
4039
4040  my $rs = $schema->resultset('CD')->search(
4041    undef,
4042    {
4043      prefetch => [
4044        'tracks',                         # has_many
4045        { cd_to_producer => 'producer' }, # has_many => belongs_to (i.e. m2m)
4046      ]
4047    }
4048  );
4049
4050 In fact, C<DBIx::Class> will emit the following warning:
4051
4052  Prefetching multiple has_many rels tracks and cd_to_producer at top
4053  level will explode the number of row objects retrievable via ->next
4054  or ->all. Use at your own risk.
4055
4056 The collapser currently can't identify duplicate tuples for multiple
4057 L<has_many|DBIx::Class::Relationship/has_many> relationships and as a
4058 result the second L<has_many|DBIx::Class::Relationship/has_many>
4059 relation could contain redundant objects.
4060
4061 =head3 Using L</prefetch> with L</join>
4062
4063 L</prefetch> implies a L</join> with the equivalent argument, and is
4064 properly merged with any existing L</join> specification. So the
4065 following:
4066
4067   my $rs = $schema->resultset('CD')->search(
4068    {'record_label.name' => 'Music Product Ltd.'},
4069    {
4070      join     => {artist => 'record_label'},
4071      prefetch => 'artist',
4072    }
4073  );
4074
4075 ... will work, searching on the record label's name, but only
4076 prefetching the C<artist>.
4077
4078 =head3 Using L</prefetch> with L</select> / L</+select> / L</as> / L</+as>
4079
4080 L</prefetch> implies a L</+select>/L</+as> with the fields of the
4081 prefetched relations.  So given:
4082
4083   my $rs = $schema->resultset('CD')->search(
4084    undef,
4085    {
4086      select   => ['cd.title'],
4087      as       => ['cd_title'],
4088      prefetch => 'artist',
4089    }
4090  );
4091
4092 The L</select> becomes: C<'cd.title', 'artist.*'> and the L</as>
4093 becomes: C<'cd_title', 'artist.*'>.
4094
4095 =head3 CAVEATS
4096
4097 Prefetch does a lot of deep magic. As such, it may not behave exactly
4098 as you might expect.
4099
4100 =over 4
4101
4102 =item *
4103
4104 Prefetch uses the L</cache> to populate the prefetched relationships. This
4105 may or may not be what you want.
4106
4107 =item *
4108
4109 If you specify a condition on a prefetched relationship, ONLY those
4110 rows that match the prefetched condition will be fetched into that relationship.
4111 This means that adding prefetch to a search() B<may alter> what is returned by
4112 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
4113
4114   my $artist_rs = $schema->resultset('Artist')->search({
4115       'cds.year' => 2008,
4116   }, {
4117       join => 'cds',
4118   });
4119
4120   my $count = $artist_rs->first->cds->count;
4121
4122   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
4123
4124   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
4125
4126   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
4127
4128 that cmp_ok() may or may not pass depending on the datasets involved. This
4129 behavior may or may not survive the 0.09 transition.
4130
4131 =back
4132
4133 =head2 page
4134
4135 =over 4
4136
4137 =item Value: $page
4138
4139 =back
4140
4141 Makes the resultset paged and specifies the page to retrieve. Effectively
4142 identical to creating a non-pages resultset and then calling ->page($page)
4143 on it.
4144
4145 If L</rows> attribute is not specified it defaults to 10 rows per page.
4146
4147 When you have a paged resultset, L</count> will only return the number
4148 of rows in the page. To get the total, use the L</pager> and call
4149 C<total_entries> on it.
4150
4151 =head2 rows
4152
4153 =over 4
4154
4155 =item Value: $rows
4156
4157 =back
4158
4159 Specifies the maximum number of rows for direct retrieval or the number of
4160 rows per page if the page attribute or method is used.
4161
4162 =head2 offset
4163
4164 =over 4
4165
4166 =item Value: $offset
4167
4168 =back
4169
4170 Specifies the (zero-based) row number for the  first row to be returned, or the
4171 of the first row of the first page if paging is used.
4172
4173 =head2 group_by
4174
4175 =over 4
4176
4177 =item Value: \@columns
4178
4179 =back
4180
4181 A arrayref of columns to group by. Can include columns of joined tables.
4182
4183   group_by => [qw/ column1 column2 ... /]
4184
4185 =head2 having
4186
4187 =over 4
4188
4189 =item Value: $condition
4190
4191 =back
4192
4193 HAVING is a select statement attribute that is applied between GROUP BY and
4194 ORDER BY. It is applied to the after the grouping calculations have been
4195 done.
4196
4197   having => { 'count_employee' => { '>=', 100 } }
4198
4199 or with an in-place function in which case literal SQL is required:
4200
4201   having => \[ 'count(employee) >= ?', [ count => 100 ] ]
4202
4203 =head2 distinct
4204
4205 =over 4
4206
4207 =item Value: (0 | 1)
4208
4209 =back
4210
4211 Set to 1 to group by all columns. If the resultset already has a group_by
4212 attribute, this setting is ignored and an appropriate warning is issued.
4213
4214 =head2 where
4215
4216 =over 4
4217
4218 Adds to the WHERE clause.
4219
4220   # only return rows WHERE deleted IS NULL for all searches
4221   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
4222
4223 Can be overridden by passing C<< { where => undef } >> as an attribute
4224 to a resultset.
4225
4226 =back
4227
4228 =head2 cache
4229
4230 Set to 1 to cache search results. This prevents extra SQL queries if you
4231 revisit rows in your ResultSet:
4232
4233   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
4234
4235   while( my $artist = $resultset->next ) {
4236     ... do stuff ...
4237   }
4238
4239   $rs->first; # without cache, this would issue a query
4240
4241 By default, searches are not cached.
4242
4243 For more examples of using these attributes, see
4244 L<DBIx::Class::Manual::Cookbook>.
4245
4246 =head2 for
4247
4248 =over 4
4249
4250 =item Value: ( 'update' | 'shared' )
4251
4252 =back
4253
4254 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
4255 ... FOR SHARED.
4256
4257 =cut
4258
4259 1;