prefetch bugfix work-in-progress
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Data::Dumper;
13 use Scalar::Util qw/weaken/;
14 use Data::Dumper;
15 use DBIx::Class::ResultSetColumn;
16 use base qw/DBIx::Class/;
17 __PACKAGE__->load_components(qw/AccessorGroup/);
18 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
23
24 =head1 SYNOPSIS
25
26   my $rs   = $schema->resultset('User')->search(registered => 1);
27   my @rows = $schema->resultset('CD')->search(year => 2005);
28
29 =head1 DESCRIPTION
30
31 The resultset is also known as an iterator. It is responsible for handling
32 queries that may return an arbitrary number of rows, e.g. via L</search>
33 or a C<has_many> relationship.
34
35 In the examples below, the following table classes are used:
36
37   package MyApp::Schema::Artist;
38   use base qw/DBIx::Class/;
39   __PACKAGE__->load_components(qw/Core/);
40   __PACKAGE__->table('artist');
41   __PACKAGE__->add_columns(qw/artistid name/);
42   __PACKAGE__->set_primary_key('artistid');
43   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
44   1;
45
46   package MyApp::Schema::CD;
47   use base qw/DBIx::Class/;
48   __PACKAGE__->load_components(qw/Core/);
49   __PACKAGE__->table('cd');
50   __PACKAGE__->add_columns(qw/cdid artist title year/);
51   __PACKAGE__->set_primary_key('cdid');
52   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
53   1;
54
55 =head1 METHODS
56
57 =head2 new
58
59 =over 4
60
61 =item Arguments: $source, \%$attrs
62
63 =item Return Value: $rs
64
65 =back
66
67 The resultset constructor. Takes a source object (usually a
68 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
69 L</ATTRIBUTES> below).  Does not perform any queries -- these are
70 executed as needed by the other methods.
71
72 Generally you won't need to construct a resultset manually.  You'll
73 automatically get one from e.g. a L</search> called in scalar context:
74
75   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
76
77 IMPORTANT: If called on an object, proxies to new_result instead so
78
79   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
80
81 will return a CD object, not a ResultSet.
82
83 =cut
84
85 sub new {
86   my $class = shift;
87   return $class->new_result(@_) if ref $class;
88   
89   my ($source, $attrs) = @_;
90   weaken $source;
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   bless {
101     result_source => $source,
102     result_class => $attrs->{result_class} || $source->result_class,
103     cond => $attrs->{where},
104 #    from => $attrs->{from},
105 #    collapse => $collapse,
106     count => undef,
107     page => delete $attrs->{page},
108     pager => undef,
109     attrs => $attrs
110   }, $class;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 =cut
138
139 sub search {
140   my $self = shift;
141   my $rs = $self->search_rs( @_ );
142   return (wantarray ? $rs->all : $rs);
143 }
144
145 =head2 search_rs
146
147 =over 4
148
149 =item Arguments: $cond, \%attrs?
150
151 =item Return Value: $resultset
152
153 =back
154
155 This method does the same exact thing as search() except it will 
156 always return a resultset, even in list context.
157
158 =cut
159
160 sub search_rs {
161   my $self = shift;
162
163   my $attrs = {};
164   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
165   my $our_attrs = ($attrs->{_parent_attrs}) ? { %{$attrs->{_parent_attrs}} } : { %{$self->{attrs}} };
166   my $having = delete $our_attrs->{having};
167
168         # XXX this is getting messy
169         if ($attrs->{_live_join_stack}) {
170                 my $live_join = $attrs->{_live_join_stack};
171                 foreach (reverse @{$live_join}) {
172                         $attrs->{_live_join_h} = (defined $attrs->{_live_join_h}) ? { $_ => $attrs->{_live_join_h} } : $_;
173                 }
174         }
175
176   # merge new attrs into old
177   foreach my $key (qw/join prefetch/) {
178     next unless (exists $attrs->{$key});
179     if ($attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
180                         my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack};
181                         foreach (reverse @{$live_join}) {
182                                 $attrs->{$key} = { $_ => $attrs->{$key} };
183                         }
184     }
185
186     if (exists $our_attrs->{$key}) {
187       $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
188     } else {
189       $our_attrs->{$key} = $attrs->{$key};
190     }
191     delete $attrs->{$key};
192   }
193
194         $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $attrs->{_live_join_h}, 1) if ($attrs->{_live_join_h});
195
196   if (exists $our_attrs->{prefetch}) {
197       $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $our_attrs->{prefetch}, 1);
198   }
199
200   my $new_attrs = { %{$our_attrs}, %{$attrs} };
201   my $where = (@_
202                 ? ((@_ == 1 || ref $_[0] eq "HASH")
203                     ? shift
204                     : ((@_ % 2)
205                         ? $self->throw_exception(
206                             "Odd number of arguments to search")
207                         : {@_}))
208                 : undef());
209   if (defined $where) {
210     $new_attrs->{where} = (defined $new_attrs->{where}
211               ? { '-and' =>
212                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
213                       $where, $new_attrs->{where} ] }
214               : $where);
215   }
216
217   if (defined $having) {
218     $new_attrs->{having} = (defined $new_attrs->{having}
219               ? { '-and' =>
220                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
221                       $having, $new_attrs->{having} ] }
222               : $having);
223   }
224
225   my $rs = (ref $self)->new($self->result_source, $new_attrs);
226   $rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
227
228   unless (@_) { # no search, effectively just a clone
229     my $rows = $self->get_cache;
230     if ($rows) {
231       $rs->set_cache($rows);
232     }
233   }
234   
235   return $rs;
236 }
237
238 =head2 search_literal
239
240 =over 4
241
242 =item Arguments: $sql_fragment, @bind_values
243
244 =item Return Value: $resultset (scalar context), @row_objs (list context)
245
246 =back
247
248   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
249   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
250
251 Pass a literal chunk of SQL to be added to the conditional part of the
252 resultset query.
253
254 =cut
255
256 sub search_literal {
257   my ($self, $cond, @vals) = @_;
258   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
259   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
260   return $self->search(\$cond, $attrs);
261 }
262
263 =head2 find
264
265 =over 4
266
267 =item Arguments: @values | \%cols, \%attrs?
268
269 =item Return Value: $row_object
270
271 =back
272
273 Finds a row based on its primary key or unique constraint. For example, to find
274 a row by its primary key:
275
276   my $cd = $schema->resultset('CD')->find(5);
277
278 You can also find a row by a specific unique constraint using the C<key>
279 attribute. For example:
280
281   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', { key => 'cd_artist_title' });
282
283 Additionally, you can specify the columns explicitly by name:
284
285   my $cd = $schema->resultset('CD')->find(
286     {
287       artist => 'Massive Attack',
288       title  => 'Mezzanine',
289     },
290     { key => 'cd_artist_title' }
291   );
292
293 If the C<key> is specified as C<primary>, it searches only on the primary key.
294
295 If no C<key> is specified, it searches on all unique constraints defined on the
296 source, including the primary key.
297
298 See also L</find_or_create> and L</update_or_create>. For information on how to
299 declare unique constraints, see
300 L<DBIx::Class::ResultSource/add_unique_constraint>.
301
302 =cut
303
304 sub find {
305   my $self = shift;
306   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
307
308   # Default to the primary key, but allow a specific key
309   my @cols = exists $attrs->{key}
310     ? $self->result_source->unique_constraint_columns($attrs->{key})
311     : $self->result_source->primary_columns;
312   $self->throw_exception(
313     "Can't find unless a primary key or unique constraint is defined"
314   ) unless @cols;
315
316   # Parse out a hashref from input
317   my $input_query;
318   if (ref $_[0] eq 'HASH') {
319     $input_query = { %{$_[0]} };
320   }
321   elsif (@_ == @cols) {
322     $input_query = {};
323     @{$input_query}{@cols} = @_;
324   }
325   else {
326     # Compatibility: Allow e.g. find(id => $value)
327     carp "Find by key => value deprecated; please use a hashref instead";
328     $input_query = {@_};
329   }
330
331   my @unique_queries = $self->_unique_queries($input_query, $attrs);
332
333   # Handle cases where the ResultSet defines the query, or where the user is
334   # abusing find
335   my $query = @unique_queries ? \@unique_queries : $input_query;
336
337   # Run the query
338   if (keys %$attrs) {
339     my $rs = $self->search($query, $attrs);
340     $rs->_resolve;
341     return keys %{$rs->{_attrs}->{collapse}} ? $rs->next : $rs->single;
342   }
343   else {
344     $self->_resolve;  
345     return (keys %{$self->{_attrs}->{collapse}})
346       ? $self->search($query)->next
347       : $self->single($query);
348   }
349 }
350
351 # _unique_queries
352 #
353 # Build a list of queries which satisfy unique constraints.
354
355 sub _unique_queries {
356   my ($self, $query, $attrs) = @_;
357
358   my @constraint_names = exists $attrs->{key}
359     ? ($attrs->{key})
360     : $self->result_source->unique_constraint_names;
361
362   my @unique_queries;
363   foreach my $name (@constraint_names) {
364     my @unique_cols = $self->result_source->unique_constraint_columns($name);
365     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
366
367     next unless scalar keys %$unique_query;
368
369     # Add the ResultSet's alias
370     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
371                         my $alias = ($self->{attrs}->{_live_join}) ? $self->{attrs}->{_live_join} : $self->{attrs}->{alias};
372       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
373     }
374
375     push @unique_queries, $unique_query;
376   }
377
378   return @unique_queries;
379 }
380
381 # _build_unique_query
382 #
383 # Constrain the specified query hash based on the specified column names.
384
385 sub _build_unique_query {
386   my ($self, $query, $unique_cols) = @_;
387
388   my %unique_query =
389     map  { $_ => $query->{$_} }
390     grep { exists $query->{$_} }
391     @$unique_cols;
392
393   return \%unique_query;
394 }
395
396 =head2 search_related
397
398 =over 4
399
400 =item Arguments: $cond, \%attrs?
401
402 =item Return Value: $new_resultset
403
404 =back
405
406   $new_rs = $cd_rs->search_related('artist', {
407     name => 'Emo-R-Us',
408   });
409
410 Searches the specified relationship, optionally specifying a condition and
411 attributes for matching records. See L</ATTRIBUTES> for more information.
412
413 =cut
414
415 sub search_related {
416   return shift->related_resultset(shift)->search(@_);
417 }
418
419 =head2 cursor
420
421 =over 4
422
423 =item Arguments: none
424
425 =item Return Value: $cursor
426
427 =back
428
429 Returns a storage-driven cursor to the given resultset. See
430 L<DBIx::Class::Cursor> for more information.
431
432 =cut
433
434 sub cursor {
435   my ($self) = @_;
436
437   $self->_resolve;
438   my $attrs = { %{$self->{_attrs}} };
439   return $self->{cursor}
440     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
441           $attrs->{where},$attrs);
442 }
443
444 =head2 single
445
446 =over 4
447
448 =item Arguments: $cond?
449
450 =item Return Value: $row_object?
451
452 =back
453
454   my $cd = $schema->resultset('CD')->single({ year => 2001 });
455
456 Inflates the first result without creating a cursor if the resultset has
457 any records in it; if not returns nothing. Used by L</find> as an optimisation.
458
459 Can optionally take an additional condition *only* - this is a fast-code-path
460 method; if you need to add extra joins or similar call ->search and then
461 ->single without a condition on the $rs returned from that.
462
463 =cut
464
465 sub single {
466   my ($self, $where) = @_;
467   $self->_resolve;
468   my $attrs = { %{$self->{_attrs}} };
469   if ($where) {
470     if (defined $attrs->{where}) {
471       $attrs->{where} = {
472         '-and' =>
473             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
474                $where, delete $attrs->{where} ]
475       };
476     } else {
477       $attrs->{where} = $where;
478     }
479   }
480
481   unless ($self->_is_unique_query($attrs->{where})) {
482     carp "Query not guarnteed to return a single row"
483       . "; please declare your unique constraints or use search instead";
484   }
485
486   my @data = $self->result_source->storage->select_single(
487           $attrs->{from}, $attrs->{select},
488           $attrs->{where},$attrs);
489   return (@data ? $self->_construct_object(@data) : ());
490 }
491
492 # _is_unique_query
493 #
494 # Try to determine if the specified query is guaranteed to be unique, based on
495 # the declared unique constraints.
496
497 sub _is_unique_query {
498   my ($self, $query) = @_;
499
500   my $collapsed = $self->_collapse_query($query);
501
502         my $alias = ($self->{attrs}->{_live_join}) ? $self->{attrs}->{_live_join} : $self->{attrs}->{alias};
503   foreach my $name ($self->result_source->unique_constraint_names) {
504     my @unique_cols = map { "$alias.$_" }
505       $self->result_source->unique_constraint_columns($name);
506
507     # Count the values for each unique column
508     my %seen = map { $_ => 0 } @unique_cols;
509
510     foreach my $key (keys %$collapsed) {
511       my $aliased = $key;
512       $aliased = "$alias.$key" unless $key =~ /\./;
513
514       next unless exists $seen{$aliased};  # Additional constraints are okay
515       $seen{$aliased} = scalar @{ $collapsed->{$key} };
516     }
517
518     # If we get 0 or more than 1 value for a column, it's not necessarily unique
519     return 1 unless grep { $_ != 1 } values %seen;
520   }
521
522   return 0;
523 }
524
525 # _collapse_query
526 #
527 # Recursively collapse the query, accumulating values for each column.
528
529 sub _collapse_query {
530   my ($self, $query, $collapsed) = @_;
531
532   $collapsed ||= {};
533
534   if (ref $query eq 'ARRAY') {
535     foreach my $subquery (@$query) {
536       next unless ref $subquery;  # -or
537 #      warn "ARRAY: " . Dumper $subquery;
538       $collapsed = $self->_collapse_query($subquery, $collapsed);
539     }
540   }
541   elsif (ref $query eq 'HASH') {
542     if (keys %$query and (keys %$query)[0] eq '-and') {
543       foreach my $subquery (@{$query->{-and}}) {
544 #        warn "HASH: " . Dumper $subquery;
545         $collapsed = $self->_collapse_query($subquery, $collapsed);
546       }
547     }
548     else {
549 #      warn "LEAF: " . Dumper $query;
550       foreach my $key (keys %$query) {
551         push @{$collapsed->{$key}}, $query->{$key};
552       }
553     }
554   }
555
556   return $collapsed;
557 }
558
559 =head2 get_column
560
561 =over 4
562
563 =item Arguments: $cond?
564
565 =item Return Value: $resultsetcolumn
566
567 =back
568
569   my $max_length = $rs->get_column('length')->max;
570
571 Returns a ResultSetColumn instance for $column based on $self
572
573 =cut
574
575 sub get_column {
576   my ($self, $column) = @_;
577
578   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
579   return $new;
580 }
581
582 =head2 search_like
583
584 =over 4
585
586 =item Arguments: $cond, \%attrs?
587
588 =item Return Value: $resultset (scalar context), @row_objs (list context)
589
590 =back
591
592   # WHERE title LIKE '%blue%'
593   $cd_rs = $rs->search_like({ title => '%blue%'});
594
595 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
596 that this is simply a convenience method. You most likely want to use
597 L</search> with specific operators.
598
599 For more information, see L<DBIx::Class::Manual::Cookbook>.
600
601 =cut
602
603 sub search_like {
604   my $class = shift;
605   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
606   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
607   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
608   return $class->search($query, { %$attrs });
609 }
610
611 =head2 slice
612
613 =over 4
614
615 =item Arguments: $first, $last
616
617 =item Return Value: $resultset (scalar context), @row_objs (list context)
618
619 =back
620
621 Returns a resultset or object list representing a subset of elements from the
622 resultset slice is called on. Indexes are from 0, i.e., to get the first
623 three records, call:
624
625   my ($one, $two, $three) = $rs->slice(0, 2);
626
627 =cut
628
629 sub slice {
630   my ($self, $min, $max) = @_;
631   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
632   $attrs->{offset} = $self->{attrs}{offset} || 0;
633   $attrs->{offset} += $min;
634   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
635   return $self->search(undef(), $attrs);
636   #my $slice = (ref $self)->new($self->result_source, $attrs);
637   #return (wantarray ? $slice->all : $slice);
638 }
639
640 =head2 next
641
642 =over 4
643
644 =item Arguments: none
645
646 =item Return Value: $result?
647
648 =back
649
650 Returns the next element in the resultset (C<undef> is there is none).
651
652 Can be used to efficiently iterate over records in the resultset:
653
654   my $rs = $schema->resultset('CD')->search;
655   while (my $cd = $rs->next) {
656     print $cd->title;
657   }
658
659 Note that you need to store the resultset object, and call C<next> on it. 
660 Calling C<< resultset('Table')->next >> repeatedly will always return the
661 first record from the resultset.
662
663 =cut
664
665 sub next {
666   my ($self) = @_;
667   if (my $cache = $self->get_cache) {
668     $self->{all_cache_position} ||= 0;
669     return $cache->[$self->{all_cache_position}++];
670   }
671   if ($self->{attrs}{cache}) {
672     $self->{all_cache_position} = 1;
673     return ($self->all)[0];
674   }
675   my @row = (exists $self->{stashed_row} ?
676                @{delete $self->{stashed_row}} :
677                $self->cursor->next
678   );
679   return unless (@row);
680   return $self->_construct_object(@row);
681 }
682
683 sub _resolve {
684   my $self = shift;
685
686   return if(exists $self->{_attrs}); #return if _resolve has already been called
687
688   my $attrs = $self->{attrs};    
689   my $source = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->{result_source};
690
691   # XXX - lose storable dclone
692   my $record_filter = delete $attrs->{record_filter} if (defined $attrs->{record_filter});
693   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
694   $attrs->{record_filter} = $record_filter if ($record_filter);
695   $self->{attrs}->{record_filter} = $record_filter if ($record_filter);
696
697   my $alias = $attrs->{alias};
698  
699   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
700   delete $attrs->{as} if $attrs->{columns};
701   $attrs->{columns} ||= [ $self->{result_source}->columns ] unless $attrs->{select};
702   my $select_alias = ($self->{_parent_rs}) ? $self->{attrs}->{_live_join} : $alias;
703   $attrs->{select} = [
704                       map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
705                       ] if $attrs->{columns};
706   $attrs->{as} ||= [
707                     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
708                     ];
709   if (my $include = delete $attrs->{include_columns}) {
710       push(@{$attrs->{select}}, @$include);
711       push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
712   }
713
714   $attrs->{from} ||= [ { $alias => $source->from } ];
715   $attrs->{seen_join} ||= {};
716   my %seen;
717   if (my $join = delete $attrs->{join}) {
718     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
719       if (ref $j eq 'HASH') {
720         $seen{$_} = 1 foreach keys %$j;
721       } else {
722         $seen{$j} = 1;
723       }
724     }
725     
726     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
727   }
728   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
729   $attrs->{order_by} = [ $attrs->{order_by} ] if
730       $attrs->{order_by} and !ref($attrs->{order_by});
731   $attrs->{order_by} ||= [];
732
733   if(my $seladds = delete($attrs->{'+select'})) {
734     my @seladds = (ref($seladds) eq 'ARRAY' ? @$seladds : ($seladds));
735     $attrs->{select} = [
736                         @{ $attrs->{select} },
737                         map { (m/\./ || ref($_)) ? $_ : "${alias}.$_" } $seladds
738                         ];
739   }
740   if(my $asadds = delete($attrs->{'+as'})) {
741     my @asadds = (ref($asadds) eq 'ARRAY' ? @$asadds : ($asadds));
742     $attrs->{as} = [ @{ $attrs->{as} }, @asadds ];
743   }
744   my $collapse = $attrs->{collapse} || {};
745   if (my $prefetch = delete $attrs->{prefetch}) {
746       my @pre_order;
747       foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
748           if ( ref $p eq 'HASH' ) {
749               foreach my $key (keys %$p) {
750                   push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
751                       unless $seen{$key};
752               }
753           } else {
754               push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
755                   unless $seen{$p};
756           }
757           #print "res pre: " . Dumper($p, $collapse);
758           my @prefetch = $source->resolve_prefetch(
759                                                    $p, $attrs->{alias}, {}, \@pre_order, $collapse);
760             
761           #print "prefetch: " . Dumper(\@prefetch);
762           push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
763           push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
764       }
765       push(@{$attrs->{order_by}}, @pre_order);
766   }
767   $attrs->{collapse} = $collapse;
768   $self->{_attrs} = $attrs;
769 }
770
771 sub _merge_attr {
772   my ($self, $a, $b, $is_prefetch) = @_;
773     
774   return $b unless $a;
775   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
776     foreach my $key (keys %{$b}) {
777       if (exists $a->{$key}) {
778         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key}, $is_prefetch);
779       } else {
780         $a->{$key} = $b->{$key};
781       }
782     }
783     return $a;
784   } else {
785     $a = [$a] unless (ref $a eq 'ARRAY');
786     $b = [$b] unless (ref $b eq 'ARRAY');
787     
788     my $hash = {};
789     my $array = [];      
790     foreach ($a, $b) {
791       foreach my $element (@{$_}) {
792         if (ref $element eq 'HASH') {
793           $hash = $self->_merge_attr($hash, $element, $is_prefetch);
794         } elsif (ref $element eq 'ARRAY') {
795           $array = [@{$array}, @{$element}];
796         } else {        
797           if (($b == $_) && $is_prefetch) {
798             $self->_merge_array($array, $element, $is_prefetch);
799           } else {
800             push(@{$array}, $element);
801           }
802         }
803       }
804     }
805
806                 my $final_array = [];
807                 foreach my $element (@{$array}) {
808                         push(@{$final_array}, $element) unless (exists $hash->{$element});
809                 }
810                 $array = $final_array;
811
812     if ((keys %{$hash}) && (scalar(@{$array} > 0))) {
813       return [$hash, @{$array}];
814     } else {    
815       return (keys %{$hash}) ? $hash : $array;
816     }
817   }
818 }
819
820 sub _merge_array {
821   my ($self, $a, $b) = @_;
822   
823   $b = [$b] unless (ref $b eq 'ARRAY');
824   # add elements from @{$b} to @{$a} which aren't already in @{$a}
825   foreach my $b_element (@{$b}) {
826     push(@{$a}, $b_element) unless grep {$b_element eq $_} @{$a};
827   }
828 }
829
830 sub _construct_object {
831   my ($self, @row) = @_;
832   my @as = @{ $self->{_attrs}{as} };
833   
834   #print "row in: " . Dumper(\@row);
835   my $info = $self->_collapse_result(\@as, \@row);
836   my $new = $self->result_class->inflate_result($self->result_source, @$info);
837   $new = $self->{_attrs}{record_filter}->($new)
838     if exists $self->{_attrs}{record_filter};
839   return $new;
840 }
841
842 sub _collapse_result {
843   my ($self, $as, $row, $prefix) = @_;
844
845   my $live_join = $self->{attrs}->{_live_join} ||="";
846   my %const;
847
848   my @copy = @$row;
849   foreach my $this_as (@$as) {
850     my $val = shift @copy;
851     if (defined $prefix) {
852       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
853         my $remain = $1;
854         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
855         $const{$1||''}{$2} = $val;
856       }
857     } else {
858       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
859       $const{$1||''}{$2} = $val;
860     }
861   }
862
863   my $info = [ {}, {} ];
864   foreach my $key (keys %const) {
865     if (length $key && $key ne $live_join) {
866       my $target = $info;
867       my @parts = split(/\./, $key);
868       foreach my $p (@parts) {
869         $target = $target->[1]->{$p} ||= [];
870       }
871       $target->[0] = $const{$key};
872     } else {
873       $info->[0] = $const{$key};
874     }
875   }
876   my @collapse;
877
878   #print "collapse: " . Dumper($self->{_attrs}->{collapse});
879   if (defined $prefix) {
880     @collapse = map {
881         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
882     } keys %{$self->{_attrs}->{collapse}}
883   } else {
884     @collapse = keys %{$self->{_attrs}->{collapse}};
885   };
886
887   if (@collapse) {
888     my ($c) = sort { length $a <=> length $b } @collapse;
889     my $target = $info;
890     foreach my $p (split(/\./, $c)) {
891       $target = $target->[1]->{$p} ||= [];
892     }
893     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
894     my @co_key = @{$self->{_attrs}->{collapse}{$c_prefix}};
895     my $tree = $self->_collapse_result($as, $row, $c_prefix);
896     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
897     my (@final, @raw);
898
899     #print "les free: " . Dumper($tree->[0], \%co_check, \@co_key);
900     while ( !(grep {
901                 !defined($tree->[0]->{$_}) ||
902                 $co_check{$_} ne $tree->[0]->{$_}
903               } @co_key) ) {
904       push(@final, $tree);
905       last unless (@raw = $self->cursor->next);
906       $row = $self->{stashed_row} = \@raw;
907       $tree = $self->_collapse_result($as, $row, $c_prefix);
908     }
909     @$target = (@final ? @final : [ {}, {} ]); 
910       # single empty result to indicate an empty prefetched has_many
911   }
912
913   # get prefetch tree back to result_source level
914   # $self could be a related resultset
915   #if ($self->{attrs}->{_live_join_stack}) {
916    # foreach (@{$self->{attrs}->{_live_join_stack}}) {
917     #  $info->[1] = $info->[1]->{$_}->[1] if(exists $info->[1]->{$_});
918     #}
919   #}
920   #print "final info: " . Dumper($info);
921   return $info;
922 }
923
924 =head2 result_source
925
926 =over 4
927
928 =item Arguments: $result_source?
929
930 =item Return Value: $result_source
931
932 =back
933
934 An accessor for the primary ResultSource object from which this ResultSet
935 is derived.
936
937 =cut
938
939
940 =head2 count
941
942 =over 4
943
944 =item Arguments: $cond, \%attrs??
945
946 =item Return Value: $count
947
948 =back
949
950 Performs an SQL C<COUNT> with the same query as the resultset was built
951 with to find the number of elements. If passed arguments, does a search
952 on the resultset and counts the results of that.
953
954 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
955 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
956 not support C<DISTINCT> with multiple columns. If you are using such a
957 database, you should only use columns from the main table in your C<group_by>
958 clause.
959
960 =cut
961
962 sub count {
963   my $self = shift;
964   return $self->search(@_)->count if @_ and defined $_[0];
965   return scalar @{ $self->get_cache } if $self->get_cache;
966   my $count = $self->_count;
967   return 0 unless $count;
968
969   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
970   $count = $self->{attrs}{rows} if
971     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
972   return $count;
973 }
974
975 sub _count { # Separated out so pager can get the full count
976   my $self = shift;
977   my $select = { count => '*' };
978   
979   $self->_resolve;
980   my $attrs = { %{ $self->{_attrs} } };
981   if (my $group_by = delete $attrs->{group_by}) {
982     delete $attrs->{having};
983     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
984     # todo: try CONCAT for multi-column pk
985     my @pk = $self->result_source->primary_columns;
986     if (@pk == 1) {
987       foreach my $column (@distinct) {
988         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
989           @distinct = ($column);
990           last;
991         }
992       }
993     }
994
995     $select = { count => { distinct => \@distinct } };
996   }
997
998   $attrs->{select} = $select;
999   $attrs->{as} = [qw/count/];
1000
1001   # offset, order by and page are not needed to count. record_filter is cdbi
1002   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1003         my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1004         $tmp_rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
1005
1006   my ($count) = $tmp_rs->cursor->next;
1007   return $count;
1008 }
1009
1010 =head2 count_literal
1011
1012 =over 4
1013
1014 =item Arguments: $sql_fragment, @bind_values
1015
1016 =item Return Value: $count
1017
1018 =back
1019
1020 Counts the results in a literal query. Equivalent to calling L</search_literal>
1021 with the passed arguments, then L</count>.
1022
1023 =cut
1024
1025 sub count_literal { shift->search_literal(@_)->count; }
1026
1027 =head2 all
1028
1029 =over 4
1030
1031 =item Arguments: none
1032
1033 =item Return Value: @objects
1034
1035 =back
1036
1037 Returns all elements in the resultset. Called implicitly if the resultset
1038 is returned in list context.
1039
1040 =cut
1041
1042 sub all {
1043   my ($self) = @_;
1044   return @{ $self->get_cache } if $self->get_cache;
1045
1046   my @obj;
1047
1048   # TODO: don't call resolve here
1049   $self->_resolve;
1050   if (keys %{$self->{_attrs}->{collapse}}) {
1051 #  if ($self->{attrs}->{prefetch}) {
1052       # Using $self->cursor->all is really just an optimisation.
1053       # If we're collapsing has_many prefetches it probably makes
1054       # very little difference, and this is cleaner than hacking
1055       # _construct_object to survive the approach
1056     my @row = $self->cursor->next;
1057     while (@row) {
1058       push(@obj, $self->_construct_object(@row));
1059       @row = (exists $self->{stashed_row}
1060                ? @{delete $self->{stashed_row}}
1061                : $self->cursor->next);
1062     }
1063   } else {
1064     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1065   }
1066
1067   $self->set_cache(\@obj) if $self->{attrs}{cache};
1068   return @obj;
1069 }
1070
1071 =head2 reset
1072
1073 =over 4
1074
1075 =item Arguments: none
1076
1077 =item Return Value: $self
1078
1079 =back
1080
1081 Resets the resultset's cursor, so you can iterate through the elements again.
1082
1083 =cut
1084
1085 sub reset {
1086   my ($self) = @_;
1087   delete $self->{_attrs} if (exists $self->{_attrs});
1088
1089   $self->{all_cache_position} = 0;
1090   $self->cursor->reset;
1091   return $self;
1092 }
1093
1094 =head2 first
1095
1096 =over 4
1097
1098 =item Arguments: none
1099
1100 =item Return Value: $object?
1101
1102 =back
1103
1104 Resets the resultset and returns an object for the first result (if the
1105 resultset returns anything).
1106
1107 =cut
1108
1109 sub first {
1110   return $_[0]->reset->next;
1111 }
1112
1113 # _cond_for_update_delete
1114 #
1115 # update/delete require the condition to be modified to handle
1116 # the differing SQL syntax available.  This transforms the $self->{cond}
1117 # appropriately, returning the new condition.
1118
1119 sub _cond_for_update_delete {
1120   my ($self) = @_;
1121   my $cond = {};
1122
1123   if (!ref($self->{cond})) {
1124     # No-op. No condition, we're updating/deleting everything
1125   }
1126   elsif (ref $self->{cond} eq 'ARRAY') {
1127     $cond = [
1128       map {
1129         my %hash;
1130         foreach my $key (keys %{$_}) {
1131           $key =~ /([^.]+)$/;
1132           $hash{$1} = $_->{$key};
1133         }
1134         \%hash;
1135       } @{$self->{cond}}
1136     ];
1137   }
1138   elsif (ref $self->{cond} eq 'HASH') {
1139     if ((keys %{$self->{cond}})[0] eq '-and') {
1140       $cond->{-and} = [];
1141
1142       my @cond = @{$self->{cond}{-and}};
1143       for (my $i = 0; $i <= @cond - 1; $i++) {
1144         my $entry = $cond[$i];
1145
1146         my %hash;
1147         if (ref $entry eq 'HASH') {
1148           foreach my $key (keys %{$entry}) {
1149             $key =~ /([^.]+)$/;
1150             $hash{$1} = $entry->{$key};
1151           }
1152         }
1153         else {
1154           $entry =~ /([^.]+)$/;
1155           $hash{$1} = $cond[++$i];
1156         }
1157
1158         push @{$cond->{-and}}, \%hash;
1159       }
1160     }
1161     else {
1162       foreach my $key (keys %{$self->{cond}}) {
1163         $key =~ /([^.]+)$/;
1164         $cond->{$1} = $self->{cond}{$key};
1165       }
1166     }
1167   }
1168   else {
1169     $self->throw_exception(
1170       "Can't update/delete on resultset with condition unless hash or array"
1171     );
1172   }
1173
1174   return $cond;
1175 }
1176
1177
1178 =head2 update
1179
1180 =over 4
1181
1182 =item Arguments: \%values
1183
1184 =item Return Value: $storage_rv
1185
1186 =back
1187
1188 Sets the specified columns in the resultset to the supplied values in a
1189 single query. Return value will be true if the update succeeded or false
1190 if no records were updated; exact type of success value is storage-dependent.
1191
1192 =cut
1193
1194 sub update {
1195   my ($self, $values) = @_;
1196   $self->throw_exception("Values for update must be a hash")
1197     unless ref $values eq 'HASH';
1198
1199   my $cond = $self->_cond_for_update_delete;
1200
1201   return $self->result_source->storage->update(
1202     $self->result_source->from, $values, $cond
1203   );
1204 }
1205
1206 =head2 update_all
1207
1208 =over 4
1209
1210 =item Arguments: \%values
1211
1212 =item Return Value: 1
1213
1214 =back
1215
1216 Fetches all objects and updates them one at a time. Note that C<update_all>
1217 will run DBIC cascade triggers, while L</update> will not.
1218
1219 =cut
1220
1221 sub update_all {
1222   my ($self, $values) = @_;
1223   $self->throw_exception("Values for update must be a hash")
1224     unless ref $values eq 'HASH';
1225   foreach my $obj ($self->all) {
1226     $obj->set_columns($values)->update;
1227   }
1228   return 1;
1229 }
1230
1231 =head2 delete
1232
1233 =over 4
1234
1235 =item Arguments: none
1236
1237 =item Return Value: 1
1238
1239 =back
1240
1241 Deletes the contents of the resultset from its result source. Note that this
1242 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1243 to run.
1244
1245 =cut
1246
1247 sub delete {
1248   my ($self) = @_;
1249   my $del = {};
1250
1251   my $cond = $self->_cond_for_update_delete;
1252
1253   $self->result_source->storage->delete($self->result_source->from, $cond);
1254   return 1;
1255 }
1256
1257 =head2 delete_all
1258
1259 =over 4
1260
1261 =item Arguments: none
1262
1263 =item Return Value: 1
1264
1265 =back
1266
1267 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1268 will run DBIC cascade triggers, while L</delete> will not.
1269
1270 =cut
1271
1272 sub delete_all {
1273   my ($self) = @_;
1274   $_->delete for $self->all;
1275   return 1;
1276 }
1277
1278 =head2 pager
1279
1280 =over 4
1281
1282 =item Arguments: none
1283
1284 =item Return Value: $pager
1285
1286 =back
1287
1288 Return Value a L<Data::Page> object for the current resultset. Only makes
1289 sense for queries with a C<page> attribute.
1290
1291 =cut
1292
1293 sub pager {
1294   my ($self) = @_;
1295   my $attrs = $self->{attrs};
1296   $self->throw_exception("Can't create pager for non-paged rs")
1297     unless $self->{page};
1298   $attrs->{rows} ||= 10;
1299   return $self->{pager} ||= Data::Page->new(
1300     $self->_count, $attrs->{rows}, $self->{page});
1301 }
1302
1303 =head2 page
1304
1305 =over 4
1306
1307 =item Arguments: $page_number
1308
1309 =item Return Value: $rs
1310
1311 =back
1312
1313 Returns a resultset for the $page_number page of the resultset on which page
1314 is called, where each page contains a number of rows equal to the 'rows'
1315 attribute set on the resultset (10 by default).
1316
1317 =cut
1318
1319 sub page {
1320   my ($self, $page) = @_;
1321   my $attrs = { %{$self->{attrs}} };
1322   $attrs->{page} = $page;
1323   return (ref $self)->new($self->result_source, $attrs);
1324 }
1325
1326 =head2 new_result
1327
1328 =over 4
1329
1330 =item Arguments: \%vals
1331
1332 =item Return Value: $object
1333
1334 =back
1335
1336 Creates an object in the resultset's result class and returns it.
1337
1338 =cut
1339
1340 sub new_result {
1341   my ($self, $values) = @_;
1342   $self->throw_exception( "new_result needs a hash" )
1343     unless (ref $values eq 'HASH');
1344   $self->throw_exception(
1345     "Can't abstract implicit construct, condition not a hash"
1346   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1347   my %new = %$values;
1348   my $alias = $self->{attrs}{alias};
1349   foreach my $key (keys %{$self->{cond}||{}}) {
1350     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1351   }
1352   my $obj = $self->result_class->new(\%new);
1353   $obj->result_source($self->result_source) if $obj->can('result_source');
1354   return $obj;
1355 }
1356
1357 =head2 find_or_new
1358
1359 =over 4
1360
1361 =item Arguments: \%vals, \%attrs?
1362
1363 =item Return Value: $object
1364
1365 =back
1366
1367 Find an existing record from this resultset. If none exists, instantiate a new
1368 result object and return it. The object will not be saved into your storage
1369 until you call L<DBIx::Class::Row/insert> on it.
1370
1371 If you want objects to be saved immediately, use L</find_or_create> instead.
1372
1373 =cut
1374
1375 sub find_or_new {
1376   my $self     = shift;
1377   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1378   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1379   my $exists   = $self->find($hash, $attrs);
1380   return defined $exists ? $exists : $self->new_result($hash);
1381 }
1382
1383 =head2 create
1384
1385 =over 4
1386
1387 =item Arguments: \%vals
1388
1389 =item Return Value: $object
1390
1391 =back
1392
1393 Inserts a record into the resultset and returns the object representing it.
1394
1395 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1396
1397 =cut
1398
1399 sub create {
1400   my ($self, $attrs) = @_;
1401   $self->throw_exception( "create needs a hashref" )
1402     unless ref $attrs eq 'HASH';
1403   return $self->new_result($attrs)->insert;
1404 }
1405
1406 =head2 find_or_create
1407
1408 =over 4
1409
1410 =item Arguments: \%vals, \%attrs?
1411
1412 =item Return Value: $object
1413
1414 =back
1415
1416   $class->find_or_create({ key => $val, ... });
1417
1418 Tries to find a record based on its primary key or unique constraint; if none
1419 is found, creates one and returns that instead.
1420
1421   my $cd = $schema->resultset('CD')->find_or_create({
1422     cdid   => 5,
1423     artist => 'Massive Attack',
1424     title  => 'Mezzanine',
1425     year   => 2005,
1426   });
1427
1428 Also takes an optional C<key> attribute, to search by a specific key or unique
1429 constraint. For example:
1430
1431   my $cd = $schema->resultset('CD')->find_or_create(
1432     {
1433       artist => 'Massive Attack',
1434       title  => 'Mezzanine',
1435     },
1436     { key => 'cd_artist_title' }
1437   );
1438
1439 See also L</find> and L</update_or_create>. For information on how to declare
1440 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1441
1442 =cut
1443
1444 sub find_or_create {
1445   my $self     = shift;
1446   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1447   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1448   my $exists   = $self->find($hash, $attrs);
1449   return defined $exists ? $exists : $self->create($hash);
1450 }
1451
1452 =head2 update_or_create
1453
1454 =over 4
1455
1456 =item Arguments: \%col_values, { key => $unique_constraint }?
1457
1458 =item Return Value: $object
1459
1460 =back
1461
1462   $class->update_or_create({ col => $val, ... });
1463
1464 First, searches for an existing row matching one of the unique constraints
1465 (including the primary key) on the source of this resultset. If a row is
1466 found, updates it with the other given column values. Otherwise, creates a new
1467 row.
1468
1469 Takes an optional C<key> attribute to search on a specific unique constraint.
1470 For example:
1471
1472   # In your application
1473   my $cd = $schema->resultset('CD')->update_or_create(
1474     {
1475       artist => 'Massive Attack',
1476       title  => 'Mezzanine',
1477       year   => 1998,
1478     },
1479     { key => 'cd_artist_title' }
1480   );
1481
1482 If no C<key> is specified, it searches on all unique constraints defined on the
1483 source, including the primary key.
1484
1485 If the C<key> is specified as C<primary>, it searches only on the primary key.
1486
1487 See also L</find> and L</find_or_create>. For information on how to declare
1488 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1489
1490 =cut
1491
1492 sub update_or_create {
1493   my $self = shift;
1494   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1495   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1496
1497   my $row = $self->find($cond);
1498   if (defined $row) {
1499     $row->update($cond);
1500     return $row;
1501   }
1502
1503   return $self->create($cond);
1504 }
1505
1506 =head2 get_cache
1507
1508 =over 4
1509
1510 =item Arguments: none
1511
1512 =item Return Value: \@cache_objects?
1513
1514 =back
1515
1516 Gets the contents of the cache for the resultset, if the cache is set.
1517
1518 =cut
1519
1520 sub get_cache {
1521   shift->{all_cache};
1522 }
1523
1524 =head2 set_cache
1525
1526 =over 4
1527
1528 =item Arguments: \@cache_objects
1529
1530 =item Return Value: \@cache_objects
1531
1532 =back
1533
1534 Sets the contents of the cache for the resultset. Expects an arrayref
1535 of objects of the same class as those produced by the resultset. Note that
1536 if the cache is set the resultset will return the cached objects rather
1537 than re-querying the database even if the cache attr is not set.
1538
1539 =cut
1540
1541 sub set_cache {
1542   my ( $self, $data ) = @_;
1543   $self->throw_exception("set_cache requires an arrayref")
1544       if defined($data) && (ref $data ne 'ARRAY');
1545   $self->{all_cache} = $data;
1546 }
1547
1548 =head2 clear_cache
1549
1550 =over 4
1551
1552 =item Arguments: none
1553
1554 =item Return Value: []
1555
1556 =back
1557
1558 Clears the cache for the resultset.
1559
1560 =cut
1561
1562 sub clear_cache {
1563   shift->set_cache(undef);
1564 }
1565
1566 =head2 related_resultset
1567
1568 =over 4
1569
1570 =item Arguments: $relationship_name
1571
1572 =item Return Value: $resultset
1573
1574 =back
1575
1576 Returns a related resultset for the supplied relationship name.
1577
1578   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1579
1580 =cut
1581
1582 sub related_resultset {
1583   my ( $self, $rel ) = @_;
1584   
1585   $self->{related_resultsets} ||= {};
1586   return $self->{related_resultsets}{$rel} ||= do {
1587     #warn "fetching related resultset for rel '$rel' " . $self->result_source->{name};
1588     my $rel_obj = $self->result_source->relationship_info($rel);
1589     $self->throw_exception(
1590         "search_related: result source '" . $self->result_source->name .
1591         "' has no such relationship ${rel}")
1592         unless $rel_obj; #die Dumper $self->{attrs};
1593
1594                 my $live_join_stack = $self->{attrs}->{_live_join_stack} || [];         
1595                 push(@{$live_join_stack}, $rel);
1596                 
1597     my $rs = $self->result_source->schema->resultset($rel_obj->{class}
1598            )->search( undef,
1599                       { select => undef,
1600                         as => undef,
1601                         _live_join => $rel, #the most recent
1602                         _live_join_stack => $live_join_stack, #the trail of rels
1603                         _parent_attrs => $self->{attrs}}
1604                       );    
1605
1606     # keep reference of the original resultset
1607     $rs->{_parent_rs} = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->result_source;
1608     return $rs;
1609   };
1610 }
1611
1612 =head2 throw_exception
1613
1614 See L<DBIx::Class::Schema/throw_exception> for details.
1615
1616 =cut
1617
1618 sub throw_exception {
1619   my $self=shift;
1620   $self->result_source->schema->throw_exception(@_);
1621 }
1622
1623 # XXX: FIXME: Attributes docs need clearing up
1624
1625 =head1 ATTRIBUTES
1626
1627 The resultset takes various attributes that modify its behavior. Here's an
1628 overview of them:
1629
1630 =head2 order_by
1631
1632 =over 4
1633
1634 =item Value: ($order_by | \@order_by)
1635
1636 =back
1637
1638 Which column(s) to order the results by. This is currently passed
1639 through directly to SQL, so you can give e.g. C<year DESC> for a
1640 descending order on the column `year'.
1641
1642 Please note that if you have quoting enabled (see 
1643 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1644 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1645 so you will need to manually quote things as appropriate.)
1646
1647 =head2 columns
1648
1649 =over 4
1650
1651 =item Value: \@columns
1652
1653 =back
1654
1655 Shortcut to request a particular set of columns to be retrieved.  Adds
1656 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1657 from that, then auto-populates C<as> from C<select> as normal. (You may also
1658 use the C<cols> attribute, as in earlier versions of DBIC.)
1659
1660 =head2 include_columns
1661
1662 =over 4
1663
1664 =item Value: \@columns
1665
1666 =back
1667
1668 Shortcut to include additional columns in the returned results - for example
1669
1670   $schema->resultset('CD')->search(undef, {
1671     include_columns => ['artist.name'],
1672     join => ['artist']
1673   });
1674
1675 would return all CDs and include a 'name' column to the information
1676 passed to object inflation
1677
1678 =head2 select
1679
1680 =over 4
1681
1682 =item Value: \@select_columns
1683
1684 =back
1685
1686 Indicates which columns should be selected from the storage. You can use
1687 column names, or in the case of RDBMS back ends, function or stored procedure
1688 names:
1689
1690   $rs = $schema->resultset('Employee')->search(undef, {
1691     select => [
1692       'name',
1693       { count => 'employeeid' },
1694       { sum => 'salary' }
1695     ]
1696   });
1697
1698 When you use function/stored procedure names and do not supply an C<as>
1699 attribute, the column names returned are storage-dependent. E.g. MySQL would
1700 return a column named C<count(employeeid)> in the above example.
1701
1702 =head2 +select
1703
1704 =over 4
1705
1706 Indicates additional columns to be selected from storage.  Works the same as
1707 L<select> but adds columns to the selection.
1708
1709 =back
1710
1711 =head2 +as
1712
1713 =over 4
1714
1715 Indicates additional column names for those added via L<+select>.
1716
1717 =back
1718
1719 =head2 as
1720
1721 =over 4
1722
1723 =item Value: \@inflation_names
1724
1725 =back
1726
1727 Indicates column names for object inflation. This is used in conjunction with
1728 C<select>, usually when C<select> contains one or more function or stored
1729 procedure names:
1730
1731   $rs = $schema->resultset('Employee')->search(undef, {
1732     select => [
1733       'name',
1734       { count => 'employeeid' }
1735     ],
1736     as => ['name', 'employee_count'],
1737   });
1738
1739   my $employee = $rs->first(); # get the first Employee
1740
1741 If the object against which the search is performed already has an accessor
1742 matching a column name specified in C<as>, the value can be retrieved using
1743 the accessor as normal:
1744
1745   my $name = $employee->name();
1746
1747 If on the other hand an accessor does not exist in the object, you need to
1748 use C<get_column> instead:
1749
1750   my $employee_count = $employee->get_column('employee_count');
1751
1752 You can create your own accessors if required - see
1753 L<DBIx::Class::Manual::Cookbook> for details.
1754
1755 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1756 produced, it is used for internal access only. Thus attempting to use the accessor
1757 in an C<order_by> clause or similar will fail misrably.
1758
1759 =head2 join
1760
1761 =over 4
1762
1763 =item Value: ($rel_name | \@rel_names | \%rel_names)
1764
1765 =back
1766
1767 Contains a list of relationships that should be joined for this query.  For
1768 example:
1769
1770   # Get CDs by Nine Inch Nails
1771   my $rs = $schema->resultset('CD')->search(
1772     { 'artist.name' => 'Nine Inch Nails' },
1773     { join => 'artist' }
1774   );
1775
1776 Can also contain a hash reference to refer to the other relation's relations.
1777 For example:
1778
1779   package MyApp::Schema::Track;
1780   use base qw/DBIx::Class/;
1781   __PACKAGE__->table('track');
1782   __PACKAGE__->add_columns(qw/trackid cd position title/);
1783   __PACKAGE__->set_primary_key('trackid');
1784   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1785   1;
1786
1787   # In your application
1788   my $rs = $schema->resultset('Artist')->search(
1789     { 'track.title' => 'Teardrop' },
1790     {
1791       join     => { cd => 'track' },
1792       order_by => 'artist.name',
1793     }
1794   );
1795
1796 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1797 similarly for a third time). For e.g.
1798
1799   my $rs = $schema->resultset('Artist')->search({
1800     'cds.title'   => 'Down to Earth',
1801     'cds_2.title' => 'Popular',
1802   }, {
1803     join => [ qw/cds cds/ ],
1804   });
1805
1806 will return a set of all artists that have both a cd with title 'Down
1807 to Earth' and a cd with title 'Popular'.
1808
1809 If you want to fetch related objects from other tables as well, see C<prefetch>
1810 below.
1811
1812 =head2 prefetch
1813
1814 =over 4
1815
1816 =item Value: ($rel_name | \@rel_names | \%rel_names)
1817
1818 =back
1819
1820 Contains one or more relationships that should be fetched along with the main
1821 query (when they are accessed afterwards they will have already been
1822 "prefetched").  This is useful for when you know you will need the related
1823 objects, because it saves at least one query:
1824
1825   my $rs = $schema->resultset('Tag')->search(
1826     undef,
1827     {
1828       prefetch => {
1829         cd => 'artist'
1830       }
1831     }
1832   );
1833
1834 The initial search results in SQL like the following:
1835
1836   SELECT tag.*, cd.*, artist.* FROM tag
1837   JOIN cd ON tag.cd = cd.cdid
1838   JOIN artist ON cd.artist = artist.artistid
1839
1840 L<DBIx::Class> has no need to go back to the database when we access the
1841 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1842 case.
1843
1844 Simple prefetches will be joined automatically, so there is no need
1845 for a C<join> attribute in the above search. If you're prefetching to
1846 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1847 specify the join as well.
1848
1849 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1850 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1851 with an accessor type of 'single' or 'filter').
1852
1853 =head2 page
1854
1855 =over 4
1856
1857 =item Value: $page
1858
1859 =back
1860
1861 Makes the resultset paged and specifies the page to retrieve. Effectively
1862 identical to creating a non-pages resultset and then calling ->page($page)
1863 on it. 
1864
1865 If L<rows> attribute is not specified it defualts to 10 rows per page.
1866
1867 =head2 rows
1868
1869 =over 4
1870
1871 =item Value: $rows
1872
1873 =back
1874
1875 Specifes the maximum number of rows for direct retrieval or the number of
1876 rows per page if the page attribute or method is used.
1877
1878 =head2 offset
1879
1880 =over 4
1881
1882 =item Value: $offset
1883
1884 =back
1885
1886 Specifies the (zero-based) row number for the  first row to be returned, or the
1887 of the first row of the first page if paging is used.
1888
1889 =head2 group_by
1890
1891 =over 4
1892
1893 =item Value: \@columns
1894
1895 =back
1896
1897 A arrayref of columns to group by. Can include columns of joined tables.
1898
1899   group_by => [qw/ column1 column2 ... /]
1900
1901 =head2 having
1902
1903 =over 4
1904
1905 =item Value: $condition
1906
1907 =back
1908
1909 HAVING is a select statement attribute that is applied between GROUP BY and
1910 ORDER BY. It is applied to the after the grouping calculations have been
1911 done. 
1912
1913   having => { 'count(employee)' => { '>=', 100 } }
1914
1915 =head2 distinct
1916
1917 =over 4
1918
1919 =item Value: (0 | 1)
1920
1921 =back
1922
1923 Set to 1 to group by all columns.
1924
1925 =head2 cache
1926
1927 Set to 1 to cache search results. This prevents extra SQL queries if you
1928 revisit rows in your ResultSet:
1929
1930   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1931   
1932   while( my $artist = $resultset->next ) {
1933     ... do stuff ...
1934   }
1935
1936   $rs->first; # without cache, this would issue a query
1937
1938 By default, searches are not cached.
1939
1940 For more examples of using these attributes, see
1941 L<DBIx::Class::Manual::Cookbook>.
1942
1943 =head2 from
1944
1945 =over 4
1946
1947 =item Value: \@from_clause
1948
1949 =back
1950
1951 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1952 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1953 clauses.
1954
1955 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1956
1957 C<join> will usually do what you need and it is strongly recommended that you
1958 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1959 And we really do mean "cannot", not just tried and failed. Attempting to use
1960 this because you're having problems with C<join> is like trying to use x86
1961 ASM because you've got a syntax error in your C. Trust us on this.
1962
1963 Now, if you're still really, really sure you need to use this (and if you're
1964 not 100% sure, ask the mailing list first), here's an explanation of how this
1965 works.
1966
1967 The syntax is as follows -
1968
1969   [
1970     { <alias1> => <table1> },
1971     [
1972       { <alias2> => <table2>, -join_type => 'inner|left|right' },
1973       [], # nested JOIN (optional)
1974       { <table1.column1> => <table2.column2>, ... (more conditions) },
1975     ],
1976     # More of the above [ ] may follow for additional joins
1977   ]
1978
1979   <table1> <alias1>
1980   JOIN
1981     <table2> <alias2>
1982     [JOIN ...]
1983   ON <table1.column1> = <table2.column2>
1984   <more joins may follow>
1985
1986 An easy way to follow the examples below is to remember the following:
1987
1988     Anything inside "[]" is a JOIN
1989     Anything inside "{}" is a condition for the enclosing JOIN
1990
1991 The following examples utilize a "person" table in a family tree application.
1992 In order to express parent->child relationships, this table is self-joined:
1993
1994     # Person->belongs_to('father' => 'Person');
1995     # Person->belongs_to('mother' => 'Person');
1996
1997 C<from> can be used to nest joins. Here we return all children with a father,
1998 then search against all mothers of those children:
1999
2000   $rs = $schema->resultset('Person')->search(
2001       undef,
2002       {
2003           alias => 'mother', # alias columns in accordance with "from"
2004           from => [
2005               { mother => 'person' },
2006               [
2007                   [
2008                       { child => 'person' },
2009                       [
2010                           { father => 'person' },
2011                           { 'father.person_id' => 'child.father_id' }
2012                       ]
2013                   ],
2014                   { 'mother.person_id' => 'child.mother_id' }
2015               ],
2016           ]
2017       },
2018   );
2019
2020   # Equivalent SQL:
2021   # SELECT mother.* FROM person mother
2022   # JOIN (
2023   #   person child
2024   #   JOIN person father
2025   #   ON ( father.person_id = child.father_id )
2026   # )
2027   # ON ( mother.person_id = child.mother_id )
2028
2029 The type of any join can be controlled manually. To search against only people
2030 with a father in the person table, we could explicitly use C<INNER JOIN>:
2031
2032     $rs = $schema->resultset('Person')->search(
2033         undef,
2034         {
2035             alias => 'child', # alias columns in accordance with "from"
2036             from => [
2037                 { child => 'person' },
2038                 [
2039                     { father => 'person', -join_type => 'inner' },
2040                     { 'father.id' => 'child.father_id' }
2041                 ],
2042             ]
2043         },
2044     );
2045
2046     # Equivalent SQL:
2047     # SELECT child.* FROM person child
2048     # INNER JOIN person father ON child.father_id = father.id
2049
2050 =cut
2051
2052 1;