tightened up deep search_related
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Data::Dumper;
13 use Scalar::Util qw/weaken/;
14
15 use DBIx::Class::ResultSetColumn;
16 use base qw/DBIx::Class/;
17 __PACKAGE__->load_components(qw/AccessorGroup/);
18 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
23
24 =head1 SYNOPSIS
25
26   my $rs   = $schema->resultset('User')->search(registered => 1);
27   my @rows = $schema->resultset('CD')->search(year => 2005);
28
29 =head1 DESCRIPTION
30
31 The resultset is also known as an iterator. It is responsible for handling
32 queries that may return an arbitrary number of rows, e.g. via L</search>
33 or a C<has_many> relationship.
34
35 In the examples below, the following table classes are used:
36
37   package MyApp::Schema::Artist;
38   use base qw/DBIx::Class/;
39   __PACKAGE__->load_components(qw/Core/);
40   __PACKAGE__->table('artist');
41   __PACKAGE__->add_columns(qw/artistid name/);
42   __PACKAGE__->set_primary_key('artistid');
43   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
44   1;
45
46   package MyApp::Schema::CD;
47   use base qw/DBIx::Class/;
48   __PACKAGE__->load_components(qw/Core/);
49   __PACKAGE__->table('cd');
50   __PACKAGE__->add_columns(qw/cdid artist title year/);
51   __PACKAGE__->set_primary_key('cdid');
52   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
53   1;
54
55 =head1 METHODS
56
57 =head2 new
58
59 =over 4
60
61 =item Arguments: $source, \%$attrs
62
63 =item Return Value: $rs
64
65 =back
66
67 The resultset constructor. Takes a source object (usually a
68 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
69 L</ATTRIBUTES> below).  Does not perform any queries -- these are
70 executed as needed by the other methods.
71
72 Generally you won't need to construct a resultset manually.  You'll
73 automatically get one from e.g. a L</search> called in scalar context:
74
75   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
76
77 IMPORTANT: If called on an object, proxies to new_result instead so
78
79   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
80
81 will return a CD object, not a ResultSet.
82
83 =cut
84
85 sub new {
86   my $class = shift;
87   return $class->new_result(@_) if ref $class;
88   
89   my ($source, $attrs) = @_;
90   weaken $source;
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   bless {
101     result_source => $source,
102     result_class => $attrs->{result_class} || $source->result_class,
103     cond => $attrs->{where},
104 #    from => $attrs->{from},
105 #    collapse => $collapse,
106     count => undef,
107     page => delete $attrs->{page},
108     pager => undef,
109     attrs => $attrs
110   }, $class;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 =cut
138
139 sub search {
140   my $self = shift;
141   my $rs = $self->search_rs( @_ );
142   return (wantarray ? $rs->all : $rs);
143 }
144
145 =head2 search_rs
146
147 =over 4
148
149 =item Arguments: $cond, \%attrs?
150
151 =item Return Value: $resultset
152
153 =back
154
155 This method does the same exact thing as search() except it will 
156 always return a resultset, even in list context.
157
158 =cut
159
160 sub search_rs {
161   my $self = shift;
162
163   my $attrs = {};
164   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
165   my $our_attrs = ($attrs->{_parent_attrs}) ? { %{$attrs->{_parent_attrs}} } : { %{$self->{attrs}} };
166   my $having = delete $our_attrs->{having};
167
168         # XXX this is getting messy
169         if ($attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
170                 my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack};
171                 foreach (reverse @{$live_join}) {
172                         $attrs->{_live_join_h} = (defined $attrs->{_live_join_h}) ? { $_ => $attrs->{_live_join_h} } : $_;
173                 }
174         }
175
176   # merge new attrs into old
177   foreach my $key (qw/join prefetch/) {
178     next unless (exists $attrs->{$key});
179     if ($attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
180                         my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack};
181                         foreach (@{$live_join}) {
182                                 $attrs->{$key} = { $_ => $attrs->{$key} };
183                         }
184     }
185     if ($attrs->{_live_join} || $our_attrs->{_live_join}) {
186       $attrs->{$key} = { ($attrs->{_live_join}) ? $attrs->{_live_join} : $our_attrs->{_live_join} => $attrs->{$key} };
187     }
188     if (exists $our_attrs->{$key}) {
189       $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
190     } else {
191       $our_attrs->{$key} = $attrs->{$key};
192     }
193     delete $attrs->{$key};
194   }
195
196         $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $attrs->{_live_join_h}, 1) if ($attrs->{_live_join_h});
197
198   if (exists $our_attrs->{prefetch}) {
199       $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $our_attrs->{prefetch}, 1);
200   }
201
202   my $new_attrs = { %{$our_attrs}, %{$attrs} };
203   my $where = (@_
204                 ? ((@_ == 1 || ref $_[0] eq "HASH")
205                     ? shift
206                     : ((@_ % 2)
207                         ? $self->throw_exception(
208                             "Odd number of arguments to search")
209                         : {@_}))
210                 : undef());
211   if (defined $where) {
212     $new_attrs->{where} = (defined $new_attrs->{where}
213               ? { '-and' =>
214                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
215                       $where, $new_attrs->{where} ] }
216               : $where);
217   }
218
219   if (defined $having) {
220     $new_attrs->{having} = (defined $new_attrs->{having}
221               ? { '-and' =>
222                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
223                       $having, $new_attrs->{having} ] }
224               : $having);
225   }
226
227   my $rs = (ref $self)->new($self->result_source, $new_attrs);
228   $rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
229
230   unless (@_) { # no search, effectively just a clone
231     my $rows = $self->get_cache;
232     if ($rows) {
233       $rs->set_cache($rows);
234     }
235   }
236   
237   return $rs;
238 }
239
240 =head2 search_literal
241
242 =over 4
243
244 =item Arguments: $sql_fragment, @bind_values
245
246 =item Return Value: $resultset (scalar context), @row_objs (list context)
247
248 =back
249
250   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
251   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
252
253 Pass a literal chunk of SQL to be added to the conditional part of the
254 resultset query.
255
256 =cut
257
258 sub search_literal {
259   my ($self, $cond, @vals) = @_;
260   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
261   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
262   return $self->search(\$cond, $attrs);
263 }
264
265 =head2 find
266
267 =over 4
268
269 =item Arguments: @values | \%cols, \%attrs?
270
271 =item Return Value: $row_object
272
273 =back
274
275 Finds a row based on its primary key or unique constraint. For example, to find
276 a row by its primary key:
277
278   my $cd = $schema->resultset('CD')->find(5);
279
280 You can also find a row by a specific unique constraint using the C<key>
281 attribute. For example:
282
283   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', { key => 'cd_artist_title' });
284
285 Additionally, you can specify the columns explicitly by name:
286
287   my $cd = $schema->resultset('CD')->find(
288     {
289       artist => 'Massive Attack',
290       title  => 'Mezzanine',
291     },
292     { key => 'cd_artist_title' }
293   );
294
295 If the C<key> is specified as C<primary>, it searches only on the primary key.
296
297 If no C<key> is specified, it searches on all unique constraints defined on the
298 source, including the primary key.
299
300 See also L</find_or_create> and L</update_or_create>. For information on how to
301 declare unique constraints, see
302 L<DBIx::Class::ResultSource/add_unique_constraint>.
303
304 =cut
305
306 sub find {
307   my $self = shift;
308   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
309
310   # Default to the primary key, but allow a specific key
311   my @cols = exists $attrs->{key}
312     ? $self->result_source->unique_constraint_columns($attrs->{key})
313     : $self->result_source->primary_columns;
314   $self->throw_exception(
315     "Can't find unless a primary key or unique constraint is defined"
316   ) unless @cols;
317
318   # Parse out a hashref from input
319   my $input_query;
320   if (ref $_[0] eq 'HASH') {
321     $input_query = { %{$_[0]} };
322   }
323   elsif (@_ == @cols) {
324     $input_query = {};
325     @{$input_query}{@cols} = @_;
326   }
327   else {
328     # Compatibility: Allow e.g. find(id => $value)
329     carp "Find by key => value deprecated; please use a hashref instead";
330     $input_query = {@_};
331   }
332
333   my @unique_queries = $self->_unique_queries($input_query, $attrs);
334
335   # Handle cases where the ResultSet defines the query, or where the user is
336   # abusing find
337   my $query = @unique_queries ? \@unique_queries : $input_query;
338
339   # Run the query
340   if (keys %$attrs) {
341     my $rs = $self->search($query, $attrs);
342     $rs->_resolve;
343     return keys %{$rs->{_attrs}->{collapse}} ? $rs->next : $rs->single;
344   }
345   else {
346     $self->_resolve;  
347     return (keys %{$self->{_attrs}->{collapse}})
348       ? $self->search($query)->next
349       : $self->single($query);
350   }
351 }
352
353 # _unique_queries
354 #
355 # Build a list of queries which satisfy unique constraints.
356
357 sub _unique_queries {
358   my ($self, $query, $attrs) = @_;
359
360   my @constraint_names = exists $attrs->{key}
361     ? ($attrs->{key})
362     : $self->result_source->unique_constraint_names;
363
364   my @unique_queries;
365   foreach my $name (@constraint_names) {
366     my @unique_cols = $self->result_source->unique_constraint_columns($name);
367     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
368
369     next unless scalar keys %$unique_query;
370
371     # Add the ResultSet's alias
372     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
373                         my $alias = ($self->{attrs}->{_live_join}) ? $self->{attrs}->{_live_join} : $self->{attrs}->{alias};
374       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
375     }
376
377     push @unique_queries, $unique_query;
378   }
379
380   return @unique_queries;
381 }
382
383 # _build_unique_query
384 #
385 # Constrain the specified query hash based on the specified column names.
386
387 sub _build_unique_query {
388   my ($self, $query, $unique_cols) = @_;
389
390   my %unique_query =
391     map  { $_ => $query->{$_} }
392     grep { exists $query->{$_} }
393     @$unique_cols;
394
395   return \%unique_query;
396 }
397
398 =head2 search_related
399
400 =over 4
401
402 =item Arguments: $cond, \%attrs?
403
404 =item Return Value: $new_resultset
405
406 =back
407
408   $new_rs = $cd_rs->search_related('artist', {
409     name => 'Emo-R-Us',
410   });
411
412 Searches the specified relationship, optionally specifying a condition and
413 attributes for matching records. See L</ATTRIBUTES> for more information.
414
415 =cut
416
417 sub search_related {
418   return shift->related_resultset(shift)->search(@_);
419 }
420
421 =head2 cursor
422
423 =over 4
424
425 =item Arguments: none
426
427 =item Return Value: $cursor
428
429 =back
430
431 Returns a storage-driven cursor to the given resultset. See
432 L<DBIx::Class::Cursor> for more information.
433
434 =cut
435
436 sub cursor {
437   my ($self) = @_;
438
439   $self->_resolve;
440   my $attrs = { %{$self->{_attrs}} };
441   return $self->{cursor}
442     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
443           $attrs->{where},$attrs);
444 }
445
446 =head2 single
447
448 =over 4
449
450 =item Arguments: $cond?
451
452 =item Return Value: $row_object?
453
454 =back
455
456   my $cd = $schema->resultset('CD')->single({ year => 2001 });
457
458 Inflates the first result without creating a cursor if the resultset has
459 any records in it; if not returns nothing. Used by L</find> as an optimisation.
460
461 Can optionally take an additional condition *only* - this is a fast-code-path
462 method; if you need to add extra joins or similar call ->search and then
463 ->single without a condition on the $rs returned from that.
464
465 =cut
466
467 sub single {
468   my ($self, $where) = @_;
469   $self->_resolve;
470   my $attrs = { %{$self->{_attrs}} };
471   if ($where) {
472     if (defined $attrs->{where}) {
473       $attrs->{where} = {
474         '-and' =>
475             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
476                $where, delete $attrs->{where} ]
477       };
478     } else {
479       $attrs->{where} = $where;
480     }
481   }
482
483   unless ($self->_is_unique_query($attrs->{where})) {
484     carp "Query not guarnteed to return a single row"
485       . "; please declare your unique constraints or use search instead";
486   }
487
488   my @data = $self->result_source->storage->select_single(
489           $attrs->{from}, $attrs->{select},
490           $attrs->{where},$attrs);
491   return (@data ? $self->_construct_object(@data) : ());
492 }
493
494 # _is_unique_query
495 #
496 # Try to determine if the specified query is guaranteed to be unique, based on
497 # the declared unique constraints.
498
499 sub _is_unique_query {
500   my ($self, $query) = @_;
501
502   my $collapsed = $self->_collapse_query($query);
503
504         my $alias = ($self->{attrs}->{_live_join}) ? $self->{attrs}->{_live_join} : $self->{attrs}->{alias};
505   foreach my $name ($self->result_source->unique_constraint_names) {
506     my @unique_cols = map { "$alias.$_" }
507       $self->result_source->unique_constraint_columns($name);
508
509     # Count the values for each unique column
510     my %seen = map { $_ => 0 } @unique_cols;
511
512     foreach my $key (keys %$collapsed) {
513       my $aliased = $key;
514       $aliased = "$alias.$key" unless $key =~ /\./;
515
516       next unless exists $seen{$aliased};  # Additional constraints are okay
517       $seen{$aliased} = scalar @{ $collapsed->{$key} };
518     }
519
520     # If we get 0 or more than 1 value for a column, it's not necessarily unique
521     return 1 unless grep { $_ != 1 } values %seen;
522   }
523
524   return 0;
525 }
526
527 # _collapse_query
528 #
529 # Recursively collapse the query, accumulating values for each column.
530
531 sub _collapse_query {
532   my ($self, $query, $collapsed) = @_;
533
534   $collapsed ||= {};
535
536   if (ref $query eq 'ARRAY') {
537     foreach my $subquery (@$query) {
538       next unless ref $subquery;  # -or
539 #      warn "ARRAY: " . Dumper $subquery;
540       $collapsed = $self->_collapse_query($subquery, $collapsed);
541     }
542   }
543   elsif (ref $query eq 'HASH') {
544     if (keys %$query and (keys %$query)[0] eq '-and') {
545       foreach my $subquery (@{$query->{-and}}) {
546 #        warn "HASH: " . Dumper $subquery;
547         $collapsed = $self->_collapse_query($subquery, $collapsed);
548       }
549     }
550     else {
551 #      warn "LEAF: " . Dumper $query;
552       foreach my $key (keys %$query) {
553         push @{$collapsed->{$key}}, $query->{$key};
554       }
555     }
556   }
557
558   return $collapsed;
559 }
560
561 =head2 get_column
562
563 =over 4
564
565 =item Arguments: $cond?
566
567 =item Return Value: $resultsetcolumn
568
569 =back
570
571   my $max_length = $rs->get_column('length')->max;
572
573 Returns a ResultSetColumn instance for $column based on $self
574
575 =cut
576
577 sub get_column {
578   my ($self, $column) = @_;
579
580   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
581   return $new;
582 }
583
584 =head2 search_like
585
586 =over 4
587
588 =item Arguments: $cond, \%attrs?
589
590 =item Return Value: $resultset (scalar context), @row_objs (list context)
591
592 =back
593
594   # WHERE title LIKE '%blue%'
595   $cd_rs = $rs->search_like({ title => '%blue%'});
596
597 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
598 that this is simply a convenience method. You most likely want to use
599 L</search> with specific operators.
600
601 For more information, see L<DBIx::Class::Manual::Cookbook>.
602
603 =cut
604
605 sub search_like {
606   my $class = shift;
607   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
608   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
609   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
610   return $class->search($query, { %$attrs });
611 }
612
613 =head2 slice
614
615 =over 4
616
617 =item Arguments: $first, $last
618
619 =item Return Value: $resultset (scalar context), @row_objs (list context)
620
621 =back
622
623 Returns a resultset or object list representing a subset of elements from the
624 resultset slice is called on. Indexes are from 0, i.e., to get the first
625 three records, call:
626
627   my ($one, $two, $three) = $rs->slice(0, 2);
628
629 =cut
630
631 sub slice {
632   my ($self, $min, $max) = @_;
633   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
634   $attrs->{offset} = $self->{attrs}{offset} || 0;
635   $attrs->{offset} += $min;
636   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
637   return $self->search(undef(), $attrs);
638   #my $slice = (ref $self)->new($self->result_source, $attrs);
639   #return (wantarray ? $slice->all : $slice);
640 }
641
642 =head2 next
643
644 =over 4
645
646 =item Arguments: none
647
648 =item Return Value: $result?
649
650 =back
651
652 Returns the next element in the resultset (C<undef> is there is none).
653
654 Can be used to efficiently iterate over records in the resultset:
655
656   my $rs = $schema->resultset('CD')->search;
657   while (my $cd = $rs->next) {
658     print $cd->title;
659   }
660
661 Note that you need to store the resultset object, and call C<next> on it. 
662 Calling C<< resultset('Table')->next >> repeatedly will always return the
663 first record from the resultset.
664
665 =cut
666
667 sub next {
668   my ($self) = @_;
669   if (my $cache = $self->get_cache) {
670     $self->{all_cache_position} ||= 0;
671     return $cache->[$self->{all_cache_position}++];
672   }
673   if ($self->{attrs}{cache}) {
674     $self->{all_cache_position} = 1;
675     return ($self->all)[0];
676   }
677   my @row = (exists $self->{stashed_row} ?
678                @{delete $self->{stashed_row}} :
679                $self->cursor->next
680   );
681   return unless (@row);
682   return $self->_construct_object(@row);
683 }
684
685 sub _resolve {
686   my $self = shift;
687
688   return if(exists $self->{_attrs}); #return if _resolve has already been called
689
690   my $attrs = $self->{attrs};    
691   my $source = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->{result_source};
692
693   # XXX - lose storable dclone
694   my $record_filter = delete $attrs->{record_filter} if (defined $attrs->{record_filter});
695   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
696   $attrs->{record_filter} = $record_filter if ($record_filter);
697   $self->{attrs}->{record_filter} = $record_filter if ($record_filter);
698
699   my $alias = $attrs->{alias};
700  
701   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
702   delete $attrs->{as} if $attrs->{columns};
703   $attrs->{columns} ||= [ $self->{result_source}->columns ] unless $attrs->{select};
704   my $select_alias = ($self->{_parent_rs}) ? $self->{attrs}->{_live_join} : $alias;
705   $attrs->{select} = [
706                       map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
707                       ] if $attrs->{columns};
708   $attrs->{as} ||= [
709                     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
710                     ];
711   if (my $include = delete $attrs->{include_columns}) {
712       push(@{$attrs->{select}}, @$include);
713       push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
714   }
715
716   $attrs->{from} ||= [ { $alias => $source->from } ];
717   $attrs->{seen_join} ||= {};
718   my %seen;
719   if (my $join = delete $attrs->{join}) {
720     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
721       if (ref $j eq 'HASH') {
722         $seen{$_} = 1 foreach keys %$j;
723       } else {
724         $seen{$j} = 1;
725       }
726     }
727     
728     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
729   }
730   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
731   $attrs->{order_by} = [ $attrs->{order_by} ] if
732       $attrs->{order_by} and !ref($attrs->{order_by});
733   $attrs->{order_by} ||= [];
734
735   if(my $seladds = delete($attrs->{'+select'})) {
736     my @seladds = (ref($seladds) eq 'ARRAY' ? @$seladds : ($seladds));
737     $attrs->{select} = [
738                         @{ $attrs->{select} },
739                         map { (m/\./ || ref($_)) ? $_ : "${alias}.$_" } $seladds
740                         ];
741   }
742   if(my $asadds = delete($attrs->{'+as'})) {
743     my @asadds = (ref($asadds) eq 'ARRAY' ? @$asadds : ($asadds));
744     $attrs->{as} = [ @{ $attrs->{as} }, @asadds ];
745   }
746   
747   my $collapse = $attrs->{collapse} || {};
748   if (my $prefetch = delete $attrs->{prefetch}) {
749       my @pre_order;
750       foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
751           if ( ref $p eq 'HASH' ) {
752               foreach my $key (keys %$p) {
753                   push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
754                       unless $seen{$key};
755               }
756           } else {
757               push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
758                   unless $seen{$p};
759           }
760           my @prefetch = $source->resolve_prefetch(
761                                                    $p, $attrs->{alias}, {}, \@pre_order, $collapse);
762           push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
763           push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
764       }
765       push(@{$attrs->{order_by}}, @pre_order);
766   }
767   $attrs->{collapse} = $collapse;
768   $self->{_attrs} = $attrs;
769 }
770
771 sub _merge_attr {
772   my ($self, $a, $b, $is_prefetch) = @_;
773     
774   return $b unless $a;
775   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
776     foreach my $key (keys %{$b}) {
777       if (exists $a->{$key}) {
778         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key}, $is_prefetch);
779       } else {
780         $a->{$key} = delete $b->{$key};
781       }
782     }
783     return $a;
784   } else {
785     $a = [$a] unless (ref $a eq 'ARRAY');
786     $b = [$b] unless (ref $b eq 'ARRAY');
787     
788     my $hash = {};
789     my $array = [];      
790     foreach ($a, $b) {
791       foreach my $element (@{$_}) {
792         if (ref $element eq 'HASH') {
793           $hash = $self->_merge_attr($hash, $element, $is_prefetch);
794         } elsif (ref $element eq 'ARRAY') {
795           $array = [@{$array}, @{$element}];
796         } else {        
797           if (($b == $_) && $is_prefetch) {
798             $self->_merge_array($array, $element, $is_prefetch);
799           } else {
800             push(@{$array}, $element);
801           }
802         }
803       }
804     }
805     if ($is_prefetch) {
806       my $final_array = [];
807       foreach my $element (@{$array}) {
808         push(@{$final_array}, $element) unless (exists $hash->{$element});
809       }
810       $array = $final_array;
811     }
812     if ((keys %{$hash}) && (scalar(@{$array} > 0))) {
813       return [$hash, @{$array}];
814     } else {    
815       return (keys %{$hash}) ? $hash : $array;
816     }
817   }
818 }
819
820 sub _merge_array {
821   my ($self, $a, $b) = @_;
822   
823   $b = [$b] unless (ref $b eq 'ARRAY');
824   # add elements from @{$b} to @{$a} which aren't already in @{$a}
825   foreach my $b_element (@{$b}) {
826     push(@{$a}, $b_element) unless grep {$b_element eq $_} @{$a};
827   }
828 }
829
830 sub _construct_object {
831   my ($self, @row) = @_;
832   my @as = @{ $self->{_attrs}{as} };
833   
834   my $info = $self->_collapse_result(\@as, \@row);
835   my $new = $self->result_class->inflate_result($self->result_source, @$info);
836   $new = $self->{_attrs}{record_filter}->($new)
837     if exists $self->{_attrs}{record_filter};
838   return $new;
839 }
840
841 sub _collapse_result {
842   my ($self, $as, $row, $prefix) = @_;
843
844   my $live_join = $self->{attrs}->{_live_join} ||="";
845   my %const;
846
847   my @copy = @$row;
848   foreach my $this_as (@$as) {
849     my $val = shift @copy;
850     if (defined $prefix) {
851       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
852         my $remain = $1;
853         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
854         $const{$1||''}{$2} = $val;
855       }
856     } else {
857       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
858       $const{$1||''}{$2} = $val;
859     }
860   }
861
862   my $info = [ {}, {} ];
863   foreach my $key (keys %const) {
864     if (length $key && $key ne $live_join) {
865       my $target = $info;
866       my @parts = split(/\./, $key);
867       foreach my $p (@parts) {
868         $target = $target->[1]->{$p} ||= [];
869       }
870       $target->[0] = $const{$key};
871     } else {
872       $info->[0] = $const{$key};
873     }
874   }
875
876   my @collapse;
877   if (defined $prefix) {
878     @collapse = map {
879         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
880     } keys %{$self->{_attrs}->{collapse}}
881   } else {
882     @collapse = keys %{$self->{_attrs}->{collapse}};
883   };
884
885   if (@collapse) {
886     my ($c) = sort { length $a <=> length $b } @collapse;
887     my $target = $info;
888     foreach my $p (split(/\./, $c)) {
889       $target = $target->[1]->{$p} ||= [];
890     }
891     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
892     my @co_key = @{$self->{_attrs}->{collapse}{$c_prefix}};
893     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
894     my $tree = $self->_collapse_result($as, $row, $c_prefix);
895     my (@final, @raw);
896     while ( !(grep {
897                 !defined($tree->[0]->{$_}) ||
898                 $co_check{$_} ne $tree->[0]->{$_}
899               } @co_key) ) {
900       push(@final, $tree);
901       last unless (@raw = $self->cursor->next);
902       $row = $self->{stashed_row} = \@raw;
903       $tree = $self->_collapse_result($as, $row, $c_prefix);
904     }
905     @$target = (@final ? @final : [ {}, {} ]); 
906       # single empty result to indicate an empty prefetched has_many
907   }
908   return $info;
909 }
910
911 =head2 result_source
912
913 =over 4
914
915 =item Arguments: $result_source?
916
917 =item Return Value: $result_source
918
919 =back
920
921 An accessor for the primary ResultSource object from which this ResultSet
922 is derived.
923
924 =cut
925
926
927 =head2 count
928
929 =over 4
930
931 =item Arguments: $cond, \%attrs??
932
933 =item Return Value: $count
934
935 =back
936
937 Performs an SQL C<COUNT> with the same query as the resultset was built
938 with to find the number of elements. If passed arguments, does a search
939 on the resultset and counts the results of that.
940
941 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
942 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
943 not support C<DISTINCT> with multiple columns. If you are using such a
944 database, you should only use columns from the main table in your C<group_by>
945 clause.
946
947 =cut
948
949 sub count {
950   my $self = shift;
951   return $self->search(@_)->count if @_ and defined $_[0];
952   return scalar @{ $self->get_cache } if $self->get_cache;
953   my $count = $self->_count;
954   return 0 unless $count;
955
956   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
957   $count = $self->{attrs}{rows} if
958     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
959   return $count;
960 }
961
962 sub _count { # Separated out so pager can get the full count
963   my $self = shift;
964   my $select = { count => '*' };
965   
966   $self->_resolve;
967   my $attrs = { %{ $self->{_attrs} } };
968   if (my $group_by = delete $attrs->{group_by}) {
969     delete $attrs->{having};
970     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
971     # todo: try CONCAT for multi-column pk
972     my @pk = $self->result_source->primary_columns;
973     if (@pk == 1) {
974       foreach my $column (@distinct) {
975         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
976           @distinct = ($column);
977           last;
978         }
979       }
980     }
981
982     $select = { count => { distinct => \@distinct } };
983   }
984
985   $attrs->{select} = $select;
986   $attrs->{as} = [qw/count/];
987
988   # offset, order by and page are not needed to count. record_filter is cdbi
989   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
990         my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
991         $tmp_rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
992
993   my ($count) = $tmp_rs->cursor->next;
994   return $count;
995 }
996
997 =head2 count_literal
998
999 =over 4
1000
1001 =item Arguments: $sql_fragment, @bind_values
1002
1003 =item Return Value: $count
1004
1005 =back
1006
1007 Counts the results in a literal query. Equivalent to calling L</search_literal>
1008 with the passed arguments, then L</count>.
1009
1010 =cut
1011
1012 sub count_literal { shift->search_literal(@_)->count; }
1013
1014 =head2 all
1015
1016 =over 4
1017
1018 =item Arguments: none
1019
1020 =item Return Value: @objects
1021
1022 =back
1023
1024 Returns all elements in the resultset. Called implicitly if the resultset
1025 is returned in list context.
1026
1027 =cut
1028
1029 sub all {
1030   my ($self) = @_;
1031   return @{ $self->get_cache } if $self->get_cache;
1032
1033   my @obj;
1034
1035   # TODO: don't call resolve here
1036   $self->_resolve;
1037   if (keys %{$self->{_attrs}->{collapse}}) {
1038 #  if ($self->{attrs}->{prefetch}) {
1039       # Using $self->cursor->all is really just an optimisation.
1040       # If we're collapsing has_many prefetches it probably makes
1041       # very little difference, and this is cleaner than hacking
1042       # _construct_object to survive the approach
1043     my @row = $self->cursor->next;
1044     while (@row) {
1045       push(@obj, $self->_construct_object(@row));
1046       @row = (exists $self->{stashed_row}
1047                ? @{delete $self->{stashed_row}}
1048                : $self->cursor->next);
1049     }
1050   } else {
1051     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1052   }
1053
1054   $self->set_cache(\@obj) if $self->{attrs}{cache};
1055   return @obj;
1056 }
1057
1058 =head2 reset
1059
1060 =over 4
1061
1062 =item Arguments: none
1063
1064 =item Return Value: $self
1065
1066 =back
1067
1068 Resets the resultset's cursor, so you can iterate through the elements again.
1069
1070 =cut
1071
1072 sub reset {
1073   my ($self) = @_;
1074   delete $self->{_attrs} if (exists $self->{_attrs});
1075
1076   $self->{all_cache_position} = 0;
1077   $self->cursor->reset;
1078   return $self;
1079 }
1080
1081 =head2 first
1082
1083 =over 4
1084
1085 =item Arguments: none
1086
1087 =item Return Value: $object?
1088
1089 =back
1090
1091 Resets the resultset and returns an object for the first result (if the
1092 resultset returns anything).
1093
1094 =cut
1095
1096 sub first {
1097   return $_[0]->reset->next;
1098 }
1099
1100 # _cond_for_update_delete
1101 #
1102 # update/delete require the condition to be modified to handle
1103 # the differing SQL syntax available.  This transforms the $self->{cond}
1104 # appropriately, returning the new condition.
1105
1106 sub _cond_for_update_delete {
1107   my ($self) = @_;
1108   my $cond = {};
1109
1110   if (!ref($self->{cond})) {
1111     # No-op. No condition, we're updating/deleting everything
1112   }
1113   elsif (ref $self->{cond} eq 'ARRAY') {
1114     $cond = [
1115       map {
1116         my %hash;
1117         foreach my $key (keys %{$_}) {
1118           $key =~ /([^.]+)$/;
1119           $hash{$1} = $_->{$key};
1120         }
1121         \%hash;
1122       } @{$self->{cond}}
1123     ];
1124   }
1125   elsif (ref $self->{cond} eq 'HASH') {
1126     if ((keys %{$self->{cond}})[0] eq '-and') {
1127       $cond->{-and} = [];
1128
1129       my @cond = @{$self->{cond}{-and}};
1130       for (my $i = 0; $i <= @cond - 1; $i++) {
1131         my $entry = $cond[$i];
1132
1133         my %hash;
1134         if (ref $entry eq 'HASH') {
1135           foreach my $key (keys %{$entry}) {
1136             $key =~ /([^.]+)$/;
1137             $hash{$1} = $entry->{$key};
1138           }
1139         }
1140         else {
1141           $entry =~ /([^.]+)$/;
1142           $hash{$1} = $cond[++$i];
1143         }
1144
1145         push @{$cond->{-and}}, \%hash;
1146       }
1147     }
1148     else {
1149       foreach my $key (keys %{$self->{cond}}) {
1150         $key =~ /([^.]+)$/;
1151         $cond->{$1} = $self->{cond}{$key};
1152       }
1153     }
1154   }
1155   else {
1156     $self->throw_exception(
1157       "Can't update/delete on resultset with condition unless hash or array"
1158     );
1159   }
1160
1161   return $cond;
1162 }
1163
1164
1165 =head2 update
1166
1167 =over 4
1168
1169 =item Arguments: \%values
1170
1171 =item Return Value: $storage_rv
1172
1173 =back
1174
1175 Sets the specified columns in the resultset to the supplied values in a
1176 single query. Return value will be true if the update succeeded or false
1177 if no records were updated; exact type of success value is storage-dependent.
1178
1179 =cut
1180
1181 sub update {
1182   my ($self, $values) = @_;
1183   $self->throw_exception("Values for update must be a hash")
1184     unless ref $values eq 'HASH';
1185
1186   my $cond = $self->_cond_for_update_delete;
1187
1188   return $self->result_source->storage->update(
1189     $self->result_source->from, $values, $cond
1190   );
1191 }
1192
1193 =head2 update_all
1194
1195 =over 4
1196
1197 =item Arguments: \%values
1198
1199 =item Return Value: 1
1200
1201 =back
1202
1203 Fetches all objects and updates them one at a time. Note that C<update_all>
1204 will run DBIC cascade triggers, while L</update> will not.
1205
1206 =cut
1207
1208 sub update_all {
1209   my ($self, $values) = @_;
1210   $self->throw_exception("Values for update must be a hash")
1211     unless ref $values eq 'HASH';
1212   foreach my $obj ($self->all) {
1213     $obj->set_columns($values)->update;
1214   }
1215   return 1;
1216 }
1217
1218 =head2 delete
1219
1220 =over 4
1221
1222 =item Arguments: none
1223
1224 =item Return Value: 1
1225
1226 =back
1227
1228 Deletes the contents of the resultset from its result source. Note that this
1229 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1230 to run.
1231
1232 =cut
1233
1234 sub delete {
1235   my ($self) = @_;
1236   my $del = {};
1237
1238   my $cond = $self->_cond_for_update_delete;
1239
1240   $self->result_source->storage->delete($self->result_source->from, $cond);
1241   return 1;
1242 }
1243
1244 =head2 delete_all
1245
1246 =over 4
1247
1248 =item Arguments: none
1249
1250 =item Return Value: 1
1251
1252 =back
1253
1254 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1255 will run DBIC cascade triggers, while L</delete> will not.
1256
1257 =cut
1258
1259 sub delete_all {
1260   my ($self) = @_;
1261   $_->delete for $self->all;
1262   return 1;
1263 }
1264
1265 =head2 pager
1266
1267 =over 4
1268
1269 =item Arguments: none
1270
1271 =item Return Value: $pager
1272
1273 =back
1274
1275 Return Value a L<Data::Page> object for the current resultset. Only makes
1276 sense for queries with a C<page> attribute.
1277
1278 =cut
1279
1280 sub pager {
1281   my ($self) = @_;
1282   my $attrs = $self->{attrs};
1283   $self->throw_exception("Can't create pager for non-paged rs")
1284     unless $self->{page};
1285   $attrs->{rows} ||= 10;
1286   return $self->{pager} ||= Data::Page->new(
1287     $self->_count, $attrs->{rows}, $self->{page});
1288 }
1289
1290 =head2 page
1291
1292 =over 4
1293
1294 =item Arguments: $page_number
1295
1296 =item Return Value: $rs
1297
1298 =back
1299
1300 Returns a resultset for the $page_number page of the resultset on which page
1301 is called, where each page contains a number of rows equal to the 'rows'
1302 attribute set on the resultset (10 by default).
1303
1304 =cut
1305
1306 sub page {
1307   my ($self, $page) = @_;
1308   my $attrs = { %{$self->{attrs}} };
1309   $attrs->{page} = $page;
1310   return (ref $self)->new($self->result_source, $attrs);
1311 }
1312
1313 =head2 new_result
1314
1315 =over 4
1316
1317 =item Arguments: \%vals
1318
1319 =item Return Value: $object
1320
1321 =back
1322
1323 Creates an object in the resultset's result class and returns it.
1324
1325 =cut
1326
1327 sub new_result {
1328   my ($self, $values) = @_;
1329   $self->throw_exception( "new_result needs a hash" )
1330     unless (ref $values eq 'HASH');
1331   $self->throw_exception(
1332     "Can't abstract implicit construct, condition not a hash"
1333   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1334   my %new = %$values;
1335   my $alias = $self->{attrs}{alias};
1336   foreach my $key (keys %{$self->{cond}||{}}) {
1337     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1338   }
1339   my $obj = $self->result_class->new(\%new);
1340   $obj->result_source($self->result_source) if $obj->can('result_source');
1341   return $obj;
1342 }
1343
1344 =head2 find_or_new
1345
1346 =over 4
1347
1348 =item Arguments: \%vals, \%attrs?
1349
1350 =item Return Value: $object
1351
1352 =back
1353
1354 Find an existing record from this resultset. If none exists, instantiate a new
1355 result object and return it. The object will not be saved into your storage
1356 until you call L<DBIx::Class::Row/insert> on it.
1357
1358 If you want objects to be saved immediately, use L</find_or_create> instead.
1359
1360 =cut
1361
1362 sub find_or_new {
1363   my $self     = shift;
1364   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1365   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1366   my $exists   = $self->find($hash, $attrs);
1367   return defined $exists ? $exists : $self->new_result($hash);
1368 }
1369
1370 =head2 create
1371
1372 =over 4
1373
1374 =item Arguments: \%vals
1375
1376 =item Return Value: $object
1377
1378 =back
1379
1380 Inserts a record into the resultset and returns the object representing it.
1381
1382 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1383
1384 =cut
1385
1386 sub create {
1387   my ($self, $attrs) = @_;
1388   $self->throw_exception( "create needs a hashref" )
1389     unless ref $attrs eq 'HASH';
1390   return $self->new_result($attrs)->insert;
1391 }
1392
1393 =head2 find_or_create
1394
1395 =over 4
1396
1397 =item Arguments: \%vals, \%attrs?
1398
1399 =item Return Value: $object
1400
1401 =back
1402
1403   $class->find_or_create({ key => $val, ... });
1404
1405 Tries to find a record based on its primary key or unique constraint; if none
1406 is found, creates one and returns that instead.
1407
1408   my $cd = $schema->resultset('CD')->find_or_create({
1409     cdid   => 5,
1410     artist => 'Massive Attack',
1411     title  => 'Mezzanine',
1412     year   => 2005,
1413   });
1414
1415 Also takes an optional C<key> attribute, to search by a specific key or unique
1416 constraint. For example:
1417
1418   my $cd = $schema->resultset('CD')->find_or_create(
1419     {
1420       artist => 'Massive Attack',
1421       title  => 'Mezzanine',
1422     },
1423     { key => 'cd_artist_title' }
1424   );
1425
1426 See also L</find> and L</update_or_create>. For information on how to declare
1427 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1428
1429 =cut
1430
1431 sub find_or_create {
1432   my $self     = shift;
1433   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1434   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1435   my $exists   = $self->find($hash, $attrs);
1436   return defined $exists ? $exists : $self->create($hash);
1437 }
1438
1439 =head2 update_or_create
1440
1441 =over 4
1442
1443 =item Arguments: \%col_values, { key => $unique_constraint }?
1444
1445 =item Return Value: $object
1446
1447 =back
1448
1449   $class->update_or_create({ col => $val, ... });
1450
1451 First, searches for an existing row matching one of the unique constraints
1452 (including the primary key) on the source of this resultset. If a row is
1453 found, updates it with the other given column values. Otherwise, creates a new
1454 row.
1455
1456 Takes an optional C<key> attribute to search on a specific unique constraint.
1457 For example:
1458
1459   # In your application
1460   my $cd = $schema->resultset('CD')->update_or_create(
1461     {
1462       artist => 'Massive Attack',
1463       title  => 'Mezzanine',
1464       year   => 1998,
1465     },
1466     { key => 'cd_artist_title' }
1467   );
1468
1469 If no C<key> is specified, it searches on all unique constraints defined on the
1470 source, including the primary key.
1471
1472 If the C<key> is specified as C<primary>, it searches only on the primary key.
1473
1474 See also L</find> and L</find_or_create>. For information on how to declare
1475 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1476
1477 =cut
1478
1479 sub update_or_create {
1480   my $self = shift;
1481   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1482   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1483
1484   my $row = $self->find($cond);
1485   if (defined $row) {
1486     $row->update($cond);
1487     return $row;
1488   }
1489
1490   return $self->create($cond);
1491 }
1492
1493 =head2 get_cache
1494
1495 =over 4
1496
1497 =item Arguments: none
1498
1499 =item Return Value: \@cache_objects?
1500
1501 =back
1502
1503 Gets the contents of the cache for the resultset, if the cache is set.
1504
1505 =cut
1506
1507 sub get_cache {
1508   shift->{all_cache};
1509 }
1510
1511 =head2 set_cache
1512
1513 =over 4
1514
1515 =item Arguments: \@cache_objects
1516
1517 =item Return Value: \@cache_objects
1518
1519 =back
1520
1521 Sets the contents of the cache for the resultset. Expects an arrayref
1522 of objects of the same class as those produced by the resultset. Note that
1523 if the cache is set the resultset will return the cached objects rather
1524 than re-querying the database even if the cache attr is not set.
1525
1526 =cut
1527
1528 sub set_cache {
1529   my ( $self, $data ) = @_;
1530   $self->throw_exception("set_cache requires an arrayref")
1531       if defined($data) && (ref $data ne 'ARRAY');
1532   $self->{all_cache} = $data;
1533 }
1534
1535 =head2 clear_cache
1536
1537 =over 4
1538
1539 =item Arguments: none
1540
1541 =item Return Value: []
1542
1543 =back
1544
1545 Clears the cache for the resultset.
1546
1547 =cut
1548
1549 sub clear_cache {
1550   shift->set_cache(undef);
1551 }
1552
1553 =head2 related_resultset
1554
1555 =over 4
1556
1557 =item Arguments: $relationship_name
1558
1559 =item Return Value: $resultset
1560
1561 =back
1562
1563 Returns a related resultset for the supplied relationship name.
1564
1565   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1566
1567 =cut
1568
1569 sub related_resultset {
1570   my ( $self, $rel ) = @_;
1571   
1572   $self->{related_resultsets} ||= {};
1573   return $self->{related_resultsets}{$rel} ||= do {
1574     #warn "fetching related resultset for rel '$rel' " . $self->result_source->{name};
1575     my $rel_obj = $self->result_source->relationship_info($rel);
1576     $self->throw_exception(
1577         "search_related: result source '" . $self->result_source->name .
1578         "' has no such relationship ${rel}")
1579         unless $rel_obj; #die Dumper $self->{attrs};
1580
1581                 my $live_join_stack = $self->{attrs}->{_live_join_stack} || [];         
1582                 push(@{$live_join_stack}, $rel);
1583                 
1584     my $rs = $self->result_source->schema->resultset($rel_obj->{class}
1585            )->search( undef,
1586                       { select => undef,
1587                         as => undef,
1588                         #join => $rel,
1589                         _live_join => $rel,
1590                         _live_join_stack => $live_join_stack,
1591                         _parent_attrs => $self->{attrs}}
1592                       );    
1593
1594     # keep reference of the original resultset
1595     $rs->{_parent_rs} = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->result_source;
1596     return $rs;
1597   };
1598 }
1599
1600 =head2 throw_exception
1601
1602 See L<DBIx::Class::Schema/throw_exception> for details.
1603
1604 =cut
1605
1606 sub throw_exception {
1607   my $self=shift;
1608   $self->result_source->schema->throw_exception(@_);
1609 }
1610
1611 # XXX: FIXME: Attributes docs need clearing up
1612
1613 =head1 ATTRIBUTES
1614
1615 The resultset takes various attributes that modify its behavior. Here's an
1616 overview of them:
1617
1618 =head2 order_by
1619
1620 =over 4
1621
1622 =item Value: ($order_by | \@order_by)
1623
1624 =back
1625
1626 Which column(s) to order the results by. This is currently passed
1627 through directly to SQL, so you can give e.g. C<year DESC> for a
1628 descending order on the column `year'.
1629
1630 Please note that if you have quoting enabled (see 
1631 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1632 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1633 so you will need to manually quote things as appropriate.)
1634
1635 =head2 columns
1636
1637 =over 4
1638
1639 =item Value: \@columns
1640
1641 =back
1642
1643 Shortcut to request a particular set of columns to be retrieved.  Adds
1644 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1645 from that, then auto-populates C<as> from C<select> as normal. (You may also
1646 use the C<cols> attribute, as in earlier versions of DBIC.)
1647
1648 =head2 include_columns
1649
1650 =over 4
1651
1652 =item Value: \@columns
1653
1654 =back
1655
1656 Shortcut to include additional columns in the returned results - for example
1657
1658   $schema->resultset('CD')->search(undef, {
1659     include_columns => ['artist.name'],
1660     join => ['artist']
1661   });
1662
1663 would return all CDs and include a 'name' column to the information
1664 passed to object inflation
1665
1666 =head2 select
1667
1668 =over 4
1669
1670 =item Value: \@select_columns
1671
1672 =back
1673
1674 Indicates which columns should be selected from the storage. You can use
1675 column names, or in the case of RDBMS back ends, function or stored procedure
1676 names:
1677
1678   $rs = $schema->resultset('Employee')->search(undef, {
1679     select => [
1680       'name',
1681       { count => 'employeeid' },
1682       { sum => 'salary' }
1683     ]
1684   });
1685
1686 When you use function/stored procedure names and do not supply an C<as>
1687 attribute, the column names returned are storage-dependent. E.g. MySQL would
1688 return a column named C<count(employeeid)> in the above example.
1689
1690 =head2 +select
1691
1692 =over 4
1693
1694 Indicates additional columns to be selected from storage.  Works the same as
1695 L<select> but adds columns to the selection.
1696
1697 =back
1698
1699 =head2 +as
1700
1701 =over 4
1702
1703 Indicates additional column names for those added via L<+select>.
1704
1705 =back
1706
1707 =head2 as
1708
1709 =over 4
1710
1711 =item Value: \@inflation_names
1712
1713 =back
1714
1715 Indicates column names for object inflation. This is used in conjunction with
1716 C<select>, usually when C<select> contains one or more function or stored
1717 procedure names:
1718
1719   $rs = $schema->resultset('Employee')->search(undef, {
1720     select => [
1721       'name',
1722       { count => 'employeeid' }
1723     ],
1724     as => ['name', 'employee_count'],
1725   });
1726
1727   my $employee = $rs->first(); # get the first Employee
1728
1729 If the object against which the search is performed already has an accessor
1730 matching a column name specified in C<as>, the value can be retrieved using
1731 the accessor as normal:
1732
1733   my $name = $employee->name();
1734
1735 If on the other hand an accessor does not exist in the object, you need to
1736 use C<get_column> instead:
1737
1738   my $employee_count = $employee->get_column('employee_count');
1739
1740 You can create your own accessors if required - see
1741 L<DBIx::Class::Manual::Cookbook> for details.
1742
1743 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1744 produced, it is used for internal access only. Thus attempting to use the accessor
1745 in an C<order_by> clause or similar will fail misrably.
1746
1747 =head2 join
1748
1749 =over 4
1750
1751 =item Value: ($rel_name | \@rel_names | \%rel_names)
1752
1753 =back
1754
1755 Contains a list of relationships that should be joined for this query.  For
1756 example:
1757
1758   # Get CDs by Nine Inch Nails
1759   my $rs = $schema->resultset('CD')->search(
1760     { 'artist.name' => 'Nine Inch Nails' },
1761     { join => 'artist' }
1762   );
1763
1764 Can also contain a hash reference to refer to the other relation's relations.
1765 For example:
1766
1767   package MyApp::Schema::Track;
1768   use base qw/DBIx::Class/;
1769   __PACKAGE__->table('track');
1770   __PACKAGE__->add_columns(qw/trackid cd position title/);
1771   __PACKAGE__->set_primary_key('trackid');
1772   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1773   1;
1774
1775   # In your application
1776   my $rs = $schema->resultset('Artist')->search(
1777     { 'track.title' => 'Teardrop' },
1778     {
1779       join     => { cd => 'track' },
1780       order_by => 'artist.name',
1781     }
1782   );
1783
1784 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1785 similarly for a third time). For e.g.
1786
1787   my $rs = $schema->resultset('Artist')->search({
1788     'cds.title'   => 'Down to Earth',
1789     'cds_2.title' => 'Popular',
1790   }, {
1791     join => [ qw/cds cds/ ],
1792   });
1793
1794 will return a set of all artists that have both a cd with title 'Down
1795 to Earth' and a cd with title 'Popular'.
1796
1797 If you want to fetch related objects from other tables as well, see C<prefetch>
1798 below.
1799
1800 =head2 prefetch
1801
1802 =over 4
1803
1804 =item Value: ($rel_name | \@rel_names | \%rel_names)
1805
1806 =back
1807
1808 Contains one or more relationships that should be fetched along with the main
1809 query (when they are accessed afterwards they will have already been
1810 "prefetched").  This is useful for when you know you will need the related
1811 objects, because it saves at least one query:
1812
1813   my $rs = $schema->resultset('Tag')->search(
1814     undef,
1815     {
1816       prefetch => {
1817         cd => 'artist'
1818       }
1819     }
1820   );
1821
1822 The initial search results in SQL like the following:
1823
1824   SELECT tag.*, cd.*, artist.* FROM tag
1825   JOIN cd ON tag.cd = cd.cdid
1826   JOIN artist ON cd.artist = artist.artistid
1827
1828 L<DBIx::Class> has no need to go back to the database when we access the
1829 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1830 case.
1831
1832 Simple prefetches will be joined automatically, so there is no need
1833 for a C<join> attribute in the above search. If you're prefetching to
1834 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1835 specify the join as well.
1836
1837 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1838 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1839 with an accessor type of 'single' or 'filter').
1840
1841 =head2 page
1842
1843 =over 4
1844
1845 =item Value: $page
1846
1847 =back
1848
1849 Makes the resultset paged and specifies the page to retrieve. Effectively
1850 identical to creating a non-pages resultset and then calling ->page($page)
1851 on it. 
1852
1853 If L<rows> attribute is not specified it defualts to 10 rows per page.
1854
1855 =head2 rows
1856
1857 =over 4
1858
1859 =item Value: $rows
1860
1861 =back
1862
1863 Specifes the maximum number of rows for direct retrieval or the number of
1864 rows per page if the page attribute or method is used.
1865
1866 =head2 offset
1867
1868 =over 4
1869
1870 =item Value: $offset
1871
1872 =back
1873
1874 Specifies the (zero-based) row number for the  first row to be returned, or the
1875 of the first row of the first page if paging is used.
1876
1877 =head2 group_by
1878
1879 =over 4
1880
1881 =item Value: \@columns
1882
1883 =back
1884
1885 A arrayref of columns to group by. Can include columns of joined tables.
1886
1887   group_by => [qw/ column1 column2 ... /]
1888
1889 =head2 having
1890
1891 =over 4
1892
1893 =item Value: $condition
1894
1895 =back
1896
1897 HAVING is a select statement attribute that is applied between GROUP BY and
1898 ORDER BY. It is applied to the after the grouping calculations have been
1899 done. 
1900
1901   having => { 'count(employee)' => { '>=', 100 } }
1902
1903 =head2 distinct
1904
1905 =over 4
1906
1907 =item Value: (0 | 1)
1908
1909 =back
1910
1911 Set to 1 to group by all columns.
1912
1913 =head2 cache
1914
1915 Set to 1 to cache search results. This prevents extra SQL queries if you
1916 revisit rows in your ResultSet:
1917
1918   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1919   
1920   while( my $artist = $resultset->next ) {
1921     ... do stuff ...
1922   }
1923
1924   $rs->first; # without cache, this would issue a query
1925
1926 By default, searches are not cached.
1927
1928 For more examples of using these attributes, see
1929 L<DBIx::Class::Manual::Cookbook>.
1930
1931 =head2 from
1932
1933 =over 4
1934
1935 =item Value: \@from_clause
1936
1937 =back
1938
1939 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1940 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1941 clauses.
1942
1943 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1944
1945 C<join> will usually do what you need and it is strongly recommended that you
1946 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1947 And we really do mean "cannot", not just tried and failed. Attempting to use
1948 this because you're having problems with C<join> is like trying to use x86
1949 ASM because you've got a syntax error in your C. Trust us on this.
1950
1951 Now, if you're still really, really sure you need to use this (and if you're
1952 not 100% sure, ask the mailing list first), here's an explanation of how this
1953 works.
1954
1955 The syntax is as follows -
1956
1957   [
1958     { <alias1> => <table1> },
1959     [
1960       { <alias2> => <table2>, -join_type => 'inner|left|right' },
1961       [], # nested JOIN (optional)
1962       { <table1.column1> => <table2.column2>, ... (more conditions) },
1963     ],
1964     # More of the above [ ] may follow for additional joins
1965   ]
1966
1967   <table1> <alias1>
1968   JOIN
1969     <table2> <alias2>
1970     [JOIN ...]
1971   ON <table1.column1> = <table2.column2>
1972   <more joins may follow>
1973
1974 An easy way to follow the examples below is to remember the following:
1975
1976     Anything inside "[]" is a JOIN
1977     Anything inside "{}" is a condition for the enclosing JOIN
1978
1979 The following examples utilize a "person" table in a family tree application.
1980 In order to express parent->child relationships, this table is self-joined:
1981
1982     # Person->belongs_to('father' => 'Person');
1983     # Person->belongs_to('mother' => 'Person');
1984
1985 C<from> can be used to nest joins. Here we return all children with a father,
1986 then search against all mothers of those children:
1987
1988   $rs = $schema->resultset('Person')->search(
1989       undef,
1990       {
1991           alias => 'mother', # alias columns in accordance with "from"
1992           from => [
1993               { mother => 'person' },
1994               [
1995                   [
1996                       { child => 'person' },
1997                       [
1998                           { father => 'person' },
1999                           { 'father.person_id' => 'child.father_id' }
2000                       ]
2001                   ],
2002                   { 'mother.person_id' => 'child.mother_id' }
2003               ],
2004           ]
2005       },
2006   );
2007
2008   # Equivalent SQL:
2009   # SELECT mother.* FROM person mother
2010   # JOIN (
2011   #   person child
2012   #   JOIN person father
2013   #   ON ( father.person_id = child.father_id )
2014   # )
2015   # ON ( mother.person_id = child.mother_id )
2016
2017 The type of any join can be controlled manually. To search against only people
2018 with a father in the person table, we could explicitly use C<INNER JOIN>:
2019
2020     $rs = $schema->resultset('Person')->search(
2021         undef,
2022         {
2023             alias => 'child', # alias columns in accordance with "from"
2024             from => [
2025                 { child => 'person' },
2026                 [
2027                     { father => 'person', -join_type => 'inner' },
2028                     { 'father.id' => 'child.father_id' }
2029                 ],
2030             ]
2031         },
2032     );
2033
2034     # Equivalent SQL:
2035     # SELECT child.* FROM person child
2036     # INNER JOIN person father ON child.father_id = father.id
2037
2038 =cut
2039
2040 1;