Merge 'trunk' into 'DBIx-Class-current'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use base qw/DBIx::Class/;
15
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   $source = $source->handle 
89     unless $source->isa('DBIx::Class::ResultSourceHandle');
90   $attrs = { %{$attrs||{}} };
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   my $self = {
101     _source_handle => $source,
102     result_class => $attrs->{result_class} || $source->resolve->result_class,
103     cond => $attrs->{where},
104     count => undef,
105     pager => undef,
106     attrs => $attrs
107   };
108
109   bless $self, $class;
110
111   return $self;
112 }
113
114 =head2 search
115
116 =over 4
117
118 =item Arguments: $cond, \%attrs?
119
120 =item Return Value: $resultset (scalar context), @row_objs (list context)
121
122 =back
123
124   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
125   my $new_rs = $cd_rs->search({ year => 2005 });
126
127   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
128                  # year = 2005 OR year = 2004
129
130 If you need to pass in additional attributes but no additional condition,
131 call it as C<search(undef, \%attrs)>.
132
133   # "SELECT name, artistid FROM $artist_table"
134   my @all_artists = $schema->resultset('Artist')->search(undef, {
135     columns => [qw/name artistid/],
136   });
137
138 For a list of attributes that can be passed to C<search>, see
139 L</ATTRIBUTES>. For more examples of using this function, see
140 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
141 documentation for the first argument, see L<SQL::Abstract>.
142
143 =cut
144
145 sub search {
146   my $self = shift;
147   my $rs = $self->search_rs( @_ );
148   return (wantarray ? $rs->all : $rs);
149 }
150
151 =head2 search_rs
152
153 =over 4
154
155 =item Arguments: $cond, \%attrs?
156
157 =item Return Value: $resultset
158
159 =back
160
161 This method does the same exact thing as search() except it will
162 always return a resultset, even in list context.
163
164 =cut
165
166 sub search_rs {
167   my $self = shift;
168
169   my $rows;
170
171   unless (@_) {                 # no search, effectively just a clone
172     $rows = $self->get_cache;
173   }
174
175   my $attrs = {};
176   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
177   my $our_attrs = { %{$self->{attrs}} };
178   my $having = delete $our_attrs->{having};
179   my $where = delete $our_attrs->{where};
180
181   my $new_attrs = { %{$our_attrs}, %{$attrs} };
182
183   # merge new attrs into inherited
184   foreach my $key (qw/join prefetch/) {
185     next unless exists $attrs->{$key};
186     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
187   }
188
189   my $cond = (@_
190     ? (
191         (@_ == 1 || ref $_[0] eq "HASH")
192           ? (
193               (ref $_[0] eq 'HASH')
194                 ? (
195                     (keys %{ $_[0] }  > 0)
196                       ? shift
197                       : undef
198                    )
199                 :  shift
200              )
201           : (
202               (@_ % 2)
203                 ? $self->throw_exception("Odd number of arguments to search")
204                 : {@_}
205              )
206       )
207     : undef
208   );
209
210   if (defined $where) {
211     $new_attrs->{where} = (
212       defined $new_attrs->{where}
213         ? { '-and' => [
214               map {
215                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
216               } $where, $new_attrs->{where}
217             ]
218           }
219         : $where);
220   }
221
222   if (defined $cond) {
223     $new_attrs->{where} = (
224       defined $new_attrs->{where}
225         ? { '-and' => [
226               map {
227                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
228               } $cond, $new_attrs->{where}
229             ]
230           }
231         : $cond);
232   }
233
234   if (defined $having) {
235     $new_attrs->{having} = (
236       defined $new_attrs->{having}
237         ? { '-and' => [
238               map {
239                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
240               } $having, $new_attrs->{having}
241             ]
242           }
243         : $having);
244   }
245
246   my $rs = (ref $self)->new($self->_source_handle, $new_attrs);
247   if ($rows) {
248     $rs->set_cache($rows);
249   }
250   return $rs;
251 }
252
253 =head2 search_literal
254
255 =over 4
256
257 =item Arguments: $sql_fragment, @bind_values
258
259 =item Return Value: $resultset (scalar context), @row_objs (list context)
260
261 =back
262
263   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
264   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
265
266 Pass a literal chunk of SQL to be added to the conditional part of the
267 resultset query.
268
269 =cut
270
271 sub search_literal {
272   my ($self, $cond, @vals) = @_;
273   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
274   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
275   return $self->search(\$cond, $attrs);
276 }
277
278 =head2 find
279
280 =over 4
281
282 =item Arguments: @values | \%cols, \%attrs?
283
284 =item Return Value: $row_object
285
286 =back
287
288 Finds a row based on its primary key or unique constraint. For example, to find
289 a row by its primary key:
290
291   my $cd = $schema->resultset('CD')->find(5);
292
293 You can also find a row by a specific unique constraint using the C<key>
294 attribute. For example:
295
296   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
297     key => 'cd_artist_title'
298   });
299
300 Additionally, you can specify the columns explicitly by name:
301
302   my $cd = $schema->resultset('CD')->find(
303     {
304       artist => 'Massive Attack',
305       title  => 'Mezzanine',
306     },
307     { key => 'cd_artist_title' }
308   );
309
310 If the C<key> is specified as C<primary>, it searches only on the primary key.
311
312 If no C<key> is specified, it searches on all unique constraints defined on the
313 source, including the primary key.
314
315 If your table does not have a primary key, you B<must> provide a value for the
316 C<key> attribute matching one of the unique constraints on the source.
317
318 See also L</find_or_create> and L</update_or_create>. For information on how to
319 declare unique constraints, see
320 L<DBIx::Class::ResultSource/add_unique_constraint>.
321
322 =cut
323
324 sub find {
325   my $self = shift;
326   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
327
328   # Default to the primary key, but allow a specific key
329   my @cols = exists $attrs->{key}
330     ? $self->result_source->unique_constraint_columns($attrs->{key})
331     : $self->result_source->primary_columns;
332   $self->throw_exception(
333     "Can't find unless a primary key is defined or unique constraint is specified"
334   ) unless @cols;
335
336   # Parse out a hashref from input
337   my $input_query;
338   if (ref $_[0] eq 'HASH') {
339     $input_query = { %{$_[0]} };
340   }
341   elsif (@_ == @cols) {
342     $input_query = {};
343     @{$input_query}{@cols} = @_;
344   }
345   else {
346     # Compatibility: Allow e.g. find(id => $value)
347     carp "Find by key => value deprecated; please use a hashref instead";
348     $input_query = {@_};
349   }
350
351   my (%related, $info);
352
353   foreach my $key (keys %$input_query) {
354     if (ref($input_query->{$key})
355         && ($info = $self->result_source->relationship_info($key))) {
356       my $rel_q = $self->result_source->resolve_condition(
357                     $info->{cond}, delete $input_query->{$key}, $key
358                   );
359       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
360       @related{keys %$rel_q} = values %$rel_q;
361     }
362   }
363   if (my @keys = keys %related) {
364     @{$input_query}{@keys} = values %related;
365   }
366
367   my @unique_queries = $self->_unique_queries($input_query, $attrs);
368
369   # Build the final query: Default to the disjunction of the unique queries,
370   # but allow the input query in case the ResultSet defines the query or the
371   # user is abusing find
372   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
373   my $query = @unique_queries
374     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
375     : $self->_add_alias($input_query, $alias);
376
377   # Run the query
378   if (keys %$attrs) {
379     my $rs = $self->search($query, $attrs);
380     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
381   }
382   else {
383     return keys %{$self->_resolved_attrs->{collapse}}
384       ? $self->search($query)->next
385       : $self->single($query);
386   }
387 }
388
389 # _add_alias
390 #
391 # Add the specified alias to the specified query hash. A copy is made so the
392 # original query is not modified.
393
394 sub _add_alias {
395   my ($self, $query, $alias) = @_;
396
397   my %aliased = %$query;
398   foreach my $col (grep { ! m/\./ } keys %aliased) {
399     $aliased{"$alias.$col"} = delete $aliased{$col};
400   }
401
402   return \%aliased;
403 }
404
405 # _unique_queries
406 #
407 # Build a list of queries which satisfy unique constraints.
408
409 sub _unique_queries {
410   my ($self, $query, $attrs) = @_;
411
412   my @constraint_names = exists $attrs->{key}
413     ? ($attrs->{key})
414     : $self->result_source->unique_constraint_names;
415
416   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
417   my $num_where = scalar keys %$where;
418
419   my @unique_queries;
420   foreach my $name (@constraint_names) {
421     my @unique_cols = $self->result_source->unique_constraint_columns($name);
422     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
423
424     my $num_cols = scalar @unique_cols;
425     my $num_query = scalar keys %$unique_query;
426
427     my $total = $num_query + $num_where;
428     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
429       # The query is either unique on its own or is unique in combination with
430       # the existing where clause
431       push @unique_queries, $unique_query;
432     }
433   }
434
435   return @unique_queries;
436 }
437
438 # _build_unique_query
439 #
440 # Constrain the specified query hash based on the specified column names.
441
442 sub _build_unique_query {
443   my ($self, $query, $unique_cols) = @_;
444
445   return {
446     map  { $_ => $query->{$_} }
447     grep { exists $query->{$_} }
448       @$unique_cols
449   };
450 }
451
452 =head2 search_related
453
454 =over 4
455
456 =item Arguments: $rel, $cond, \%attrs?
457
458 =item Return Value: $new_resultset
459
460 =back
461
462   $new_rs = $cd_rs->search_related('artist', {
463     name => 'Emo-R-Us',
464   });
465
466 Searches the specified relationship, optionally specifying a condition and
467 attributes for matching records. See L</ATTRIBUTES> for more information.
468
469 =cut
470
471 sub search_related {
472   return shift->related_resultset(shift)->search(@_);
473 }
474
475 =head2 cursor
476
477 =over 4
478
479 =item Arguments: none
480
481 =item Return Value: $cursor
482
483 =back
484
485 Returns a storage-driven cursor to the given resultset. See
486 L<DBIx::Class::Cursor> for more information.
487
488 =cut
489
490 sub cursor {
491   my ($self) = @_;
492
493   my $attrs = { %{$self->_resolved_attrs} };
494   return $self->{cursor}
495     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
496           $attrs->{where},$attrs);
497 }
498
499 =head2 single
500
501 =over 4
502
503 =item Arguments: $cond?
504
505 =item Return Value: $row_object?
506
507 =back
508
509   my $cd = $schema->resultset('CD')->single({ year => 2001 });
510
511 Inflates the first result without creating a cursor if the resultset has
512 any records in it; if not returns nothing. Used by L</find> as an optimisation.
513
514 Can optionally take an additional condition *only* - this is a fast-code-path
515 method; if you need to add extra joins or similar call ->search and then
516 ->single without a condition on the $rs returned from that.
517
518 =cut
519
520 sub single {
521   my ($self, $where) = @_;
522   my $attrs = { %{$self->_resolved_attrs} };
523   if ($where) {
524     if (defined $attrs->{where}) {
525       $attrs->{where} = {
526         '-and' =>
527             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
528                $where, delete $attrs->{where} ]
529       };
530     } else {
531       $attrs->{where} = $where;
532     }
533   }
534
535 #  XXX: Disabled since it doesn't infer uniqueness in all cases
536 #  unless ($self->_is_unique_query($attrs->{where})) {
537 #    carp "Query not guaranteed to return a single row"
538 #      . "; please declare your unique constraints or use search instead";
539 #  }
540
541   my @data = $self->result_source->storage->select_single(
542     $attrs->{from}, $attrs->{select},
543     $attrs->{where}, $attrs
544   );
545
546   return (@data ? ($self->_construct_object(@data))[0] : ());
547 }
548
549 # _is_unique_query
550 #
551 # Try to determine if the specified query is guaranteed to be unique, based on
552 # the declared unique constraints.
553
554 sub _is_unique_query {
555   my ($self, $query) = @_;
556
557   my $collapsed = $self->_collapse_query($query);
558   my $alias = $self->{attrs}{alias};
559
560   foreach my $name ($self->result_source->unique_constraint_names) {
561     my @unique_cols = map {
562       "$alias.$_"
563     } $self->result_source->unique_constraint_columns($name);
564
565     # Count the values for each unique column
566     my %seen = map { $_ => 0 } @unique_cols;
567
568     foreach my $key (keys %$collapsed) {
569       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
570       next unless exists $seen{$aliased};  # Additional constraints are okay
571       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
572     }
573
574     # If we get 0 or more than 1 value for a column, it's not necessarily unique
575     return 1 unless grep { $_ != 1 } values %seen;
576   }
577
578   return 0;
579 }
580
581 # _collapse_query
582 #
583 # Recursively collapse the query, accumulating values for each column.
584
585 sub _collapse_query {
586   my ($self, $query, $collapsed) = @_;
587
588   $collapsed ||= {};
589
590   if (ref $query eq 'ARRAY') {
591     foreach my $subquery (@$query) {
592       next unless ref $subquery;  # -or
593 #      warn "ARRAY: " . Dumper $subquery;
594       $collapsed = $self->_collapse_query($subquery, $collapsed);
595     }
596   }
597   elsif (ref $query eq 'HASH') {
598     if (keys %$query and (keys %$query)[0] eq '-and') {
599       foreach my $subquery (@{$query->{-and}}) {
600 #        warn "HASH: " . Dumper $subquery;
601         $collapsed = $self->_collapse_query($subquery, $collapsed);
602       }
603     }
604     else {
605 #      warn "LEAF: " . Dumper $query;
606       foreach my $col (keys %$query) {
607         my $value = $query->{$col};
608         $collapsed->{$col}{$value}++;
609       }
610     }
611   }
612
613   return $collapsed;
614 }
615
616 =head2 get_column
617
618 =over 4
619
620 =item Arguments: $cond?
621
622 =item Return Value: $resultsetcolumn
623
624 =back
625
626   my $max_length = $rs->get_column('length')->max;
627
628 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
629
630 =cut
631
632 sub get_column {
633   my ($self, $column) = @_;
634   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
635   return $new;
636 }
637
638 =head2 search_like
639
640 =over 4
641
642 =item Arguments: $cond, \%attrs?
643
644 =item Return Value: $resultset (scalar context), @row_objs (list context)
645
646 =back
647
648   # WHERE title LIKE '%blue%'
649   $cd_rs = $rs->search_like({ title => '%blue%'});
650
651 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
652 that this is simply a convenience method. You most likely want to use
653 L</search> with specific operators.
654
655 For more information, see L<DBIx::Class::Manual::Cookbook>.
656
657 =cut
658
659 sub search_like {
660   my $class = shift;
661   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
662   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
663   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
664   return $class->search($query, { %$attrs });
665 }
666
667 =head2 slice
668
669 =over 4
670
671 =item Arguments: $first, $last
672
673 =item Return Value: $resultset (scalar context), @row_objs (list context)
674
675 =back
676
677 Returns a resultset or object list representing a subset of elements from the
678 resultset slice is called on. Indexes are from 0, i.e., to get the first
679 three records, call:
680
681   my ($one, $two, $three) = $rs->slice(0, 2);
682
683 =cut
684
685 sub slice {
686   my ($self, $min, $max) = @_;
687   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
688   $attrs->{offset} = $self->{attrs}{offset} || 0;
689   $attrs->{offset} += $min;
690   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
691   return $self->search(undef(), $attrs);
692   #my $slice = (ref $self)->new($self->result_source, $attrs);
693   #return (wantarray ? $slice->all : $slice);
694 }
695
696 =head2 next
697
698 =over 4
699
700 =item Arguments: none
701
702 =item Return Value: $result?
703
704 =back
705
706 Returns the next element in the resultset (C<undef> is there is none).
707
708 Can be used to efficiently iterate over records in the resultset:
709
710   my $rs = $schema->resultset('CD')->search;
711   while (my $cd = $rs->next) {
712     print $cd->title;
713   }
714
715 Note that you need to store the resultset object, and call C<next> on it.
716 Calling C<< resultset('Table')->next >> repeatedly will always return the
717 first record from the resultset.
718
719 =cut
720
721 sub next {
722   my ($self) = @_;
723   if (my $cache = $self->get_cache) {
724     $self->{all_cache_position} ||= 0;
725     return $cache->[$self->{all_cache_position}++];
726   }
727   if ($self->{attrs}{cache}) {
728     $self->{all_cache_position} = 1;
729     return ($self->all)[0];
730   }
731   if ($self->{stashed_objects}) {
732     my $obj = shift(@{$self->{stashed_objects}});
733     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
734     return $obj;
735   }
736   my @row = (
737     exists $self->{stashed_row}
738       ? @{delete $self->{stashed_row}}
739       : $self->cursor->next
740   );
741   return unless (@row);
742   my ($row, @more) = $self->_construct_object(@row);
743   $self->{stashed_objects} = \@more if @more;
744   return $row;
745 }
746
747 sub _construct_object {
748   my ($self, @row) = @_;
749   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
750   my @new = $self->result_class->inflate_result($self->_source_handle, @$info);
751   @new = $self->{_attrs}{record_filter}->(@new)
752     if exists $self->{_attrs}{record_filter};
753   return @new;
754 }
755
756 sub _collapse_result {
757   my ($self, $as, $row, $prefix) = @_;
758
759   my %const;
760   my @copy = @$row;
761   
762   foreach my $this_as (@$as) {
763     my $val = shift @copy;
764     if (defined $prefix) {
765       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
766         my $remain = $1;
767         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
768         $const{$1||''}{$2} = $val;
769       }
770     } else {
771       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
772       $const{$1||''}{$2} = $val;
773     }
774   }
775
776   my $alias = $self->{attrs}{alias};
777   my $info = [ {}, {} ];
778   foreach my $key (keys %const) {
779     if (length $key && $key ne $alias) {
780       my $target = $info;
781       my @parts = split(/\./, $key);
782       foreach my $p (@parts) {
783         $target = $target->[1]->{$p} ||= [];
784       }
785       $target->[0] = $const{$key};
786     } else {
787       $info->[0] = $const{$key};
788     }
789   }
790   
791   my @collapse;
792   if (defined $prefix) {
793     @collapse = map {
794         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
795     } keys %{$self->{_attrs}{collapse}}
796   } else {
797     @collapse = keys %{$self->{_attrs}{collapse}};
798   };
799
800   if (@collapse) {
801     my ($c) = sort { length $a <=> length $b } @collapse;
802     my $target = $info;
803     foreach my $p (split(/\./, $c)) {
804       $target = $target->[1]->{$p} ||= [];
805     }
806     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
807     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
808     my $tree = $self->_collapse_result($as, $row, $c_prefix);
809     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
810     my (@final, @raw);
811
812     while (
813       !(
814         grep {
815           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
816         } @co_key
817         )
818     ) {
819       push(@final, $tree);
820       last unless (@raw = $self->cursor->next);
821       $row = $self->{stashed_row} = \@raw;
822       $tree = $self->_collapse_result($as, $row, $c_prefix);
823     }
824     @$target = (@final ? @final : [ {}, {} ]);
825       # single empty result to indicate an empty prefetched has_many
826   }
827
828   #print "final info: " . Dumper($info);
829   return $info;
830 }
831
832 =head2 result_source
833
834 =over 4
835
836 =item Arguments: $result_source?
837
838 =item Return Value: $result_source
839
840 =back
841
842 An accessor for the primary ResultSource object from which this ResultSet
843 is derived.
844
845 =head2 result_class
846
847 =over 4
848
849 =item Arguments: $result_class?
850
851 =item Return Value: $result_class
852
853 =back
854
855 An accessor for the class to use when creating row objects. Defaults to 
856 C<< result_source->result_class >> - which in most cases is the name of the 
857 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
858
859 =cut
860
861
862 =head2 count
863
864 =over 4
865
866 =item Arguments: $cond, \%attrs??
867
868 =item Return Value: $count
869
870 =back
871
872 Performs an SQL C<COUNT> with the same query as the resultset was built
873 with to find the number of elements. If passed arguments, does a search
874 on the resultset and counts the results of that.
875
876 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
877 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
878 not support C<DISTINCT> with multiple columns. If you are using such a
879 database, you should only use columns from the main table in your C<group_by>
880 clause.
881
882 =cut
883
884 sub count {
885   my $self = shift;
886   return $self->search(@_)->count if @_ and defined $_[0];
887   return scalar @{ $self->get_cache } if $self->get_cache;
888   my $count = $self->_count;
889   return 0 unless $count;
890
891   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
892   $count = $self->{attrs}{rows} if
893     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
894   return $count;
895 }
896
897 sub _count { # Separated out so pager can get the full count
898   my $self = shift;
899   my $select = { count => '*' };
900
901   my $attrs = { %{$self->_resolved_attrs} };
902   if (my $group_by = delete $attrs->{group_by}) {
903     delete $attrs->{having};
904     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
905     # todo: try CONCAT for multi-column pk
906     my @pk = $self->result_source->primary_columns;
907     if (@pk == 1) {
908       my $alias = $attrs->{alias};
909       foreach my $column (@distinct) {
910         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
911           @distinct = ($column);
912           last;
913         }
914       }
915     }
916
917     $select = { count => { distinct => \@distinct } };
918   }
919
920   $attrs->{select} = $select;
921   $attrs->{as} = [qw/count/];
922
923   # offset, order by and page are not needed to count. record_filter is cdbi
924   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
925
926   my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
927   my ($count) = $tmp_rs->cursor->next;
928   return $count;
929 }
930
931 =head2 count_literal
932
933 =over 4
934
935 =item Arguments: $sql_fragment, @bind_values
936
937 =item Return Value: $count
938
939 =back
940
941 Counts the results in a literal query. Equivalent to calling L</search_literal>
942 with the passed arguments, then L</count>.
943
944 =cut
945
946 sub count_literal { shift->search_literal(@_)->count; }
947
948 =head2 all
949
950 =over 4
951
952 =item Arguments: none
953
954 =item Return Value: @objects
955
956 =back
957
958 Returns all elements in the resultset. Called implicitly if the resultset
959 is returned in list context.
960
961 =cut
962
963 sub all {
964   my ($self) = @_;
965   return @{ $self->get_cache } if $self->get_cache;
966
967   my @obj;
968
969   # TODO: don't call resolve here
970   if (keys %{$self->_resolved_attrs->{collapse}}) {
971 #  if ($self->{attrs}{prefetch}) {
972       # Using $self->cursor->all is really just an optimisation.
973       # If we're collapsing has_many prefetches it probably makes
974       # very little difference, and this is cleaner than hacking
975       # _construct_object to survive the approach
976     my @row = $self->cursor->next;
977     while (@row) {
978       push(@obj, $self->_construct_object(@row));
979       @row = (exists $self->{stashed_row}
980                ? @{delete $self->{stashed_row}}
981                : $self->cursor->next);
982     }
983   } else {
984     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
985   }
986
987   $self->set_cache(\@obj) if $self->{attrs}{cache};
988   return @obj;
989 }
990
991 =head2 reset
992
993 =over 4
994
995 =item Arguments: none
996
997 =item Return Value: $self
998
999 =back
1000
1001 Resets the resultset's cursor, so you can iterate through the elements again.
1002
1003 =cut
1004
1005 sub reset {
1006   my ($self) = @_;
1007   delete $self->{_attrs} if exists $self->{_attrs};
1008   $self->{all_cache_position} = 0;
1009   $self->cursor->reset;
1010   return $self;
1011 }
1012
1013 =head2 first
1014
1015 =over 4
1016
1017 =item Arguments: none
1018
1019 =item Return Value: $object?
1020
1021 =back
1022
1023 Resets the resultset and returns an object for the first result (if the
1024 resultset returns anything).
1025
1026 =cut
1027
1028 sub first {
1029   return $_[0]->reset->next;
1030 }
1031
1032 # _cond_for_update_delete
1033 #
1034 # update/delete require the condition to be modified to handle
1035 # the differing SQL syntax available.  This transforms the $self->{cond}
1036 # appropriately, returning the new condition.
1037
1038 sub _cond_for_update_delete {
1039   my ($self, $full_cond) = @_;
1040   my $cond = {};
1041
1042   $full_cond ||= $self->{cond};
1043   # No-op. No condition, we're updating/deleting everything
1044   return $cond unless ref $full_cond;
1045
1046   if (ref $full_cond eq 'ARRAY') {
1047     $cond = [
1048       map {
1049         my %hash;
1050         foreach my $key (keys %{$_}) {
1051           $key =~ /([^.]+)$/;
1052           $hash{$1} = $_->{$key};
1053         }
1054         \%hash;
1055       } @{$full_cond}
1056     ];
1057   }
1058   elsif (ref $full_cond eq 'HASH') {
1059     if ((keys %{$full_cond})[0] eq '-and') {
1060       $cond->{-and} = [];
1061
1062       my @cond = @{$full_cond->{-and}};
1063       for (my $i = 0; $i < @cond; $i++) {
1064         my $entry = $cond[$i];
1065
1066         my $hash;
1067         if (ref $entry eq 'HASH') {
1068           $hash = $self->_cond_for_update_delete($entry);
1069         }
1070         else {
1071           $entry =~ /([^.]+)$/;
1072           $hash->{$1} = $cond[++$i];
1073         }
1074
1075         push @{$cond->{-and}}, $hash;
1076       }
1077     }
1078     else {
1079       foreach my $key (keys %{$full_cond}) {
1080         $key =~ /([^.]+)$/;
1081         $cond->{$1} = $full_cond->{$key};
1082       }
1083     }
1084   }
1085   else {
1086     $self->throw_exception(
1087       "Can't update/delete on resultset with condition unless hash or array"
1088     );
1089   }
1090
1091   return $cond;
1092 }
1093
1094
1095 =head2 update
1096
1097 =over 4
1098
1099 =item Arguments: \%values
1100
1101 =item Return Value: $storage_rv
1102
1103 =back
1104
1105 Sets the specified columns in the resultset to the supplied values in a
1106 single query. Return value will be true if the update succeeded or false
1107 if no records were updated; exact type of success value is storage-dependent.
1108
1109 =cut
1110
1111 sub update {
1112   my ($self, $values) = @_;
1113   $self->throw_exception("Values for update must be a hash")
1114     unless ref $values eq 'HASH';
1115
1116   my $cond = $self->_cond_for_update_delete;
1117    
1118   return $self->result_source->storage->update(
1119     $self->result_source, $values, $cond
1120   );
1121 }
1122
1123 =head2 update_all
1124
1125 =over 4
1126
1127 =item Arguments: \%values
1128
1129 =item Return Value: 1
1130
1131 =back
1132
1133 Fetches all objects and updates them one at a time. Note that C<update_all>
1134 will run DBIC cascade triggers, while L</update> will not.
1135
1136 =cut
1137
1138 sub update_all {
1139   my ($self, $values) = @_;
1140   $self->throw_exception("Values for update must be a hash")
1141     unless ref $values eq 'HASH';
1142   foreach my $obj ($self->all) {
1143     $obj->set_columns($values)->update;
1144   }
1145   return 1;
1146 }
1147
1148 =head2 delete
1149
1150 =over 4
1151
1152 =item Arguments: none
1153
1154 =item Return Value: 1
1155
1156 =back
1157
1158 Deletes the contents of the resultset from its result source. Note that this
1159 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1160 to run. See also L<DBIx::Class::Row/delete>.
1161
1162 =cut
1163
1164 sub delete {
1165   my ($self) = @_;
1166
1167   my $cond = $self->_cond_for_update_delete;
1168
1169   $self->result_source->storage->delete($self->result_source, $cond);
1170   return 1;
1171 }
1172
1173 =head2 delete_all
1174
1175 =over 4
1176
1177 =item Arguments: none
1178
1179 =item Return Value: 1
1180
1181 =back
1182
1183 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1184 will run DBIC cascade triggers, while L</delete> will not.
1185
1186 =cut
1187
1188 sub delete_all {
1189   my ($self) = @_;
1190   $_->delete for $self->all;
1191   return 1;
1192 }
1193
1194 =head2 pager
1195
1196 =over 4
1197
1198 =item Arguments: none
1199
1200 =item Return Value: $pager
1201
1202 =back
1203
1204 Return Value a L<Data::Page> object for the current resultset. Only makes
1205 sense for queries with a C<page> attribute.
1206
1207 =cut
1208
1209 sub pager {
1210   my ($self) = @_;
1211   my $attrs = $self->{attrs};
1212   $self->throw_exception("Can't create pager for non-paged rs")
1213     unless $self->{attrs}{page};
1214   $attrs->{rows} ||= 10;
1215   return $self->{pager} ||= Data::Page->new(
1216     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1217 }
1218
1219 =head2 page
1220
1221 =over 4
1222
1223 =item Arguments: $page_number
1224
1225 =item Return Value: $rs
1226
1227 =back
1228
1229 Returns a resultset for the $page_number page of the resultset on which page
1230 is called, where each page contains a number of rows equal to the 'rows'
1231 attribute set on the resultset (10 by default).
1232
1233 =cut
1234
1235 sub page {
1236   my ($self, $page) = @_;
1237   return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
1238 }
1239
1240 =head2 new_result
1241
1242 =over 4
1243
1244 =item Arguments: \%vals
1245
1246 =item Return Value: $object
1247
1248 =back
1249
1250 Creates an object in the resultset's result class and returns it.
1251
1252 =cut
1253
1254 sub new_result {
1255   my ($self, $values) = @_;
1256   $self->throw_exception( "new_result needs a hash" )
1257     unless (ref $values eq 'HASH');
1258   $self->throw_exception(
1259     "Can't abstract implicit construct, condition not a hash"
1260   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1261
1262   my $alias = $self->{attrs}{alias};
1263   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1264   my %new = (
1265     %{ $self->_remove_alias($values, $alias) },
1266     %{ $self->_remove_alias($collapsed_cond, $alias) },
1267   );
1268
1269   return $self->result_class->new(\%new,$self->_source_handle);
1270 }
1271
1272 # _collapse_cond
1273 #
1274 # Recursively collapse the condition.
1275
1276 sub _collapse_cond {
1277   my ($self, $cond, $collapsed) = @_;
1278
1279   $collapsed ||= {};
1280
1281   if (ref $cond eq 'ARRAY') {
1282     foreach my $subcond (@$cond) {
1283       next unless ref $subcond;  # -or
1284 #      warn "ARRAY: " . Dumper $subcond;
1285       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1286     }
1287   }
1288   elsif (ref $cond eq 'HASH') {
1289     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1290       foreach my $subcond (@{$cond->{-and}}) {
1291 #        warn "HASH: " . Dumper $subcond;
1292         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1293       }
1294     }
1295     else {
1296 #      warn "LEAF: " . Dumper $cond;
1297       foreach my $col (keys %$cond) {
1298         my $value = $cond->{$col};
1299         $collapsed->{$col} = $value;
1300       }
1301     }
1302   }
1303
1304   return $collapsed;
1305 }
1306
1307 # _remove_alias
1308 #
1309 # Remove the specified alias from the specified query hash. A copy is made so
1310 # the original query is not modified.
1311
1312 sub _remove_alias {
1313   my ($self, $query, $alias) = @_;
1314
1315   my %orig = %{ $query || {} };
1316   my %unaliased;
1317
1318   foreach my $key (keys %orig) {
1319     if ($key !~ /\./) {
1320       $unaliased{$key} = $orig{$key};
1321       next;
1322     }
1323     $unaliased{$1} = $orig{$key}
1324       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1325   }
1326
1327   return \%unaliased;
1328 }
1329
1330 =head2 find_or_new
1331
1332 =over 4
1333
1334 =item Arguments: \%vals, \%attrs?
1335
1336 =item Return Value: $object
1337
1338 =back
1339
1340 Find an existing record from this resultset. If none exists, instantiate a new
1341 result object and return it. The object will not be saved into your storage
1342 until you call L<DBIx::Class::Row/insert> on it.
1343
1344 If you want objects to be saved immediately, use L</find_or_create> instead.
1345
1346 =cut
1347
1348 sub find_or_new {
1349   my $self     = shift;
1350   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1351   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1352   my $exists   = $self->find($hash, $attrs);
1353   return defined $exists ? $exists : $self->new_result($hash);
1354 }
1355
1356 =head2 create
1357
1358 =over 4
1359
1360 =item Arguments: \%vals
1361
1362 =item Return Value: $object
1363
1364 =back
1365
1366 Inserts a record into the resultset and returns the object representing it.
1367
1368 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1369
1370 =cut
1371
1372 sub create {
1373   my ($self, $attrs) = @_;
1374   $self->throw_exception( "create needs a hashref" )
1375     unless ref $attrs eq 'HASH';
1376   return $self->new_result($attrs)->insert;
1377 }
1378
1379 =head2 find_or_create
1380
1381 =over 4
1382
1383 =item Arguments: \%vals, \%attrs?
1384
1385 =item Return Value: $object
1386
1387 =back
1388
1389   $class->find_or_create({ key => $val, ... });
1390
1391 Tries to find a record based on its primary key or unique constraint; if none
1392 is found, creates one and returns that instead.
1393
1394   my $cd = $schema->resultset('CD')->find_or_create({
1395     cdid   => 5,
1396     artist => 'Massive Attack',
1397     title  => 'Mezzanine',
1398     year   => 2005,
1399   });
1400
1401 Also takes an optional C<key> attribute, to search by a specific key or unique
1402 constraint. For example:
1403
1404   my $cd = $schema->resultset('CD')->find_or_create(
1405     {
1406       artist => 'Massive Attack',
1407       title  => 'Mezzanine',
1408     },
1409     { key => 'cd_artist_title' }
1410   );
1411
1412 See also L</find> and L</update_or_create>. For information on how to declare
1413 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1414
1415 =cut
1416
1417 sub find_or_create {
1418   my $self     = shift;
1419   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1420   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1421   my $exists   = $self->find($hash, $attrs);
1422   return defined $exists ? $exists : $self->create($hash);
1423 }
1424
1425 =head2 update_or_create
1426
1427 =over 4
1428
1429 =item Arguments: \%col_values, { key => $unique_constraint }?
1430
1431 =item Return Value: $object
1432
1433 =back
1434
1435   $class->update_or_create({ col => $val, ... });
1436
1437 First, searches for an existing row matching one of the unique constraints
1438 (including the primary key) on the source of this resultset. If a row is
1439 found, updates it with the other given column values. Otherwise, creates a new
1440 row.
1441
1442 Takes an optional C<key> attribute to search on a specific unique constraint.
1443 For example:
1444
1445   # In your application
1446   my $cd = $schema->resultset('CD')->update_or_create(
1447     {
1448       artist => 'Massive Attack',
1449       title  => 'Mezzanine',
1450       year   => 1998,
1451     },
1452     { key => 'cd_artist_title' }
1453   );
1454
1455 If no C<key> is specified, it searches on all unique constraints defined on the
1456 source, including the primary key.
1457
1458 If the C<key> is specified as C<primary>, it searches only on the primary key.
1459
1460 See also L</find> and L</find_or_create>. For information on how to declare
1461 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1462
1463 =cut
1464
1465 sub update_or_create {
1466   my $self = shift;
1467   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1468   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1469
1470   my $row = $self->find($cond, $attrs);
1471   if (defined $row) {
1472     $row->update($cond);
1473     return $row;
1474   }
1475
1476   return $self->create($cond);
1477 }
1478
1479 =head2 get_cache
1480
1481 =over 4
1482
1483 =item Arguments: none
1484
1485 =item Return Value: \@cache_objects?
1486
1487 =back
1488
1489 Gets the contents of the cache for the resultset, if the cache is set.
1490
1491 =cut
1492
1493 sub get_cache {
1494   shift->{all_cache};
1495 }
1496
1497 =head2 set_cache
1498
1499 =over 4
1500
1501 =item Arguments: \@cache_objects
1502
1503 =item Return Value: \@cache_objects
1504
1505 =back
1506
1507 Sets the contents of the cache for the resultset. Expects an arrayref
1508 of objects of the same class as those produced by the resultset. Note that
1509 if the cache is set the resultset will return the cached objects rather
1510 than re-querying the database even if the cache attr is not set.
1511
1512 =cut
1513
1514 sub set_cache {
1515   my ( $self, $data ) = @_;
1516   $self->throw_exception("set_cache requires an arrayref")
1517       if defined($data) && (ref $data ne 'ARRAY');
1518   $self->{all_cache} = $data;
1519 }
1520
1521 =head2 clear_cache
1522
1523 =over 4
1524
1525 =item Arguments: none
1526
1527 =item Return Value: []
1528
1529 =back
1530
1531 Clears the cache for the resultset.
1532
1533 =cut
1534
1535 sub clear_cache {
1536   shift->set_cache(undef);
1537 }
1538
1539 =head2 related_resultset
1540
1541 =over 4
1542
1543 =item Arguments: $relationship_name
1544
1545 =item Return Value: $resultset
1546
1547 =back
1548
1549 Returns a related resultset for the supplied relationship name.
1550
1551   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1552
1553 =cut
1554
1555 sub related_resultset {
1556   my ($self, $rel) = @_;
1557
1558   $self->{related_resultsets} ||= {};
1559   return $self->{related_resultsets}{$rel} ||= do {
1560     my $rel_obj = $self->result_source->relationship_info($rel);
1561
1562     $self->throw_exception(
1563       "search_related: result source '" . $self->_source_handle->source_moniker .
1564         "' has no such relationship $rel")
1565       unless $rel_obj;
1566     
1567     my ($from,$seen) = $self->_resolve_from($rel);
1568
1569     my $join_count = $seen->{$rel};
1570     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1571
1572     $self->_source_handle->schema->resultset($rel_obj->{class})->search_rs(
1573       undef, {
1574         %{$self->{attrs}||{}},
1575         join => undef,
1576         prefetch => undef,
1577         select => undef,
1578         as => undef,
1579         alias => $alias,
1580         where => $self->{cond},
1581         seen_join => $seen,
1582         from => $from,
1583     });
1584   };
1585 }
1586
1587 sub _resolve_from {
1588   my ($self, $extra_join) = @_;
1589   my $source = $self->result_source;
1590   my $attrs = $self->{attrs};
1591   
1592   my $from = $attrs->{from}
1593     || [ { $attrs->{alias} => $source->from } ];
1594     
1595   my $seen = { %{$attrs->{seen_join}||{}} };
1596
1597   my $join = ($attrs->{join}
1598                ? [ $attrs->{join}, $extra_join ]
1599                : $extra_join);
1600   $from = [
1601     @$from,
1602     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1603   ];
1604
1605   return ($from,$seen);
1606 }
1607
1608 sub _resolved_attrs {
1609   my $self = shift;
1610   return $self->{_attrs} if $self->{_attrs};
1611
1612   my $attrs = { %{$self->{attrs}||{}} };
1613   my $source = $self->result_source;
1614   my $alias = $attrs->{alias};
1615
1616   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1617   if ($attrs->{columns}) {
1618     delete $attrs->{as};
1619   } elsif (!$attrs->{select}) {
1620     $attrs->{columns} = [ $source->columns ];
1621   }
1622  
1623   $attrs->{select} = 
1624     ($attrs->{select}
1625       ? (ref $attrs->{select} eq 'ARRAY'
1626           ? [ @{$attrs->{select}} ]
1627           : [ $attrs->{select} ])
1628       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1629     );
1630   $attrs->{as} =
1631     ($attrs->{as}
1632       ? (ref $attrs->{as} eq 'ARRAY'
1633           ? [ @{$attrs->{as}} ]
1634           : [ $attrs->{as} ])
1635       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1636     );
1637   
1638   my $adds;
1639   if ($adds = delete $attrs->{include_columns}) {
1640     $adds = [$adds] unless ref $adds eq 'ARRAY';
1641     push(@{$attrs->{select}}, @$adds);
1642     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1643   }
1644   if ($adds = delete $attrs->{'+select'}) {
1645     $adds = [$adds] unless ref $adds eq 'ARRAY';
1646     push(@{$attrs->{select}},
1647            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1648   }
1649   if (my $adds = delete $attrs->{'+as'}) {
1650     $adds = [$adds] unless ref $adds eq 'ARRAY';
1651     push(@{$attrs->{as}}, @$adds);
1652   }
1653
1654   $attrs->{from} ||= [ { 'me' => $source->from } ];
1655
1656   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1657     my $join = delete $attrs->{join} || {};
1658
1659     if (defined $attrs->{prefetch}) {
1660       $join = $self->_merge_attr(
1661         $join, $attrs->{prefetch}
1662       );
1663     }
1664
1665     $attrs->{from} =   # have to copy here to avoid corrupting the original
1666       [
1667         @{$attrs->{from}}, 
1668         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1669       ];
1670   }
1671
1672   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1673   if ($attrs->{order_by}) {
1674     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1675                            ? [ @{$attrs->{order_by}} ]
1676                            : [ $attrs->{order_by} ]);
1677   } else {
1678     $attrs->{order_by} = [];    
1679   }
1680
1681   my $collapse = $attrs->{collapse} || {};
1682   if (my $prefetch = delete $attrs->{prefetch}) {
1683     $prefetch = $self->_merge_attr({}, $prefetch);
1684     my @pre_order;
1685     my $seen = $attrs->{seen_join} || {};
1686     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1687       # bring joins back to level of current class
1688       my @prefetch = $source->resolve_prefetch(
1689         $p, $alias, $seen, \@pre_order, $collapse
1690       );
1691       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1692       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1693     }
1694     push(@{$attrs->{order_by}}, @pre_order);
1695   }
1696   $attrs->{collapse} = $collapse;
1697
1698   return $self->{_attrs} = $attrs;
1699 }
1700
1701 sub _merge_attr {
1702   my ($self, $a, $b) = @_;
1703   return $b unless defined($a);
1704   return $a unless defined($b);
1705   
1706   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1707     foreach my $key (keys %{$b}) {
1708       if (exists $a->{$key}) {
1709         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1710       } else {
1711         $a->{$key} = $b->{$key};
1712       }
1713     }
1714     return $a;
1715   } else {
1716     $a = [$a] unless ref $a eq 'ARRAY';
1717     $b = [$b] unless ref $b eq 'ARRAY';
1718
1719     my $hash = {};
1720     my @array;
1721     foreach my $x ($a, $b) {
1722       foreach my $element (@{$x}) {
1723         if (ref $element eq 'HASH') {
1724           $hash = $self->_merge_attr($hash, $element);
1725         } elsif (ref $element eq 'ARRAY') {
1726           push(@array, @{$element});
1727         } else {
1728           push(@array, $element) unless $b == $x
1729             && grep { $_ eq $element } @array;
1730         }
1731       }
1732     }
1733     
1734     @array = grep { !exists $hash->{$_} } @array;
1735
1736     return keys %{$hash}
1737       ? ( scalar(@array)
1738             ? [$hash, @array]
1739             : $hash
1740         )
1741       : \@array;
1742   }
1743 }
1744
1745 sub result_source {
1746     my $self = shift;
1747
1748     if (@_) {
1749         $self->_source_handle($_[0]->handle);
1750     } else {
1751         $self->_source_handle->resolve;
1752     }
1753 }
1754
1755 =head2 throw_exception
1756
1757 See L<DBIx::Class::Schema/throw_exception> for details.
1758
1759 =cut
1760
1761 sub throw_exception {
1762   my $self=shift;
1763   $self->_source_handle->schema->throw_exception(@_);
1764 }
1765
1766 # XXX: FIXME: Attributes docs need clearing up
1767
1768 =head1 ATTRIBUTES
1769
1770 The resultset takes various attributes that modify its behavior. Here's an
1771 overview of them:
1772
1773 =head2 order_by
1774
1775 =over 4
1776
1777 =item Value: ($order_by | \@order_by)
1778
1779 =back
1780
1781 Which column(s) to order the results by. This is currently passed
1782 through directly to SQL, so you can give e.g. C<year DESC> for a
1783 descending order on the column `year'.
1784
1785 Please note that if you have C<quote_char> enabled (see
1786 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1787 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1788 so you will need to manually quote things as appropriate.)
1789
1790 =head2 columns
1791
1792 =over 4
1793
1794 =item Value: \@columns
1795
1796 =back
1797
1798 Shortcut to request a particular set of columns to be retrieved.  Adds
1799 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1800 from that, then auto-populates C<as> from C<select> as normal. (You may also
1801 use the C<cols> attribute, as in earlier versions of DBIC.)
1802
1803 =head2 include_columns
1804
1805 =over 4
1806
1807 =item Value: \@columns
1808
1809 =back
1810
1811 Shortcut to include additional columns in the returned results - for example
1812
1813   $schema->resultset('CD')->search(undef, {
1814     include_columns => ['artist.name'],
1815     join => ['artist']
1816   });
1817
1818 would return all CDs and include a 'name' column to the information
1819 passed to object inflation. Note that the 'artist' is the name of the
1820 column (or relationship) accessor, and 'name' is the name of the column
1821 accessor in the related table.
1822
1823 =head2 select
1824
1825 =over 4
1826
1827 =item Value: \@select_columns
1828
1829 =back
1830
1831 Indicates which columns should be selected from the storage. You can use
1832 column names, or in the case of RDBMS back ends, function or stored procedure
1833 names:
1834
1835   $rs = $schema->resultset('Employee')->search(undef, {
1836     select => [
1837       'name',
1838       { count => 'employeeid' },
1839       { sum => 'salary' }
1840     ]
1841   });
1842
1843 When you use function/stored procedure names and do not supply an C<as>
1844 attribute, the column names returned are storage-dependent. E.g. MySQL would
1845 return a column named C<count(employeeid)> in the above example.
1846
1847 =head2 +select
1848
1849 =over 4
1850
1851 Indicates additional columns to be selected from storage.  Works the same as
1852 L<select> but adds columns to the selection.
1853
1854 =back
1855
1856 =head2 +as
1857
1858 =over 4
1859
1860 Indicates additional column names for those added via L<+select>.
1861
1862 =back
1863
1864 =head2 as
1865
1866 =over 4
1867
1868 =item Value: \@inflation_names
1869
1870 =back
1871
1872 Indicates column names for object inflation. That is, c< as >
1873 indicates the name that the column can be accessed as via the
1874 C<get_column> method (or via the object accessor, B<if one already
1875 exists>).  It has nothing to do with the SQL code C< SELECT foo AS bar
1876 >.
1877
1878 The C< as > attribute is used in conjunction with C<select>,
1879 usually when C<select> contains one or more function or stored
1880 procedure names:
1881
1882   $rs = $schema->resultset('Employee')->search(undef, {
1883     select => [
1884       'name',
1885       { count => 'employeeid' }
1886     ],
1887     as => ['name', 'employee_count'],
1888   });
1889
1890   my $employee = $rs->first(); # get the first Employee
1891
1892 If the object against which the search is performed already has an accessor
1893 matching a column name specified in C<as>, the value can be retrieved using
1894 the accessor as normal:
1895
1896   my $name = $employee->name();
1897
1898 If on the other hand an accessor does not exist in the object, you need to
1899 use C<get_column> instead:
1900
1901   my $employee_count = $employee->get_column('employee_count');
1902
1903 You can create your own accessors if required - see
1904 L<DBIx::Class::Manual::Cookbook> for details.
1905
1906 Please note: This will NOT insert an C<AS employee_count> into the SQL
1907 statement produced, it is used for internal access only. Thus
1908 attempting to use the accessor in an C<order_by> clause or similar
1909 will fail miserably.
1910
1911 To get around this limitation, you can supply literal SQL to your
1912 C<select> attibute that contains the C<AS alias> text, eg:
1913
1914   select => [\'myfield AS alias']
1915
1916 =head2 join
1917
1918 =over 4
1919
1920 =item Value: ($rel_name | \@rel_names | \%rel_names)
1921
1922 =back
1923
1924 Contains a list of relationships that should be joined for this query.  For
1925 example:
1926
1927   # Get CDs by Nine Inch Nails
1928   my $rs = $schema->resultset('CD')->search(
1929     { 'artist.name' => 'Nine Inch Nails' },
1930     { join => 'artist' }
1931   );
1932
1933 Can also contain a hash reference to refer to the other relation's relations.
1934 For example:
1935
1936   package MyApp::Schema::Track;
1937   use base qw/DBIx::Class/;
1938   __PACKAGE__->table('track');
1939   __PACKAGE__->add_columns(qw/trackid cd position title/);
1940   __PACKAGE__->set_primary_key('trackid');
1941   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1942   1;
1943
1944   # In your application
1945   my $rs = $schema->resultset('Artist')->search(
1946     { 'track.title' => 'Teardrop' },
1947     {
1948       join     => { cd => 'track' },
1949       order_by => 'artist.name',
1950     }
1951   );
1952
1953 You need to use the relationship (not the table) name in  conditions, 
1954 because they are aliased as such. The current table is aliased as "me", so 
1955 you need to use me.column_name in order to avoid ambiguity. For example:
1956
1957   # Get CDs from 1984 with a 'Foo' track 
1958   my $rs = $schema->resultset('CD')->search(
1959     { 
1960       'me.year' => 1984,
1961       'tracks.name' => 'Foo'
1962     },
1963     { join => 'tracks' }
1964   );
1965   
1966 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1967 similarly for a third time). For e.g.
1968
1969   my $rs = $schema->resultset('Artist')->search({
1970     'cds.title'   => 'Down to Earth',
1971     'cds_2.title' => 'Popular',
1972   }, {
1973     join => [ qw/cds cds/ ],
1974   });
1975
1976 will return a set of all artists that have both a cd with title 'Down
1977 to Earth' and a cd with title 'Popular'.
1978
1979 If you want to fetch related objects from other tables as well, see C<prefetch>
1980 below.
1981
1982 =head2 prefetch
1983
1984 =over 4
1985
1986 =item Value: ($rel_name | \@rel_names | \%rel_names)
1987
1988 =back
1989
1990 Contains one or more relationships that should be fetched along with the main
1991 query (when they are accessed afterwards they will have already been
1992 "prefetched").  This is useful for when you know you will need the related
1993 objects, because it saves at least one query:
1994
1995   my $rs = $schema->resultset('Tag')->search(
1996     undef,
1997     {
1998       prefetch => {
1999         cd => 'artist'
2000       }
2001     }
2002   );
2003
2004 The initial search results in SQL like the following:
2005
2006   SELECT tag.*, cd.*, artist.* FROM tag
2007   JOIN cd ON tag.cd = cd.cdid
2008   JOIN artist ON cd.artist = artist.artistid
2009
2010 L<DBIx::Class> has no need to go back to the database when we access the
2011 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2012 case.
2013
2014 Simple prefetches will be joined automatically, so there is no need
2015 for a C<join> attribute in the above search. If you're prefetching to
2016 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2017 specify the join as well.
2018
2019 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2020 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2021 with an accessor type of 'single' or 'filter').
2022
2023 =head2 page
2024
2025 =over 4
2026
2027 =item Value: $page
2028
2029 =back
2030
2031 Makes the resultset paged and specifies the page to retrieve. Effectively
2032 identical to creating a non-pages resultset and then calling ->page($page)
2033 on it.
2034
2035 If L<rows> attribute is not specified it defualts to 10 rows per page.
2036
2037 =head2 rows
2038
2039 =over 4
2040
2041 =item Value: $rows
2042
2043 =back
2044
2045 Specifes the maximum number of rows for direct retrieval or the number of
2046 rows per page if the page attribute or method is used.
2047
2048 =head2 offset
2049
2050 =over 4
2051
2052 =item Value: $offset
2053
2054 =back
2055
2056 Specifies the (zero-based) row number for the  first row to be returned, or the
2057 of the first row of the first page if paging is used.
2058
2059 =head2 group_by
2060
2061 =over 4
2062
2063 =item Value: \@columns
2064
2065 =back
2066
2067 A arrayref of columns to group by. Can include columns of joined tables.
2068
2069   group_by => [qw/ column1 column2 ... /]
2070
2071 =head2 having
2072
2073 =over 4
2074
2075 =item Value: $condition
2076
2077 =back
2078
2079 HAVING is a select statement attribute that is applied between GROUP BY and
2080 ORDER BY. It is applied to the after the grouping calculations have been
2081 done.
2082
2083   having => { 'count(employee)' => { '>=', 100 } }
2084
2085 =head2 distinct
2086
2087 =over 4
2088
2089 =item Value: (0 | 1)
2090
2091 =back
2092
2093 Set to 1 to group by all columns.
2094
2095 =head2 where
2096
2097 =over 4
2098
2099 Adds to the WHERE clause.
2100
2101   # only return rows WHERE deleted IS NULL for all searches
2102   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2103
2104 Can be overridden by passing C<{ where => undef }> as an attribute
2105 to a resulset.
2106
2107 =back
2108
2109 =head2 cache
2110
2111 Set to 1 to cache search results. This prevents extra SQL queries if you
2112 revisit rows in your ResultSet:
2113
2114   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2115
2116   while( my $artist = $resultset->next ) {
2117     ... do stuff ...
2118   }
2119
2120   $rs->first; # without cache, this would issue a query
2121
2122 By default, searches are not cached.
2123
2124 For more examples of using these attributes, see
2125 L<DBIx::Class::Manual::Cookbook>.
2126
2127 =head2 from
2128
2129 =over 4
2130
2131 =item Value: \@from_clause
2132
2133 =back
2134
2135 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2136 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2137 clauses.
2138
2139 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2140
2141 C<join> will usually do what you need and it is strongly recommended that you
2142 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2143 And we really do mean "cannot", not just tried and failed. Attempting to use
2144 this because you're having problems with C<join> is like trying to use x86
2145 ASM because you've got a syntax error in your C. Trust us on this.
2146
2147 Now, if you're still really, really sure you need to use this (and if you're
2148 not 100% sure, ask the mailing list first), here's an explanation of how this
2149 works.
2150
2151 The syntax is as follows -
2152
2153   [
2154     { <alias1> => <table1> },
2155     [
2156       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2157       [], # nested JOIN (optional)
2158       { <table1.column1> => <table2.column2>, ... (more conditions) },
2159     ],
2160     # More of the above [ ] may follow for additional joins
2161   ]
2162
2163   <table1> <alias1>
2164   JOIN
2165     <table2> <alias2>
2166     [JOIN ...]
2167   ON <table1.column1> = <table2.column2>
2168   <more joins may follow>
2169
2170 An easy way to follow the examples below is to remember the following:
2171
2172     Anything inside "[]" is a JOIN
2173     Anything inside "{}" is a condition for the enclosing JOIN
2174
2175 The following examples utilize a "person" table in a family tree application.
2176 In order to express parent->child relationships, this table is self-joined:
2177
2178     # Person->belongs_to('father' => 'Person');
2179     # Person->belongs_to('mother' => 'Person');
2180
2181 C<from> can be used to nest joins. Here we return all children with a father,
2182 then search against all mothers of those children:
2183
2184   $rs = $schema->resultset('Person')->search(
2185       undef,
2186       {
2187           alias => 'mother', # alias columns in accordance with "from"
2188           from => [
2189               { mother => 'person' },
2190               [
2191                   [
2192                       { child => 'person' },
2193                       [
2194                           { father => 'person' },
2195                           { 'father.person_id' => 'child.father_id' }
2196                       ]
2197                   ],
2198                   { 'mother.person_id' => 'child.mother_id' }
2199               ],
2200           ]
2201       },
2202   );
2203
2204   # Equivalent SQL:
2205   # SELECT mother.* FROM person mother
2206   # JOIN (
2207   #   person child
2208   #   JOIN person father
2209   #   ON ( father.person_id = child.father_id )
2210   # )
2211   # ON ( mother.person_id = child.mother_id )
2212
2213 The type of any join can be controlled manually. To search against only people
2214 with a father in the person table, we could explicitly use C<INNER JOIN>:
2215
2216     $rs = $schema->resultset('Person')->search(
2217         undef,
2218         {
2219             alias => 'child', # alias columns in accordance with "from"
2220             from => [
2221                 { child => 'person' },
2222                 [
2223                     { father => 'person', -join_type => 'inner' },
2224                     { 'father.id' => 'child.father_id' }
2225                 ],
2226             ]
2227         },
2228     );
2229
2230     # Equivalent SQL:
2231     # SELECT child.* FROM person child
2232     # INNER JOIN person father ON child.father_id = father.id
2233
2234 =cut
2235
2236 1;