fix find_related-based queries to correctly grep the unique key
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use base qw/DBIx::Class/;
14
15 __PACKAGE__->load_components(qw/AccessorGroup/);
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   #weaken $source;
89   $attrs = { %{$attrs||{}} };
90
91   if ($attrs->{page}) {
92     $attrs->{rows} ||= 10;
93     $attrs->{offset} ||= 0;
94     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
95   }
96
97   $attrs->{alias} ||= 'me';
98
99   my $self = {
100     result_source => $source,
101     result_class => $attrs->{result_class} || $source->result_class,
102     cond => $attrs->{where},
103     count => undef,
104     pager => undef,
105     attrs => $attrs
106   };
107
108   bless $self, $class;
109
110   return $self;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
138
139 =cut
140
141 sub search {
142   my $self = shift;
143   my $rs = $self->search_rs( @_ );
144   return (wantarray ? $rs->all : $rs);
145 }
146
147 =head2 search_rs
148
149 =over 4
150
151 =item Arguments: $cond, \%attrs?
152
153 =item Return Value: $resultset
154
155 =back
156
157 This method does the same exact thing as search() except it will
158 always return a resultset, even in list context.
159
160 =cut
161
162 sub search_rs {
163   my $self = shift;
164
165   my $rows;
166
167   unless (@_) {                 # no search, effectively just a clone
168     $rows = $self->get_cache;
169   }
170
171   my $attrs = {};
172   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
173   my $our_attrs = { %{$self->{attrs}} };
174   my $having = delete $our_attrs->{having};
175   my $where = delete $our_attrs->{where};
176
177   my $new_attrs = { %{$our_attrs}, %{$attrs} };
178
179   # merge new attrs into inherited
180   foreach my $key (qw/join prefetch/) {
181     next unless exists $attrs->{$key};
182     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
183   }
184
185   my $cond = (@_
186     ? (
187         (@_ == 1 || ref $_[0] eq "HASH")
188           ? (
189               (ref $_[0] eq 'HASH')
190                 ? (
191                     (keys %{ $_[0] }  > 0)
192                       ? shift
193                       : undef
194                    )
195                 :  shift
196              )
197           : (
198               (@_ % 2)
199                 ? $self->throw_exception("Odd number of arguments to search")
200                 : {@_}
201              )
202       )
203     : undef
204   );
205
206   if (defined $where) {
207     $new_attrs->{where} = (
208       defined $new_attrs->{where}
209         ? { '-and' => [
210               map {
211                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
212               } $where, $new_attrs->{where}
213             ]
214           }
215         : $where);
216   }
217
218   if (defined $cond) {
219     $new_attrs->{where} = (
220       defined $new_attrs->{where}
221         ? { '-and' => [
222               map {
223                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
224               } $cond, $new_attrs->{where}
225             ]
226           }
227         : $cond);
228   }
229
230   if (defined $having) {
231     $new_attrs->{having} = (
232       defined $new_attrs->{having}
233         ? { '-and' => [
234               map {
235                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
236               } $having, $new_attrs->{having}
237             ]
238           }
239         : $having);
240   }
241
242   my $rs = (ref $self)->new($self->result_source, $new_attrs);
243   if ($rows) {
244     $rs->set_cache($rows);
245   }
246   return $rs;
247 }
248
249 =head2 search_literal
250
251 =over 4
252
253 =item Arguments: $sql_fragment, @bind_values
254
255 =item Return Value: $resultset (scalar context), @row_objs (list context)
256
257 =back
258
259   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
260   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
261
262 Pass a literal chunk of SQL to be added to the conditional part of the
263 resultset query.
264
265 =cut
266
267 sub search_literal {
268   my ($self, $cond, @vals) = @_;
269   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
270   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
271   return $self->search(\$cond, $attrs);
272 }
273
274 =head2 find
275
276 =over 4
277
278 =item Arguments: @values | \%cols, \%attrs?
279
280 =item Return Value: $row_object
281
282 =back
283
284 Finds a row based on its primary key or unique constraint. For example, to find
285 a row by its primary key:
286
287   my $cd = $schema->resultset('CD')->find(5);
288
289 You can also find a row by a specific unique constraint using the C<key>
290 attribute. For example:
291
292   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
293     key => 'cd_artist_title'
294   });
295
296 Additionally, you can specify the columns explicitly by name:
297
298   my $cd = $schema->resultset('CD')->find(
299     {
300       artist => 'Massive Attack',
301       title  => 'Mezzanine',
302     },
303     { key => 'cd_artist_title' }
304   );
305
306 If the C<key> is specified as C<primary>, it searches only on the primary key.
307
308 If no C<key> is specified, it searches on all unique constraints defined on the
309 source, including the primary key.
310
311 If your table does not have a primary key, you B<must> provide a value for the
312 C<key> attribute matching one of the unique constraints on the source.
313
314 See also L</find_or_create> and L</update_or_create>. For information on how to
315 declare unique constraints, see
316 L<DBIx::Class::ResultSource/add_unique_constraint>.
317
318 =cut
319
320 sub find {
321   my $self = shift;
322   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
323
324   # Default to the primary key, but allow a specific key
325   my @cols = exists $attrs->{key}
326     ? $self->result_source->unique_constraint_columns($attrs->{key})
327     : $self->result_source->primary_columns;
328   $self->throw_exception(
329     "Can't find unless a primary key is defined or unique constraint is specified"
330   ) unless @cols;
331
332   # Parse out a hashref from input
333   my $input_query;
334   if (ref $_[0] eq 'HASH') {
335     $input_query = { %{$_[0]} };
336   }
337   elsif (@_ == @cols) {
338     $input_query = {};
339     @{$input_query}{@cols} = @_;
340   }
341   else {
342     # Compatibility: Allow e.g. find(id => $value)
343     carp "Find by key => value deprecated; please use a hashref instead";
344     $input_query = {@_};
345   }
346
347   my (%related, $info);
348
349   foreach my $key (keys %$input_query) {
350     if (ref($input_query->{$key})
351         && ($info = $self->result_source->relationship_info($key))) {
352       my $rel_q = $self->result_source->resolve_condition(
353                     $info->{cond}, delete $input_query->{$key}, $key
354                   );
355       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
356       @related{keys %$rel_q} = values %$rel_q;
357     }
358   }
359   if (my @keys = keys %related) {
360     @{$input_query}{@keys} = values %related;
361   }
362
363   my @unique_queries = $self->_unique_queries($input_query, $attrs);
364
365   # Build the final query: Default to the disjunction of the unique queries,
366   # but allow the input query in case the ResultSet defines the query or the
367   # user is abusing find
368   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
369   my $query = @unique_queries
370     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
371     : $self->_add_alias($input_query, $alias);
372
373   # Run the query
374   if (keys %$attrs) {
375     my $rs = $self->search($query, $attrs);
376     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
377   }
378   else {
379     return keys %{$self->_resolved_attrs->{collapse}}
380       ? $self->search($query)->next
381       : $self->single($query);
382   }
383 }
384
385 # _add_alias
386 #
387 # Add the specified alias to the specified query hash. A copy is made so the
388 # original query is not modified.
389
390 sub _add_alias {
391   my ($self, $query, $alias) = @_;
392
393   my %aliased = %$query;
394   foreach my $col (grep { ! m/\./ } keys %aliased) {
395     $aliased{"$alias.$col"} = delete $aliased{$col};
396   }
397
398   return \%aliased;
399 }
400
401 # _unique_queries
402 #
403 # Build a list of queries which satisfy unique constraints.
404
405 sub _unique_queries {
406   my ($self, $query, $attrs) = @_;
407
408   my @constraint_names = exists $attrs->{key}
409     ? ($attrs->{key})
410     : $self->result_source->unique_constraint_names;
411
412   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
413   my $num_where = scalar keys %$where;
414
415   my @unique_queries;
416   foreach my $name (@constraint_names) {
417     my @unique_cols = $self->result_source->unique_constraint_columns($name);
418     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
419
420     my $num_cols = scalar @unique_cols;
421     my $num_query = scalar keys %$unique_query;
422
423     my $total = $num_query + $num_where;
424     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
425       # The query is either unique on its own or is unique in combination with
426       # the existing where clause
427       push @unique_queries, $unique_query;
428     }
429   }
430
431   return @unique_queries;
432 }
433
434 # _build_unique_query
435 #
436 # Constrain the specified query hash based on the specified column names.
437
438 sub _build_unique_query {
439   my ($self, $query, $unique_cols) = @_;
440
441   return {
442     map  { $_ => $query->{$_} }
443     grep { exists $query->{$_} }
444       @$unique_cols
445   };
446 }
447
448 =head2 search_related
449
450 =over 4
451
452 =item Arguments: $rel, $cond, \%attrs?
453
454 =item Return Value: $new_resultset
455
456 =back
457
458   $new_rs = $cd_rs->search_related('artist', {
459     name => 'Emo-R-Us',
460   });
461
462 Searches the specified relationship, optionally specifying a condition and
463 attributes for matching records. See L</ATTRIBUTES> for more information.
464
465 =cut
466
467 sub search_related {
468   return shift->related_resultset(shift)->search(@_);
469 }
470
471 =head2 cursor
472
473 =over 4
474
475 =item Arguments: none
476
477 =item Return Value: $cursor
478
479 =back
480
481 Returns a storage-driven cursor to the given resultset. See
482 L<DBIx::Class::Cursor> for more information.
483
484 =cut
485
486 sub cursor {
487   my ($self) = @_;
488
489   my $attrs = { %{$self->_resolved_attrs} };
490   return $self->{cursor}
491     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
492           $attrs->{where},$attrs);
493 }
494
495 =head2 single
496
497 =over 4
498
499 =item Arguments: $cond?
500
501 =item Return Value: $row_object?
502
503 =back
504
505   my $cd = $schema->resultset('CD')->single({ year => 2001 });
506
507 Inflates the first result without creating a cursor if the resultset has
508 any records in it; if not returns nothing. Used by L</find> as an optimisation.
509
510 Can optionally take an additional condition *only* - this is a fast-code-path
511 method; if you need to add extra joins or similar call ->search and then
512 ->single without a condition on the $rs returned from that.
513
514 =cut
515
516 sub single {
517   my ($self, $where) = @_;
518   my $attrs = { %{$self->_resolved_attrs} };
519   if ($where) {
520     if (defined $attrs->{where}) {
521       $attrs->{where} = {
522         '-and' =>
523             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
524                $where, delete $attrs->{where} ]
525       };
526     } else {
527       $attrs->{where} = $where;
528     }
529   }
530
531 #  XXX: Disabled since it doesn't infer uniqueness in all cases
532 #  unless ($self->_is_unique_query($attrs->{where})) {
533 #    carp "Query not guaranteed to return a single row"
534 #      . "; please declare your unique constraints or use search instead";
535 #  }
536
537   my @data = $self->result_source->storage->select_single(
538     $attrs->{from}, $attrs->{select},
539     $attrs->{where}, $attrs
540   );
541
542   return (@data ? ($self->_construct_object(@data))[0] : ());
543 }
544
545 # _is_unique_query
546 #
547 # Try to determine if the specified query is guaranteed to be unique, based on
548 # the declared unique constraints.
549
550 sub _is_unique_query {
551   my ($self, $query) = @_;
552
553   my $collapsed = $self->_collapse_query($query);
554   my $alias = $self->{attrs}{alias};
555
556   foreach my $name ($self->result_source->unique_constraint_names) {
557     my @unique_cols = map {
558       "$alias.$_"
559     } $self->result_source->unique_constraint_columns($name);
560
561     # Count the values for each unique column
562     my %seen = map { $_ => 0 } @unique_cols;
563
564     foreach my $key (keys %$collapsed) {
565       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
566       next unless exists $seen{$aliased};  # Additional constraints are okay
567       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
568     }
569
570     # If we get 0 or more than 1 value for a column, it's not necessarily unique
571     return 1 unless grep { $_ != 1 } values %seen;
572   }
573
574   return 0;
575 }
576
577 # _collapse_query
578 #
579 # Recursively collapse the query, accumulating values for each column.
580
581 sub _collapse_query {
582   my ($self, $query, $collapsed) = @_;
583
584   $collapsed ||= {};
585
586   if (ref $query eq 'ARRAY') {
587     foreach my $subquery (@$query) {
588       next unless ref $subquery;  # -or
589 #      warn "ARRAY: " . Dumper $subquery;
590       $collapsed = $self->_collapse_query($subquery, $collapsed);
591     }
592   }
593   elsif (ref $query eq 'HASH') {
594     if (keys %$query and (keys %$query)[0] eq '-and') {
595       foreach my $subquery (@{$query->{-and}}) {
596 #        warn "HASH: " . Dumper $subquery;
597         $collapsed = $self->_collapse_query($subquery, $collapsed);
598       }
599     }
600     else {
601 #      warn "LEAF: " . Dumper $query;
602       foreach my $col (keys %$query) {
603         my $value = $query->{$col};
604         $collapsed->{$col}{$value}++;
605       }
606     }
607   }
608
609   return $collapsed;
610 }
611
612 =head2 get_column
613
614 =over 4
615
616 =item Arguments: $cond?
617
618 =item Return Value: $resultsetcolumn
619
620 =back
621
622   my $max_length = $rs->get_column('length')->max;
623
624 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
625
626 =cut
627
628 sub get_column {
629   my ($self, $column) = @_;
630   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
631   return $new;
632 }
633
634 =head2 search_like
635
636 =over 4
637
638 =item Arguments: $cond, \%attrs?
639
640 =item Return Value: $resultset (scalar context), @row_objs (list context)
641
642 =back
643
644   # WHERE title LIKE '%blue%'
645   $cd_rs = $rs->search_like({ title => '%blue%'});
646
647 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
648 that this is simply a convenience method. You most likely want to use
649 L</search> with specific operators.
650
651 For more information, see L<DBIx::Class::Manual::Cookbook>.
652
653 =cut
654
655 sub search_like {
656   my $class = shift;
657   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
658   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
659   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
660   return $class->search($query, { %$attrs });
661 }
662
663 =head2 slice
664
665 =over 4
666
667 =item Arguments: $first, $last
668
669 =item Return Value: $resultset (scalar context), @row_objs (list context)
670
671 =back
672
673 Returns a resultset or object list representing a subset of elements from the
674 resultset slice is called on. Indexes are from 0, i.e., to get the first
675 three records, call:
676
677   my ($one, $two, $three) = $rs->slice(0, 2);
678
679 =cut
680
681 sub slice {
682   my ($self, $min, $max) = @_;
683   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
684   $attrs->{offset} = $self->{attrs}{offset} || 0;
685   $attrs->{offset} += $min;
686   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
687   return $self->search(undef(), $attrs);
688   #my $slice = (ref $self)->new($self->result_source, $attrs);
689   #return (wantarray ? $slice->all : $slice);
690 }
691
692 =head2 next
693
694 =over 4
695
696 =item Arguments: none
697
698 =item Return Value: $result?
699
700 =back
701
702 Returns the next element in the resultset (C<undef> is there is none).
703
704 Can be used to efficiently iterate over records in the resultset:
705
706   my $rs = $schema->resultset('CD')->search;
707   while (my $cd = $rs->next) {
708     print $cd->title;
709   }
710
711 Note that you need to store the resultset object, and call C<next> on it.
712 Calling C<< resultset('Table')->next >> repeatedly will always return the
713 first record from the resultset.
714
715 =cut
716
717 sub next {
718   my ($self) = @_;
719   if (my $cache = $self->get_cache) {
720     $self->{all_cache_position} ||= 0;
721     return $cache->[$self->{all_cache_position}++];
722   }
723   if ($self->{attrs}{cache}) {
724     $self->{all_cache_position} = 1;
725     return ($self->all)[0];
726   }
727   if ($self->{stashed_objects}) {
728     my $obj = shift(@{$self->{stashed_objects}});
729     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
730     return $obj;
731   }
732   my @row = (
733     exists $self->{stashed_row}
734       ? @{delete $self->{stashed_row}}
735       : $self->cursor->next
736   );
737   return unless (@row);
738   my ($row, @more) = $self->_construct_object(@row);
739   $self->{stashed_objects} = \@more if @more;
740   return $row;
741 }
742
743 sub _construct_object {
744   my ($self, @row) = @_;
745   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
746   my @new = $self->result_class->inflate_result($self->result_source, @$info);
747   @new = $self->{_attrs}{record_filter}->(@new)
748     if exists $self->{_attrs}{record_filter};
749   return @new;
750 }
751
752 sub _collapse_result {
753   my ($self, $as, $row, $prefix) = @_;
754
755   my %const;
756   my @copy = @$row;
757   
758   foreach my $this_as (@$as) {
759     my $val = shift @copy;
760     if (defined $prefix) {
761       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
762         my $remain = $1;
763         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
764         $const{$1||''}{$2} = $val;
765       }
766     } else {
767       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
768       $const{$1||''}{$2} = $val;
769     }
770   }
771
772   my $alias = $self->{attrs}{alias};
773   my $info = [ {}, {} ];
774   foreach my $key (keys %const) {
775     if (length $key && $key ne $alias) {
776       my $target = $info;
777       my @parts = split(/\./, $key);
778       foreach my $p (@parts) {
779         $target = $target->[1]->{$p} ||= [];
780       }
781       $target->[0] = $const{$key};
782     } else {
783       $info->[0] = $const{$key};
784     }
785   }
786   
787   my @collapse;
788   if (defined $prefix) {
789     @collapse = map {
790         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
791     } keys %{$self->{_attrs}{collapse}}
792   } else {
793     @collapse = keys %{$self->{_attrs}{collapse}};
794   };
795
796   if (@collapse) {
797     my ($c) = sort { length $a <=> length $b } @collapse;
798     my $target = $info;
799     foreach my $p (split(/\./, $c)) {
800       $target = $target->[1]->{$p} ||= [];
801     }
802     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
803     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
804     my $tree = $self->_collapse_result($as, $row, $c_prefix);
805     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
806     my (@final, @raw);
807
808     while (
809       !(
810         grep {
811           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
812         } @co_key
813         )
814     ) {
815       push(@final, $tree);
816       last unless (@raw = $self->cursor->next);
817       $row = $self->{stashed_row} = \@raw;
818       $tree = $self->_collapse_result($as, $row, $c_prefix);
819     }
820     @$target = (@final ? @final : [ {}, {} ]);
821       # single empty result to indicate an empty prefetched has_many
822   }
823
824   #print "final info: " . Dumper($info);
825   return $info;
826 }
827
828 =head2 result_source
829
830 =over 4
831
832 =item Arguments: $result_source?
833
834 =item Return Value: $result_source
835
836 =back
837
838 An accessor for the primary ResultSource object from which this ResultSet
839 is derived.
840
841 =head2 result_class
842
843 =over 4
844
845 =item Arguments: $result_class?
846
847 =item Return Value: $result_class
848
849 =back
850
851 An accessor for the class to use when creating row objects. Defaults to 
852 C<< result_source->result_class >> - which in most cases is the name of the 
853 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
854
855 =cut
856
857
858 =head2 count
859
860 =over 4
861
862 =item Arguments: $cond, \%attrs??
863
864 =item Return Value: $count
865
866 =back
867
868 Performs an SQL C<COUNT> with the same query as the resultset was built
869 with to find the number of elements. If passed arguments, does a search
870 on the resultset and counts the results of that.
871
872 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
873 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
874 not support C<DISTINCT> with multiple columns. If you are using such a
875 database, you should only use columns from the main table in your C<group_by>
876 clause.
877
878 =cut
879
880 sub count {
881   my $self = shift;
882   return $self->search(@_)->count if @_ and defined $_[0];
883   return scalar @{ $self->get_cache } if $self->get_cache;
884   my $count = $self->_count;
885   return 0 unless $count;
886
887   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
888   $count = $self->{attrs}{rows} if
889     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
890   return $count;
891 }
892
893 sub _count { # Separated out so pager can get the full count
894   my $self = shift;
895   my $select = { count => '*' };
896
897   my $attrs = { %{$self->_resolved_attrs} };
898   if (my $group_by = delete $attrs->{group_by}) {
899     delete $attrs->{having};
900     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
901     # todo: try CONCAT for multi-column pk
902     my @pk = $self->result_source->primary_columns;
903     if (@pk == 1) {
904       my $alias = $attrs->{alias};
905       foreach my $column (@distinct) {
906         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
907           @distinct = ($column);
908           last;
909         }
910       }
911     }
912
913     $select = { count => { distinct => \@distinct } };
914   }
915
916   $attrs->{select} = $select;
917   $attrs->{as} = [qw/count/];
918
919   # offset, order by and page are not needed to count. record_filter is cdbi
920   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
921
922   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
923   my ($count) = $tmp_rs->cursor->next;
924   return $count;
925 }
926
927 =head2 count_literal
928
929 =over 4
930
931 =item Arguments: $sql_fragment, @bind_values
932
933 =item Return Value: $count
934
935 =back
936
937 Counts the results in a literal query. Equivalent to calling L</search_literal>
938 with the passed arguments, then L</count>.
939
940 =cut
941
942 sub count_literal { shift->search_literal(@_)->count; }
943
944 =head2 all
945
946 =over 4
947
948 =item Arguments: none
949
950 =item Return Value: @objects
951
952 =back
953
954 Returns all elements in the resultset. Called implicitly if the resultset
955 is returned in list context.
956
957 =cut
958
959 sub all {
960   my ($self) = @_;
961   return @{ $self->get_cache } if $self->get_cache;
962
963   my @obj;
964
965   # TODO: don't call resolve here
966   if (keys %{$self->_resolved_attrs->{collapse}}) {
967 #  if ($self->{attrs}{prefetch}) {
968       # Using $self->cursor->all is really just an optimisation.
969       # If we're collapsing has_many prefetches it probably makes
970       # very little difference, and this is cleaner than hacking
971       # _construct_object to survive the approach
972     my @row = $self->cursor->next;
973     while (@row) {
974       push(@obj, $self->_construct_object(@row));
975       @row = (exists $self->{stashed_row}
976                ? @{delete $self->{stashed_row}}
977                : $self->cursor->next);
978     }
979   } else {
980     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
981   }
982
983   $self->set_cache(\@obj) if $self->{attrs}{cache};
984   return @obj;
985 }
986
987 =head2 reset
988
989 =over 4
990
991 =item Arguments: none
992
993 =item Return Value: $self
994
995 =back
996
997 Resets the resultset's cursor, so you can iterate through the elements again.
998
999 =cut
1000
1001 sub reset {
1002   my ($self) = @_;
1003   delete $self->{_attrs} if exists $self->{_attrs};
1004   $self->{all_cache_position} = 0;
1005   $self->cursor->reset;
1006   return $self;
1007 }
1008
1009 =head2 first
1010
1011 =over 4
1012
1013 =item Arguments: none
1014
1015 =item Return Value: $object?
1016
1017 =back
1018
1019 Resets the resultset and returns an object for the first result (if the
1020 resultset returns anything).
1021
1022 =cut
1023
1024 sub first {
1025   return $_[0]->reset->next;
1026 }
1027
1028 # _cond_for_update_delete
1029 #
1030 # update/delete require the condition to be modified to handle
1031 # the differing SQL syntax available.  This transforms the $self->{cond}
1032 # appropriately, returning the new condition.
1033
1034 sub _cond_for_update_delete {
1035   my ($self, $full_cond) = @_;
1036   my $cond = {};
1037
1038   $full_cond ||= $self->{cond};
1039   # No-op. No condition, we're updating/deleting everything
1040   return $cond unless ref $full_cond;
1041
1042   if (ref $full_cond eq 'ARRAY') {
1043     $cond = [
1044       map {
1045         my %hash;
1046         foreach my $key (keys %{$_}) {
1047           $key =~ /([^.]+)$/;
1048           $hash{$1} = $_->{$key};
1049         }
1050         \%hash;
1051       } @{$full_cond}
1052     ];
1053   }
1054   elsif (ref $full_cond eq 'HASH') {
1055     if ((keys %{$full_cond})[0] eq '-and') {
1056       $cond->{-and} = [];
1057
1058       my @cond = @{$full_cond->{-and}};
1059       for (my $i = 0; $i < @cond; $i++) {
1060         my $entry = $cond[$i];
1061
1062         my $hash;
1063         if (ref $entry eq 'HASH') {
1064           $hash = $self->_cond_for_update_delete($entry);
1065         }
1066         else {
1067           $entry =~ /([^.]+)$/;
1068           $hash->{$1} = $cond[++$i];
1069         }
1070
1071         push @{$cond->{-and}}, $hash;
1072       }
1073     }
1074     else {
1075       foreach my $key (keys %{$full_cond}) {
1076         $key =~ /([^.]+)$/;
1077         $cond->{$1} = $full_cond->{$key};
1078       }
1079     }
1080   }
1081   else {
1082     $self->throw_exception(
1083       "Can't update/delete on resultset with condition unless hash or array"
1084     );
1085   }
1086
1087   return $cond;
1088 }
1089
1090
1091 =head2 update
1092
1093 =over 4
1094
1095 =item Arguments: \%values
1096
1097 =item Return Value: $storage_rv
1098
1099 =back
1100
1101 Sets the specified columns in the resultset to the supplied values in a
1102 single query. Return value will be true if the update succeeded or false
1103 if no records were updated; exact type of success value is storage-dependent.
1104
1105 =cut
1106
1107 sub update {
1108   my ($self, $values) = @_;
1109   $self->throw_exception("Values for update must be a hash")
1110     unless ref $values eq 'HASH';
1111
1112   my $cond = $self->_cond_for_update_delete;
1113
1114   return $self->result_source->storage->update(
1115     $self->result_source->from, $values, $cond
1116   );
1117 }
1118
1119 =head2 update_all
1120
1121 =over 4
1122
1123 =item Arguments: \%values
1124
1125 =item Return Value: 1
1126
1127 =back
1128
1129 Fetches all objects and updates them one at a time. Note that C<update_all>
1130 will run DBIC cascade triggers, while L</update> will not.
1131
1132 =cut
1133
1134 sub update_all {
1135   my ($self, $values) = @_;
1136   $self->throw_exception("Values for update must be a hash")
1137     unless ref $values eq 'HASH';
1138   foreach my $obj ($self->all) {
1139     $obj->set_columns($values)->update;
1140   }
1141   return 1;
1142 }
1143
1144 =head2 delete
1145
1146 =over 4
1147
1148 =item Arguments: none
1149
1150 =item Return Value: 1
1151
1152 =back
1153
1154 Deletes the contents of the resultset from its result source. Note that this
1155 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1156 to run. See also L<DBIx::Class::Row/delete>.
1157
1158 =cut
1159
1160 sub delete {
1161   my ($self) = @_;
1162
1163   my $cond = $self->_cond_for_update_delete;
1164
1165   $self->result_source->storage->delete($self->result_source->from, $cond);
1166   return 1;
1167 }
1168
1169 =head2 delete_all
1170
1171 =over 4
1172
1173 =item Arguments: none
1174
1175 =item Return Value: 1
1176
1177 =back
1178
1179 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1180 will run DBIC cascade triggers, while L</delete> will not.
1181
1182 =cut
1183
1184 sub delete_all {
1185   my ($self) = @_;
1186   $_->delete for $self->all;
1187   return 1;
1188 }
1189
1190 =head2 pager
1191
1192 =over 4
1193
1194 =item Arguments: none
1195
1196 =item Return Value: $pager
1197
1198 =back
1199
1200 Return Value a L<Data::Page> object for the current resultset. Only makes
1201 sense for queries with a C<page> attribute.
1202
1203 =cut
1204
1205 sub pager {
1206   my ($self) = @_;
1207   my $attrs = $self->{attrs};
1208   $self->throw_exception("Can't create pager for non-paged rs")
1209     unless $self->{attrs}{page};
1210   $attrs->{rows} ||= 10;
1211   return $self->{pager} ||= Data::Page->new(
1212     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1213 }
1214
1215 =head2 page
1216
1217 =over 4
1218
1219 =item Arguments: $page_number
1220
1221 =item Return Value: $rs
1222
1223 =back
1224
1225 Returns a resultset for the $page_number page of the resultset on which page
1226 is called, where each page contains a number of rows equal to the 'rows'
1227 attribute set on the resultset (10 by default).
1228
1229 =cut
1230
1231 sub page {
1232   my ($self, $page) = @_;
1233   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1234 }
1235
1236 =head2 new_result
1237
1238 =over 4
1239
1240 =item Arguments: \%vals
1241
1242 =item Return Value: $object
1243
1244 =back
1245
1246 Creates an object in the resultset's result class and returns it.
1247
1248 =cut
1249
1250 sub new_result {
1251   my ($self, $values) = @_;
1252   $self->throw_exception( "new_result needs a hash" )
1253     unless (ref $values eq 'HASH');
1254   $self->throw_exception(
1255     "Can't abstract implicit construct, condition not a hash"
1256   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1257
1258   my $alias = $self->{attrs}{alias};
1259   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1260   my %new = (
1261     %{ $self->_remove_alias($values, $alias) },
1262     %{ $self->_remove_alias($collapsed_cond, $alias) },
1263     -result_source => $self->result_source,
1264   );
1265
1266   my $obj = $self->result_class->new(\%new);
1267   return $obj;
1268 }
1269
1270 # _collapse_cond
1271 #
1272 # Recursively collapse the condition.
1273
1274 sub _collapse_cond {
1275   my ($self, $cond, $collapsed) = @_;
1276
1277   $collapsed ||= {};
1278
1279   if (ref $cond eq 'ARRAY') {
1280     foreach my $subcond (@$cond) {
1281       next unless ref $subcond;  # -or
1282 #      warn "ARRAY: " . Dumper $subcond;
1283       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1284     }
1285   }
1286   elsif (ref $cond eq 'HASH') {
1287     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1288       foreach my $subcond (@{$cond->{-and}}) {
1289 #        warn "HASH: " . Dumper $subcond;
1290         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1291       }
1292     }
1293     else {
1294 #      warn "LEAF: " . Dumper $cond;
1295       foreach my $col (keys %$cond) {
1296         my $value = $cond->{$col};
1297         $collapsed->{$col} = $value;
1298       }
1299     }
1300   }
1301
1302   return $collapsed;
1303 }
1304
1305 # _remove_alias
1306 #
1307 # Remove the specified alias from the specified query hash. A copy is made so
1308 # the original query is not modified.
1309
1310 sub _remove_alias {
1311   my ($self, $query, $alias) = @_;
1312
1313   my %orig = %{ $query || {} };
1314   my %unaliased;
1315
1316   foreach my $key (keys %orig) {
1317     if ($key !~ /\./) {
1318       $unaliased{$key} = $orig{$key};
1319       next;
1320     }
1321     $unaliased{$1} = $orig{$key}
1322       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1323   }
1324
1325   return \%unaliased;
1326 }
1327
1328 =head2 find_or_new
1329
1330 =over 4
1331
1332 =item Arguments: \%vals, \%attrs?
1333
1334 =item Return Value: $object
1335
1336 =back
1337
1338 Find an existing record from this resultset. If none exists, instantiate a new
1339 result object and return it. The object will not be saved into your storage
1340 until you call L<DBIx::Class::Row/insert> on it.
1341
1342 If you want objects to be saved immediately, use L</find_or_create> instead.
1343
1344 =cut
1345
1346 sub find_or_new {
1347   my $self     = shift;
1348   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1349   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1350   my $exists   = $self->find($hash, $attrs);
1351   return defined $exists ? $exists : $self->new_result($hash);
1352 }
1353
1354 =head2 create
1355
1356 =over 4
1357
1358 =item Arguments: \%vals
1359
1360 =item Return Value: $object
1361
1362 =back
1363
1364 Inserts a record into the resultset and returns the object representing it.
1365
1366 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1367
1368 =cut
1369
1370 sub create {
1371   my ($self, $attrs) = @_;
1372   $self->throw_exception( "create needs a hashref" )
1373     unless ref $attrs eq 'HASH';
1374   return $self->new_result($attrs)->insert;
1375 }
1376
1377 =head2 find_or_create
1378
1379 =over 4
1380
1381 =item Arguments: \%vals, \%attrs?
1382
1383 =item Return Value: $object
1384
1385 =back
1386
1387   $class->find_or_create({ key => $val, ... });
1388
1389 Tries to find a record based on its primary key or unique constraint; if none
1390 is found, creates one and returns that instead.
1391
1392   my $cd = $schema->resultset('CD')->find_or_create({
1393     cdid   => 5,
1394     artist => 'Massive Attack',
1395     title  => 'Mezzanine',
1396     year   => 2005,
1397   });
1398
1399 Also takes an optional C<key> attribute, to search by a specific key or unique
1400 constraint. For example:
1401
1402   my $cd = $schema->resultset('CD')->find_or_create(
1403     {
1404       artist => 'Massive Attack',
1405       title  => 'Mezzanine',
1406     },
1407     { key => 'cd_artist_title' }
1408   );
1409
1410 See also L</find> and L</update_or_create>. For information on how to declare
1411 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1412
1413 =cut
1414
1415 sub find_or_create {
1416   my $self     = shift;
1417   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1418   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1419   my $exists   = $self->find($hash, $attrs);
1420   return defined $exists ? $exists : $self->create($hash);
1421 }
1422
1423 =head2 update_or_create
1424
1425 =over 4
1426
1427 =item Arguments: \%col_values, { key => $unique_constraint }?
1428
1429 =item Return Value: $object
1430
1431 =back
1432
1433   $class->update_or_create({ col => $val, ... });
1434
1435 First, searches for an existing row matching one of the unique constraints
1436 (including the primary key) on the source of this resultset. If a row is
1437 found, updates it with the other given column values. Otherwise, creates a new
1438 row.
1439
1440 Takes an optional C<key> attribute to search on a specific unique constraint.
1441 For example:
1442
1443   # In your application
1444   my $cd = $schema->resultset('CD')->update_or_create(
1445     {
1446       artist => 'Massive Attack',
1447       title  => 'Mezzanine',
1448       year   => 1998,
1449     },
1450     { key => 'cd_artist_title' }
1451   );
1452
1453 If no C<key> is specified, it searches on all unique constraints defined on the
1454 source, including the primary key.
1455
1456 If the C<key> is specified as C<primary>, it searches only on the primary key.
1457
1458 See also L</find> and L</find_or_create>. For information on how to declare
1459 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1460
1461 =cut
1462
1463 sub update_or_create {
1464   my $self = shift;
1465   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1466   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1467
1468   my $row = $self->find($cond, $attrs);
1469   if (defined $row) {
1470     $row->update($cond);
1471     return $row;
1472   }
1473
1474   return $self->create($cond);
1475 }
1476
1477 =head2 get_cache
1478
1479 =over 4
1480
1481 =item Arguments: none
1482
1483 =item Return Value: \@cache_objects?
1484
1485 =back
1486
1487 Gets the contents of the cache for the resultset, if the cache is set.
1488
1489 =cut
1490
1491 sub get_cache {
1492   shift->{all_cache};
1493 }
1494
1495 =head2 set_cache
1496
1497 =over 4
1498
1499 =item Arguments: \@cache_objects
1500
1501 =item Return Value: \@cache_objects
1502
1503 =back
1504
1505 Sets the contents of the cache for the resultset. Expects an arrayref
1506 of objects of the same class as those produced by the resultset. Note that
1507 if the cache is set the resultset will return the cached objects rather
1508 than re-querying the database even if the cache attr is not set.
1509
1510 =cut
1511
1512 sub set_cache {
1513   my ( $self, $data ) = @_;
1514   $self->throw_exception("set_cache requires an arrayref")
1515       if defined($data) && (ref $data ne 'ARRAY');
1516   $self->{all_cache} = $data;
1517 }
1518
1519 =head2 clear_cache
1520
1521 =over 4
1522
1523 =item Arguments: none
1524
1525 =item Return Value: []
1526
1527 =back
1528
1529 Clears the cache for the resultset.
1530
1531 =cut
1532
1533 sub clear_cache {
1534   shift->set_cache(undef);
1535 }
1536
1537 =head2 related_resultset
1538
1539 =over 4
1540
1541 =item Arguments: $relationship_name
1542
1543 =item Return Value: $resultset
1544
1545 =back
1546
1547 Returns a related resultset for the supplied relationship name.
1548
1549   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1550
1551 =cut
1552
1553 sub related_resultset {
1554   my ($self, $rel) = @_;
1555
1556   $self->{related_resultsets} ||= {};
1557   return $self->{related_resultsets}{$rel} ||= do {
1558     my $rel_obj = $self->result_source->relationship_info($rel);
1559
1560     $self->throw_exception(
1561       "search_related: result source '" . $self->result_source->name .
1562         "' has no such relationship $rel")
1563       unless $rel_obj;
1564     
1565     my ($from,$seen) = $self->_resolve_from($rel);
1566
1567     my $join_count = $seen->{$rel};
1568     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1569
1570     $self->result_source->schema->resultset($rel_obj->{class})->search_rs(
1571       undef, {
1572         %{$self->{attrs}||{}},
1573         join => undef,
1574         prefetch => undef,
1575         select => undef,
1576         as => undef,
1577         alias => $alias,
1578         where => $self->{cond},
1579         seen_join => $seen,
1580         from => $from,
1581     });
1582   };
1583 }
1584
1585 sub _resolve_from {
1586   my ($self, $extra_join) = @_;
1587   my $source = $self->result_source;
1588   my $attrs = $self->{attrs};
1589   
1590   my $from = $attrs->{from}
1591     || [ { $attrs->{alias} => $source->from } ];
1592     
1593   my $seen = { %{$attrs->{seen_join}||{}} };
1594
1595   my $join = ($attrs->{join}
1596                ? [ $attrs->{join}, $extra_join ]
1597                : $extra_join);
1598   $from = [
1599     @$from,
1600     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1601   ];
1602
1603   return ($from,$seen);
1604 }
1605
1606 sub _resolved_attrs {
1607   my $self = shift;
1608   return $self->{_attrs} if $self->{_attrs};
1609
1610   my $attrs = { %{$self->{attrs}||{}} };
1611   my $source = $self->{result_source};
1612   my $alias = $attrs->{alias};
1613
1614   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1615   if ($attrs->{columns}) {
1616     delete $attrs->{as};
1617   } elsif (!$attrs->{select}) {
1618     $attrs->{columns} = [ $source->columns ];
1619   }
1620  
1621   $attrs->{select} = 
1622     ($attrs->{select}
1623       ? (ref $attrs->{select} eq 'ARRAY'
1624           ? [ @{$attrs->{select}} ]
1625           : [ $attrs->{select} ])
1626       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1627     );
1628   $attrs->{as} =
1629     ($attrs->{as}
1630       ? (ref $attrs->{as} eq 'ARRAY'
1631           ? [ @{$attrs->{as}} ]
1632           : [ $attrs->{as} ])
1633       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1634     );
1635   
1636   my $adds;
1637   if ($adds = delete $attrs->{include_columns}) {
1638     $adds = [$adds] unless ref $adds eq 'ARRAY';
1639     push(@{$attrs->{select}}, @$adds);
1640     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1641   }
1642   if ($adds = delete $attrs->{'+select'}) {
1643     $adds = [$adds] unless ref $adds eq 'ARRAY';
1644     push(@{$attrs->{select}},
1645            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1646   }
1647   if (my $adds = delete $attrs->{'+as'}) {
1648     $adds = [$adds] unless ref $adds eq 'ARRAY';
1649     push(@{$attrs->{as}}, @$adds);
1650   }
1651
1652   $attrs->{from} ||= [ { 'me' => $source->from } ];
1653
1654   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1655     my $join = delete $attrs->{join} || {};
1656
1657     if (defined $attrs->{prefetch}) {
1658       $join = $self->_merge_attr(
1659         $join, $attrs->{prefetch}
1660       );
1661     }
1662
1663     $attrs->{from} =   # have to copy here to avoid corrupting the original
1664       [
1665         @{$attrs->{from}}, 
1666         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1667       ];
1668   }
1669
1670   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1671   if ($attrs->{order_by}) {
1672     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1673                            ? [ @{$attrs->{order_by}} ]
1674                            : [ $attrs->{order_by} ]);
1675   } else {
1676     $attrs->{order_by} = [];    
1677   }
1678
1679   my $collapse = $attrs->{collapse} || {};
1680   if (my $prefetch = delete $attrs->{prefetch}) {
1681     $prefetch = $self->_merge_attr({}, $prefetch);
1682     my @pre_order;
1683     my $seen = $attrs->{seen_join} || {};
1684     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1685       # bring joins back to level of current class
1686       my @prefetch = $source->resolve_prefetch(
1687         $p, $alias, $seen, \@pre_order, $collapse
1688       );
1689       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1690       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1691     }
1692     push(@{$attrs->{order_by}}, @pre_order);
1693   }
1694   $attrs->{collapse} = $collapse;
1695
1696   return $self->{_attrs} = $attrs;
1697 }
1698
1699 sub _merge_attr {
1700   my ($self, $a, $b) = @_;
1701   return $b unless defined($a);
1702   return $a unless defined($b);
1703   
1704   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1705     foreach my $key (keys %{$b}) {
1706       if (exists $a->{$key}) {
1707         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1708       } else {
1709         $a->{$key} = $b->{$key};
1710       }
1711     }
1712     return $a;
1713   } else {
1714     $a = [$a] unless ref $a eq 'ARRAY';
1715     $b = [$b] unless ref $b eq 'ARRAY';
1716
1717     my $hash = {};
1718     my @array;
1719     foreach my $x ($a, $b) {
1720       foreach my $element (@{$x}) {
1721         if (ref $element eq 'HASH') {
1722           $hash = $self->_merge_attr($hash, $element);
1723         } elsif (ref $element eq 'ARRAY') {
1724           push(@array, @{$element});
1725         } else {
1726           push(@array, $element) unless $b == $x
1727             && grep { $_ eq $element } @array;
1728         }
1729       }
1730     }
1731     
1732     @array = grep { !exists $hash->{$_} } @array;
1733
1734     return keys %{$hash}
1735       ? ( scalar(@array)
1736             ? [$hash, @array]
1737             : $hash
1738         )
1739       : \@array;
1740   }
1741 }
1742
1743 =head2 throw_exception
1744
1745 See L<DBIx::Class::Schema/throw_exception> for details.
1746
1747 =cut
1748
1749 sub throw_exception {
1750   my $self=shift;
1751   $self->result_source->schema->throw_exception(@_);
1752 }
1753
1754 # XXX: FIXME: Attributes docs need clearing up
1755
1756 =head1 ATTRIBUTES
1757
1758 The resultset takes various attributes that modify its behavior. Here's an
1759 overview of them:
1760
1761 =head2 order_by
1762
1763 =over 4
1764
1765 =item Value: ($order_by | \@order_by)
1766
1767 =back
1768
1769 Which column(s) to order the results by. This is currently passed
1770 through directly to SQL, so you can give e.g. C<year DESC> for a
1771 descending order on the column `year'.
1772
1773 Please note that if you have quoting enabled (see
1774 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1775 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1776 so you will need to manually quote things as appropriate.)
1777
1778 =head2 columns
1779
1780 =over 4
1781
1782 =item Value: \@columns
1783
1784 =back
1785
1786 Shortcut to request a particular set of columns to be retrieved.  Adds
1787 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1788 from that, then auto-populates C<as> from C<select> as normal. (You may also
1789 use the C<cols> attribute, as in earlier versions of DBIC.)
1790
1791 =head2 include_columns
1792
1793 =over 4
1794
1795 =item Value: \@columns
1796
1797 =back
1798
1799 Shortcut to include additional columns in the returned results - for example
1800
1801   $schema->resultset('CD')->search(undef, {
1802     include_columns => ['artist.name'],
1803     join => ['artist']
1804   });
1805
1806 would return all CDs and include a 'name' column to the information
1807 passed to object inflation
1808
1809 =head2 select
1810
1811 =over 4
1812
1813 =item Value: \@select_columns
1814
1815 =back
1816
1817 Indicates which columns should be selected from the storage. You can use
1818 column names, or in the case of RDBMS back ends, function or stored procedure
1819 names:
1820
1821   $rs = $schema->resultset('Employee')->search(undef, {
1822     select => [
1823       'name',
1824       { count => 'employeeid' },
1825       { sum => 'salary' }
1826     ]
1827   });
1828
1829 When you use function/stored procedure names and do not supply an C<as>
1830 attribute, the column names returned are storage-dependent. E.g. MySQL would
1831 return a column named C<count(employeeid)> in the above example.
1832
1833 =head2 +select
1834
1835 =over 4
1836
1837 Indicates additional columns to be selected from storage.  Works the same as
1838 L<select> but adds columns to the selection.
1839
1840 =back
1841
1842 =head2 +as
1843
1844 =over 4
1845
1846 Indicates additional column names for those added via L<+select>.
1847
1848 =back
1849
1850 =head2 as
1851
1852 =over 4
1853
1854 =item Value: \@inflation_names
1855
1856 =back
1857
1858 Indicates column names for object inflation. This is used in conjunction with
1859 C<select>, usually when C<select> contains one or more function or stored
1860 procedure names:
1861
1862   $rs = $schema->resultset('Employee')->search(undef, {
1863     select => [
1864       'name',
1865       { count => 'employeeid' }
1866     ],
1867     as => ['name', 'employee_count'],
1868   });
1869
1870   my $employee = $rs->first(); # get the first Employee
1871
1872 If the object against which the search is performed already has an accessor
1873 matching a column name specified in C<as>, the value can be retrieved using
1874 the accessor as normal:
1875
1876   my $name = $employee->name();
1877
1878 If on the other hand an accessor does not exist in the object, you need to
1879 use C<get_column> instead:
1880
1881   my $employee_count = $employee->get_column('employee_count');
1882
1883 You can create your own accessors if required - see
1884 L<DBIx::Class::Manual::Cookbook> for details.
1885
1886 Please note: This will NOT insert an C<AS employee_count> into the SQL
1887 statement produced, it is used for internal access only. Thus
1888 attempting to use the accessor in an C<order_by> clause or similar
1889 will fail miserably.
1890
1891 To get around this limitation, you can supply literal SQL to your
1892 C<select> attibute that contains the C<AS alias> text, eg:
1893
1894   select => [\'myfield AS alias']
1895
1896 =head2 join
1897
1898 =over 4
1899
1900 =item Value: ($rel_name | \@rel_names | \%rel_names)
1901
1902 =back
1903
1904 Contains a list of relationships that should be joined for this query.  For
1905 example:
1906
1907   # Get CDs by Nine Inch Nails
1908   my $rs = $schema->resultset('CD')->search(
1909     { 'artist.name' => 'Nine Inch Nails' },
1910     { join => 'artist' }
1911   );
1912
1913 Can also contain a hash reference to refer to the other relation's relations.
1914 For example:
1915
1916   package MyApp::Schema::Track;
1917   use base qw/DBIx::Class/;
1918   __PACKAGE__->table('track');
1919   __PACKAGE__->add_columns(qw/trackid cd position title/);
1920   __PACKAGE__->set_primary_key('trackid');
1921   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1922   1;
1923
1924   # In your application
1925   my $rs = $schema->resultset('Artist')->search(
1926     { 'track.title' => 'Teardrop' },
1927     {
1928       join     => { cd => 'track' },
1929       order_by => 'artist.name',
1930     }
1931   );
1932
1933 You need to use the relationship (not the table) name in  conditions, 
1934 because they are aliased as such. The current table is aliased as "me", so 
1935 you need to use me.column_name in order to avoid ambiguity. For example:
1936
1937   # Get CDs from 1984 with a 'Foo' track 
1938   my $rs = $schema->resultset('CD')->search(
1939     { 
1940       'me.year' => 1984,
1941       'tracks.name' => 'Foo'
1942     },
1943     { join => 'tracks' }
1944   );
1945   
1946 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1947 similarly for a third time). For e.g.
1948
1949   my $rs = $schema->resultset('Artist')->search({
1950     'cds.title'   => 'Down to Earth',
1951     'cds_2.title' => 'Popular',
1952   }, {
1953     join => [ qw/cds cds/ ],
1954   });
1955
1956 will return a set of all artists that have both a cd with title 'Down
1957 to Earth' and a cd with title 'Popular'.
1958
1959 If you want to fetch related objects from other tables as well, see C<prefetch>
1960 below.
1961
1962 =head2 prefetch
1963
1964 =over 4
1965
1966 =item Value: ($rel_name | \@rel_names | \%rel_names)
1967
1968 =back
1969
1970 Contains one or more relationships that should be fetched along with the main
1971 query (when they are accessed afterwards they will have already been
1972 "prefetched").  This is useful for when you know you will need the related
1973 objects, because it saves at least one query:
1974
1975   my $rs = $schema->resultset('Tag')->search(
1976     undef,
1977     {
1978       prefetch => {
1979         cd => 'artist'
1980       }
1981     }
1982   );
1983
1984 The initial search results in SQL like the following:
1985
1986   SELECT tag.*, cd.*, artist.* FROM tag
1987   JOIN cd ON tag.cd = cd.cdid
1988   JOIN artist ON cd.artist = artist.artistid
1989
1990 L<DBIx::Class> has no need to go back to the database when we access the
1991 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1992 case.
1993
1994 Simple prefetches will be joined automatically, so there is no need
1995 for a C<join> attribute in the above search. If you're prefetching to
1996 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1997 specify the join as well.
1998
1999 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2000 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2001 with an accessor type of 'single' or 'filter').
2002
2003 =head2 page
2004
2005 =over 4
2006
2007 =item Value: $page
2008
2009 =back
2010
2011 Makes the resultset paged and specifies the page to retrieve. Effectively
2012 identical to creating a non-pages resultset and then calling ->page($page)
2013 on it.
2014
2015 If L<rows> attribute is not specified it defualts to 10 rows per page.
2016
2017 =head2 rows
2018
2019 =over 4
2020
2021 =item Value: $rows
2022
2023 =back
2024
2025 Specifes the maximum number of rows for direct retrieval or the number of
2026 rows per page if the page attribute or method is used.
2027
2028 =head2 offset
2029
2030 =over 4
2031
2032 =item Value: $offset
2033
2034 =back
2035
2036 Specifies the (zero-based) row number for the  first row to be returned, or the
2037 of the first row of the first page if paging is used.
2038
2039 =head2 group_by
2040
2041 =over 4
2042
2043 =item Value: \@columns
2044
2045 =back
2046
2047 A arrayref of columns to group by. Can include columns of joined tables.
2048
2049   group_by => [qw/ column1 column2 ... /]
2050
2051 =head2 having
2052
2053 =over 4
2054
2055 =item Value: $condition
2056
2057 =back
2058
2059 HAVING is a select statement attribute that is applied between GROUP BY and
2060 ORDER BY. It is applied to the after the grouping calculations have been
2061 done.
2062
2063   having => { 'count(employee)' => { '>=', 100 } }
2064
2065 =head2 distinct
2066
2067 =over 4
2068
2069 =item Value: (0 | 1)
2070
2071 =back
2072
2073 Set to 1 to group by all columns.
2074
2075 =head2 where
2076
2077 =over 4
2078
2079 Adds to the WHERE clause.
2080
2081   # only return rows WHERE deleted IS NULL for all searches
2082   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2083
2084 Can be overridden by passing C<{ where => undef }> as an attribute
2085 to a resulset.
2086
2087 =back
2088
2089 =head2 cache
2090
2091 Set to 1 to cache search results. This prevents extra SQL queries if you
2092 revisit rows in your ResultSet:
2093
2094   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2095
2096   while( my $artist = $resultset->next ) {
2097     ... do stuff ...
2098   }
2099
2100   $rs->first; # without cache, this would issue a query
2101
2102 By default, searches are not cached.
2103
2104 For more examples of using these attributes, see
2105 L<DBIx::Class::Manual::Cookbook>.
2106
2107 =head2 from
2108
2109 =over 4
2110
2111 =item Value: \@from_clause
2112
2113 =back
2114
2115 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2116 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2117 clauses.
2118
2119 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2120
2121 C<join> will usually do what you need and it is strongly recommended that you
2122 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2123 And we really do mean "cannot", not just tried and failed. Attempting to use
2124 this because you're having problems with C<join> is like trying to use x86
2125 ASM because you've got a syntax error in your C. Trust us on this.
2126
2127 Now, if you're still really, really sure you need to use this (and if you're
2128 not 100% sure, ask the mailing list first), here's an explanation of how this
2129 works.
2130
2131 The syntax is as follows -
2132
2133   [
2134     { <alias1> => <table1> },
2135     [
2136       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2137       [], # nested JOIN (optional)
2138       { <table1.column1> => <table2.column2>, ... (more conditions) },
2139     ],
2140     # More of the above [ ] may follow for additional joins
2141   ]
2142
2143   <table1> <alias1>
2144   JOIN
2145     <table2> <alias2>
2146     [JOIN ...]
2147   ON <table1.column1> = <table2.column2>
2148   <more joins may follow>
2149
2150 An easy way to follow the examples below is to remember the following:
2151
2152     Anything inside "[]" is a JOIN
2153     Anything inside "{}" is a condition for the enclosing JOIN
2154
2155 The following examples utilize a "person" table in a family tree application.
2156 In order to express parent->child relationships, this table is self-joined:
2157
2158     # Person->belongs_to('father' => 'Person');
2159     # Person->belongs_to('mother' => 'Person');
2160
2161 C<from> can be used to nest joins. Here we return all children with a father,
2162 then search against all mothers of those children:
2163
2164   $rs = $schema->resultset('Person')->search(
2165       undef,
2166       {
2167           alias => 'mother', # alias columns in accordance with "from"
2168           from => [
2169               { mother => 'person' },
2170               [
2171                   [
2172                       { child => 'person' },
2173                       [
2174                           { father => 'person' },
2175                           { 'father.person_id' => 'child.father_id' }
2176                       ]
2177                   ],
2178                   { 'mother.person_id' => 'child.mother_id' }
2179               ],
2180           ]
2181       },
2182   );
2183
2184   # Equivalent SQL:
2185   # SELECT mother.* FROM person mother
2186   # JOIN (
2187   #   person child
2188   #   JOIN person father
2189   #   ON ( father.person_id = child.father_id )
2190   # )
2191   # ON ( mother.person_id = child.mother_id )
2192
2193 The type of any join can be controlled manually. To search against only people
2194 with a father in the person table, we could explicitly use C<INNER JOIN>:
2195
2196     $rs = $schema->resultset('Person')->search(
2197         undef,
2198         {
2199             alias => 'child', # alias columns in accordance with "from"
2200             from => [
2201                 { child => 'person' },
2202                 [
2203                     { father => 'person', -join_type => 'inner' },
2204                     { 'father.id' => 'child.father_id' }
2205                 ],
2206             ]
2207         },
2208     );
2209
2210     # Equivalent SQL:
2211     # SELECT child.* FROM person child
2212     # INNER JOIN person father ON child.father_id = father.id
2213
2214 =cut
2215
2216 1;