Unbreak back-compat
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use base qw/DBIx::Class/;
15
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   $source = $source->handle 
89     unless $source->isa('DBIx::Class::ResultSourceHandle');
90   $attrs = { %{$attrs||{}} };
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   my $self = {
101     _source_handle => $source,
102     result_class => $attrs->{result_class} || $source->resolve->result_class,
103     cond => $attrs->{where},
104     count => undef,
105     pager => undef,
106     attrs => $attrs
107   };
108
109   bless $self, $class;
110
111   return $self;
112 }
113
114 =head2 search
115
116 =over 4
117
118 =item Arguments: $cond, \%attrs?
119
120 =item Return Value: $resultset (scalar context), @row_objs (list context)
121
122 =back
123
124   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
125   my $new_rs = $cd_rs->search({ year => 2005 });
126
127   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
128                  # year = 2005 OR year = 2004
129
130 If you need to pass in additional attributes but no additional condition,
131 call it as C<search(undef, \%attrs)>.
132
133   # "SELECT name, artistid FROM $artist_table"
134   my @all_artists = $schema->resultset('Artist')->search(undef, {
135     columns => [qw/name artistid/],
136   });
137
138 For a list of attributes that can be passed to C<search>, see
139 L</ATTRIBUTES>. For more examples of using this function, see
140 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
141 documentation for the first argument, see L<SQL::Abstract>.
142
143 =cut
144
145 sub search {
146   my $self = shift;
147   my $rs = $self->search_rs( @_ );
148   return (wantarray ? $rs->all : $rs);
149 }
150
151 =head2 search_rs
152
153 =over 4
154
155 =item Arguments: $cond, \%attrs?
156
157 =item Return Value: $resultset
158
159 =back
160
161 This method does the same exact thing as search() except it will
162 always return a resultset, even in list context.
163
164 =cut
165
166 sub search_rs {
167   my $self = shift;
168
169   my $rows;
170
171   unless (@_) {                 # no search, effectively just a clone
172     $rows = $self->get_cache;
173   }
174
175   my $attrs = {};
176   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
177   my $our_attrs = { %{$self->{attrs}} };
178   my $having = delete $our_attrs->{having};
179   my $where = delete $our_attrs->{where};
180
181   my $new_attrs = { %{$our_attrs}, %{$attrs} };
182
183   # merge new attrs into inherited
184   foreach my $key (qw/join prefetch/) {
185     next unless exists $attrs->{$key};
186     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
187   }
188
189   my $cond = (@_
190     ? (
191         (@_ == 1 || ref $_[0] eq "HASH")
192           ? (
193               (ref $_[0] eq 'HASH')
194                 ? (
195                     (keys %{ $_[0] }  > 0)
196                       ? shift
197                       : undef
198                    )
199                 :  shift
200              )
201           : (
202               (@_ % 2)
203                 ? $self->throw_exception("Odd number of arguments to search")
204                 : {@_}
205              )
206       )
207     : undef
208   );
209
210   if (defined $where) {
211     $new_attrs->{where} = (
212       defined $new_attrs->{where}
213         ? { '-and' => [
214               map {
215                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
216               } $where, $new_attrs->{where}
217             ]
218           }
219         : $where);
220   }
221
222   if (defined $cond) {
223     $new_attrs->{where} = (
224       defined $new_attrs->{where}
225         ? { '-and' => [
226               map {
227                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
228               } $cond, $new_attrs->{where}
229             ]
230           }
231         : $cond);
232   }
233
234   if (defined $having) {
235     $new_attrs->{having} = (
236       defined $new_attrs->{having}
237         ? { '-and' => [
238               map {
239                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
240               } $having, $new_attrs->{having}
241             ]
242           }
243         : $having);
244   }
245
246   my $rs = (ref $self)->new($self->_source_handle, $new_attrs);
247   if ($rows) {
248     $rs->set_cache($rows);
249   }
250   return $rs;
251 }
252
253 =head2 search_literal
254
255 =over 4
256
257 =item Arguments: $sql_fragment, @bind_values
258
259 =item Return Value: $resultset (scalar context), @row_objs (list context)
260
261 =back
262
263   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
264   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
265
266 Pass a literal chunk of SQL to be added to the conditional part of the
267 resultset query.
268
269 =cut
270
271 sub search_literal {
272   my ($self, $cond, @vals) = @_;
273   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
274   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
275   return $self->search(\$cond, $attrs);
276 }
277
278 =head2 find
279
280 =over 4
281
282 =item Arguments: @values | \%cols, \%attrs?
283
284 =item Return Value: $row_object
285
286 =back
287
288 Finds a row based on its primary key or unique constraint. For example, to find
289 a row by its primary key:
290
291   my $cd = $schema->resultset('CD')->find(5);
292
293 You can also find a row by a specific unique constraint using the C<key>
294 attribute. For example:
295
296   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
297     key => 'cd_artist_title'
298   });
299
300 Additionally, you can specify the columns explicitly by name:
301
302   my $cd = $schema->resultset('CD')->find(
303     {
304       artist => 'Massive Attack',
305       title  => 'Mezzanine',
306     },
307     { key => 'cd_artist_title' }
308   );
309
310 If the C<key> is specified as C<primary>, it searches only on the primary key.
311
312 If no C<key> is specified, it searches on all unique constraints defined on the
313 source, including the primary key.
314
315 If your table does not have a primary key, you B<must> provide a value for the
316 C<key> attribute matching one of the unique constraints on the source.
317
318 See also L</find_or_create> and L</update_or_create>. For information on how to
319 declare unique constraints, see
320 L<DBIx::Class::ResultSource/add_unique_constraint>.
321
322 =cut
323
324 sub find {
325   my $self = shift;
326   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
327
328   # Default to the primary key, but allow a specific key
329   my @cols = exists $attrs->{key}
330     ? $self->result_source->unique_constraint_columns($attrs->{key})
331     : $self->result_source->primary_columns;
332   $self->throw_exception(
333     "Can't find unless a primary key is defined or unique constraint is specified"
334   ) unless @cols;
335
336   # Parse out a hashref from input
337   my $input_query;
338   if (ref $_[0] eq 'HASH') {
339     $input_query = { %{$_[0]} };
340   }
341   elsif (@_ == @cols) {
342     $input_query = {};
343     @{$input_query}{@cols} = @_;
344   }
345   else {
346     # Compatibility: Allow e.g. find(id => $value)
347     carp "Find by key => value deprecated; please use a hashref instead";
348     $input_query = {@_};
349   }
350
351   my (%related, $info);
352
353   foreach my $key (keys %$input_query) {
354     if (ref($input_query->{$key})
355         && ($info = $self->result_source->relationship_info($key))) {
356       my $rel_q = $self->result_source->resolve_condition(
357                     $info->{cond}, delete $input_query->{$key}, $key
358                   );
359       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
360       @related{keys %$rel_q} = values %$rel_q;
361     }
362   }
363   if (my @keys = keys %related) {
364     @{$input_query}{@keys} = values %related;
365   }
366
367   my @unique_queries = $self->_unique_queries($input_query, $attrs);
368
369   # Build the final query: Default to the disjunction of the unique queries,
370   # but allow the input query in case the ResultSet defines the query or the
371   # user is abusing find
372   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
373   my $query = @unique_queries
374     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
375     : $self->_add_alias($input_query, $alias);
376
377   # Run the query
378   if (keys %$attrs) {
379     my $rs = $self->search($query, $attrs);
380     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
381   }
382   else {
383     return keys %{$self->_resolved_attrs->{collapse}}
384       ? $self->search($query)->next
385       : $self->single($query);
386   }
387 }
388
389 # _add_alias
390 #
391 # Add the specified alias to the specified query hash. A copy is made so the
392 # original query is not modified.
393
394 sub _add_alias {
395   my ($self, $query, $alias) = @_;
396
397   my %aliased = %$query;
398   foreach my $col (grep { ! m/\./ } keys %aliased) {
399     $aliased{"$alias.$col"} = delete $aliased{$col};
400   }
401
402   return \%aliased;
403 }
404
405 # _unique_queries
406 #
407 # Build a list of queries which satisfy unique constraints.
408
409 sub _unique_queries {
410   my ($self, $query, $attrs) = @_;
411
412   my @constraint_names = exists $attrs->{key}
413     ? ($attrs->{key})
414     : $self->result_source->unique_constraint_names;
415
416   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
417   my $num_where = scalar keys %$where;
418
419   my @unique_queries;
420   foreach my $name (@constraint_names) {
421     my @unique_cols = $self->result_source->unique_constraint_columns($name);
422     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
423
424     my $num_cols = scalar @unique_cols;
425     my $num_query = scalar keys %$unique_query;
426
427     my $total = $num_query + $num_where;
428     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
429       # The query is either unique on its own or is unique in combination with
430       # the existing where clause
431       push @unique_queries, $unique_query;
432     }
433   }
434
435   return @unique_queries;
436 }
437
438 # _build_unique_query
439 #
440 # Constrain the specified query hash based on the specified column names.
441
442 sub _build_unique_query {
443   my ($self, $query, $unique_cols) = @_;
444
445   return {
446     map  { $_ => $query->{$_} }
447     grep { exists $query->{$_} }
448       @$unique_cols
449   };
450 }
451
452 =head2 search_related
453
454 =over 4
455
456 =item Arguments: $rel, $cond, \%attrs?
457
458 =item Return Value: $new_resultset
459
460 =back
461
462   $new_rs = $cd_rs->search_related('artist', {
463     name => 'Emo-R-Us',
464   });
465
466 Searches the specified relationship, optionally specifying a condition and
467 attributes for matching records. See L</ATTRIBUTES> for more information.
468
469 =cut
470
471 sub search_related {
472   return shift->related_resultset(shift)->search(@_);
473 }
474
475 =head2 cursor
476
477 =over 4
478
479 =item Arguments: none
480
481 =item Return Value: $cursor
482
483 =back
484
485 Returns a storage-driven cursor to the given resultset. See
486 L<DBIx::Class::Cursor> for more information.
487
488 =cut
489
490 sub cursor {
491   my ($self) = @_;
492
493   my $attrs = { %{$self->_resolved_attrs} };
494   return $self->{cursor}
495     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
496           $attrs->{where},$attrs);
497 }
498
499 =head2 single
500
501 =over 4
502
503 =item Arguments: $cond?
504
505 =item Return Value: $row_object?
506
507 =back
508
509   my $cd = $schema->resultset('CD')->single({ year => 2001 });
510
511 Inflates the first result without creating a cursor if the resultset has
512 any records in it; if not returns nothing. Used by L</find> as an optimisation.
513
514 Can optionally take an additional condition *only* - this is a fast-code-path
515 method; if you need to add extra joins or similar call ->search and then
516 ->single without a condition on the $rs returned from that.
517
518 =cut
519
520 sub single {
521   my ($self, $where) = @_;
522   my $attrs = { %{$self->_resolved_attrs} };
523   if ($where) {
524     if (defined $attrs->{where}) {
525       $attrs->{where} = {
526         '-and' =>
527             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
528                $where, delete $attrs->{where} ]
529       };
530     } else {
531       $attrs->{where} = $where;
532     }
533   }
534
535 #  XXX: Disabled since it doesn't infer uniqueness in all cases
536 #  unless ($self->_is_unique_query($attrs->{where})) {
537 #    carp "Query not guaranteed to return a single row"
538 #      . "; please declare your unique constraints or use search instead";
539 #  }
540
541   my @data = $self->result_source->storage->select_single(
542     $attrs->{from}, $attrs->{select},
543     $attrs->{where}, $attrs
544   );
545
546   return (@data ? ($self->_construct_object(@data))[0] : ());
547 }
548
549 # _is_unique_query
550 #
551 # Try to determine if the specified query is guaranteed to be unique, based on
552 # the declared unique constraints.
553
554 sub _is_unique_query {
555   my ($self, $query) = @_;
556
557   my $collapsed = $self->_collapse_query($query);
558   my $alias = $self->{attrs}{alias};
559
560   foreach my $name ($self->result_source->unique_constraint_names) {
561     my @unique_cols = map {
562       "$alias.$_"
563     } $self->result_source->unique_constraint_columns($name);
564
565     # Count the values for each unique column
566     my %seen = map { $_ => 0 } @unique_cols;
567
568     foreach my $key (keys %$collapsed) {
569       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
570       next unless exists $seen{$aliased};  # Additional constraints are okay
571       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
572     }
573
574     # If we get 0 or more than 1 value for a column, it's not necessarily unique
575     return 1 unless grep { $_ != 1 } values %seen;
576   }
577
578   return 0;
579 }
580
581 # _collapse_query
582 #
583 # Recursively collapse the query, accumulating values for each column.
584
585 sub _collapse_query {
586   my ($self, $query, $collapsed) = @_;
587
588   $collapsed ||= {};
589
590   if (ref $query eq 'ARRAY') {
591     foreach my $subquery (@$query) {
592       next unless ref $subquery;  # -or
593 #      warn "ARRAY: " . Dumper $subquery;
594       $collapsed = $self->_collapse_query($subquery, $collapsed);
595     }
596   }
597   elsif (ref $query eq 'HASH') {
598     if (keys %$query and (keys %$query)[0] eq '-and') {
599       foreach my $subquery (@{$query->{-and}}) {
600 #        warn "HASH: " . Dumper $subquery;
601         $collapsed = $self->_collapse_query($subquery, $collapsed);
602       }
603     }
604     else {
605 #      warn "LEAF: " . Dumper $query;
606       foreach my $col (keys %$query) {
607         my $value = $query->{$col};
608         $collapsed->{$col}{$value}++;
609       }
610     }
611   }
612
613   return $collapsed;
614 }
615
616 =head2 get_column
617
618 =over 4
619
620 =item Arguments: $cond?
621
622 =item Return Value: $resultsetcolumn
623
624 =back
625
626   my $max_length = $rs->get_column('length')->max;
627
628 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
629
630 =cut
631
632 sub get_column {
633   my ($self, $column) = @_;
634   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
635   return $new;
636 }
637
638 =head2 search_like
639
640 =over 4
641
642 =item Arguments: $cond, \%attrs?
643
644 =item Return Value: $resultset (scalar context), @row_objs (list context)
645
646 =back
647
648   # WHERE title LIKE '%blue%'
649   $cd_rs = $rs->search_like({ title => '%blue%'});
650
651 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
652 that this is simply a convenience method. You most likely want to use
653 L</search> with specific operators.
654
655 For more information, see L<DBIx::Class::Manual::Cookbook>.
656
657 =cut
658
659 sub search_like {
660   my $class = shift;
661   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
662   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
663   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
664   return $class->search($query, { %$attrs });
665 }
666
667 =head2 slice
668
669 =over 4
670
671 =item Arguments: $first, $last
672
673 =item Return Value: $resultset (scalar context), @row_objs (list context)
674
675 =back
676
677 Returns a resultset or object list representing a subset of elements from the
678 resultset slice is called on. Indexes are from 0, i.e., to get the first
679 three records, call:
680
681   my ($one, $two, $three) = $rs->slice(0, 2);
682
683 =cut
684
685 sub slice {
686   my ($self, $min, $max) = @_;
687   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
688   $attrs->{offset} = $self->{attrs}{offset} || 0;
689   $attrs->{offset} += $min;
690   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
691   return $self->search(undef(), $attrs);
692   #my $slice = (ref $self)->new($self->result_source, $attrs);
693   #return (wantarray ? $slice->all : $slice);
694 }
695
696 =head2 next
697
698 =over 4
699
700 =item Arguments: none
701
702 =item Return Value: $result?
703
704 =back
705
706 Returns the next element in the resultset (C<undef> is there is none).
707
708 Can be used to efficiently iterate over records in the resultset:
709
710   my $rs = $schema->resultset('CD')->search;
711   while (my $cd = $rs->next) {
712     print $cd->title;
713   }
714
715 Note that you need to store the resultset object, and call C<next> on it.
716 Calling C<< resultset('Table')->next >> repeatedly will always return the
717 first record from the resultset.
718
719 =cut
720
721 sub next {
722   my ($self) = @_;
723   if (my $cache = $self->get_cache) {
724     $self->{all_cache_position} ||= 0;
725     return $cache->[$self->{all_cache_position}++];
726   }
727   if ($self->{attrs}{cache}) {
728     $self->{all_cache_position} = 1;
729     return ($self->all)[0];
730   }
731   if ($self->{stashed_objects}) {
732     my $obj = shift(@{$self->{stashed_objects}});
733     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
734     return $obj;
735   }
736   my @row = (
737     exists $self->{stashed_row}
738       ? @{delete $self->{stashed_row}}
739       : $self->cursor->next
740   );
741   return unless (@row);
742   my ($row, @more) = $self->_construct_object(@row);
743   $self->{stashed_objects} = \@more if @more;
744   return $row;
745 }
746
747 sub _construct_object {
748   my ($self, @row) = @_;
749   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
750   my @new = $self->result_class->inflate_result($self->_source_handle, @$info);
751   @new = $self->{_attrs}{record_filter}->(@new)
752     if exists $self->{_attrs}{record_filter};
753   return @new;
754 }
755
756 sub _collapse_result {
757   my ($self, $as, $row, $prefix) = @_;
758
759   my %const;
760   my @copy = @$row;
761   
762   foreach my $this_as (@$as) {
763     my $val = shift @copy;
764     if (defined $prefix) {
765       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
766         my $remain = $1;
767         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
768         $const{$1||''}{$2} = $val;
769       }
770     } else {
771       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
772       $const{$1||''}{$2} = $val;
773     }
774   }
775
776   my $alias = $self->{attrs}{alias};
777   my $info = [ {}, {} ];
778   foreach my $key (keys %const) {
779     if (length $key && $key ne $alias) {
780       my $target = $info;
781       my @parts = split(/\./, $key);
782       foreach my $p (@parts) {
783         $target = $target->[1]->{$p} ||= [];
784       }
785       $target->[0] = $const{$key};
786     } else {
787       $info->[0] = $const{$key};
788     }
789   }
790   
791   my @collapse;
792   if (defined $prefix) {
793     @collapse = map {
794         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
795     } keys %{$self->{_attrs}{collapse}}
796   } else {
797     @collapse = keys %{$self->{_attrs}{collapse}};
798   };
799
800   if (@collapse) {
801     my ($c) = sort { length $a <=> length $b } @collapse;
802     my $target = $info;
803     foreach my $p (split(/\./, $c)) {
804       $target = $target->[1]->{$p} ||= [];
805     }
806     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
807     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
808     my $tree = $self->_collapse_result($as, $row, $c_prefix);
809     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
810     my (@final, @raw);
811
812     while (
813       !(
814         grep {
815           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
816         } @co_key
817         )
818     ) {
819       push(@final, $tree);
820       last unless (@raw = $self->cursor->next);
821       $row = $self->{stashed_row} = \@raw;
822       $tree = $self->_collapse_result($as, $row, $c_prefix);
823     }
824     @$target = (@final ? @final : [ {}, {} ]);
825       # single empty result to indicate an empty prefetched has_many
826   }
827
828   #print "final info: " . Dumper($info);
829   return $info;
830 }
831
832 =head2 result_source
833
834 =over 4
835
836 =item Arguments: $result_source?
837
838 =item Return Value: $result_source
839
840 =back
841
842 An accessor for the primary ResultSource object from which this ResultSet
843 is derived.
844
845 =head2 result_class
846
847 =over 4
848
849 =item Arguments: $result_class?
850
851 =item Return Value: $result_class
852
853 =back
854
855 An accessor for the class to use when creating row objects. Defaults to 
856 C<< result_source->result_class >> - which in most cases is the name of the 
857 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
858
859 =cut
860
861
862 =head2 count
863
864 =over 4
865
866 =item Arguments: $cond, \%attrs??
867
868 =item Return Value: $count
869
870 =back
871
872 Performs an SQL C<COUNT> with the same query as the resultset was built
873 with to find the number of elements. If passed arguments, does a search
874 on the resultset and counts the results of that.
875
876 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
877 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
878 not support C<DISTINCT> with multiple columns. If you are using such a
879 database, you should only use columns from the main table in your C<group_by>
880 clause.
881
882 =cut
883
884 sub count {
885   my $self = shift;
886   return $self->search(@_)->count if @_ and defined $_[0];
887   return scalar @{ $self->get_cache } if $self->get_cache;
888   my $count = $self->_count;
889   return 0 unless $count;
890
891   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
892   $count = $self->{attrs}{rows} if
893     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
894   return $count;
895 }
896
897 sub _count { # Separated out so pager can get the full count
898   my $self = shift;
899   my $select = { count => '*' };
900
901   my $attrs = { %{$self->_resolved_attrs} };
902   if (my $group_by = delete $attrs->{group_by}) {
903     delete $attrs->{having};
904     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
905     # todo: try CONCAT for multi-column pk
906     my @pk = $self->result_source->primary_columns;
907     if (@pk == 1) {
908       my $alias = $attrs->{alias};
909       foreach my $column (@distinct) {
910         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
911           @distinct = ($column);
912           last;
913         }
914       }
915     }
916
917     $select = { count => { distinct => \@distinct } };
918   }
919
920   $attrs->{select} = $select;
921   $attrs->{as} = [qw/count/];
922
923   # offset, order by and page are not needed to count. record_filter is cdbi
924   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
925
926   my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
927   my ($count) = $tmp_rs->cursor->next;
928   return $count;
929 }
930
931 =head2 count_literal
932
933 =over 4
934
935 =item Arguments: $sql_fragment, @bind_values
936
937 =item Return Value: $count
938
939 =back
940
941 Counts the results in a literal query. Equivalent to calling L</search_literal>
942 with the passed arguments, then L</count>.
943
944 =cut
945
946 sub count_literal { shift->search_literal(@_)->count; }
947
948 =head2 all
949
950 =over 4
951
952 =item Arguments: none
953
954 =item Return Value: @objects
955
956 =back
957
958 Returns all elements in the resultset. Called implicitly if the resultset
959 is returned in list context.
960
961 =cut
962
963 sub all {
964   my ($self) = @_;
965   return @{ $self->get_cache } if $self->get_cache;
966
967   my @obj;
968
969   # TODO: don't call resolve here
970   if (keys %{$self->_resolved_attrs->{collapse}}) {
971 #  if ($self->{attrs}{prefetch}) {
972       # Using $self->cursor->all is really just an optimisation.
973       # If we're collapsing has_many prefetches it probably makes
974       # very little difference, and this is cleaner than hacking
975       # _construct_object to survive the approach
976     my @row = $self->cursor->next;
977     while (@row) {
978       push(@obj, $self->_construct_object(@row));
979       @row = (exists $self->{stashed_row}
980                ? @{delete $self->{stashed_row}}
981                : $self->cursor->next);
982     }
983   } else {
984     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
985   }
986
987   $self->set_cache(\@obj) if $self->{attrs}{cache};
988   return @obj;
989 }
990
991 =head2 reset
992
993 =over 4
994
995 =item Arguments: none
996
997 =item Return Value: $self
998
999 =back
1000
1001 Resets the resultset's cursor, so you can iterate through the elements again.
1002
1003 =cut
1004
1005 sub reset {
1006   my ($self) = @_;
1007   delete $self->{_attrs} if exists $self->{_attrs};
1008   $self->{all_cache_position} = 0;
1009   $self->cursor->reset;
1010   return $self;
1011 }
1012
1013 =head2 first
1014
1015 =over 4
1016
1017 =item Arguments: none
1018
1019 =item Return Value: $object?
1020
1021 =back
1022
1023 Resets the resultset and returns an object for the first result (if the
1024 resultset returns anything).
1025
1026 =cut
1027
1028 sub first {
1029   return $_[0]->reset->next;
1030 }
1031
1032 # _cond_for_update_delete
1033 #
1034 # update/delete require the condition to be modified to handle
1035 # the differing SQL syntax available.  This transforms the $self->{cond}
1036 # appropriately, returning the new condition.
1037
1038 sub _cond_for_update_delete {
1039   my ($self, $full_cond) = @_;
1040   my $cond = {};
1041
1042   $full_cond ||= $self->{cond};
1043   # No-op. No condition, we're updating/deleting everything
1044   return $cond unless ref $full_cond;
1045
1046   if (ref $full_cond eq 'ARRAY') {
1047     $cond = [
1048       map {
1049         my %hash;
1050         foreach my $key (keys %{$_}) {
1051           $key =~ /([^.]+)$/;
1052           $hash{$1} = $_->{$key};
1053         }
1054         \%hash;
1055       } @{$full_cond}
1056     ];
1057   }
1058   elsif (ref $full_cond eq 'HASH') {
1059     if ((keys %{$full_cond})[0] eq '-and') {
1060       $cond->{-and} = [];
1061
1062       my @cond = @{$full_cond->{-and}};
1063       for (my $i = 0; $i < @cond; $i++) {
1064         my $entry = $cond[$i];
1065
1066         my $hash;
1067         if (ref $entry eq 'HASH') {
1068           $hash = $self->_cond_for_update_delete($entry);
1069         }
1070         else {
1071           $entry =~ /([^.]+)$/;
1072           $hash->{$1} = $cond[++$i];
1073         }
1074
1075         push @{$cond->{-and}}, $hash;
1076       }
1077     }
1078     else {
1079       foreach my $key (keys %{$full_cond}) {
1080         $key =~ /([^.]+)$/;
1081         $cond->{$1} = $full_cond->{$key};
1082       }
1083     }
1084   }
1085   else {
1086     $self->throw_exception(
1087       "Can't update/delete on resultset with condition unless hash or array"
1088     );
1089   }
1090
1091   return $cond;
1092 }
1093
1094
1095 =head2 update
1096
1097 =over 4
1098
1099 =item Arguments: \%values
1100
1101 =item Return Value: $storage_rv
1102
1103 =back
1104
1105 Sets the specified columns in the resultset to the supplied values in a
1106 single query. Return value will be true if the update succeeded or false
1107 if no records were updated; exact type of success value is storage-dependent.
1108
1109 =cut
1110
1111 sub update {
1112   my ($self, $values) = @_;
1113   $self->throw_exception("Values for update must be a hash")
1114     unless ref $values eq 'HASH';
1115
1116   my $cond = $self->_cond_for_update_delete;
1117    
1118   return $self->result_source->storage->update(
1119     $self->result_source, $values, $cond
1120   );
1121 }
1122
1123 =head2 update_all
1124
1125 =over 4
1126
1127 =item Arguments: \%values
1128
1129 =item Return Value: 1
1130
1131 =back
1132
1133 Fetches all objects and updates them one at a time. Note that C<update_all>
1134 will run DBIC cascade triggers, while L</update> will not.
1135
1136 =cut
1137
1138 sub update_all {
1139   my ($self, $values) = @_;
1140   $self->throw_exception("Values for update must be a hash")
1141     unless ref $values eq 'HASH';
1142   foreach my $obj ($self->all) {
1143     $obj->set_columns($values)->update;
1144   }
1145   return 1;
1146 }
1147
1148 =head2 delete
1149
1150 =over 4
1151
1152 =item Arguments: none
1153
1154 =item Return Value: 1
1155
1156 =back
1157
1158 Deletes the contents of the resultset from its result source. Note that this
1159 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1160 to run. See also L<DBIx::Class::Row/delete>.
1161
1162 =cut
1163
1164 sub delete {
1165   my ($self) = @_;
1166
1167   my $cond = $self->_cond_for_update_delete;
1168
1169   $self->result_source->storage->delete($self->result_source, $cond);
1170   return 1;
1171 }
1172
1173 =head2 delete_all
1174
1175 =over 4
1176
1177 =item Arguments: none
1178
1179 =item Return Value: 1
1180
1181 =back
1182
1183 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1184 will run DBIC cascade triggers, while L</delete> will not.
1185
1186 =cut
1187
1188 sub delete_all {
1189   my ($self) = @_;
1190   $_->delete for $self->all;
1191   return 1;
1192 }
1193
1194 =head2 pager
1195
1196 =over 4
1197
1198 =item Arguments: none
1199
1200 =item Return Value: $pager
1201
1202 =back
1203
1204 Return Value a L<Data::Page> object for the current resultset. Only makes
1205 sense for queries with a C<page> attribute.
1206
1207 =cut
1208
1209 sub pager {
1210   my ($self) = @_;
1211   my $attrs = $self->{attrs};
1212   $self->throw_exception("Can't create pager for non-paged rs")
1213     unless $self->{attrs}{page};
1214   $attrs->{rows} ||= 10;
1215   return $self->{pager} ||= Data::Page->new(
1216     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1217 }
1218
1219 =head2 page
1220
1221 =over 4
1222
1223 =item Arguments: $page_number
1224
1225 =item Return Value: $rs
1226
1227 =back
1228
1229 Returns a resultset for the $page_number page of the resultset on which page
1230 is called, where each page contains a number of rows equal to the 'rows'
1231 attribute set on the resultset (10 by default).
1232
1233 =cut
1234
1235 sub page {
1236   my ($self, $page) = @_;
1237   return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
1238 }
1239
1240 =head2 new_result
1241
1242 =over 4
1243
1244 =item Arguments: \%vals
1245
1246 =item Return Value: $object
1247
1248 =back
1249
1250 Creates an object in the resultset's result class and returns it.
1251
1252 =cut
1253
1254 sub new_result {
1255   my ($self, $values) = @_;
1256   $self->throw_exception( "new_result needs a hash" )
1257     unless (ref $values eq 'HASH');
1258   $self->throw_exception(
1259     "Can't abstract implicit construct, condition not a hash"
1260   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1261
1262   my $alias = $self->{attrs}{alias};
1263   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1264   my %new = (
1265     %{ $self->_remove_alias($values, $alias) },
1266     %{ $self->_remove_alias($collapsed_cond, $alias) },
1267     -source_handle => $self->_source_handle
1268   );
1269
1270   return $self->result_class->new(\%new);
1271 }
1272
1273 # _collapse_cond
1274 #
1275 # Recursively collapse the condition.
1276
1277 sub _collapse_cond {
1278   my ($self, $cond, $collapsed) = @_;
1279
1280   $collapsed ||= {};
1281
1282   if (ref $cond eq 'ARRAY') {
1283     foreach my $subcond (@$cond) {
1284       next unless ref $subcond;  # -or
1285 #      warn "ARRAY: " . Dumper $subcond;
1286       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1287     }
1288   }
1289   elsif (ref $cond eq 'HASH') {
1290     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1291       foreach my $subcond (@{$cond->{-and}}) {
1292 #        warn "HASH: " . Dumper $subcond;
1293         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1294       }
1295     }
1296     else {
1297 #      warn "LEAF: " . Dumper $cond;
1298       foreach my $col (keys %$cond) {
1299         my $value = $cond->{$col};
1300         $collapsed->{$col} = $value;
1301       }
1302     }
1303   }
1304
1305   return $collapsed;
1306 }
1307
1308 # _remove_alias
1309 #
1310 # Remove the specified alias from the specified query hash. A copy is made so
1311 # the original query is not modified.
1312
1313 sub _remove_alias {
1314   my ($self, $query, $alias) = @_;
1315
1316   my %orig = %{ $query || {} };
1317   my %unaliased;
1318
1319   foreach my $key (keys %orig) {
1320     if ($key !~ /\./) {
1321       $unaliased{$key} = $orig{$key};
1322       next;
1323     }
1324     $unaliased{$1} = $orig{$key}
1325       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1326   }
1327
1328   return \%unaliased;
1329 }
1330
1331 =head2 find_or_new
1332
1333 =over 4
1334
1335 =item Arguments: \%vals, \%attrs?
1336
1337 =item Return Value: $object
1338
1339 =back
1340
1341 Find an existing record from this resultset. If none exists, instantiate a new
1342 result object and return it. The object will not be saved into your storage
1343 until you call L<DBIx::Class::Row/insert> on it.
1344
1345 If you want objects to be saved immediately, use L</find_or_create> instead.
1346
1347 =cut
1348
1349 sub find_or_new {
1350   my $self     = shift;
1351   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1352   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1353   my $exists   = $self->find($hash, $attrs);
1354   return defined $exists ? $exists : $self->new_result($hash);
1355 }
1356
1357 =head2 create
1358
1359 =over 4
1360
1361 =item Arguments: \%vals
1362
1363 =item Return Value: $object
1364
1365 =back
1366
1367 Inserts a record into the resultset and returns the object representing it.
1368
1369 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1370
1371 =cut
1372
1373 sub create {
1374   my ($self, $attrs) = @_;
1375   $self->throw_exception( "create needs a hashref" )
1376     unless ref $attrs eq 'HASH';
1377   return $self->new_result($attrs)->insert;
1378 }
1379
1380 =head2 find_or_create
1381
1382 =over 4
1383
1384 =item Arguments: \%vals, \%attrs?
1385
1386 =item Return Value: $object
1387
1388 =back
1389
1390   $class->find_or_create({ key => $val, ... });
1391
1392 Tries to find a record based on its primary key or unique constraint; if none
1393 is found, creates one and returns that instead.
1394
1395   my $cd = $schema->resultset('CD')->find_or_create({
1396     cdid   => 5,
1397     artist => 'Massive Attack',
1398     title  => 'Mezzanine',
1399     year   => 2005,
1400   });
1401
1402 Also takes an optional C<key> attribute, to search by a specific key or unique
1403 constraint. For example:
1404
1405   my $cd = $schema->resultset('CD')->find_or_create(
1406     {
1407       artist => 'Massive Attack',
1408       title  => 'Mezzanine',
1409     },
1410     { key => 'cd_artist_title' }
1411   );
1412
1413 See also L</find> and L</update_or_create>. For information on how to declare
1414 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1415
1416 =cut
1417
1418 sub find_or_create {
1419   my $self     = shift;
1420   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1421   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1422   my $exists   = $self->find($hash, $attrs);
1423   return defined $exists ? $exists : $self->create($hash);
1424 }
1425
1426 =head2 update_or_create
1427
1428 =over 4
1429
1430 =item Arguments: \%col_values, { key => $unique_constraint }?
1431
1432 =item Return Value: $object
1433
1434 =back
1435
1436   $class->update_or_create({ col => $val, ... });
1437
1438 First, searches for an existing row matching one of the unique constraints
1439 (including the primary key) on the source of this resultset. If a row is
1440 found, updates it with the other given column values. Otherwise, creates a new
1441 row.
1442
1443 Takes an optional C<key> attribute to search on a specific unique constraint.
1444 For example:
1445
1446   # In your application
1447   my $cd = $schema->resultset('CD')->update_or_create(
1448     {
1449       artist => 'Massive Attack',
1450       title  => 'Mezzanine',
1451       year   => 1998,
1452     },
1453     { key => 'cd_artist_title' }
1454   );
1455
1456 If no C<key> is specified, it searches on all unique constraints defined on the
1457 source, including the primary key.
1458
1459 If the C<key> is specified as C<primary>, it searches only on the primary key.
1460
1461 See also L</find> and L</find_or_create>. For information on how to declare
1462 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1463
1464 =cut
1465
1466 sub update_or_create {
1467   my $self = shift;
1468   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1469   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1470
1471   my $row = $self->find($cond, $attrs);
1472   if (defined $row) {
1473     $row->update($cond);
1474     return $row;
1475   }
1476
1477   return $self->create($cond);
1478 }
1479
1480 =head2 get_cache
1481
1482 =over 4
1483
1484 =item Arguments: none
1485
1486 =item Return Value: \@cache_objects?
1487
1488 =back
1489
1490 Gets the contents of the cache for the resultset, if the cache is set.
1491
1492 =cut
1493
1494 sub get_cache {
1495   shift->{all_cache};
1496 }
1497
1498 =head2 set_cache
1499
1500 =over 4
1501
1502 =item Arguments: \@cache_objects
1503
1504 =item Return Value: \@cache_objects
1505
1506 =back
1507
1508 Sets the contents of the cache for the resultset. Expects an arrayref
1509 of objects of the same class as those produced by the resultset. Note that
1510 if the cache is set the resultset will return the cached objects rather
1511 than re-querying the database even if the cache attr is not set.
1512
1513 =cut
1514
1515 sub set_cache {
1516   my ( $self, $data ) = @_;
1517   $self->throw_exception("set_cache requires an arrayref")
1518       if defined($data) && (ref $data ne 'ARRAY');
1519   $self->{all_cache} = $data;
1520 }
1521
1522 =head2 clear_cache
1523
1524 =over 4
1525
1526 =item Arguments: none
1527
1528 =item Return Value: []
1529
1530 =back
1531
1532 Clears the cache for the resultset.
1533
1534 =cut
1535
1536 sub clear_cache {
1537   shift->set_cache(undef);
1538 }
1539
1540 =head2 related_resultset
1541
1542 =over 4
1543
1544 =item Arguments: $relationship_name
1545
1546 =item Return Value: $resultset
1547
1548 =back
1549
1550 Returns a related resultset for the supplied relationship name.
1551
1552   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1553
1554 =cut
1555
1556 sub related_resultset {
1557   my ($self, $rel) = @_;
1558
1559   $self->{related_resultsets} ||= {};
1560   return $self->{related_resultsets}{$rel} ||= do {
1561     my $rel_obj = $self->result_source->relationship_info($rel);
1562
1563     $self->throw_exception(
1564       "search_related: result source '" . $self->_source_handle->source_moniker .
1565         "' has no such relationship $rel")
1566       unless $rel_obj;
1567     
1568     my ($from,$seen) = $self->_resolve_from($rel);
1569
1570     my $join_count = $seen->{$rel};
1571     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1572
1573     $self->_source_handle->schema->resultset($rel_obj->{class})->search_rs(
1574       undef, {
1575         %{$self->{attrs}||{}},
1576         join => undef,
1577         prefetch => undef,
1578         select => undef,
1579         as => undef,
1580         alias => $alias,
1581         where => $self->{cond},
1582         seen_join => $seen,
1583         from => $from,
1584     });
1585   };
1586 }
1587
1588 sub _resolve_from {
1589   my ($self, $extra_join) = @_;
1590   my $source = $self->result_source;
1591   my $attrs = $self->{attrs};
1592   
1593   my $from = $attrs->{from}
1594     || [ { $attrs->{alias} => $source->from } ];
1595     
1596   my $seen = { %{$attrs->{seen_join}||{}} };
1597
1598   my $join = ($attrs->{join}
1599                ? [ $attrs->{join}, $extra_join ]
1600                : $extra_join);
1601   $from = [
1602     @$from,
1603     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1604   ];
1605
1606   return ($from,$seen);
1607 }
1608
1609 sub _resolved_attrs {
1610   my $self = shift;
1611   return $self->{_attrs} if $self->{_attrs};
1612
1613   my $attrs = { %{$self->{attrs}||{}} };
1614   my $source = $self->result_source;
1615   my $alias = $attrs->{alias};
1616
1617   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1618   if ($attrs->{columns}) {
1619     delete $attrs->{as};
1620   } elsif (!$attrs->{select}) {
1621     $attrs->{columns} = [ $source->columns ];
1622   }
1623  
1624   $attrs->{select} = 
1625     ($attrs->{select}
1626       ? (ref $attrs->{select} eq 'ARRAY'
1627           ? [ @{$attrs->{select}} ]
1628           : [ $attrs->{select} ])
1629       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1630     );
1631   $attrs->{as} =
1632     ($attrs->{as}
1633       ? (ref $attrs->{as} eq 'ARRAY'
1634           ? [ @{$attrs->{as}} ]
1635           : [ $attrs->{as} ])
1636       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1637     );
1638   
1639   my $adds;
1640   if ($adds = delete $attrs->{include_columns}) {
1641     $adds = [$adds] unless ref $adds eq 'ARRAY';
1642     push(@{$attrs->{select}}, @$adds);
1643     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1644   }
1645   if ($adds = delete $attrs->{'+select'}) {
1646     $adds = [$adds] unless ref $adds eq 'ARRAY';
1647     push(@{$attrs->{select}},
1648            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1649   }
1650   if (my $adds = delete $attrs->{'+as'}) {
1651     $adds = [$adds] unless ref $adds eq 'ARRAY';
1652     push(@{$attrs->{as}}, @$adds);
1653   }
1654
1655   $attrs->{from} ||= [ { 'me' => $source->from } ];
1656
1657   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1658     my $join = delete $attrs->{join} || {};
1659
1660     if (defined $attrs->{prefetch}) {
1661       $join = $self->_merge_attr(
1662         $join, $attrs->{prefetch}
1663       );
1664     }
1665
1666     $attrs->{from} =   # have to copy here to avoid corrupting the original
1667       [
1668         @{$attrs->{from}}, 
1669         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1670       ];
1671   }
1672
1673   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1674   if ($attrs->{order_by}) {
1675     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1676                            ? [ @{$attrs->{order_by}} ]
1677                            : [ $attrs->{order_by} ]);
1678   } else {
1679     $attrs->{order_by} = [];    
1680   }
1681
1682   my $collapse = $attrs->{collapse} || {};
1683   if (my $prefetch = delete $attrs->{prefetch}) {
1684     $prefetch = $self->_merge_attr({}, $prefetch);
1685     my @pre_order;
1686     my $seen = $attrs->{seen_join} || {};
1687     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1688       # bring joins back to level of current class
1689       my @prefetch = $source->resolve_prefetch(
1690         $p, $alias, $seen, \@pre_order, $collapse
1691       );
1692       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1693       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1694     }
1695     push(@{$attrs->{order_by}}, @pre_order);
1696   }
1697   $attrs->{collapse} = $collapse;
1698
1699   return $self->{_attrs} = $attrs;
1700 }
1701
1702 sub _merge_attr {
1703   my ($self, $a, $b) = @_;
1704   return $b unless defined($a);
1705   return $a unless defined($b);
1706   
1707   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1708     foreach my $key (keys %{$b}) {
1709       if (exists $a->{$key}) {
1710         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1711       } else {
1712         $a->{$key} = $b->{$key};
1713       }
1714     }
1715     return $a;
1716   } else {
1717     $a = [$a] unless ref $a eq 'ARRAY';
1718     $b = [$b] unless ref $b eq 'ARRAY';
1719
1720     my $hash = {};
1721     my @array;
1722     foreach my $x ($a, $b) {
1723       foreach my $element (@{$x}) {
1724         if (ref $element eq 'HASH') {
1725           $hash = $self->_merge_attr($hash, $element);
1726         } elsif (ref $element eq 'ARRAY') {
1727           push(@array, @{$element});
1728         } else {
1729           push(@array, $element) unless $b == $x
1730             && grep { $_ eq $element } @array;
1731         }
1732       }
1733     }
1734     
1735     @array = grep { !exists $hash->{$_} } @array;
1736
1737     return keys %{$hash}
1738       ? ( scalar(@array)
1739             ? [$hash, @array]
1740             : $hash
1741         )
1742       : \@array;
1743   }
1744 }
1745
1746 sub result_source {
1747     my $self = shift;
1748
1749     if (@_) {
1750         $self->_source_handle($_[0]->handle);
1751     } else {
1752         $self->_source_handle->resolve;
1753     }
1754 }
1755
1756 =head2 throw_exception
1757
1758 See L<DBIx::Class::Schema/throw_exception> for details.
1759
1760 =cut
1761
1762 sub throw_exception {
1763   my $self=shift;
1764   $self->_source_handle->schema->throw_exception(@_);
1765 }
1766
1767 # XXX: FIXME: Attributes docs need clearing up
1768
1769 =head1 ATTRIBUTES
1770
1771 The resultset takes various attributes that modify its behavior. Here's an
1772 overview of them:
1773
1774 =head2 order_by
1775
1776 =over 4
1777
1778 =item Value: ($order_by | \@order_by)
1779
1780 =back
1781
1782 Which column(s) to order the results by. This is currently passed
1783 through directly to SQL, so you can give e.g. C<year DESC> for a
1784 descending order on the column `year'.
1785
1786 Please note that if you have C<quote_char> enabled (see
1787 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1788 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1789 so you will need to manually quote things as appropriate.)
1790
1791 =head2 columns
1792
1793 =over 4
1794
1795 =item Value: \@columns
1796
1797 =back
1798
1799 Shortcut to request a particular set of columns to be retrieved.  Adds
1800 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1801 from that, then auto-populates C<as> from C<select> as normal. (You may also
1802 use the C<cols> attribute, as in earlier versions of DBIC.)
1803
1804 =head2 include_columns
1805
1806 =over 4
1807
1808 =item Value: \@columns
1809
1810 =back
1811
1812 Shortcut to include additional columns in the returned results - for example
1813
1814   $schema->resultset('CD')->search(undef, {
1815     include_columns => ['artist.name'],
1816     join => ['artist']
1817   });
1818
1819 would return all CDs and include a 'name' column to the information
1820 passed to object inflation. Note that the 'artist' is the name of the
1821 column (or relationship) accessor, and 'name' is the name of the column
1822 accessor in the related table.
1823
1824 =head2 select
1825
1826 =over 4
1827
1828 =item Value: \@select_columns
1829
1830 =back
1831
1832 Indicates which columns should be selected from the storage. You can use
1833 column names, or in the case of RDBMS back ends, function or stored procedure
1834 names:
1835
1836   $rs = $schema->resultset('Employee')->search(undef, {
1837     select => [
1838       'name',
1839       { count => 'employeeid' },
1840       { sum => 'salary' }
1841     ]
1842   });
1843
1844 When you use function/stored procedure names and do not supply an C<as>
1845 attribute, the column names returned are storage-dependent. E.g. MySQL would
1846 return a column named C<count(employeeid)> in the above example.
1847
1848 =head2 +select
1849
1850 =over 4
1851
1852 Indicates additional columns to be selected from storage.  Works the same as
1853 L<select> but adds columns to the selection.
1854
1855 =back
1856
1857 =head2 +as
1858
1859 =over 4
1860
1861 Indicates additional column names for those added via L<+select>.
1862
1863 =back
1864
1865 =head2 as
1866
1867 =over 4
1868
1869 =item Value: \@inflation_names
1870
1871 =back
1872
1873 Indicates column names for object inflation. That is, c< as >
1874 indicates the name that the column can be accessed as via the
1875 C<get_column> method (or via the object accessor, B<if one already
1876 exists>).  It has nothing to do with the SQL code C< SELECT foo AS bar
1877 >.
1878
1879 The C< as > attribute is used in conjunction with C<select>,
1880 usually when C<select> contains one or more function or stored
1881 procedure names:
1882
1883   $rs = $schema->resultset('Employee')->search(undef, {
1884     select => [
1885       'name',
1886       { count => 'employeeid' }
1887     ],
1888     as => ['name', 'employee_count'],
1889   });
1890
1891   my $employee = $rs->first(); # get the first Employee
1892
1893 If the object against which the search is performed already has an accessor
1894 matching a column name specified in C<as>, the value can be retrieved using
1895 the accessor as normal:
1896
1897   my $name = $employee->name();
1898
1899 If on the other hand an accessor does not exist in the object, you need to
1900 use C<get_column> instead:
1901
1902   my $employee_count = $employee->get_column('employee_count');
1903
1904 You can create your own accessors if required - see
1905 L<DBIx::Class::Manual::Cookbook> for details.
1906
1907 Please note: This will NOT insert an C<AS employee_count> into the SQL
1908 statement produced, it is used for internal access only. Thus
1909 attempting to use the accessor in an C<order_by> clause or similar
1910 will fail miserably.
1911
1912 To get around this limitation, you can supply literal SQL to your
1913 C<select> attibute that contains the C<AS alias> text, eg:
1914
1915   select => [\'myfield AS alias']
1916
1917 =head2 join
1918
1919 =over 4
1920
1921 =item Value: ($rel_name | \@rel_names | \%rel_names)
1922
1923 =back
1924
1925 Contains a list of relationships that should be joined for this query.  For
1926 example:
1927
1928   # Get CDs by Nine Inch Nails
1929   my $rs = $schema->resultset('CD')->search(
1930     { 'artist.name' => 'Nine Inch Nails' },
1931     { join => 'artist' }
1932   );
1933
1934 Can also contain a hash reference to refer to the other relation's relations.
1935 For example:
1936
1937   package MyApp::Schema::Track;
1938   use base qw/DBIx::Class/;
1939   __PACKAGE__->table('track');
1940   __PACKAGE__->add_columns(qw/trackid cd position title/);
1941   __PACKAGE__->set_primary_key('trackid');
1942   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1943   1;
1944
1945   # In your application
1946   my $rs = $schema->resultset('Artist')->search(
1947     { 'track.title' => 'Teardrop' },
1948     {
1949       join     => { cd => 'track' },
1950       order_by => 'artist.name',
1951     }
1952   );
1953
1954 You need to use the relationship (not the table) name in  conditions, 
1955 because they are aliased as such. The current table is aliased as "me", so 
1956 you need to use me.column_name in order to avoid ambiguity. For example:
1957
1958   # Get CDs from 1984 with a 'Foo' track 
1959   my $rs = $schema->resultset('CD')->search(
1960     { 
1961       'me.year' => 1984,
1962       'tracks.name' => 'Foo'
1963     },
1964     { join => 'tracks' }
1965   );
1966   
1967 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1968 similarly for a third time). For e.g.
1969
1970   my $rs = $schema->resultset('Artist')->search({
1971     'cds.title'   => 'Down to Earth',
1972     'cds_2.title' => 'Popular',
1973   }, {
1974     join => [ qw/cds cds/ ],
1975   });
1976
1977 will return a set of all artists that have both a cd with title 'Down
1978 to Earth' and a cd with title 'Popular'.
1979
1980 If you want to fetch related objects from other tables as well, see C<prefetch>
1981 below.
1982
1983 =head2 prefetch
1984
1985 =over 4
1986
1987 =item Value: ($rel_name | \@rel_names | \%rel_names)
1988
1989 =back
1990
1991 Contains one or more relationships that should be fetched along with the main
1992 query (when they are accessed afterwards they will have already been
1993 "prefetched").  This is useful for when you know you will need the related
1994 objects, because it saves at least one query:
1995
1996   my $rs = $schema->resultset('Tag')->search(
1997     undef,
1998     {
1999       prefetch => {
2000         cd => 'artist'
2001       }
2002     }
2003   );
2004
2005 The initial search results in SQL like the following:
2006
2007   SELECT tag.*, cd.*, artist.* FROM tag
2008   JOIN cd ON tag.cd = cd.cdid
2009   JOIN artist ON cd.artist = artist.artistid
2010
2011 L<DBIx::Class> has no need to go back to the database when we access the
2012 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2013 case.
2014
2015 Simple prefetches will be joined automatically, so there is no need
2016 for a C<join> attribute in the above search. If you're prefetching to
2017 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2018 specify the join as well.
2019
2020 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2021 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2022 with an accessor type of 'single' or 'filter').
2023
2024 =head2 page
2025
2026 =over 4
2027
2028 =item Value: $page
2029
2030 =back
2031
2032 Makes the resultset paged and specifies the page to retrieve. Effectively
2033 identical to creating a non-pages resultset and then calling ->page($page)
2034 on it.
2035
2036 If L<rows> attribute is not specified it defualts to 10 rows per page.
2037
2038 =head2 rows
2039
2040 =over 4
2041
2042 =item Value: $rows
2043
2044 =back
2045
2046 Specifes the maximum number of rows for direct retrieval or the number of
2047 rows per page if the page attribute or method is used.
2048
2049 =head2 offset
2050
2051 =over 4
2052
2053 =item Value: $offset
2054
2055 =back
2056
2057 Specifies the (zero-based) row number for the  first row to be returned, or the
2058 of the first row of the first page if paging is used.
2059
2060 =head2 group_by
2061
2062 =over 4
2063
2064 =item Value: \@columns
2065
2066 =back
2067
2068 A arrayref of columns to group by. Can include columns of joined tables.
2069
2070   group_by => [qw/ column1 column2 ... /]
2071
2072 =head2 having
2073
2074 =over 4
2075
2076 =item Value: $condition
2077
2078 =back
2079
2080 HAVING is a select statement attribute that is applied between GROUP BY and
2081 ORDER BY. It is applied to the after the grouping calculations have been
2082 done.
2083
2084   having => { 'count(employee)' => { '>=', 100 } }
2085
2086 =head2 distinct
2087
2088 =over 4
2089
2090 =item Value: (0 | 1)
2091
2092 =back
2093
2094 Set to 1 to group by all columns.
2095
2096 =head2 where
2097
2098 =over 4
2099
2100 Adds to the WHERE clause.
2101
2102   # only return rows WHERE deleted IS NULL for all searches
2103   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2104
2105 Can be overridden by passing C<{ where => undef }> as an attribute
2106 to a resulset.
2107
2108 =back
2109
2110 =head2 cache
2111
2112 Set to 1 to cache search results. This prevents extra SQL queries if you
2113 revisit rows in your ResultSet:
2114
2115   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2116
2117   while( my $artist = $resultset->next ) {
2118     ... do stuff ...
2119   }
2120
2121   $rs->first; # without cache, this would issue a query
2122
2123 By default, searches are not cached.
2124
2125 For more examples of using these attributes, see
2126 L<DBIx::Class::Manual::Cookbook>.
2127
2128 =head2 from
2129
2130 =over 4
2131
2132 =item Value: \@from_clause
2133
2134 =back
2135
2136 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2137 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2138 clauses.
2139
2140 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2141
2142 C<join> will usually do what you need and it is strongly recommended that you
2143 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2144 And we really do mean "cannot", not just tried and failed. Attempting to use
2145 this because you're having problems with C<join> is like trying to use x86
2146 ASM because you've got a syntax error in your C. Trust us on this.
2147
2148 Now, if you're still really, really sure you need to use this (and if you're
2149 not 100% sure, ask the mailing list first), here's an explanation of how this
2150 works.
2151
2152 The syntax is as follows -
2153
2154   [
2155     { <alias1> => <table1> },
2156     [
2157       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2158       [], # nested JOIN (optional)
2159       { <table1.column1> => <table2.column2>, ... (more conditions) },
2160     ],
2161     # More of the above [ ] may follow for additional joins
2162   ]
2163
2164   <table1> <alias1>
2165   JOIN
2166     <table2> <alias2>
2167     [JOIN ...]
2168   ON <table1.column1> = <table2.column2>
2169   <more joins may follow>
2170
2171 An easy way to follow the examples below is to remember the following:
2172
2173     Anything inside "[]" is a JOIN
2174     Anything inside "{}" is a condition for the enclosing JOIN
2175
2176 The following examples utilize a "person" table in a family tree application.
2177 In order to express parent->child relationships, this table is self-joined:
2178
2179     # Person->belongs_to('father' => 'Person');
2180     # Person->belongs_to('mother' => 'Person');
2181
2182 C<from> can be used to nest joins. Here we return all children with a father,
2183 then search against all mothers of those children:
2184
2185   $rs = $schema->resultset('Person')->search(
2186       undef,
2187       {
2188           alias => 'mother', # alias columns in accordance with "from"
2189           from => [
2190               { mother => 'person' },
2191               [
2192                   [
2193                       { child => 'person' },
2194                       [
2195                           { father => 'person' },
2196                           { 'father.person_id' => 'child.father_id' }
2197                       ]
2198                   ],
2199                   { 'mother.person_id' => 'child.mother_id' }
2200               ],
2201           ]
2202       },
2203   );
2204
2205   # Equivalent SQL:
2206   # SELECT mother.* FROM person mother
2207   # JOIN (
2208   #   person child
2209   #   JOIN person father
2210   #   ON ( father.person_id = child.father_id )
2211   # )
2212   # ON ( mother.person_id = child.mother_id )
2213
2214 The type of any join can be controlled manually. To search against only people
2215 with a father in the person table, we could explicitly use C<INNER JOIN>:
2216
2217     $rs = $schema->resultset('Person')->search(
2218         undef,
2219         {
2220             alias => 'child', # alias columns in accordance with "from"
2221             from => [
2222                 { child => 'person' },
2223                 [
2224                     { father => 'person', -join_type => 'inner' },
2225                     { 'father.id' => 'child.father_id' }
2226                 ],
2227             ]
2228         },
2229     );
2230
2231     # Equivalent SQL:
2232     # SELECT child.* FROM person child
2233     # INNER JOIN person father ON child.father_id = father.id
2234
2235 =cut
2236
2237 1;