1ace32943dc600d481e07fa78b069f986b1b3ea5
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use base qw/DBIx::Class/;
15
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   $source = $source->handle 
89     unless $source->isa('DBIx::Class::ResultSourceHandle');
90
91   if ($attrs->{page}) {
92     $attrs->{rows} ||= 10;
93     $attrs->{offset} ||= 0;
94     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
95   }
96
97   $attrs->{alias} ||= 'me';
98
99   my $self = {
100     _source_handle => $source,
101     result_class => $attrs->{result_class} || $source->resolve->result_class,
102     cond => $attrs->{where},
103     count => undef,
104     pager => undef,
105     attrs => $attrs
106   };
107
108   bless $self, $class;
109
110   return $self;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
138
139 =cut
140
141 sub search {
142   my $self = shift;
143   my $rs = $self->search_rs( @_ );
144   return (wantarray ? $rs->all : $rs);
145 }
146
147 =head2 search_rs
148
149 =over 4
150
151 =item Arguments: $cond, \%attrs?
152
153 =item Return Value: $resultset
154
155 =back
156
157 This method does the same exact thing as search() except it will
158 always return a resultset, even in list context.
159
160 =cut
161
162 sub search_rs {
163   my $self = shift;
164
165   my $rows;
166
167   unless (@_) {                 # no search, effectively just a clone
168     $rows = $self->get_cache;
169   }
170
171   my $attrs = {};
172   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
173   my $our_attrs = { %{$self->{attrs}} };
174   my $having = delete $our_attrs->{having};
175   my $where = delete $our_attrs->{where};
176
177   my $new_attrs = { %{$our_attrs}, %{$attrs} };
178
179   # merge new attrs into inherited
180   foreach my $key (qw/join prefetch/) {
181     next unless exists $attrs->{$key};
182     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
183   }
184
185   my $cond = (@_
186     ? (
187         (@_ == 1 || ref $_[0] eq "HASH")
188           ? (
189               (ref $_[0] eq 'HASH')
190                 ? (
191                     (keys %{ $_[0] }  > 0)
192                       ? shift
193                       : undef
194                    )
195                 :  shift
196              )
197           : (
198               (@_ % 2)
199                 ? $self->throw_exception("Odd number of arguments to search")
200                 : {@_}
201              )
202       )
203     : undef
204   );
205
206   if (defined $where) {
207     $new_attrs->{where} = (
208       defined $new_attrs->{where}
209         ? { '-and' => [
210               map {
211                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
212               } $where, $new_attrs->{where}
213             ]
214           }
215         : $where);
216   }
217
218   if (defined $cond) {
219     $new_attrs->{where} = (
220       defined $new_attrs->{where}
221         ? { '-and' => [
222               map {
223                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
224               } $cond, $new_attrs->{where}
225             ]
226           }
227         : $cond);
228   }
229
230   if (defined $having) {
231     $new_attrs->{having} = (
232       defined $new_attrs->{having}
233         ? { '-and' => [
234               map {
235                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
236               } $having, $new_attrs->{having}
237             ]
238           }
239         : $having);
240   }
241
242   my $rs = (ref $self)->new($self->_source_handle, $new_attrs);
243   if ($rows) {
244     $rs->set_cache($rows);
245   }
246   return $rs;
247 }
248
249 =head2 search_literal
250
251 =over 4
252
253 =item Arguments: $sql_fragment, @bind_values
254
255 =item Return Value: $resultset (scalar context), @row_objs (list context)
256
257 =back
258
259   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
260   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
261
262 Pass a literal chunk of SQL to be added to the conditional part of the
263 resultset query.
264
265 =cut
266
267 sub search_literal {
268   my ($self, $cond, @vals) = @_;
269   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
270   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
271   return $self->search(\$cond, $attrs);
272 }
273
274 =head2 find
275
276 =over 4
277
278 =item Arguments: @values | \%cols, \%attrs?
279
280 =item Return Value: $row_object
281
282 =back
283
284 Finds a row based on its primary key or unique constraint. For example, to find
285 a row by its primary key:
286
287   my $cd = $schema->resultset('CD')->find(5);
288
289 You can also find a row by a specific unique constraint using the C<key>
290 attribute. For example:
291
292   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
293     key => 'cd_artist_title'
294   });
295
296 Additionally, you can specify the columns explicitly by name:
297
298   my $cd = $schema->resultset('CD')->find(
299     {
300       artist => 'Massive Attack',
301       title  => 'Mezzanine',
302     },
303     { key => 'cd_artist_title' }
304   );
305
306 If the C<key> is specified as C<primary>, it searches only on the primary key.
307
308 If no C<key> is specified, it searches on all unique constraints defined on the
309 source, including the primary key.
310
311 If your table does not have a primary key, you B<must> provide a value for the
312 C<key> attribute matching one of the unique constraints on the source.
313
314 See also L</find_or_create> and L</update_or_create>. For information on how to
315 declare unique constraints, see
316 L<DBIx::Class::ResultSource/add_unique_constraint>.
317
318 =cut
319
320 sub find {
321   my $self = shift;
322   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
323
324   # Default to the primary key, but allow a specific key
325   my @cols = exists $attrs->{key}
326     ? $self->result_source->unique_constraint_columns($attrs->{key})
327     : $self->result_source->primary_columns;
328   $self->throw_exception(
329     "Can't find unless a primary key is defined or unique constraint is specified"
330   ) unless @cols;
331
332   # Parse out a hashref from input
333   my $input_query;
334   if (ref $_[0] eq 'HASH') {
335     $input_query = { %{$_[0]} };
336   }
337   elsif (@_ == @cols) {
338     $input_query = {};
339     @{$input_query}{@cols} = @_;
340   }
341   else {
342     # Compatibility: Allow e.g. find(id => $value)
343     carp "Find by key => value deprecated; please use a hashref instead";
344     $input_query = {@_};
345   }
346
347   my (%related, $info);
348
349   foreach my $key (keys %$input_query) {
350     if (ref($input_query->{$key})
351         && ($info = $self->result_source->relationship_info($key))) {
352       my $rel_q = $self->result_source->resolve_condition(
353                     $info->{cond}, delete $input_query->{$key}, $key
354                   );
355       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
356       @related{keys %$rel_q} = values %$rel_q;
357     }
358   }
359   if (my @keys = keys %related) {
360     @{$input_query}{@keys} = values %related;
361   }
362
363   my @unique_queries = $self->_unique_queries($input_query, $attrs);
364
365   # Build the final query: Default to the disjunction of the unique queries,
366   # but allow the input query in case the ResultSet defines the query or the
367   # user is abusing find
368   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
369   my $query = @unique_queries
370     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
371     : $self->_add_alias($input_query, $alias);
372
373   # Run the query
374   if (keys %$attrs) {
375     my $rs = $self->search($query, $attrs);
376     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
377   }
378   else {
379     return keys %{$self->_resolved_attrs->{collapse}}
380       ? $self->search($query)->next
381       : $self->single($query);
382   }
383 }
384
385 # _add_alias
386 #
387 # Add the specified alias to the specified query hash. A copy is made so the
388 # original query is not modified.
389
390 sub _add_alias {
391   my ($self, $query, $alias) = @_;
392
393   my %aliased = %$query;
394   foreach my $col (grep { ! m/\./ } keys %aliased) {
395     $aliased{"$alias.$col"} = delete $aliased{$col};
396   }
397
398   return \%aliased;
399 }
400
401 # _unique_queries
402 #
403 # Build a list of queries which satisfy unique constraints.
404
405 sub _unique_queries {
406   my ($self, $query, $attrs) = @_;
407
408   my @constraint_names = exists $attrs->{key}
409     ? ($attrs->{key})
410     : $self->result_source->unique_constraint_names;
411
412   my @unique_queries;
413   foreach my $name (@constraint_names) {
414     my @unique_cols = $self->result_source->unique_constraint_columns($name);
415     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
416
417     my $num_query = scalar keys %$unique_query;
418     next unless $num_query;
419
420     # XXX: Assuming quite a bit about $self->{attrs}{where}
421     my $num_cols = scalar @unique_cols;
422     my $num_where = exists $self->{attrs}{where}
423       ? scalar keys %{ $self->{attrs}{where} }
424       : 0;
425     push @unique_queries, $unique_query
426       if $num_query + $num_where == $num_cols;
427   }
428
429   return @unique_queries;
430 }
431
432 # _build_unique_query
433 #
434 # Constrain the specified query hash based on the specified column names.
435
436 sub _build_unique_query {
437   my ($self, $query, $unique_cols) = @_;
438
439   return {
440     map  { $_ => $query->{$_} }
441     grep { exists $query->{$_} }
442       @$unique_cols
443   };
444 }
445
446 =head2 search_related
447
448 =over 4
449
450 =item Arguments: $rel, $cond, \%attrs?
451
452 =item Return Value: $new_resultset
453
454 =back
455
456   $new_rs = $cd_rs->search_related('artist', {
457     name => 'Emo-R-Us',
458   });
459
460 Searches the specified relationship, optionally specifying a condition and
461 attributes for matching records. See L</ATTRIBUTES> for more information.
462
463 =cut
464
465 sub search_related {
466   return shift->related_resultset(shift)->search(@_);
467 }
468
469 =head2 cursor
470
471 =over 4
472
473 =item Arguments: none
474
475 =item Return Value: $cursor
476
477 =back
478
479 Returns a storage-driven cursor to the given resultset. See
480 L<DBIx::Class::Cursor> for more information.
481
482 =cut
483
484 sub cursor {
485   my ($self) = @_;
486
487   my $attrs = { %{$self->_resolved_attrs} };
488   return $self->{cursor}
489     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
490           $attrs->{where},$attrs);
491 }
492
493 =head2 single
494
495 =over 4
496
497 =item Arguments: $cond?
498
499 =item Return Value: $row_object?
500
501 =back
502
503   my $cd = $schema->resultset('CD')->single({ year => 2001 });
504
505 Inflates the first result without creating a cursor if the resultset has
506 any records in it; if not returns nothing. Used by L</find> as an optimisation.
507
508 Can optionally take an additional condition *only* - this is a fast-code-path
509 method; if you need to add extra joins or similar call ->search and then
510 ->single without a condition on the $rs returned from that.
511
512 =cut
513
514 sub single {
515   my ($self, $where) = @_;
516   my $attrs = { %{$self->_resolved_attrs} };
517   if ($where) {
518     if (defined $attrs->{where}) {
519       $attrs->{where} = {
520         '-and' =>
521             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
522                $where, delete $attrs->{where} ]
523       };
524     } else {
525       $attrs->{where} = $where;
526     }
527   }
528
529 #  XXX: Disabled since it doesn't infer uniqueness in all cases
530 #  unless ($self->_is_unique_query($attrs->{where})) {
531 #    carp "Query not guaranteed to return a single row"
532 #      . "; please declare your unique constraints or use search instead";
533 #  }
534
535   my @data = $self->result_source->storage->select_single(
536     $attrs->{from}, $attrs->{select},
537     $attrs->{where}, $attrs
538   );
539
540   return (@data ? ($self->_construct_object(@data))[0] : ());
541 }
542
543 # _is_unique_query
544 #
545 # Try to determine if the specified query is guaranteed to be unique, based on
546 # the declared unique constraints.
547
548 sub _is_unique_query {
549   my ($self, $query) = @_;
550
551   my $collapsed = $self->_collapse_query($query);
552   my $alias = $self->{attrs}{alias};
553
554   foreach my $name ($self->result_source->unique_constraint_names) {
555     my @unique_cols = map {
556       "$alias.$_"
557     } $self->result_source->unique_constraint_columns($name);
558
559     # Count the values for each unique column
560     my %seen = map { $_ => 0 } @unique_cols;
561
562     foreach my $key (keys %$collapsed) {
563       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
564       next unless exists $seen{$aliased};  # Additional constraints are okay
565       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
566     }
567
568     # If we get 0 or more than 1 value for a column, it's not necessarily unique
569     return 1 unless grep { $_ != 1 } values %seen;
570   }
571
572   return 0;
573 }
574
575 # _collapse_query
576 #
577 # Recursively collapse the query, accumulating values for each column.
578
579 sub _collapse_query {
580   my ($self, $query, $collapsed) = @_;
581
582   $collapsed ||= {};
583
584   if (ref $query eq 'ARRAY') {
585     foreach my $subquery (@$query) {
586       next unless ref $subquery;  # -or
587 #      warn "ARRAY: " . Dumper $subquery;
588       $collapsed = $self->_collapse_query($subquery, $collapsed);
589     }
590   }
591   elsif (ref $query eq 'HASH') {
592     if (keys %$query and (keys %$query)[0] eq '-and') {
593       foreach my $subquery (@{$query->{-and}}) {
594 #        warn "HASH: " . Dumper $subquery;
595         $collapsed = $self->_collapse_query($subquery, $collapsed);
596       }
597     }
598     else {
599 #      warn "LEAF: " . Dumper $query;
600       foreach my $col (keys %$query) {
601         my $value = $query->{$col};
602         $collapsed->{$col}{$value}++;
603       }
604     }
605   }
606
607   return $collapsed;
608 }
609
610 =head2 get_column
611
612 =over 4
613
614 =item Arguments: $cond?
615
616 =item Return Value: $resultsetcolumn
617
618 =back
619
620   my $max_length = $rs->get_column('length')->max;
621
622 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
623
624 =cut
625
626 sub get_column {
627   my ($self, $column) = @_;
628   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
629   return $new;
630 }
631
632 =head2 search_like
633
634 =over 4
635
636 =item Arguments: $cond, \%attrs?
637
638 =item Return Value: $resultset (scalar context), @row_objs (list context)
639
640 =back
641
642   # WHERE title LIKE '%blue%'
643   $cd_rs = $rs->search_like({ title => '%blue%'});
644
645 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
646 that this is simply a convenience method. You most likely want to use
647 L</search> with specific operators.
648
649 For more information, see L<DBIx::Class::Manual::Cookbook>.
650
651 =cut
652
653 sub search_like {
654   my $class = shift;
655   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
656   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
657   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
658   return $class->search($query, { %$attrs });
659 }
660
661 =head2 slice
662
663 =over 4
664
665 =item Arguments: $first, $last
666
667 =item Return Value: $resultset (scalar context), @row_objs (list context)
668
669 =back
670
671 Returns a resultset or object list representing a subset of elements from the
672 resultset slice is called on. Indexes are from 0, i.e., to get the first
673 three records, call:
674
675   my ($one, $two, $three) = $rs->slice(0, 2);
676
677 =cut
678
679 sub slice {
680   my ($self, $min, $max) = @_;
681   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
682   $attrs->{offset} = $self->{attrs}{offset} || 0;
683   $attrs->{offset} += $min;
684   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
685   return $self->search(undef(), $attrs);
686   #my $slice = (ref $self)->new($self->result_source, $attrs);
687   #return (wantarray ? $slice->all : $slice);
688 }
689
690 =head2 next
691
692 =over 4
693
694 =item Arguments: none
695
696 =item Return Value: $result?
697
698 =back
699
700 Returns the next element in the resultset (C<undef> is there is none).
701
702 Can be used to efficiently iterate over records in the resultset:
703
704   my $rs = $schema->resultset('CD')->search;
705   while (my $cd = $rs->next) {
706     print $cd->title;
707   }
708
709 Note that you need to store the resultset object, and call C<next> on it.
710 Calling C<< resultset('Table')->next >> repeatedly will always return the
711 first record from the resultset.
712
713 =cut
714
715 sub next {
716   my ($self) = @_;
717   if (my $cache = $self->get_cache) {
718     $self->{all_cache_position} ||= 0;
719     return $cache->[$self->{all_cache_position}++];
720   }
721   if ($self->{attrs}{cache}) {
722     $self->{all_cache_position} = 1;
723     return ($self->all)[0];
724   }
725   if ($self->{stashed_objects}) {
726     my $obj = shift(@{$self->{stashed_objects}});
727     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
728     return $obj;
729   }
730   my @row = (
731     exists $self->{stashed_row}
732       ? @{delete $self->{stashed_row}}
733       : $self->cursor->next
734   );
735   return unless (@row);
736   my ($row, @more) = $self->_construct_object(@row);
737   $self->{stashed_objects} = \@more if @more;
738   return $row;
739 }
740
741 sub _construct_object {
742   my ($self, @row) = @_;
743   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
744   my @new = $self->result_class->inflate_result($self->_source_handle, @$info);
745   @new = $self->{_attrs}{record_filter}->(@new)
746     if exists $self->{_attrs}{record_filter};
747   return @new;
748 }
749
750 sub _collapse_result {
751   my ($self, $as, $row, $prefix) = @_;
752
753   my %const;
754   my @copy = @$row;
755   
756   foreach my $this_as (@$as) {
757     my $val = shift @copy;
758     if (defined $prefix) {
759       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
760         my $remain = $1;
761         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
762         $const{$1||''}{$2} = $val;
763       }
764     } else {
765       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
766       $const{$1||''}{$2} = $val;
767     }
768   }
769
770   my $alias = $self->{attrs}{alias};
771   my $info = [ {}, {} ];
772   foreach my $key (keys %const) {
773     if (length $key && $key ne $alias) {
774       my $target = $info;
775       my @parts = split(/\./, $key);
776       foreach my $p (@parts) {
777         $target = $target->[1]->{$p} ||= [];
778       }
779       $target->[0] = $const{$key};
780     } else {
781       $info->[0] = $const{$key};
782     }
783   }
784   
785   my @collapse;
786   if (defined $prefix) {
787     @collapse = map {
788         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
789     } keys %{$self->{_attrs}{collapse}}
790   } else {
791     @collapse = keys %{$self->{_attrs}{collapse}};
792   };
793
794   if (@collapse) {
795     my ($c) = sort { length $a <=> length $b } @collapse;
796     my $target = $info;
797     foreach my $p (split(/\./, $c)) {
798       $target = $target->[1]->{$p} ||= [];
799     }
800     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
801     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
802     my $tree = $self->_collapse_result($as, $row, $c_prefix);
803     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
804     my (@final, @raw);
805
806     while (
807       !(
808         grep {
809           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
810         } @co_key
811         )
812     ) {
813       push(@final, $tree);
814       last unless (@raw = $self->cursor->next);
815       $row = $self->{stashed_row} = \@raw;
816       $tree = $self->_collapse_result($as, $row, $c_prefix);
817     }
818     @$target = (@final ? @final : [ {}, {} ]);
819       # single empty result to indicate an empty prefetched has_many
820   }
821
822   #print "final info: " . Dumper($info);
823   return $info;
824 }
825
826 =head2 result_source
827
828 =over 4
829
830 =item Arguments: $result_source?
831
832 =item Return Value: $result_source
833
834 =back
835
836 An accessor for the primary ResultSource object from which this ResultSet
837 is derived.
838
839 =head2 result_class
840
841 =over 4
842
843 =item Arguments: $result_class?
844
845 =item Return Value: $result_class
846
847 =back
848
849 An accessor for the class to use when creating row objects. Defaults to 
850 C<< result_source->result_class >> - which in most cases is the name of the 
851 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
852
853 =cut
854
855
856 =head2 count
857
858 =over 4
859
860 =item Arguments: $cond, \%attrs??
861
862 =item Return Value: $count
863
864 =back
865
866 Performs an SQL C<COUNT> with the same query as the resultset was built
867 with to find the number of elements. If passed arguments, does a search
868 on the resultset and counts the results of that.
869
870 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
871 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
872 not support C<DISTINCT> with multiple columns. If you are using such a
873 database, you should only use columns from the main table in your C<group_by>
874 clause.
875
876 =cut
877
878 sub count {
879   my $self = shift;
880   return $self->search(@_)->count if @_ and defined $_[0];
881   return scalar @{ $self->get_cache } if $self->get_cache;
882   my $count = $self->_count;
883   return 0 unless $count;
884
885   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
886   $count = $self->{attrs}{rows} if
887     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
888   return $count;
889 }
890
891 sub _count { # Separated out so pager can get the full count
892   my $self = shift;
893   my $select = { count => '*' };
894
895   my $attrs = { %{$self->_resolved_attrs} };
896   if (my $group_by = delete $attrs->{group_by}) {
897     delete $attrs->{having};
898     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
899     # todo: try CONCAT for multi-column pk
900     my @pk = $self->result_source->primary_columns;
901     if (@pk == 1) {
902       my $alias = $attrs->{alias};
903       foreach my $column (@distinct) {
904         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
905           @distinct = ($column);
906           last;
907         }
908       }
909     }
910
911     $select = { count => { distinct => \@distinct } };
912   }
913
914   $attrs->{select} = $select;
915   $attrs->{as} = [qw/count/];
916
917   # offset, order by and page are not needed to count. record_filter is cdbi
918   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
919
920   my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
921   my ($count) = $tmp_rs->cursor->next;
922   return $count;
923 }
924
925 =head2 count_literal
926
927 =over 4
928
929 =item Arguments: $sql_fragment, @bind_values
930
931 =item Return Value: $count
932
933 =back
934
935 Counts the results in a literal query. Equivalent to calling L</search_literal>
936 with the passed arguments, then L</count>.
937
938 =cut
939
940 sub count_literal { shift->search_literal(@_)->count; }
941
942 =head2 all
943
944 =over 4
945
946 =item Arguments: none
947
948 =item Return Value: @objects
949
950 =back
951
952 Returns all elements in the resultset. Called implicitly if the resultset
953 is returned in list context.
954
955 =cut
956
957 sub all {
958   my ($self) = @_;
959   return @{ $self->get_cache } if $self->get_cache;
960
961   my @obj;
962
963   # TODO: don't call resolve here
964   if (keys %{$self->_resolved_attrs->{collapse}}) {
965 #  if ($self->{attrs}{prefetch}) {
966       # Using $self->cursor->all is really just an optimisation.
967       # If we're collapsing has_many prefetches it probably makes
968       # very little difference, and this is cleaner than hacking
969       # _construct_object to survive the approach
970     my @row = $self->cursor->next;
971     while (@row) {
972       push(@obj, $self->_construct_object(@row));
973       @row = (exists $self->{stashed_row}
974                ? @{delete $self->{stashed_row}}
975                : $self->cursor->next);
976     }
977   } else {
978     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
979   }
980
981   $self->set_cache(\@obj) if $self->{attrs}{cache};
982   return @obj;
983 }
984
985 =head2 reset
986
987 =over 4
988
989 =item Arguments: none
990
991 =item Return Value: $self
992
993 =back
994
995 Resets the resultset's cursor, so you can iterate through the elements again.
996
997 =cut
998
999 sub reset {
1000   my ($self) = @_;
1001   delete $self->{_attrs} if exists $self->{_attrs};
1002   $self->{all_cache_position} = 0;
1003   $self->cursor->reset;
1004   return $self;
1005 }
1006
1007 =head2 first
1008
1009 =over 4
1010
1011 =item Arguments: none
1012
1013 =item Return Value: $object?
1014
1015 =back
1016
1017 Resets the resultset and returns an object for the first result (if the
1018 resultset returns anything).
1019
1020 =cut
1021
1022 sub first {
1023   return $_[0]->reset->next;
1024 }
1025
1026 # _cond_for_update_delete
1027 #
1028 # update/delete require the condition to be modified to handle
1029 # the differing SQL syntax available.  This transforms the $self->{cond}
1030 # appropriately, returning the new condition.
1031
1032 sub _cond_for_update_delete {
1033   my ($self, $full_cond) = @_;
1034   my $cond = {};
1035
1036   $full_cond ||= $self->{cond};
1037   # No-op. No condition, we're updating/deleting everything
1038   return $cond unless ref $full_cond;
1039
1040   if (ref $full_cond eq 'ARRAY') {
1041     $cond = [
1042       map {
1043         my %hash;
1044         foreach my $key (keys %{$_}) {
1045           $key =~ /([^.]+)$/;
1046           $hash{$1} = $_->{$key};
1047         }
1048         \%hash;
1049       } @{$full_cond}
1050     ];
1051   }
1052   elsif (ref $full_cond eq 'HASH') {
1053     if ((keys %{$full_cond})[0] eq '-and') {
1054       $cond->{-and} = [];
1055
1056       my @cond = @{$full_cond->{-and}};
1057       for (my $i = 0; $i < @cond; $i++) {
1058         my $entry = $cond[$i];
1059
1060         my $hash;
1061         if (ref $entry eq 'HASH') {
1062           $hash = $self->_cond_for_update_delete($entry);
1063         }
1064         else {
1065           $entry =~ /([^.]+)$/;
1066           $hash->{$1} = $cond[++$i];
1067         }
1068
1069         push @{$cond->{-and}}, $hash;
1070       }
1071     }
1072     else {
1073       foreach my $key (keys %{$full_cond}) {
1074         $key =~ /([^.]+)$/;
1075         $cond->{$1} = $full_cond->{$key};
1076       }
1077     }
1078   }
1079   else {
1080     $self->throw_exception(
1081       "Can't update/delete on resultset with condition unless hash or array"
1082     );
1083   }
1084
1085   return $cond;
1086 }
1087
1088
1089 =head2 update
1090
1091 =over 4
1092
1093 =item Arguments: \%values
1094
1095 =item Return Value: $storage_rv
1096
1097 =back
1098
1099 Sets the specified columns in the resultset to the supplied values in a
1100 single query. Return value will be true if the update succeeded or false
1101 if no records were updated; exact type of success value is storage-dependent.
1102
1103 =cut
1104
1105 sub update {
1106   my ($self, $values) = @_;
1107   $self->throw_exception("Values for update must be a hash")
1108     unless ref $values eq 'HASH';
1109
1110   my $cond = $self->_cond_for_update_delete;
1111
1112   return $self->result_source->storage->update(
1113     $self->result_source->from, $values, $cond
1114   );
1115 }
1116
1117 =head2 update_all
1118
1119 =over 4
1120
1121 =item Arguments: \%values
1122
1123 =item Return Value: 1
1124
1125 =back
1126
1127 Fetches all objects and updates them one at a time. Note that C<update_all>
1128 will run DBIC cascade triggers, while L</update> will not.
1129
1130 =cut
1131
1132 sub update_all {
1133   my ($self, $values) = @_;
1134   $self->throw_exception("Values for update must be a hash")
1135     unless ref $values eq 'HASH';
1136   foreach my $obj ($self->all) {
1137     $obj->set_columns($values)->update;
1138   }
1139   return 1;
1140 }
1141
1142 =head2 delete
1143
1144 =over 4
1145
1146 =item Arguments: none
1147
1148 =item Return Value: 1
1149
1150 =back
1151
1152 Deletes the contents of the resultset from its result source. Note that this
1153 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1154 to run. See also L<DBIx::Class::Row/delete>.
1155
1156 =cut
1157
1158 sub delete {
1159   my ($self) = @_;
1160
1161   my $cond = $self->_cond_for_update_delete;
1162
1163   $self->result_source->storage->delete($self->result_source->from, $cond);
1164   return 1;
1165 }
1166
1167 =head2 delete_all
1168
1169 =over 4
1170
1171 =item Arguments: none
1172
1173 =item Return Value: 1
1174
1175 =back
1176
1177 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1178 will run DBIC cascade triggers, while L</delete> will not.
1179
1180 =cut
1181
1182 sub delete_all {
1183   my ($self) = @_;
1184   $_->delete for $self->all;
1185   return 1;
1186 }
1187
1188 =head2 pager
1189
1190 =over 4
1191
1192 =item Arguments: none
1193
1194 =item Return Value: $pager
1195
1196 =back
1197
1198 Return Value a L<Data::Page> object for the current resultset. Only makes
1199 sense for queries with a C<page> attribute.
1200
1201 =cut
1202
1203 sub pager {
1204   my ($self) = @_;
1205   my $attrs = $self->{attrs};
1206   $self->throw_exception("Can't create pager for non-paged rs")
1207     unless $self->{attrs}{page};
1208   $attrs->{rows} ||= 10;
1209   return $self->{pager} ||= Data::Page->new(
1210     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1211 }
1212
1213 =head2 page
1214
1215 =over 4
1216
1217 =item Arguments: $page_number
1218
1219 =item Return Value: $rs
1220
1221 =back
1222
1223 Returns a resultset for the $page_number page of the resultset on which page
1224 is called, where each page contains a number of rows equal to the 'rows'
1225 attribute set on the resultset (10 by default).
1226
1227 =cut
1228
1229 sub page {
1230   my ($self, $page) = @_;
1231   return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
1232 }
1233
1234 =head2 new_result
1235
1236 =over 4
1237
1238 =item Arguments: \%vals
1239
1240 =item Return Value: $object
1241
1242 =back
1243
1244 Creates an object in the resultset's result class and returns it.
1245
1246 =cut
1247
1248 sub new_result {
1249   my ($self, $values) = @_;
1250   $self->throw_exception( "new_result needs a hash" )
1251     unless (ref $values eq 'HASH');
1252   $self->throw_exception(
1253     "Can't abstract implicit construct, condition not a hash"
1254   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1255
1256   my $alias = $self->{attrs}{alias};
1257   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1258   my %new = (
1259     %{ $self->_remove_alias($values, $alias) },
1260     %{ $self->_remove_alias($collapsed_cond, $alias) },
1261   );
1262
1263   return $self->result_class->new(\%new,$self->_source_handle);
1264 }
1265
1266 # _collapse_cond
1267 #
1268 # Recursively collapse the condition.
1269
1270 sub _collapse_cond {
1271   my ($self, $cond, $collapsed) = @_;
1272
1273   $collapsed ||= {};
1274
1275   if (ref $cond eq 'ARRAY') {
1276     foreach my $subcond (@$cond) {
1277       next unless ref $subcond;  # -or
1278 #      warn "ARRAY: " . Dumper $subcond;
1279       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1280     }
1281   }
1282   elsif (ref $cond eq 'HASH') {
1283     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1284       foreach my $subcond (@{$cond->{-and}}) {
1285 #        warn "HASH: " . Dumper $subcond;
1286         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1287       }
1288     }
1289     else {
1290 #      warn "LEAF: " . Dumper $cond;
1291       foreach my $col (keys %$cond) {
1292         my $value = $cond->{$col};
1293         $collapsed->{$col} = $value;
1294       }
1295     }
1296   }
1297
1298   return $collapsed;
1299 }
1300
1301 # _remove_alias
1302 #
1303 # Remove the specified alias from the specified query hash. A copy is made so
1304 # the original query is not modified.
1305
1306 sub _remove_alias {
1307   my ($self, $query, $alias) = @_;
1308
1309   my %orig = %{ $query || {} };
1310   my %unaliased;
1311
1312   foreach my $key (keys %orig) {
1313     if ($key !~ /\./) {
1314       $unaliased{$key} = $orig{$key};
1315       next;
1316     }
1317     $unaliased{$1} = $orig{$key}
1318       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1319   }
1320
1321   return \%unaliased;
1322 }
1323
1324 =head2 find_or_new
1325
1326 =over 4
1327
1328 =item Arguments: \%vals, \%attrs?
1329
1330 =item Return Value: $object
1331
1332 =back
1333
1334 Find an existing record from this resultset. If none exists, instantiate a new
1335 result object and return it. The object will not be saved into your storage
1336 until you call L<DBIx::Class::Row/insert> on it.
1337
1338 If you want objects to be saved immediately, use L</find_or_create> instead.
1339
1340 =cut
1341
1342 sub find_or_new {
1343   my $self     = shift;
1344   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1345   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1346   my $exists   = $self->find($hash, $attrs);
1347   return defined $exists ? $exists : $self->new_result($hash);
1348 }
1349
1350 =head2 create
1351
1352 =over 4
1353
1354 =item Arguments: \%vals
1355
1356 =item Return Value: $object
1357
1358 =back
1359
1360 Inserts a record into the resultset and returns the object representing it.
1361
1362 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1363
1364 =cut
1365
1366 sub create {
1367   my ($self, $attrs) = @_;
1368   $self->throw_exception( "create needs a hashref" )
1369     unless ref $attrs eq 'HASH';
1370   return $self->new_result($attrs)->insert;
1371 }
1372
1373 =head2 find_or_create
1374
1375 =over 4
1376
1377 =item Arguments: \%vals, \%attrs?
1378
1379 =item Return Value: $object
1380
1381 =back
1382
1383   $class->find_or_create({ key => $val, ... });
1384
1385 Tries to find a record based on its primary key or unique constraint; if none
1386 is found, creates one and returns that instead.
1387
1388   my $cd = $schema->resultset('CD')->find_or_create({
1389     cdid   => 5,
1390     artist => 'Massive Attack',
1391     title  => 'Mezzanine',
1392     year   => 2005,
1393   });
1394
1395 Also takes an optional C<key> attribute, to search by a specific key or unique
1396 constraint. For example:
1397
1398   my $cd = $schema->resultset('CD')->find_or_create(
1399     {
1400       artist => 'Massive Attack',
1401       title  => 'Mezzanine',
1402     },
1403     { key => 'cd_artist_title' }
1404   );
1405
1406 See also L</find> and L</update_or_create>. For information on how to declare
1407 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1408
1409 =cut
1410
1411 sub find_or_create {
1412   my $self     = shift;
1413   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1414   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1415   my $exists   = $self->find($hash, $attrs);
1416   return defined $exists ? $exists : $self->create($hash);
1417 }
1418
1419 =head2 update_or_create
1420
1421 =over 4
1422
1423 =item Arguments: \%col_values, { key => $unique_constraint }?
1424
1425 =item Return Value: $object
1426
1427 =back
1428
1429   $class->update_or_create({ col => $val, ... });
1430
1431 First, searches for an existing row matching one of the unique constraints
1432 (including the primary key) on the source of this resultset. If a row is
1433 found, updates it with the other given column values. Otherwise, creates a new
1434 row.
1435
1436 Takes an optional C<key> attribute to search on a specific unique constraint.
1437 For example:
1438
1439   # In your application
1440   my $cd = $schema->resultset('CD')->update_or_create(
1441     {
1442       artist => 'Massive Attack',
1443       title  => 'Mezzanine',
1444       year   => 1998,
1445     },
1446     { key => 'cd_artist_title' }
1447   );
1448
1449 If no C<key> is specified, it searches on all unique constraints defined on the
1450 source, including the primary key.
1451
1452 If the C<key> is specified as C<primary>, it searches only on the primary key.
1453
1454 See also L</find> and L</find_or_create>. For information on how to declare
1455 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1456
1457 =cut
1458
1459 sub update_or_create {
1460   my $self = shift;
1461   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1462   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1463
1464   my $row = $self->find($cond, $attrs);
1465   if (defined $row) {
1466     $row->update($cond);
1467     return $row;
1468   }
1469
1470   return $self->create($cond);
1471 }
1472
1473 =head2 get_cache
1474
1475 =over 4
1476
1477 =item Arguments: none
1478
1479 =item Return Value: \@cache_objects?
1480
1481 =back
1482
1483 Gets the contents of the cache for the resultset, if the cache is set.
1484
1485 =cut
1486
1487 sub get_cache {
1488   shift->{all_cache};
1489 }
1490
1491 =head2 set_cache
1492
1493 =over 4
1494
1495 =item Arguments: \@cache_objects
1496
1497 =item Return Value: \@cache_objects
1498
1499 =back
1500
1501 Sets the contents of the cache for the resultset. Expects an arrayref
1502 of objects of the same class as those produced by the resultset. Note that
1503 if the cache is set the resultset will return the cached objects rather
1504 than re-querying the database even if the cache attr is not set.
1505
1506 =cut
1507
1508 sub set_cache {
1509   my ( $self, $data ) = @_;
1510   $self->throw_exception("set_cache requires an arrayref")
1511       if defined($data) && (ref $data ne 'ARRAY');
1512   $self->{all_cache} = $data;
1513 }
1514
1515 =head2 clear_cache
1516
1517 =over 4
1518
1519 =item Arguments: none
1520
1521 =item Return Value: []
1522
1523 =back
1524
1525 Clears the cache for the resultset.
1526
1527 =cut
1528
1529 sub clear_cache {
1530   shift->set_cache(undef);
1531 }
1532
1533 =head2 related_resultset
1534
1535 =over 4
1536
1537 =item Arguments: $relationship_name
1538
1539 =item Return Value: $resultset
1540
1541 =back
1542
1543 Returns a related resultset for the supplied relationship name.
1544
1545   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1546
1547 =cut
1548
1549 sub related_resultset {
1550   my ($self, $rel) = @_;
1551
1552   $self->{related_resultsets} ||= {};
1553   return $self->{related_resultsets}{$rel} ||= do {
1554     my $rel_obj = $self->result_source->relationship_info($rel);
1555
1556     $self->throw_exception(
1557       "search_related: result source '" . $self->_source_handle->source_moniker .
1558         "' has no such relationship $rel")
1559       unless $rel_obj;
1560     
1561     my ($from,$seen) = $self->_resolve_from($rel);
1562
1563     my $join_count = $seen->{$rel};
1564     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1565
1566     $self->_source_handle->schema->resultset($rel_obj->{class})->search_rs(
1567       undef, {
1568         %{$self->{attrs}||{}},
1569         join => undef,
1570         prefetch => undef,
1571         select => undef,
1572         as => undef,
1573         alias => $alias,
1574         where => $self->{cond},
1575         seen_join => $seen,
1576         from => $from,
1577     });
1578   };
1579 }
1580
1581 sub _resolve_from {
1582   my ($self, $extra_join) = @_;
1583   my $source = $self->result_source;
1584   my $attrs = $self->{attrs};
1585   
1586   my $from = $attrs->{from}
1587     || [ { $attrs->{alias} => $source->from } ];
1588     
1589   my $seen = { %{$attrs->{seen_join}||{}} };
1590
1591   my $join = ($attrs->{join}
1592                ? [ $attrs->{join}, $extra_join ]
1593                : $extra_join);
1594   $from = [
1595     @$from,
1596     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1597   ];
1598
1599   return ($from,$seen);
1600 }
1601
1602 sub _resolved_attrs {
1603   my $self = shift;
1604   return $self->{_attrs} if $self->{_attrs};
1605
1606   my $attrs = { %{$self->{attrs}||{}} };
1607   my $source = $self->result_source;
1608   my $alias = $attrs->{alias};
1609
1610   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1611   if ($attrs->{columns}) {
1612     delete $attrs->{as};
1613   } elsif (!$attrs->{select}) {
1614     $attrs->{columns} = [ $source->columns ];
1615   }
1616  
1617   $attrs->{select} = 
1618     ($attrs->{select}
1619       ? (ref $attrs->{select} eq 'ARRAY'
1620           ? [ @{$attrs->{select}} ]
1621           : [ $attrs->{select} ])
1622       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1623     );
1624   $attrs->{as} =
1625     ($attrs->{as}
1626       ? (ref $attrs->{as} eq 'ARRAY'
1627           ? [ @{$attrs->{as}} ]
1628           : [ $attrs->{as} ])
1629       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1630     );
1631   
1632   my $adds;
1633   if ($adds = delete $attrs->{include_columns}) {
1634     $adds = [$adds] unless ref $adds eq 'ARRAY';
1635     push(@{$attrs->{select}}, @$adds);
1636     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1637   }
1638   if ($adds = delete $attrs->{'+select'}) {
1639     $adds = [$adds] unless ref $adds eq 'ARRAY';
1640     push(@{$attrs->{select}},
1641            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1642   }
1643   if (my $adds = delete $attrs->{'+as'}) {
1644     $adds = [$adds] unless ref $adds eq 'ARRAY';
1645     push(@{$attrs->{as}}, @$adds);
1646   }
1647
1648   $attrs->{from} ||= [ { 'me' => $source->from } ];
1649
1650   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1651     my $join = delete $attrs->{join} || {};
1652
1653     if (defined $attrs->{prefetch}) {
1654       $join = $self->_merge_attr(
1655         $join, $attrs->{prefetch}
1656       );
1657     }
1658
1659     $attrs->{from} =   # have to copy here to avoid corrupting the original
1660       [
1661         @{$attrs->{from}}, 
1662         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1663       ];
1664   }
1665
1666   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1667   if ($attrs->{order_by}) {
1668     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1669                            ? [ @{$attrs->{order_by}} ]
1670                            : [ $attrs->{order_by} ]);
1671   } else {
1672     $attrs->{order_by} = [];    
1673   }
1674
1675   my $collapse = $attrs->{collapse} || {};
1676   if (my $prefetch = delete $attrs->{prefetch}) {
1677     $prefetch = $self->_merge_attr({}, $prefetch);
1678     my @pre_order;
1679     my $seen = $attrs->{seen_join} || {};
1680     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1681       # bring joins back to level of current class
1682       my @prefetch = $source->resolve_prefetch(
1683         $p, $alias, $seen, \@pre_order, $collapse
1684       );
1685       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1686       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1687     }
1688     push(@{$attrs->{order_by}}, @pre_order);
1689   }
1690   $attrs->{collapse} = $collapse;
1691
1692   return $self->{_attrs} = $attrs;
1693 }
1694
1695 sub _merge_attr {
1696   my ($self, $a, $b) = @_;
1697   return $b unless defined($a);
1698   return $a unless defined($b);
1699   
1700   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1701     foreach my $key (keys %{$b}) {
1702       if (exists $a->{$key}) {
1703         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1704       } else {
1705         $a->{$key} = $b->{$key};
1706       }
1707     }
1708     return $a;
1709   } else {
1710     $a = [$a] unless ref $a eq 'ARRAY';
1711     $b = [$b] unless ref $b eq 'ARRAY';
1712
1713     my $hash = {};
1714     my @array;
1715     foreach my $x ($a, $b) {
1716       foreach my $element (@{$x}) {
1717         if (ref $element eq 'HASH') {
1718           $hash = $self->_merge_attr($hash, $element);
1719         } elsif (ref $element eq 'ARRAY') {
1720           push(@array, @{$element});
1721         } else {
1722           push(@array, $element) unless $b == $x
1723             && grep { $_ eq $element } @array;
1724         }
1725       }
1726     }
1727     
1728     @array = grep { !exists $hash->{$_} } @array;
1729
1730     return keys %{$hash}
1731       ? ( scalar(@array)
1732             ? [$hash, @array]
1733             : $hash
1734         )
1735       : \@array;
1736   }
1737 }
1738
1739 sub result_source {
1740     my $self = shift;
1741
1742     if (@_) {
1743         $self->_source_handle($_[0]->handle);
1744     } else {
1745         $self->_source_handle->resolve;
1746     }
1747 }
1748
1749 =head2 throw_exception
1750
1751 See L<DBIx::Class::Schema/throw_exception> for details.
1752
1753 =cut
1754
1755 sub throw_exception {
1756   my $self=shift;
1757   $self->_source_handle->schema->throw_exception(@_);
1758 }
1759
1760 # XXX: FIXME: Attributes docs need clearing up
1761
1762 =head1 ATTRIBUTES
1763
1764 The resultset takes various attributes that modify its behavior. Here's an
1765 overview of them:
1766
1767 =head2 order_by
1768
1769 =over 4
1770
1771 =item Value: ($order_by | \@order_by)
1772
1773 =back
1774
1775 Which column(s) to order the results by. This is currently passed
1776 through directly to SQL, so you can give e.g. C<year DESC> for a
1777 descending order on the column `year'.
1778
1779 Please note that if you have C<quote_char> enabled (see
1780 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1781 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1782 so you will need to manually quote things as appropriate.)
1783
1784 =head2 columns
1785
1786 =over 4
1787
1788 =item Value: \@columns
1789
1790 =back
1791
1792 Shortcut to request a particular set of columns to be retrieved.  Adds
1793 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1794 from that, then auto-populates C<as> from C<select> as normal. (You may also
1795 use the C<cols> attribute, as in earlier versions of DBIC.)
1796
1797 =head2 include_columns
1798
1799 =over 4
1800
1801 =item Value: \@columns
1802
1803 =back
1804
1805 Shortcut to include additional columns in the returned results - for example
1806
1807   $schema->resultset('CD')->search(undef, {
1808     include_columns => ['artist.name'],
1809     join => ['artist']
1810   });
1811
1812 would return all CDs and include a 'name' column to the information
1813 passed to object inflation
1814
1815 =head2 select
1816
1817 =over 4
1818
1819 =item Value: \@select_columns
1820
1821 =back
1822
1823 Indicates which columns should be selected from the storage. You can use
1824 column names, or in the case of RDBMS back ends, function or stored procedure
1825 names:
1826
1827   $rs = $schema->resultset('Employee')->search(undef, {
1828     select => [
1829       'name',
1830       { count => 'employeeid' },
1831       { sum => 'salary' }
1832     ]
1833   });
1834
1835 When you use function/stored procedure names and do not supply an C<as>
1836 attribute, the column names returned are storage-dependent. E.g. MySQL would
1837 return a column named C<count(employeeid)> in the above example.
1838
1839 =head2 +select
1840
1841 =over 4
1842
1843 Indicates additional columns to be selected from storage.  Works the same as
1844 L<select> but adds columns to the selection.
1845
1846 =back
1847
1848 =head2 +as
1849
1850 =over 4
1851
1852 Indicates additional column names for those added via L<+select>.
1853
1854 =back
1855
1856 =head2 as
1857
1858 =over 4
1859
1860 =item Value: \@inflation_names
1861
1862 =back
1863
1864 Indicates column names for object inflation. This is used in conjunction with
1865 C<select>, usually when C<select> contains one or more function or stored
1866 procedure names:
1867
1868   $rs = $schema->resultset('Employee')->search(undef, {
1869     select => [
1870       'name',
1871       { count => 'employeeid' }
1872     ],
1873     as => ['name', 'employee_count'],
1874   });
1875
1876   my $employee = $rs->first(); # get the first Employee
1877
1878 If the object against which the search is performed already has an accessor
1879 matching a column name specified in C<as>, the value can be retrieved using
1880 the accessor as normal:
1881
1882   my $name = $employee->name();
1883
1884 If on the other hand an accessor does not exist in the object, you need to
1885 use C<get_column> instead:
1886
1887   my $employee_count = $employee->get_column('employee_count');
1888
1889 You can create your own accessors if required - see
1890 L<DBIx::Class::Manual::Cookbook> for details.
1891
1892 Please note: This will NOT insert an C<AS employee_count> into the SQL
1893 statement produced, it is used for internal access only. Thus
1894 attempting to use the accessor in an C<order_by> clause or similar
1895 will fail miserably.
1896
1897 To get around this limitation, you can supply literal SQL to your
1898 C<select> attibute that contains the C<AS alias> text, eg:
1899
1900   select => [\'myfield AS alias']
1901
1902 =head2 join
1903
1904 =over 4
1905
1906 =item Value: ($rel_name | \@rel_names | \%rel_names)
1907
1908 =back
1909
1910 Contains a list of relationships that should be joined for this query.  For
1911 example:
1912
1913   # Get CDs by Nine Inch Nails
1914   my $rs = $schema->resultset('CD')->search(
1915     { 'artist.name' => 'Nine Inch Nails' },
1916     { join => 'artist' }
1917   );
1918
1919 Can also contain a hash reference to refer to the other relation's relations.
1920 For example:
1921
1922   package MyApp::Schema::Track;
1923   use base qw/DBIx::Class/;
1924   __PACKAGE__->table('track');
1925   __PACKAGE__->add_columns(qw/trackid cd position title/);
1926   __PACKAGE__->set_primary_key('trackid');
1927   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1928   1;
1929
1930   # In your application
1931   my $rs = $schema->resultset('Artist')->search(
1932     { 'track.title' => 'Teardrop' },
1933     {
1934       join     => { cd => 'track' },
1935       order_by => 'artist.name',
1936     }
1937   );
1938
1939 You need to use the relationship (not the table) name in  conditions, 
1940 because they are aliased as such. The current table is aliased as "me", so 
1941 you need to use me.column_name in order to avoid ambiguity. For example:
1942
1943   # Get CDs from 1984 with a 'Foo' track 
1944   my $rs = $schema->resultset('CD')->search(
1945     { 
1946       'me.year' => 1984,
1947       'tracks.name' => 'Foo'
1948     },
1949     { join => 'tracks' }
1950   );
1951   
1952 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1953 similarly for a third time). For e.g.
1954
1955   my $rs = $schema->resultset('Artist')->search({
1956     'cds.title'   => 'Down to Earth',
1957     'cds_2.title' => 'Popular',
1958   }, {
1959     join => [ qw/cds cds/ ],
1960   });
1961
1962 will return a set of all artists that have both a cd with title 'Down
1963 to Earth' and a cd with title 'Popular'.
1964
1965 If you want to fetch related objects from other tables as well, see C<prefetch>
1966 below.
1967
1968 =head2 prefetch
1969
1970 =over 4
1971
1972 =item Value: ($rel_name | \@rel_names | \%rel_names)
1973
1974 =back
1975
1976 Contains one or more relationships that should be fetched along with the main
1977 query (when they are accessed afterwards they will have already been
1978 "prefetched").  This is useful for when you know you will need the related
1979 objects, because it saves at least one query:
1980
1981   my $rs = $schema->resultset('Tag')->search(
1982     undef,
1983     {
1984       prefetch => {
1985         cd => 'artist'
1986       }
1987     }
1988   );
1989
1990 The initial search results in SQL like the following:
1991
1992   SELECT tag.*, cd.*, artist.* FROM tag
1993   JOIN cd ON tag.cd = cd.cdid
1994   JOIN artist ON cd.artist = artist.artistid
1995
1996 L<DBIx::Class> has no need to go back to the database when we access the
1997 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1998 case.
1999
2000 Simple prefetches will be joined automatically, so there is no need
2001 for a C<join> attribute in the above search. If you're prefetching to
2002 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2003 specify the join as well.
2004
2005 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2006 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2007 with an accessor type of 'single' or 'filter').
2008
2009 =head2 page
2010
2011 =over 4
2012
2013 =item Value: $page
2014
2015 =back
2016
2017 Makes the resultset paged and specifies the page to retrieve. Effectively
2018 identical to creating a non-pages resultset and then calling ->page($page)
2019 on it.
2020
2021 If L<rows> attribute is not specified it defualts to 10 rows per page.
2022
2023 =head2 rows
2024
2025 =over 4
2026
2027 =item Value: $rows
2028
2029 =back
2030
2031 Specifes the maximum number of rows for direct retrieval or the number of
2032 rows per page if the page attribute or method is used.
2033
2034 =head2 offset
2035
2036 =over 4
2037
2038 =item Value: $offset
2039
2040 =back
2041
2042 Specifies the (zero-based) row number for the  first row to be returned, or the
2043 of the first row of the first page if paging is used.
2044
2045 =head2 group_by
2046
2047 =over 4
2048
2049 =item Value: \@columns
2050
2051 =back
2052
2053 A arrayref of columns to group by. Can include columns of joined tables.
2054
2055   group_by => [qw/ column1 column2 ... /]
2056
2057 =head2 having
2058
2059 =over 4
2060
2061 =item Value: $condition
2062
2063 =back
2064
2065 HAVING is a select statement attribute that is applied between GROUP BY and
2066 ORDER BY. It is applied to the after the grouping calculations have been
2067 done.
2068
2069   having => { 'count(employee)' => { '>=', 100 } }
2070
2071 =head2 distinct
2072
2073 =over 4
2074
2075 =item Value: (0 | 1)
2076
2077 =back
2078
2079 Set to 1 to group by all columns.
2080
2081 =head2 where
2082
2083 =over 4
2084
2085 Adds to the WHERE clause.
2086
2087   # only return rows WHERE deleted IS NULL for all searches
2088   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2089
2090 Can be overridden by passing C<{ where => undef }> as an attribute
2091 to a resulset.
2092
2093 =back
2094
2095 =head2 cache
2096
2097 Set to 1 to cache search results. This prevents extra SQL queries if you
2098 revisit rows in your ResultSet:
2099
2100   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2101
2102   while( my $artist = $resultset->next ) {
2103     ... do stuff ...
2104   }
2105
2106   $rs->first; # without cache, this would issue a query
2107
2108 By default, searches are not cached.
2109
2110 For more examples of using these attributes, see
2111 L<DBIx::Class::Manual::Cookbook>.
2112
2113 =head2 from
2114
2115 =over 4
2116
2117 =item Value: \@from_clause
2118
2119 =back
2120
2121 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2122 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2123 clauses.
2124
2125 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2126
2127 C<join> will usually do what you need and it is strongly recommended that you
2128 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2129 And we really do mean "cannot", not just tried and failed. Attempting to use
2130 this because you're having problems with C<join> is like trying to use x86
2131 ASM because you've got a syntax error in your C. Trust us on this.
2132
2133 Now, if you're still really, really sure you need to use this (and if you're
2134 not 100% sure, ask the mailing list first), here's an explanation of how this
2135 works.
2136
2137 The syntax is as follows -
2138
2139   [
2140     { <alias1> => <table1> },
2141     [
2142       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2143       [], # nested JOIN (optional)
2144       { <table1.column1> => <table2.column2>, ... (more conditions) },
2145     ],
2146     # More of the above [ ] may follow for additional joins
2147   ]
2148
2149   <table1> <alias1>
2150   JOIN
2151     <table2> <alias2>
2152     [JOIN ...]
2153   ON <table1.column1> = <table2.column2>
2154   <more joins may follow>
2155
2156 An easy way to follow the examples below is to remember the following:
2157
2158     Anything inside "[]" is a JOIN
2159     Anything inside "{}" is a condition for the enclosing JOIN
2160
2161 The following examples utilize a "person" table in a family tree application.
2162 In order to express parent->child relationships, this table is self-joined:
2163
2164     # Person->belongs_to('father' => 'Person');
2165     # Person->belongs_to('mother' => 'Person');
2166
2167 C<from> can be used to nest joins. Here we return all children with a father,
2168 then search against all mothers of those children:
2169
2170   $rs = $schema->resultset('Person')->search(
2171       undef,
2172       {
2173           alias => 'mother', # alias columns in accordance with "from"
2174           from => [
2175               { mother => 'person' },
2176               [
2177                   [
2178                       { child => 'person' },
2179                       [
2180                           { father => 'person' },
2181                           { 'father.person_id' => 'child.father_id' }
2182                       ]
2183                   ],
2184                   { 'mother.person_id' => 'child.mother_id' }
2185               ],
2186           ]
2187       },
2188   );
2189
2190   # Equivalent SQL:
2191   # SELECT mother.* FROM person mother
2192   # JOIN (
2193   #   person child
2194   #   JOIN person father
2195   #   ON ( father.person_id = child.father_id )
2196   # )
2197   # ON ( mother.person_id = child.mother_id )
2198
2199 The type of any join can be controlled manually. To search against only people
2200 with a father in the person table, we could explicitly use C<INNER JOIN>:
2201
2202     $rs = $schema->resultset('Person')->search(
2203         undef,
2204         {
2205             alias => 'child', # alias columns in accordance with "from"
2206             from => [
2207                 { child => 'person' },
2208                 [
2209                     { father => 'person', -join_type => 'inner' },
2210                     { 'father.id' => 'child.father_id' }
2211                 ],
2212             ]
2213         },
2214     );
2215
2216     # Equivalent SQL:
2217     # SELECT child.* FROM person child
2218     # INNER JOIN person father ON child.father_id = father.id
2219
2220 =cut
2221
2222 1;