718cb1a18b36ff1d4e39d8924801a2242ed4cb78
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use base qw/DBIx::Class/;
15
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   $source = $source->handle 
89     unless $source->isa('DBIx::Class::ResultSourceHandle');
90   $attrs = { %{$attrs||{}} };
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   my $self = {
101     _source_handle => $source,
102     result_class => $attrs->{result_class} || $source->resolve->result_class,
103     cond => $attrs->{where},
104     count => undef,
105     pager => undef,
106     attrs => $attrs
107   };
108
109   bless $self, $class;
110
111   return $self;
112 }
113
114 =head2 search
115
116 =over 4
117
118 =item Arguments: $cond, \%attrs?
119
120 =item Return Value: $resultset (scalar context), @row_objs (list context)
121
122 =back
123
124   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
125   my $new_rs = $cd_rs->search({ year => 2005 });
126
127   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
128                  # year = 2005 OR year = 2004
129
130 If you need to pass in additional attributes but no additional condition,
131 call it as C<search(undef, \%attrs)>.
132
133   # "SELECT name, artistid FROM $artist_table"
134   my @all_artists = $schema->resultset('Artist')->search(undef, {
135     columns => [qw/name artistid/],
136   });
137
138 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
139
140 =cut
141
142 sub search {
143   my $self = shift;
144   my $rs = $self->search_rs( @_ );
145   return (wantarray ? $rs->all : $rs);
146 }
147
148 =head2 search_rs
149
150 =over 4
151
152 =item Arguments: $cond, \%attrs?
153
154 =item Return Value: $resultset
155
156 =back
157
158 This method does the same exact thing as search() except it will
159 always return a resultset, even in list context.
160
161 =cut
162
163 sub search_rs {
164   my $self = shift;
165
166   my $rows;
167
168   unless (@_) {                 # no search, effectively just a clone
169     $rows = $self->get_cache;
170   }
171
172   my $attrs = {};
173   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
174   my $our_attrs = { %{$self->{attrs}} };
175   my $having = delete $our_attrs->{having};
176   my $where = delete $our_attrs->{where};
177
178   my $new_attrs = { %{$our_attrs}, %{$attrs} };
179
180   # merge new attrs into inherited
181   foreach my $key (qw/join prefetch/) {
182     next unless exists $attrs->{$key};
183     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
184   }
185
186   my $cond = (@_
187     ? (
188         (@_ == 1 || ref $_[0] eq "HASH")
189           ? (
190               (ref $_[0] eq 'HASH')
191                 ? (
192                     (keys %{ $_[0] }  > 0)
193                       ? shift
194                       : undef
195                    )
196                 :  shift
197              )
198           : (
199               (@_ % 2)
200                 ? $self->throw_exception("Odd number of arguments to search")
201                 : {@_}
202              )
203       )
204     : undef
205   );
206
207   if (defined $where) {
208     $new_attrs->{where} = (
209       defined $new_attrs->{where}
210         ? { '-and' => [
211               map {
212                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
213               } $where, $new_attrs->{where}
214             ]
215           }
216         : $where);
217   }
218
219   if (defined $cond) {
220     $new_attrs->{where} = (
221       defined $new_attrs->{where}
222         ? { '-and' => [
223               map {
224                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
225               } $cond, $new_attrs->{where}
226             ]
227           }
228         : $cond);
229   }
230
231   if (defined $having) {
232     $new_attrs->{having} = (
233       defined $new_attrs->{having}
234         ? { '-and' => [
235               map {
236                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
237               } $having, $new_attrs->{having}
238             ]
239           }
240         : $having);
241   }
242
243   my $rs = (ref $self)->new($self->_source_handle, $new_attrs);
244   if ($rows) {
245     $rs->set_cache($rows);
246   }
247   return $rs;
248 }
249
250 =head2 search_literal
251
252 =over 4
253
254 =item Arguments: $sql_fragment, @bind_values
255
256 =item Return Value: $resultset (scalar context), @row_objs (list context)
257
258 =back
259
260   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
261   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
262
263 Pass a literal chunk of SQL to be added to the conditional part of the
264 resultset query.
265
266 =cut
267
268 sub search_literal {
269   my ($self, $cond, @vals) = @_;
270   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
271   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
272   return $self->search(\$cond, $attrs);
273 }
274
275 =head2 find
276
277 =over 4
278
279 =item Arguments: @values | \%cols, \%attrs?
280
281 =item Return Value: $row_object
282
283 =back
284
285 Finds a row based on its primary key or unique constraint. For example, to find
286 a row by its primary key:
287
288   my $cd = $schema->resultset('CD')->find(5);
289
290 You can also find a row by a specific unique constraint using the C<key>
291 attribute. For example:
292
293   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
294     key => 'cd_artist_title'
295   });
296
297 Additionally, you can specify the columns explicitly by name:
298
299   my $cd = $schema->resultset('CD')->find(
300     {
301       artist => 'Massive Attack',
302       title  => 'Mezzanine',
303     },
304     { key => 'cd_artist_title' }
305   );
306
307 If the C<key> is specified as C<primary>, it searches only on the primary key.
308
309 If no C<key> is specified, it searches on all unique constraints defined on the
310 source, including the primary key.
311
312 If your table does not have a primary key, you B<must> provide a value for the
313 C<key> attribute matching one of the unique constraints on the source.
314
315 See also L</find_or_create> and L</update_or_create>. For information on how to
316 declare unique constraints, see
317 L<DBIx::Class::ResultSource/add_unique_constraint>.
318
319 =cut
320
321 sub find {
322   my $self = shift;
323   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
324
325   # Default to the primary key, but allow a specific key
326   my @cols = exists $attrs->{key}
327     ? $self->result_source->unique_constraint_columns($attrs->{key})
328     : $self->result_source->primary_columns;
329   $self->throw_exception(
330     "Can't find unless a primary key is defined or unique constraint is specified"
331   ) unless @cols;
332
333   # Parse out a hashref from input
334   my $input_query;
335   if (ref $_[0] eq 'HASH') {
336     $input_query = { %{$_[0]} };
337   }
338   elsif (@_ == @cols) {
339     $input_query = {};
340     @{$input_query}{@cols} = @_;
341   }
342   else {
343     # Compatibility: Allow e.g. find(id => $value)
344     carp "Find by key => value deprecated; please use a hashref instead";
345     $input_query = {@_};
346   }
347
348   my (%related, $info);
349
350   foreach my $key (keys %$input_query) {
351     if (ref($input_query->{$key})
352         && ($info = $self->result_source->relationship_info($key))) {
353       my $rel_q = $self->result_source->resolve_condition(
354                     $info->{cond}, delete $input_query->{$key}, $key
355                   );
356       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
357       @related{keys %$rel_q} = values %$rel_q;
358     }
359   }
360   if (my @keys = keys %related) {
361     @{$input_query}{@keys} = values %related;
362   }
363
364   my @unique_queries = $self->_unique_queries($input_query, $attrs);
365
366   # Build the final query: Default to the disjunction of the unique queries,
367   # but allow the input query in case the ResultSet defines the query or the
368   # user is abusing find
369   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
370   my $query = @unique_queries
371     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
372     : $self->_add_alias($input_query, $alias);
373
374   # Run the query
375   if (keys %$attrs) {
376     my $rs = $self->search($query, $attrs);
377     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
378   }
379   else {
380     return keys %{$self->_resolved_attrs->{collapse}}
381       ? $self->search($query)->next
382       : $self->single($query);
383   }
384 }
385
386 # _add_alias
387 #
388 # Add the specified alias to the specified query hash. A copy is made so the
389 # original query is not modified.
390
391 sub _add_alias {
392   my ($self, $query, $alias) = @_;
393
394   my %aliased = %$query;
395   foreach my $col (grep { ! m/\./ } keys %aliased) {
396     $aliased{"$alias.$col"} = delete $aliased{$col};
397   }
398
399   return \%aliased;
400 }
401
402 # _unique_queries
403 #
404 # Build a list of queries which satisfy unique constraints.
405
406 sub _unique_queries {
407   my ($self, $query, $attrs) = @_;
408
409   my @constraint_names = exists $attrs->{key}
410     ? ($attrs->{key})
411     : $self->result_source->unique_constraint_names;
412
413   my @unique_queries;
414   foreach my $name (@constraint_names) {
415     my @unique_cols = $self->result_source->unique_constraint_columns($name);
416     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
417
418     my $num_query = scalar keys %$unique_query;
419     next unless $num_query;
420
421     # XXX: Assuming quite a bit about $self->{attrs}{where}
422     my $num_cols = scalar @unique_cols;
423     my $num_where = exists $self->{attrs}{where}
424       ? scalar keys %{ $self->{attrs}{where} }
425       : 0;
426     push @unique_queries, $unique_query
427       if $num_query + $num_where == $num_cols;
428   }
429
430   return @unique_queries;
431 }
432
433 # _build_unique_query
434 #
435 # Constrain the specified query hash based on the specified column names.
436
437 sub _build_unique_query {
438   my ($self, $query, $unique_cols) = @_;
439
440   return {
441     map  { $_ => $query->{$_} }
442     grep { exists $query->{$_} }
443       @$unique_cols
444   };
445 }
446
447 =head2 search_related
448
449 =over 4
450
451 =item Arguments: $rel, $cond, \%attrs?
452
453 =item Return Value: $new_resultset
454
455 =back
456
457   $new_rs = $cd_rs->search_related('artist', {
458     name => 'Emo-R-Us',
459   });
460
461 Searches the specified relationship, optionally specifying a condition and
462 attributes for matching records. See L</ATTRIBUTES> for more information.
463
464 =cut
465
466 sub search_related {
467   return shift->related_resultset(shift)->search(@_);
468 }
469
470 =head2 cursor
471
472 =over 4
473
474 =item Arguments: none
475
476 =item Return Value: $cursor
477
478 =back
479
480 Returns a storage-driven cursor to the given resultset. See
481 L<DBIx::Class::Cursor> for more information.
482
483 =cut
484
485 sub cursor {
486   my ($self) = @_;
487
488   my $attrs = { %{$self->_resolved_attrs} };
489   return $self->{cursor}
490     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
491           $attrs->{where},$attrs);
492 }
493
494 =head2 single
495
496 =over 4
497
498 =item Arguments: $cond?
499
500 =item Return Value: $row_object?
501
502 =back
503
504   my $cd = $schema->resultset('CD')->single({ year => 2001 });
505
506 Inflates the first result without creating a cursor if the resultset has
507 any records in it; if not returns nothing. Used by L</find> as an optimisation.
508
509 Can optionally take an additional condition *only* - this is a fast-code-path
510 method; if you need to add extra joins or similar call ->search and then
511 ->single without a condition on the $rs returned from that.
512
513 =cut
514
515 sub single {
516   my ($self, $where) = @_;
517   my $attrs = { %{$self->_resolved_attrs} };
518   if ($where) {
519     if (defined $attrs->{where}) {
520       $attrs->{where} = {
521         '-and' =>
522             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
523                $where, delete $attrs->{where} ]
524       };
525     } else {
526       $attrs->{where} = $where;
527     }
528   }
529
530 #  XXX: Disabled since it doesn't infer uniqueness in all cases
531 #  unless ($self->_is_unique_query($attrs->{where})) {
532 #    carp "Query not guaranteed to return a single row"
533 #      . "; please declare your unique constraints or use search instead";
534 #  }
535
536   my @data = $self->result_source->storage->select_single(
537     $attrs->{from}, $attrs->{select},
538     $attrs->{where}, $attrs
539   );
540
541   return (@data ? ($self->_construct_object(@data))[0] : ());
542 }
543
544 # _is_unique_query
545 #
546 # Try to determine if the specified query is guaranteed to be unique, based on
547 # the declared unique constraints.
548
549 sub _is_unique_query {
550   my ($self, $query) = @_;
551
552   my $collapsed = $self->_collapse_query($query);
553   my $alias = $self->{attrs}{alias};
554
555   foreach my $name ($self->result_source->unique_constraint_names) {
556     my @unique_cols = map {
557       "$alias.$_"
558     } $self->result_source->unique_constraint_columns($name);
559
560     # Count the values for each unique column
561     my %seen = map { $_ => 0 } @unique_cols;
562
563     foreach my $key (keys %$collapsed) {
564       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
565       next unless exists $seen{$aliased};  # Additional constraints are okay
566       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
567     }
568
569     # If we get 0 or more than 1 value for a column, it's not necessarily unique
570     return 1 unless grep { $_ != 1 } values %seen;
571   }
572
573   return 0;
574 }
575
576 # _collapse_query
577 #
578 # Recursively collapse the query, accumulating values for each column.
579
580 sub _collapse_query {
581   my ($self, $query, $collapsed) = @_;
582
583   $collapsed ||= {};
584
585   if (ref $query eq 'ARRAY') {
586     foreach my $subquery (@$query) {
587       next unless ref $subquery;  # -or
588 #      warn "ARRAY: " . Dumper $subquery;
589       $collapsed = $self->_collapse_query($subquery, $collapsed);
590     }
591   }
592   elsif (ref $query eq 'HASH') {
593     if (keys %$query and (keys %$query)[0] eq '-and') {
594       foreach my $subquery (@{$query->{-and}}) {
595 #        warn "HASH: " . Dumper $subquery;
596         $collapsed = $self->_collapse_query($subquery, $collapsed);
597       }
598     }
599     else {
600 #      warn "LEAF: " . Dumper $query;
601       foreach my $col (keys %$query) {
602         my $value = $query->{$col};
603         $collapsed->{$col}{$value}++;
604       }
605     }
606   }
607
608   return $collapsed;
609 }
610
611 =head2 get_column
612
613 =over 4
614
615 =item Arguments: $cond?
616
617 =item Return Value: $resultsetcolumn
618
619 =back
620
621   my $max_length = $rs->get_column('length')->max;
622
623 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
624
625 =cut
626
627 sub get_column {
628   my ($self, $column) = @_;
629   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
630   return $new;
631 }
632
633 =head2 search_like
634
635 =over 4
636
637 =item Arguments: $cond, \%attrs?
638
639 =item Return Value: $resultset (scalar context), @row_objs (list context)
640
641 =back
642
643   # WHERE title LIKE '%blue%'
644   $cd_rs = $rs->search_like({ title => '%blue%'});
645
646 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
647 that this is simply a convenience method. You most likely want to use
648 L</search> with specific operators.
649
650 For more information, see L<DBIx::Class::Manual::Cookbook>.
651
652 =cut
653
654 sub search_like {
655   my $class = shift;
656   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
657   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
658   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
659   return $class->search($query, { %$attrs });
660 }
661
662 =head2 slice
663
664 =over 4
665
666 =item Arguments: $first, $last
667
668 =item Return Value: $resultset (scalar context), @row_objs (list context)
669
670 =back
671
672 Returns a resultset or object list representing a subset of elements from the
673 resultset slice is called on. Indexes are from 0, i.e., to get the first
674 three records, call:
675
676   my ($one, $two, $three) = $rs->slice(0, 2);
677
678 =cut
679
680 sub slice {
681   my ($self, $min, $max) = @_;
682   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
683   $attrs->{offset} = $self->{attrs}{offset} || 0;
684   $attrs->{offset} += $min;
685   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
686   return $self->search(undef(), $attrs);
687   #my $slice = (ref $self)->new($self->result_source, $attrs);
688   #return (wantarray ? $slice->all : $slice);
689 }
690
691 =head2 next
692
693 =over 4
694
695 =item Arguments: none
696
697 =item Return Value: $result?
698
699 =back
700
701 Returns the next element in the resultset (C<undef> is there is none).
702
703 Can be used to efficiently iterate over records in the resultset:
704
705   my $rs = $schema->resultset('CD')->search;
706   while (my $cd = $rs->next) {
707     print $cd->title;
708   }
709
710 Note that you need to store the resultset object, and call C<next> on it.
711 Calling C<< resultset('Table')->next >> repeatedly will always return the
712 first record from the resultset.
713
714 =cut
715
716 sub next {
717   my ($self) = @_;
718   if (my $cache = $self->get_cache) {
719     $self->{all_cache_position} ||= 0;
720     return $cache->[$self->{all_cache_position}++];
721   }
722   if ($self->{attrs}{cache}) {
723     $self->{all_cache_position} = 1;
724     return ($self->all)[0];
725   }
726   if ($self->{stashed_objects}) {
727     my $obj = shift(@{$self->{stashed_objects}});
728     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
729     return $obj;
730   }
731   my @row = (
732     exists $self->{stashed_row}
733       ? @{delete $self->{stashed_row}}
734       : $self->cursor->next
735   );
736   return unless (@row);
737   my ($row, @more) = $self->_construct_object(@row);
738   $self->{stashed_objects} = \@more if @more;
739   return $row;
740 }
741
742 sub _construct_object {
743   my ($self, @row) = @_;
744   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
745   my @new = $self->result_class->inflate_result($self->_source_handle, @$info);
746   @new = $self->{_attrs}{record_filter}->(@new)
747     if exists $self->{_attrs}{record_filter};
748   return @new;
749 }
750
751 sub _collapse_result {
752   my ($self, $as, $row, $prefix) = @_;
753
754   my %const;
755   my @copy = @$row;
756   
757   foreach my $this_as (@$as) {
758     my $val = shift @copy;
759     if (defined $prefix) {
760       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
761         my $remain = $1;
762         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
763         $const{$1||''}{$2} = $val;
764       }
765     } else {
766       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
767       $const{$1||''}{$2} = $val;
768     }
769   }
770
771   my $alias = $self->{attrs}{alias};
772   my $info = [ {}, {} ];
773   foreach my $key (keys %const) {
774     if (length $key && $key ne $alias) {
775       my $target = $info;
776       my @parts = split(/\./, $key);
777       foreach my $p (@parts) {
778         $target = $target->[1]->{$p} ||= [];
779       }
780       $target->[0] = $const{$key};
781     } else {
782       $info->[0] = $const{$key};
783     }
784   }
785   
786   my @collapse;
787   if (defined $prefix) {
788     @collapse = map {
789         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
790     } keys %{$self->{_attrs}{collapse}}
791   } else {
792     @collapse = keys %{$self->{_attrs}{collapse}};
793   };
794
795   if (@collapse) {
796     my ($c) = sort { length $a <=> length $b } @collapse;
797     my $target = $info;
798     foreach my $p (split(/\./, $c)) {
799       $target = $target->[1]->{$p} ||= [];
800     }
801     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
802     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
803     my $tree = $self->_collapse_result($as, $row, $c_prefix);
804     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
805     my (@final, @raw);
806
807     while (
808       !(
809         grep {
810           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
811         } @co_key
812         )
813     ) {
814       push(@final, $tree);
815       last unless (@raw = $self->cursor->next);
816       $row = $self->{stashed_row} = \@raw;
817       $tree = $self->_collapse_result($as, $row, $c_prefix);
818     }
819     @$target = (@final ? @final : [ {}, {} ]);
820       # single empty result to indicate an empty prefetched has_many
821   }
822
823   #print "final info: " . Dumper($info);
824   return $info;
825 }
826
827 =head2 result_source
828
829 =over 4
830
831 =item Arguments: $result_source?
832
833 =item Return Value: $result_source
834
835 =back
836
837 An accessor for the primary ResultSource object from which this ResultSet
838 is derived.
839
840 =head2 result_class
841
842 =over 4
843
844 =item Arguments: $result_class?
845
846 =item Return Value: $result_class
847
848 =back
849
850 An accessor for the class to use when creating row objects. Defaults to 
851 C<< result_source->result_class >> - which in most cases is the name of the 
852 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
853
854 =cut
855
856
857 =head2 count
858
859 =over 4
860
861 =item Arguments: $cond, \%attrs??
862
863 =item Return Value: $count
864
865 =back
866
867 Performs an SQL C<COUNT> with the same query as the resultset was built
868 with to find the number of elements. If passed arguments, does a search
869 on the resultset and counts the results of that.
870
871 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
872 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
873 not support C<DISTINCT> with multiple columns. If you are using such a
874 database, you should only use columns from the main table in your C<group_by>
875 clause.
876
877 =cut
878
879 sub count {
880   my $self = shift;
881   return $self->search(@_)->count if @_ and defined $_[0];
882   return scalar @{ $self->get_cache } if $self->get_cache;
883   my $count = $self->_count;
884   return 0 unless $count;
885
886   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
887   $count = $self->{attrs}{rows} if
888     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
889   return $count;
890 }
891
892 sub _count { # Separated out so pager can get the full count
893   my $self = shift;
894   my $select = { count => '*' };
895
896   my $attrs = { %{$self->_resolved_attrs} };
897   if (my $group_by = delete $attrs->{group_by}) {
898     delete $attrs->{having};
899     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
900     # todo: try CONCAT for multi-column pk
901     my @pk = $self->result_source->primary_columns;
902     if (@pk == 1) {
903       my $alias = $attrs->{alias};
904       foreach my $column (@distinct) {
905         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
906           @distinct = ($column);
907           last;
908         }
909       }
910     }
911
912     $select = { count => { distinct => \@distinct } };
913   }
914
915   $attrs->{select} = $select;
916   $attrs->{as} = [qw/count/];
917
918   # offset, order by and page are not needed to count. record_filter is cdbi
919   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
920
921   my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
922   my ($count) = $tmp_rs->cursor->next;
923   return $count;
924 }
925
926 =head2 count_literal
927
928 =over 4
929
930 =item Arguments: $sql_fragment, @bind_values
931
932 =item Return Value: $count
933
934 =back
935
936 Counts the results in a literal query. Equivalent to calling L</search_literal>
937 with the passed arguments, then L</count>.
938
939 =cut
940
941 sub count_literal { shift->search_literal(@_)->count; }
942
943 =head2 all
944
945 =over 4
946
947 =item Arguments: none
948
949 =item Return Value: @objects
950
951 =back
952
953 Returns all elements in the resultset. Called implicitly if the resultset
954 is returned in list context.
955
956 =cut
957
958 sub all {
959   my ($self) = @_;
960   return @{ $self->get_cache } if $self->get_cache;
961
962   my @obj;
963
964   # TODO: don't call resolve here
965   if (keys %{$self->_resolved_attrs->{collapse}}) {
966 #  if ($self->{attrs}{prefetch}) {
967       # Using $self->cursor->all is really just an optimisation.
968       # If we're collapsing has_many prefetches it probably makes
969       # very little difference, and this is cleaner than hacking
970       # _construct_object to survive the approach
971     my @row = $self->cursor->next;
972     while (@row) {
973       push(@obj, $self->_construct_object(@row));
974       @row = (exists $self->{stashed_row}
975                ? @{delete $self->{stashed_row}}
976                : $self->cursor->next);
977     }
978   } else {
979     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
980   }
981
982   $self->set_cache(\@obj) if $self->{attrs}{cache};
983   return @obj;
984 }
985
986 =head2 reset
987
988 =over 4
989
990 =item Arguments: none
991
992 =item Return Value: $self
993
994 =back
995
996 Resets the resultset's cursor, so you can iterate through the elements again.
997
998 =cut
999
1000 sub reset {
1001   my ($self) = @_;
1002   delete $self->{_attrs} if exists $self->{_attrs};
1003   $self->{all_cache_position} = 0;
1004   $self->cursor->reset;
1005   return $self;
1006 }
1007
1008 =head2 first
1009
1010 =over 4
1011
1012 =item Arguments: none
1013
1014 =item Return Value: $object?
1015
1016 =back
1017
1018 Resets the resultset and returns an object for the first result (if the
1019 resultset returns anything).
1020
1021 =cut
1022
1023 sub first {
1024   return $_[0]->reset->next;
1025 }
1026
1027 # _cond_for_update_delete
1028 #
1029 # update/delete require the condition to be modified to handle
1030 # the differing SQL syntax available.  This transforms the $self->{cond}
1031 # appropriately, returning the new condition.
1032
1033 sub _cond_for_update_delete {
1034   my ($self, $full_cond) = @_;
1035   my $cond = {};
1036
1037   $full_cond ||= $self->{cond};
1038   # No-op. No condition, we're updating/deleting everything
1039   return $cond unless ref $full_cond;
1040
1041   if (ref $full_cond eq 'ARRAY') {
1042     $cond = [
1043       map {
1044         my %hash;
1045         foreach my $key (keys %{$_}) {
1046           $key =~ /([^.]+)$/;
1047           $hash{$1} = $_->{$key};
1048         }
1049         \%hash;
1050       } @{$full_cond}
1051     ];
1052   }
1053   elsif (ref $full_cond eq 'HASH') {
1054     if ((keys %{$full_cond})[0] eq '-and') {
1055       $cond->{-and} = [];
1056
1057       my @cond = @{$full_cond->{-and}};
1058       for (my $i = 0; $i < @cond; $i++) {
1059         my $entry = $cond[$i];
1060
1061         my $hash;
1062         if (ref $entry eq 'HASH') {
1063           $hash = $self->_cond_for_update_delete($entry);
1064         }
1065         else {
1066           $entry =~ /([^.]+)$/;
1067           $hash->{$1} = $cond[++$i];
1068         }
1069
1070         push @{$cond->{-and}}, $hash;
1071       }
1072     }
1073     else {
1074       foreach my $key (keys %{$full_cond}) {
1075         $key =~ /([^.]+)$/;
1076         $cond->{$1} = $full_cond->{$key};
1077       }
1078     }
1079   }
1080   else {
1081     $self->throw_exception(
1082       "Can't update/delete on resultset with condition unless hash or array"
1083     );
1084   }
1085
1086   return $cond;
1087 }
1088
1089
1090 =head2 update
1091
1092 =over 4
1093
1094 =item Arguments: \%values
1095
1096 =item Return Value: $storage_rv
1097
1098 =back
1099
1100 Sets the specified columns in the resultset to the supplied values in a
1101 single query. Return value will be true if the update succeeded or false
1102 if no records were updated; exact type of success value is storage-dependent.
1103
1104 =cut
1105
1106 sub update {
1107   my ($self, $values) = @_;
1108   $self->throw_exception("Values for update must be a hash")
1109     unless ref $values eq 'HASH';
1110
1111   my $cond = $self->_cond_for_update_delete;
1112
1113   return $self->result_source->storage->update(
1114     $self->result_source->from, $values, $cond
1115   );
1116 }
1117
1118 =head2 update_all
1119
1120 =over 4
1121
1122 =item Arguments: \%values
1123
1124 =item Return Value: 1
1125
1126 =back
1127
1128 Fetches all objects and updates them one at a time. Note that C<update_all>
1129 will run DBIC cascade triggers, while L</update> will not.
1130
1131 =cut
1132
1133 sub update_all {
1134   my ($self, $values) = @_;
1135   $self->throw_exception("Values for update must be a hash")
1136     unless ref $values eq 'HASH';
1137   foreach my $obj ($self->all) {
1138     $obj->set_columns($values)->update;
1139   }
1140   return 1;
1141 }
1142
1143 =head2 delete
1144
1145 =over 4
1146
1147 =item Arguments: none
1148
1149 =item Return Value: 1
1150
1151 =back
1152
1153 Deletes the contents of the resultset from its result source. Note that this
1154 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1155 to run. See also L<DBIx::Class::Row/delete>.
1156
1157 =cut
1158
1159 sub delete {
1160   my ($self) = @_;
1161
1162   my $cond = $self->_cond_for_update_delete;
1163
1164   $self->result_source->storage->delete($self->result_source->from, $cond);
1165   return 1;
1166 }
1167
1168 =head2 delete_all
1169
1170 =over 4
1171
1172 =item Arguments: none
1173
1174 =item Return Value: 1
1175
1176 =back
1177
1178 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1179 will run DBIC cascade triggers, while L</delete> will not.
1180
1181 =cut
1182
1183 sub delete_all {
1184   my ($self) = @_;
1185   $_->delete for $self->all;
1186   return 1;
1187 }
1188
1189 =head2 pager
1190
1191 =over 4
1192
1193 =item Arguments: none
1194
1195 =item Return Value: $pager
1196
1197 =back
1198
1199 Return Value a L<Data::Page> object for the current resultset. Only makes
1200 sense for queries with a C<page> attribute.
1201
1202 =cut
1203
1204 sub pager {
1205   my ($self) = @_;
1206   my $attrs = $self->{attrs};
1207   $self->throw_exception("Can't create pager for non-paged rs")
1208     unless $self->{attrs}{page};
1209   $attrs->{rows} ||= 10;
1210   return $self->{pager} ||= Data::Page->new(
1211     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1212 }
1213
1214 =head2 page
1215
1216 =over 4
1217
1218 =item Arguments: $page_number
1219
1220 =item Return Value: $rs
1221
1222 =back
1223
1224 Returns a resultset for the $page_number page of the resultset on which page
1225 is called, where each page contains a number of rows equal to the 'rows'
1226 attribute set on the resultset (10 by default).
1227
1228 =cut
1229
1230 sub page {
1231   my ($self, $page) = @_;
1232   return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
1233 }
1234
1235 =head2 new_result
1236
1237 =over 4
1238
1239 =item Arguments: \%vals
1240
1241 =item Return Value: $object
1242
1243 =back
1244
1245 Creates an object in the resultset's result class and returns it.
1246
1247 =cut
1248
1249 sub new_result {
1250   my ($self, $values) = @_;
1251   $self->throw_exception( "new_result needs a hash" )
1252     unless (ref $values eq 'HASH');
1253   $self->throw_exception(
1254     "Can't abstract implicit construct, condition not a hash"
1255   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1256
1257   my $alias = $self->{attrs}{alias};
1258   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1259   my %new = (
1260     %{ $self->_remove_alias($values, $alias) },
1261     %{ $self->_remove_alias($collapsed_cond, $alias) },
1262   );
1263
1264   return $self->result_class->new(\%new,$self->_source_handle);
1265 }
1266
1267 # _collapse_cond
1268 #
1269 # Recursively collapse the condition.
1270
1271 sub _collapse_cond {
1272   my ($self, $cond, $collapsed) = @_;
1273
1274   $collapsed ||= {};
1275
1276   if (ref $cond eq 'ARRAY') {
1277     foreach my $subcond (@$cond) {
1278       next unless ref $subcond;  # -or
1279 #      warn "ARRAY: " . Dumper $subcond;
1280       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1281     }
1282   }
1283   elsif (ref $cond eq 'HASH') {
1284     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1285       foreach my $subcond (@{$cond->{-and}}) {
1286 #        warn "HASH: " . Dumper $subcond;
1287         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1288       }
1289     }
1290     else {
1291 #      warn "LEAF: " . Dumper $cond;
1292       foreach my $col (keys %$cond) {
1293         my $value = $cond->{$col};
1294         $collapsed->{$col} = $value;
1295       }
1296     }
1297   }
1298
1299   return $collapsed;
1300 }
1301
1302 # _remove_alias
1303 #
1304 # Remove the specified alias from the specified query hash. A copy is made so
1305 # the original query is not modified.
1306
1307 sub _remove_alias {
1308   my ($self, $query, $alias) = @_;
1309
1310   my %orig = %{ $query || {} };
1311   my %unaliased;
1312
1313   foreach my $key (keys %orig) {
1314     if ($key !~ /\./) {
1315       $unaliased{$key} = $orig{$key};
1316       next;
1317     }
1318     $unaliased{$1} = $orig{$key}
1319       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1320   }
1321
1322   return \%unaliased;
1323 }
1324
1325 =head2 find_or_new
1326
1327 =over 4
1328
1329 =item Arguments: \%vals, \%attrs?
1330
1331 =item Return Value: $object
1332
1333 =back
1334
1335 Find an existing record from this resultset. If none exists, instantiate a new
1336 result object and return it. The object will not be saved into your storage
1337 until you call L<DBIx::Class::Row/insert> on it.
1338
1339 If you want objects to be saved immediately, use L</find_or_create> instead.
1340
1341 =cut
1342
1343 sub find_or_new {
1344   my $self     = shift;
1345   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1346   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1347   my $exists   = $self->find($hash, $attrs);
1348   return defined $exists ? $exists : $self->new_result($hash);
1349 }
1350
1351 =head2 create
1352
1353 =over 4
1354
1355 =item Arguments: \%vals
1356
1357 =item Return Value: $object
1358
1359 =back
1360
1361 Inserts a record into the resultset and returns the object representing it.
1362
1363 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1364
1365 =cut
1366
1367 sub create {
1368   my ($self, $attrs) = @_;
1369   $self->throw_exception( "create needs a hashref" )
1370     unless ref $attrs eq 'HASH';
1371   return $self->new_result($attrs)->insert;
1372 }
1373
1374 =head2 find_or_create
1375
1376 =over 4
1377
1378 =item Arguments: \%vals, \%attrs?
1379
1380 =item Return Value: $object
1381
1382 =back
1383
1384   $class->find_or_create({ key => $val, ... });
1385
1386 Tries to find a record based on its primary key or unique constraint; if none
1387 is found, creates one and returns that instead.
1388
1389   my $cd = $schema->resultset('CD')->find_or_create({
1390     cdid   => 5,
1391     artist => 'Massive Attack',
1392     title  => 'Mezzanine',
1393     year   => 2005,
1394   });
1395
1396 Also takes an optional C<key> attribute, to search by a specific key or unique
1397 constraint. For example:
1398
1399   my $cd = $schema->resultset('CD')->find_or_create(
1400     {
1401       artist => 'Massive Attack',
1402       title  => 'Mezzanine',
1403     },
1404     { key => 'cd_artist_title' }
1405   );
1406
1407 See also L</find> and L</update_or_create>. For information on how to declare
1408 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1409
1410 =cut
1411
1412 sub find_or_create {
1413   my $self     = shift;
1414   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1415   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1416   my $exists   = $self->find($hash, $attrs);
1417   return defined $exists ? $exists : $self->create($hash);
1418 }
1419
1420 =head2 update_or_create
1421
1422 =over 4
1423
1424 =item Arguments: \%col_values, { key => $unique_constraint }?
1425
1426 =item Return Value: $object
1427
1428 =back
1429
1430   $class->update_or_create({ col => $val, ... });
1431
1432 First, searches for an existing row matching one of the unique constraints
1433 (including the primary key) on the source of this resultset. If a row is
1434 found, updates it with the other given column values. Otherwise, creates a new
1435 row.
1436
1437 Takes an optional C<key> attribute to search on a specific unique constraint.
1438 For example:
1439
1440   # In your application
1441   my $cd = $schema->resultset('CD')->update_or_create(
1442     {
1443       artist => 'Massive Attack',
1444       title  => 'Mezzanine',
1445       year   => 1998,
1446     },
1447     { key => 'cd_artist_title' }
1448   );
1449
1450 If no C<key> is specified, it searches on all unique constraints defined on the
1451 source, including the primary key.
1452
1453 If the C<key> is specified as C<primary>, it searches only on the primary key.
1454
1455 See also L</find> and L</find_or_create>. For information on how to declare
1456 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1457
1458 =cut
1459
1460 sub update_or_create {
1461   my $self = shift;
1462   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1463   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1464
1465   my $row = $self->find($cond, $attrs);
1466   if (defined $row) {
1467     $row->update($cond);
1468     return $row;
1469   }
1470
1471   return $self->create($cond);
1472 }
1473
1474 =head2 get_cache
1475
1476 =over 4
1477
1478 =item Arguments: none
1479
1480 =item Return Value: \@cache_objects?
1481
1482 =back
1483
1484 Gets the contents of the cache for the resultset, if the cache is set.
1485
1486 =cut
1487
1488 sub get_cache {
1489   shift->{all_cache};
1490 }
1491
1492 =head2 set_cache
1493
1494 =over 4
1495
1496 =item Arguments: \@cache_objects
1497
1498 =item Return Value: \@cache_objects
1499
1500 =back
1501
1502 Sets the contents of the cache for the resultset. Expects an arrayref
1503 of objects of the same class as those produced by the resultset. Note that
1504 if the cache is set the resultset will return the cached objects rather
1505 than re-querying the database even if the cache attr is not set.
1506
1507 =cut
1508
1509 sub set_cache {
1510   my ( $self, $data ) = @_;
1511   $self->throw_exception("set_cache requires an arrayref")
1512       if defined($data) && (ref $data ne 'ARRAY');
1513   $self->{all_cache} = $data;
1514 }
1515
1516 =head2 clear_cache
1517
1518 =over 4
1519
1520 =item Arguments: none
1521
1522 =item Return Value: []
1523
1524 =back
1525
1526 Clears the cache for the resultset.
1527
1528 =cut
1529
1530 sub clear_cache {
1531   shift->set_cache(undef);
1532 }
1533
1534 =head2 related_resultset
1535
1536 =over 4
1537
1538 =item Arguments: $relationship_name
1539
1540 =item Return Value: $resultset
1541
1542 =back
1543
1544 Returns a related resultset for the supplied relationship name.
1545
1546   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1547
1548 =cut
1549
1550 sub related_resultset {
1551   my ($self, $rel) = @_;
1552
1553   $self->{related_resultsets} ||= {};
1554   return $self->{related_resultsets}{$rel} ||= do {
1555     my $rel_obj = $self->result_source->relationship_info($rel);
1556
1557     $self->throw_exception(
1558       "search_related: result source '" . $self->_source_handle->source_moniker .
1559         "' has no such relationship $rel")
1560       unless $rel_obj;
1561     
1562     my ($from,$seen) = $self->_resolve_from($rel);
1563
1564     my $join_count = $seen->{$rel};
1565     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1566
1567     $self->_source_handle->schema->resultset($rel_obj->{class})->search_rs(
1568       undef, {
1569         %{$self->{attrs}||{}},
1570         join => undef,
1571         prefetch => undef,
1572         select => undef,
1573         as => undef,
1574         alias => $alias,
1575         where => $self->{cond},
1576         seen_join => $seen,
1577         from => $from,
1578     });
1579   };
1580 }
1581
1582 sub _resolve_from {
1583   my ($self, $extra_join) = @_;
1584   my $source = $self->result_source;
1585   my $attrs = $self->{attrs};
1586   
1587   my $from = $attrs->{from}
1588     || [ { $attrs->{alias} => $source->from } ];
1589     
1590   my $seen = { %{$attrs->{seen_join}||{}} };
1591
1592   my $join = ($attrs->{join}
1593                ? [ $attrs->{join}, $extra_join ]
1594                : $extra_join);
1595   $from = [
1596     @$from,
1597     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1598   ];
1599
1600   return ($from,$seen);
1601 }
1602
1603 sub _resolved_attrs {
1604   my $self = shift;
1605   return $self->{_attrs} if $self->{_attrs};
1606
1607   my $attrs = { %{$self->{attrs}||{}} };
1608   my $source = $self->result_source;
1609   my $alias = $attrs->{alias};
1610
1611   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1612   if ($attrs->{columns}) {
1613     delete $attrs->{as};
1614   } elsif (!$attrs->{select}) {
1615     $attrs->{columns} = [ $source->columns ];
1616   }
1617  
1618   $attrs->{select} = 
1619     ($attrs->{select}
1620       ? (ref $attrs->{select} eq 'ARRAY'
1621           ? [ @{$attrs->{select}} ]
1622           : [ $attrs->{select} ])
1623       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1624     );
1625   $attrs->{as} =
1626     ($attrs->{as}
1627       ? (ref $attrs->{as} eq 'ARRAY'
1628           ? [ @{$attrs->{as}} ]
1629           : [ $attrs->{as} ])
1630       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1631     );
1632   
1633   my $adds;
1634   if ($adds = delete $attrs->{include_columns}) {
1635     $adds = [$adds] unless ref $adds eq 'ARRAY';
1636     push(@{$attrs->{select}}, @$adds);
1637     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1638   }
1639   if ($adds = delete $attrs->{'+select'}) {
1640     $adds = [$adds] unless ref $adds eq 'ARRAY';
1641     push(@{$attrs->{select}},
1642            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1643   }
1644   if (my $adds = delete $attrs->{'+as'}) {
1645     $adds = [$adds] unless ref $adds eq 'ARRAY';
1646     push(@{$attrs->{as}}, @$adds);
1647   }
1648
1649   $attrs->{from} ||= [ { 'me' => $source->from } ];
1650
1651   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1652     my $join = delete $attrs->{join} || {};
1653
1654     if (defined $attrs->{prefetch}) {
1655       $join = $self->_merge_attr(
1656         $join, $attrs->{prefetch}
1657       );
1658     }
1659
1660     $attrs->{from} =   # have to copy here to avoid corrupting the original
1661       [
1662         @{$attrs->{from}}, 
1663         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1664       ];
1665   }
1666
1667   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1668   if ($attrs->{order_by}) {
1669     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1670                            ? [ @{$attrs->{order_by}} ]
1671                            : [ $attrs->{order_by} ]);
1672   } else {
1673     $attrs->{order_by} = [];    
1674   }
1675
1676   my $collapse = $attrs->{collapse} || {};
1677   if (my $prefetch = delete $attrs->{prefetch}) {
1678     $prefetch = $self->_merge_attr({}, $prefetch);
1679     my @pre_order;
1680     my $seen = $attrs->{seen_join} || {};
1681     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1682       # bring joins back to level of current class
1683       my @prefetch = $source->resolve_prefetch(
1684         $p, $alias, $seen, \@pre_order, $collapse
1685       );
1686       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1687       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1688     }
1689     push(@{$attrs->{order_by}}, @pre_order);
1690   }
1691   $attrs->{collapse} = $collapse;
1692
1693   return $self->{_attrs} = $attrs;
1694 }
1695
1696 sub _merge_attr {
1697   my ($self, $a, $b) = @_;
1698   return $b unless defined($a);
1699   return $a unless defined($b);
1700   
1701   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1702     foreach my $key (keys %{$b}) {
1703       if (exists $a->{$key}) {
1704         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1705       } else {
1706         $a->{$key} = $b->{$key};
1707       }
1708     }
1709     return $a;
1710   } else {
1711     $a = [$a] unless ref $a eq 'ARRAY';
1712     $b = [$b] unless ref $b eq 'ARRAY';
1713
1714     my $hash = {};
1715     my @array;
1716     foreach my $x ($a, $b) {
1717       foreach my $element (@{$x}) {
1718         if (ref $element eq 'HASH') {
1719           $hash = $self->_merge_attr($hash, $element);
1720         } elsif (ref $element eq 'ARRAY') {
1721           push(@array, @{$element});
1722         } else {
1723           push(@array, $element) unless $b == $x
1724             && grep { $_ eq $element } @array;
1725         }
1726       }
1727     }
1728     
1729     @array = grep { !exists $hash->{$_} } @array;
1730
1731     return keys %{$hash}
1732       ? ( scalar(@array)
1733             ? [$hash, @array]
1734             : $hash
1735         )
1736       : \@array;
1737   }
1738 }
1739
1740 sub result_source {
1741     my $self = shift;
1742
1743     if (@_) {
1744         $self->_source_handle($_[0]->handle);
1745     } else {
1746         $self->_source_handle->resolve;
1747     }
1748 }
1749
1750 =head2 throw_exception
1751
1752 See L<DBIx::Class::Schema/throw_exception> for details.
1753
1754 =cut
1755
1756 sub throw_exception {
1757   my $self=shift;
1758   $self->_source_handle->schema->throw_exception(@_);
1759 }
1760
1761 # XXX: FIXME: Attributes docs need clearing up
1762
1763 =head1 ATTRIBUTES
1764
1765 The resultset takes various attributes that modify its behavior. Here's an
1766 overview of them:
1767
1768 =head2 order_by
1769
1770 =over 4
1771
1772 =item Value: ($order_by | \@order_by)
1773
1774 =back
1775
1776 Which column(s) to order the results by. This is currently passed
1777 through directly to SQL, so you can give e.g. C<year DESC> for a
1778 descending order on the column `year'.
1779
1780 Please note that if you have C<quote_char> enabled (see
1781 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1782 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1783 so you will need to manually quote things as appropriate.)
1784
1785 =head2 columns
1786
1787 =over 4
1788
1789 =item Value: \@columns
1790
1791 =back
1792
1793 Shortcut to request a particular set of columns to be retrieved.  Adds
1794 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1795 from that, then auto-populates C<as> from C<select> as normal. (You may also
1796 use the C<cols> attribute, as in earlier versions of DBIC.)
1797
1798 =head2 include_columns
1799
1800 =over 4
1801
1802 =item Value: \@columns
1803
1804 =back
1805
1806 Shortcut to include additional columns in the returned results - for example
1807
1808   $schema->resultset('CD')->search(undef, {
1809     include_columns => ['artist.name'],
1810     join => ['artist']
1811   });
1812
1813 would return all CDs and include a 'name' column to the information
1814 passed to object inflation
1815
1816 =head2 select
1817
1818 =over 4
1819
1820 =item Value: \@select_columns
1821
1822 =back
1823
1824 Indicates which columns should be selected from the storage. You can use
1825 column names, or in the case of RDBMS back ends, function or stored procedure
1826 names:
1827
1828   $rs = $schema->resultset('Employee')->search(undef, {
1829     select => [
1830       'name',
1831       { count => 'employeeid' },
1832       { sum => 'salary' }
1833     ]
1834   });
1835
1836 When you use function/stored procedure names and do not supply an C<as>
1837 attribute, the column names returned are storage-dependent. E.g. MySQL would
1838 return a column named C<count(employeeid)> in the above example.
1839
1840 =head2 +select
1841
1842 =over 4
1843
1844 Indicates additional columns to be selected from storage.  Works the same as
1845 L<select> but adds columns to the selection.
1846
1847 =back
1848
1849 =head2 +as
1850
1851 =over 4
1852
1853 Indicates additional column names for those added via L<+select>.
1854
1855 =back
1856
1857 =head2 as
1858
1859 =over 4
1860
1861 =item Value: \@inflation_names
1862
1863 =back
1864
1865 Indicates column names for object inflation. This is used in conjunction with
1866 C<select>, usually when C<select> contains one or more function or stored
1867 procedure names:
1868
1869   $rs = $schema->resultset('Employee')->search(undef, {
1870     select => [
1871       'name',
1872       { count => 'employeeid' }
1873     ],
1874     as => ['name', 'employee_count'],
1875   });
1876
1877   my $employee = $rs->first(); # get the first Employee
1878
1879 If the object against which the search is performed already has an accessor
1880 matching a column name specified in C<as>, the value can be retrieved using
1881 the accessor as normal:
1882
1883   my $name = $employee->name();
1884
1885 If on the other hand an accessor does not exist in the object, you need to
1886 use C<get_column> instead:
1887
1888   my $employee_count = $employee->get_column('employee_count');
1889
1890 You can create your own accessors if required - see
1891 L<DBIx::Class::Manual::Cookbook> for details.
1892
1893 Please note: This will NOT insert an C<AS employee_count> into the SQL
1894 statement produced, it is used for internal access only. Thus
1895 attempting to use the accessor in an C<order_by> clause or similar
1896 will fail miserably.
1897
1898 To get around this limitation, you can supply literal SQL to your
1899 C<select> attibute that contains the C<AS alias> text, eg:
1900
1901   select => [\'myfield AS alias']
1902
1903 =head2 join
1904
1905 =over 4
1906
1907 =item Value: ($rel_name | \@rel_names | \%rel_names)
1908
1909 =back
1910
1911 Contains a list of relationships that should be joined for this query.  For
1912 example:
1913
1914   # Get CDs by Nine Inch Nails
1915   my $rs = $schema->resultset('CD')->search(
1916     { 'artist.name' => 'Nine Inch Nails' },
1917     { join => 'artist' }
1918   );
1919
1920 Can also contain a hash reference to refer to the other relation's relations.
1921 For example:
1922
1923   package MyApp::Schema::Track;
1924   use base qw/DBIx::Class/;
1925   __PACKAGE__->table('track');
1926   __PACKAGE__->add_columns(qw/trackid cd position title/);
1927   __PACKAGE__->set_primary_key('trackid');
1928   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1929   1;
1930
1931   # In your application
1932   my $rs = $schema->resultset('Artist')->search(
1933     { 'track.title' => 'Teardrop' },
1934     {
1935       join     => { cd => 'track' },
1936       order_by => 'artist.name',
1937     }
1938   );
1939
1940 You need to use the relationship (not the table) name in  conditions, 
1941 because they are aliased as such. The current table is aliased as "me", so 
1942 you need to use me.column_name in order to avoid ambiguity. For example:
1943
1944   # Get CDs from 1984 with a 'Foo' track 
1945   my $rs = $schema->resultset('CD')->search(
1946     { 
1947       'me.year' => 1984,
1948       'tracks.name' => 'Foo'
1949     },
1950     { join => 'tracks' }
1951   );
1952   
1953 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1954 similarly for a third time). For e.g.
1955
1956   my $rs = $schema->resultset('Artist')->search({
1957     'cds.title'   => 'Down to Earth',
1958     'cds_2.title' => 'Popular',
1959   }, {
1960     join => [ qw/cds cds/ ],
1961   });
1962
1963 will return a set of all artists that have both a cd with title 'Down
1964 to Earth' and a cd with title 'Popular'.
1965
1966 If you want to fetch related objects from other tables as well, see C<prefetch>
1967 below.
1968
1969 =head2 prefetch
1970
1971 =over 4
1972
1973 =item Value: ($rel_name | \@rel_names | \%rel_names)
1974
1975 =back
1976
1977 Contains one or more relationships that should be fetched along with the main
1978 query (when they are accessed afterwards they will have already been
1979 "prefetched").  This is useful for when you know you will need the related
1980 objects, because it saves at least one query:
1981
1982   my $rs = $schema->resultset('Tag')->search(
1983     undef,
1984     {
1985       prefetch => {
1986         cd => 'artist'
1987       }
1988     }
1989   );
1990
1991 The initial search results in SQL like the following:
1992
1993   SELECT tag.*, cd.*, artist.* FROM tag
1994   JOIN cd ON tag.cd = cd.cdid
1995   JOIN artist ON cd.artist = artist.artistid
1996
1997 L<DBIx::Class> has no need to go back to the database when we access the
1998 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1999 case.
2000
2001 Simple prefetches will be joined automatically, so there is no need
2002 for a C<join> attribute in the above search. If you're prefetching to
2003 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2004 specify the join as well.
2005
2006 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2007 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2008 with an accessor type of 'single' or 'filter').
2009
2010 =head2 page
2011
2012 =over 4
2013
2014 =item Value: $page
2015
2016 =back
2017
2018 Makes the resultset paged and specifies the page to retrieve. Effectively
2019 identical to creating a non-pages resultset and then calling ->page($page)
2020 on it.
2021
2022 If L<rows> attribute is not specified it defualts to 10 rows per page.
2023
2024 =head2 rows
2025
2026 =over 4
2027
2028 =item Value: $rows
2029
2030 =back
2031
2032 Specifes the maximum number of rows for direct retrieval or the number of
2033 rows per page if the page attribute or method is used.
2034
2035 =head2 offset
2036
2037 =over 4
2038
2039 =item Value: $offset
2040
2041 =back
2042
2043 Specifies the (zero-based) row number for the  first row to be returned, or the
2044 of the first row of the first page if paging is used.
2045
2046 =head2 group_by
2047
2048 =over 4
2049
2050 =item Value: \@columns
2051
2052 =back
2053
2054 A arrayref of columns to group by. Can include columns of joined tables.
2055
2056   group_by => [qw/ column1 column2 ... /]
2057
2058 =head2 having
2059
2060 =over 4
2061
2062 =item Value: $condition
2063
2064 =back
2065
2066 HAVING is a select statement attribute that is applied between GROUP BY and
2067 ORDER BY. It is applied to the after the grouping calculations have been
2068 done.
2069
2070   having => { 'count(employee)' => { '>=', 100 } }
2071
2072 =head2 distinct
2073
2074 =over 4
2075
2076 =item Value: (0 | 1)
2077
2078 =back
2079
2080 Set to 1 to group by all columns.
2081
2082 =head2 where
2083
2084 =over 4
2085
2086 Adds to the WHERE clause.
2087
2088   # only return rows WHERE deleted IS NULL for all searches
2089   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2090
2091 Can be overridden by passing C<{ where => undef }> as an attribute
2092 to a resulset.
2093
2094 =back
2095
2096 =head2 cache
2097
2098 Set to 1 to cache search results. This prevents extra SQL queries if you
2099 revisit rows in your ResultSet:
2100
2101   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2102
2103   while( my $artist = $resultset->next ) {
2104     ... do stuff ...
2105   }
2106
2107   $rs->first; # without cache, this would issue a query
2108
2109 By default, searches are not cached.
2110
2111 For more examples of using these attributes, see
2112 L<DBIx::Class::Manual::Cookbook>.
2113
2114 =head2 from
2115
2116 =over 4
2117
2118 =item Value: \@from_clause
2119
2120 =back
2121
2122 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2123 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2124 clauses.
2125
2126 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2127
2128 C<join> will usually do what you need and it is strongly recommended that you
2129 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2130 And we really do mean "cannot", not just tried and failed. Attempting to use
2131 this because you're having problems with C<join> is like trying to use x86
2132 ASM because you've got a syntax error in your C. Trust us on this.
2133
2134 Now, if you're still really, really sure you need to use this (and if you're
2135 not 100% sure, ask the mailing list first), here's an explanation of how this
2136 works.
2137
2138 The syntax is as follows -
2139
2140   [
2141     { <alias1> => <table1> },
2142     [
2143       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2144       [], # nested JOIN (optional)
2145       { <table1.column1> => <table2.column2>, ... (more conditions) },
2146     ],
2147     # More of the above [ ] may follow for additional joins
2148   ]
2149
2150   <table1> <alias1>
2151   JOIN
2152     <table2> <alias2>
2153     [JOIN ...]
2154   ON <table1.column1> = <table2.column2>
2155   <more joins may follow>
2156
2157 An easy way to follow the examples below is to remember the following:
2158
2159     Anything inside "[]" is a JOIN
2160     Anything inside "{}" is a condition for the enclosing JOIN
2161
2162 The following examples utilize a "person" table in a family tree application.
2163 In order to express parent->child relationships, this table is self-joined:
2164
2165     # Person->belongs_to('father' => 'Person');
2166     # Person->belongs_to('mother' => 'Person');
2167
2168 C<from> can be used to nest joins. Here we return all children with a father,
2169 then search against all mothers of those children:
2170
2171   $rs = $schema->resultset('Person')->search(
2172       undef,
2173       {
2174           alias => 'mother', # alias columns in accordance with "from"
2175           from => [
2176               { mother => 'person' },
2177               [
2178                   [
2179                       { child => 'person' },
2180                       [
2181                           { father => 'person' },
2182                           { 'father.person_id' => 'child.father_id' }
2183                       ]
2184                   ],
2185                   { 'mother.person_id' => 'child.mother_id' }
2186               ],
2187           ]
2188       },
2189   );
2190
2191   # Equivalent SQL:
2192   # SELECT mother.* FROM person mother
2193   # JOIN (
2194   #   person child
2195   #   JOIN person father
2196   #   ON ( father.person_id = child.father_id )
2197   # )
2198   # ON ( mother.person_id = child.mother_id )
2199
2200 The type of any join can be controlled manually. To search against only people
2201 with a father in the person table, we could explicitly use C<INNER JOIN>:
2202
2203     $rs = $schema->resultset('Person')->search(
2204         undef,
2205         {
2206             alias => 'child', # alias columns in accordance with "from"
2207             from => [
2208                 { child => 'person' },
2209                 [
2210                     { father => 'person', -join_type => 'inner' },
2211                     { 'father.id' => 'child.father_id' }
2212                 ],
2213             ]
2214         },
2215     );
2216
2217     # Equivalent SQL:
2218     # SELECT child.* FROM person child
2219     # INNER JOIN person father ON child.father_id = father.id
2220
2221 =cut
2222
2223 1;