inflate_result can return an array now. somebody write me tests please
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use base qw/DBIx::Class/;
14
15 __PACKAGE__->load_components(qw/AccessorGroup/);
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   #weaken $source;
89
90   if ($attrs->{page}) {
91     $attrs->{rows} ||= 10;
92     $attrs->{offset} ||= 0;
93     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
94   }
95
96   $attrs->{alias} ||= 'me';
97
98   my $self = {
99     result_source => $source,
100     result_class => $attrs->{result_class} || $source->result_class,
101     cond => $attrs->{where},
102     count => undef,
103     pager => undef,
104     attrs => $attrs
105   };
106
107   bless $self, $class;
108
109   return $self;
110 }
111
112 =head2 search
113
114 =over 4
115
116 =item Arguments: $cond, \%attrs?
117
118 =item Return Value: $resultset (scalar context), @row_objs (list context)
119
120 =back
121
122   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
123   my $new_rs = $cd_rs->search({ year => 2005 });
124
125   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
126                  # year = 2005 OR year = 2004
127
128 If you need to pass in additional attributes but no additional condition,
129 call it as C<search(undef, \%attrs)>.
130
131   # "SELECT name, artistid FROM $artist_table"
132   my @all_artists = $schema->resultset('Artist')->search(undef, {
133     columns => [qw/name artistid/],
134   });
135
136 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
137
138 =cut
139
140 sub search {
141   my $self = shift;
142   my $rs = $self->search_rs( @_ );
143   return (wantarray ? $rs->all : $rs);
144 }
145
146 =head2 search_rs
147
148 =over 4
149
150 =item Arguments: $cond, \%attrs?
151
152 =item Return Value: $resultset
153
154 =back
155
156 This method does the same exact thing as search() except it will
157 always return a resultset, even in list context.
158
159 =cut
160
161 sub search_rs {
162   my $self = shift;
163
164   my $rows;
165
166   unless (@_) {                 # no search, effectively just a clone
167     $rows = $self->get_cache;
168   }
169
170   my $attrs = {};
171   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
172   my $our_attrs = { %{$self->{attrs}} };
173   my $having = delete $our_attrs->{having};
174   my $where = delete $our_attrs->{where};
175
176   my $new_attrs = { %{$our_attrs}, %{$attrs} };
177
178   # merge new attrs into inherited
179   foreach my $key (qw/join prefetch/) {
180     next unless exists $attrs->{$key};
181     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
182   }
183
184   my $cond = (@_
185     ? (
186         (@_ == 1 || ref $_[0] eq "HASH")
187           ? (
188               (ref $_[0] eq 'HASH')
189                 ? (
190                     (keys %{ $_[0] }  > 0)
191                       ? shift
192                       : undef
193                    )
194                 :  shift
195              )
196           : (
197               (@_ % 2)
198                 ? $self->throw_exception("Odd number of arguments to search")
199                 : {@_}
200              )
201       )
202     : undef
203   );
204
205   if (defined $where) {
206     $new_attrs->{where} = (
207       defined $new_attrs->{where}
208         ? { '-and' => [
209               map {
210                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
211               } $where, $new_attrs->{where}
212             ]
213           }
214         : $where);
215   }
216
217   if (defined $cond) {
218     $new_attrs->{where} = (
219       defined $new_attrs->{where}
220         ? { '-and' => [
221               map {
222                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
223               } $cond, $new_attrs->{where}
224             ]
225           }
226         : $cond);
227   }
228
229   if (defined $having) {
230     $new_attrs->{having} = (
231       defined $new_attrs->{having}
232         ? { '-and' => [
233               map {
234                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
235               } $having, $new_attrs->{having}
236             ]
237           }
238         : $having);
239   }
240
241   my $rs = (ref $self)->new($self->result_source, $new_attrs);
242   if ($rows) {
243     $rs->set_cache($rows);
244   }
245   return $rs;
246 }
247
248 =head2 search_literal
249
250 =over 4
251
252 =item Arguments: $sql_fragment, @bind_values
253
254 =item Return Value: $resultset (scalar context), @row_objs (list context)
255
256 =back
257
258   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
259   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
260
261 Pass a literal chunk of SQL to be added to the conditional part of the
262 resultset query.
263
264 =cut
265
266 sub search_literal {
267   my ($self, $cond, @vals) = @_;
268   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
269   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
270   return $self->search(\$cond, $attrs);
271 }
272
273 =head2 find
274
275 =over 4
276
277 =item Arguments: @values | \%cols, \%attrs?
278
279 =item Return Value: $row_object
280
281 =back
282
283 Finds a row based on its primary key or unique constraint. For example, to find
284 a row by its primary key:
285
286   my $cd = $schema->resultset('CD')->find(5);
287
288 You can also find a row by a specific unique constraint using the C<key>
289 attribute. For example:
290
291   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
292     key => 'cd_artist_title'
293   });
294
295 Additionally, you can specify the columns explicitly by name:
296
297   my $cd = $schema->resultset('CD')->find(
298     {
299       artist => 'Massive Attack',
300       title  => 'Mezzanine',
301     },
302     { key => 'cd_artist_title' }
303   );
304
305 If the C<key> is specified as C<primary>, it searches only on the primary key.
306
307 If no C<key> is specified, it searches on all unique constraints defined on the
308 source, including the primary key.
309
310 If your table does not have a primary key, you B<must> provide a value for the
311 C<key> attribute matching one of the unique constraints on the source.
312
313 See also L</find_or_create> and L</update_or_create>. For information on how to
314 declare unique constraints, see
315 L<DBIx::Class::ResultSource/add_unique_constraint>.
316
317 =cut
318
319 sub find {
320   my $self = shift;
321   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
322
323   # Default to the primary key, but allow a specific key
324   my @cols = exists $attrs->{key}
325     ? $self->result_source->unique_constraint_columns($attrs->{key})
326     : $self->result_source->primary_columns;
327   $self->throw_exception(
328     "Can't find unless a primary key is defined or unique constraint is specified"
329   ) unless @cols;
330
331   # Parse out a hashref from input
332   my $input_query;
333   if (ref $_[0] eq 'HASH') {
334     $input_query = { %{$_[0]} };
335   }
336   elsif (@_ == @cols) {
337     $input_query = {};
338     @{$input_query}{@cols} = @_;
339   }
340   else {
341     # Compatibility: Allow e.g. find(id => $value)
342     carp "Find by key => value deprecated; please use a hashref instead";
343     $input_query = {@_};
344   }
345
346   my (%related, $info);
347
348   foreach my $key (keys %$input_query) {
349     if (ref($input_query->{$key})
350         && ($info = $self->result_source->relationship_info($key))) {
351       my $rel_q = $self->result_source->resolve_condition(
352                     $info->{cond}, delete $input_query->{$key}, $key
353                   );
354       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
355       @related{keys %$rel_q} = values %$rel_q;
356     }
357   }
358   if (my @keys = keys %related) {
359     @{$input_query}{@keys} = values %related;
360   }
361
362   my @unique_queries = $self->_unique_queries($input_query, $attrs);
363
364   # Build the final query: Default to the disjunction of the unique queries,
365   # but allow the input query in case the ResultSet defines the query or the
366   # user is abusing find
367   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
368   my $query = @unique_queries
369     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
370     : $self->_add_alias($input_query, $alias);
371
372   # Run the query
373   if (keys %$attrs) {
374     my $rs = $self->search($query, $attrs);
375     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
376   }
377   else {
378     return keys %{$self->_resolved_attrs->{collapse}}
379       ? $self->search($query)->next
380       : $self->single($query);
381   }
382 }
383
384 # _add_alias
385 #
386 # Add the specified alias to the specified query hash. A copy is made so the
387 # original query is not modified.
388
389 sub _add_alias {
390   my ($self, $query, $alias) = @_;
391
392   my %aliased = %$query;
393   foreach my $col (grep { ! m/\./ } keys %aliased) {
394     $aliased{"$alias.$col"} = delete $aliased{$col};
395   }
396
397   return \%aliased;
398 }
399
400 # _unique_queries
401 #
402 # Build a list of queries which satisfy unique constraints.
403
404 sub _unique_queries {
405   my ($self, $query, $attrs) = @_;
406
407   my @constraint_names = exists $attrs->{key}
408     ? ($attrs->{key})
409     : $self->result_source->unique_constraint_names;
410
411   my @unique_queries;
412   foreach my $name (@constraint_names) {
413     my @unique_cols = $self->result_source->unique_constraint_columns($name);
414     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
415
416     my $num_query = scalar keys %$unique_query;
417     next unless $num_query;
418
419     # XXX: Assuming quite a bit about $self->{attrs}{where}
420     my $num_cols = scalar @unique_cols;
421     my $num_where = exists $self->{attrs}{where}
422       ? scalar keys %{ $self->{attrs}{where} }
423       : 0;
424     push @unique_queries, $unique_query
425       if $num_query + $num_where == $num_cols;
426   }
427
428   return @unique_queries;
429 }
430
431 # _build_unique_query
432 #
433 # Constrain the specified query hash based on the specified column names.
434
435 sub _build_unique_query {
436   my ($self, $query, $unique_cols) = @_;
437
438   return {
439     map  { $_ => $query->{$_} }
440     grep { exists $query->{$_} }
441       @$unique_cols
442   };
443 }
444
445 =head2 search_related
446
447 =over 4
448
449 =item Arguments: $rel, $cond, \%attrs?
450
451 =item Return Value: $new_resultset
452
453 =back
454
455   $new_rs = $cd_rs->search_related('artist', {
456     name => 'Emo-R-Us',
457   });
458
459 Searches the specified relationship, optionally specifying a condition and
460 attributes for matching records. See L</ATTRIBUTES> for more information.
461
462 =cut
463
464 sub search_related {
465   return shift->related_resultset(shift)->search(@_);
466 }
467
468 =head2 cursor
469
470 =over 4
471
472 =item Arguments: none
473
474 =item Return Value: $cursor
475
476 =back
477
478 Returns a storage-driven cursor to the given resultset. See
479 L<DBIx::Class::Cursor> for more information.
480
481 =cut
482
483 sub cursor {
484   my ($self) = @_;
485
486   my $attrs = { %{$self->_resolved_attrs} };
487   return $self->{cursor}
488     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
489           $attrs->{where},$attrs);
490 }
491
492 =head2 single
493
494 =over 4
495
496 =item Arguments: $cond?
497
498 =item Return Value: $row_object?
499
500 =back
501
502   my $cd = $schema->resultset('CD')->single({ year => 2001 });
503
504 Inflates the first result without creating a cursor if the resultset has
505 any records in it; if not returns nothing. Used by L</find> as an optimisation.
506
507 Can optionally take an additional condition *only* - this is a fast-code-path
508 method; if you need to add extra joins or similar call ->search and then
509 ->single without a condition on the $rs returned from that.
510
511 =cut
512
513 sub single {
514   my ($self, $where) = @_;
515   my $attrs = { %{$self->_resolved_attrs} };
516   if ($where) {
517     if (defined $attrs->{where}) {
518       $attrs->{where} = {
519         '-and' =>
520             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
521                $where, delete $attrs->{where} ]
522       };
523     } else {
524       $attrs->{where} = $where;
525     }
526   }
527
528 #  XXX: Disabled since it doesn't infer uniqueness in all cases
529 #  unless ($self->_is_unique_query($attrs->{where})) {
530 #    carp "Query not guaranteed to return a single row"
531 #      . "; please declare your unique constraints or use search instead";
532 #  }
533
534   my @data = $self->result_source->storage->select_single(
535     $attrs->{from}, $attrs->{select},
536     $attrs->{where}, $attrs
537   );
538
539   return (@data ? ($self->_construct_object(@data))[0] : ());
540 }
541
542 # _is_unique_query
543 #
544 # Try to determine if the specified query is guaranteed to be unique, based on
545 # the declared unique constraints.
546
547 sub _is_unique_query {
548   my ($self, $query) = @_;
549
550   my $collapsed = $self->_collapse_query($query);
551   my $alias = $self->{attrs}{alias};
552
553   foreach my $name ($self->result_source->unique_constraint_names) {
554     my @unique_cols = map {
555       "$alias.$_"
556     } $self->result_source->unique_constraint_columns($name);
557
558     # Count the values for each unique column
559     my %seen = map { $_ => 0 } @unique_cols;
560
561     foreach my $key (keys %$collapsed) {
562       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
563       next unless exists $seen{$aliased};  # Additional constraints are okay
564       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
565     }
566
567     # If we get 0 or more than 1 value for a column, it's not necessarily unique
568     return 1 unless grep { $_ != 1 } values %seen;
569   }
570
571   return 0;
572 }
573
574 # _collapse_query
575 #
576 # Recursively collapse the query, accumulating values for each column.
577
578 sub _collapse_query {
579   my ($self, $query, $collapsed) = @_;
580
581   $collapsed ||= {};
582
583   if (ref $query eq 'ARRAY') {
584     foreach my $subquery (@$query) {
585       next unless ref $subquery;  # -or
586 #      warn "ARRAY: " . Dumper $subquery;
587       $collapsed = $self->_collapse_query($subquery, $collapsed);
588     }
589   }
590   elsif (ref $query eq 'HASH') {
591     if (keys %$query and (keys %$query)[0] eq '-and') {
592       foreach my $subquery (@{$query->{-and}}) {
593 #        warn "HASH: " . Dumper $subquery;
594         $collapsed = $self->_collapse_query($subquery, $collapsed);
595       }
596     }
597     else {
598 #      warn "LEAF: " . Dumper $query;
599       foreach my $col (keys %$query) {
600         my $value = $query->{$col};
601         $collapsed->{$col}{$value}++;
602       }
603     }
604   }
605
606   return $collapsed;
607 }
608
609 =head2 get_column
610
611 =over 4
612
613 =item Arguments: $cond?
614
615 =item Return Value: $resultsetcolumn
616
617 =back
618
619   my $max_length = $rs->get_column('length')->max;
620
621 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
622
623 =cut
624
625 sub get_column {
626   my ($self, $column) = @_;
627   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
628   return $new;
629 }
630
631 =head2 search_like
632
633 =over 4
634
635 =item Arguments: $cond, \%attrs?
636
637 =item Return Value: $resultset (scalar context), @row_objs (list context)
638
639 =back
640
641   # WHERE title LIKE '%blue%'
642   $cd_rs = $rs->search_like({ title => '%blue%'});
643
644 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
645 that this is simply a convenience method. You most likely want to use
646 L</search> with specific operators.
647
648 For more information, see L<DBIx::Class::Manual::Cookbook>.
649
650 =cut
651
652 sub search_like {
653   my $class = shift;
654   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
655   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
656   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
657   return $class->search($query, { %$attrs });
658 }
659
660 =head2 slice
661
662 =over 4
663
664 =item Arguments: $first, $last
665
666 =item Return Value: $resultset (scalar context), @row_objs (list context)
667
668 =back
669
670 Returns a resultset or object list representing a subset of elements from the
671 resultset slice is called on. Indexes are from 0, i.e., to get the first
672 three records, call:
673
674   my ($one, $two, $three) = $rs->slice(0, 2);
675
676 =cut
677
678 sub slice {
679   my ($self, $min, $max) = @_;
680   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
681   $attrs->{offset} = $self->{attrs}{offset} || 0;
682   $attrs->{offset} += $min;
683   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
684   return $self->search(undef(), $attrs);
685   #my $slice = (ref $self)->new($self->result_source, $attrs);
686   #return (wantarray ? $slice->all : $slice);
687 }
688
689 =head2 next
690
691 =over 4
692
693 =item Arguments: none
694
695 =item Return Value: $result?
696
697 =back
698
699 Returns the next element in the resultset (C<undef> is there is none).
700
701 Can be used to efficiently iterate over records in the resultset:
702
703   my $rs = $schema->resultset('CD')->search;
704   while (my $cd = $rs->next) {
705     print $cd->title;
706   }
707
708 Note that you need to store the resultset object, and call C<next> on it.
709 Calling C<< resultset('Table')->next >> repeatedly will always return the
710 first record from the resultset.
711
712 =cut
713
714 sub next {
715   my ($self) = @_;
716   if (my $cache = $self->get_cache) {
717     $self->{all_cache_position} ||= 0;
718     return $cache->[$self->{all_cache_position}++];
719   }
720   if ($self->{attrs}{cache}) {
721     $self->{all_cache_position} = 1;
722     return ($self->all)[0];
723   }
724   if ($self->{stashed_objects}) {
725     my $obj = shift(@{$self->{stashed_objects}});
726     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
727   }
728   my @row = (
729     exists $self->{stashed_row}
730       ? @{delete $self->{stashed_row}}
731       : $self->cursor->next
732   );
733   return unless (@row);
734   my ($row, @more) = $self->_construct_object(@row);
735   $self->{stashed_objects} = \@more if @more;
736   return $row;
737 }
738
739 sub _construct_object {
740   my ($self, @row) = @_;
741   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
742   my @new = $self->result_class->inflate_result($self->result_source, @$info);
743   @new = $self->{_attrs}{record_filter}->(@new)
744     if exists $self->{_attrs}{record_filter};
745   return @new;
746 }
747
748 sub _collapse_result {
749   my ($self, $as, $row, $prefix) = @_;
750
751   my %const;
752   my @copy = @$row;
753   
754   foreach my $this_as (@$as) {
755     my $val = shift @copy;
756     if (defined $prefix) {
757       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
758         my $remain = $1;
759         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
760         $const{$1||''}{$2} = $val;
761       }
762     } else {
763       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
764       $const{$1||''}{$2} = $val;
765     }
766   }
767
768   my $alias = $self->{attrs}{alias};
769   my $info = [ {}, {} ];
770   foreach my $key (keys %const) {
771     if (length $key && $key ne $alias) {
772       my $target = $info;
773       my @parts = split(/\./, $key);
774       foreach my $p (@parts) {
775         $target = $target->[1]->{$p} ||= [];
776       }
777       $target->[0] = $const{$key};
778     } else {
779       $info->[0] = $const{$key};
780     }
781   }
782   
783   my @collapse;
784   if (defined $prefix) {
785     @collapse = map {
786         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
787     } keys %{$self->{_attrs}{collapse}}
788   } else {
789     @collapse = keys %{$self->{_attrs}{collapse}};
790   };
791
792   if (@collapse) {
793     my ($c) = sort { length $a <=> length $b } @collapse;
794     my $target = $info;
795     foreach my $p (split(/\./, $c)) {
796       $target = $target->[1]->{$p} ||= [];
797     }
798     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
799     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
800     my $tree = $self->_collapse_result($as, $row, $c_prefix);
801     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
802     my (@final, @raw);
803
804     while (
805       !(
806         grep {
807           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
808         } @co_key
809         )
810     ) {
811       push(@final, $tree);
812       last unless (@raw = $self->cursor->next);
813       $row = $self->{stashed_row} = \@raw;
814       $tree = $self->_collapse_result($as, $row, $c_prefix);
815     }
816     @$target = (@final ? @final : [ {}, {} ]);
817       # single empty result to indicate an empty prefetched has_many
818   }
819
820   #print "final info: " . Dumper($info);
821   return $info;
822 }
823
824 =head2 result_source
825
826 =over 4
827
828 =item Arguments: $result_source?
829
830 =item Return Value: $result_source
831
832 =back
833
834 An accessor for the primary ResultSource object from which this ResultSet
835 is derived.
836
837 =head2 result_class
838
839 =over 4
840
841 =item Arguments: $result_class?
842
843 =item Return Value: $result_class
844
845 =back
846
847 An accessor for the class to use when creating row objects. Defaults to 
848 C<< result_source->result_class >> - which in most cases is the name of the 
849 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
850
851 =cut
852
853
854 =head2 count
855
856 =over 4
857
858 =item Arguments: $cond, \%attrs??
859
860 =item Return Value: $count
861
862 =back
863
864 Performs an SQL C<COUNT> with the same query as the resultset was built
865 with to find the number of elements. If passed arguments, does a search
866 on the resultset and counts the results of that.
867
868 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
869 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
870 not support C<DISTINCT> with multiple columns. If you are using such a
871 database, you should only use columns from the main table in your C<group_by>
872 clause.
873
874 =cut
875
876 sub count {
877   my $self = shift;
878   return $self->search(@_)->count if @_ and defined $_[0];
879   return scalar @{ $self->get_cache } if $self->get_cache;
880   my $count = $self->_count;
881   return 0 unless $count;
882
883   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
884   $count = $self->{attrs}{rows} if
885     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
886   return $count;
887 }
888
889 sub _count { # Separated out so pager can get the full count
890   my $self = shift;
891   my $select = { count => '*' };
892
893   my $attrs = { %{$self->_resolved_attrs} };
894   if (my $group_by = delete $attrs->{group_by}) {
895     delete $attrs->{having};
896     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
897     # todo: try CONCAT for multi-column pk
898     my @pk = $self->result_source->primary_columns;
899     if (@pk == 1) {
900       my $alias = $attrs->{alias};
901       foreach my $column (@distinct) {
902         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
903           @distinct = ($column);
904           last;
905         }
906       }
907     }
908
909     $select = { count => { distinct => \@distinct } };
910   }
911
912   $attrs->{select} = $select;
913   $attrs->{as} = [qw/count/];
914
915   # offset, order by and page are not needed to count. record_filter is cdbi
916   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
917
918   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
919   my ($count) = $tmp_rs->cursor->next;
920   return $count;
921 }
922
923 =head2 count_literal
924
925 =over 4
926
927 =item Arguments: $sql_fragment, @bind_values
928
929 =item Return Value: $count
930
931 =back
932
933 Counts the results in a literal query. Equivalent to calling L</search_literal>
934 with the passed arguments, then L</count>.
935
936 =cut
937
938 sub count_literal { shift->search_literal(@_)->count; }
939
940 =head2 all
941
942 =over 4
943
944 =item Arguments: none
945
946 =item Return Value: @objects
947
948 =back
949
950 Returns all elements in the resultset. Called implicitly if the resultset
951 is returned in list context.
952
953 =cut
954
955 sub all {
956   my ($self) = @_;
957   return @{ $self->get_cache } if $self->get_cache;
958
959   my @obj;
960
961   # TODO: don't call resolve here
962   if (keys %{$self->_resolved_attrs->{collapse}}) {
963 #  if ($self->{attrs}{prefetch}) {
964       # Using $self->cursor->all is really just an optimisation.
965       # If we're collapsing has_many prefetches it probably makes
966       # very little difference, and this is cleaner than hacking
967       # _construct_object to survive the approach
968     my @row = $self->cursor->next;
969     while (@row) {
970       push(@obj, $self->_construct_object(@row));
971       @row = (exists $self->{stashed_row}
972                ? @{delete $self->{stashed_row}}
973                : $self->cursor->next);
974     }
975   } else {
976     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
977   }
978
979   $self->set_cache(\@obj) if $self->{attrs}{cache};
980   return @obj;
981 }
982
983 =head2 reset
984
985 =over 4
986
987 =item Arguments: none
988
989 =item Return Value: $self
990
991 =back
992
993 Resets the resultset's cursor, so you can iterate through the elements again.
994
995 =cut
996
997 sub reset {
998   my ($self) = @_;
999   delete $self->{_attrs} if exists $self->{_attrs};
1000   $self->{all_cache_position} = 0;
1001   $self->cursor->reset;
1002   return $self;
1003 }
1004
1005 =head2 first
1006
1007 =over 4
1008
1009 =item Arguments: none
1010
1011 =item Return Value: $object?
1012
1013 =back
1014
1015 Resets the resultset and returns an object for the first result (if the
1016 resultset returns anything).
1017
1018 =cut
1019
1020 sub first {
1021   return $_[0]->reset->next;
1022 }
1023
1024 # _cond_for_update_delete
1025 #
1026 # update/delete require the condition to be modified to handle
1027 # the differing SQL syntax available.  This transforms the $self->{cond}
1028 # appropriately, returning the new condition.
1029
1030 sub _cond_for_update_delete {
1031   my ($self, $full_cond) = @_;
1032   my $cond = {};
1033
1034   $full_cond ||= $self->{cond};
1035   # No-op. No condition, we're updating/deleting everything
1036   return $cond unless ref $full_cond;
1037
1038   if (ref $full_cond eq 'ARRAY') {
1039     $cond = [
1040       map {
1041         my %hash;
1042         foreach my $key (keys %{$_}) {
1043           $key =~ /([^.]+)$/;
1044           $hash{$1} = $_->{$key};
1045         }
1046         \%hash;
1047       } @{$full_cond}
1048     ];
1049   }
1050   elsif (ref $full_cond eq 'HASH') {
1051     if ((keys %{$full_cond})[0] eq '-and') {
1052       $cond->{-and} = [];
1053
1054       my @cond = @{$full_cond->{-and}};
1055       for (my $i = 0; $i < @cond; $i++) {
1056         my $entry = $cond[$i];
1057
1058         my $hash;
1059         if (ref $entry eq 'HASH') {
1060           $hash = $self->_cond_for_update_delete($entry);
1061         }
1062         else {
1063           $entry =~ /([^.]+)$/;
1064           $hash->{$1} = $cond[++$i];
1065         }
1066
1067         push @{$cond->{-and}}, $hash;
1068       }
1069     }
1070     else {
1071       foreach my $key (keys %{$full_cond}) {
1072         $key =~ /([^.]+)$/;
1073         $cond->{$1} = $full_cond->{$key};
1074       }
1075     }
1076   }
1077   else {
1078     $self->throw_exception(
1079       "Can't update/delete on resultset with condition unless hash or array"
1080     );
1081   }
1082
1083   return $cond;
1084 }
1085
1086
1087 =head2 update
1088
1089 =over 4
1090
1091 =item Arguments: \%values
1092
1093 =item Return Value: $storage_rv
1094
1095 =back
1096
1097 Sets the specified columns in the resultset to the supplied values in a
1098 single query. Return value will be true if the update succeeded or false
1099 if no records were updated; exact type of success value is storage-dependent.
1100
1101 =cut
1102
1103 sub update {
1104   my ($self, $values) = @_;
1105   $self->throw_exception("Values for update must be a hash")
1106     unless ref $values eq 'HASH';
1107
1108   my $cond = $self->_cond_for_update_delete;
1109
1110   return $self->result_source->storage->update(
1111     $self->result_source->from, $values, $cond
1112   );
1113 }
1114
1115 =head2 update_all
1116
1117 =over 4
1118
1119 =item Arguments: \%values
1120
1121 =item Return Value: 1
1122
1123 =back
1124
1125 Fetches all objects and updates them one at a time. Note that C<update_all>
1126 will run DBIC cascade triggers, while L</update> will not.
1127
1128 =cut
1129
1130 sub update_all {
1131   my ($self, $values) = @_;
1132   $self->throw_exception("Values for update must be a hash")
1133     unless ref $values eq 'HASH';
1134   foreach my $obj ($self->all) {
1135     $obj->set_columns($values)->update;
1136   }
1137   return 1;
1138 }
1139
1140 =head2 delete
1141
1142 =over 4
1143
1144 =item Arguments: none
1145
1146 =item Return Value: 1
1147
1148 =back
1149
1150 Deletes the contents of the resultset from its result source. Note that this
1151 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1152 to run. See also L<DBIx::Class::Row/delete>.
1153
1154 =cut
1155
1156 sub delete {
1157   my ($self) = @_;
1158
1159   my $cond = $self->_cond_for_update_delete;
1160
1161   $self->result_source->storage->delete($self->result_source->from, $cond);
1162   return 1;
1163 }
1164
1165 =head2 delete_all
1166
1167 =over 4
1168
1169 =item Arguments: none
1170
1171 =item Return Value: 1
1172
1173 =back
1174
1175 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1176 will run DBIC cascade triggers, while L</delete> will not.
1177
1178 =cut
1179
1180 sub delete_all {
1181   my ($self) = @_;
1182   $_->delete for $self->all;
1183   return 1;
1184 }
1185
1186 =head2 pager
1187
1188 =over 4
1189
1190 =item Arguments: none
1191
1192 =item Return Value: $pager
1193
1194 =back
1195
1196 Return Value a L<Data::Page> object for the current resultset. Only makes
1197 sense for queries with a C<page> attribute.
1198
1199 =cut
1200
1201 sub pager {
1202   my ($self) = @_;
1203   my $attrs = $self->{attrs};
1204   $self->throw_exception("Can't create pager for non-paged rs")
1205     unless $self->{attrs}{page};
1206   $attrs->{rows} ||= 10;
1207   return $self->{pager} ||= Data::Page->new(
1208     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1209 }
1210
1211 =head2 page
1212
1213 =over 4
1214
1215 =item Arguments: $page_number
1216
1217 =item Return Value: $rs
1218
1219 =back
1220
1221 Returns a resultset for the $page_number page of the resultset on which page
1222 is called, where each page contains a number of rows equal to the 'rows'
1223 attribute set on the resultset (10 by default).
1224
1225 =cut
1226
1227 sub page {
1228   my ($self, $page) = @_;
1229   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1230 }
1231
1232 =head2 new_result
1233
1234 =over 4
1235
1236 =item Arguments: \%vals
1237
1238 =item Return Value: $object
1239
1240 =back
1241
1242 Creates an object in the resultset's result class and returns it.
1243
1244 =cut
1245
1246 sub new_result {
1247   my ($self, $values) = @_;
1248   $self->throw_exception( "new_result needs a hash" )
1249     unless (ref $values eq 'HASH');
1250   $self->throw_exception(
1251     "Can't abstract implicit construct, condition not a hash"
1252   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1253
1254   my $alias = $self->{attrs}{alias};
1255   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1256   my %new = (
1257     %{ $self->_remove_alias($values, $alias) },
1258     %{ $self->_remove_alias($collapsed_cond, $alias) },
1259     -result_source => $self->result_source,
1260   );
1261
1262   my $obj = $self->result_class->new(\%new);
1263   return $obj;
1264 }
1265
1266 # _collapse_cond
1267 #
1268 # Recursively collapse the condition.
1269
1270 sub _collapse_cond {
1271   my ($self, $cond, $collapsed) = @_;
1272
1273   $collapsed ||= {};
1274
1275   if (ref $cond eq 'ARRAY') {
1276     foreach my $subcond (@$cond) {
1277       next unless ref $subcond;  # -or
1278 #      warn "ARRAY: " . Dumper $subcond;
1279       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1280     }
1281   }
1282   elsif (ref $cond eq 'HASH') {
1283     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1284       foreach my $subcond (@{$cond->{-and}}) {
1285 #        warn "HASH: " . Dumper $subcond;
1286         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1287       }
1288     }
1289     else {
1290 #      warn "LEAF: " . Dumper $cond;
1291       foreach my $col (keys %$cond) {
1292         my $value = $cond->{$col};
1293         $collapsed->{$col} = $value;
1294       }
1295     }
1296   }
1297
1298   return $collapsed;
1299 }
1300
1301 # _remove_alias
1302 #
1303 # Remove the specified alias from the specified query hash. A copy is made so
1304 # the original query is not modified.
1305
1306 sub _remove_alias {
1307   my ($self, $query, $alias) = @_;
1308
1309   my %orig = %{ $query || {} };
1310   my %unaliased;
1311
1312   foreach my $key (keys %orig) {
1313     if ($key !~ /\./) {
1314       $unaliased{$key} = $orig{$key};
1315       next;
1316     }
1317     $unaliased{$1} = $orig{$key}
1318       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1319   }
1320
1321   return \%unaliased;
1322 }
1323
1324 =head2 find_or_new
1325
1326 =over 4
1327
1328 =item Arguments: \%vals, \%attrs?
1329
1330 =item Return Value: $object
1331
1332 =back
1333
1334 Find an existing record from this resultset. If none exists, instantiate a new
1335 result object and return it. The object will not be saved into your storage
1336 until you call L<DBIx::Class::Row/insert> on it.
1337
1338 If you want objects to be saved immediately, use L</find_or_create> instead.
1339
1340 =cut
1341
1342 sub find_or_new {
1343   my $self     = shift;
1344   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1345   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1346   my $exists   = $self->find($hash, $attrs);
1347   return defined $exists ? $exists : $self->new_result($hash);
1348 }
1349
1350 =head2 create
1351
1352 =over 4
1353
1354 =item Arguments: \%vals
1355
1356 =item Return Value: $object
1357
1358 =back
1359
1360 Inserts a record into the resultset and returns the object representing it.
1361
1362 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1363
1364 =cut
1365
1366 sub create {
1367   my ($self, $attrs) = @_;
1368   $self->throw_exception( "create needs a hashref" )
1369     unless ref $attrs eq 'HASH';
1370   return $self->new_result($attrs)->insert;
1371 }
1372
1373 =head2 find_or_create
1374
1375 =over 4
1376
1377 =item Arguments: \%vals, \%attrs?
1378
1379 =item Return Value: $object
1380
1381 =back
1382
1383   $class->find_or_create({ key => $val, ... });
1384
1385 Tries to find a record based on its primary key or unique constraint; if none
1386 is found, creates one and returns that instead.
1387
1388   my $cd = $schema->resultset('CD')->find_or_create({
1389     cdid   => 5,
1390     artist => 'Massive Attack',
1391     title  => 'Mezzanine',
1392     year   => 2005,
1393   });
1394
1395 Also takes an optional C<key> attribute, to search by a specific key or unique
1396 constraint. For example:
1397
1398   my $cd = $schema->resultset('CD')->find_or_create(
1399     {
1400       artist => 'Massive Attack',
1401       title  => 'Mezzanine',
1402     },
1403     { key => 'cd_artist_title' }
1404   );
1405
1406 See also L</find> and L</update_or_create>. For information on how to declare
1407 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1408
1409 =cut
1410
1411 sub find_or_create {
1412   my $self     = shift;
1413   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1414   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1415   my $exists   = $self->find($hash, $attrs);
1416   return defined $exists ? $exists : $self->create($hash);
1417 }
1418
1419 =head2 update_or_create
1420
1421 =over 4
1422
1423 =item Arguments: \%col_values, { key => $unique_constraint }?
1424
1425 =item Return Value: $object
1426
1427 =back
1428
1429   $class->update_or_create({ col => $val, ... });
1430
1431 First, searches for an existing row matching one of the unique constraints
1432 (including the primary key) on the source of this resultset. If a row is
1433 found, updates it with the other given column values. Otherwise, creates a new
1434 row.
1435
1436 Takes an optional C<key> attribute to search on a specific unique constraint.
1437 For example:
1438
1439   # In your application
1440   my $cd = $schema->resultset('CD')->update_or_create(
1441     {
1442       artist => 'Massive Attack',
1443       title  => 'Mezzanine',
1444       year   => 1998,
1445     },
1446     { key => 'cd_artist_title' }
1447   );
1448
1449 If no C<key> is specified, it searches on all unique constraints defined on the
1450 source, including the primary key.
1451
1452 If the C<key> is specified as C<primary>, it searches only on the primary key.
1453
1454 See also L</find> and L</find_or_create>. For information on how to declare
1455 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1456
1457 =cut
1458
1459 sub update_or_create {
1460   my $self = shift;
1461   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1462   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1463
1464   my $row = $self->find($cond, $attrs);
1465   if (defined $row) {
1466     $row->update($cond);
1467     return $row;
1468   }
1469
1470   return $self->create($cond);
1471 }
1472
1473 =head2 get_cache
1474
1475 =over 4
1476
1477 =item Arguments: none
1478
1479 =item Return Value: \@cache_objects?
1480
1481 =back
1482
1483 Gets the contents of the cache for the resultset, if the cache is set.
1484
1485 =cut
1486
1487 sub get_cache {
1488   shift->{all_cache};
1489 }
1490
1491 =head2 set_cache
1492
1493 =over 4
1494
1495 =item Arguments: \@cache_objects
1496
1497 =item Return Value: \@cache_objects
1498
1499 =back
1500
1501 Sets the contents of the cache for the resultset. Expects an arrayref
1502 of objects of the same class as those produced by the resultset. Note that
1503 if the cache is set the resultset will return the cached objects rather
1504 than re-querying the database even if the cache attr is not set.
1505
1506 =cut
1507
1508 sub set_cache {
1509   my ( $self, $data ) = @_;
1510   $self->throw_exception("set_cache requires an arrayref")
1511       if defined($data) && (ref $data ne 'ARRAY');
1512   $self->{all_cache} = $data;
1513 }
1514
1515 =head2 clear_cache
1516
1517 =over 4
1518
1519 =item Arguments: none
1520
1521 =item Return Value: []
1522
1523 =back
1524
1525 Clears the cache for the resultset.
1526
1527 =cut
1528
1529 sub clear_cache {
1530   shift->set_cache(undef);
1531 }
1532
1533 =head2 related_resultset
1534
1535 =over 4
1536
1537 =item Arguments: $relationship_name
1538
1539 =item Return Value: $resultset
1540
1541 =back
1542
1543 Returns a related resultset for the supplied relationship name.
1544
1545   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1546
1547 =cut
1548
1549 sub related_resultset {
1550   my ($self, $rel) = @_;
1551
1552   $self->{related_resultsets} ||= {};
1553   return $self->{related_resultsets}{$rel} ||= do {
1554     my $rel_obj = $self->result_source->relationship_info($rel);
1555
1556     $self->throw_exception(
1557       "search_related: result source '" . $self->result_source->name .
1558         "' has no such relationship $rel")
1559       unless $rel_obj;
1560     
1561     my ($from,$seen) = $self->_resolve_from($rel);
1562
1563     my $join_count = $seen->{$rel};
1564     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1565
1566     $self->result_source->schema->resultset($rel_obj->{class})->search_rs(
1567       undef, {
1568         %{$self->{attrs}||{}},
1569         join => undef,
1570         prefetch => undef,
1571         select => undef,
1572         as => undef,
1573         alias => $alias,
1574         where => $self->{cond},
1575         seen_join => $seen,
1576         from => $from,
1577     });
1578   };
1579 }
1580
1581 sub _resolve_from {
1582   my ($self, $extra_join) = @_;
1583   my $source = $self->result_source;
1584   my $attrs = $self->{attrs};
1585   
1586   my $from = $attrs->{from}
1587     || [ { $attrs->{alias} => $source->from } ];
1588     
1589   my $seen = { %{$attrs->{seen_join}||{}} };
1590
1591   my $join = ($attrs->{join}
1592                ? [ $attrs->{join}, $extra_join ]
1593                : $extra_join);
1594   $from = [
1595     @$from,
1596     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1597   ];
1598
1599   return ($from,$seen);
1600 }
1601
1602 sub _resolved_attrs {
1603   my $self = shift;
1604   return $self->{_attrs} if $self->{_attrs};
1605
1606   my $attrs = { %{$self->{attrs}||{}} };
1607   my $source = $self->{result_source};
1608   my $alias = $attrs->{alias};
1609
1610   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1611   if ($attrs->{columns}) {
1612     delete $attrs->{as};
1613   } elsif (!$attrs->{select}) {
1614     $attrs->{columns} = [ $source->columns ];
1615   }
1616  
1617   $attrs->{select} = 
1618     ($attrs->{select}
1619       ? (ref $attrs->{select} eq 'ARRAY'
1620           ? [ @{$attrs->{select}} ]
1621           : [ $attrs->{select} ])
1622       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1623     );
1624   $attrs->{as} =
1625     ($attrs->{as}
1626       ? (ref $attrs->{as} eq 'ARRAY'
1627           ? [ @{$attrs->{as}} ]
1628           : [ $attrs->{as} ])
1629       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1630     );
1631   
1632   my $adds;
1633   if ($adds = delete $attrs->{include_columns}) {
1634     $adds = [$adds] unless ref $adds eq 'ARRAY';
1635     push(@{$attrs->{select}}, @$adds);
1636     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1637   }
1638   if ($adds = delete $attrs->{'+select'}) {
1639     $adds = [$adds] unless ref $adds eq 'ARRAY';
1640     push(@{$attrs->{select}},
1641            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1642   }
1643   if (my $adds = delete $attrs->{'+as'}) {
1644     $adds = [$adds] unless ref $adds eq 'ARRAY';
1645     push(@{$attrs->{as}}, @$adds);
1646   }
1647
1648   $attrs->{from} ||= [ { 'me' => $source->from } ];
1649
1650   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1651     my $join = delete $attrs->{join} || {};
1652
1653     if (defined $attrs->{prefetch}) {
1654       $join = $self->_merge_attr(
1655         $join, $attrs->{prefetch}
1656       );
1657     }
1658
1659     $attrs->{from} =   # have to copy here to avoid corrupting the original
1660       [
1661         @{$attrs->{from}}, 
1662         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1663       ];
1664   }
1665
1666   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1667   if ($attrs->{order_by}) {
1668     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1669                            ? [ @{$attrs->{order_by}} ]
1670                            : [ $attrs->{order_by} ]);
1671   } else {
1672     $attrs->{order_by} = [];    
1673   }
1674
1675   my $collapse = $attrs->{collapse} || {};
1676   if (my $prefetch = delete $attrs->{prefetch}) {
1677     $prefetch = $self->_merge_attr({}, $prefetch);
1678     my @pre_order;
1679     my $seen = $attrs->{seen_join} || {};
1680     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1681       # bring joins back to level of current class
1682       my @prefetch = $source->resolve_prefetch(
1683         $p, $alias, $seen, \@pre_order, $collapse
1684       );
1685       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1686       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1687     }
1688     push(@{$attrs->{order_by}}, @pre_order);
1689   }
1690   $attrs->{collapse} = $collapse;
1691
1692   return $self->{_attrs} = $attrs;
1693 }
1694
1695 sub _merge_attr {
1696   my ($self, $a, $b) = @_;
1697   return $b unless defined($a);
1698   return $a unless defined($b);
1699   
1700   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1701     foreach my $key (keys %{$b}) {
1702       if (exists $a->{$key}) {
1703         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1704       } else {
1705         $a->{$key} = $b->{$key};
1706       }
1707     }
1708     return $a;
1709   } else {
1710     $a = [$a] unless ref $a eq 'ARRAY';
1711     $b = [$b] unless ref $b eq 'ARRAY';
1712
1713     my $hash = {};
1714     my @array;
1715     foreach my $x ($a, $b) {
1716       foreach my $element (@{$x}) {
1717         if (ref $element eq 'HASH') {
1718           $hash = $self->_merge_attr($hash, $element);
1719         } elsif (ref $element eq 'ARRAY') {
1720           push(@array, @{$element});
1721         } else {
1722           push(@array, $element) unless $b == $x
1723             && grep { $_ eq $element } @array;
1724         }
1725       }
1726     }
1727     
1728     @array = grep { !exists $hash->{$_} } @array;
1729
1730     return keys %{$hash}
1731       ? ( scalar(@array)
1732             ? [$hash, @array]
1733             : $hash
1734         )
1735       : \@array;
1736   }
1737 }
1738
1739 =head2 throw_exception
1740
1741 See L<DBIx::Class::Schema/throw_exception> for details.
1742
1743 =cut
1744
1745 sub throw_exception {
1746   my $self=shift;
1747   $self->result_source->schema->throw_exception(@_);
1748 }
1749
1750 # XXX: FIXME: Attributes docs need clearing up
1751
1752 =head1 ATTRIBUTES
1753
1754 The resultset takes various attributes that modify its behavior. Here's an
1755 overview of them:
1756
1757 =head2 order_by
1758
1759 =over 4
1760
1761 =item Value: ($order_by | \@order_by)
1762
1763 =back
1764
1765 Which column(s) to order the results by. This is currently passed
1766 through directly to SQL, so you can give e.g. C<year DESC> for a
1767 descending order on the column `year'.
1768
1769 Please note that if you have quoting enabled (see
1770 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1771 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1772 so you will need to manually quote things as appropriate.)
1773
1774 =head2 columns
1775
1776 =over 4
1777
1778 =item Value: \@columns
1779
1780 =back
1781
1782 Shortcut to request a particular set of columns to be retrieved.  Adds
1783 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1784 from that, then auto-populates C<as> from C<select> as normal. (You may also
1785 use the C<cols> attribute, as in earlier versions of DBIC.)
1786
1787 =head2 include_columns
1788
1789 =over 4
1790
1791 =item Value: \@columns
1792
1793 =back
1794
1795 Shortcut to include additional columns in the returned results - for example
1796
1797   $schema->resultset('CD')->search(undef, {
1798     include_columns => ['artist.name'],
1799     join => ['artist']
1800   });
1801
1802 would return all CDs and include a 'name' column to the information
1803 passed to object inflation
1804
1805 =head2 select
1806
1807 =over 4
1808
1809 =item Value: \@select_columns
1810
1811 =back
1812
1813 Indicates which columns should be selected from the storage. You can use
1814 column names, or in the case of RDBMS back ends, function or stored procedure
1815 names:
1816
1817   $rs = $schema->resultset('Employee')->search(undef, {
1818     select => [
1819       'name',
1820       { count => 'employeeid' },
1821       { sum => 'salary' }
1822     ]
1823   });
1824
1825 When you use function/stored procedure names and do not supply an C<as>
1826 attribute, the column names returned are storage-dependent. E.g. MySQL would
1827 return a column named C<count(employeeid)> in the above example.
1828
1829 =head2 +select
1830
1831 =over 4
1832
1833 Indicates additional columns to be selected from storage.  Works the same as
1834 L<select> but adds columns to the selection.
1835
1836 =back
1837
1838 =head2 +as
1839
1840 =over 4
1841
1842 Indicates additional column names for those added via L<+select>.
1843
1844 =back
1845
1846 =head2 as
1847
1848 =over 4
1849
1850 =item Value: \@inflation_names
1851
1852 =back
1853
1854 Indicates column names for object inflation. This is used in conjunction with
1855 C<select>, usually when C<select> contains one or more function or stored
1856 procedure names:
1857
1858   $rs = $schema->resultset('Employee')->search(undef, {
1859     select => [
1860       'name',
1861       { count => 'employeeid' }
1862     ],
1863     as => ['name', 'employee_count'],
1864   });
1865
1866   my $employee = $rs->first(); # get the first Employee
1867
1868 If the object against which the search is performed already has an accessor
1869 matching a column name specified in C<as>, the value can be retrieved using
1870 the accessor as normal:
1871
1872   my $name = $employee->name();
1873
1874 If on the other hand an accessor does not exist in the object, you need to
1875 use C<get_column> instead:
1876
1877   my $employee_count = $employee->get_column('employee_count');
1878
1879 You can create your own accessors if required - see
1880 L<DBIx::Class::Manual::Cookbook> for details.
1881
1882 Please note: This will NOT insert an C<AS employee_count> into the SQL
1883 statement produced, it is used for internal access only. Thus
1884 attempting to use the accessor in an C<order_by> clause or similar
1885 will fail miserably.
1886
1887 To get around this limitation, you can supply literal SQL to your
1888 C<select> attibute that contains the C<AS alias> text, eg:
1889
1890   select => [\'myfield AS alias']
1891
1892 =head2 join
1893
1894 =over 4
1895
1896 =item Value: ($rel_name | \@rel_names | \%rel_names)
1897
1898 =back
1899
1900 Contains a list of relationships that should be joined for this query.  For
1901 example:
1902
1903   # Get CDs by Nine Inch Nails
1904   my $rs = $schema->resultset('CD')->search(
1905     { 'artist.name' => 'Nine Inch Nails' },
1906     { join => 'artist' }
1907   );
1908
1909 Can also contain a hash reference to refer to the other relation's relations.
1910 For example:
1911
1912   package MyApp::Schema::Track;
1913   use base qw/DBIx::Class/;
1914   __PACKAGE__->table('track');
1915   __PACKAGE__->add_columns(qw/trackid cd position title/);
1916   __PACKAGE__->set_primary_key('trackid');
1917   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1918   1;
1919
1920   # In your application
1921   my $rs = $schema->resultset('Artist')->search(
1922     { 'track.title' => 'Teardrop' },
1923     {
1924       join     => { cd => 'track' },
1925       order_by => 'artist.name',
1926     }
1927   );
1928
1929 You need to use the relationship (not the table) name in  conditions, 
1930 because they are aliased as such. The current table is aliased as "me", so 
1931 you need to use me.column_name in order to avoid ambiguity. For example:
1932
1933   # Get CDs from 1984 with a 'Foo' track 
1934   my $rs = $schema->resultset('CD')->search(
1935     { 
1936       'me.year' => 1984,
1937       'tracks.name' => 'Foo'
1938     },
1939     { join => 'tracks' }
1940   );
1941   
1942 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1943 similarly for a third time). For e.g.
1944
1945   my $rs = $schema->resultset('Artist')->search({
1946     'cds.title'   => 'Down to Earth',
1947     'cds_2.title' => 'Popular',
1948   }, {
1949     join => [ qw/cds cds/ ],
1950   });
1951
1952 will return a set of all artists that have both a cd with title 'Down
1953 to Earth' and a cd with title 'Popular'.
1954
1955 If you want to fetch related objects from other tables as well, see C<prefetch>
1956 below.
1957
1958 =head2 prefetch
1959
1960 =over 4
1961
1962 =item Value: ($rel_name | \@rel_names | \%rel_names)
1963
1964 =back
1965
1966 Contains one or more relationships that should be fetched along with the main
1967 query (when they are accessed afterwards they will have already been
1968 "prefetched").  This is useful for when you know you will need the related
1969 objects, because it saves at least one query:
1970
1971   my $rs = $schema->resultset('Tag')->search(
1972     undef,
1973     {
1974       prefetch => {
1975         cd => 'artist'
1976       }
1977     }
1978   );
1979
1980 The initial search results in SQL like the following:
1981
1982   SELECT tag.*, cd.*, artist.* FROM tag
1983   JOIN cd ON tag.cd = cd.cdid
1984   JOIN artist ON cd.artist = artist.artistid
1985
1986 L<DBIx::Class> has no need to go back to the database when we access the
1987 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1988 case.
1989
1990 Simple prefetches will be joined automatically, so there is no need
1991 for a C<join> attribute in the above search. If you're prefetching to
1992 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1993 specify the join as well.
1994
1995 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1996 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1997 with an accessor type of 'single' or 'filter').
1998
1999 =head2 page
2000
2001 =over 4
2002
2003 =item Value: $page
2004
2005 =back
2006
2007 Makes the resultset paged and specifies the page to retrieve. Effectively
2008 identical to creating a non-pages resultset and then calling ->page($page)
2009 on it.
2010
2011 If L<rows> attribute is not specified it defualts to 10 rows per page.
2012
2013 =head2 rows
2014
2015 =over 4
2016
2017 =item Value: $rows
2018
2019 =back
2020
2021 Specifes the maximum number of rows for direct retrieval or the number of
2022 rows per page if the page attribute or method is used.
2023
2024 =head2 offset
2025
2026 =over 4
2027
2028 =item Value: $offset
2029
2030 =back
2031
2032 Specifies the (zero-based) row number for the  first row to be returned, or the
2033 of the first row of the first page if paging is used.
2034
2035 =head2 group_by
2036
2037 =over 4
2038
2039 =item Value: \@columns
2040
2041 =back
2042
2043 A arrayref of columns to group by. Can include columns of joined tables.
2044
2045   group_by => [qw/ column1 column2 ... /]
2046
2047 =head2 having
2048
2049 =over 4
2050
2051 =item Value: $condition
2052
2053 =back
2054
2055 HAVING is a select statement attribute that is applied between GROUP BY and
2056 ORDER BY. It is applied to the after the grouping calculations have been
2057 done.
2058
2059   having => { 'count(employee)' => { '>=', 100 } }
2060
2061 =head2 distinct
2062
2063 =over 4
2064
2065 =item Value: (0 | 1)
2066
2067 =back
2068
2069 Set to 1 to group by all columns.
2070
2071 =head2 where
2072
2073 =over 4
2074
2075 Adds to the WHERE clause.
2076
2077   # only return rows WHERE deleted IS NULL for all searches
2078   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2079
2080 Can be overridden by passing C<{ where => undef }> as an attribute
2081 to a resulset.
2082
2083 =back
2084
2085 =head2 cache
2086
2087 Set to 1 to cache search results. This prevents extra SQL queries if you
2088 revisit rows in your ResultSet:
2089
2090   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2091
2092   while( my $artist = $resultset->next ) {
2093     ... do stuff ...
2094   }
2095
2096   $rs->first; # without cache, this would issue a query
2097
2098 By default, searches are not cached.
2099
2100 For more examples of using these attributes, see
2101 L<DBIx::Class::Manual::Cookbook>.
2102
2103 =head2 from
2104
2105 =over 4
2106
2107 =item Value: \@from_clause
2108
2109 =back
2110
2111 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2112 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2113 clauses.
2114
2115 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2116
2117 C<join> will usually do what you need and it is strongly recommended that you
2118 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2119 And we really do mean "cannot", not just tried and failed. Attempting to use
2120 this because you're having problems with C<join> is like trying to use x86
2121 ASM because you've got a syntax error in your C. Trust us on this.
2122
2123 Now, if you're still really, really sure you need to use this (and if you're
2124 not 100% sure, ask the mailing list first), here's an explanation of how this
2125 works.
2126
2127 The syntax is as follows -
2128
2129   [
2130     { <alias1> => <table1> },
2131     [
2132       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2133       [], # nested JOIN (optional)
2134       { <table1.column1> => <table2.column2>, ... (more conditions) },
2135     ],
2136     # More of the above [ ] may follow for additional joins
2137   ]
2138
2139   <table1> <alias1>
2140   JOIN
2141     <table2> <alias2>
2142     [JOIN ...]
2143   ON <table1.column1> = <table2.column2>
2144   <more joins may follow>
2145
2146 An easy way to follow the examples below is to remember the following:
2147
2148     Anything inside "[]" is a JOIN
2149     Anything inside "{}" is a condition for the enclosing JOIN
2150
2151 The following examples utilize a "person" table in a family tree application.
2152 In order to express parent->child relationships, this table is self-joined:
2153
2154     # Person->belongs_to('father' => 'Person');
2155     # Person->belongs_to('mother' => 'Person');
2156
2157 C<from> can be used to nest joins. Here we return all children with a father,
2158 then search against all mothers of those children:
2159
2160   $rs = $schema->resultset('Person')->search(
2161       undef,
2162       {
2163           alias => 'mother', # alias columns in accordance with "from"
2164           from => [
2165               { mother => 'person' },
2166               [
2167                   [
2168                       { child => 'person' },
2169                       [
2170                           { father => 'person' },
2171                           { 'father.person_id' => 'child.father_id' }
2172                       ]
2173                   ],
2174                   { 'mother.person_id' => 'child.mother_id' }
2175               ],
2176           ]
2177       },
2178   );
2179
2180   # Equivalent SQL:
2181   # SELECT mother.* FROM person mother
2182   # JOIN (
2183   #   person child
2184   #   JOIN person father
2185   #   ON ( father.person_id = child.father_id )
2186   # )
2187   # ON ( mother.person_id = child.mother_id )
2188
2189 The type of any join can be controlled manually. To search against only people
2190 with a father in the person table, we could explicitly use C<INNER JOIN>:
2191
2192     $rs = $schema->resultset('Person')->search(
2193         undef,
2194         {
2195             alias => 'child', # alias columns in accordance with "from"
2196             from => [
2197                 { child => 'person' },
2198                 [
2199                     { father => 'person', -join_type => 'inner' },
2200                     { 'father.id' => 'child.father_id' }
2201                 ],
2202             ]
2203         },
2204     );
2205
2206     # Equivalent SQL:
2207     # SELECT child.* FROM person child
2208     # INNER JOIN person father ON child.father_id = father.id
2209
2210 =cut
2211
2212 1;