better fix for pk mutation based on mst irc conversation
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use base qw/DBIx::Class/;
14
15 __PACKAGE__->load_components(qw/AccessorGroup/);
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   #weaken $source;
89   $attrs = { %{$attrs||{}} };
90
91   if ($attrs->{page}) {
92     $attrs->{rows} ||= 10;
93     $attrs->{offset} ||= 0;
94     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
95   }
96
97   $attrs->{alias} ||= 'me';
98
99   my $self = {
100     result_source => $source,
101     result_class => $attrs->{result_class} || $source->result_class,
102     cond => $attrs->{where},
103     count => undef,
104     pager => undef,
105     attrs => $attrs
106   };
107
108   bless $self, $class;
109
110   return $self;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
138
139 =cut
140
141 sub search {
142   my $self = shift;
143   my $rs = $self->search_rs( @_ );
144   return (wantarray ? $rs->all : $rs);
145 }
146
147 =head2 search_rs
148
149 =over 4
150
151 =item Arguments: $cond, \%attrs?
152
153 =item Return Value: $resultset
154
155 =back
156
157 This method does the same exact thing as search() except it will
158 always return a resultset, even in list context.
159
160 =cut
161
162 sub search_rs {
163   my $self = shift;
164
165   my $rows;
166
167   unless (@_) {                 # no search, effectively just a clone
168     $rows = $self->get_cache;
169   }
170
171   my $attrs = {};
172   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
173   my $our_attrs = { %{$self->{attrs}} };
174   my $having = delete $our_attrs->{having};
175   my $where = delete $our_attrs->{where};
176
177   my $new_attrs = { %{$our_attrs}, %{$attrs} };
178
179   # merge new attrs into inherited
180   foreach my $key (qw/join prefetch/) {
181     next unless exists $attrs->{$key};
182     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
183   }
184
185   my $cond = (@_
186     ? (
187         (@_ == 1 || ref $_[0] eq "HASH")
188           ? (
189               (ref $_[0] eq 'HASH')
190                 ? (
191                     (keys %{ $_[0] }  > 0)
192                       ? shift
193                       : undef
194                    )
195                 :  shift
196              )
197           : (
198               (@_ % 2)
199                 ? $self->throw_exception("Odd number of arguments to search")
200                 : {@_}
201              )
202       )
203     : undef
204   );
205
206   if (defined $where) {
207     $new_attrs->{where} = (
208       defined $new_attrs->{where}
209         ? { '-and' => [
210               map {
211                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
212               } $where, $new_attrs->{where}
213             ]
214           }
215         : $where);
216   }
217
218   if (defined $cond) {
219     $new_attrs->{where} = (
220       defined $new_attrs->{where}
221         ? { '-and' => [
222               map {
223                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
224               } $cond, $new_attrs->{where}
225             ]
226           }
227         : $cond);
228   }
229
230   if (defined $having) {
231     $new_attrs->{having} = (
232       defined $new_attrs->{having}
233         ? { '-and' => [
234               map {
235                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
236               } $having, $new_attrs->{having}
237             ]
238           }
239         : $having);
240   }
241
242   my $rs = (ref $self)->new($self->result_source, $new_attrs);
243   if ($rows) {
244     $rs->set_cache($rows);
245   }
246   return $rs;
247 }
248
249 =head2 search_literal
250
251 =over 4
252
253 =item Arguments: $sql_fragment, @bind_values
254
255 =item Return Value: $resultset (scalar context), @row_objs (list context)
256
257 =back
258
259   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
260   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
261
262 Pass a literal chunk of SQL to be added to the conditional part of the
263 resultset query.
264
265 =cut
266
267 sub search_literal {
268   my ($self, $cond, @vals) = @_;
269   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
270   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
271   return $self->search(\$cond, $attrs);
272 }
273
274 =head2 find
275
276 =over 4
277
278 =item Arguments: @values | \%cols, \%attrs?
279
280 =item Return Value: $row_object
281
282 =back
283
284 Finds a row based on its primary key or unique constraint. For example, to find
285 a row by its primary key:
286
287   my $cd = $schema->resultset('CD')->find(5);
288
289 You can also find a row by a specific unique constraint using the C<key>
290 attribute. For example:
291
292   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
293     key => 'cd_artist_title'
294   });
295
296 Additionally, you can specify the columns explicitly by name:
297
298   my $cd = $schema->resultset('CD')->find(
299     {
300       artist => 'Massive Attack',
301       title  => 'Mezzanine',
302     },
303     { key => 'cd_artist_title' }
304   );
305
306 If the C<key> is specified as C<primary>, it searches only on the primary key.
307
308 If no C<key> is specified, it searches on all unique constraints defined on the
309 source, including the primary key.
310
311 If your table does not have a primary key, you B<must> provide a value for the
312 C<key> attribute matching one of the unique constraints on the source.
313
314 See also L</find_or_create> and L</update_or_create>. For information on how to
315 declare unique constraints, see
316 L<DBIx::Class::ResultSource/add_unique_constraint>.
317
318 =cut
319
320 sub find {
321   my $self = shift;
322   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
323
324   # Default to the primary key, but allow a specific key
325   my @cols = exists $attrs->{key}
326     ? $self->result_source->unique_constraint_columns($attrs->{key})
327     : $self->result_source->primary_columns;
328   $self->throw_exception(
329     "Can't find unless a primary key is defined or unique constraint is specified"
330   ) unless @cols;
331
332   # Parse out a hashref from input
333   my $input_query;
334   if (ref $_[0] eq 'HASH') {
335     $input_query = { %{$_[0]} };
336   }
337   elsif (@_ == @cols) {
338     $input_query = {};
339     @{$input_query}{@cols} = @_;
340   }
341   else {
342     # Compatibility: Allow e.g. find(id => $value)
343     carp "Find by key => value deprecated; please use a hashref instead";
344     $input_query = {@_};
345   }
346
347   my (%related, $info);
348
349   foreach my $key (keys %$input_query) {
350     if (ref($input_query->{$key})
351         && ($info = $self->result_source->relationship_info($key))) {
352       my $rel_q = $self->result_source->resolve_condition(
353                     $info->{cond}, delete $input_query->{$key}, $key
354                   );
355       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
356       @related{keys %$rel_q} = values %$rel_q;
357     }
358   }
359   if (my @keys = keys %related) {
360     @{$input_query}{@keys} = values %related;
361   }
362
363   my @unique_queries = $self->_unique_queries($input_query, $attrs);
364
365   # Build the final query: Default to the disjunction of the unique queries,
366   # but allow the input query in case the ResultSet defines the query or the
367   # user is abusing find
368   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
369   my $query = @unique_queries
370     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
371     : $self->_add_alias($input_query, $alias);
372
373   # Run the query
374   if (keys %$attrs) {
375     my $rs = $self->search($query, $attrs);
376     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
377   }
378   else {
379     return keys %{$self->_resolved_attrs->{collapse}}
380       ? $self->search($query)->next
381       : $self->single($query);
382   }
383 }
384
385 # _add_alias
386 #
387 # Add the specified alias to the specified query hash. A copy is made so the
388 # original query is not modified.
389
390 sub _add_alias {
391   my ($self, $query, $alias) = @_;
392
393   my %aliased = %$query;
394   foreach my $col (grep { ! m/\./ } keys %aliased) {
395     $aliased{"$alias.$col"} = delete $aliased{$col};
396   }
397
398   return \%aliased;
399 }
400
401 # _unique_queries
402 #
403 # Build a list of queries which satisfy unique constraints.
404
405 sub _unique_queries {
406   my ($self, $query, $attrs) = @_;
407
408   my @constraint_names = exists $attrs->{key}
409     ? ($attrs->{key})
410     : $self->result_source->unique_constraint_names;
411
412   my @unique_queries;
413   foreach my $name (@constraint_names) {
414     my @unique_cols = $self->result_source->unique_constraint_columns($name);
415     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
416
417     my $num_query = scalar keys %$unique_query;
418     next unless $num_query;
419
420     # XXX: Assuming quite a bit about $self->{attrs}{where}
421     my $num_cols = scalar @unique_cols;
422     my $num_where = exists $self->{attrs}{where}
423       ? scalar keys %{ $self->{attrs}{where} }
424       : 0;
425     push @unique_queries, $unique_query
426       if $num_query + $num_where == $num_cols;
427   }
428
429   return @unique_queries;
430 }
431
432 # _build_unique_query
433 #
434 # Constrain the specified query hash based on the specified column names.
435
436 sub _build_unique_query {
437   my ($self, $query, $unique_cols) = @_;
438
439   return {
440     map  { $_ => $query->{$_} }
441     grep { exists $query->{$_} }
442       @$unique_cols
443   };
444 }
445
446 =head2 search_related
447
448 =over 4
449
450 =item Arguments: $rel, $cond, \%attrs?
451
452 =item Return Value: $new_resultset
453
454 =back
455
456   $new_rs = $cd_rs->search_related('artist', {
457     name => 'Emo-R-Us',
458   });
459
460 Searches the specified relationship, optionally specifying a condition and
461 attributes for matching records. See L</ATTRIBUTES> for more information.
462
463 =cut
464
465 sub search_related {
466   return shift->related_resultset(shift)->search(@_);
467 }
468
469 =head2 cursor
470
471 =over 4
472
473 =item Arguments: none
474
475 =item Return Value: $cursor
476
477 =back
478
479 Returns a storage-driven cursor to the given resultset. See
480 L<DBIx::Class::Cursor> for more information.
481
482 =cut
483
484 sub cursor {
485   my ($self) = @_;
486
487   my $attrs = { %{$self->_resolved_attrs} };
488   return $self->{cursor}
489     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
490           $attrs->{where},$attrs);
491 }
492
493 =head2 single
494
495 =over 4
496
497 =item Arguments: $cond?
498
499 =item Return Value: $row_object?
500
501 =back
502
503   my $cd = $schema->resultset('CD')->single({ year => 2001 });
504
505 Inflates the first result without creating a cursor if the resultset has
506 any records in it; if not returns nothing. Used by L</find> as an optimisation.
507
508 Can optionally take an additional condition *only* - this is a fast-code-path
509 method; if you need to add extra joins or similar call ->search and then
510 ->single without a condition on the $rs returned from that.
511
512 =cut
513
514 sub single {
515   my ($self, $where) = @_;
516   my $attrs = { %{$self->_resolved_attrs} };
517   if ($where) {
518     if (defined $attrs->{where}) {
519       $attrs->{where} = {
520         '-and' =>
521             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
522                $where, delete $attrs->{where} ]
523       };
524     } else {
525       $attrs->{where} = $where;
526     }
527   }
528
529 #  XXX: Disabled since it doesn't infer uniqueness in all cases
530 #  unless ($self->_is_unique_query($attrs->{where})) {
531 #    carp "Query not guaranteed to return a single row"
532 #      . "; please declare your unique constraints or use search instead";
533 #  }
534
535   my @data = $self->result_source->storage->select_single(
536     $attrs->{from}, $attrs->{select},
537     $attrs->{where}, $attrs
538   );
539
540   return (@data ? ($self->_construct_object(@data))[0] : ());
541 }
542
543 # _is_unique_query
544 #
545 # Try to determine if the specified query is guaranteed to be unique, based on
546 # the declared unique constraints.
547
548 sub _is_unique_query {
549   my ($self, $query) = @_;
550
551   my $collapsed = $self->_collapse_query($query);
552   my $alias = $self->{attrs}{alias};
553
554   foreach my $name ($self->result_source->unique_constraint_names) {
555     my @unique_cols = map {
556       "$alias.$_"
557     } $self->result_source->unique_constraint_columns($name);
558
559     # Count the values for each unique column
560     my %seen = map { $_ => 0 } @unique_cols;
561
562     foreach my $key (keys %$collapsed) {
563       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
564       next unless exists $seen{$aliased};  # Additional constraints are okay
565       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
566     }
567
568     # If we get 0 or more than 1 value for a column, it's not necessarily unique
569     return 1 unless grep { $_ != 1 } values %seen;
570   }
571
572   return 0;
573 }
574
575 # _collapse_query
576 #
577 # Recursively collapse the query, accumulating values for each column.
578
579 sub _collapse_query {
580   my ($self, $query, $collapsed) = @_;
581
582   $collapsed ||= {};
583
584   if (ref $query eq 'ARRAY') {
585     foreach my $subquery (@$query) {
586       next unless ref $subquery;  # -or
587 #      warn "ARRAY: " . Dumper $subquery;
588       $collapsed = $self->_collapse_query($subquery, $collapsed);
589     }
590   }
591   elsif (ref $query eq 'HASH') {
592     if (keys %$query and (keys %$query)[0] eq '-and') {
593       foreach my $subquery (@{$query->{-and}}) {
594 #        warn "HASH: " . Dumper $subquery;
595         $collapsed = $self->_collapse_query($subquery, $collapsed);
596       }
597     }
598     else {
599 #      warn "LEAF: " . Dumper $query;
600       foreach my $col (keys %$query) {
601         my $value = $query->{$col};
602         $collapsed->{$col}{$value}++;
603       }
604     }
605   }
606
607   return $collapsed;
608 }
609
610 =head2 get_column
611
612 =over 4
613
614 =item Arguments: $cond?
615
616 =item Return Value: $resultsetcolumn
617
618 =back
619
620   my $max_length = $rs->get_column('length')->max;
621
622 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
623
624 =cut
625
626 sub get_column {
627   my ($self, $column) = @_;
628   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
629   return $new;
630 }
631
632 =head2 search_like
633
634 =over 4
635
636 =item Arguments: $cond, \%attrs?
637
638 =item Return Value: $resultset (scalar context), @row_objs (list context)
639
640 =back
641
642   # WHERE title LIKE '%blue%'
643   $cd_rs = $rs->search_like({ title => '%blue%'});
644
645 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
646 that this is simply a convenience method. You most likely want to use
647 L</search> with specific operators.
648
649 For more information, see L<DBIx::Class::Manual::Cookbook>.
650
651 =cut
652
653 sub search_like {
654   my $class = shift;
655   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
656   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
657   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
658   return $class->search($query, { %$attrs });
659 }
660
661 =head2 slice
662
663 =over 4
664
665 =item Arguments: $first, $last
666
667 =item Return Value: $resultset (scalar context), @row_objs (list context)
668
669 =back
670
671 Returns a resultset or object list representing a subset of elements from the
672 resultset slice is called on. Indexes are from 0, i.e., to get the first
673 three records, call:
674
675   my ($one, $two, $three) = $rs->slice(0, 2);
676
677 =cut
678
679 sub slice {
680   my ($self, $min, $max) = @_;
681   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
682   $attrs->{offset} = $self->{attrs}{offset} || 0;
683   $attrs->{offset} += $min;
684   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
685   return $self->search(undef(), $attrs);
686   #my $slice = (ref $self)->new($self->result_source, $attrs);
687   #return (wantarray ? $slice->all : $slice);
688 }
689
690 =head2 next
691
692 =over 4
693
694 =item Arguments: none
695
696 =item Return Value: $result?
697
698 =back
699
700 Returns the next element in the resultset (C<undef> is there is none).
701
702 Can be used to efficiently iterate over records in the resultset:
703
704   my $rs = $schema->resultset('CD')->search;
705   while (my $cd = $rs->next) {
706     print $cd->title;
707   }
708
709 Note that you need to store the resultset object, and call C<next> on it.
710 Calling C<< resultset('Table')->next >> repeatedly will always return the
711 first record from the resultset.
712
713 =cut
714
715 sub next {
716   my ($self) = @_;
717   if (my $cache = $self->get_cache) {
718     $self->{all_cache_position} ||= 0;
719     return $cache->[$self->{all_cache_position}++];
720   }
721   if ($self->{attrs}{cache}) {
722     $self->{all_cache_position} = 1;
723     return ($self->all)[0];
724   }
725   if ($self->{stashed_objects}) {
726     my $obj = shift(@{$self->{stashed_objects}});
727     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
728     return $obj;
729   }
730   my @row = (
731     exists $self->{stashed_row}
732       ? @{delete $self->{stashed_row}}
733       : $self->cursor->next
734   );
735   return unless (@row);
736   my ($row, @more) = $self->_construct_object(@row);
737   $self->{stashed_objects} = \@more if @more;
738   return $row;
739 }
740
741 sub _construct_object {
742   my ($self, @row) = @_;
743   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
744   my @new = $self->result_class->inflate_result($self->result_source, @$info);
745   @new = $self->{_attrs}{record_filter}->(@new)
746     if exists $self->{_attrs}{record_filter};
747   return @new;
748 }
749
750 sub _collapse_result {
751   my ($self, $as, $row, $prefix) = @_;
752
753   my %const;
754   my @copy = @$row;
755   
756   foreach my $this_as (@$as) {
757     my $val = shift @copy;
758     if (defined $prefix) {
759       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
760         my $remain = $1;
761         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
762         $const{$1||''}{$2} = $val;
763       }
764     } else {
765       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
766       $const{$1||''}{$2} = $val;
767     }
768   }
769
770   my $alias = $self->{attrs}{alias};
771   my $info = [ {}, {} ];
772   foreach my $key (keys %const) {
773     if (length $key && $key ne $alias) {
774       my $target = $info;
775       my @parts = split(/\./, $key);
776       foreach my $p (@parts) {
777         $target = $target->[1]->{$p} ||= [];
778       }
779       $target->[0] = $const{$key};
780     } else {
781       $info->[0] = $const{$key};
782     }
783   }
784   
785   my @collapse;
786   if (defined $prefix) {
787     @collapse = map {
788         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
789     } keys %{$self->{_attrs}{collapse}}
790   } else {
791     @collapse = keys %{$self->{_attrs}{collapse}};
792   };
793
794   if (@collapse) {
795     my ($c) = sort { length $a <=> length $b } @collapse;
796     my $target = $info;
797     foreach my $p (split(/\./, $c)) {
798       $target = $target->[1]->{$p} ||= [];
799     }
800     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
801     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
802     my $tree = $self->_collapse_result($as, $row, $c_prefix);
803     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
804     my (@final, @raw);
805
806     while (
807       !(
808         grep {
809           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
810         } @co_key
811         )
812     ) {
813       push(@final, $tree);
814       last unless (@raw = $self->cursor->next);
815       $row = $self->{stashed_row} = \@raw;
816       $tree = $self->_collapse_result($as, $row, $c_prefix);
817     }
818     @$target = (@final ? @final : [ {}, {} ]);
819       # single empty result to indicate an empty prefetched has_many
820   }
821
822   #print "final info: " . Dumper($info);
823   return $info;
824 }
825
826 =head2 result_source
827
828 =over 4
829
830 =item Arguments: $result_source?
831
832 =item Return Value: $result_source
833
834 =back
835
836 An accessor for the primary ResultSource object from which this ResultSet
837 is derived.
838
839 =head2 result_class
840
841 =over 4
842
843 =item Arguments: $result_class?
844
845 =item Return Value: $result_class
846
847 =back
848
849 An accessor for the class to use when creating row objects. Defaults to 
850 C<< result_source->result_class >> - which in most cases is the name of the 
851 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
852
853 =cut
854
855
856 =head2 count
857
858 =over 4
859
860 =item Arguments: $cond, \%attrs??
861
862 =item Return Value: $count
863
864 =back
865
866 Performs an SQL C<COUNT> with the same query as the resultset was built
867 with to find the number of elements. If passed arguments, does a search
868 on the resultset and counts the results of that.
869
870 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
871 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
872 not support C<DISTINCT> with multiple columns. If you are using such a
873 database, you should only use columns from the main table in your C<group_by>
874 clause.
875
876 =cut
877
878 sub count {
879   my $self = shift;
880   return $self->search(@_)->count if @_ and defined $_[0];
881   return scalar @{ $self->get_cache } if $self->get_cache;
882   my $count = $self->_count;
883   return 0 unless $count;
884
885   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
886   $count = $self->{attrs}{rows} if
887     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
888   return $count;
889 }
890
891 sub _count { # Separated out so pager can get the full count
892   my $self = shift;
893   my $select = { count => '*' };
894
895   my $attrs = { %{$self->_resolved_attrs} };
896   if (my $group_by = delete $attrs->{group_by}) {
897     delete $attrs->{having};
898     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
899     # todo: try CONCAT for multi-column pk
900     my @pk = $self->result_source->primary_columns;
901     if (@pk == 1) {
902       my $alias = $attrs->{alias};
903       foreach my $column (@distinct) {
904         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
905           @distinct = ($column);
906           last;
907         }
908       }
909     }
910
911     $select = { count => { distinct => \@distinct } };
912   }
913
914   $attrs->{select} = $select;
915   $attrs->{as} = [qw/count/];
916
917   # offset, order by and page are not needed to count. record_filter is cdbi
918   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
919
920   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
921   my ($count) = $tmp_rs->cursor->next;
922   return $count;
923 }
924
925 =head2 count_literal
926
927 =over 4
928
929 =item Arguments: $sql_fragment, @bind_values
930
931 =item Return Value: $count
932
933 =back
934
935 Counts the results in a literal query. Equivalent to calling L</search_literal>
936 with the passed arguments, then L</count>.
937
938 =cut
939
940 sub count_literal { shift->search_literal(@_)->count; }
941
942 =head2 all
943
944 =over 4
945
946 =item Arguments: none
947
948 =item Return Value: @objects
949
950 =back
951
952 Returns all elements in the resultset. Called implicitly if the resultset
953 is returned in list context.
954
955 =cut
956
957 sub all {
958   my ($self) = @_;
959   return @{ $self->get_cache } if $self->get_cache;
960
961   my @obj;
962
963   # TODO: don't call resolve here
964   if (keys %{$self->_resolved_attrs->{collapse}}) {
965 #  if ($self->{attrs}{prefetch}) {
966       # Using $self->cursor->all is really just an optimisation.
967       # If we're collapsing has_many prefetches it probably makes
968       # very little difference, and this is cleaner than hacking
969       # _construct_object to survive the approach
970     my @row = $self->cursor->next;
971     while (@row) {
972       push(@obj, $self->_construct_object(@row));
973       @row = (exists $self->{stashed_row}
974                ? @{delete $self->{stashed_row}}
975                : $self->cursor->next);
976     }
977   } else {
978     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
979   }
980
981   $self->set_cache(\@obj) if $self->{attrs}{cache};
982   return @obj;
983 }
984
985 =head2 reset
986
987 =over 4
988
989 =item Arguments: none
990
991 =item Return Value: $self
992
993 =back
994
995 Resets the resultset's cursor, so you can iterate through the elements again.
996
997 =cut
998
999 sub reset {
1000   my ($self) = @_;
1001   delete $self->{_attrs} if exists $self->{_attrs};
1002   $self->{all_cache_position} = 0;
1003   $self->cursor->reset;
1004   return $self;
1005 }
1006
1007 =head2 first
1008
1009 =over 4
1010
1011 =item Arguments: none
1012
1013 =item Return Value: $object?
1014
1015 =back
1016
1017 Resets the resultset and returns an object for the first result (if the
1018 resultset returns anything).
1019
1020 =cut
1021
1022 sub first {
1023   return $_[0]->reset->next;
1024 }
1025
1026 # _cond_for_update_delete
1027 #
1028 # update/delete require the condition to be modified to handle
1029 # the differing SQL syntax available.  This transforms the $self->{cond}
1030 # appropriately, returning the new condition.
1031
1032 sub _cond_for_update_delete {
1033   my ($self, $full_cond) = @_;
1034   my $cond = {};
1035
1036   $full_cond ||= $self->{cond};
1037   # No-op. No condition, we're updating/deleting everything
1038   return $cond unless ref $full_cond;
1039
1040   if (ref $full_cond eq 'ARRAY') {
1041     $cond = [
1042       map {
1043         my %hash;
1044         foreach my $key (keys %{$_}) {
1045           $key =~ /([^.]+)$/;
1046           $hash{$1} = $_->{$key};
1047         }
1048         \%hash;
1049       } @{$full_cond}
1050     ];
1051   }
1052   elsif (ref $full_cond eq 'HASH') {
1053     if ((keys %{$full_cond})[0] eq '-and') {
1054       $cond->{-and} = [];
1055
1056       my @cond = @{$full_cond->{-and}};
1057       for (my $i = 0; $i < @cond; $i++) {
1058         my $entry = $cond[$i];
1059
1060         my $hash;
1061         if (ref $entry eq 'HASH') {
1062           $hash = $self->_cond_for_update_delete($entry);
1063         }
1064         else {
1065           $entry =~ /([^.]+)$/;
1066           $hash->{$1} = $cond[++$i];
1067         }
1068
1069         push @{$cond->{-and}}, $hash;
1070       }
1071     }
1072     else {
1073       foreach my $key (keys %{$full_cond}) {
1074         $key =~ /([^.]+)$/;
1075         $cond->{$1} = $full_cond->{$key};
1076       }
1077     }
1078   }
1079   else {
1080     $self->throw_exception(
1081       "Can't update/delete on resultset with condition unless hash or array"
1082     );
1083   }
1084
1085   return $cond;
1086 }
1087
1088
1089 =head2 update
1090
1091 =over 4
1092
1093 =item Arguments: \%values
1094
1095 =item Return Value: $storage_rv
1096
1097 =back
1098
1099 Sets the specified columns in the resultset to the supplied values in a
1100 single query. Return value will be true if the update succeeded or false
1101 if no records were updated; exact type of success value is storage-dependent.
1102
1103 =cut
1104
1105 sub update {
1106   my ($self, $values) = @_;
1107   $self->throw_exception("Values for update must be a hash")
1108     unless ref $values eq 'HASH';
1109
1110   my $cond = $self->_cond_for_update_delete;
1111
1112   return $self->result_source->storage->update(
1113     $self->result_source->from, $values, $cond
1114   );
1115 }
1116
1117 =head2 update_all
1118
1119 =over 4
1120
1121 =item Arguments: \%values
1122
1123 =item Return Value: 1
1124
1125 =back
1126
1127 Fetches all objects and updates them one at a time. Note that C<update_all>
1128 will run DBIC cascade triggers, while L</update> will not.
1129
1130 =cut
1131
1132 sub update_all {
1133   my ($self, $values) = @_;
1134   $self->throw_exception("Values for update must be a hash")
1135     unless ref $values eq 'HASH';
1136   foreach my $obj ($self->all) {
1137     $obj->set_columns($values)->update;
1138   }
1139   return 1;
1140 }
1141
1142 =head2 delete
1143
1144 =over 4
1145
1146 =item Arguments: none
1147
1148 =item Return Value: 1
1149
1150 =back
1151
1152 Deletes the contents of the resultset from its result source. Note that this
1153 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1154 to run. See also L<DBIx::Class::Row/delete>.
1155
1156 =cut
1157
1158 sub delete {
1159   my ($self) = @_;
1160
1161   my $cond = $self->_cond_for_update_delete;
1162
1163   $self->result_source->storage->delete($self->result_source->from, $cond);
1164   return 1;
1165 }
1166
1167 =head2 delete_all
1168
1169 =over 4
1170
1171 =item Arguments: none
1172
1173 =item Return Value: 1
1174
1175 =back
1176
1177 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1178 will run DBIC cascade triggers, while L</delete> will not.
1179
1180 =cut
1181
1182 sub delete_all {
1183   my ($self) = @_;
1184   $_->delete for $self->all;
1185   return 1;
1186 }
1187
1188 =head2 pager
1189
1190 =over 4
1191
1192 =item Arguments: none
1193
1194 =item Return Value: $pager
1195
1196 =back
1197
1198 Return Value a L<Data::Page> object for the current resultset. Only makes
1199 sense for queries with a C<page> attribute.
1200
1201 =cut
1202
1203 sub pager {
1204   my ($self) = @_;
1205   my $attrs = $self->{attrs};
1206   $self->throw_exception("Can't create pager for non-paged rs")
1207     unless $self->{attrs}{page};
1208   $attrs->{rows} ||= 10;
1209   return $self->{pager} ||= Data::Page->new(
1210     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1211 }
1212
1213 =head2 page
1214
1215 =over 4
1216
1217 =item Arguments: $page_number
1218
1219 =item Return Value: $rs
1220
1221 =back
1222
1223 Returns a resultset for the $page_number page of the resultset on which page
1224 is called, where each page contains a number of rows equal to the 'rows'
1225 attribute set on the resultset (10 by default).
1226
1227 =cut
1228
1229 sub page {
1230   my ($self, $page) = @_;
1231   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1232 }
1233
1234 =head2 new_result
1235
1236 =over 4
1237
1238 =item Arguments: \%vals
1239
1240 =item Return Value: $object
1241
1242 =back
1243
1244 Creates an object in the resultset's result class and returns it.
1245
1246 =cut
1247
1248 sub new_result {
1249   my ($self, $values) = @_;
1250   $self->throw_exception( "new_result needs a hash" )
1251     unless (ref $values eq 'HASH');
1252   $self->throw_exception(
1253     "Can't abstract implicit construct, condition not a hash"
1254   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1255
1256   my $alias = $self->{attrs}{alias};
1257   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1258   my %new = (
1259     %{ $self->_remove_alias($values, $alias) },
1260     %{ $self->_remove_alias($collapsed_cond, $alias) },
1261     -result_source => $self->result_source,
1262   );
1263
1264   my $obj = $self->result_class->new(\%new);
1265   return $obj;
1266 }
1267
1268 # _collapse_cond
1269 #
1270 # Recursively collapse the condition.
1271
1272 sub _collapse_cond {
1273   my ($self, $cond, $collapsed) = @_;
1274
1275   $collapsed ||= {};
1276
1277   if (ref $cond eq 'ARRAY') {
1278     foreach my $subcond (@$cond) {
1279       next unless ref $subcond;  # -or
1280 #      warn "ARRAY: " . Dumper $subcond;
1281       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1282     }
1283   }
1284   elsif (ref $cond eq 'HASH') {
1285     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1286       foreach my $subcond (@{$cond->{-and}}) {
1287 #        warn "HASH: " . Dumper $subcond;
1288         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1289       }
1290     }
1291     else {
1292 #      warn "LEAF: " . Dumper $cond;
1293       foreach my $col (keys %$cond) {
1294         my $value = $cond->{$col};
1295         $collapsed->{$col} = $value;
1296       }
1297     }
1298   }
1299
1300   return $collapsed;
1301 }
1302
1303 # _remove_alias
1304 #
1305 # Remove the specified alias from the specified query hash. A copy is made so
1306 # the original query is not modified.
1307
1308 sub _remove_alias {
1309   my ($self, $query, $alias) = @_;
1310
1311   my %orig = %{ $query || {} };
1312   my %unaliased;
1313
1314   foreach my $key (keys %orig) {
1315     if ($key !~ /\./) {
1316       $unaliased{$key} = $orig{$key};
1317       next;
1318     }
1319     $unaliased{$1} = $orig{$key}
1320       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1321   }
1322
1323   return \%unaliased;
1324 }
1325
1326 =head2 find_or_new
1327
1328 =over 4
1329
1330 =item Arguments: \%vals, \%attrs?
1331
1332 =item Return Value: $object
1333
1334 =back
1335
1336 Find an existing record from this resultset. If none exists, instantiate a new
1337 result object and return it. The object will not be saved into your storage
1338 until you call L<DBIx::Class::Row/insert> on it.
1339
1340 If you want objects to be saved immediately, use L</find_or_create> instead.
1341
1342 =cut
1343
1344 sub find_or_new {
1345   my $self     = shift;
1346   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1347   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1348   my $exists   = $self->find($hash, $attrs);
1349   return defined $exists ? $exists : $self->new_result($hash);
1350 }
1351
1352 =head2 create
1353
1354 =over 4
1355
1356 =item Arguments: \%vals
1357
1358 =item Return Value: $object
1359
1360 =back
1361
1362 Inserts a record into the resultset and returns the object representing it.
1363
1364 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1365
1366 =cut
1367
1368 sub create {
1369   my ($self, $attrs) = @_;
1370   $self->throw_exception( "create needs a hashref" )
1371     unless ref $attrs eq 'HASH';
1372   return $self->new_result($attrs)->insert;
1373 }
1374
1375 =head2 find_or_create
1376
1377 =over 4
1378
1379 =item Arguments: \%vals, \%attrs?
1380
1381 =item Return Value: $object
1382
1383 =back
1384
1385   $class->find_or_create({ key => $val, ... });
1386
1387 Tries to find a record based on its primary key or unique constraint; if none
1388 is found, creates one and returns that instead.
1389
1390   my $cd = $schema->resultset('CD')->find_or_create({
1391     cdid   => 5,
1392     artist => 'Massive Attack',
1393     title  => 'Mezzanine',
1394     year   => 2005,
1395   });
1396
1397 Also takes an optional C<key> attribute, to search by a specific key or unique
1398 constraint. For example:
1399
1400   my $cd = $schema->resultset('CD')->find_or_create(
1401     {
1402       artist => 'Massive Attack',
1403       title  => 'Mezzanine',
1404     },
1405     { key => 'cd_artist_title' }
1406   );
1407
1408 See also L</find> and L</update_or_create>. For information on how to declare
1409 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1410
1411 =cut
1412
1413 sub find_or_create {
1414   my $self     = shift;
1415   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1416   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1417   my $exists   = $self->find($hash, $attrs);
1418   return defined $exists ? $exists : $self->create($hash);
1419 }
1420
1421 =head2 update_or_create
1422
1423 =over 4
1424
1425 =item Arguments: \%col_values, { key => $unique_constraint }?
1426
1427 =item Return Value: $object
1428
1429 =back
1430
1431   $class->update_or_create({ col => $val, ... });
1432
1433 First, searches for an existing row matching one of the unique constraints
1434 (including the primary key) on the source of this resultset. If a row is
1435 found, updates it with the other given column values. Otherwise, creates a new
1436 row.
1437
1438 Takes an optional C<key> attribute to search on a specific unique constraint.
1439 For example:
1440
1441   # In your application
1442   my $cd = $schema->resultset('CD')->update_or_create(
1443     {
1444       artist => 'Massive Attack',
1445       title  => 'Mezzanine',
1446       year   => 1998,
1447     },
1448     { key => 'cd_artist_title' }
1449   );
1450
1451 If no C<key> is specified, it searches on all unique constraints defined on the
1452 source, including the primary key.
1453
1454 If the C<key> is specified as C<primary>, it searches only on the primary key.
1455
1456 See also L</find> and L</find_or_create>. For information on how to declare
1457 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1458
1459 =cut
1460
1461 sub update_or_create {
1462   my $self = shift;
1463   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1464   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1465
1466   my $row = $self->find($cond, $attrs);
1467   if (defined $row) {
1468     $row->update($cond);
1469     return $row;
1470   }
1471
1472   return $self->create($cond);
1473 }
1474
1475 =head2 get_cache
1476
1477 =over 4
1478
1479 =item Arguments: none
1480
1481 =item Return Value: \@cache_objects?
1482
1483 =back
1484
1485 Gets the contents of the cache for the resultset, if the cache is set.
1486
1487 =cut
1488
1489 sub get_cache {
1490   shift->{all_cache};
1491 }
1492
1493 =head2 set_cache
1494
1495 =over 4
1496
1497 =item Arguments: \@cache_objects
1498
1499 =item Return Value: \@cache_objects
1500
1501 =back
1502
1503 Sets the contents of the cache for the resultset. Expects an arrayref
1504 of objects of the same class as those produced by the resultset. Note that
1505 if the cache is set the resultset will return the cached objects rather
1506 than re-querying the database even if the cache attr is not set.
1507
1508 =cut
1509
1510 sub set_cache {
1511   my ( $self, $data ) = @_;
1512   $self->throw_exception("set_cache requires an arrayref")
1513       if defined($data) && (ref $data ne 'ARRAY');
1514   $self->{all_cache} = $data;
1515 }
1516
1517 =head2 clear_cache
1518
1519 =over 4
1520
1521 =item Arguments: none
1522
1523 =item Return Value: []
1524
1525 =back
1526
1527 Clears the cache for the resultset.
1528
1529 =cut
1530
1531 sub clear_cache {
1532   shift->set_cache(undef);
1533 }
1534
1535 =head2 related_resultset
1536
1537 =over 4
1538
1539 =item Arguments: $relationship_name
1540
1541 =item Return Value: $resultset
1542
1543 =back
1544
1545 Returns a related resultset for the supplied relationship name.
1546
1547   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1548
1549 =cut
1550
1551 sub related_resultset {
1552   my ($self, $rel) = @_;
1553
1554   $self->{related_resultsets} ||= {};
1555   return $self->{related_resultsets}{$rel} ||= do {
1556     my $rel_obj = $self->result_source->relationship_info($rel);
1557
1558     $self->throw_exception(
1559       "search_related: result source '" . $self->result_source->name .
1560         "' has no such relationship $rel")
1561       unless $rel_obj;
1562     
1563     my ($from,$seen) = $self->_resolve_from($rel);
1564
1565     my $join_count = $seen->{$rel};
1566     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1567
1568     $self->result_source->schema->resultset($rel_obj->{class})->search_rs(
1569       undef, {
1570         %{$self->{attrs}||{}},
1571         join => undef,
1572         prefetch => undef,
1573         select => undef,
1574         as => undef,
1575         alias => $alias,
1576         where => $self->{cond},
1577         seen_join => $seen,
1578         from => $from,
1579     });
1580   };
1581 }
1582
1583 sub _resolve_from {
1584   my ($self, $extra_join) = @_;
1585   my $source = $self->result_source;
1586   my $attrs = $self->{attrs};
1587   
1588   my $from = $attrs->{from}
1589     || [ { $attrs->{alias} => $source->from } ];
1590     
1591   my $seen = { %{$attrs->{seen_join}||{}} };
1592
1593   my $join = ($attrs->{join}
1594                ? [ $attrs->{join}, $extra_join ]
1595                : $extra_join);
1596   $from = [
1597     @$from,
1598     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1599   ];
1600
1601   return ($from,$seen);
1602 }
1603
1604 sub _resolved_attrs {
1605   my $self = shift;
1606   return $self->{_attrs} if $self->{_attrs};
1607
1608   my $attrs = { %{$self->{attrs}||{}} };
1609   my $source = $self->{result_source};
1610   my $alias = $attrs->{alias};
1611
1612   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1613   if ($attrs->{columns}) {
1614     delete $attrs->{as};
1615   } elsif (!$attrs->{select}) {
1616     $attrs->{columns} = [ $source->columns ];
1617   }
1618  
1619   $attrs->{select} = 
1620     ($attrs->{select}
1621       ? (ref $attrs->{select} eq 'ARRAY'
1622           ? [ @{$attrs->{select}} ]
1623           : [ $attrs->{select} ])
1624       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1625     );
1626   $attrs->{as} =
1627     ($attrs->{as}
1628       ? (ref $attrs->{as} eq 'ARRAY'
1629           ? [ @{$attrs->{as}} ]
1630           : [ $attrs->{as} ])
1631       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1632     );
1633   
1634   my $adds;
1635   if ($adds = delete $attrs->{include_columns}) {
1636     $adds = [$adds] unless ref $adds eq 'ARRAY';
1637     push(@{$attrs->{select}}, @$adds);
1638     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1639   }
1640   if ($adds = delete $attrs->{'+select'}) {
1641     $adds = [$adds] unless ref $adds eq 'ARRAY';
1642     push(@{$attrs->{select}},
1643            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1644   }
1645   if (my $adds = delete $attrs->{'+as'}) {
1646     $adds = [$adds] unless ref $adds eq 'ARRAY';
1647     push(@{$attrs->{as}}, @$adds);
1648   }
1649
1650   $attrs->{from} ||= [ { 'me' => $source->from } ];
1651
1652   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1653     my $join = delete $attrs->{join} || {};
1654
1655     if (defined $attrs->{prefetch}) {
1656       $join = $self->_merge_attr(
1657         $join, $attrs->{prefetch}
1658       );
1659     }
1660
1661     $attrs->{from} =   # have to copy here to avoid corrupting the original
1662       [
1663         @{$attrs->{from}}, 
1664         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1665       ];
1666   }
1667
1668   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1669   if ($attrs->{order_by}) {
1670     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1671                            ? [ @{$attrs->{order_by}} ]
1672                            : [ $attrs->{order_by} ]);
1673   } else {
1674     $attrs->{order_by} = [];    
1675   }
1676
1677   my $collapse = $attrs->{collapse} || {};
1678   if (my $prefetch = delete $attrs->{prefetch}) {
1679     $prefetch = $self->_merge_attr({}, $prefetch);
1680     my @pre_order;
1681     my $seen = $attrs->{seen_join} || {};
1682     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1683       # bring joins back to level of current class
1684       my @prefetch = $source->resolve_prefetch(
1685         $p, $alias, $seen, \@pre_order, $collapse
1686       );
1687       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1688       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1689     }
1690     push(@{$attrs->{order_by}}, @pre_order);
1691   }
1692   $attrs->{collapse} = $collapse;
1693
1694   return $self->{_attrs} = $attrs;
1695 }
1696
1697 sub _merge_attr {
1698   my ($self, $a, $b) = @_;
1699   return $b unless defined($a);
1700   return $a unless defined($b);
1701   
1702   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1703     foreach my $key (keys %{$b}) {
1704       if (exists $a->{$key}) {
1705         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1706       } else {
1707         $a->{$key} = $b->{$key};
1708       }
1709     }
1710     return $a;
1711   } else {
1712     $a = [$a] unless ref $a eq 'ARRAY';
1713     $b = [$b] unless ref $b eq 'ARRAY';
1714
1715     my $hash = {};
1716     my @array;
1717     foreach my $x ($a, $b) {
1718       foreach my $element (@{$x}) {
1719         if (ref $element eq 'HASH') {
1720           $hash = $self->_merge_attr($hash, $element);
1721         } elsif (ref $element eq 'ARRAY') {
1722           push(@array, @{$element});
1723         } else {
1724           push(@array, $element) unless $b == $x
1725             && grep { $_ eq $element } @array;
1726         }
1727       }
1728     }
1729     
1730     @array = grep { !exists $hash->{$_} } @array;
1731
1732     return keys %{$hash}
1733       ? ( scalar(@array)
1734             ? [$hash, @array]
1735             : $hash
1736         )
1737       : \@array;
1738   }
1739 }
1740
1741 =head2 throw_exception
1742
1743 See L<DBIx::Class::Schema/throw_exception> for details.
1744
1745 =cut
1746
1747 sub throw_exception {
1748   my $self=shift;
1749   $self->result_source->schema->throw_exception(@_);
1750 }
1751
1752 # XXX: FIXME: Attributes docs need clearing up
1753
1754 =head1 ATTRIBUTES
1755
1756 The resultset takes various attributes that modify its behavior. Here's an
1757 overview of them:
1758
1759 =head2 order_by
1760
1761 =over 4
1762
1763 =item Value: ($order_by | \@order_by)
1764
1765 =back
1766
1767 Which column(s) to order the results by. This is currently passed
1768 through directly to SQL, so you can give e.g. C<year DESC> for a
1769 descending order on the column `year'.
1770
1771 Please note that if you have quoting enabled (see
1772 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1773 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1774 so you will need to manually quote things as appropriate.)
1775
1776 =head2 columns
1777
1778 =over 4
1779
1780 =item Value: \@columns
1781
1782 =back
1783
1784 Shortcut to request a particular set of columns to be retrieved.  Adds
1785 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1786 from that, then auto-populates C<as> from C<select> as normal. (You may also
1787 use the C<cols> attribute, as in earlier versions of DBIC.)
1788
1789 =head2 include_columns
1790
1791 =over 4
1792
1793 =item Value: \@columns
1794
1795 =back
1796
1797 Shortcut to include additional columns in the returned results - for example
1798
1799   $schema->resultset('CD')->search(undef, {
1800     include_columns => ['artist.name'],
1801     join => ['artist']
1802   });
1803
1804 would return all CDs and include a 'name' column to the information
1805 passed to object inflation
1806
1807 =head2 select
1808
1809 =over 4
1810
1811 =item Value: \@select_columns
1812
1813 =back
1814
1815 Indicates which columns should be selected from the storage. You can use
1816 column names, or in the case of RDBMS back ends, function or stored procedure
1817 names:
1818
1819   $rs = $schema->resultset('Employee')->search(undef, {
1820     select => [
1821       'name',
1822       { count => 'employeeid' },
1823       { sum => 'salary' }
1824     ]
1825   });
1826
1827 When you use function/stored procedure names and do not supply an C<as>
1828 attribute, the column names returned are storage-dependent. E.g. MySQL would
1829 return a column named C<count(employeeid)> in the above example.
1830
1831 =head2 +select
1832
1833 =over 4
1834
1835 Indicates additional columns to be selected from storage.  Works the same as
1836 L<select> but adds columns to the selection.
1837
1838 =back
1839
1840 =head2 +as
1841
1842 =over 4
1843
1844 Indicates additional column names for those added via L<+select>.
1845
1846 =back
1847
1848 =head2 as
1849
1850 =over 4
1851
1852 =item Value: \@inflation_names
1853
1854 =back
1855
1856 Indicates column names for object inflation. This is used in conjunction with
1857 C<select>, usually when C<select> contains one or more function or stored
1858 procedure names:
1859
1860   $rs = $schema->resultset('Employee')->search(undef, {
1861     select => [
1862       'name',
1863       { count => 'employeeid' }
1864     ],
1865     as => ['name', 'employee_count'],
1866   });
1867
1868   my $employee = $rs->first(); # get the first Employee
1869
1870 If the object against which the search is performed already has an accessor
1871 matching a column name specified in C<as>, the value can be retrieved using
1872 the accessor as normal:
1873
1874   my $name = $employee->name();
1875
1876 If on the other hand an accessor does not exist in the object, you need to
1877 use C<get_column> instead:
1878
1879   my $employee_count = $employee->get_column('employee_count');
1880
1881 You can create your own accessors if required - see
1882 L<DBIx::Class::Manual::Cookbook> for details.
1883
1884 Please note: This will NOT insert an C<AS employee_count> into the SQL
1885 statement produced, it is used for internal access only. Thus
1886 attempting to use the accessor in an C<order_by> clause or similar
1887 will fail miserably.
1888
1889 To get around this limitation, you can supply literal SQL to your
1890 C<select> attibute that contains the C<AS alias> text, eg:
1891
1892   select => [\'myfield AS alias']
1893
1894 =head2 join
1895
1896 =over 4
1897
1898 =item Value: ($rel_name | \@rel_names | \%rel_names)
1899
1900 =back
1901
1902 Contains a list of relationships that should be joined for this query.  For
1903 example:
1904
1905   # Get CDs by Nine Inch Nails
1906   my $rs = $schema->resultset('CD')->search(
1907     { 'artist.name' => 'Nine Inch Nails' },
1908     { join => 'artist' }
1909   );
1910
1911 Can also contain a hash reference to refer to the other relation's relations.
1912 For example:
1913
1914   package MyApp::Schema::Track;
1915   use base qw/DBIx::Class/;
1916   __PACKAGE__->table('track');
1917   __PACKAGE__->add_columns(qw/trackid cd position title/);
1918   __PACKAGE__->set_primary_key('trackid');
1919   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1920   1;
1921
1922   # In your application
1923   my $rs = $schema->resultset('Artist')->search(
1924     { 'track.title' => 'Teardrop' },
1925     {
1926       join     => { cd => 'track' },
1927       order_by => 'artist.name',
1928     }
1929   );
1930
1931 You need to use the relationship (not the table) name in  conditions, 
1932 because they are aliased as such. The current table is aliased as "me", so 
1933 you need to use me.column_name in order to avoid ambiguity. For example:
1934
1935   # Get CDs from 1984 with a 'Foo' track 
1936   my $rs = $schema->resultset('CD')->search(
1937     { 
1938       'me.year' => 1984,
1939       'tracks.name' => 'Foo'
1940     },
1941     { join => 'tracks' }
1942   );
1943   
1944 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1945 similarly for a third time). For e.g.
1946
1947   my $rs = $schema->resultset('Artist')->search({
1948     'cds.title'   => 'Down to Earth',
1949     'cds_2.title' => 'Popular',
1950   }, {
1951     join => [ qw/cds cds/ ],
1952   });
1953
1954 will return a set of all artists that have both a cd with title 'Down
1955 to Earth' and a cd with title 'Popular'.
1956
1957 If you want to fetch related objects from other tables as well, see C<prefetch>
1958 below.
1959
1960 =head2 prefetch
1961
1962 =over 4
1963
1964 =item Value: ($rel_name | \@rel_names | \%rel_names)
1965
1966 =back
1967
1968 Contains one or more relationships that should be fetched along with the main
1969 query (when they are accessed afterwards they will have already been
1970 "prefetched").  This is useful for when you know you will need the related
1971 objects, because it saves at least one query:
1972
1973   my $rs = $schema->resultset('Tag')->search(
1974     undef,
1975     {
1976       prefetch => {
1977         cd => 'artist'
1978       }
1979     }
1980   );
1981
1982 The initial search results in SQL like the following:
1983
1984   SELECT tag.*, cd.*, artist.* FROM tag
1985   JOIN cd ON tag.cd = cd.cdid
1986   JOIN artist ON cd.artist = artist.artistid
1987
1988 L<DBIx::Class> has no need to go back to the database when we access the
1989 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1990 case.
1991
1992 Simple prefetches will be joined automatically, so there is no need
1993 for a C<join> attribute in the above search. If you're prefetching to
1994 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1995 specify the join as well.
1996
1997 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1998 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1999 with an accessor type of 'single' or 'filter').
2000
2001 =head2 page
2002
2003 =over 4
2004
2005 =item Value: $page
2006
2007 =back
2008
2009 Makes the resultset paged and specifies the page to retrieve. Effectively
2010 identical to creating a non-pages resultset and then calling ->page($page)
2011 on it.
2012
2013 If L<rows> attribute is not specified it defualts to 10 rows per page.
2014
2015 =head2 rows
2016
2017 =over 4
2018
2019 =item Value: $rows
2020
2021 =back
2022
2023 Specifes the maximum number of rows for direct retrieval or the number of
2024 rows per page if the page attribute or method is used.
2025
2026 =head2 offset
2027
2028 =over 4
2029
2030 =item Value: $offset
2031
2032 =back
2033
2034 Specifies the (zero-based) row number for the  first row to be returned, or the
2035 of the first row of the first page if paging is used.
2036
2037 =head2 group_by
2038
2039 =over 4
2040
2041 =item Value: \@columns
2042
2043 =back
2044
2045 A arrayref of columns to group by. Can include columns of joined tables.
2046
2047   group_by => [qw/ column1 column2 ... /]
2048
2049 =head2 having
2050
2051 =over 4
2052
2053 =item Value: $condition
2054
2055 =back
2056
2057 HAVING is a select statement attribute that is applied between GROUP BY and
2058 ORDER BY. It is applied to the after the grouping calculations have been
2059 done.
2060
2061   having => { 'count(employee)' => { '>=', 100 } }
2062
2063 =head2 distinct
2064
2065 =over 4
2066
2067 =item Value: (0 | 1)
2068
2069 =back
2070
2071 Set to 1 to group by all columns.
2072
2073 =head2 where
2074
2075 =over 4
2076
2077 Adds to the WHERE clause.
2078
2079   # only return rows WHERE deleted IS NULL for all searches
2080   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2081
2082 Can be overridden by passing C<{ where => undef }> as an attribute
2083 to a resulset.
2084
2085 =back
2086
2087 =head2 cache
2088
2089 Set to 1 to cache search results. This prevents extra SQL queries if you
2090 revisit rows in your ResultSet:
2091
2092   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2093
2094   while( my $artist = $resultset->next ) {
2095     ... do stuff ...
2096   }
2097
2098   $rs->first; # without cache, this would issue a query
2099
2100 By default, searches are not cached.
2101
2102 For more examples of using these attributes, see
2103 L<DBIx::Class::Manual::Cookbook>.
2104
2105 =head2 from
2106
2107 =over 4
2108
2109 =item Value: \@from_clause
2110
2111 =back
2112
2113 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2114 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2115 clauses.
2116
2117 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2118
2119 C<join> will usually do what you need and it is strongly recommended that you
2120 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2121 And we really do mean "cannot", not just tried and failed. Attempting to use
2122 this because you're having problems with C<join> is like trying to use x86
2123 ASM because you've got a syntax error in your C. Trust us on this.
2124
2125 Now, if you're still really, really sure you need to use this (and if you're
2126 not 100% sure, ask the mailing list first), here's an explanation of how this
2127 works.
2128
2129 The syntax is as follows -
2130
2131   [
2132     { <alias1> => <table1> },
2133     [
2134       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2135       [], # nested JOIN (optional)
2136       { <table1.column1> => <table2.column2>, ... (more conditions) },
2137     ],
2138     # More of the above [ ] may follow for additional joins
2139   ]
2140
2141   <table1> <alias1>
2142   JOIN
2143     <table2> <alias2>
2144     [JOIN ...]
2145   ON <table1.column1> = <table2.column2>
2146   <more joins may follow>
2147
2148 An easy way to follow the examples below is to remember the following:
2149
2150     Anything inside "[]" is a JOIN
2151     Anything inside "{}" is a condition for the enclosing JOIN
2152
2153 The following examples utilize a "person" table in a family tree application.
2154 In order to express parent->child relationships, this table is self-joined:
2155
2156     # Person->belongs_to('father' => 'Person');
2157     # Person->belongs_to('mother' => 'Person');
2158
2159 C<from> can be used to nest joins. Here we return all children with a father,
2160 then search against all mothers of those children:
2161
2162   $rs = $schema->resultset('Person')->search(
2163       undef,
2164       {
2165           alias => 'mother', # alias columns in accordance with "from"
2166           from => [
2167               { mother => 'person' },
2168               [
2169                   [
2170                       { child => 'person' },
2171                       [
2172                           { father => 'person' },
2173                           { 'father.person_id' => 'child.father_id' }
2174                       ]
2175                   ],
2176                   { 'mother.person_id' => 'child.mother_id' }
2177               ],
2178           ]
2179       },
2180   );
2181
2182   # Equivalent SQL:
2183   # SELECT mother.* FROM person mother
2184   # JOIN (
2185   #   person child
2186   #   JOIN person father
2187   #   ON ( father.person_id = child.father_id )
2188   # )
2189   # ON ( mother.person_id = child.mother_id )
2190
2191 The type of any join can be controlled manually. To search against only people
2192 with a father in the person table, we could explicitly use C<INNER JOIN>:
2193
2194     $rs = $schema->resultset('Person')->search(
2195         undef,
2196         {
2197             alias => 'child', # alias columns in accordance with "from"
2198             from => [
2199                 { child => 'person' },
2200                 [
2201                     { father => 'person', -join_type => 'inner' },
2202                     { 'father.id' => 'child.father_id' }
2203                 ],
2204             ]
2205         },
2206     );
2207
2208     # Equivalent SQL:
2209     # SELECT child.* FROM person child
2210     # INNER JOIN person father ON child.father_id = father.id
2211
2212 =cut
2213
2214 1;