Merge 'trunk' into 'DBIx-Class-current'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use base qw/DBIx::Class/;
14
15 __PACKAGE__->load_components(qw/AccessorGroup/);
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   #weaken $source;
89
90   if ($attrs->{page}) {
91     $attrs->{rows} ||= 10;
92     $attrs->{offset} ||= 0;
93     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
94   }
95
96   $attrs->{alias} ||= 'me';
97
98   my $self = {
99     result_source => $source,
100     result_class => $attrs->{result_class} || $source->result_class,
101     cond => $attrs->{where},
102     count => undef,
103     pager => undef,
104     attrs => $attrs
105   };
106
107   bless $self, $class;
108
109   return $self;
110 }
111
112 =head2 search
113
114 =over 4
115
116 =item Arguments: $cond, \%attrs?
117
118 =item Return Value: $resultset (scalar context), @row_objs (list context)
119
120 =back
121
122   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
123   my $new_rs = $cd_rs->search({ year => 2005 });
124
125   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
126                  # year = 2005 OR year = 2004
127
128 If you need to pass in additional attributes but no additional condition,
129 call it as C<search(undef, \%attrs)>.
130
131   # "SELECT name, artistid FROM $artist_table"
132   my @all_artists = $schema->resultset('Artist')->search(undef, {
133     columns => [qw/name artistid/],
134   });
135
136 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
137
138 =cut
139
140 sub search {
141   my $self = shift;
142   my $rs = $self->search_rs( @_ );
143   return (wantarray ? $rs->all : $rs);
144 }
145
146 =head2 search_rs
147
148 =over 4
149
150 =item Arguments: $cond, \%attrs?
151
152 =item Return Value: $resultset
153
154 =back
155
156 This method does the same exact thing as search() except it will
157 always return a resultset, even in list context.
158
159 =cut
160
161 sub search_rs {
162   my $self = shift;
163
164   my $rows;
165
166   unless (@_) {                 # no search, effectively just a clone
167     $rows = $self->get_cache;
168   }
169
170   my $attrs = {};
171   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
172   my $our_attrs = { %{$self->{attrs}} };
173   my $having = delete $our_attrs->{having};
174   my $where = delete $our_attrs->{where};
175
176   my $new_attrs = { %{$our_attrs}, %{$attrs} };
177
178   # merge new attrs into inherited
179   foreach my $key (qw/join prefetch/) {
180     next unless exists $attrs->{$key};
181     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
182   }
183
184   my $cond = (@_
185     ? (
186         (@_ == 1 || ref $_[0] eq "HASH")
187           ? (
188               (ref $_[0] eq 'HASH')
189                 ? (
190                     (keys %{ $_[0] }  > 0)
191                       ? shift
192                       : undef
193                    )
194                 :  shift
195              )
196           : (
197               (@_ % 2)
198                 ? $self->throw_exception("Odd number of arguments to search")
199                 : {@_}
200              )
201       )
202     : undef
203   );
204
205   if (defined $where) {
206     $new_attrs->{where} = (
207       defined $new_attrs->{where}
208         ? { '-and' => [
209               map {
210                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
211               } $where, $new_attrs->{where}
212             ]
213           }
214         : $where);
215   }
216
217   if (defined $cond) {
218     $new_attrs->{where} = (
219       defined $new_attrs->{where}
220         ? { '-and' => [
221               map {
222                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
223               } $cond, $new_attrs->{where}
224             ]
225           }
226         : $cond);
227   }
228
229   if (defined $having) {
230     $new_attrs->{having} = (
231       defined $new_attrs->{having}
232         ? { '-and' => [
233               map {
234                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
235               } $having, $new_attrs->{having}
236             ]
237           }
238         : $having);
239   }
240
241   my $rs = (ref $self)->new($self->result_source, $new_attrs);
242   if ($rows) {
243     $rs->set_cache($rows);
244   }
245   return $rs;
246 }
247
248 =head2 search_literal
249
250 =over 4
251
252 =item Arguments: $sql_fragment, @bind_values
253
254 =item Return Value: $resultset (scalar context), @row_objs (list context)
255
256 =back
257
258   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
259   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
260
261 Pass a literal chunk of SQL to be added to the conditional part of the
262 resultset query.
263
264 =cut
265
266 sub search_literal {
267   my ($self, $cond, @vals) = @_;
268   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
269   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
270   return $self->search(\$cond, $attrs);
271 }
272
273 =head2 find
274
275 =over 4
276
277 =item Arguments: @values | \%cols, \%attrs?
278
279 =item Return Value: $row_object
280
281 =back
282
283 Finds a row based on its primary key or unique constraint. For example, to find
284 a row by its primary key:
285
286   my $cd = $schema->resultset('CD')->find(5);
287
288 You can also find a row by a specific unique constraint using the C<key>
289 attribute. For example:
290
291   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
292     key => 'cd_artist_title'
293   });
294
295 Additionally, you can specify the columns explicitly by name:
296
297   my $cd = $schema->resultset('CD')->find(
298     {
299       artist => 'Massive Attack',
300       title  => 'Mezzanine',
301     },
302     { key => 'cd_artist_title' }
303   );
304
305 If the C<key> is specified as C<primary>, it searches only on the primary key.
306
307 If no C<key> is specified, it searches on all unique constraints defined on the
308 source, including the primary key.
309
310 If your table does not have a primary key, you B<must> provide a value for the
311 C<key> attribute matching one of the unique constraints on the source.
312
313 See also L</find_or_create> and L</update_or_create>. For information on how to
314 declare unique constraints, see
315 L<DBIx::Class::ResultSource/add_unique_constraint>.
316
317 =cut
318
319 sub find {
320   my $self = shift;
321   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
322
323   # Default to the primary key, but allow a specific key
324   my @cols = exists $attrs->{key}
325     ? $self->result_source->unique_constraint_columns($attrs->{key})
326     : $self->result_source->primary_columns;
327   $self->throw_exception(
328     "Can't find unless a primary key is defined or unique constraint is specified"
329   ) unless @cols;
330
331   # Parse out a hashref from input
332   my $input_query;
333   if (ref $_[0] eq 'HASH') {
334     $input_query = { %{$_[0]} };
335   }
336   elsif (@_ == @cols) {
337     $input_query = {};
338     @{$input_query}{@cols} = @_;
339   }
340   else {
341     # Compatibility: Allow e.g. find(id => $value)
342     carp "Find by key => value deprecated; please use a hashref instead";
343     $input_query = {@_};
344   }
345
346   my @unique_queries = $self->_unique_queries($input_query, $attrs);
347
348   # Build the final query: Default to the disjunction of the unique queries,
349   # but allow the input query in case the ResultSet defines the query or the
350   # user is abusing find
351   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
352   my $query = @unique_queries
353     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
354     : $self->_add_alias($input_query, $alias);
355
356   # Run the query
357   if (keys %$attrs) {
358     my $rs = $self->search($query, $attrs);
359     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
360   }
361   else {
362     return keys %{$self->_resolved_attrs->{collapse}}
363       ? $self->search($query)->next
364       : $self->single($query);
365   }
366 }
367
368 # _add_alias
369 #
370 # Add the specified alias to the specified query hash. A copy is made so the
371 # original query is not modified.
372
373 sub _add_alias {
374   my ($self, $query, $alias) = @_;
375
376   my %aliased = %$query;
377   foreach my $col (grep { ! m/\./ } keys %aliased) {
378     $aliased{"$alias.$col"} = delete $aliased{$col};
379   }
380
381   return \%aliased;
382 }
383
384 # _unique_queries
385 #
386 # Build a list of queries which satisfy unique constraints.
387
388 sub _unique_queries {
389   my ($self, $query, $attrs) = @_;
390
391   my @constraint_names = exists $attrs->{key}
392     ? ($attrs->{key})
393     : $self->result_source->unique_constraint_names;
394
395   my @unique_queries;
396   foreach my $name (@constraint_names) {
397     my @unique_cols = $self->result_source->unique_constraint_columns($name);
398     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
399
400     my $num_query = scalar keys %$unique_query;
401     next unless $num_query;
402
403     # XXX: Assuming quite a bit about $self->{attrs}{where}
404     my $num_cols = scalar @unique_cols;
405     my $num_where = exists $self->{attrs}{where}
406       ? scalar keys %{ $self->{attrs}{where} }
407       : 0;
408     push @unique_queries, $unique_query
409       if $num_query + $num_where == $num_cols;
410   }
411
412   return @unique_queries;
413 }
414
415 # _build_unique_query
416 #
417 # Constrain the specified query hash based on the specified column names.
418
419 sub _build_unique_query {
420   my ($self, $query, $unique_cols) = @_;
421
422   return {
423     map  { $_ => $query->{$_} }
424     grep { exists $query->{$_} }
425       @$unique_cols
426   };
427 }
428
429 =head2 search_related
430
431 =over 4
432
433 =item Arguments: $rel, $cond, \%attrs?
434
435 =item Return Value: $new_resultset
436
437 =back
438
439   $new_rs = $cd_rs->search_related('artist', {
440     name => 'Emo-R-Us',
441   });
442
443 Searches the specified relationship, optionally specifying a condition and
444 attributes for matching records. See L</ATTRIBUTES> for more information.
445
446 =cut
447
448 sub search_related {
449   return shift->related_resultset(shift)->search(@_);
450 }
451
452 =head2 cursor
453
454 =over 4
455
456 =item Arguments: none
457
458 =item Return Value: $cursor
459
460 =back
461
462 Returns a storage-driven cursor to the given resultset. See
463 L<DBIx::Class::Cursor> for more information.
464
465 =cut
466
467 sub cursor {
468   my ($self) = @_;
469
470   my $attrs = { %{$self->_resolved_attrs} };
471   return $self->{cursor}
472     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
473           $attrs->{where},$attrs);
474 }
475
476 =head2 single
477
478 =over 4
479
480 =item Arguments: $cond?
481
482 =item Return Value: $row_object?
483
484 =back
485
486   my $cd = $schema->resultset('CD')->single({ year => 2001 });
487
488 Inflates the first result without creating a cursor if the resultset has
489 any records in it; if not returns nothing. Used by L</find> as an optimisation.
490
491 Can optionally take an additional condition *only* - this is a fast-code-path
492 method; if you need to add extra joins or similar call ->search and then
493 ->single without a condition on the $rs returned from that.
494
495 =cut
496
497 sub single {
498   my ($self, $where) = @_;
499   my $attrs = { %{$self->_resolved_attrs} };
500   if ($where) {
501     if (defined $attrs->{where}) {
502       $attrs->{where} = {
503         '-and' =>
504             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
505                $where, delete $attrs->{where} ]
506       };
507     } else {
508       $attrs->{where} = $where;
509     }
510   }
511
512 #  XXX: Disabled since it doesn't infer uniqueness in all cases
513 #  unless ($self->_is_unique_query($attrs->{where})) {
514 #    carp "Query not guaranteed to return a single row"
515 #      . "; please declare your unique constraints or use search instead";
516 #  }
517
518   my @data = $self->result_source->storage->select_single(
519     $attrs->{from}, $attrs->{select},
520     $attrs->{where}, $attrs
521   );
522
523   return (@data ? $self->_construct_object(@data) : ());
524 }
525
526 # _is_unique_query
527 #
528 # Try to determine if the specified query is guaranteed to be unique, based on
529 # the declared unique constraints.
530
531 sub _is_unique_query {
532   my ($self, $query) = @_;
533
534   my $collapsed = $self->_collapse_query($query);
535   my $alias = $self->{attrs}{alias};
536
537   foreach my $name ($self->result_source->unique_constraint_names) {
538     my @unique_cols = map {
539       "$alias.$_"
540     } $self->result_source->unique_constraint_columns($name);
541
542     # Count the values for each unique column
543     my %seen = map { $_ => 0 } @unique_cols;
544
545     foreach my $key (keys %$collapsed) {
546       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
547       next unless exists $seen{$aliased};  # Additional constraints are okay
548       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
549     }
550
551     # If we get 0 or more than 1 value for a column, it's not necessarily unique
552     return 1 unless grep { $_ != 1 } values %seen;
553   }
554
555   return 0;
556 }
557
558 # _collapse_query
559 #
560 # Recursively collapse the query, accumulating values for each column.
561
562 sub _collapse_query {
563   my ($self, $query, $collapsed) = @_;
564
565   $collapsed ||= {};
566
567   if (ref $query eq 'ARRAY') {
568     foreach my $subquery (@$query) {
569       next unless ref $subquery;  # -or
570 #      warn "ARRAY: " . Dumper $subquery;
571       $collapsed = $self->_collapse_query($subquery, $collapsed);
572     }
573   }
574   elsif (ref $query eq 'HASH') {
575     if (keys %$query and (keys %$query)[0] eq '-and') {
576       foreach my $subquery (@{$query->{-and}}) {
577 #        warn "HASH: " . Dumper $subquery;
578         $collapsed = $self->_collapse_query($subquery, $collapsed);
579       }
580     }
581     else {
582 #      warn "LEAF: " . Dumper $query;
583       foreach my $col (keys %$query) {
584         my $value = $query->{$col};
585         $collapsed->{$col}{$value}++;
586       }
587     }
588   }
589
590   return $collapsed;
591 }
592
593 =head2 get_column
594
595 =over 4
596
597 =item Arguments: $cond?
598
599 =item Return Value: $resultsetcolumn
600
601 =back
602
603   my $max_length = $rs->get_column('length')->max;
604
605 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
606
607 =cut
608
609 sub get_column {
610   my ($self, $column) = @_;
611   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
612   return $new;
613 }
614
615 =head2 search_like
616
617 =over 4
618
619 =item Arguments: $cond, \%attrs?
620
621 =item Return Value: $resultset (scalar context), @row_objs (list context)
622
623 =back
624
625   # WHERE title LIKE '%blue%'
626   $cd_rs = $rs->search_like({ title => '%blue%'});
627
628 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
629 that this is simply a convenience method. You most likely want to use
630 L</search> with specific operators.
631
632 For more information, see L<DBIx::Class::Manual::Cookbook>.
633
634 =cut
635
636 sub search_like {
637   my $class = shift;
638   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
639   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
640   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
641   return $class->search($query, { %$attrs });
642 }
643
644 =head2 slice
645
646 =over 4
647
648 =item Arguments: $first, $last
649
650 =item Return Value: $resultset (scalar context), @row_objs (list context)
651
652 =back
653
654 Returns a resultset or object list representing a subset of elements from the
655 resultset slice is called on. Indexes are from 0, i.e., to get the first
656 three records, call:
657
658   my ($one, $two, $three) = $rs->slice(0, 2);
659
660 =cut
661
662 sub slice {
663   my ($self, $min, $max) = @_;
664   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
665   $attrs->{offset} = $self->{attrs}{offset} || 0;
666   $attrs->{offset} += $min;
667   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
668   return $self->search(undef(), $attrs);
669   #my $slice = (ref $self)->new($self->result_source, $attrs);
670   #return (wantarray ? $slice->all : $slice);
671 }
672
673 =head2 next
674
675 =over 4
676
677 =item Arguments: none
678
679 =item Return Value: $result?
680
681 =back
682
683 Returns the next element in the resultset (C<undef> is there is none).
684
685 Can be used to efficiently iterate over records in the resultset:
686
687   my $rs = $schema->resultset('CD')->search;
688   while (my $cd = $rs->next) {
689     print $cd->title;
690   }
691
692 Note that you need to store the resultset object, and call C<next> on it.
693 Calling C<< resultset('Table')->next >> repeatedly will always return the
694 first record from the resultset.
695
696 =cut
697
698 sub next {
699   my ($self) = @_;
700   if (my $cache = $self->get_cache) {
701     $self->{all_cache_position} ||= 0;
702     return $cache->[$self->{all_cache_position}++];
703   }
704   if ($self->{attrs}{cache}) {
705     $self->{all_cache_position} = 1;
706     return ($self->all)[0];
707   }
708   my @row = (
709     exists $self->{stashed_row}
710       ? @{delete $self->{stashed_row}}
711       : $self->cursor->next
712   );
713   return unless (@row);
714   return $self->_construct_object(@row);
715 }
716
717 sub _construct_object {
718   my ($self, @row) = @_;
719   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
720   my $new = $self->result_class->inflate_result($self->result_source, @$info);
721   $new = $self->{_attrs}{record_filter}->($new)
722     if exists $self->{_attrs}{record_filter};
723   return $new;
724 }
725
726 sub _collapse_result {
727   my ($self, $as, $row, $prefix) = @_;
728
729   my %const;
730   my @copy = @$row;
731   
732   foreach my $this_as (@$as) {
733     my $val = shift @copy;
734     if (defined $prefix) {
735       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
736         my $remain = $1;
737         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
738         $const{$1||''}{$2} = $val;
739       }
740     } else {
741       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
742       $const{$1||''}{$2} = $val;
743     }
744   }
745
746   my $alias = $self->{attrs}{alias};
747   my $info = [ {}, {} ];
748   foreach my $key (keys %const) {
749     if (length $key && $key ne $alias) {
750       my $target = $info;
751       my @parts = split(/\./, $key);
752       foreach my $p (@parts) {
753         $target = $target->[1]->{$p} ||= [];
754       }
755       $target->[0] = $const{$key};
756     } else {
757       $info->[0] = $const{$key};
758     }
759   }
760   
761   my @collapse;
762   if (defined $prefix) {
763     @collapse = map {
764         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
765     } keys %{$self->{_attrs}{collapse}}
766   } else {
767     @collapse = keys %{$self->{_attrs}{collapse}};
768   };
769
770   if (@collapse) {
771     my ($c) = sort { length $a <=> length $b } @collapse;
772     my $target = $info;
773     foreach my $p (split(/\./, $c)) {
774       $target = $target->[1]->{$p} ||= [];
775     }
776     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
777     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
778     my $tree = $self->_collapse_result($as, $row, $c_prefix);
779     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
780     my (@final, @raw);
781
782     while (
783       !(
784         grep {
785           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
786         } @co_key
787         )
788     ) {
789       push(@final, $tree);
790       last unless (@raw = $self->cursor->next);
791       $row = $self->{stashed_row} = \@raw;
792       $tree = $self->_collapse_result($as, $row, $c_prefix);
793     }
794     @$target = (@final ? @final : [ {}, {} ]);
795       # single empty result to indicate an empty prefetched has_many
796   }
797
798   #print "final info: " . Dumper($info);
799   return $info;
800 }
801
802 =head2 result_source
803
804 =over 4
805
806 =item Arguments: $result_source?
807
808 =item Return Value: $result_source
809
810 =back
811
812 An accessor for the primary ResultSource object from which this ResultSet
813 is derived.
814
815 =head2 result_class
816
817 =over 4
818
819 =item Arguments: $result_class?
820
821 =item Return Value: $result_class
822
823 =back
824
825 An accessor for the class to use when creating row objects. Defaults to 
826 C<< result_source->result_class >> - which in most cases is the name of the 
827 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
828
829 =cut
830
831
832 =head2 count
833
834 =over 4
835
836 =item Arguments: $cond, \%attrs??
837
838 =item Return Value: $count
839
840 =back
841
842 Performs an SQL C<COUNT> with the same query as the resultset was built
843 with to find the number of elements. If passed arguments, does a search
844 on the resultset and counts the results of that.
845
846 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
847 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
848 not support C<DISTINCT> with multiple columns. If you are using such a
849 database, you should only use columns from the main table in your C<group_by>
850 clause.
851
852 =cut
853
854 sub count {
855   my $self = shift;
856   return $self->search(@_)->count if @_ and defined $_[0];
857   return scalar @{ $self->get_cache } if $self->get_cache;
858   my $count = $self->_count;
859   return 0 unless $count;
860
861   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
862   $count = $self->{attrs}{rows} if
863     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
864   return $count;
865 }
866
867 sub _count { # Separated out so pager can get the full count
868   my $self = shift;
869   my $select = { count => '*' };
870
871   my $attrs = { %{$self->_resolved_attrs} };
872   if (my $group_by = delete $attrs->{group_by}) {
873     delete $attrs->{having};
874     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
875     # todo: try CONCAT for multi-column pk
876     my @pk = $self->result_source->primary_columns;
877     if (@pk == 1) {
878       my $alias = $attrs->{alias};
879       foreach my $column (@distinct) {
880         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
881           @distinct = ($column);
882           last;
883         }
884       }
885     }
886
887     $select = { count => { distinct => \@distinct } };
888   }
889
890   $attrs->{select} = $select;
891   $attrs->{as} = [qw/count/];
892
893   # offset, order by and page are not needed to count. record_filter is cdbi
894   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
895
896   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
897   my ($count) = $tmp_rs->cursor->next;
898   return $count;
899 }
900
901 =head2 count_literal
902
903 =over 4
904
905 =item Arguments: $sql_fragment, @bind_values
906
907 =item Return Value: $count
908
909 =back
910
911 Counts the results in a literal query. Equivalent to calling L</search_literal>
912 with the passed arguments, then L</count>.
913
914 =cut
915
916 sub count_literal { shift->search_literal(@_)->count; }
917
918 =head2 all
919
920 =over 4
921
922 =item Arguments: none
923
924 =item Return Value: @objects
925
926 =back
927
928 Returns all elements in the resultset. Called implicitly if the resultset
929 is returned in list context.
930
931 =cut
932
933 sub all {
934   my ($self) = @_;
935   return @{ $self->get_cache } if $self->get_cache;
936
937   my @obj;
938
939   # TODO: don't call resolve here
940   if (keys %{$self->_resolved_attrs->{collapse}}) {
941 #  if ($self->{attrs}{prefetch}) {
942       # Using $self->cursor->all is really just an optimisation.
943       # If we're collapsing has_many prefetches it probably makes
944       # very little difference, and this is cleaner than hacking
945       # _construct_object to survive the approach
946     my @row = $self->cursor->next;
947     while (@row) {
948       push(@obj, $self->_construct_object(@row));
949       @row = (exists $self->{stashed_row}
950                ? @{delete $self->{stashed_row}}
951                : $self->cursor->next);
952     }
953   } else {
954     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
955   }
956
957   $self->set_cache(\@obj) if $self->{attrs}{cache};
958   return @obj;
959 }
960
961 =head2 reset
962
963 =over 4
964
965 =item Arguments: none
966
967 =item Return Value: $self
968
969 =back
970
971 Resets the resultset's cursor, so you can iterate through the elements again.
972
973 =cut
974
975 sub reset {
976   my ($self) = @_;
977   delete $self->{_attrs} if exists $self->{_attrs};
978   $self->{all_cache_position} = 0;
979   $self->cursor->reset;
980   return $self;
981 }
982
983 =head2 first
984
985 =over 4
986
987 =item Arguments: none
988
989 =item Return Value: $object?
990
991 =back
992
993 Resets the resultset and returns an object for the first result (if the
994 resultset returns anything).
995
996 =cut
997
998 sub first {
999   return $_[0]->reset->next;
1000 }
1001
1002 # _cond_for_update_delete
1003 #
1004 # update/delete require the condition to be modified to handle
1005 # the differing SQL syntax available.  This transforms the $self->{cond}
1006 # appropriately, returning the new condition.
1007
1008 sub _cond_for_update_delete {
1009   my ($self, $full_cond) = @_;
1010   my $cond = {};
1011
1012   $full_cond ||= $self->{cond};
1013   # No-op. No condition, we're updating/deleting everything
1014   return $cond unless ref $full_cond;
1015
1016   if (ref $full_cond eq 'ARRAY') {
1017     $cond = [
1018       map {
1019         my %hash;
1020         foreach my $key (keys %{$_}) {
1021           $key =~ /([^.]+)$/;
1022           $hash{$1} = $_->{$key};
1023         }
1024         \%hash;
1025       } @{$full_cond}
1026     ];
1027   }
1028   elsif (ref $full_cond eq 'HASH') {
1029     if ((keys %{$full_cond})[0] eq '-and') {
1030       $cond->{-and} = [];
1031
1032       my @cond = @{$full_cond->{-and}};
1033       for (my $i = 0; $i < @cond; $i++) {
1034         my $entry = $cond[$i];
1035
1036         my $hash;
1037         if (ref $entry eq 'HASH') {
1038           $hash = $self->_cond_for_update_delete($entry);
1039         }
1040         else {
1041           $entry =~ /([^.]+)$/;
1042           $hash->{$1} = $cond[++$i];
1043         }
1044
1045         push @{$cond->{-and}}, $hash;
1046       }
1047     }
1048     else {
1049       foreach my $key (keys %{$full_cond}) {
1050         $key =~ /([^.]+)$/;
1051         $cond->{$1} = $full_cond->{$key};
1052       }
1053     }
1054   }
1055   else {
1056     $self->throw_exception(
1057       "Can't update/delete on resultset with condition unless hash or array"
1058     );
1059   }
1060
1061   return $cond;
1062 }
1063
1064
1065 =head2 update
1066
1067 =over 4
1068
1069 =item Arguments: \%values
1070
1071 =item Return Value: $storage_rv
1072
1073 =back
1074
1075 Sets the specified columns in the resultset to the supplied values in a
1076 single query. Return value will be true if the update succeeded or false
1077 if no records were updated; exact type of success value is storage-dependent.
1078
1079 =cut
1080
1081 sub update {
1082   my ($self, $values) = @_;
1083   $self->throw_exception("Values for update must be a hash")
1084     unless ref $values eq 'HASH';
1085
1086   my $cond = $self->_cond_for_update_delete;
1087
1088   return $self->result_source->storage->update(
1089     $self->result_source->from, $values, $cond
1090   );
1091 }
1092
1093 =head2 update_all
1094
1095 =over 4
1096
1097 =item Arguments: \%values
1098
1099 =item Return Value: 1
1100
1101 =back
1102
1103 Fetches all objects and updates them one at a time. Note that C<update_all>
1104 will run DBIC cascade triggers, while L</update> will not.
1105
1106 =cut
1107
1108 sub update_all {
1109   my ($self, $values) = @_;
1110   $self->throw_exception("Values for update must be a hash")
1111     unless ref $values eq 'HASH';
1112   foreach my $obj ($self->all) {
1113     $obj->set_columns($values)->update;
1114   }
1115   return 1;
1116 }
1117
1118 =head2 delete
1119
1120 =over 4
1121
1122 =item Arguments: none
1123
1124 =item Return Value: 1
1125
1126 =back
1127
1128 Deletes the contents of the resultset from its result source. Note that this
1129 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1130 to run. See also L<DBIx::Class::Row/delete>.
1131
1132 =cut
1133
1134 sub delete {
1135   my ($self) = @_;
1136
1137   my $cond = $self->_cond_for_update_delete;
1138
1139   $self->result_source->storage->delete($self->result_source->from, $cond);
1140   return 1;
1141 }
1142
1143 =head2 delete_all
1144
1145 =over 4
1146
1147 =item Arguments: none
1148
1149 =item Return Value: 1
1150
1151 =back
1152
1153 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1154 will run DBIC cascade triggers, while L</delete> will not.
1155
1156 =cut
1157
1158 sub delete_all {
1159   my ($self) = @_;
1160   $_->delete for $self->all;
1161   return 1;
1162 }
1163
1164 =head2 pager
1165
1166 =over 4
1167
1168 =item Arguments: none
1169
1170 =item Return Value: $pager
1171
1172 =back
1173
1174 Return Value a L<Data::Page> object for the current resultset. Only makes
1175 sense for queries with a C<page> attribute.
1176
1177 =cut
1178
1179 sub pager {
1180   my ($self) = @_;
1181   my $attrs = $self->{attrs};
1182   $self->throw_exception("Can't create pager for non-paged rs")
1183     unless $self->{attrs}{page};
1184   $attrs->{rows} ||= 10;
1185   return $self->{pager} ||= Data::Page->new(
1186     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1187 }
1188
1189 =head2 page
1190
1191 =over 4
1192
1193 =item Arguments: $page_number
1194
1195 =item Return Value: $rs
1196
1197 =back
1198
1199 Returns a resultset for the $page_number page of the resultset on which page
1200 is called, where each page contains a number of rows equal to the 'rows'
1201 attribute set on the resultset (10 by default).
1202
1203 =cut
1204
1205 sub page {
1206   my ($self, $page) = @_;
1207   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1208 }
1209
1210 =head2 new_result
1211
1212 =over 4
1213
1214 =item Arguments: \%vals
1215
1216 =item Return Value: $object
1217
1218 =back
1219
1220 Creates an object in the resultset's result class and returns it.
1221
1222 =cut
1223
1224 sub new_result {
1225   my ($self, $values) = @_;
1226   $self->throw_exception( "new_result needs a hash" )
1227     unless (ref $values eq 'HASH');
1228   $self->throw_exception(
1229     "Can't abstract implicit construct, condition not a hash"
1230   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1231
1232   my $alias = $self->{attrs}{alias};
1233   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1234   my %new = (
1235     %{ $self->_remove_alias($values, $alias) },
1236     %{ $self->_remove_alias($collapsed_cond, $alias) },
1237   );
1238
1239   my $obj = $self->result_class->new(\%new);
1240   $obj->result_source($self->result_source) if $obj->can('result_source');
1241   return $obj;
1242 }
1243
1244 # _collapse_cond
1245 #
1246 # Recursively collapse the condition.
1247
1248 sub _collapse_cond {
1249   my ($self, $cond, $collapsed) = @_;
1250
1251   $collapsed ||= {};
1252
1253   if (ref $cond eq 'ARRAY') {
1254     foreach my $subcond (@$cond) {
1255       next unless ref $subcond;  # -or
1256 #      warn "ARRAY: " . Dumper $subcond;
1257       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1258     }
1259   }
1260   elsif (ref $cond eq 'HASH') {
1261     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1262       foreach my $subcond (@{$cond->{-and}}) {
1263 #        warn "HASH: " . Dumper $subcond;
1264         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1265       }
1266     }
1267     else {
1268 #      warn "LEAF: " . Dumper $cond;
1269       foreach my $col (keys %$cond) {
1270         my $value = $cond->{$col};
1271         $collapsed->{$col} = $value;
1272       }
1273     }
1274   }
1275
1276   return $collapsed;
1277 }
1278
1279 # _remove_alias
1280 #
1281 # Remove the specified alias from the specified query hash. A copy is made so
1282 # the original query is not modified.
1283
1284 sub _remove_alias {
1285   my ($self, $query, $alias) = @_;
1286
1287   my %unaliased = %{ $query || {} };
1288   foreach my $key (keys %unaliased) {
1289     $unaliased{$1} = delete $unaliased{$key}
1290       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1291   }
1292
1293   return \%unaliased;
1294 }
1295
1296 =head2 find_or_new
1297
1298 =over 4
1299
1300 =item Arguments: \%vals, \%attrs?
1301
1302 =item Return Value: $object
1303
1304 =back
1305
1306 Find an existing record from this resultset. If none exists, instantiate a new
1307 result object and return it. The object will not be saved into your storage
1308 until you call L<DBIx::Class::Row/insert> on it.
1309
1310 If you want objects to be saved immediately, use L</find_or_create> instead.
1311
1312 =cut
1313
1314 sub find_or_new {
1315   my $self     = shift;
1316   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1317   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1318   my $exists   = $self->find($hash, $attrs);
1319   return defined $exists ? $exists : $self->new_result($hash);
1320 }
1321
1322 =head2 create
1323
1324 =over 4
1325
1326 =item Arguments: \%vals
1327
1328 =item Return Value: $object
1329
1330 =back
1331
1332 Inserts a record into the resultset and returns the object representing it.
1333
1334 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1335
1336 =cut
1337
1338 sub create {
1339   my ($self, $attrs) = @_;
1340   $self->throw_exception( "create needs a hashref" )
1341     unless ref $attrs eq 'HASH';
1342   return $self->new_result($attrs)->insert;
1343 }
1344
1345 =head2 find_or_create
1346
1347 =over 4
1348
1349 =item Arguments: \%vals, \%attrs?
1350
1351 =item Return Value: $object
1352
1353 =back
1354
1355   $class->find_or_create({ key => $val, ... });
1356
1357 Tries to find a record based on its primary key or unique constraint; if none
1358 is found, creates one and returns that instead.
1359
1360   my $cd = $schema->resultset('CD')->find_or_create({
1361     cdid   => 5,
1362     artist => 'Massive Attack',
1363     title  => 'Mezzanine',
1364     year   => 2005,
1365   });
1366
1367 Also takes an optional C<key> attribute, to search by a specific key or unique
1368 constraint. For example:
1369
1370   my $cd = $schema->resultset('CD')->find_or_create(
1371     {
1372       artist => 'Massive Attack',
1373       title  => 'Mezzanine',
1374     },
1375     { key => 'cd_artist_title' }
1376   );
1377
1378 See also L</find> and L</update_or_create>. For information on how to declare
1379 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1380
1381 =cut
1382
1383 sub find_or_create {
1384   my $self     = shift;
1385   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1386   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1387   my $exists   = $self->find($hash, $attrs);
1388   return defined $exists ? $exists : $self->create($hash);
1389 }
1390
1391 =head2 update_or_create
1392
1393 =over 4
1394
1395 =item Arguments: \%col_values, { key => $unique_constraint }?
1396
1397 =item Return Value: $object
1398
1399 =back
1400
1401   $class->update_or_create({ col => $val, ... });
1402
1403 First, searches for an existing row matching one of the unique constraints
1404 (including the primary key) on the source of this resultset. If a row is
1405 found, updates it with the other given column values. Otherwise, creates a new
1406 row.
1407
1408 Takes an optional C<key> attribute to search on a specific unique constraint.
1409 For example:
1410
1411   # In your application
1412   my $cd = $schema->resultset('CD')->update_or_create(
1413     {
1414       artist => 'Massive Attack',
1415       title  => 'Mezzanine',
1416       year   => 1998,
1417     },
1418     { key => 'cd_artist_title' }
1419   );
1420
1421 If no C<key> is specified, it searches on all unique constraints defined on the
1422 source, including the primary key.
1423
1424 If the C<key> is specified as C<primary>, it searches only on the primary key.
1425
1426 See also L</find> and L</find_or_create>. For information on how to declare
1427 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1428
1429 =cut
1430
1431 sub update_or_create {
1432   my $self = shift;
1433   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1434   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1435
1436   my $row = $self->find($cond, $attrs);
1437   if (defined $row) {
1438     $row->update($cond);
1439     return $row;
1440   }
1441
1442   return $self->create($cond);
1443 }
1444
1445 =head2 get_cache
1446
1447 =over 4
1448
1449 =item Arguments: none
1450
1451 =item Return Value: \@cache_objects?
1452
1453 =back
1454
1455 Gets the contents of the cache for the resultset, if the cache is set.
1456
1457 =cut
1458
1459 sub get_cache {
1460   shift->{all_cache};
1461 }
1462
1463 =head2 set_cache
1464
1465 =over 4
1466
1467 =item Arguments: \@cache_objects
1468
1469 =item Return Value: \@cache_objects
1470
1471 =back
1472
1473 Sets the contents of the cache for the resultset. Expects an arrayref
1474 of objects of the same class as those produced by the resultset. Note that
1475 if the cache is set the resultset will return the cached objects rather
1476 than re-querying the database even if the cache attr is not set.
1477
1478 =cut
1479
1480 sub set_cache {
1481   my ( $self, $data ) = @_;
1482   $self->throw_exception("set_cache requires an arrayref")
1483       if defined($data) && (ref $data ne 'ARRAY');
1484   $self->{all_cache} = $data;
1485 }
1486
1487 =head2 clear_cache
1488
1489 =over 4
1490
1491 =item Arguments: none
1492
1493 =item Return Value: []
1494
1495 =back
1496
1497 Clears the cache for the resultset.
1498
1499 =cut
1500
1501 sub clear_cache {
1502   shift->set_cache(undef);
1503 }
1504
1505 =head2 related_resultset
1506
1507 =over 4
1508
1509 =item Arguments: $relationship_name
1510
1511 =item Return Value: $resultset
1512
1513 =back
1514
1515 Returns a related resultset for the supplied relationship name.
1516
1517   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1518
1519 =cut
1520
1521 sub related_resultset {
1522   my ($self, $rel) = @_;
1523
1524   $self->{related_resultsets} ||= {};
1525   return $self->{related_resultsets}{$rel} ||= do {
1526     my $rel_obj = $self->result_source->relationship_info($rel);
1527
1528     $self->throw_exception(
1529       "search_related: result source '" . $self->result_source->name .
1530         "' has no such relationship $rel")
1531       unless $rel_obj;
1532     
1533     my ($from,$seen) = $self->_resolve_from($rel);
1534
1535     my $join_count = $seen->{$rel};
1536     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1537
1538     $self->result_source->schema->resultset($rel_obj->{class})->search_rs(
1539       undef, {
1540         %{$self->{attrs}||{}},
1541         join => undef,
1542         prefetch => undef,
1543         select => undef,
1544         as => undef,
1545         alias => $alias,
1546         where => $self->{cond},
1547         seen_join => $seen,
1548         from => $from,
1549     });
1550   };
1551 }
1552
1553 sub _resolve_from {
1554   my ($self, $extra_join) = @_;
1555   my $source = $self->result_source;
1556   my $attrs = $self->{attrs};
1557   
1558   my $from = $attrs->{from}
1559     || [ { $attrs->{alias} => $source->from } ];
1560     
1561   my $seen = { %{$attrs->{seen_join}||{}} };
1562
1563   my $join = ($attrs->{join}
1564                ? [ $attrs->{join}, $extra_join ]
1565                : $extra_join);
1566   $from = [
1567     @$from,
1568     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1569   ];
1570
1571   return ($from,$seen);
1572 }
1573
1574 sub _resolved_attrs {
1575   my $self = shift;
1576   return $self->{_attrs} if $self->{_attrs};
1577
1578   my $attrs = { %{$self->{attrs}||{}} };
1579   my $source = $self->{result_source};
1580   my $alias = $attrs->{alias};
1581
1582   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1583   if ($attrs->{columns}) {
1584     delete $attrs->{as};
1585   } elsif (!$attrs->{select}) {
1586     $attrs->{columns} = [ $source->columns ];
1587   }
1588  
1589   $attrs->{select} = 
1590     ($attrs->{select}
1591       ? (ref $attrs->{select} eq 'ARRAY'
1592           ? [ @{$attrs->{select}} ]
1593           : [ $attrs->{select} ])
1594       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1595     );
1596   $attrs->{as} =
1597     ($attrs->{as}
1598       ? (ref $attrs->{as} eq 'ARRAY'
1599           ? [ @{$attrs->{as}} ]
1600           : [ $attrs->{as} ])
1601       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1602     );
1603   
1604   my $adds;
1605   if ($adds = delete $attrs->{include_columns}) {
1606     $adds = [$adds] unless ref $adds eq 'ARRAY';
1607     push(@{$attrs->{select}}, @$adds);
1608     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1609   }
1610   if ($adds = delete $attrs->{'+select'}) {
1611     $adds = [$adds] unless ref $adds eq 'ARRAY';
1612     push(@{$attrs->{select}},
1613            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1614   }
1615   if (my $adds = delete $attrs->{'+as'}) {
1616     $adds = [$adds] unless ref $adds eq 'ARRAY';
1617     push(@{$attrs->{as}}, @$adds);
1618   }
1619
1620   $attrs->{from} ||= [ { 'me' => $source->from } ];
1621
1622   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1623     my $join = delete $attrs->{join} || {};
1624
1625     if (defined $attrs->{prefetch}) {
1626       $join = $self->_merge_attr(
1627         $join, $attrs->{prefetch}
1628       );
1629     }
1630
1631     $attrs->{from} =   # have to copy here to avoid corrupting the original
1632       [
1633         @{$attrs->{from}}, 
1634         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1635       ];
1636   }
1637
1638   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1639   if ($attrs->{order_by}) {
1640     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1641                            ? [ @{$attrs->{order_by}} ]
1642                            : [ $attrs->{order_by} ]);
1643   } else {
1644     $attrs->{order_by} = [];    
1645   }
1646
1647   my $collapse = $attrs->{collapse} || {};
1648   if (my $prefetch = delete $attrs->{prefetch}) {
1649     $prefetch = $self->_merge_attr({}, $prefetch);
1650     my @pre_order;
1651     my $seen = $attrs->{seen_join} || {};
1652     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1653       # bring joins back to level of current class
1654       my @prefetch = $source->resolve_prefetch(
1655         $p, $alias, $seen, \@pre_order, $collapse
1656       );
1657       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1658       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1659     }
1660     push(@{$attrs->{order_by}}, @pre_order);
1661   }
1662   $attrs->{collapse} = $collapse;
1663
1664   return $self->{_attrs} = $attrs;
1665 }
1666
1667 sub _merge_attr {
1668   my ($self, $a, $b) = @_;
1669   return $b unless defined($a);
1670   return $a unless defined($b);
1671   
1672   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1673     foreach my $key (keys %{$b}) {
1674       if (exists $a->{$key}) {
1675         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1676       } else {
1677         $a->{$key} = $b->{$key};
1678       }
1679     }
1680     return $a;
1681   } else {
1682     $a = [$a] unless ref $a eq 'ARRAY';
1683     $b = [$b] unless ref $b eq 'ARRAY';
1684
1685     my $hash = {};
1686     my @array;
1687     foreach my $x ($a, $b) {
1688       foreach my $element (@{$x}) {
1689         if (ref $element eq 'HASH') {
1690           $hash = $self->_merge_attr($hash, $element);
1691         } elsif (ref $element eq 'ARRAY') {
1692           push(@array, @{$element});
1693         } else {
1694           push(@array, $element) unless $b == $x
1695             && grep { $_ eq $element } @array;
1696         }
1697       }
1698     }
1699     
1700     @array = grep { !exists $hash->{$_} } @array;
1701
1702     return keys %{$hash}
1703       ? ( scalar(@array)
1704             ? [$hash, @array]
1705             : $hash
1706         )
1707       : \@array;
1708   }
1709 }
1710
1711 =head2 throw_exception
1712
1713 See L<DBIx::Class::Schema/throw_exception> for details.
1714
1715 =cut
1716
1717 sub throw_exception {
1718   my $self=shift;
1719   $self->result_source->schema->throw_exception(@_);
1720 }
1721
1722 # XXX: FIXME: Attributes docs need clearing up
1723
1724 =head1 ATTRIBUTES
1725
1726 The resultset takes various attributes that modify its behavior. Here's an
1727 overview of them:
1728
1729 =head2 order_by
1730
1731 =over 4
1732
1733 =item Value: ($order_by | \@order_by)
1734
1735 =back
1736
1737 Which column(s) to order the results by. This is currently passed
1738 through directly to SQL, so you can give e.g. C<year DESC> for a
1739 descending order on the column `year'.
1740
1741 Please note that if you have C<quote_char> enabled (see
1742 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1743 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1744 so you will need to manually quote things as appropriate.)
1745
1746 =head2 columns
1747
1748 =over 4
1749
1750 =item Value: \@columns
1751
1752 =back
1753
1754 Shortcut to request a particular set of columns to be retrieved.  Adds
1755 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1756 from that, then auto-populates C<as> from C<select> as normal. (You may also
1757 use the C<cols> attribute, as in earlier versions of DBIC.)
1758
1759 =head2 include_columns
1760
1761 =over 4
1762
1763 =item Value: \@columns
1764
1765 =back
1766
1767 Shortcut to include additional columns in the returned results - for example
1768
1769   $schema->resultset('CD')->search(undef, {
1770     include_columns => ['artist.name'],
1771     join => ['artist']
1772   });
1773
1774 would return all CDs and include a 'name' column to the information
1775 passed to object inflation
1776
1777 =head2 select
1778
1779 =over 4
1780
1781 =item Value: \@select_columns
1782
1783 =back
1784
1785 Indicates which columns should be selected from the storage. You can use
1786 column names, or in the case of RDBMS back ends, function or stored procedure
1787 names:
1788
1789   $rs = $schema->resultset('Employee')->search(undef, {
1790     select => [
1791       'name',
1792       { count => 'employeeid' },
1793       { sum => 'salary' }
1794     ]
1795   });
1796
1797 When you use function/stored procedure names and do not supply an C<as>
1798 attribute, the column names returned are storage-dependent. E.g. MySQL would
1799 return a column named C<count(employeeid)> in the above example.
1800
1801 =head2 +select
1802
1803 =over 4
1804
1805 Indicates additional columns to be selected from storage.  Works the same as
1806 L<select> but adds columns to the selection.
1807
1808 =back
1809
1810 =head2 +as
1811
1812 =over 4
1813
1814 Indicates additional column names for those added via L<+select>.
1815
1816 =back
1817
1818 =head2 as
1819
1820 =over 4
1821
1822 =item Value: \@inflation_names
1823
1824 =back
1825
1826 Indicates column names for object inflation. This is used in conjunction with
1827 C<select>, usually when C<select> contains one or more function or stored
1828 procedure names:
1829
1830   $rs = $schema->resultset('Employee')->search(undef, {
1831     select => [
1832       'name',
1833       { count => 'employeeid' }
1834     ],
1835     as => ['name', 'employee_count'],
1836   });
1837
1838   my $employee = $rs->first(); # get the first Employee
1839
1840 If the object against which the search is performed already has an accessor
1841 matching a column name specified in C<as>, the value can be retrieved using
1842 the accessor as normal:
1843
1844   my $name = $employee->name();
1845
1846 If on the other hand an accessor does not exist in the object, you need to
1847 use C<get_column> instead:
1848
1849   my $employee_count = $employee->get_column('employee_count');
1850
1851 You can create your own accessors if required - see
1852 L<DBIx::Class::Manual::Cookbook> for details.
1853
1854 Please note: This will NOT insert an C<AS employee_count> into the SQL
1855 statement produced, it is used for internal access only. Thus
1856 attempting to use the accessor in an C<order_by> clause or similar
1857 will fail miserably.
1858
1859 To get around this limitation, you can supply literal SQL to your
1860 C<select> attibute that contains the C<AS alias> text, eg:
1861
1862   select => [\'myfield AS alias']
1863
1864 =head2 join
1865
1866 =over 4
1867
1868 =item Value: ($rel_name | \@rel_names | \%rel_names)
1869
1870 =back
1871
1872 Contains a list of relationships that should be joined for this query.  For
1873 example:
1874
1875   # Get CDs by Nine Inch Nails
1876   my $rs = $schema->resultset('CD')->search(
1877     { 'artist.name' => 'Nine Inch Nails' },
1878     { join => 'artist' }
1879   );
1880
1881 Can also contain a hash reference to refer to the other relation's relations.
1882 For example:
1883
1884   package MyApp::Schema::Track;
1885   use base qw/DBIx::Class/;
1886   __PACKAGE__->table('track');
1887   __PACKAGE__->add_columns(qw/trackid cd position title/);
1888   __PACKAGE__->set_primary_key('trackid');
1889   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1890   1;
1891
1892   # In your application
1893   my $rs = $schema->resultset('Artist')->search(
1894     { 'track.title' => 'Teardrop' },
1895     {
1896       join     => { cd => 'track' },
1897       order_by => 'artist.name',
1898     }
1899   );
1900
1901 You need to use the relationship (not the table) name in  conditions, 
1902 because they are aliased as such. The current table is aliased as "me", so 
1903 you need to use me.column_name in order to avoid ambiguity. For example:
1904
1905   # Get CDs from 1984 with a 'Foo' track 
1906   my $rs = $schema->resultset('CD')->search(
1907     { 
1908       'me.year' => 1984,
1909       'tracks.name' => 'Foo'
1910     },
1911     { join => 'tracks' }
1912   );
1913   
1914 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1915 similarly for a third time). For e.g.
1916
1917   my $rs = $schema->resultset('Artist')->search({
1918     'cds.title'   => 'Down to Earth',
1919     'cds_2.title' => 'Popular',
1920   }, {
1921     join => [ qw/cds cds/ ],
1922   });
1923
1924 will return a set of all artists that have both a cd with title 'Down
1925 to Earth' and a cd with title 'Popular'.
1926
1927 If you want to fetch related objects from other tables as well, see C<prefetch>
1928 below.
1929
1930 =head2 prefetch
1931
1932 =over 4
1933
1934 =item Value: ($rel_name | \@rel_names | \%rel_names)
1935
1936 =back
1937
1938 Contains one or more relationships that should be fetched along with the main
1939 query (when they are accessed afterwards they will have already been
1940 "prefetched").  This is useful for when you know you will need the related
1941 objects, because it saves at least one query:
1942
1943   my $rs = $schema->resultset('Tag')->search(
1944     undef,
1945     {
1946       prefetch => {
1947         cd => 'artist'
1948       }
1949     }
1950   );
1951
1952 The initial search results in SQL like the following:
1953
1954   SELECT tag.*, cd.*, artist.* FROM tag
1955   JOIN cd ON tag.cd = cd.cdid
1956   JOIN artist ON cd.artist = artist.artistid
1957
1958 L<DBIx::Class> has no need to go back to the database when we access the
1959 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1960 case.
1961
1962 Simple prefetches will be joined automatically, so there is no need
1963 for a C<join> attribute in the above search. If you're prefetching to
1964 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1965 specify the join as well.
1966
1967 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1968 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1969 with an accessor type of 'single' or 'filter').
1970
1971 =head2 page
1972
1973 =over 4
1974
1975 =item Value: $page
1976
1977 =back
1978
1979 Makes the resultset paged and specifies the page to retrieve. Effectively
1980 identical to creating a non-pages resultset and then calling ->page($page)
1981 on it.
1982
1983 If L<rows> attribute is not specified it defualts to 10 rows per page.
1984
1985 =head2 rows
1986
1987 =over 4
1988
1989 =item Value: $rows
1990
1991 =back
1992
1993 Specifes the maximum number of rows for direct retrieval or the number of
1994 rows per page if the page attribute or method is used.
1995
1996 =head2 offset
1997
1998 =over 4
1999
2000 =item Value: $offset
2001
2002 =back
2003
2004 Specifies the (zero-based) row number for the  first row to be returned, or the
2005 of the first row of the first page if paging is used.
2006
2007 =head2 group_by
2008
2009 =over 4
2010
2011 =item Value: \@columns
2012
2013 =back
2014
2015 A arrayref of columns to group by. Can include columns of joined tables.
2016
2017   group_by => [qw/ column1 column2 ... /]
2018
2019 =head2 having
2020
2021 =over 4
2022
2023 =item Value: $condition
2024
2025 =back
2026
2027 HAVING is a select statement attribute that is applied between GROUP BY and
2028 ORDER BY. It is applied to the after the grouping calculations have been
2029 done.
2030
2031   having => { 'count(employee)' => { '>=', 100 } }
2032
2033 =head2 distinct
2034
2035 =over 4
2036
2037 =item Value: (0 | 1)
2038
2039 =back
2040
2041 Set to 1 to group by all columns.
2042
2043 =head2 where
2044
2045 =over 4
2046
2047 Adds to the WHERE clause.
2048
2049   # only return rows WHERE deleted IS NULL for all searches
2050   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2051
2052 Can be overridden by passing C<{ where => undef }> as an attribute
2053 to a resulset.
2054
2055 =back
2056
2057 =head2 cache
2058
2059 Set to 1 to cache search results. This prevents extra SQL queries if you
2060 revisit rows in your ResultSet:
2061
2062   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2063
2064   while( my $artist = $resultset->next ) {
2065     ... do stuff ...
2066   }
2067
2068   $rs->first; # without cache, this would issue a query
2069
2070 By default, searches are not cached.
2071
2072 For more examples of using these attributes, see
2073 L<DBIx::Class::Manual::Cookbook>.
2074
2075 =head2 from
2076
2077 =over 4
2078
2079 =item Value: \@from_clause
2080
2081 =back
2082
2083 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2084 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2085 clauses.
2086
2087 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2088
2089 C<join> will usually do what you need and it is strongly recommended that you
2090 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2091 And we really do mean "cannot", not just tried and failed. Attempting to use
2092 this because you're having problems with C<join> is like trying to use x86
2093 ASM because you've got a syntax error in your C. Trust us on this.
2094
2095 Now, if you're still really, really sure you need to use this (and if you're
2096 not 100% sure, ask the mailing list first), here's an explanation of how this
2097 works.
2098
2099 The syntax is as follows -
2100
2101   [
2102     { <alias1> => <table1> },
2103     [
2104       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2105       [], # nested JOIN (optional)
2106       { <table1.column1> => <table2.column2>, ... (more conditions) },
2107     ],
2108     # More of the above [ ] may follow for additional joins
2109   ]
2110
2111   <table1> <alias1>
2112   JOIN
2113     <table2> <alias2>
2114     [JOIN ...]
2115   ON <table1.column1> = <table2.column2>
2116   <more joins may follow>
2117
2118 An easy way to follow the examples below is to remember the following:
2119
2120     Anything inside "[]" is a JOIN
2121     Anything inside "{}" is a condition for the enclosing JOIN
2122
2123 The following examples utilize a "person" table in a family tree application.
2124 In order to express parent->child relationships, this table is self-joined:
2125
2126     # Person->belongs_to('father' => 'Person');
2127     # Person->belongs_to('mother' => 'Person');
2128
2129 C<from> can be used to nest joins. Here we return all children with a father,
2130 then search against all mothers of those children:
2131
2132   $rs = $schema->resultset('Person')->search(
2133       undef,
2134       {
2135           alias => 'mother', # alias columns in accordance with "from"
2136           from => [
2137               { mother => 'person' },
2138               [
2139                   [
2140                       { child => 'person' },
2141                       [
2142                           { father => 'person' },
2143                           { 'father.person_id' => 'child.father_id' }
2144                       ]
2145                   ],
2146                   { 'mother.person_id' => 'child.mother_id' }
2147               ],
2148           ]
2149       },
2150   );
2151
2152   # Equivalent SQL:
2153   # SELECT mother.* FROM person mother
2154   # JOIN (
2155   #   person child
2156   #   JOIN person father
2157   #   ON ( father.person_id = child.father_id )
2158   # )
2159   # ON ( mother.person_id = child.mother_id )
2160
2161 The type of any join can be controlled manually. To search against only people
2162 with a father in the person table, we could explicitly use C<INNER JOIN>:
2163
2164     $rs = $schema->resultset('Person')->search(
2165         undef,
2166         {
2167             alias => 'child', # alias columns in accordance with "from"
2168             from => [
2169                 { child => 'person' },
2170                 [
2171                     { father => 'person', -join_type => 'inner' },
2172                     { 'father.id' => 'child.father_id' }
2173                 ],
2174             ]
2175         },
2176     );
2177
2178     # Equivalent SQL:
2179     # SELECT child.* FROM person child
2180     # INNER JOIN person father ON child.father_id = father.id
2181
2182 =cut
2183
2184 1;