Reverted accidental 'svk pull' from inside mirrored checkout.
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use base qw/DBIx::Class/;
15
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   $source = $source->handle 
89     unless $source->isa('DBIx::Class::ResultSourceHandle');
90   $attrs = { %{$attrs||{}} };
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   my $self = {
101     _source_handle => $source,
102     result_class => $attrs->{result_class} || $source->resolve->result_class,
103     cond => $attrs->{where},
104     count => undef,
105     pager => undef,
106     attrs => $attrs
107   };
108
109   bless $self, $class;
110
111   return $self;
112 }
113
114 =head2 search
115
116 =over 4
117
118 =item Arguments: $cond, \%attrs?
119
120 =item Return Value: $resultset (scalar context), @row_objs (list context)
121
122 =back
123
124   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
125   my $new_rs = $cd_rs->search({ year => 2005 });
126
127   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
128                  # year = 2005 OR year = 2004
129
130 If you need to pass in additional attributes but no additional condition,
131 call it as C<search(undef, \%attrs)>.
132
133   # "SELECT name, artistid FROM $artist_table"
134   my @all_artists = $schema->resultset('Artist')->search(undef, {
135     columns => [qw/name artistid/],
136   });
137
138 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
139
140 =cut
141
142 sub search {
143   my $self = shift;
144   my $rs = $self->search_rs( @_ );
145   return (wantarray ? $rs->all : $rs);
146 }
147
148 =head2 search_rs
149
150 =over 4
151
152 =item Arguments: $cond, \%attrs?
153
154 =item Return Value: $resultset
155
156 =back
157
158 This method does the same exact thing as search() except it will
159 always return a resultset, even in list context.
160
161 =cut
162
163 sub search_rs {
164   my $self = shift;
165
166   my $rows;
167
168   unless (@_) {                 # no search, effectively just a clone
169     $rows = $self->get_cache;
170   }
171
172   my $attrs = {};
173   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
174   my $our_attrs = { %{$self->{attrs}} };
175   my $having = delete $our_attrs->{having};
176   my $where = delete $our_attrs->{where};
177
178   my $new_attrs = { %{$our_attrs}, %{$attrs} };
179
180   # merge new attrs into inherited
181   foreach my $key (qw/join prefetch/) {
182     next unless exists $attrs->{$key};
183     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
184   }
185
186   my $cond = (@_
187     ? (
188         (@_ == 1 || ref $_[0] eq "HASH")
189           ? (
190               (ref $_[0] eq 'HASH')
191                 ? (
192                     (keys %{ $_[0] }  > 0)
193                       ? shift
194                       : undef
195                    )
196                 :  shift
197              )
198           : (
199               (@_ % 2)
200                 ? $self->throw_exception("Odd number of arguments to search")
201                 : {@_}
202              )
203       )
204     : undef
205   );
206
207   if (defined $where) {
208     $new_attrs->{where} = (
209       defined $new_attrs->{where}
210         ? { '-and' => [
211               map {
212                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
213               } $where, $new_attrs->{where}
214             ]
215           }
216         : $where);
217   }
218
219   if (defined $cond) {
220     $new_attrs->{where} = (
221       defined $new_attrs->{where}
222         ? { '-and' => [
223               map {
224                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
225               } $cond, $new_attrs->{where}
226             ]
227           }
228         : $cond);
229   }
230
231   if (defined $having) {
232     $new_attrs->{having} = (
233       defined $new_attrs->{having}
234         ? { '-and' => [
235               map {
236                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
237               } $having, $new_attrs->{having}
238             ]
239           }
240         : $having);
241   }
242
243   my $rs = (ref $self)->new($self->_source_handle, $new_attrs);
244   if ($rows) {
245     $rs->set_cache($rows);
246   }
247   return $rs;
248 }
249
250 =head2 search_literal
251
252 =over 4
253
254 =item Arguments: $sql_fragment, @bind_values
255
256 =item Return Value: $resultset (scalar context), @row_objs (list context)
257
258 =back
259
260   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
261   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
262
263 Pass a literal chunk of SQL to be added to the conditional part of the
264 resultset query.
265
266 =cut
267
268 sub search_literal {
269   my ($self, $cond, @vals) = @_;
270   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
271   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
272   return $self->search(\$cond, $attrs);
273 }
274
275 =head2 find
276
277 =over 4
278
279 =item Arguments: @values | \%cols, \%attrs?
280
281 =item Return Value: $row_object
282
283 =back
284
285 Finds a row based on its primary key or unique constraint. For example, to find
286 a row by its primary key:
287
288   my $cd = $schema->resultset('CD')->find(5);
289
290 You can also find a row by a specific unique constraint using the C<key>
291 attribute. For example:
292
293   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
294     key => 'cd_artist_title'
295   });
296
297 Additionally, you can specify the columns explicitly by name:
298
299   my $cd = $schema->resultset('CD')->find(
300     {
301       artist => 'Massive Attack',
302       title  => 'Mezzanine',
303     },
304     { key => 'cd_artist_title' }
305   );
306
307 If the C<key> is specified as C<primary>, it searches only on the primary key.
308
309 If no C<key> is specified, it searches on all unique constraints defined on the
310 source, including the primary key.
311
312 If your table does not have a primary key, you B<must> provide a value for the
313 C<key> attribute matching one of the unique constraints on the source.
314
315 See also L</find_or_create> and L</update_or_create>. For information on how to
316 declare unique constraints, see
317 L<DBIx::Class::ResultSource/add_unique_constraint>.
318
319 =cut
320
321 sub find {
322   my $self = shift;
323   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
324
325   # Default to the primary key, but allow a specific key
326   my @cols = exists $attrs->{key}
327     ? $self->result_source->unique_constraint_columns($attrs->{key})
328     : $self->result_source->primary_columns;
329   $self->throw_exception(
330     "Can't find unless a primary key is defined or unique constraint is specified"
331   ) unless @cols;
332
333   # Parse out a hashref from input
334   my $input_query;
335   if (ref $_[0] eq 'HASH') {
336     $input_query = { %{$_[0]} };
337   }
338   elsif (@_ == @cols) {
339     $input_query = {};
340     @{$input_query}{@cols} = @_;
341   }
342   else {
343     # Compatibility: Allow e.g. find(id => $value)
344     carp "Find by key => value deprecated; please use a hashref instead";
345     $input_query = {@_};
346   }
347
348   my (%related, $info);
349
350   foreach my $key (keys %$input_query) {
351     if (ref($input_query->{$key})
352         && ($info = $self->result_source->relationship_info($key))) {
353       my $rel_q = $self->result_source->resolve_condition(
354                     $info->{cond}, delete $input_query->{$key}, $key
355                   );
356       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
357       @related{keys %$rel_q} = values %$rel_q;
358     }
359   }
360   if (my @keys = keys %related) {
361     @{$input_query}{@keys} = values %related;
362   }
363
364   my @unique_queries = $self->_unique_queries($input_query, $attrs);
365
366   # Build the final query: Default to the disjunction of the unique queries,
367   # but allow the input query in case the ResultSet defines the query or the
368   # user is abusing find
369   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
370   my $query = @unique_queries
371     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
372     : $self->_add_alias($input_query, $alias);
373
374   # Run the query
375   if (keys %$attrs) {
376     my $rs = $self->search($query, $attrs);
377     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
378   }
379   else {
380     return keys %{$self->_resolved_attrs->{collapse}}
381       ? $self->search($query)->next
382       : $self->single($query);
383   }
384 }
385
386 # _add_alias
387 #
388 # Add the specified alias to the specified query hash. A copy is made so the
389 # original query is not modified.
390
391 sub _add_alias {
392   my ($self, $query, $alias) = @_;
393
394   my %aliased = %$query;
395   foreach my $col (grep { ! m/\./ } keys %aliased) {
396     $aliased{"$alias.$col"} = delete $aliased{$col};
397   }
398
399   return \%aliased;
400 }
401
402 # _unique_queries
403 #
404 # Build a list of queries which satisfy unique constraints.
405
406 sub _unique_queries {
407   my ($self, $query, $attrs) = @_;
408
409   my @constraint_names = exists $attrs->{key}
410     ? ($attrs->{key})
411     : $self->result_source->unique_constraint_names;
412
413   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
414   my $num_where = scalar keys %$where;
415
416   my @unique_queries;
417   foreach my $name (@constraint_names) {
418     my @unique_cols = $self->result_source->unique_constraint_columns($name);
419     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
420
421     my $num_cols = scalar @unique_cols;
422     my $num_query = scalar keys %$unique_query;
423
424     my $total = $num_query + $num_where;
425     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
426       # The query is either unique on its own or is unique in combination with
427       # the existing where clause
428       push @unique_queries, $unique_query;
429     }
430   }
431
432   return @unique_queries;
433 }
434
435 # _build_unique_query
436 #
437 # Constrain the specified query hash based on the specified column names.
438
439 sub _build_unique_query {
440   my ($self, $query, $unique_cols) = @_;
441
442   return {
443     map  { $_ => $query->{$_} }
444     grep { exists $query->{$_} }
445       @$unique_cols
446   };
447 }
448
449 =head2 search_related
450
451 =over 4
452
453 =item Arguments: $rel, $cond, \%attrs?
454
455 =item Return Value: $new_resultset
456
457 =back
458
459   $new_rs = $cd_rs->search_related('artist', {
460     name => 'Emo-R-Us',
461   });
462
463 Searches the specified relationship, optionally specifying a condition and
464 attributes for matching records. See L</ATTRIBUTES> for more information.
465
466 =cut
467
468 sub search_related {
469   return shift->related_resultset(shift)->search(@_);
470 }
471
472 =head2 cursor
473
474 =over 4
475
476 =item Arguments: none
477
478 =item Return Value: $cursor
479
480 =back
481
482 Returns a storage-driven cursor to the given resultset. See
483 L<DBIx::Class::Cursor> for more information.
484
485 =cut
486
487 sub cursor {
488   my ($self) = @_;
489
490   my $attrs = { %{$self->_resolved_attrs} };
491   return $self->{cursor}
492     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
493           $attrs->{where},$attrs);
494 }
495
496 =head2 single
497
498 =over 4
499
500 =item Arguments: $cond?
501
502 =item Return Value: $row_object?
503
504 =back
505
506   my $cd = $schema->resultset('CD')->single({ year => 2001 });
507
508 Inflates the first result without creating a cursor if the resultset has
509 any records in it; if not returns nothing. Used by L</find> as an optimisation.
510
511 Can optionally take an additional condition *only* - this is a fast-code-path
512 method; if you need to add extra joins or similar call ->search and then
513 ->single without a condition on the $rs returned from that.
514
515 =cut
516
517 sub single {
518   my ($self, $where) = @_;
519   my $attrs = { %{$self->_resolved_attrs} };
520   if ($where) {
521     if (defined $attrs->{where}) {
522       $attrs->{where} = {
523         '-and' =>
524             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
525                $where, delete $attrs->{where} ]
526       };
527     } else {
528       $attrs->{where} = $where;
529     }
530   }
531
532 #  XXX: Disabled since it doesn't infer uniqueness in all cases
533 #  unless ($self->_is_unique_query($attrs->{where})) {
534 #    carp "Query not guaranteed to return a single row"
535 #      . "; please declare your unique constraints or use search instead";
536 #  }
537
538   my @data = $self->result_source->storage->select_single(
539     $attrs->{from}, $attrs->{select},
540     $attrs->{where}, $attrs
541   );
542
543   return (@data ? ($self->_construct_object(@data))[0] : ());
544 }
545
546 # _is_unique_query
547 #
548 # Try to determine if the specified query is guaranteed to be unique, based on
549 # the declared unique constraints.
550
551 sub _is_unique_query {
552   my ($self, $query) = @_;
553
554   my $collapsed = $self->_collapse_query($query);
555   my $alias = $self->{attrs}{alias};
556
557   foreach my $name ($self->result_source->unique_constraint_names) {
558     my @unique_cols = map {
559       "$alias.$_"
560     } $self->result_source->unique_constraint_columns($name);
561
562     # Count the values for each unique column
563     my %seen = map { $_ => 0 } @unique_cols;
564
565     foreach my $key (keys %$collapsed) {
566       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
567       next unless exists $seen{$aliased};  # Additional constraints are okay
568       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
569     }
570
571     # If we get 0 or more than 1 value for a column, it's not necessarily unique
572     return 1 unless grep { $_ != 1 } values %seen;
573   }
574
575   return 0;
576 }
577
578 # _collapse_query
579 #
580 # Recursively collapse the query, accumulating values for each column.
581
582 sub _collapse_query {
583   my ($self, $query, $collapsed) = @_;
584
585   $collapsed ||= {};
586
587   if (ref $query eq 'ARRAY') {
588     foreach my $subquery (@$query) {
589       next unless ref $subquery;  # -or
590 #      warn "ARRAY: " . Dumper $subquery;
591       $collapsed = $self->_collapse_query($subquery, $collapsed);
592     }
593   }
594   elsif (ref $query eq 'HASH') {
595     if (keys %$query and (keys %$query)[0] eq '-and') {
596       foreach my $subquery (@{$query->{-and}}) {
597 #        warn "HASH: " . Dumper $subquery;
598         $collapsed = $self->_collapse_query($subquery, $collapsed);
599       }
600     }
601     else {
602 #      warn "LEAF: " . Dumper $query;
603       foreach my $col (keys %$query) {
604         my $value = $query->{$col};
605         $collapsed->{$col}{$value}++;
606       }
607     }
608   }
609
610   return $collapsed;
611 }
612
613 =head2 get_column
614
615 =over 4
616
617 =item Arguments: $cond?
618
619 =item Return Value: $resultsetcolumn
620
621 =back
622
623   my $max_length = $rs->get_column('length')->max;
624
625 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
626
627 =cut
628
629 sub get_column {
630   my ($self, $column) = @_;
631   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
632   return $new;
633 }
634
635 =head2 search_like
636
637 =over 4
638
639 =item Arguments: $cond, \%attrs?
640
641 =item Return Value: $resultset (scalar context), @row_objs (list context)
642
643 =back
644
645   # WHERE title LIKE '%blue%'
646   $cd_rs = $rs->search_like({ title => '%blue%'});
647
648 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
649 that this is simply a convenience method. You most likely want to use
650 L</search> with specific operators.
651
652 For more information, see L<DBIx::Class::Manual::Cookbook>.
653
654 =cut
655
656 sub search_like {
657   my $class = shift;
658   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
659   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
660   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
661   return $class->search($query, { %$attrs });
662 }
663
664 =head2 slice
665
666 =over 4
667
668 =item Arguments: $first, $last
669
670 =item Return Value: $resultset (scalar context), @row_objs (list context)
671
672 =back
673
674 Returns a resultset or object list representing a subset of elements from the
675 resultset slice is called on. Indexes are from 0, i.e., to get the first
676 three records, call:
677
678   my ($one, $two, $three) = $rs->slice(0, 2);
679
680 =cut
681
682 sub slice {
683   my ($self, $min, $max) = @_;
684   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
685   $attrs->{offset} = $self->{attrs}{offset} || 0;
686   $attrs->{offset} += $min;
687   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
688   return $self->search(undef(), $attrs);
689   #my $slice = (ref $self)->new($self->result_source, $attrs);
690   #return (wantarray ? $slice->all : $slice);
691 }
692
693 =head2 next
694
695 =over 4
696
697 =item Arguments: none
698
699 =item Return Value: $result?
700
701 =back
702
703 Returns the next element in the resultset (C<undef> is there is none).
704
705 Can be used to efficiently iterate over records in the resultset:
706
707   my $rs = $schema->resultset('CD')->search;
708   while (my $cd = $rs->next) {
709     print $cd->title;
710   }
711
712 Note that you need to store the resultset object, and call C<next> on it.
713 Calling C<< resultset('Table')->next >> repeatedly will always return the
714 first record from the resultset.
715
716 =cut
717
718 sub next {
719   my ($self) = @_;
720   if (my $cache = $self->get_cache) {
721     $self->{all_cache_position} ||= 0;
722     return $cache->[$self->{all_cache_position}++];
723   }
724   if ($self->{attrs}{cache}) {
725     $self->{all_cache_position} = 1;
726     return ($self->all)[0];
727   }
728   if ($self->{stashed_objects}) {
729     my $obj = shift(@{$self->{stashed_objects}});
730     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
731     return $obj;
732   }
733   my @row = (
734     exists $self->{stashed_row}
735       ? @{delete $self->{stashed_row}}
736       : $self->cursor->next
737   );
738   return unless (@row);
739   my ($row, @more) = $self->_construct_object(@row);
740   $self->{stashed_objects} = \@more if @more;
741   return $row;
742 }
743
744 sub _construct_object {
745   my ($self, @row) = @_;
746   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
747   my @new = $self->result_class->inflate_result($self->_source_handle, @$info);
748   @new = $self->{_attrs}{record_filter}->(@new)
749     if exists $self->{_attrs}{record_filter};
750   return @new;
751 }
752
753 sub _collapse_result {
754   my ($self, $as, $row, $prefix) = @_;
755
756   my %const;
757   my @copy = @$row;
758   
759   foreach my $this_as (@$as) {
760     my $val = shift @copy;
761     if (defined $prefix) {
762       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
763         my $remain = $1;
764         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
765         $const{$1||''}{$2} = $val;
766       }
767     } else {
768       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
769       $const{$1||''}{$2} = $val;
770     }
771   }
772
773   my $alias = $self->{attrs}{alias};
774   my $info = [ {}, {} ];
775   foreach my $key (keys %const) {
776     if (length $key && $key ne $alias) {
777       my $target = $info;
778       my @parts = split(/\./, $key);
779       foreach my $p (@parts) {
780         $target = $target->[1]->{$p} ||= [];
781       }
782       $target->[0] = $const{$key};
783     } else {
784       $info->[0] = $const{$key};
785     }
786   }
787   
788   my @collapse;
789   if (defined $prefix) {
790     @collapse = map {
791         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
792     } keys %{$self->{_attrs}{collapse}}
793   } else {
794     @collapse = keys %{$self->{_attrs}{collapse}};
795   };
796
797   if (@collapse) {
798     my ($c) = sort { length $a <=> length $b } @collapse;
799     my $target = $info;
800     foreach my $p (split(/\./, $c)) {
801       $target = $target->[1]->{$p} ||= [];
802     }
803     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
804     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
805     my $tree = $self->_collapse_result($as, $row, $c_prefix);
806     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
807     my (@final, @raw);
808
809     while (
810       !(
811         grep {
812           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
813         } @co_key
814         )
815     ) {
816       push(@final, $tree);
817       last unless (@raw = $self->cursor->next);
818       $row = $self->{stashed_row} = \@raw;
819       $tree = $self->_collapse_result($as, $row, $c_prefix);
820     }
821     @$target = (@final ? @final : [ {}, {} ]);
822       # single empty result to indicate an empty prefetched has_many
823   }
824
825   #print "final info: " . Dumper($info);
826   return $info;
827 }
828
829 =head2 result_source
830
831 =over 4
832
833 =item Arguments: $result_source?
834
835 =item Return Value: $result_source
836
837 =back
838
839 An accessor for the primary ResultSource object from which this ResultSet
840 is derived.
841
842 =head2 result_class
843
844 =over 4
845
846 =item Arguments: $result_class?
847
848 =item Return Value: $result_class
849
850 =back
851
852 An accessor for the class to use when creating row objects. Defaults to 
853 C<< result_source->result_class >> - which in most cases is the name of the 
854 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
855
856 =cut
857
858
859 =head2 count
860
861 =over 4
862
863 =item Arguments: $cond, \%attrs??
864
865 =item Return Value: $count
866
867 =back
868
869 Performs an SQL C<COUNT> with the same query as the resultset was built
870 with to find the number of elements. If passed arguments, does a search
871 on the resultset and counts the results of that.
872
873 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
874 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
875 not support C<DISTINCT> with multiple columns. If you are using such a
876 database, you should only use columns from the main table in your C<group_by>
877 clause.
878
879 =cut
880
881 sub count {
882   my $self = shift;
883   return $self->search(@_)->count if @_ and defined $_[0];
884   return scalar @{ $self->get_cache } if $self->get_cache;
885   my $count = $self->_count;
886   return 0 unless $count;
887
888   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
889   $count = $self->{attrs}{rows} if
890     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
891   return $count;
892 }
893
894 sub _count { # Separated out so pager can get the full count
895   my $self = shift;
896   my $select = { count => '*' };
897
898   my $attrs = { %{$self->_resolved_attrs} };
899   if (my $group_by = delete $attrs->{group_by}) {
900     delete $attrs->{having};
901     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
902     # todo: try CONCAT for multi-column pk
903     my @pk = $self->result_source->primary_columns;
904     if (@pk == 1) {
905       my $alias = $attrs->{alias};
906       foreach my $column (@distinct) {
907         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
908           @distinct = ($column);
909           last;
910         }
911       }
912     }
913
914     $select = { count => { distinct => \@distinct } };
915   }
916
917   $attrs->{select} = $select;
918   $attrs->{as} = [qw/count/];
919
920   # offset, order by and page are not needed to count. record_filter is cdbi
921   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
922
923   my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
924   my ($count) = $tmp_rs->cursor->next;
925   return $count;
926 }
927
928 =head2 count_literal
929
930 =over 4
931
932 =item Arguments: $sql_fragment, @bind_values
933
934 =item Return Value: $count
935
936 =back
937
938 Counts the results in a literal query. Equivalent to calling L</search_literal>
939 with the passed arguments, then L</count>.
940
941 =cut
942
943 sub count_literal { shift->search_literal(@_)->count; }
944
945 =head2 all
946
947 =over 4
948
949 =item Arguments: none
950
951 =item Return Value: @objects
952
953 =back
954
955 Returns all elements in the resultset. Called implicitly if the resultset
956 is returned in list context.
957
958 =cut
959
960 sub all {
961   my ($self) = @_;
962   return @{ $self->get_cache } if $self->get_cache;
963
964   my @obj;
965
966   # TODO: don't call resolve here
967   if (keys %{$self->_resolved_attrs->{collapse}}) {
968 #  if ($self->{attrs}{prefetch}) {
969       # Using $self->cursor->all is really just an optimisation.
970       # If we're collapsing has_many prefetches it probably makes
971       # very little difference, and this is cleaner than hacking
972       # _construct_object to survive the approach
973     my @row = $self->cursor->next;
974     while (@row) {
975       push(@obj, $self->_construct_object(@row));
976       @row = (exists $self->{stashed_row}
977                ? @{delete $self->{stashed_row}}
978                : $self->cursor->next);
979     }
980   } else {
981     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
982   }
983
984   $self->set_cache(\@obj) if $self->{attrs}{cache};
985   return @obj;
986 }
987
988 =head2 reset
989
990 =over 4
991
992 =item Arguments: none
993
994 =item Return Value: $self
995
996 =back
997
998 Resets the resultset's cursor, so you can iterate through the elements again.
999
1000 =cut
1001
1002 sub reset {
1003   my ($self) = @_;
1004   delete $self->{_attrs} if exists $self->{_attrs};
1005   $self->{all_cache_position} = 0;
1006   $self->cursor->reset;
1007   return $self;
1008 }
1009
1010 =head2 first
1011
1012 =over 4
1013
1014 =item Arguments: none
1015
1016 =item Return Value: $object?
1017
1018 =back
1019
1020 Resets the resultset and returns an object for the first result (if the
1021 resultset returns anything).
1022
1023 =cut
1024
1025 sub first {
1026   return $_[0]->reset->next;
1027 }
1028
1029 # _cond_for_update_delete
1030 #
1031 # update/delete require the condition to be modified to handle
1032 # the differing SQL syntax available.  This transforms the $self->{cond}
1033 # appropriately, returning the new condition.
1034
1035 sub _cond_for_update_delete {
1036   my ($self, $full_cond) = @_;
1037   my $cond = {};
1038
1039   $full_cond ||= $self->{cond};
1040   # No-op. No condition, we're updating/deleting everything
1041   return $cond unless ref $full_cond;
1042
1043   if (ref $full_cond eq 'ARRAY') {
1044     $cond = [
1045       map {
1046         my %hash;
1047         foreach my $key (keys %{$_}) {
1048           $key =~ /([^.]+)$/;
1049           $hash{$1} = $_->{$key};
1050         }
1051         \%hash;
1052       } @{$full_cond}
1053     ];
1054   }
1055   elsif (ref $full_cond eq 'HASH') {
1056     if ((keys %{$full_cond})[0] eq '-and') {
1057       $cond->{-and} = [];
1058
1059       my @cond = @{$full_cond->{-and}};
1060       for (my $i = 0; $i < @cond; $i++) {
1061         my $entry = $cond[$i];
1062
1063         my $hash;
1064         if (ref $entry eq 'HASH') {
1065           $hash = $self->_cond_for_update_delete($entry);
1066         }
1067         else {
1068           $entry =~ /([^.]+)$/;
1069           $hash->{$1} = $cond[++$i];
1070         }
1071
1072         push @{$cond->{-and}}, $hash;
1073       }
1074     }
1075     else {
1076       foreach my $key (keys %{$full_cond}) {
1077         $key =~ /([^.]+)$/;
1078         $cond->{$1} = $full_cond->{$key};
1079       }
1080     }
1081   }
1082   else {
1083     $self->throw_exception(
1084       "Can't update/delete on resultset with condition unless hash or array"
1085     );
1086   }
1087
1088   return $cond;
1089 }
1090
1091
1092 =head2 update
1093
1094 =over 4
1095
1096 =item Arguments: \%values
1097
1098 =item Return Value: $storage_rv
1099
1100 =back
1101
1102 Sets the specified columns in the resultset to the supplied values in a
1103 single query. Return value will be true if the update succeeded or false
1104 if no records were updated; exact type of success value is storage-dependent.
1105
1106 =cut
1107
1108 sub update {
1109   my ($self, $values) = @_;
1110   $self->throw_exception("Values for update must be a hash")
1111     unless ref $values eq 'HASH';
1112
1113   my $cond = $self->_cond_for_update_delete;
1114    
1115   return $self->result_source->storage->update(
1116     $self->result_source, $values, $cond
1117   );
1118 }
1119
1120 =head2 update_all
1121
1122 =over 4
1123
1124 =item Arguments: \%values
1125
1126 =item Return Value: 1
1127
1128 =back
1129
1130 Fetches all objects and updates them one at a time. Note that C<update_all>
1131 will run DBIC cascade triggers, while L</update> will not.
1132
1133 =cut
1134
1135 sub update_all {
1136   my ($self, $values) = @_;
1137   $self->throw_exception("Values for update must be a hash")
1138     unless ref $values eq 'HASH';
1139   foreach my $obj ($self->all) {
1140     $obj->set_columns($values)->update;
1141   }
1142   return 1;
1143 }
1144
1145 =head2 delete
1146
1147 =over 4
1148
1149 =item Arguments: none
1150
1151 =item Return Value: 1
1152
1153 =back
1154
1155 Deletes the contents of the resultset from its result source. Note that this
1156 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1157 to run. See also L<DBIx::Class::Row/delete>.
1158
1159 =cut
1160
1161 sub delete {
1162   my ($self) = @_;
1163
1164   my $cond = $self->_cond_for_update_delete;
1165
1166   $self->result_source->storage->delete($self->result_source, $cond);
1167   return 1;
1168 }
1169
1170 =head2 delete_all
1171
1172 =over 4
1173
1174 =item Arguments: none
1175
1176 =item Return Value: 1
1177
1178 =back
1179
1180 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1181 will run DBIC cascade triggers, while L</delete> will not.
1182
1183 =cut
1184
1185 sub delete_all {
1186   my ($self) = @_;
1187   $_->delete for $self->all;
1188   return 1;
1189 }
1190
1191 =head2 pager
1192
1193 =over 4
1194
1195 =item Arguments: none
1196
1197 =item Return Value: $pager
1198
1199 =back
1200
1201 Return Value a L<Data::Page> object for the current resultset. Only makes
1202 sense for queries with a C<page> attribute.
1203
1204 =cut
1205
1206 sub pager {
1207   my ($self) = @_;
1208   my $attrs = $self->{attrs};
1209   $self->throw_exception("Can't create pager for non-paged rs")
1210     unless $self->{attrs}{page};
1211   $attrs->{rows} ||= 10;
1212   return $self->{pager} ||= Data::Page->new(
1213     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1214 }
1215
1216 =head2 page
1217
1218 =over 4
1219
1220 =item Arguments: $page_number
1221
1222 =item Return Value: $rs
1223
1224 =back
1225
1226 Returns a resultset for the $page_number page of the resultset on which page
1227 is called, where each page contains a number of rows equal to the 'rows'
1228 attribute set on the resultset (10 by default).
1229
1230 =cut
1231
1232 sub page {
1233   my ($self, $page) = @_;
1234   return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
1235 }
1236
1237 =head2 new_result
1238
1239 =over 4
1240
1241 =item Arguments: \%vals
1242
1243 =item Return Value: $object
1244
1245 =back
1246
1247 Creates an object in the resultset's result class and returns it.
1248
1249 =cut
1250
1251 sub new_result {
1252   my ($self, $values) = @_;
1253   $self->throw_exception( "new_result needs a hash" )
1254     unless (ref $values eq 'HASH');
1255   $self->throw_exception(
1256     "Can't abstract implicit construct, condition not a hash"
1257   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1258
1259   my $alias = $self->{attrs}{alias};
1260   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1261   my %new = (
1262     %{ $self->_remove_alias($values, $alias) },
1263     %{ $self->_remove_alias($collapsed_cond, $alias) },
1264   );
1265
1266   return $self->result_class->new(\%new,$self->_source_handle);
1267 }
1268
1269 # _collapse_cond
1270 #
1271 # Recursively collapse the condition.
1272
1273 sub _collapse_cond {
1274   my ($self, $cond, $collapsed) = @_;
1275
1276   $collapsed ||= {};
1277
1278   if (ref $cond eq 'ARRAY') {
1279     foreach my $subcond (@$cond) {
1280       next unless ref $subcond;  # -or
1281 #      warn "ARRAY: " . Dumper $subcond;
1282       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1283     }
1284   }
1285   elsif (ref $cond eq 'HASH') {
1286     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1287       foreach my $subcond (@{$cond->{-and}}) {
1288 #        warn "HASH: " . Dumper $subcond;
1289         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1290       }
1291     }
1292     else {
1293 #      warn "LEAF: " . Dumper $cond;
1294       foreach my $col (keys %$cond) {
1295         my $value = $cond->{$col};
1296         $collapsed->{$col} = $value;
1297       }
1298     }
1299   }
1300
1301   return $collapsed;
1302 }
1303
1304 # _remove_alias
1305 #
1306 # Remove the specified alias from the specified query hash. A copy is made so
1307 # the original query is not modified.
1308
1309 sub _remove_alias {
1310   my ($self, $query, $alias) = @_;
1311
1312   my %orig = %{ $query || {} };
1313   my %unaliased;
1314
1315   foreach my $key (keys %orig) {
1316     if ($key !~ /\./) {
1317       $unaliased{$key} = $orig{$key};
1318       next;
1319     }
1320     $unaliased{$1} = $orig{$key}
1321       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1322   }
1323
1324   return \%unaliased;
1325 }
1326
1327 =head2 find_or_new
1328
1329 =over 4
1330
1331 =item Arguments: \%vals, \%attrs?
1332
1333 =item Return Value: $object
1334
1335 =back
1336
1337 Find an existing record from this resultset. If none exists, instantiate a new
1338 result object and return it. The object will not be saved into your storage
1339 until you call L<DBIx::Class::Row/insert> on it.
1340
1341 If you want objects to be saved immediately, use L</find_or_create> instead.
1342
1343 =cut
1344
1345 sub find_or_new {
1346   my $self     = shift;
1347   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1348   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1349   my $exists   = $self->find($hash, $attrs);
1350   return defined $exists ? $exists : $self->new_result($hash);
1351 }
1352
1353 =head2 create
1354
1355 =over 4
1356
1357 =item Arguments: \%vals
1358
1359 =item Return Value: $object
1360
1361 =back
1362
1363 Inserts a record into the resultset and returns the object representing it.
1364
1365 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1366
1367 =cut
1368
1369 sub create {
1370   my ($self, $attrs) = @_;
1371   $self->throw_exception( "create needs a hashref" )
1372     unless ref $attrs eq 'HASH';
1373   return $self->new_result($attrs)->insert;
1374 }
1375
1376 =head2 find_or_create
1377
1378 =over 4
1379
1380 =item Arguments: \%vals, \%attrs?
1381
1382 =item Return Value: $object
1383
1384 =back
1385
1386   $class->find_or_create({ key => $val, ... });
1387
1388 Tries to find a record based on its primary key or unique constraint; if none
1389 is found, creates one and returns that instead.
1390
1391   my $cd = $schema->resultset('CD')->find_or_create({
1392     cdid   => 5,
1393     artist => 'Massive Attack',
1394     title  => 'Mezzanine',
1395     year   => 2005,
1396   });
1397
1398 Also takes an optional C<key> attribute, to search by a specific key or unique
1399 constraint. For example:
1400
1401   my $cd = $schema->resultset('CD')->find_or_create(
1402     {
1403       artist => 'Massive Attack',
1404       title  => 'Mezzanine',
1405     },
1406     { key => 'cd_artist_title' }
1407   );
1408
1409 See also L</find> and L</update_or_create>. For information on how to declare
1410 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1411
1412 =cut
1413
1414 sub find_or_create {
1415   my $self     = shift;
1416   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1417   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1418   my $exists   = $self->find($hash, $attrs);
1419   return defined $exists ? $exists : $self->create($hash);
1420 }
1421
1422 =head2 update_or_create
1423
1424 =over 4
1425
1426 =item Arguments: \%col_values, { key => $unique_constraint }?
1427
1428 =item Return Value: $object
1429
1430 =back
1431
1432   $class->update_or_create({ col => $val, ... });
1433
1434 First, searches for an existing row matching one of the unique constraints
1435 (including the primary key) on the source of this resultset. If a row is
1436 found, updates it with the other given column values. Otherwise, creates a new
1437 row.
1438
1439 Takes an optional C<key> attribute to search on a specific unique constraint.
1440 For example:
1441
1442   # In your application
1443   my $cd = $schema->resultset('CD')->update_or_create(
1444     {
1445       artist => 'Massive Attack',
1446       title  => 'Mezzanine',
1447       year   => 1998,
1448     },
1449     { key => 'cd_artist_title' }
1450   );
1451
1452 If no C<key> is specified, it searches on all unique constraints defined on the
1453 source, including the primary key.
1454
1455 If the C<key> is specified as C<primary>, it searches only on the primary key.
1456
1457 See also L</find> and L</find_or_create>. For information on how to declare
1458 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1459
1460 =cut
1461
1462 sub update_or_create {
1463   my $self = shift;
1464   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1465   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1466
1467   my $row = $self->find($cond, $attrs);
1468   if (defined $row) {
1469     $row->update($cond);
1470     return $row;
1471   }
1472
1473   return $self->create($cond);
1474 }
1475
1476 =head2 get_cache
1477
1478 =over 4
1479
1480 =item Arguments: none
1481
1482 =item Return Value: \@cache_objects?
1483
1484 =back
1485
1486 Gets the contents of the cache for the resultset, if the cache is set.
1487
1488 =cut
1489
1490 sub get_cache {
1491   shift->{all_cache};
1492 }
1493
1494 =head2 set_cache
1495
1496 =over 4
1497
1498 =item Arguments: \@cache_objects
1499
1500 =item Return Value: \@cache_objects
1501
1502 =back
1503
1504 Sets the contents of the cache for the resultset. Expects an arrayref
1505 of objects of the same class as those produced by the resultset. Note that
1506 if the cache is set the resultset will return the cached objects rather
1507 than re-querying the database even if the cache attr is not set.
1508
1509 =cut
1510
1511 sub set_cache {
1512   my ( $self, $data ) = @_;
1513   $self->throw_exception("set_cache requires an arrayref")
1514       if defined($data) && (ref $data ne 'ARRAY');
1515   $self->{all_cache} = $data;
1516 }
1517
1518 =head2 clear_cache
1519
1520 =over 4
1521
1522 =item Arguments: none
1523
1524 =item Return Value: []
1525
1526 =back
1527
1528 Clears the cache for the resultset.
1529
1530 =cut
1531
1532 sub clear_cache {
1533   shift->set_cache(undef);
1534 }
1535
1536 =head2 related_resultset
1537
1538 =over 4
1539
1540 =item Arguments: $relationship_name
1541
1542 =item Return Value: $resultset
1543
1544 =back
1545
1546 Returns a related resultset for the supplied relationship name.
1547
1548   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1549
1550 =cut
1551
1552 sub related_resultset {
1553   my ($self, $rel) = @_;
1554
1555   $self->{related_resultsets} ||= {};
1556   return $self->{related_resultsets}{$rel} ||= do {
1557     my $rel_obj = $self->result_source->relationship_info($rel);
1558
1559     $self->throw_exception(
1560       "search_related: result source '" . $self->_source_handle->source_moniker .
1561         "' has no such relationship $rel")
1562       unless $rel_obj;
1563     
1564     my ($from,$seen) = $self->_resolve_from($rel);
1565
1566     my $join_count = $seen->{$rel};
1567     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1568
1569     $self->_source_handle->schema->resultset($rel_obj->{class})->search_rs(
1570       undef, {
1571         %{$self->{attrs}||{}},
1572         join => undef,
1573         prefetch => undef,
1574         select => undef,
1575         as => undef,
1576         alias => $alias,
1577         where => $self->{cond},
1578         seen_join => $seen,
1579         from => $from,
1580     });
1581   };
1582 }
1583
1584 sub _resolve_from {
1585   my ($self, $extra_join) = @_;
1586   my $source = $self->result_source;
1587   my $attrs = $self->{attrs};
1588   
1589   my $from = $attrs->{from}
1590     || [ { $attrs->{alias} => $source->from } ];
1591     
1592   my $seen = { %{$attrs->{seen_join}||{}} };
1593
1594   my $join = ($attrs->{join}
1595                ? [ $attrs->{join}, $extra_join ]
1596                : $extra_join);
1597   $from = [
1598     @$from,
1599     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1600   ];
1601
1602   return ($from,$seen);
1603 }
1604
1605 sub _resolved_attrs {
1606   my $self = shift;
1607   return $self->{_attrs} if $self->{_attrs};
1608
1609   my $attrs = { %{$self->{attrs}||{}} };
1610   my $source = $self->result_source;
1611   my $alias = $attrs->{alias};
1612
1613   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1614   if ($attrs->{columns}) {
1615     delete $attrs->{as};
1616   } elsif (!$attrs->{select}) {
1617     $attrs->{columns} = [ $source->columns ];
1618   }
1619  
1620   $attrs->{select} = 
1621     ($attrs->{select}
1622       ? (ref $attrs->{select} eq 'ARRAY'
1623           ? [ @{$attrs->{select}} ]
1624           : [ $attrs->{select} ])
1625       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1626     );
1627   $attrs->{as} =
1628     ($attrs->{as}
1629       ? (ref $attrs->{as} eq 'ARRAY'
1630           ? [ @{$attrs->{as}} ]
1631           : [ $attrs->{as} ])
1632       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1633     );
1634   
1635   my $adds;
1636   if ($adds = delete $attrs->{include_columns}) {
1637     $adds = [$adds] unless ref $adds eq 'ARRAY';
1638     push(@{$attrs->{select}}, @$adds);
1639     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1640   }
1641   if ($adds = delete $attrs->{'+select'}) {
1642     $adds = [$adds] unless ref $adds eq 'ARRAY';
1643     push(@{$attrs->{select}},
1644            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1645   }
1646   if (my $adds = delete $attrs->{'+as'}) {
1647     $adds = [$adds] unless ref $adds eq 'ARRAY';
1648     push(@{$attrs->{as}}, @$adds);
1649   }
1650
1651   $attrs->{from} ||= [ { 'me' => $source->from } ];
1652
1653   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1654     my $join = delete $attrs->{join} || {};
1655
1656     if (defined $attrs->{prefetch}) {
1657       $join = $self->_merge_attr(
1658         $join, $attrs->{prefetch}
1659       );
1660     }
1661
1662     $attrs->{from} =   # have to copy here to avoid corrupting the original
1663       [
1664         @{$attrs->{from}}, 
1665         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1666       ];
1667   }
1668
1669   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1670   if ($attrs->{order_by}) {
1671     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1672                            ? [ @{$attrs->{order_by}} ]
1673                            : [ $attrs->{order_by} ]);
1674   } else {
1675     $attrs->{order_by} = [];    
1676   }
1677
1678   my $collapse = $attrs->{collapse} || {};
1679   if (my $prefetch = delete $attrs->{prefetch}) {
1680     $prefetch = $self->_merge_attr({}, $prefetch);
1681     my @pre_order;
1682     my $seen = $attrs->{seen_join} || {};
1683     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1684       # bring joins back to level of current class
1685       my @prefetch = $source->resolve_prefetch(
1686         $p, $alias, $seen, \@pre_order, $collapse
1687       );
1688       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1689       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1690     }
1691     push(@{$attrs->{order_by}}, @pre_order);
1692   }
1693   $attrs->{collapse} = $collapse;
1694
1695   return $self->{_attrs} = $attrs;
1696 }
1697
1698 sub _merge_attr {
1699   my ($self, $a, $b) = @_;
1700   return $b unless defined($a);
1701   return $a unless defined($b);
1702   
1703   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1704     foreach my $key (keys %{$b}) {
1705       if (exists $a->{$key}) {
1706         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1707       } else {
1708         $a->{$key} = $b->{$key};
1709       }
1710     }
1711     return $a;
1712   } else {
1713     $a = [$a] unless ref $a eq 'ARRAY';
1714     $b = [$b] unless ref $b eq 'ARRAY';
1715
1716     my $hash = {};
1717     my @array;
1718     foreach my $x ($a, $b) {
1719       foreach my $element (@{$x}) {
1720         if (ref $element eq 'HASH') {
1721           $hash = $self->_merge_attr($hash, $element);
1722         } elsif (ref $element eq 'ARRAY') {
1723           push(@array, @{$element});
1724         } else {
1725           push(@array, $element) unless $b == $x
1726             && grep { $_ eq $element } @array;
1727         }
1728       }
1729     }
1730     
1731     @array = grep { !exists $hash->{$_} } @array;
1732
1733     return keys %{$hash}
1734       ? ( scalar(@array)
1735             ? [$hash, @array]
1736             : $hash
1737         )
1738       : \@array;
1739   }
1740 }
1741
1742 sub result_source {
1743     my $self = shift;
1744
1745     if (@_) {
1746         $self->_source_handle($_[0]->handle);
1747     } else {
1748         $self->_source_handle->resolve;
1749     }
1750 }
1751
1752 =head2 throw_exception
1753
1754 See L<DBIx::Class::Schema/throw_exception> for details.
1755
1756 =cut
1757
1758 sub throw_exception {
1759   my $self=shift;
1760   $self->_source_handle->schema->throw_exception(@_);
1761 }
1762
1763 # XXX: FIXME: Attributes docs need clearing up
1764
1765 =head1 ATTRIBUTES
1766
1767 The resultset takes various attributes that modify its behavior. Here's an
1768 overview of them:
1769
1770 =head2 order_by
1771
1772 =over 4
1773
1774 =item Value: ($order_by | \@order_by)
1775
1776 =back
1777
1778 Which column(s) to order the results by. This is currently passed
1779 through directly to SQL, so you can give e.g. C<year DESC> for a
1780 descending order on the column `year'.
1781
1782 Please note that if you have C<quote_char> enabled (see
1783 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1784 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1785 so you will need to manually quote things as appropriate.)
1786
1787 =head2 columns
1788
1789 =over 4
1790
1791 =item Value: \@columns
1792
1793 =back
1794
1795 Shortcut to request a particular set of columns to be retrieved.  Adds
1796 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1797 from that, then auto-populates C<as> from C<select> as normal. (You may also
1798 use the C<cols> attribute, as in earlier versions of DBIC.)
1799
1800 =head2 include_columns
1801
1802 =over 4
1803
1804 =item Value: \@columns
1805
1806 =back
1807
1808 Shortcut to include additional columns in the returned results - for example
1809
1810   $schema->resultset('CD')->search(undef, {
1811     include_columns => ['artist.name'],
1812     join => ['artist']
1813   });
1814
1815 would return all CDs and include a 'name' column to the information
1816 passed to object inflation
1817
1818 =head2 select
1819
1820 =over 4
1821
1822 =item Value: \@select_columns
1823
1824 =back
1825
1826 Indicates which columns should be selected from the storage. You can use
1827 column names, or in the case of RDBMS back ends, function or stored procedure
1828 names:
1829
1830   $rs = $schema->resultset('Employee')->search(undef, {
1831     select => [
1832       'name',
1833       { count => 'employeeid' },
1834       { sum => 'salary' }
1835     ]
1836   });
1837
1838 When you use function/stored procedure names and do not supply an C<as>
1839 attribute, the column names returned are storage-dependent. E.g. MySQL would
1840 return a column named C<count(employeeid)> in the above example.
1841
1842 =head2 +select
1843
1844 =over 4
1845
1846 Indicates additional columns to be selected from storage.  Works the same as
1847 L<select> but adds columns to the selection.
1848
1849 =back
1850
1851 =head2 +as
1852
1853 =over 4
1854
1855 Indicates additional column names for those added via L<+select>.
1856
1857 =back
1858
1859 =head2 as
1860
1861 =over 4
1862
1863 =item Value: \@inflation_names
1864
1865 =back
1866
1867 Indicates column names for object inflation. This is used in conjunction with
1868 C<select>, usually when C<select> contains one or more function or stored
1869 procedure names:
1870
1871   $rs = $schema->resultset('Employee')->search(undef, {
1872     select => [
1873       'name',
1874       { count => 'employeeid' }
1875     ],
1876     as => ['name', 'employee_count'],
1877   });
1878
1879   my $employee = $rs->first(); # get the first Employee
1880
1881 If the object against which the search is performed already has an accessor
1882 matching a column name specified in C<as>, the value can be retrieved using
1883 the accessor as normal:
1884
1885   my $name = $employee->name();
1886
1887 If on the other hand an accessor does not exist in the object, you need to
1888 use C<get_column> instead:
1889
1890   my $employee_count = $employee->get_column('employee_count');
1891
1892 You can create your own accessors if required - see
1893 L<DBIx::Class::Manual::Cookbook> for details.
1894
1895 Please note: This will NOT insert an C<AS employee_count> into the SQL
1896 statement produced, it is used for internal access only. Thus
1897 attempting to use the accessor in an C<order_by> clause or similar
1898 will fail miserably.
1899
1900 To get around this limitation, you can supply literal SQL to your
1901 C<select> attibute that contains the C<AS alias> text, eg:
1902
1903   select => [\'myfield AS alias']
1904
1905 =head2 join
1906
1907 =over 4
1908
1909 =item Value: ($rel_name | \@rel_names | \%rel_names)
1910
1911 =back
1912
1913 Contains a list of relationships that should be joined for this query.  For
1914 example:
1915
1916   # Get CDs by Nine Inch Nails
1917   my $rs = $schema->resultset('CD')->search(
1918     { 'artist.name' => 'Nine Inch Nails' },
1919     { join => 'artist' }
1920   );
1921
1922 Can also contain a hash reference to refer to the other relation's relations.
1923 For example:
1924
1925   package MyApp::Schema::Track;
1926   use base qw/DBIx::Class/;
1927   __PACKAGE__->table('track');
1928   __PACKAGE__->add_columns(qw/trackid cd position title/);
1929   __PACKAGE__->set_primary_key('trackid');
1930   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1931   1;
1932
1933   # In your application
1934   my $rs = $schema->resultset('Artist')->search(
1935     { 'track.title' => 'Teardrop' },
1936     {
1937       join     => { cd => 'track' },
1938       order_by => 'artist.name',
1939     }
1940   );
1941
1942 You need to use the relationship (not the table) name in  conditions, 
1943 because they are aliased as such. The current table is aliased as "me", so 
1944 you need to use me.column_name in order to avoid ambiguity. For example:
1945
1946   # Get CDs from 1984 with a 'Foo' track 
1947   my $rs = $schema->resultset('CD')->search(
1948     { 
1949       'me.year' => 1984,
1950       'tracks.name' => 'Foo'
1951     },
1952     { join => 'tracks' }
1953   );
1954   
1955 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1956 similarly for a third time). For e.g.
1957
1958   my $rs = $schema->resultset('Artist')->search({
1959     'cds.title'   => 'Down to Earth',
1960     'cds_2.title' => 'Popular',
1961   }, {
1962     join => [ qw/cds cds/ ],
1963   });
1964
1965 will return a set of all artists that have both a cd with title 'Down
1966 to Earth' and a cd with title 'Popular'.
1967
1968 If you want to fetch related objects from other tables as well, see C<prefetch>
1969 below.
1970
1971 =head2 prefetch
1972
1973 =over 4
1974
1975 =item Value: ($rel_name | \@rel_names | \%rel_names)
1976
1977 =back
1978
1979 Contains one or more relationships that should be fetched along with the main
1980 query (when they are accessed afterwards they will have already been
1981 "prefetched").  This is useful for when you know you will need the related
1982 objects, because it saves at least one query:
1983
1984   my $rs = $schema->resultset('Tag')->search(
1985     undef,
1986     {
1987       prefetch => {
1988         cd => 'artist'
1989       }
1990     }
1991   );
1992
1993 The initial search results in SQL like the following:
1994
1995   SELECT tag.*, cd.*, artist.* FROM tag
1996   JOIN cd ON tag.cd = cd.cdid
1997   JOIN artist ON cd.artist = artist.artistid
1998
1999 L<DBIx::Class> has no need to go back to the database when we access the
2000 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2001 case.
2002
2003 Simple prefetches will be joined automatically, so there is no need
2004 for a C<join> attribute in the above search. If you're prefetching to
2005 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2006 specify the join as well.
2007
2008 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2009 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2010 with an accessor type of 'single' or 'filter').
2011
2012 =head2 page
2013
2014 =over 4
2015
2016 =item Value: $page
2017
2018 =back
2019
2020 Makes the resultset paged and specifies the page to retrieve. Effectively
2021 identical to creating a non-pages resultset and then calling ->page($page)
2022 on it.
2023
2024 If L<rows> attribute is not specified it defualts to 10 rows per page.
2025
2026 =head2 rows
2027
2028 =over 4
2029
2030 =item Value: $rows
2031
2032 =back
2033
2034 Specifes the maximum number of rows for direct retrieval or the number of
2035 rows per page if the page attribute or method is used.
2036
2037 =head2 offset
2038
2039 =over 4
2040
2041 =item Value: $offset
2042
2043 =back
2044
2045 Specifies the (zero-based) row number for the  first row to be returned, or the
2046 of the first row of the first page if paging is used.
2047
2048 =head2 group_by
2049
2050 =over 4
2051
2052 =item Value: \@columns
2053
2054 =back
2055
2056 A arrayref of columns to group by. Can include columns of joined tables.
2057
2058   group_by => [qw/ column1 column2 ... /]
2059
2060 =head2 having
2061
2062 =over 4
2063
2064 =item Value: $condition
2065
2066 =back
2067
2068 HAVING is a select statement attribute that is applied between GROUP BY and
2069 ORDER BY. It is applied to the after the grouping calculations have been
2070 done.
2071
2072   having => { 'count(employee)' => { '>=', 100 } }
2073
2074 =head2 distinct
2075
2076 =over 4
2077
2078 =item Value: (0 | 1)
2079
2080 =back
2081
2082 Set to 1 to group by all columns.
2083
2084 =head2 where
2085
2086 =over 4
2087
2088 Adds to the WHERE clause.
2089
2090   # only return rows WHERE deleted IS NULL for all searches
2091   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2092
2093 Can be overridden by passing C<{ where => undef }> as an attribute
2094 to a resulset.
2095
2096 =back
2097
2098 =head2 cache
2099
2100 Set to 1 to cache search results. This prevents extra SQL queries if you
2101 revisit rows in your ResultSet:
2102
2103   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2104
2105   while( my $artist = $resultset->next ) {
2106     ... do stuff ...
2107   }
2108
2109   $rs->first; # without cache, this would issue a query
2110
2111 By default, searches are not cached.
2112
2113 For more examples of using these attributes, see
2114 L<DBIx::Class::Manual::Cookbook>.
2115
2116 =head2 from
2117
2118 =over 4
2119
2120 =item Value: \@from_clause
2121
2122 =back
2123
2124 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2125 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2126 clauses.
2127
2128 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2129
2130 C<join> will usually do what you need and it is strongly recommended that you
2131 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2132 And we really do mean "cannot", not just tried and failed. Attempting to use
2133 this because you're having problems with C<join> is like trying to use x86
2134 ASM because you've got a syntax error in your C. Trust us on this.
2135
2136 Now, if you're still really, really sure you need to use this (and if you're
2137 not 100% sure, ask the mailing list first), here's an explanation of how this
2138 works.
2139
2140 The syntax is as follows -
2141
2142   [
2143     { <alias1> => <table1> },
2144     [
2145       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2146       [], # nested JOIN (optional)
2147       { <table1.column1> => <table2.column2>, ... (more conditions) },
2148     ],
2149     # More of the above [ ] may follow for additional joins
2150   ]
2151
2152   <table1> <alias1>
2153   JOIN
2154     <table2> <alias2>
2155     [JOIN ...]
2156   ON <table1.column1> = <table2.column2>
2157   <more joins may follow>
2158
2159 An easy way to follow the examples below is to remember the following:
2160
2161     Anything inside "[]" is a JOIN
2162     Anything inside "{}" is a condition for the enclosing JOIN
2163
2164 The following examples utilize a "person" table in a family tree application.
2165 In order to express parent->child relationships, this table is self-joined:
2166
2167     # Person->belongs_to('father' => 'Person');
2168     # Person->belongs_to('mother' => 'Person');
2169
2170 C<from> can be used to nest joins. Here we return all children with a father,
2171 then search against all mothers of those children:
2172
2173   $rs = $schema->resultset('Person')->search(
2174       undef,
2175       {
2176           alias => 'mother', # alias columns in accordance with "from"
2177           from => [
2178               { mother => 'person' },
2179               [
2180                   [
2181                       { child => 'person' },
2182                       [
2183                           { father => 'person' },
2184                           { 'father.person_id' => 'child.father_id' }
2185                       ]
2186                   ],
2187                   { 'mother.person_id' => 'child.mother_id' }
2188               ],
2189           ]
2190       },
2191   );
2192
2193   # Equivalent SQL:
2194   # SELECT mother.* FROM person mother
2195   # JOIN (
2196   #   person child
2197   #   JOIN person father
2198   #   ON ( father.person_id = child.father_id )
2199   # )
2200   # ON ( mother.person_id = child.mother_id )
2201
2202 The type of any join can be controlled manually. To search against only people
2203 with a father in the person table, we could explicitly use C<INNER JOIN>:
2204
2205     $rs = $schema->resultset('Person')->search(
2206         undef,
2207         {
2208             alias => 'child', # alias columns in accordance with "from"
2209             from => [
2210                 { child => 'person' },
2211                 [
2212                     { father => 'person', -join_type => 'inner' },
2213                     { 'father.id' => 'child.father_id' }
2214                 ],
2215             ]
2216         },
2217     );
2218
2219     # Equivalent SQL:
2220     # SELECT child.* FROM person child
2221     # INNER JOIN person father ON child.father_id = father.id
2222
2223 =cut
2224
2225 1;