represent page as a normal resultset attribute; use _orig_alias instead of _live_join...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Data::Dumper;
13 use Scalar::Util qw/weaken/;
14 use Data::Dumper;
15 use DBIx::Class::ResultSetColumn;
16 use base qw/DBIx::Class/;
17 __PACKAGE__->load_components(qw/AccessorGroup/);
18 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
23
24 =head1 SYNOPSIS
25
26   my $rs   = $schema->resultset('User')->search(registered => 1);
27   my @rows = $schema->resultset('CD')->search(year => 2005);
28
29 =head1 DESCRIPTION
30
31 The resultset is also known as an iterator. It is responsible for handling
32 queries that may return an arbitrary number of rows, e.g. via L</search>
33 or a C<has_many> relationship.
34
35 In the examples below, the following table classes are used:
36
37   package MyApp::Schema::Artist;
38   use base qw/DBIx::Class/;
39   __PACKAGE__->load_components(qw/Core/);
40   __PACKAGE__->table('artist');
41   __PACKAGE__->add_columns(qw/artistid name/);
42   __PACKAGE__->set_primary_key('artistid');
43   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
44   1;
45
46   package MyApp::Schema::CD;
47   use base qw/DBIx::Class/;
48   __PACKAGE__->load_components(qw/Core/);
49   __PACKAGE__->table('cd');
50   __PACKAGE__->add_columns(qw/cdid artist title year/);
51   __PACKAGE__->set_primary_key('cdid');
52   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
53   1;
54
55 =head1 METHODS
56
57 =head2 new
58
59 =over 4
60
61 =item Arguments: $source, \%$attrs
62
63 =item Return Value: $rs
64
65 =back
66
67 The resultset constructor. Takes a source object (usually a
68 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
69 L</ATTRIBUTES> below).  Does not perform any queries -- these are
70 executed as needed by the other methods.
71
72 Generally you won't need to construct a resultset manually.  You'll
73 automatically get one from e.g. a L</search> called in scalar context:
74
75   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
76
77 IMPORTANT: If called on an object, proxies to new_result instead so
78
79   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
80
81 will return a CD object, not a ResultSet.
82
83 =cut
84
85 sub new {
86   my $class = shift;
87   return $class->new_result(@_) if ref $class;
88
89   my ($source, $attrs) = @_;
90   weaken $source;
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99   $attrs->{_orig_alias} ||= $attrs->{alias};
100
101   bless {
102     result_source => $source,
103     result_class => $attrs->{result_class} || $source->result_class,
104     cond => $attrs->{where},
105 #    from => $attrs->{from},
106 #    collapse => $collapse,
107     count => undef,
108     pager => undef,
109     attrs => $attrs
110   }, $class;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 =cut
138
139 sub search {
140   my $self = shift;
141   my $rs = $self->search_rs( @_ );
142   return (wantarray ? $rs->all : $rs);
143 }
144
145 =head2 search_rs
146
147 =over 4
148
149 =item Arguments: $cond, \%attrs?
150
151 =item Return Value: $resultset
152
153 =back
154
155 This method does the same exact thing as search() except it will
156 always return a resultset, even in list context.
157
158 =cut
159
160 sub search_rs {
161   my $self = shift;
162
163   my $attrs = {};
164   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
165   my $our_attrs = exists $attrs->{_parent_attrs}
166     ? { %{delete $attrs->{_parent_attrs}} }
167     : { %{$self->{attrs}} };
168   my $having = delete $our_attrs->{having};
169
170   # XXX should only maintain _live_join_stack and generate _live_join_h from that
171   if ($attrs->{_live_join_stack}) {
172     foreach my $join (reverse @{$attrs->{_live_join_stack}}) {
173       $attrs->{_live_join_h} = defined $attrs->{_live_join_h}
174         ? { $join => $attrs->{_live_join_h} }
175         : $join;
176     }
177   }
178
179   # merge new attrs into inherited
180   foreach my $key (qw/join prefetch/) {
181     next unless exists $attrs->{$key};
182     if (my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
183       foreach my $join (reverse @{$live_join}) {
184         $attrs->{$key} = { $join => $attrs->{$key} };
185       }
186     }
187
188     $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, delete $attrs->{$key});
189   }
190
191   $our_attrs->{join} = $self->_merge_attr(
192     $our_attrs->{join}, $attrs->{_live_join_h}
193   ) if ($attrs->{_live_join_h});
194
195   if (defined $our_attrs->{prefetch}) {
196     $our_attrs->{join} = $self->_merge_attr(
197       $our_attrs->{join}, $our_attrs->{prefetch}
198     );
199   }
200
201   my $new_attrs = { %{$our_attrs}, %{$attrs} };
202   my $where = (@_
203     ? (
204         (@_ == 1 || ref $_[0] eq "HASH")
205           ? shift
206           : (
207               (@_ % 2)
208                 ? $self->throw_exception("Odd number of arguments to search")
209                 : {@_}
210              )
211        )
212     : undef
213   );
214
215   if (defined $where) {
216     $new_attrs->{where} = (
217       defined $new_attrs->{where}
218         ? { '-and' => [
219               map {
220                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
221               } $where, $new_attrs->{where}
222             ]
223           }
224         : $where);
225   }
226
227   if (defined $having) {
228     $new_attrs->{having} = (
229       defined $new_attrs->{having}
230         ? { '-and' => [
231               map {
232                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
233               } $having, $new_attrs->{having}
234             ]
235           }
236         : $having);
237   }
238
239   my $rs = (ref $self)->new($self->result_source, $new_attrs);
240   $rs->{_parent_rs} = $self->{_parent_rs} if $self->{_parent_rs};
241
242   unless (@_) {                 # no search, effectively just a clone
243     my $rows = $self->get_cache;
244     if ($rows) {
245       $rs->set_cache($rows);
246     }
247   }
248   return $rs;
249 }
250
251 =head2 search_literal
252
253 =over 4
254
255 =item Arguments: $sql_fragment, @bind_values
256
257 =item Return Value: $resultset (scalar context), @row_objs (list context)
258
259 =back
260
261   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
262   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
263
264 Pass a literal chunk of SQL to be added to the conditional part of the
265 resultset query.
266
267 =cut
268
269 sub search_literal {
270   my ($self, $cond, @vals) = @_;
271   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
272   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
273   return $self->search(\$cond, $attrs);
274 }
275
276 =head2 find
277
278 =over 4
279
280 =item Arguments: @values | \%cols, \%attrs?
281
282 =item Return Value: $row_object
283
284 =back
285
286 Finds a row based on its primary key or unique constraint. For example, to find
287 a row by its primary key:
288
289   my $cd = $schema->resultset('CD')->find(5);
290
291 You can also find a row by a specific unique constraint using the C<key>
292 attribute. For example:
293
294   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
295     key => 'cd_artist_title'
296   });
297
298 Additionally, you can specify the columns explicitly by name:
299
300   my $cd = $schema->resultset('CD')->find(
301     {
302       artist => 'Massive Attack',
303       title  => 'Mezzanine',
304     },
305     { key => 'cd_artist_title' }
306   );
307
308 If the C<key> is specified as C<primary>, it searches only on the primary key.
309
310 If no C<key> is specified, it searches on all unique constraints defined on the
311 source, including the primary key.
312
313 See also L</find_or_create> and L</update_or_create>. For information on how to
314 declare unique constraints, see
315 L<DBIx::Class::ResultSource/add_unique_constraint>.
316
317 =cut
318
319 sub find {
320   my $self = shift;
321   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
322
323   # Default to the primary key, but allow a specific key
324   my @cols = exists $attrs->{key}
325     ? $self->result_source->unique_constraint_columns($attrs->{key})
326     : $self->result_source->primary_columns;
327   $self->throw_exception(
328     "Can't find unless a primary key or unique constraint is defined"
329   ) unless @cols;
330
331   # Parse out a hashref from input
332   my $input_query;
333   if (ref $_[0] eq 'HASH') {
334     $input_query = { %{$_[0]} };
335   }
336   elsif (@_ == @cols) {
337     $input_query = {};
338     @{$input_query}{@cols} = @_;
339   }
340   else {
341     # Compatibility: Allow e.g. find(id => $value)
342     carp "Find by key => value deprecated; please use a hashref instead";
343     $input_query = {@_};
344   }
345
346   my @unique_queries = $self->_unique_queries($input_query, $attrs);
347
348   # Handle cases where the ResultSet defines the query, or where the user is
349   # abusing find
350   my $query = @unique_queries ? \@unique_queries : $input_query;
351
352   # Run the query
353   if (keys %$attrs) {
354     my $rs = $self->search($query, $attrs);
355     $rs->_resolve;
356     return keys %{$rs->{_attrs}{collapse}} ? $rs->next : $rs->single;
357   }
358   else {
359     $self->_resolve;
360     return (keys %{$self->{_attrs}{collapse}})
361       ? $self->search($query)->next
362       : $self->single($query);
363   }
364 }
365
366 # _unique_queries
367 #
368 # Build a list of queries which satisfy unique constraints.
369
370 sub _unique_queries {
371   my ($self, $query, $attrs) = @_;
372
373   my @constraint_names = exists $attrs->{key}
374     ? ($attrs->{key})
375     : $self->result_source->unique_constraint_names;
376
377   my @unique_queries;
378   foreach my $name (@constraint_names) {
379     my @unique_cols = $self->result_source->unique_constraint_columns($name);
380     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
381
382     next unless scalar keys %$unique_query;
383
384     # Add the ResultSet's alias
385     my $alias = $self->{attrs}{alias};
386     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
387       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
388     }
389
390     push @unique_queries, $unique_query;
391   }
392
393   return @unique_queries;
394 }
395
396 # _build_unique_query
397 #
398 # Constrain the specified query hash based on the specified column names.
399
400 sub _build_unique_query {
401   my ($self, $query, $unique_cols) = @_;
402
403   return {
404     map  { $_ => $query->{$_} }
405     grep { exists $query->{$_} }
406       @$unique_cols
407   };
408 }
409
410 =head2 search_related
411
412 =over 4
413
414 =item Arguments: $rel, $cond, \%attrs?
415
416 =item Return Value: $new_resultset
417
418 =back
419
420   $new_rs = $cd_rs->search_related('artist', {
421     name => 'Emo-R-Us',
422   });
423
424 Searches the specified relationship, optionally specifying a condition and
425 attributes for matching records. See L</ATTRIBUTES> for more information.
426
427 =cut
428
429 sub search_related {
430   return shift->related_resultset(shift)->search(@_);
431 }
432
433 =head2 cursor
434
435 =over 4
436
437 =item Arguments: none
438
439 =item Return Value: $cursor
440
441 =back
442
443 Returns a storage-driven cursor to the given resultset. See
444 L<DBIx::Class::Cursor> for more information.
445
446 =cut
447
448 sub cursor {
449   my ($self) = @_;
450
451   $self->_resolve;
452   my $attrs = { %{$self->{_attrs}} };
453   return $self->{cursor}
454     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
455           $attrs->{where},$attrs);
456 }
457
458 =head2 single
459
460 =over 4
461
462 =item Arguments: $cond?
463
464 =item Return Value: $row_object?
465
466 =back
467
468   my $cd = $schema->resultset('CD')->single({ year => 2001 });
469
470 Inflates the first result without creating a cursor if the resultset has
471 any records in it; if not returns nothing. Used by L</find> as an optimisation.
472
473 Can optionally take an additional condition *only* - this is a fast-code-path
474 method; if you need to add extra joins or similar call ->search and then
475 ->single without a condition on the $rs returned from that.
476
477 =cut
478
479 sub single {
480   my ($self, $where) = @_;
481   $self->_resolve;
482   my $attrs = { %{$self->{_attrs}} };
483   if ($where) {
484     if (defined $attrs->{where}) {
485       $attrs->{where} = {
486         '-and' =>
487             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
488                $where, delete $attrs->{where} ]
489       };
490     } else {
491       $attrs->{where} = $where;
492     }
493   }
494
495   unless ($self->_is_unique_query($attrs->{where})) {
496     carp "Query not guaranteed to return a single row"
497       . "; please declare your unique constraints or use search instead";
498   }
499
500   my @data = $self->result_source->storage->select_single(
501     $attrs->{from}, $attrs->{select},
502     $attrs->{where},$attrs
503   );
504
505   return (@data ? $self->_construct_object(@data) : ());
506 }
507
508 # _is_unique_query
509 #
510 # Try to determine if the specified query is guaranteed to be unique, based on
511 # the declared unique constraints.
512
513 sub _is_unique_query {
514   my ($self, $query) = @_;
515
516   my $collapsed = $self->_collapse_query($query);
517   my $alias = $self->{attrs}{alias};
518
519   foreach my $name ($self->result_source->unique_constraint_names) {
520     my @unique_cols = map {
521       "$alias.$_"
522     } $self->result_source->unique_constraint_columns($name);
523
524     # Count the values for each unique column
525     my %seen = map { $_ => 0 } @unique_cols;
526
527     foreach my $key (keys %$collapsed) {
528       my $aliased = $key;
529       $aliased = "$alias.$key" unless $key =~ /\./;
530
531       next unless exists $seen{$aliased};  # Additional constraints are okay
532       $seen{$aliased} = scalar @{ $collapsed->{$key} };
533     }
534
535     # If we get 0 or more than 1 value for a column, it's not necessarily unique
536     return 1 unless grep { $_ != 1 } values %seen;
537   }
538
539   return 0;
540 }
541
542 # _collapse_query
543 #
544 # Recursively collapse the query, accumulating values for each column.
545
546 sub _collapse_query {
547   my ($self, $query, $collapsed) = @_;
548
549   $collapsed ||= {};
550
551   if (ref $query eq 'ARRAY') {
552     foreach my $subquery (@$query) {
553       next unless ref $subquery;  # -or
554 #      warn "ARRAY: " . Dumper $subquery;
555       $collapsed = $self->_collapse_query($subquery, $collapsed);
556     }
557   }
558   elsif (ref $query eq 'HASH') {
559     if (keys %$query and (keys %$query)[0] eq '-and') {
560       foreach my $subquery (@{$query->{-and}}) {
561 #        warn "HASH: " . Dumper $subquery;
562         $collapsed = $self->_collapse_query($subquery, $collapsed);
563       }
564     }
565     else {
566 #      warn "LEAF: " . Dumper $query;
567       foreach my $key (keys %$query) {
568         push @{$collapsed->{$key}}, $query->{$key};
569       }
570     }
571   }
572
573   return $collapsed;
574 }
575
576 =head2 get_column
577
578 =over 4
579
580 =item Arguments: $cond?
581
582 =item Return Value: $resultsetcolumn
583
584 =back
585
586   my $max_length = $rs->get_column('length')->max;
587
588 Returns a ResultSetColumn instance for $column based on $self
589
590 =cut
591
592 sub get_column {
593   my ($self, $column) = @_;
594   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
595   return $new;
596 }
597
598 =head2 search_like
599
600 =over 4
601
602 =item Arguments: $cond, \%attrs?
603
604 =item Return Value: $resultset (scalar context), @row_objs (list context)
605
606 =back
607
608   # WHERE title LIKE '%blue%'
609   $cd_rs = $rs->search_like({ title => '%blue%'});
610
611 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
612 that this is simply a convenience method. You most likely want to use
613 L</search> with specific operators.
614
615 For more information, see L<DBIx::Class::Manual::Cookbook>.
616
617 =cut
618
619 sub search_like {
620   my $class = shift;
621   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
622   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
623   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
624   return $class->search($query, { %$attrs });
625 }
626
627 =head2 slice
628
629 =over 4
630
631 =item Arguments: $first, $last
632
633 =item Return Value: $resultset (scalar context), @row_objs (list context)
634
635 =back
636
637 Returns a resultset or object list representing a subset of elements from the
638 resultset slice is called on. Indexes are from 0, i.e., to get the first
639 three records, call:
640
641   my ($one, $two, $three) = $rs->slice(0, 2);
642
643 =cut
644
645 sub slice {
646   my ($self, $min, $max) = @_;
647   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
648   $attrs->{offset} = $self->{attrs}{offset} || 0;
649   $attrs->{offset} += $min;
650   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
651   return $self->search(undef(), $attrs);
652   #my $slice = (ref $self)->new($self->result_source, $attrs);
653   #return (wantarray ? $slice->all : $slice);
654 }
655
656 =head2 next
657
658 =over 4
659
660 =item Arguments: none
661
662 =item Return Value: $result?
663
664 =back
665
666 Returns the next element in the resultset (C<undef> is there is none).
667
668 Can be used to efficiently iterate over records in the resultset:
669
670   my $rs = $schema->resultset('CD')->search;
671   while (my $cd = $rs->next) {
672     print $cd->title;
673   }
674
675 Note that you need to store the resultset object, and call C<next> on it.
676 Calling C<< resultset('Table')->next >> repeatedly will always return the
677 first record from the resultset.
678
679 =cut
680
681 sub next {
682   my ($self) = @_;
683   if (my $cache = $self->get_cache) {
684     $self->{all_cache_position} ||= 0;
685     return $cache->[$self->{all_cache_position}++];
686   }
687   if ($self->{attrs}{cache}) {
688     $self->{all_cache_position} = 1;
689     return ($self->all)[0];
690   }
691   my @row = (
692     exists $self->{stashed_row}
693       ? @{delete $self->{stashed_row}}
694       : $self->cursor->next
695   );
696   return unless (@row);
697   return $self->_construct_object(@row);
698 }
699
700 sub _resolve {
701   my $self = shift;
702
703   return if exists $self->{_attrs}; #return if _resolve has already been called
704
705   my $attrs = $self->{attrs};
706   my $source = $self->{_parent_rs}
707     ? $self->{_parent_rs}
708     : $self->{result_source};
709
710   # XXX - lose storable dclone
711   my $record_filter = delete $attrs->{record_filter}
712     if defined $attrs->{record_filter};
713   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
714   $attrs->{record_filter} = $record_filter if ($record_filter);
715   $self->{attrs}{record_filter} = $record_filter if ($record_filter);
716
717   my $alias = $attrs->{_orig_alias};
718
719   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
720   delete $attrs->{as} if $attrs->{columns};
721   $attrs->{columns} ||= [ $self->{result_source}->columns ]
722     unless $attrs->{select};
723   my $select_alias = $self->{_parent_rs}
724     ? $self->{attrs}{alias}
725     : $alias;
726   $attrs->{select} = [
727     map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
728   ] if $attrs->{columns};
729   $attrs->{as} ||= [
730     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
731   ];
732   if (my $include = delete $attrs->{include_columns}) {
733     push(@{$attrs->{select}}, @$include);
734     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
735   }
736
737   $attrs->{from} ||= [ { $alias => $source->from } ];
738   $attrs->{seen_join} ||= {};
739   my %seen;
740   if (my $join = delete $attrs->{join}) {
741     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
742       if (ref $j eq 'HASH') {
743         $seen{$_} = 1 foreach keys %$j;
744       } else {
745         $seen{$j} = 1;
746       }
747     }
748     push(@{$attrs->{from}},
749       $source->resolve_join($join, $alias, $attrs->{seen_join})
750     );
751   }
752   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
753   $attrs->{order_by} = [ $attrs->{order_by} ] if
754       $attrs->{order_by} and !ref($attrs->{order_by});
755   $attrs->{order_by} ||= [];
756
757   if(my $seladds = delete($attrs->{'+select'})) {
758     my @seladds = (ref($seladds) eq 'ARRAY' ? @$seladds : ($seladds));
759     $attrs->{select} = [
760       @{ $attrs->{select} },
761       map { (m/\./ || ref($_)) ? $_ : "${alias}.$_" } $seladds
762     ];
763   }
764   if(my $asadds = delete($attrs->{'+as'})) {
765     my @asadds = (ref($asadds) eq 'ARRAY' ? @$asadds : ($asadds));
766     $attrs->{as} = [ @{ $attrs->{as} }, @asadds ];
767   }
768   my $collapse = $attrs->{collapse} || {};
769   if (my $prefetch = delete $attrs->{prefetch}) {
770     my @pre_order;
771     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
772       if ( ref $p eq 'HASH' ) {
773         foreach my $key (keys %$p) {
774           push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
775             unless $seen{$key};
776         }
777       } else {
778         push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
779           unless $seen{$p};
780       }
781       # bring joins back to level of current class
782       $p = $self->_reduce_joins($p, $attrs) if ($attrs->{_live_join_stack});
783       if ($p) {
784         my @prefetch = $self->result_source->resolve_prefetch(
785           $p, $alias, {}, \@pre_order, $collapse
786         );
787         push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
788         push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
789       }
790     }
791     push(@{$attrs->{order_by}}, @pre_order);
792   }
793   $attrs->{collapse} = $collapse;
794   $self->{_attrs} = $attrs;
795 }
796
797 sub _merge_attr {
798   my ($self, $a, $b) = @_;
799
800   return $b unless $a;
801   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
802     foreach my $key (keys %{$b}) {
803       if (exists $a->{$key}) {
804         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
805       } else {
806         $a->{$key} = $b->{$key};
807       }
808     }
809     return $a;
810   } else {
811     $a = [$a] unless (ref $a eq 'ARRAY');
812     $b = [$b] unless (ref $b eq 'ARRAY');
813
814     my $hash = {};
815     my $array = [];
816     foreach ($a, $b) {
817       foreach my $element (@{$_}) {
818         if (ref $element eq 'HASH') {
819           $hash = $self->_merge_attr($hash, $element);
820         } elsif (ref $element eq 'ARRAY') {
821           $array = [@{$array}, @{$element}];
822         } else {
823           if ($b == $_) {
824             $self->_merge_array($array, $element);
825           } else {
826             push(@{$array}, $element);
827           }
828         }
829       }
830     }
831
832     my $final_array = [];
833     foreach my $element (@{$array}) {
834       push(@{$final_array}, $element) unless (exists $hash->{$element});
835     }
836     $array = $final_array;
837
838     if ((keys %{$hash}) && (scalar(@{$array} > 0))) {
839       return [$hash, @{$array}];
840     } else {
841       return (keys %{$hash}) ? $hash : $array;
842     }
843   }
844 }
845
846 sub _merge_array {
847   my ($self, $a, $b) = @_;
848
849   $b = [$b] unless (ref $b eq 'ARRAY');
850   # add elements from @{$b} to @{$a} which aren't already in @{$a}
851   foreach my $b_element (@{$b}) {
852     push(@{$a}, $b_element) unless grep {$b_element eq $_} @{$a};
853   }
854 }
855
856 # bring the joins (which are from the original class) to the level
857 # of the current class so that we can resolve them properly
858 sub _reduce_joins {
859   my ($self, $p, $attrs) = @_;
860
861  STACK:
862   foreach (@{$attrs->{_live_join_stack}}) {
863     if (ref $p eq 'HASH') {
864       if (exists $p->{$_}) {
865         $p = $p->{$_};
866       } else {
867         return undef;
868       }
869     } elsif (ref $p eq 'ARRAY') {
870       foreach my $pe (@{$p}) {
871         if ($pe eq $_) {
872           return undef;
873         }
874         if ((ref $pe eq 'HASH') && (exists $pe->{$_})) {
875           $p = $pe->{$_};
876           next STACK;
877         }
878       }
879       return undef;
880     } else {
881       return undef;
882     }
883   }
884   return $p;
885 }
886
887 sub _construct_object {
888   my ($self, @row) = @_;
889   my @as = @{ $self->{_attrs}{as} };
890
891   my $info = $self->_collapse_result(\@as, \@row);
892   my $new = $self->result_class->inflate_result($self->result_source, @$info);
893   $new = $self->{_attrs}{record_filter}->($new)
894     if exists $self->{_attrs}{record_filter};
895   return $new;
896 }
897
898 sub _collapse_result {
899   my ($self, $as, $row, $prefix) = @_;
900
901   my $live_join = $self->{attrs}{alias} ||= '';
902   my %const;
903
904   my @copy = @$row;
905   foreach my $this_as (@$as) {
906     my $val = shift @copy;
907     if (defined $prefix) {
908       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
909         my $remain = $1;
910         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
911         $const{$1||''}{$2} = $val;
912       }
913     } else {
914       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
915       $const{$1||''}{$2} = $val;
916     }
917   }
918
919   my $info = [ {}, {} ];
920   foreach my $key (keys %const) {
921     if (length $key && $key ne $live_join) {
922       my $target = $info;
923       my @parts = split(/\./, $key);
924       foreach my $p (@parts) {
925         $target = $target->[1]->{$p} ||= [];
926       }
927       $target->[0] = $const{$key};
928     } else {
929       $info->[0] = $const{$key};
930     }
931   }
932   my @collapse;
933
934   if (defined $prefix) {
935     @collapse = map {
936         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
937     } keys %{$self->{_attrs}{collapse}}
938   } else {
939     @collapse = keys %{$self->{_attrs}{collapse}};
940   };
941
942   if (@collapse) {
943     my ($c) = sort { length $a <=> length $b } @collapse;
944     my $target = $info;
945     foreach my $p (split(/\./, $c)) {
946       $target = $target->[1]->{$p} ||= [];
947     }
948     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
949     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
950     my $tree = $self->_collapse_result($as, $row, $c_prefix);
951     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
952     my (@final, @raw);
953
954     while (
955       !(
956         grep {
957           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
958         } @co_key
959         )
960     ) {
961       push(@final, $tree);
962       last unless (@raw = $self->cursor->next);
963       $row = $self->{stashed_row} = \@raw;
964       $tree = $self->_collapse_result($as, $row, $c_prefix);
965     }
966     @$target = (@final ? @final : [ {}, {} ]);
967       # single empty result to indicate an empty prefetched has_many
968   }
969
970   #print "final info: " . Dumper($info);
971   return $info;
972 }
973
974 =head2 result_source
975
976 =over 4
977
978 =item Arguments: $result_source?
979
980 =item Return Value: $result_source
981
982 =back
983
984 An accessor for the primary ResultSource object from which this ResultSet
985 is derived.
986
987 =cut
988
989
990 =head2 count
991
992 =over 4
993
994 =item Arguments: $cond, \%attrs??
995
996 =item Return Value: $count
997
998 =back
999
1000 Performs an SQL C<COUNT> with the same query as the resultset was built
1001 with to find the number of elements. If passed arguments, does a search
1002 on the resultset and counts the results of that.
1003
1004 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
1005 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
1006 not support C<DISTINCT> with multiple columns. If you are using such a
1007 database, you should only use columns from the main table in your C<group_by>
1008 clause.
1009
1010 =cut
1011
1012 sub count {
1013   my $self = shift;
1014   return $self->search(@_)->count if @_ and defined $_[0];
1015   return scalar @{ $self->get_cache } if $self->get_cache;
1016   my $count = $self->_count;
1017   return 0 unless $count;
1018
1019   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
1020   $count = $self->{attrs}{rows} if
1021     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
1022   return $count;
1023 }
1024
1025 sub _count { # Separated out so pager can get the full count
1026   my $self = shift;
1027   my $select = { count => '*' };
1028
1029   $self->_resolve;
1030   my $attrs = { %{ $self->{_attrs} } };
1031   if (my $group_by = delete $attrs->{group_by}) {
1032     delete $attrs->{having};
1033     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1034     # todo: try CONCAT for multi-column pk
1035     my @pk = $self->result_source->primary_columns;
1036     if (@pk == 1) {
1037       my $alias = $attrs->{_orig_alias};
1038       foreach my $column (@distinct) {
1039         if ($column =~ qr/^(?:\Q$alias.\E)?$pk[0]$/) {
1040           @distinct = ($column);
1041           last;
1042         }
1043       }
1044     }
1045
1046     $select = { count => { distinct => \@distinct } };
1047   }
1048
1049   $attrs->{select} = $select;
1050   $attrs->{as} = [qw/count/];
1051
1052   # offset, order by and page are not needed to count. record_filter is cdbi
1053   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1054   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1055   $tmp_rs->{_parent_rs} = $self->{_parent_rs} if $self->{_parent_rs};
1056        #XXX - hack to pass through parent of related resultsets
1057
1058   my ($count) = $tmp_rs->cursor->next;
1059   return $count;
1060 }
1061
1062 =head2 count_literal
1063
1064 =over 4
1065
1066 =item Arguments: $sql_fragment, @bind_values
1067
1068 =item Return Value: $count
1069
1070 =back
1071
1072 Counts the results in a literal query. Equivalent to calling L</search_literal>
1073 with the passed arguments, then L</count>.
1074
1075 =cut
1076
1077 sub count_literal { shift->search_literal(@_)->count; }
1078
1079 =head2 all
1080
1081 =over 4
1082
1083 =item Arguments: none
1084
1085 =item Return Value: @objects
1086
1087 =back
1088
1089 Returns all elements in the resultset. Called implicitly if the resultset
1090 is returned in list context.
1091
1092 =cut
1093
1094 sub all {
1095   my ($self) = @_;
1096   return @{ $self->get_cache } if $self->get_cache;
1097
1098   my @obj;
1099
1100   # TODO: don't call resolve here
1101   $self->_resolve;
1102   if (keys %{$self->{_attrs}{collapse}}) {
1103 #  if ($self->{attrs}{prefetch}) {
1104       # Using $self->cursor->all is really just an optimisation.
1105       # If we're collapsing has_many prefetches it probably makes
1106       # very little difference, and this is cleaner than hacking
1107       # _construct_object to survive the approach
1108     my @row = $self->cursor->next;
1109     while (@row) {
1110       push(@obj, $self->_construct_object(@row));
1111       @row = (exists $self->{stashed_row}
1112                ? @{delete $self->{stashed_row}}
1113                : $self->cursor->next);
1114     }
1115   } else {
1116     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1117   }
1118
1119   $self->set_cache(\@obj) if $self->{attrs}{cache};
1120   return @obj;
1121 }
1122
1123 =head2 reset
1124
1125 =over 4
1126
1127 =item Arguments: none
1128
1129 =item Return Value: $self
1130
1131 =back
1132
1133 Resets the resultset's cursor, so you can iterate through the elements again.
1134
1135 =cut
1136
1137 sub reset {
1138   my ($self) = @_;
1139   delete $self->{_attrs} if (exists $self->{_attrs});
1140
1141   $self->{all_cache_position} = 0;
1142   $self->cursor->reset;
1143   return $self;
1144 }
1145
1146 =head2 first
1147
1148 =over 4
1149
1150 =item Arguments: none
1151
1152 =item Return Value: $object?
1153
1154 =back
1155
1156 Resets the resultset and returns an object for the first result (if the
1157 resultset returns anything).
1158
1159 =cut
1160
1161 sub first {
1162   return $_[0]->reset->next;
1163 }
1164
1165 # _cond_for_update_delete
1166 #
1167 # update/delete require the condition to be modified to handle
1168 # the differing SQL syntax available.  This transforms the $self->{cond}
1169 # appropriately, returning the new condition.
1170
1171 sub _cond_for_update_delete {
1172   my ($self) = @_;
1173   my $cond = {};
1174
1175   if (!ref($self->{cond})) {
1176     # No-op. No condition, we're updating/deleting everything
1177   }
1178   elsif (ref $self->{cond} eq 'ARRAY') {
1179     $cond = [
1180       map {
1181         my %hash;
1182         foreach my $key (keys %{$_}) {
1183           $key =~ /([^.]+)$/;
1184           $hash{$1} = $_->{$key};
1185         }
1186         \%hash;
1187       } @{$self->{cond}}
1188     ];
1189   }
1190   elsif (ref $self->{cond} eq 'HASH') {
1191     if ((keys %{$self->{cond}})[0] eq '-and') {
1192       $cond->{-and} = [];
1193
1194       my @cond = @{$self->{cond}{-and}};
1195       for (my $i = 0; $i <= @cond - 1; $i++) {
1196         my $entry = $cond[$i];
1197
1198         my %hash;
1199         if (ref $entry eq 'HASH') {
1200           foreach my $key (keys %{$entry}) {
1201             $key =~ /([^.]+)$/;
1202             $hash{$1} = $entry->{$key};
1203           }
1204         }
1205         else {
1206           $entry =~ /([^.]+)$/;
1207           $hash{$1} = $cond[++$i];
1208         }
1209
1210         push @{$cond->{-and}}, \%hash;
1211       }
1212     }
1213     else {
1214       foreach my $key (keys %{$self->{cond}}) {
1215         $key =~ /([^.]+)$/;
1216         $cond->{$1} = $self->{cond}{$key};
1217       }
1218     }
1219   }
1220   else {
1221     $self->throw_exception(
1222       "Can't update/delete on resultset with condition unless hash or array"
1223     );
1224   }
1225
1226   return $cond;
1227 }
1228
1229
1230 =head2 update
1231
1232 =over 4
1233
1234 =item Arguments: \%values
1235
1236 =item Return Value: $storage_rv
1237
1238 =back
1239
1240 Sets the specified columns in the resultset to the supplied values in a
1241 single query. Return value will be true if the update succeeded or false
1242 if no records were updated; exact type of success value is storage-dependent.
1243
1244 =cut
1245
1246 sub update {
1247   my ($self, $values) = @_;
1248   $self->throw_exception("Values for update must be a hash")
1249     unless ref $values eq 'HASH';
1250
1251   my $cond = $self->_cond_for_update_delete;
1252
1253   return $self->result_source->storage->update(
1254     $self->result_source->from, $values, $cond
1255   );
1256 }
1257
1258 =head2 update_all
1259
1260 =over 4
1261
1262 =item Arguments: \%values
1263
1264 =item Return Value: 1
1265
1266 =back
1267
1268 Fetches all objects and updates them one at a time. Note that C<update_all>
1269 will run DBIC cascade triggers, while L</update> will not.
1270
1271 =cut
1272
1273 sub update_all {
1274   my ($self, $values) = @_;
1275   $self->throw_exception("Values for update must be a hash")
1276     unless ref $values eq 'HASH';
1277   foreach my $obj ($self->all) {
1278     $obj->set_columns($values)->update;
1279   }
1280   return 1;
1281 }
1282
1283 =head2 delete
1284
1285 =over 4
1286
1287 =item Arguments: none
1288
1289 =item Return Value: 1
1290
1291 =back
1292
1293 Deletes the contents of the resultset from its result source. Note that this
1294 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1295 to run.
1296
1297 =cut
1298
1299 sub delete {
1300   my ($self) = @_;
1301   my $del = {};
1302
1303   my $cond = $self->_cond_for_update_delete;
1304
1305   $self->result_source->storage->delete($self->result_source->from, $cond);
1306   return 1;
1307 }
1308
1309 =head2 delete_all
1310
1311 =over 4
1312
1313 =item Arguments: none
1314
1315 =item Return Value: 1
1316
1317 =back
1318
1319 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1320 will run DBIC cascade triggers, while L</delete> will not.
1321
1322 =cut
1323
1324 sub delete_all {
1325   my ($self) = @_;
1326   $_->delete for $self->all;
1327   return 1;
1328 }
1329
1330 =head2 pager
1331
1332 =over 4
1333
1334 =item Arguments: none
1335
1336 =item Return Value: $pager
1337
1338 =back
1339
1340 Return Value a L<Data::Page> object for the current resultset. Only makes
1341 sense for queries with a C<page> attribute.
1342
1343 =cut
1344
1345 sub pager {
1346   my ($self) = @_;
1347   my $attrs = $self->{attrs};
1348   $self->throw_exception("Can't create pager for non-paged rs")
1349     unless $self->{attrs}{page};
1350   $attrs->{rows} ||= 10;
1351   return $self->{pager} ||= Data::Page->new(
1352     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1353 }
1354
1355 =head2 page
1356
1357 =over 4
1358
1359 =item Arguments: $page_number
1360
1361 =item Return Value: $rs
1362
1363 =back
1364
1365 Returns a resultset for the $page_number page of the resultset on which page
1366 is called, where each page contains a number of rows equal to the 'rows'
1367 attribute set on the resultset (10 by default).
1368
1369 =cut
1370
1371 sub page {
1372   my ($self, $page) = @_;
1373   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1374 }
1375
1376 =head2 new_result
1377
1378 =over 4
1379
1380 =item Arguments: \%vals
1381
1382 =item Return Value: $object
1383
1384 =back
1385
1386 Creates an object in the resultset's result class and returns it.
1387
1388 =cut
1389
1390 sub new_result {
1391   my ($self, $values) = @_;
1392   $self->throw_exception( "new_result needs a hash" )
1393     unless (ref $values eq 'HASH');
1394   $self->throw_exception(
1395     "Can't abstract implicit construct, condition not a hash"
1396   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1397   my %new = %$values;
1398   my $alias = $self->{attrs}{_orig_alias};
1399   foreach my $key (keys %{$self->{cond}||{}}) {
1400     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q$alias.\E)?([^.]+)$/);
1401   }
1402   my $obj = $self->result_class->new(\%new);
1403   $obj->result_source($self->result_source) if $obj->can('result_source');
1404   return $obj;
1405 }
1406
1407 =head2 find_or_new
1408
1409 =over 4
1410
1411 =item Arguments: \%vals, \%attrs?
1412
1413 =item Return Value: $object
1414
1415 =back
1416
1417 Find an existing record from this resultset. If none exists, instantiate a new
1418 result object and return it. The object will not be saved into your storage
1419 until you call L<DBIx::Class::Row/insert> on it.
1420
1421 If you want objects to be saved immediately, use L</find_or_create> instead.
1422
1423 =cut
1424
1425 sub find_or_new {
1426   my $self     = shift;
1427   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1428   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1429   my $exists   = $self->find($hash, $attrs);
1430   return defined $exists ? $exists : $self->new_result($hash);
1431 }
1432
1433 =head2 create
1434
1435 =over 4
1436
1437 =item Arguments: \%vals
1438
1439 =item Return Value: $object
1440
1441 =back
1442
1443 Inserts a record into the resultset and returns the object representing it.
1444
1445 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1446
1447 =cut
1448
1449 sub create {
1450   my ($self, $attrs) = @_;
1451   $self->throw_exception( "create needs a hashref" )
1452     unless ref $attrs eq 'HASH';
1453   return $self->new_result($attrs)->insert;
1454 }
1455
1456 =head2 find_or_create
1457
1458 =over 4
1459
1460 =item Arguments: \%vals, \%attrs?
1461
1462 =item Return Value: $object
1463
1464 =back
1465
1466   $class->find_or_create({ key => $val, ... });
1467
1468 Tries to find a record based on its primary key or unique constraint; if none
1469 is found, creates one and returns that instead.
1470
1471   my $cd = $schema->resultset('CD')->find_or_create({
1472     cdid   => 5,
1473     artist => 'Massive Attack',
1474     title  => 'Mezzanine',
1475     year   => 2005,
1476   });
1477
1478 Also takes an optional C<key> attribute, to search by a specific key or unique
1479 constraint. For example:
1480
1481   my $cd = $schema->resultset('CD')->find_or_create(
1482     {
1483       artist => 'Massive Attack',
1484       title  => 'Mezzanine',
1485     },
1486     { key => 'cd_artist_title' }
1487   );
1488
1489 See also L</find> and L</update_or_create>. For information on how to declare
1490 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1491
1492 =cut
1493
1494 sub find_or_create {
1495   my $self     = shift;
1496   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1497   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1498   my $exists   = $self->find($hash, $attrs);
1499   return defined $exists ? $exists : $self->create($hash);
1500 }
1501
1502 =head2 update_or_create
1503
1504 =over 4
1505
1506 =item Arguments: \%col_values, { key => $unique_constraint }?
1507
1508 =item Return Value: $object
1509
1510 =back
1511
1512   $class->update_or_create({ col => $val, ... });
1513
1514 First, searches for an existing row matching one of the unique constraints
1515 (including the primary key) on the source of this resultset. If a row is
1516 found, updates it with the other given column values. Otherwise, creates a new
1517 row.
1518
1519 Takes an optional C<key> attribute to search on a specific unique constraint.
1520 For example:
1521
1522   # In your application
1523   my $cd = $schema->resultset('CD')->update_or_create(
1524     {
1525       artist => 'Massive Attack',
1526       title  => 'Mezzanine',
1527       year   => 1998,
1528     },
1529     { key => 'cd_artist_title' }
1530   );
1531
1532 If no C<key> is specified, it searches on all unique constraints defined on the
1533 source, including the primary key.
1534
1535 If the C<key> is specified as C<primary>, it searches only on the primary key.
1536
1537 See also L</find> and L</find_or_create>. For information on how to declare
1538 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1539
1540 =cut
1541
1542 sub update_or_create {
1543   my $self = shift;
1544   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1545   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1546
1547   my $row = $self->find($cond);
1548   if (defined $row) {
1549     $row->update($cond);
1550     return $row;
1551   }
1552
1553   return $self->create($cond);
1554 }
1555
1556 =head2 get_cache
1557
1558 =over 4
1559
1560 =item Arguments: none
1561
1562 =item Return Value: \@cache_objects?
1563
1564 =back
1565
1566 Gets the contents of the cache for the resultset, if the cache is set.
1567
1568 =cut
1569
1570 sub get_cache {
1571   shift->{all_cache};
1572 }
1573
1574 =head2 set_cache
1575
1576 =over 4
1577
1578 =item Arguments: \@cache_objects
1579
1580 =item Return Value: \@cache_objects
1581
1582 =back
1583
1584 Sets the contents of the cache for the resultset. Expects an arrayref
1585 of objects of the same class as those produced by the resultset. Note that
1586 if the cache is set the resultset will return the cached objects rather
1587 than re-querying the database even if the cache attr is not set.
1588
1589 =cut
1590
1591 sub set_cache {
1592   my ( $self, $data ) = @_;
1593   $self->throw_exception("set_cache requires an arrayref")
1594       if defined($data) && (ref $data ne 'ARRAY');
1595   $self->{all_cache} = $data;
1596 }
1597
1598 =head2 clear_cache
1599
1600 =over 4
1601
1602 =item Arguments: none
1603
1604 =item Return Value: []
1605
1606 =back
1607
1608 Clears the cache for the resultset.
1609
1610 =cut
1611
1612 sub clear_cache {
1613   shift->set_cache(undef);
1614 }
1615
1616 =head2 related_resultset
1617
1618 =over 4
1619
1620 =item Arguments: $relationship_name
1621
1622 =item Return Value: $resultset
1623
1624 =back
1625
1626 Returns a related resultset for the supplied relationship name.
1627
1628   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1629
1630 =cut
1631
1632 sub related_resultset {
1633   my ( $self, $rel ) = @_;
1634
1635   $self->{related_resultsets} ||= {};
1636   return $self->{related_resultsets}{$rel} ||= do {
1637     #warn "fetching related resultset for rel '$rel' " . $self->result_source->{name};
1638     my $rel_obj = $self->result_source->relationship_info($rel);
1639                 #print Dumper($self->result_source->_relationships);
1640     $self->throw_exception(
1641       "search_related: result source '" . $self->result_source->name .
1642         "' has no such relationship ${rel}")
1643       unless $rel_obj; #die Dumper $self->{attrs};
1644
1645     my @live_join_stack = @{$self->{attrs}{_live_join_stack}||[]};
1646     push(@live_join_stack, $rel);
1647
1648     my $rs = $self->result_source->schema->resultset($rel_obj->{class})->search(
1649       undef, {
1650         select => undef,
1651         as => undef,
1652         alias => $rel, #the most recent
1653         _live_join_stack => \@live_join_stack, #the trail of rels
1654         _parent_attrs => $self->{attrs}}
1655     );
1656
1657     # keep reference of the original resultset
1658     $rs->{_parent_rs} = ($self->{_parent_rs})
1659       ? $self->{_parent_rs}
1660       : $self->result_source;
1661
1662     return $rs;
1663   };
1664 }
1665
1666 =head2 throw_exception
1667
1668 See L<DBIx::Class::Schema/throw_exception> for details.
1669
1670 =cut
1671
1672 sub throw_exception {
1673   my $self=shift;
1674   $self->result_source->schema->throw_exception(@_);
1675 }
1676
1677 # XXX: FIXME: Attributes docs need clearing up
1678
1679 =head1 ATTRIBUTES
1680
1681 The resultset takes various attributes that modify its behavior. Here's an
1682 overview of them:
1683
1684 =head2 order_by
1685
1686 =over 4
1687
1688 =item Value: ($order_by | \@order_by)
1689
1690 =back
1691
1692 Which column(s) to order the results by. This is currently passed
1693 through directly to SQL, so you can give e.g. C<year DESC> for a
1694 descending order on the column `year'.
1695
1696 Please note that if you have quoting enabled (see
1697 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1698 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1699 so you will need to manually quote things as appropriate.)
1700
1701 =head2 columns
1702
1703 =over 4
1704
1705 =item Value: \@columns
1706
1707 =back
1708
1709 Shortcut to request a particular set of columns to be retrieved.  Adds
1710 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1711 from that, then auto-populates C<as> from C<select> as normal. (You may also
1712 use the C<cols> attribute, as in earlier versions of DBIC.)
1713
1714 =head2 include_columns
1715
1716 =over 4
1717
1718 =item Value: \@columns
1719
1720 =back
1721
1722 Shortcut to include additional columns in the returned results - for example
1723
1724   $schema->resultset('CD')->search(undef, {
1725     include_columns => ['artist.name'],
1726     join => ['artist']
1727   });
1728
1729 would return all CDs and include a 'name' column to the information
1730 passed to object inflation
1731
1732 =head2 select
1733
1734 =over 4
1735
1736 =item Value: \@select_columns
1737
1738 =back
1739
1740 Indicates which columns should be selected from the storage. You can use
1741 column names, or in the case of RDBMS back ends, function or stored procedure
1742 names:
1743
1744   $rs = $schema->resultset('Employee')->search(undef, {
1745     select => [
1746       'name',
1747       { count => 'employeeid' },
1748       { sum => 'salary' }
1749     ]
1750   });
1751
1752 When you use function/stored procedure names and do not supply an C<as>
1753 attribute, the column names returned are storage-dependent. E.g. MySQL would
1754 return a column named C<count(employeeid)> in the above example.
1755
1756 =head2 +select
1757
1758 =over 4
1759
1760 Indicates additional columns to be selected from storage.  Works the same as
1761 L<select> but adds columns to the selection.
1762
1763 =back
1764
1765 =head2 +as
1766
1767 =over 4
1768
1769 Indicates additional column names for those added via L<+select>.
1770
1771 =back
1772
1773 =head2 as
1774
1775 =over 4
1776
1777 =item Value: \@inflation_names
1778
1779 =back
1780
1781 Indicates column names for object inflation. This is used in conjunction with
1782 C<select>, usually when C<select> contains one or more function or stored
1783 procedure names:
1784
1785   $rs = $schema->resultset('Employee')->search(undef, {
1786     select => [
1787       'name',
1788       { count => 'employeeid' }
1789     ],
1790     as => ['name', 'employee_count'],
1791   });
1792
1793   my $employee = $rs->first(); # get the first Employee
1794
1795 If the object against which the search is performed already has an accessor
1796 matching a column name specified in C<as>, the value can be retrieved using
1797 the accessor as normal:
1798
1799   my $name = $employee->name();
1800
1801 If on the other hand an accessor does not exist in the object, you need to
1802 use C<get_column> instead:
1803
1804   my $employee_count = $employee->get_column('employee_count');
1805
1806 You can create your own accessors if required - see
1807 L<DBIx::Class::Manual::Cookbook> for details.
1808
1809 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1810 produced, it is used for internal access only. Thus attempting to use the accessor
1811 in an C<order_by> clause or similar will fail misrably.
1812
1813 =head2 join
1814
1815 =over 4
1816
1817 =item Value: ($rel_name | \@rel_names | \%rel_names)
1818
1819 =back
1820
1821 Contains a list of relationships that should be joined for this query.  For
1822 example:
1823
1824   # Get CDs by Nine Inch Nails
1825   my $rs = $schema->resultset('CD')->search(
1826     { 'artist.name' => 'Nine Inch Nails' },
1827     { join => 'artist' }
1828   );
1829
1830 Can also contain a hash reference to refer to the other relation's relations.
1831 For example:
1832
1833   package MyApp::Schema::Track;
1834   use base qw/DBIx::Class/;
1835   __PACKAGE__->table('track');
1836   __PACKAGE__->add_columns(qw/trackid cd position title/);
1837   __PACKAGE__->set_primary_key('trackid');
1838   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1839   1;
1840
1841   # In your application
1842   my $rs = $schema->resultset('Artist')->search(
1843     { 'track.title' => 'Teardrop' },
1844     {
1845       join     => { cd => 'track' },
1846       order_by => 'artist.name',
1847     }
1848   );
1849
1850 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1851 similarly for a third time). For e.g.
1852
1853   my $rs = $schema->resultset('Artist')->search({
1854     'cds.title'   => 'Down to Earth',
1855     'cds_2.title' => 'Popular',
1856   }, {
1857     join => [ qw/cds cds/ ],
1858   });
1859
1860 will return a set of all artists that have both a cd with title 'Down
1861 to Earth' and a cd with title 'Popular'.
1862
1863 If you want to fetch related objects from other tables as well, see C<prefetch>
1864 below.
1865
1866 =head2 prefetch
1867
1868 =over 4
1869
1870 =item Value: ($rel_name | \@rel_names | \%rel_names)
1871
1872 =back
1873
1874 Contains one or more relationships that should be fetched along with the main
1875 query (when they are accessed afterwards they will have already been
1876 "prefetched").  This is useful for when you know you will need the related
1877 objects, because it saves at least one query:
1878
1879   my $rs = $schema->resultset('Tag')->search(
1880     undef,
1881     {
1882       prefetch => {
1883         cd => 'artist'
1884       }
1885     }
1886   );
1887
1888 The initial search results in SQL like the following:
1889
1890   SELECT tag.*, cd.*, artist.* FROM tag
1891   JOIN cd ON tag.cd = cd.cdid
1892   JOIN artist ON cd.artist = artist.artistid
1893
1894 L<DBIx::Class> has no need to go back to the database when we access the
1895 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1896 case.
1897
1898 Simple prefetches will be joined automatically, so there is no need
1899 for a C<join> attribute in the above search. If you're prefetching to
1900 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1901 specify the join as well.
1902
1903 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1904 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1905 with an accessor type of 'single' or 'filter').
1906
1907 =head2 page
1908
1909 =over 4
1910
1911 =item Value: $page
1912
1913 =back
1914
1915 Makes the resultset paged and specifies the page to retrieve. Effectively
1916 identical to creating a non-pages resultset and then calling ->page($page)
1917 on it.
1918
1919 If L<rows> attribute is not specified it defualts to 10 rows per page.
1920
1921 =head2 rows
1922
1923 =over 4
1924
1925 =item Value: $rows
1926
1927 =back
1928
1929 Specifes the maximum number of rows for direct retrieval or the number of
1930 rows per page if the page attribute or method is used.
1931
1932 =head2 offset
1933
1934 =over 4
1935
1936 =item Value: $offset
1937
1938 =back
1939
1940 Specifies the (zero-based) row number for the  first row to be returned, or the
1941 of the first row of the first page if paging is used.
1942
1943 =head2 group_by
1944
1945 =over 4
1946
1947 =item Value: \@columns
1948
1949 =back
1950
1951 A arrayref of columns to group by. Can include columns of joined tables.
1952
1953   group_by => [qw/ column1 column2 ... /]
1954
1955 =head2 having
1956
1957 =over 4
1958
1959 =item Value: $condition
1960
1961 =back
1962
1963 HAVING is a select statement attribute that is applied between GROUP BY and
1964 ORDER BY. It is applied to the after the grouping calculations have been
1965 done.
1966
1967   having => { 'count(employee)' => { '>=', 100 } }
1968
1969 =head2 distinct
1970
1971 =over 4
1972
1973 =item Value: (0 | 1)
1974
1975 =back
1976
1977 Set to 1 to group by all columns.
1978
1979 =head2 where
1980
1981 =over 4
1982
1983 Adds to the WHERE clause.
1984
1985   # only return rows WHERE deleted IS NULL for all searches
1986   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
1987
1988 Can be overridden by passing C<{ where => undef }> as an attribute
1989 to a resulset.
1990
1991 =back
1992
1993 =head2 cache
1994
1995 Set to 1 to cache search results. This prevents extra SQL queries if you
1996 revisit rows in your ResultSet:
1997
1998   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1999
2000   while( my $artist = $resultset->next ) {
2001     ... do stuff ...
2002   }
2003
2004   $rs->first; # without cache, this would issue a query
2005
2006 By default, searches are not cached.
2007
2008 For more examples of using these attributes, see
2009 L<DBIx::Class::Manual::Cookbook>.
2010
2011 =head2 from
2012
2013 =over 4
2014
2015 =item Value: \@from_clause
2016
2017 =back
2018
2019 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2020 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2021 clauses.
2022
2023 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2024
2025 C<join> will usually do what you need and it is strongly recommended that you
2026 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2027 And we really do mean "cannot", not just tried and failed. Attempting to use
2028 this because you're having problems with C<join> is like trying to use x86
2029 ASM because you've got a syntax error in your C. Trust us on this.
2030
2031 Now, if you're still really, really sure you need to use this (and if you're
2032 not 100% sure, ask the mailing list first), here's an explanation of how this
2033 works.
2034
2035 The syntax is as follows -
2036
2037   [
2038     { <alias1> => <table1> },
2039     [
2040       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2041       [], # nested JOIN (optional)
2042       { <table1.column1> => <table2.column2>, ... (more conditions) },
2043     ],
2044     # More of the above [ ] may follow for additional joins
2045   ]
2046
2047   <table1> <alias1>
2048   JOIN
2049     <table2> <alias2>
2050     [JOIN ...]
2051   ON <table1.column1> = <table2.column2>
2052   <more joins may follow>
2053
2054 An easy way to follow the examples below is to remember the following:
2055
2056     Anything inside "[]" is a JOIN
2057     Anything inside "{}" is a condition for the enclosing JOIN
2058
2059 The following examples utilize a "person" table in a family tree application.
2060 In order to express parent->child relationships, this table is self-joined:
2061
2062     # Person->belongs_to('father' => 'Person');
2063     # Person->belongs_to('mother' => 'Person');
2064
2065 C<from> can be used to nest joins. Here we return all children with a father,
2066 then search against all mothers of those children:
2067
2068   $rs = $schema->resultset('Person')->search(
2069       undef,
2070       {
2071           alias => 'mother', # alias columns in accordance with "from"
2072           from => [
2073               { mother => 'person' },
2074               [
2075                   [
2076                       { child => 'person' },
2077                       [
2078                           { father => 'person' },
2079                           { 'father.person_id' => 'child.father_id' }
2080                       ]
2081                   ],
2082                   { 'mother.person_id' => 'child.mother_id' }
2083               ],
2084           ]
2085       },
2086   );
2087
2088   # Equivalent SQL:
2089   # SELECT mother.* FROM person mother
2090   # JOIN (
2091   #   person child
2092   #   JOIN person father
2093   #   ON ( father.person_id = child.father_id )
2094   # )
2095   # ON ( mother.person_id = child.mother_id )
2096
2097 The type of any join can be controlled manually. To search against only people
2098 with a father in the person table, we could explicitly use C<INNER JOIN>:
2099
2100     $rs = $schema->resultset('Person')->search(
2101         undef,
2102         {
2103             alias => 'child', # alias columns in accordance with "from"
2104             from => [
2105                 { child => 'person' },
2106                 [
2107                     { father => 'person', -join_type => 'inner' },
2108                     { 'father.id' => 'child.father_id' }
2109                 ],
2110             ]
2111         },
2112     );
2113
2114     # Equivalent SQL:
2115     # SELECT child.* FROM person child
2116     # INNER JOIN person father ON child.father_id = father.id
2117
2118 =cut
2119
2120 1;