Fix typo in warning
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Data::Dumper;
13 use Scalar::Util qw/weaken/;
14
15 use DBIx::Class::ResultSetColumn;
16 use base qw/DBIx::Class/;
17 __PACKAGE__->load_components(qw/AccessorGroup/);
18 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
23
24 =head1 SYNOPSIS
25
26   my $rs   = $schema->resultset('User')->search(registered => 1);
27   my @rows = $schema->resultset('CD')->search(year => 2005);
28
29 =head1 DESCRIPTION
30
31 The resultset is also known as an iterator. It is responsible for handling
32 queries that may return an arbitrary number of rows, e.g. via L</search>
33 or a C<has_many> relationship.
34
35 In the examples below, the following table classes are used:
36
37   package MyApp::Schema::Artist;
38   use base qw/DBIx::Class/;
39   __PACKAGE__->load_components(qw/Core/);
40   __PACKAGE__->table('artist');
41   __PACKAGE__->add_columns(qw/artistid name/);
42   __PACKAGE__->set_primary_key('artistid');
43   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
44   1;
45
46   package MyApp::Schema::CD;
47   use base qw/DBIx::Class/;
48   __PACKAGE__->load_components(qw/Core/);
49   __PACKAGE__->table('cd');
50   __PACKAGE__->add_columns(qw/cdid artist title year/);
51   __PACKAGE__->set_primary_key('cdid');
52   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
53   1;
54
55 =head1 METHODS
56
57 =head2 new
58
59 =over 4
60
61 =item Arguments: $source, \%$attrs
62
63 =item Return Value: $rs
64
65 =back
66
67 The resultset constructor. Takes a source object (usually a
68 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
69 L</ATTRIBUTES> below).  Does not perform any queries -- these are
70 executed as needed by the other methods.
71
72 Generally you won't need to construct a resultset manually.  You'll
73 automatically get one from e.g. a L</search> called in scalar context:
74
75   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
76
77 IMPORTANT: If called on an object, proxies to new_result instead so
78
79   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
80
81 will return a CD object, not a ResultSet.
82
83 =cut
84
85 sub new {
86   my $class = shift;
87   return $class->new_result(@_) if ref $class;
88   
89   my ($source, $attrs) = @_;
90   weaken $source;
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   bless {
101     result_source => $source,
102     result_class => $attrs->{result_class} || $source->result_class,
103     cond => $attrs->{where},
104 #    from => $attrs->{from},
105 #    collapse => $collapse,
106     count => undef,
107     page => delete $attrs->{page},
108     pager => undef,
109     attrs => $attrs
110   }, $class;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 =cut
138
139 sub search {
140   my $self = shift;
141   my $rs = $self->search_rs( @_ );
142   return (wantarray ? $rs->all : $rs);
143 }
144
145 =head2 search_rs
146
147 =over 4
148
149 =item Arguments: $cond, \%attrs?
150
151 =item Return Value: $resultset
152
153 =back
154
155 This method does the same exact thing as search() except it will 
156 always return a resultset, even in list context.
157
158 =cut
159
160 sub search_rs {
161   my $self = shift;
162
163   my $attrs = {};
164   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
165   my $our_attrs = ($attrs->{_parent_attrs}) ? { %{$attrs->{_parent_attrs}} } : { %{$self->{attrs}} };
166   my $having = delete $our_attrs->{having};
167
168         # XXX this is getting messy
169         if ($attrs->{_live_join_stack}) {
170                 my $live_join = $attrs->{_live_join_stack};
171                 foreach (reverse @{$live_join}) {
172                         $attrs->{_live_join_h} = (defined $attrs->{_live_join_h}) ? { $_ => $attrs->{_live_join_h} } : $_;
173                 }
174         }
175
176   # merge new attrs into old
177   foreach my $key (qw/join prefetch/) {
178     next unless (exists $attrs->{$key});
179     if ($attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
180                         my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack};
181                         foreach (reverse @{$live_join}) {
182                                 $attrs->{$key} = { $_ => $attrs->{$key} };
183                         }
184     }
185
186     if (exists $our_attrs->{$key}) {
187       $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
188     } else {
189       $our_attrs->{$key} = $attrs->{$key};
190     }
191     delete $attrs->{$key};
192   }
193
194         $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $attrs->{_live_join_h}, 1) if ($attrs->{_live_join_h});
195
196   if (exists $our_attrs->{prefetch}) {
197       $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $our_attrs->{prefetch}, 1);
198   }
199
200   my $new_attrs = { %{$our_attrs}, %{$attrs} };
201   my $where = (@_
202                 ? ((@_ == 1 || ref $_[0] eq "HASH")
203                     ? shift
204                     : ((@_ % 2)
205                         ? $self->throw_exception(
206                             "Odd number of arguments to search")
207                         : {@_}))
208                 : undef());
209   if (defined $where) {
210     $new_attrs->{where} = (defined $new_attrs->{where}
211               ? { '-and' =>
212                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
213                       $where, $new_attrs->{where} ] }
214               : $where);
215   }
216
217   if (defined $having) {
218     $new_attrs->{having} = (defined $new_attrs->{having}
219               ? { '-and' =>
220                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
221                       $having, $new_attrs->{having} ] }
222               : $having);
223   }
224
225   my $rs = (ref $self)->new($self->result_source, $new_attrs);
226   $rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
227
228   unless (@_) { # no search, effectively just a clone
229     my $rows = $self->get_cache;
230     if ($rows) {
231       $rs->set_cache($rows);
232     }
233   }
234   
235   return $rs;
236 }
237
238 =head2 search_literal
239
240 =over 4
241
242 =item Arguments: $sql_fragment, @bind_values
243
244 =item Return Value: $resultset (scalar context), @row_objs (list context)
245
246 =back
247
248   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
249   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
250
251 Pass a literal chunk of SQL to be added to the conditional part of the
252 resultset query.
253
254 =cut
255
256 sub search_literal {
257   my ($self, $cond, @vals) = @_;
258   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
259   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
260   return $self->search(\$cond, $attrs);
261 }
262
263 =head2 find
264
265 =over 4
266
267 =item Arguments: @values | \%cols, \%attrs?
268
269 =item Return Value: $row_object
270
271 =back
272
273 Finds a row based on its primary key or unique constraint. For example, to find
274 a row by its primary key:
275
276   my $cd = $schema->resultset('CD')->find(5);
277
278 You can also find a row by a specific unique constraint using the C<key>
279 attribute. For example:
280
281   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', { key => 'cd_artist_title' });
282
283 Additionally, you can specify the columns explicitly by name:
284
285   my $cd = $schema->resultset('CD')->find(
286     {
287       artist => 'Massive Attack',
288       title  => 'Mezzanine',
289     },
290     { key => 'cd_artist_title' }
291   );
292
293 If the C<key> is specified as C<primary>, it searches only on the primary key.
294
295 If no C<key> is specified, it searches on all unique constraints defined on the
296 source, including the primary key.
297
298 See also L</find_or_create> and L</update_or_create>. For information on how to
299 declare unique constraints, see
300 L<DBIx::Class::ResultSource/add_unique_constraint>.
301
302 =cut
303
304 sub find {
305   my $self = shift;
306   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
307
308   # Default to the primary key, but allow a specific key
309   my @cols = exists $attrs->{key}
310     ? $self->result_source->unique_constraint_columns($attrs->{key})
311     : $self->result_source->primary_columns;
312   $self->throw_exception(
313     "Can't find unless a primary key or unique constraint is defined"
314   ) unless @cols;
315
316   # Parse out a hashref from input
317   my $input_query;
318   if (ref $_[0] eq 'HASH') {
319     $input_query = { %{$_[0]} };
320   }
321   elsif (@_ == @cols) {
322     $input_query = {};
323     @{$input_query}{@cols} = @_;
324   }
325   else {
326     # Compatibility: Allow e.g. find(id => $value)
327     carp "Find by key => value deprecated; please use a hashref instead";
328     $input_query = {@_};
329   }
330
331   my @unique_queries = $self->_unique_queries($input_query, $attrs);
332
333   # Handle cases where the ResultSet defines the query, or where the user is
334   # abusing find
335   my $query = @unique_queries ? \@unique_queries : $input_query;
336
337   # Run the query
338   if (keys %$attrs) {
339     my $rs = $self->search($query, $attrs);
340     $rs->_resolve;
341     return keys %{$rs->{_attrs}->{collapse}} ? $rs->next : $rs->single;
342   }
343   else {
344     $self->_resolve;  
345     return (keys %{$self->{_attrs}->{collapse}})
346       ? $self->search($query)->next
347       : $self->single($query);
348   }
349 }
350
351 # _unique_queries
352 #
353 # Build a list of queries which satisfy unique constraints.
354
355 sub _unique_queries {
356   my ($self, $query, $attrs) = @_;
357
358   my @constraint_names = exists $attrs->{key}
359     ? ($attrs->{key})
360     : $self->result_source->unique_constraint_names;
361
362   my @unique_queries;
363   foreach my $name (@constraint_names) {
364     my @unique_cols = $self->result_source->unique_constraint_columns($name);
365     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
366
367     next unless scalar keys %$unique_query;
368
369     # Add the ResultSet's alias
370     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
371                         my $alias = ($self->{attrs}->{_live_join}) ? $self->{attrs}->{_live_join} : $self->{attrs}->{alias};
372       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
373     }
374
375     push @unique_queries, $unique_query;
376   }
377
378   return @unique_queries;
379 }
380
381 # _build_unique_query
382 #
383 # Constrain the specified query hash based on the specified column names.
384
385 sub _build_unique_query {
386   my ($self, $query, $unique_cols) = @_;
387
388   my %unique_query =
389     map  { $_ => $query->{$_} }
390     grep { exists $query->{$_} }
391     @$unique_cols;
392
393   return \%unique_query;
394 }
395
396 =head2 search_related
397
398 =over 4
399
400 =item Arguments: $cond, \%attrs?
401
402 =item Return Value: $new_resultset
403
404 =back
405
406   $new_rs = $cd_rs->search_related('artist', {
407     name => 'Emo-R-Us',
408   });
409
410 Searches the specified relationship, optionally specifying a condition and
411 attributes for matching records. See L</ATTRIBUTES> for more information.
412
413 =cut
414
415 sub search_related {
416   return shift->related_resultset(shift)->search(@_);
417 }
418
419 =head2 cursor
420
421 =over 4
422
423 =item Arguments: none
424
425 =item Return Value: $cursor
426
427 =back
428
429 Returns a storage-driven cursor to the given resultset. See
430 L<DBIx::Class::Cursor> for more information.
431
432 =cut
433
434 sub cursor {
435   my ($self) = @_;
436
437   $self->_resolve;
438   my $attrs = { %{$self->{_attrs}} };
439   return $self->{cursor}
440     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
441           $attrs->{where},$attrs);
442 }
443
444 =head2 single
445
446 =over 4
447
448 =item Arguments: $cond?
449
450 =item Return Value: $row_object?
451
452 =back
453
454   my $cd = $schema->resultset('CD')->single({ year => 2001 });
455
456 Inflates the first result without creating a cursor if the resultset has
457 any records in it; if not returns nothing. Used by L</find> as an optimisation.
458
459 Can optionally take an additional condition *only* - this is a fast-code-path
460 method; if you need to add extra joins or similar call ->search and then
461 ->single without a condition on the $rs returned from that.
462
463 =cut
464
465 sub single {
466   my ($self, $where) = @_;
467   $self->_resolve;
468   my $attrs = { %{$self->{_attrs}} };
469   if ($where) {
470     if (defined $attrs->{where}) {
471       $attrs->{where} = {
472         '-and' =>
473             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
474                $where, delete $attrs->{where} ]
475       };
476     } else {
477       $attrs->{where} = $where;
478     }
479   }
480
481   unless ($self->_is_unique_query($attrs->{where})) {
482     carp "Query not guaranteed to return a single row"
483       . "; please declare your unique constraints or use search instead";
484   }
485
486   my @data = $self->result_source->storage->select_single(
487           $attrs->{from}, $attrs->{select},
488           $attrs->{where},$attrs);
489   return (@data ? $self->_construct_object(@data) : ());
490 }
491
492 # _is_unique_query
493 #
494 # Try to determine if the specified query is guaranteed to be unique, based on
495 # the declared unique constraints.
496
497 sub _is_unique_query {
498   my ($self, $query) = @_;
499
500   my $collapsed = $self->_collapse_query($query);
501
502         my $alias = ($self->{attrs}->{_live_join}) ? $self->{attrs}->{_live_join} : $self->{attrs}->{alias};
503   foreach my $name ($self->result_source->unique_constraint_names) {
504     my @unique_cols = map { "$alias.$_" }
505       $self->result_source->unique_constraint_columns($name);
506
507     # Count the values for each unique column
508     my %seen = map { $_ => 0 } @unique_cols;
509
510     foreach my $key (keys %$collapsed) {
511       my $aliased = $key;
512       $aliased = "$alias.$key" unless $key =~ /\./;
513
514       next unless exists $seen{$aliased};  # Additional constraints are okay
515       $seen{$aliased} = scalar @{ $collapsed->{$key} };
516     }
517
518     # If we get 0 or more than 1 value for a column, it's not necessarily unique
519     return 1 unless grep { $_ != 1 } values %seen;
520   }
521
522   return 0;
523 }
524
525 # _collapse_query
526 #
527 # Recursively collapse the query, accumulating values for each column.
528
529 sub _collapse_query {
530   my ($self, $query, $collapsed) = @_;
531
532   $collapsed ||= {};
533
534   if (ref $query eq 'ARRAY') {
535     foreach my $subquery (@$query) {
536       next unless ref $subquery;  # -or
537 #      warn "ARRAY: " . Dumper $subquery;
538       $collapsed = $self->_collapse_query($subquery, $collapsed);
539     }
540   }
541   elsif (ref $query eq 'HASH') {
542     if (keys %$query and (keys %$query)[0] eq '-and') {
543       foreach my $subquery (@{$query->{-and}}) {
544 #        warn "HASH: " . Dumper $subquery;
545         $collapsed = $self->_collapse_query($subquery, $collapsed);
546       }
547     }
548     else {
549 #      warn "LEAF: " . Dumper $query;
550       foreach my $key (keys %$query) {
551         push @{$collapsed->{$key}}, $query->{$key};
552       }
553     }
554   }
555
556   return $collapsed;
557 }
558
559 =head2 get_column
560
561 =over 4
562
563 =item Arguments: $cond?
564
565 =item Return Value: $resultsetcolumn
566
567 =back
568
569   my $max_length = $rs->get_column('length')->max;
570
571 Returns a ResultSetColumn instance for $column based on $self
572
573 =cut
574
575 sub get_column {
576   my ($self, $column) = @_;
577
578   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
579   return $new;
580 }
581
582 =head2 search_like
583
584 =over 4
585
586 =item Arguments: $cond, \%attrs?
587
588 =item Return Value: $resultset (scalar context), @row_objs (list context)
589
590 =back
591
592   # WHERE title LIKE '%blue%'
593   $cd_rs = $rs->search_like({ title => '%blue%'});
594
595 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
596 that this is simply a convenience method. You most likely want to use
597 L</search> with specific operators.
598
599 For more information, see L<DBIx::Class::Manual::Cookbook>.
600
601 =cut
602
603 sub search_like {
604   my $class = shift;
605   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
606   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
607   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
608   return $class->search($query, { %$attrs });
609 }
610
611 =head2 slice
612
613 =over 4
614
615 =item Arguments: $first, $last
616
617 =item Return Value: $resultset (scalar context), @row_objs (list context)
618
619 =back
620
621 Returns a resultset or object list representing a subset of elements from the
622 resultset slice is called on. Indexes are from 0, i.e., to get the first
623 three records, call:
624
625   my ($one, $two, $three) = $rs->slice(0, 2);
626
627 =cut
628
629 sub slice {
630   my ($self, $min, $max) = @_;
631   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
632   $attrs->{offset} = $self->{attrs}{offset} || 0;
633   $attrs->{offset} += $min;
634   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
635   return $self->search(undef(), $attrs);
636   #my $slice = (ref $self)->new($self->result_source, $attrs);
637   #return (wantarray ? $slice->all : $slice);
638 }
639
640 =head2 next
641
642 =over 4
643
644 =item Arguments: none
645
646 =item Return Value: $result?
647
648 =back
649
650 Returns the next element in the resultset (C<undef> is there is none).
651
652 Can be used to efficiently iterate over records in the resultset:
653
654   my $rs = $schema->resultset('CD')->search;
655   while (my $cd = $rs->next) {
656     print $cd->title;
657   }
658
659 Note that you need to store the resultset object, and call C<next> on it. 
660 Calling C<< resultset('Table')->next >> repeatedly will always return the
661 first record from the resultset.
662
663 =cut
664
665 sub next {
666   my ($self) = @_;
667   if (my $cache = $self->get_cache) {
668     $self->{all_cache_position} ||= 0;
669     return $cache->[$self->{all_cache_position}++];
670   }
671   if ($self->{attrs}{cache}) {
672     $self->{all_cache_position} = 1;
673     return ($self->all)[0];
674   }
675   my @row = (exists $self->{stashed_row} ?
676                @{delete $self->{stashed_row}} :
677                $self->cursor->next
678   );
679   return unless (@row);
680   return $self->_construct_object(@row);
681 }
682
683 sub _resolve {
684   my $self = shift;
685
686   return if(exists $self->{_attrs}); #return if _resolve has already been called
687
688   my $attrs = $self->{attrs};    
689   my $source = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->{result_source};
690
691   # XXX - lose storable dclone
692   my $record_filter = delete $attrs->{record_filter} if (defined $attrs->{record_filter});
693   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
694   $attrs->{record_filter} = $record_filter if ($record_filter);
695   $self->{attrs}->{record_filter} = $record_filter if ($record_filter);
696
697   my $alias = $attrs->{alias};
698  
699   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
700   delete $attrs->{as} if $attrs->{columns};
701   $attrs->{columns} ||= [ $self->{result_source}->columns ] unless $attrs->{select};
702   my $select_alias = ($self->{_parent_rs}) ? $self->{attrs}->{_live_join} : $alias;
703   $attrs->{select} = [
704                       map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
705                       ] if $attrs->{columns};
706   $attrs->{as} ||= [
707                     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
708                     ];
709   if (my $include = delete $attrs->{include_columns}) {
710       push(@{$attrs->{select}}, @$include);
711       push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
712   }
713
714   $attrs->{from} ||= [ { $alias => $source->from } ];
715   $attrs->{seen_join} ||= {};
716   my %seen;
717   if (my $join = delete $attrs->{join}) {
718     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
719       if (ref $j eq 'HASH') {
720         $seen{$_} = 1 foreach keys %$j;
721       } else {
722         $seen{$j} = 1;
723       }
724     }
725     
726     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
727   }
728   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
729   $attrs->{order_by} = [ $attrs->{order_by} ] if
730       $attrs->{order_by} and !ref($attrs->{order_by});
731   $attrs->{order_by} ||= [];
732
733   if(my $seladds = delete($attrs->{'+select'})) {
734     my @seladds = (ref($seladds) eq 'ARRAY' ? @$seladds : ($seladds));
735     $attrs->{select} = [
736                         @{ $attrs->{select} },
737                         map { (m/\./ || ref($_)) ? $_ : "${alias}.$_" } $seladds
738                         ];
739   }
740   if(my $asadds = delete($attrs->{'+as'})) {
741     my @asadds = (ref($asadds) eq 'ARRAY' ? @$asadds : ($asadds));
742     $attrs->{as} = [ @{ $attrs->{as} }, @asadds ];
743   }
744   
745   my $collapse = $attrs->{collapse} || {};
746   if (my $prefetch = delete $attrs->{prefetch}) {
747       my @pre_order;
748       foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
749           if ( ref $p eq 'HASH' ) {
750               foreach my $key (keys %$p) {
751                   push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
752                       unless $seen{$key};
753               }
754           } else {
755               push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
756                   unless $seen{$p};
757           }
758           my @prefetch = $source->resolve_prefetch(
759                                                    $p, $attrs->{alias}, {}, \@pre_order, $collapse);
760           push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
761           push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
762       }
763       push(@{$attrs->{order_by}}, @pre_order);
764   }
765   $attrs->{collapse} = $collapse;
766   $self->{_attrs} = $attrs;
767 }
768
769 sub _merge_attr {
770   my ($self, $a, $b, $is_prefetch) = @_;
771     
772   return $b unless $a;
773   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
774     foreach my $key (keys %{$b}) {
775       if (exists $a->{$key}) {
776         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key}, $is_prefetch);
777       } else {
778         $a->{$key} = delete $b->{$key};
779       }
780     }
781     return $a;
782   } else {
783     $a = [$a] unless (ref $a eq 'ARRAY');
784     $b = [$b] unless (ref $b eq 'ARRAY');
785     
786     my $hash = {};
787     my $array = [];      
788     foreach ($a, $b) {
789       foreach my $element (@{$_}) {
790         if (ref $element eq 'HASH') {
791           $hash = $self->_merge_attr($hash, $element, $is_prefetch);
792         } elsif (ref $element eq 'ARRAY') {
793           $array = [@{$array}, @{$element}];
794         } else {        
795           if (($b == $_) && $is_prefetch) {
796             $self->_merge_array($array, $element, $is_prefetch);
797           } else {
798             push(@{$array}, $element);
799           }
800         }
801       }
802     }
803
804                 my $final_array = [];
805                 foreach my $element (@{$array}) {
806                         push(@{$final_array}, $element) unless (exists $hash->{$element});
807                 }
808                 $array = $final_array;
809
810     if ((keys %{$hash}) && (scalar(@{$array} > 0))) {
811       return [$hash, @{$array}];
812     } else {    
813       return (keys %{$hash}) ? $hash : $array;
814     }
815   }
816 }
817
818 sub _merge_array {
819   my ($self, $a, $b) = @_;
820   
821   $b = [$b] unless (ref $b eq 'ARRAY');
822   # add elements from @{$b} to @{$a} which aren't already in @{$a}
823   foreach my $b_element (@{$b}) {
824     push(@{$a}, $b_element) unless grep {$b_element eq $_} @{$a};
825   }
826 }
827
828 sub _construct_object {
829   my ($self, @row) = @_;
830   my @as = @{ $self->{_attrs}{as} };
831   
832   my $info = $self->_collapse_result(\@as, \@row);
833   my $new = $self->result_class->inflate_result($self->result_source, @$info);
834   $new = $self->{_attrs}{record_filter}->($new)
835     if exists $self->{_attrs}{record_filter};
836   return $new;
837 }
838
839 sub _collapse_result {
840   my ($self, $as, $row, $prefix) = @_;
841
842   my $live_join = $self->{attrs}->{_live_join} ||="";
843   my %const;
844
845   my @copy = @$row;
846   foreach my $this_as (@$as) {
847     my $val = shift @copy;
848     if (defined $prefix) {
849       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
850         my $remain = $1;
851         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
852         $const{$1||''}{$2} = $val;
853       }
854     } else {
855       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
856       $const{$1||''}{$2} = $val;
857     }
858   }
859
860   my $info = [ {}, {} ];
861   foreach my $key (keys %const) {
862     if (length $key && $key ne $live_join) {
863       my $target = $info;
864       my @parts = split(/\./, $key);
865       foreach my $p (@parts) {
866         $target = $target->[1]->{$p} ||= [];
867       }
868       $target->[0] = $const{$key};
869     } else {
870       $info->[0] = $const{$key};
871     }
872   }
873
874   my @collapse;
875   if (defined $prefix) {
876     @collapse = map {
877         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
878     } keys %{$self->{_attrs}->{collapse}}
879   } else {
880     @collapse = keys %{$self->{_attrs}->{collapse}};
881   };
882
883   if (@collapse) {
884     my ($c) = sort { length $a <=> length $b } @collapse;
885     my $target = $info;
886     foreach my $p (split(/\./, $c)) {
887       $target = $target->[1]->{$p} ||= [];
888     }
889     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
890     my @co_key = @{$self->{_attrs}->{collapse}{$c_prefix}};
891     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
892     my $tree = $self->_collapse_result($as, $row, $c_prefix);
893     my (@final, @raw);
894     while ( !(grep {
895                 !defined($tree->[0]->{$_}) ||
896                 $co_check{$_} ne $tree->[0]->{$_}
897               } @co_key) ) {
898       push(@final, $tree);
899       last unless (@raw = $self->cursor->next);
900       $row = $self->{stashed_row} = \@raw;
901       $tree = $self->_collapse_result($as, $row, $c_prefix);
902     }
903     @$target = (@final ? @final : [ {}, {} ]); 
904       # single empty result to indicate an empty prefetched has_many
905   }
906   return $info;
907 }
908
909 =head2 result_source
910
911 =over 4
912
913 =item Arguments: $result_source?
914
915 =item Return Value: $result_source
916
917 =back
918
919 An accessor for the primary ResultSource object from which this ResultSet
920 is derived.
921
922 =cut
923
924
925 =head2 count
926
927 =over 4
928
929 =item Arguments: $cond, \%attrs??
930
931 =item Return Value: $count
932
933 =back
934
935 Performs an SQL C<COUNT> with the same query as the resultset was built
936 with to find the number of elements. If passed arguments, does a search
937 on the resultset and counts the results of that.
938
939 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
940 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
941 not support C<DISTINCT> with multiple columns. If you are using such a
942 database, you should only use columns from the main table in your C<group_by>
943 clause.
944
945 =cut
946
947 sub count {
948   my $self = shift;
949   return $self->search(@_)->count if @_ and defined $_[0];
950   return scalar @{ $self->get_cache } if $self->get_cache;
951   my $count = $self->_count;
952   return 0 unless $count;
953
954   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
955   $count = $self->{attrs}{rows} if
956     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
957   return $count;
958 }
959
960 sub _count { # Separated out so pager can get the full count
961   my $self = shift;
962   my $select = { count => '*' };
963   
964   $self->_resolve;
965   my $attrs = { %{ $self->{_attrs} } };
966   if (my $group_by = delete $attrs->{group_by}) {
967     delete $attrs->{having};
968     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
969     # todo: try CONCAT for multi-column pk
970     my @pk = $self->result_source->primary_columns;
971     if (@pk == 1) {
972       foreach my $column (@distinct) {
973         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
974           @distinct = ($column);
975           last;
976         }
977       }
978     }
979
980     $select = { count => { distinct => \@distinct } };
981   }
982
983   $attrs->{select} = $select;
984   $attrs->{as} = [qw/count/];
985
986   # offset, order by and page are not needed to count. record_filter is cdbi
987   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
988         my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
989         $tmp_rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
990
991   my ($count) = $tmp_rs->cursor->next;
992   return $count;
993 }
994
995 =head2 count_literal
996
997 =over 4
998
999 =item Arguments: $sql_fragment, @bind_values
1000
1001 =item Return Value: $count
1002
1003 =back
1004
1005 Counts the results in a literal query. Equivalent to calling L</search_literal>
1006 with the passed arguments, then L</count>.
1007
1008 =cut
1009
1010 sub count_literal { shift->search_literal(@_)->count; }
1011
1012 =head2 all
1013
1014 =over 4
1015
1016 =item Arguments: none
1017
1018 =item Return Value: @objects
1019
1020 =back
1021
1022 Returns all elements in the resultset. Called implicitly if the resultset
1023 is returned in list context.
1024
1025 =cut
1026
1027 sub all {
1028   my ($self) = @_;
1029   return @{ $self->get_cache } if $self->get_cache;
1030
1031   my @obj;
1032
1033   # TODO: don't call resolve here
1034   $self->_resolve;
1035   if (keys %{$self->{_attrs}->{collapse}}) {
1036 #  if ($self->{attrs}->{prefetch}) {
1037       # Using $self->cursor->all is really just an optimisation.
1038       # If we're collapsing has_many prefetches it probably makes
1039       # very little difference, and this is cleaner than hacking
1040       # _construct_object to survive the approach
1041     my @row = $self->cursor->next;
1042     while (@row) {
1043       push(@obj, $self->_construct_object(@row));
1044       @row = (exists $self->{stashed_row}
1045                ? @{delete $self->{stashed_row}}
1046                : $self->cursor->next);
1047     }
1048   } else {
1049     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1050   }
1051
1052   $self->set_cache(\@obj) if $self->{attrs}{cache};
1053   return @obj;
1054 }
1055
1056 =head2 reset
1057
1058 =over 4
1059
1060 =item Arguments: none
1061
1062 =item Return Value: $self
1063
1064 =back
1065
1066 Resets the resultset's cursor, so you can iterate through the elements again.
1067
1068 =cut
1069
1070 sub reset {
1071   my ($self) = @_;
1072   delete $self->{_attrs} if (exists $self->{_attrs});
1073
1074   $self->{all_cache_position} = 0;
1075   $self->cursor->reset;
1076   return $self;
1077 }
1078
1079 =head2 first
1080
1081 =over 4
1082
1083 =item Arguments: none
1084
1085 =item Return Value: $object?
1086
1087 =back
1088
1089 Resets the resultset and returns an object for the first result (if the
1090 resultset returns anything).
1091
1092 =cut
1093
1094 sub first {
1095   return $_[0]->reset->next;
1096 }
1097
1098 # _cond_for_update_delete
1099 #
1100 # update/delete require the condition to be modified to handle
1101 # the differing SQL syntax available.  This transforms the $self->{cond}
1102 # appropriately, returning the new condition.
1103
1104 sub _cond_for_update_delete {
1105   my ($self) = @_;
1106   my $cond = {};
1107
1108   if (!ref($self->{cond})) {
1109     # No-op. No condition, we're updating/deleting everything
1110   }
1111   elsif (ref $self->{cond} eq 'ARRAY') {
1112     $cond = [
1113       map {
1114         my %hash;
1115         foreach my $key (keys %{$_}) {
1116           $key =~ /([^.]+)$/;
1117           $hash{$1} = $_->{$key};
1118         }
1119         \%hash;
1120       } @{$self->{cond}}
1121     ];
1122   }
1123   elsif (ref $self->{cond} eq 'HASH') {
1124     if ((keys %{$self->{cond}})[0] eq '-and') {
1125       $cond->{-and} = [];
1126
1127       my @cond = @{$self->{cond}{-and}};
1128       for (my $i = 0; $i <= @cond - 1; $i++) {
1129         my $entry = $cond[$i];
1130
1131         my %hash;
1132         if (ref $entry eq 'HASH') {
1133           foreach my $key (keys %{$entry}) {
1134             $key =~ /([^.]+)$/;
1135             $hash{$1} = $entry->{$key};
1136           }
1137         }
1138         else {
1139           $entry =~ /([^.]+)$/;
1140           $hash{$1} = $cond[++$i];
1141         }
1142
1143         push @{$cond->{-and}}, \%hash;
1144       }
1145     }
1146     else {
1147       foreach my $key (keys %{$self->{cond}}) {
1148         $key =~ /([^.]+)$/;
1149         $cond->{$1} = $self->{cond}{$key};
1150       }
1151     }
1152   }
1153   else {
1154     $self->throw_exception(
1155       "Can't update/delete on resultset with condition unless hash or array"
1156     );
1157   }
1158
1159   return $cond;
1160 }
1161
1162
1163 =head2 update
1164
1165 =over 4
1166
1167 =item Arguments: \%values
1168
1169 =item Return Value: $storage_rv
1170
1171 =back
1172
1173 Sets the specified columns in the resultset to the supplied values in a
1174 single query. Return value will be true if the update succeeded or false
1175 if no records were updated; exact type of success value is storage-dependent.
1176
1177 =cut
1178
1179 sub update {
1180   my ($self, $values) = @_;
1181   $self->throw_exception("Values for update must be a hash")
1182     unless ref $values eq 'HASH';
1183
1184   my $cond = $self->_cond_for_update_delete;
1185
1186   return $self->result_source->storage->update(
1187     $self->result_source->from, $values, $cond
1188   );
1189 }
1190
1191 =head2 update_all
1192
1193 =over 4
1194
1195 =item Arguments: \%values
1196
1197 =item Return Value: 1
1198
1199 =back
1200
1201 Fetches all objects and updates them one at a time. Note that C<update_all>
1202 will run DBIC cascade triggers, while L</update> will not.
1203
1204 =cut
1205
1206 sub update_all {
1207   my ($self, $values) = @_;
1208   $self->throw_exception("Values for update must be a hash")
1209     unless ref $values eq 'HASH';
1210   foreach my $obj ($self->all) {
1211     $obj->set_columns($values)->update;
1212   }
1213   return 1;
1214 }
1215
1216 =head2 delete
1217
1218 =over 4
1219
1220 =item Arguments: none
1221
1222 =item Return Value: 1
1223
1224 =back
1225
1226 Deletes the contents of the resultset from its result source. Note that this
1227 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1228 to run.
1229
1230 =cut
1231
1232 sub delete {
1233   my ($self) = @_;
1234   my $del = {};
1235
1236   my $cond = $self->_cond_for_update_delete;
1237
1238   $self->result_source->storage->delete($self->result_source->from, $cond);
1239   return 1;
1240 }
1241
1242 =head2 delete_all
1243
1244 =over 4
1245
1246 =item Arguments: none
1247
1248 =item Return Value: 1
1249
1250 =back
1251
1252 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1253 will run DBIC cascade triggers, while L</delete> will not.
1254
1255 =cut
1256
1257 sub delete_all {
1258   my ($self) = @_;
1259   $_->delete for $self->all;
1260   return 1;
1261 }
1262
1263 =head2 pager
1264
1265 =over 4
1266
1267 =item Arguments: none
1268
1269 =item Return Value: $pager
1270
1271 =back
1272
1273 Return Value a L<Data::Page> object for the current resultset. Only makes
1274 sense for queries with a C<page> attribute.
1275
1276 =cut
1277
1278 sub pager {
1279   my ($self) = @_;
1280   my $attrs = $self->{attrs};
1281   $self->throw_exception("Can't create pager for non-paged rs")
1282     unless $self->{page};
1283   $attrs->{rows} ||= 10;
1284   return $self->{pager} ||= Data::Page->new(
1285     $self->_count, $attrs->{rows}, $self->{page});
1286 }
1287
1288 =head2 page
1289
1290 =over 4
1291
1292 =item Arguments: $page_number
1293
1294 =item Return Value: $rs
1295
1296 =back
1297
1298 Returns a resultset for the $page_number page of the resultset on which page
1299 is called, where each page contains a number of rows equal to the 'rows'
1300 attribute set on the resultset (10 by default).
1301
1302 =cut
1303
1304 sub page {
1305   my ($self, $page) = @_;
1306   my $attrs = { %{$self->{attrs}} };
1307   $attrs->{page} = $page;
1308   return (ref $self)->new($self->result_source, $attrs);
1309 }
1310
1311 =head2 new_result
1312
1313 =over 4
1314
1315 =item Arguments: \%vals
1316
1317 =item Return Value: $object
1318
1319 =back
1320
1321 Creates an object in the resultset's result class and returns it.
1322
1323 =cut
1324
1325 sub new_result {
1326   my ($self, $values) = @_;
1327   $self->throw_exception( "new_result needs a hash" )
1328     unless (ref $values eq 'HASH');
1329   $self->throw_exception(
1330     "Can't abstract implicit construct, condition not a hash"
1331   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1332   my %new = %$values;
1333   my $alias = $self->{attrs}{alias};
1334   foreach my $key (keys %{$self->{cond}||{}}) {
1335     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1336   }
1337   my $obj = $self->result_class->new(\%new);
1338   $obj->result_source($self->result_source) if $obj->can('result_source');
1339   return $obj;
1340 }
1341
1342 =head2 find_or_new
1343
1344 =over 4
1345
1346 =item Arguments: \%vals, \%attrs?
1347
1348 =item Return Value: $object
1349
1350 =back
1351
1352 Find an existing record from this resultset. If none exists, instantiate a new
1353 result object and return it. The object will not be saved into your storage
1354 until you call L<DBIx::Class::Row/insert> on it.
1355
1356 If you want objects to be saved immediately, use L</find_or_create> instead.
1357
1358 =cut
1359
1360 sub find_or_new {
1361   my $self     = shift;
1362   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1363   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1364   my $exists   = $self->find($hash, $attrs);
1365   return defined $exists ? $exists : $self->new_result($hash);
1366 }
1367
1368 =head2 create
1369
1370 =over 4
1371
1372 =item Arguments: \%vals
1373
1374 =item Return Value: $object
1375
1376 =back
1377
1378 Inserts a record into the resultset and returns the object representing it.
1379
1380 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1381
1382 =cut
1383
1384 sub create {
1385   my ($self, $attrs) = @_;
1386   $self->throw_exception( "create needs a hashref" )
1387     unless ref $attrs eq 'HASH';
1388   return $self->new_result($attrs)->insert;
1389 }
1390
1391 =head2 find_or_create
1392
1393 =over 4
1394
1395 =item Arguments: \%vals, \%attrs?
1396
1397 =item Return Value: $object
1398
1399 =back
1400
1401   $class->find_or_create({ key => $val, ... });
1402
1403 Tries to find a record based on its primary key or unique constraint; if none
1404 is found, creates one and returns that instead.
1405
1406   my $cd = $schema->resultset('CD')->find_or_create({
1407     cdid   => 5,
1408     artist => 'Massive Attack',
1409     title  => 'Mezzanine',
1410     year   => 2005,
1411   });
1412
1413 Also takes an optional C<key> attribute, to search by a specific key or unique
1414 constraint. For example:
1415
1416   my $cd = $schema->resultset('CD')->find_or_create(
1417     {
1418       artist => 'Massive Attack',
1419       title  => 'Mezzanine',
1420     },
1421     { key => 'cd_artist_title' }
1422   );
1423
1424 See also L</find> and L</update_or_create>. For information on how to declare
1425 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1426
1427 =cut
1428
1429 sub find_or_create {
1430   my $self     = shift;
1431   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1432   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1433   my $exists   = $self->find($hash, $attrs);
1434   return defined $exists ? $exists : $self->create($hash);
1435 }
1436
1437 =head2 update_or_create
1438
1439 =over 4
1440
1441 =item Arguments: \%col_values, { key => $unique_constraint }?
1442
1443 =item Return Value: $object
1444
1445 =back
1446
1447   $class->update_or_create({ col => $val, ... });
1448
1449 First, searches for an existing row matching one of the unique constraints
1450 (including the primary key) on the source of this resultset. If a row is
1451 found, updates it with the other given column values. Otherwise, creates a new
1452 row.
1453
1454 Takes an optional C<key> attribute to search on a specific unique constraint.
1455 For example:
1456
1457   # In your application
1458   my $cd = $schema->resultset('CD')->update_or_create(
1459     {
1460       artist => 'Massive Attack',
1461       title  => 'Mezzanine',
1462       year   => 1998,
1463     },
1464     { key => 'cd_artist_title' }
1465   );
1466
1467 If no C<key> is specified, it searches on all unique constraints defined on the
1468 source, including the primary key.
1469
1470 If the C<key> is specified as C<primary>, it searches only on the primary key.
1471
1472 See also L</find> and L</find_or_create>. For information on how to declare
1473 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1474
1475 =cut
1476
1477 sub update_or_create {
1478   my $self = shift;
1479   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1480   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1481
1482   my $row = $self->find($cond);
1483   if (defined $row) {
1484     $row->update($cond);
1485     return $row;
1486   }
1487
1488   return $self->create($cond);
1489 }
1490
1491 =head2 get_cache
1492
1493 =over 4
1494
1495 =item Arguments: none
1496
1497 =item Return Value: \@cache_objects?
1498
1499 =back
1500
1501 Gets the contents of the cache for the resultset, if the cache is set.
1502
1503 =cut
1504
1505 sub get_cache {
1506   shift->{all_cache};
1507 }
1508
1509 =head2 set_cache
1510
1511 =over 4
1512
1513 =item Arguments: \@cache_objects
1514
1515 =item Return Value: \@cache_objects
1516
1517 =back
1518
1519 Sets the contents of the cache for the resultset. Expects an arrayref
1520 of objects of the same class as those produced by the resultset. Note that
1521 if the cache is set the resultset will return the cached objects rather
1522 than re-querying the database even if the cache attr is not set.
1523
1524 =cut
1525
1526 sub set_cache {
1527   my ( $self, $data ) = @_;
1528   $self->throw_exception("set_cache requires an arrayref")
1529       if defined($data) && (ref $data ne 'ARRAY');
1530   $self->{all_cache} = $data;
1531 }
1532
1533 =head2 clear_cache
1534
1535 =over 4
1536
1537 =item Arguments: none
1538
1539 =item Return Value: []
1540
1541 =back
1542
1543 Clears the cache for the resultset.
1544
1545 =cut
1546
1547 sub clear_cache {
1548   shift->set_cache(undef);
1549 }
1550
1551 =head2 related_resultset
1552
1553 =over 4
1554
1555 =item Arguments: $relationship_name
1556
1557 =item Return Value: $resultset
1558
1559 =back
1560
1561 Returns a related resultset for the supplied relationship name.
1562
1563   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1564
1565 =cut
1566
1567 sub related_resultset {
1568   my ( $self, $rel ) = @_;
1569   
1570   $self->{related_resultsets} ||= {};
1571   return $self->{related_resultsets}{$rel} ||= do {
1572     #warn "fetching related resultset for rel '$rel' " . $self->result_source->{name};
1573     my $rel_obj = $self->result_source->relationship_info($rel);
1574     $self->throw_exception(
1575         "search_related: result source '" . $self->result_source->name .
1576         "' has no such relationship ${rel}")
1577         unless $rel_obj; #die Dumper $self->{attrs};
1578
1579                 my $live_join_stack = $self->{attrs}->{_live_join_stack} || [];         
1580                 push(@{$live_join_stack}, $rel);
1581                 
1582     my $rs = $self->result_source->schema->resultset($rel_obj->{class}
1583            )->search( undef,
1584                       { select => undef,
1585                         as => undef,
1586                         _live_join => $rel, #the most recent
1587                         _live_join_stack => $live_join_stack, #the trail of rels
1588                         _parent_attrs => $self->{attrs}}
1589                       );    
1590
1591     # keep reference of the original resultset
1592     $rs->{_parent_rs} = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->result_source;
1593     return $rs;
1594   };
1595 }
1596
1597 =head2 throw_exception
1598
1599 See L<DBIx::Class::Schema/throw_exception> for details.
1600
1601 =cut
1602
1603 sub throw_exception {
1604   my $self=shift;
1605   $self->result_source->schema->throw_exception(@_);
1606 }
1607
1608 # XXX: FIXME: Attributes docs need clearing up
1609
1610 =head1 ATTRIBUTES
1611
1612 The resultset takes various attributes that modify its behavior. Here's an
1613 overview of them:
1614
1615 =head2 order_by
1616
1617 =over 4
1618
1619 =item Value: ($order_by | \@order_by)
1620
1621 =back
1622
1623 Which column(s) to order the results by. This is currently passed
1624 through directly to SQL, so you can give e.g. C<year DESC> for a
1625 descending order on the column `year'.
1626
1627 Please note that if you have quoting enabled (see 
1628 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1629 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1630 so you will need to manually quote things as appropriate.)
1631
1632 =head2 columns
1633
1634 =over 4
1635
1636 =item Value: \@columns
1637
1638 =back
1639
1640 Shortcut to request a particular set of columns to be retrieved.  Adds
1641 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1642 from that, then auto-populates C<as> from C<select> as normal. (You may also
1643 use the C<cols> attribute, as in earlier versions of DBIC.)
1644
1645 =head2 include_columns
1646
1647 =over 4
1648
1649 =item Value: \@columns
1650
1651 =back
1652
1653 Shortcut to include additional columns in the returned results - for example
1654
1655   $schema->resultset('CD')->search(undef, {
1656     include_columns => ['artist.name'],
1657     join => ['artist']
1658   });
1659
1660 would return all CDs and include a 'name' column to the information
1661 passed to object inflation
1662
1663 =head2 select
1664
1665 =over 4
1666
1667 =item Value: \@select_columns
1668
1669 =back
1670
1671 Indicates which columns should be selected from the storage. You can use
1672 column names, or in the case of RDBMS back ends, function or stored procedure
1673 names:
1674
1675   $rs = $schema->resultset('Employee')->search(undef, {
1676     select => [
1677       'name',
1678       { count => 'employeeid' },
1679       { sum => 'salary' }
1680     ]
1681   });
1682
1683 When you use function/stored procedure names and do not supply an C<as>
1684 attribute, the column names returned are storage-dependent. E.g. MySQL would
1685 return a column named C<count(employeeid)> in the above example.
1686
1687 =head2 +select
1688
1689 =over 4
1690
1691 Indicates additional columns to be selected from storage.  Works the same as
1692 L<select> but adds columns to the selection.
1693
1694 =back
1695
1696 =head2 +as
1697
1698 =over 4
1699
1700 Indicates additional column names for those added via L<+select>.
1701
1702 =back
1703
1704 =head2 as
1705
1706 =over 4
1707
1708 =item Value: \@inflation_names
1709
1710 =back
1711
1712 Indicates column names for object inflation. This is used in conjunction with
1713 C<select>, usually when C<select> contains one or more function or stored
1714 procedure names:
1715
1716   $rs = $schema->resultset('Employee')->search(undef, {
1717     select => [
1718       'name',
1719       { count => 'employeeid' }
1720     ],
1721     as => ['name', 'employee_count'],
1722   });
1723
1724   my $employee = $rs->first(); # get the first Employee
1725
1726 If the object against which the search is performed already has an accessor
1727 matching a column name specified in C<as>, the value can be retrieved using
1728 the accessor as normal:
1729
1730   my $name = $employee->name();
1731
1732 If on the other hand an accessor does not exist in the object, you need to
1733 use C<get_column> instead:
1734
1735   my $employee_count = $employee->get_column('employee_count');
1736
1737 You can create your own accessors if required - see
1738 L<DBIx::Class::Manual::Cookbook> for details.
1739
1740 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1741 produced, it is used for internal access only. Thus attempting to use the accessor
1742 in an C<order_by> clause or similar will fail misrably.
1743
1744 =head2 join
1745
1746 =over 4
1747
1748 =item Value: ($rel_name | \@rel_names | \%rel_names)
1749
1750 =back
1751
1752 Contains a list of relationships that should be joined for this query.  For
1753 example:
1754
1755   # Get CDs by Nine Inch Nails
1756   my $rs = $schema->resultset('CD')->search(
1757     { 'artist.name' => 'Nine Inch Nails' },
1758     { join => 'artist' }
1759   );
1760
1761 Can also contain a hash reference to refer to the other relation's relations.
1762 For example:
1763
1764   package MyApp::Schema::Track;
1765   use base qw/DBIx::Class/;
1766   __PACKAGE__->table('track');
1767   __PACKAGE__->add_columns(qw/trackid cd position title/);
1768   __PACKAGE__->set_primary_key('trackid');
1769   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1770   1;
1771
1772   # In your application
1773   my $rs = $schema->resultset('Artist')->search(
1774     { 'track.title' => 'Teardrop' },
1775     {
1776       join     => { cd => 'track' },
1777       order_by => 'artist.name',
1778     }
1779   );
1780
1781 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1782 similarly for a third time). For e.g.
1783
1784   my $rs = $schema->resultset('Artist')->search({
1785     'cds.title'   => 'Down to Earth',
1786     'cds_2.title' => 'Popular',
1787   }, {
1788     join => [ qw/cds cds/ ],
1789   });
1790
1791 will return a set of all artists that have both a cd with title 'Down
1792 to Earth' and a cd with title 'Popular'.
1793
1794 If you want to fetch related objects from other tables as well, see C<prefetch>
1795 below.
1796
1797 =head2 prefetch
1798
1799 =over 4
1800
1801 =item Value: ($rel_name | \@rel_names | \%rel_names)
1802
1803 =back
1804
1805 Contains one or more relationships that should be fetched along with the main
1806 query (when they are accessed afterwards they will have already been
1807 "prefetched").  This is useful for when you know you will need the related
1808 objects, because it saves at least one query:
1809
1810   my $rs = $schema->resultset('Tag')->search(
1811     undef,
1812     {
1813       prefetch => {
1814         cd => 'artist'
1815       }
1816     }
1817   );
1818
1819 The initial search results in SQL like the following:
1820
1821   SELECT tag.*, cd.*, artist.* FROM tag
1822   JOIN cd ON tag.cd = cd.cdid
1823   JOIN artist ON cd.artist = artist.artistid
1824
1825 L<DBIx::Class> has no need to go back to the database when we access the
1826 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1827 case.
1828
1829 Simple prefetches will be joined automatically, so there is no need
1830 for a C<join> attribute in the above search. If you're prefetching to
1831 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1832 specify the join as well.
1833
1834 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1835 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1836 with an accessor type of 'single' or 'filter').
1837
1838 =head2 page
1839
1840 =over 4
1841
1842 =item Value: $page
1843
1844 =back
1845
1846 Makes the resultset paged and specifies the page to retrieve. Effectively
1847 identical to creating a non-pages resultset and then calling ->page($page)
1848 on it. 
1849
1850 If L<rows> attribute is not specified it defualts to 10 rows per page.
1851
1852 =head2 rows
1853
1854 =over 4
1855
1856 =item Value: $rows
1857
1858 =back
1859
1860 Specifes the maximum number of rows for direct retrieval or the number of
1861 rows per page if the page attribute or method is used.
1862
1863 =head2 offset
1864
1865 =over 4
1866
1867 =item Value: $offset
1868
1869 =back
1870
1871 Specifies the (zero-based) row number for the  first row to be returned, or the
1872 of the first row of the first page if paging is used.
1873
1874 =head2 group_by
1875
1876 =over 4
1877
1878 =item Value: \@columns
1879
1880 =back
1881
1882 A arrayref of columns to group by. Can include columns of joined tables.
1883
1884   group_by => [qw/ column1 column2 ... /]
1885
1886 =head2 having
1887
1888 =over 4
1889
1890 =item Value: $condition
1891
1892 =back
1893
1894 HAVING is a select statement attribute that is applied between GROUP BY and
1895 ORDER BY. It is applied to the after the grouping calculations have been
1896 done. 
1897
1898   having => { 'count(employee)' => { '>=', 100 } }
1899
1900 =head2 distinct
1901
1902 =over 4
1903
1904 =item Value: (0 | 1)
1905
1906 =back
1907
1908 Set to 1 to group by all columns.
1909
1910 =head2 cache
1911
1912 Set to 1 to cache search results. This prevents extra SQL queries if you
1913 revisit rows in your ResultSet:
1914
1915   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1916   
1917   while( my $artist = $resultset->next ) {
1918     ... do stuff ...
1919   }
1920
1921   $rs->first; # without cache, this would issue a query
1922
1923 By default, searches are not cached.
1924
1925 For more examples of using these attributes, see
1926 L<DBIx::Class::Manual::Cookbook>.
1927
1928 =head2 from
1929
1930 =over 4
1931
1932 =item Value: \@from_clause
1933
1934 =back
1935
1936 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1937 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1938 clauses.
1939
1940 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1941
1942 C<join> will usually do what you need and it is strongly recommended that you
1943 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1944 And we really do mean "cannot", not just tried and failed. Attempting to use
1945 this because you're having problems with C<join> is like trying to use x86
1946 ASM because you've got a syntax error in your C. Trust us on this.
1947
1948 Now, if you're still really, really sure you need to use this (and if you're
1949 not 100% sure, ask the mailing list first), here's an explanation of how this
1950 works.
1951
1952 The syntax is as follows -
1953
1954   [
1955     { <alias1> => <table1> },
1956     [
1957       { <alias2> => <table2>, -join_type => 'inner|left|right' },
1958       [], # nested JOIN (optional)
1959       { <table1.column1> => <table2.column2>, ... (more conditions) },
1960     ],
1961     # More of the above [ ] may follow for additional joins
1962   ]
1963
1964   <table1> <alias1>
1965   JOIN
1966     <table2> <alias2>
1967     [JOIN ...]
1968   ON <table1.column1> = <table2.column2>
1969   <more joins may follow>
1970
1971 An easy way to follow the examples below is to remember the following:
1972
1973     Anything inside "[]" is a JOIN
1974     Anything inside "{}" is a condition for the enclosing JOIN
1975
1976 The following examples utilize a "person" table in a family tree application.
1977 In order to express parent->child relationships, this table is self-joined:
1978
1979     # Person->belongs_to('father' => 'Person');
1980     # Person->belongs_to('mother' => 'Person');
1981
1982 C<from> can be used to nest joins. Here we return all children with a father,
1983 then search against all mothers of those children:
1984
1985   $rs = $schema->resultset('Person')->search(
1986       undef,
1987       {
1988           alias => 'mother', # alias columns in accordance with "from"
1989           from => [
1990               { mother => 'person' },
1991               [
1992                   [
1993                       { child => 'person' },
1994                       [
1995                           { father => 'person' },
1996                           { 'father.person_id' => 'child.father_id' }
1997                       ]
1998                   ],
1999                   { 'mother.person_id' => 'child.mother_id' }
2000               ],
2001           ]
2002       },
2003   );
2004
2005   # Equivalent SQL:
2006   # SELECT mother.* FROM person mother
2007   # JOIN (
2008   #   person child
2009   #   JOIN person father
2010   #   ON ( father.person_id = child.father_id )
2011   # )
2012   # ON ( mother.person_id = child.mother_id )
2013
2014 The type of any join can be controlled manually. To search against only people
2015 with a father in the person table, we could explicitly use C<INNER JOIN>:
2016
2017     $rs = $schema->resultset('Person')->search(
2018         undef,
2019         {
2020             alias => 'child', # alias columns in accordance with "from"
2021             from => [
2022                 { child => 'person' },
2023                 [
2024                     { father => 'person', -join_type => 'inner' },
2025                     { 'father.id' => 'child.father_id' }
2026                 ],
2027             ]
2028         },
2029     );
2030
2031     # Equivalent SQL:
2032     # SELECT child.* FROM person child
2033     # INNER JOIN person father ON child.father_id = father.id
2034
2035 =cut
2036
2037 1;