6ab3ec08344aef2970cafd8c9a9cae1810a6af0f
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use base qw/DBIx::Class/;
14
15 __PACKAGE__->load_components(qw/AccessorGroup/);
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   #weaken $source;
89
90   if ($attrs->{page}) {
91     $attrs->{rows} ||= 10;
92     $attrs->{offset} ||= 0;
93     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
94   }
95
96   $attrs->{alias} ||= 'me';
97
98   my $self = {
99     result_source => $source,
100     result_class => $attrs->{result_class} || $source->result_class,
101     cond => $attrs->{where},
102     count => undef,
103     pager => undef,
104     attrs => $attrs
105   };
106
107   bless $self, $class;
108
109   return $self;
110 }
111
112 =head2 search
113
114 =over 4
115
116 =item Arguments: $cond, \%attrs?
117
118 =item Return Value: $resultset (scalar context), @row_objs (list context)
119
120 =back
121
122   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
123   my $new_rs = $cd_rs->search({ year => 2005 });
124
125   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
126                  # year = 2005 OR year = 2004
127
128 If you need to pass in additional attributes but no additional condition,
129 call it as C<search(undef, \%attrs)>.
130
131   # "SELECT name, artistid FROM $artist_table"
132   my @all_artists = $schema->resultset('Artist')->search(undef, {
133     columns => [qw/name artistid/],
134   });
135
136 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
137
138 =cut
139
140 sub search {
141   my $self = shift;
142   my $rs = $self->search_rs( @_ );
143   return (wantarray ? $rs->all : $rs);
144 }
145
146 =head2 search_rs
147
148 =over 4
149
150 =item Arguments: $cond, \%attrs?
151
152 =item Return Value: $resultset
153
154 =back
155
156 This method does the same exact thing as search() except it will
157 always return a resultset, even in list context.
158
159 =cut
160
161 sub search_rs {
162   my $self = shift;
163
164   my $rows;
165
166   unless (@_) {                 # no search, effectively just a clone
167     $rows = $self->get_cache;
168   }
169
170   my $attrs = {};
171   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
172   my $our_attrs = { %{$self->{attrs}} };
173   my $having = delete $our_attrs->{having};
174   my $where = delete $our_attrs->{where};
175
176   my $new_attrs = { %{$our_attrs}, %{$attrs} };
177
178   # merge new attrs into inherited
179   foreach my $key (qw/join prefetch/) {
180     next unless exists $attrs->{$key};
181     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
182   }
183
184   my $cond = (@_
185     ? (
186         (@_ == 1 || ref $_[0] eq "HASH")
187           ? (
188               (ref $_[0] eq 'HASH')
189                 ? (
190                     (keys %{ $_[0] }  > 0)
191                       ? shift
192                       : undef
193                    )
194                 :  shift
195              )
196           : (
197               (@_ % 2)
198                 ? $self->throw_exception("Odd number of arguments to search")
199                 : {@_}
200              )
201       )
202     : undef
203   );
204
205   if (defined $where) {
206     $new_attrs->{where} = (
207       defined $new_attrs->{where}
208         ? { '-and' => [
209               map {
210                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
211               } $where, $new_attrs->{where}
212             ]
213           }
214         : $where);
215   }
216
217   if (defined $cond) {
218     $new_attrs->{where} = (
219       defined $new_attrs->{where}
220         ? { '-and' => [
221               map {
222                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
223               } $cond, $new_attrs->{where}
224             ]
225           }
226         : $cond);
227   }
228
229   if (defined $having) {
230     $new_attrs->{having} = (
231       defined $new_attrs->{having}
232         ? { '-and' => [
233               map {
234                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
235               } $having, $new_attrs->{having}
236             ]
237           }
238         : $having);
239   }
240
241   my $rs = (ref $self)->new($self->result_source, $new_attrs);
242   if ($rows) {
243     $rs->set_cache($rows);
244   }
245   return $rs;
246 }
247
248 =head2 search_literal
249
250 =over 4
251
252 =item Arguments: $sql_fragment, @bind_values
253
254 =item Return Value: $resultset (scalar context), @row_objs (list context)
255
256 =back
257
258   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
259   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
260
261 Pass a literal chunk of SQL to be added to the conditional part of the
262 resultset query.
263
264 =cut
265
266 sub search_literal {
267   my ($self, $cond, @vals) = @_;
268   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
269   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
270   return $self->search(\$cond, $attrs);
271 }
272
273 =head2 find
274
275 =over 4
276
277 =item Arguments: @values | \%cols, \%attrs?
278
279 =item Return Value: $row_object
280
281 =back
282
283 Finds a row based on its primary key or unique constraint. For example, to find
284 a row by its primary key:
285
286   my $cd = $schema->resultset('CD')->find(5);
287
288 You can also find a row by a specific unique constraint using the C<key>
289 attribute. For example:
290
291   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
292     key => 'cd_artist_title'
293   });
294
295 Additionally, you can specify the columns explicitly by name:
296
297   my $cd = $schema->resultset('CD')->find(
298     {
299       artist => 'Massive Attack',
300       title  => 'Mezzanine',
301     },
302     { key => 'cd_artist_title' }
303   );
304
305 If the C<key> is specified as C<primary>, it searches only on the primary key.
306
307 If no C<key> is specified, it searches on all unique constraints defined on the
308 source, including the primary key.
309
310 If your table does not have a primary key, you B<must> provide a value for the
311 C<key> attribute matching one of the unique constraints on the source.
312
313 See also L</find_or_create> and L</update_or_create>. For information on how to
314 declare unique constraints, see
315 L<DBIx::Class::ResultSource/add_unique_constraint>.
316
317 =cut
318
319 sub find {
320   my $self = shift;
321   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
322
323   # Default to the primary key, but allow a specific key
324   my @cols = exists $attrs->{key}
325     ? $self->result_source->unique_constraint_columns($attrs->{key})
326     : $self->result_source->primary_columns;
327   $self->throw_exception(
328     "Can't find unless a primary key is defined or unique constraint is specified"
329   ) unless @cols;
330
331   # Parse out a hashref from input
332   my $input_query;
333   if (ref $_[0] eq 'HASH') {
334     $input_query = { %{$_[0]} };
335   }
336   elsif (@_ == @cols) {
337     $input_query = {};
338     @{$input_query}{@cols} = @_;
339   }
340   else {
341     # Compatibility: Allow e.g. find(id => $value)
342     carp "Find by key => value deprecated; please use a hashref instead";
343     $input_query = {@_};
344   }
345
346   my @unique_queries = $self->_unique_queries($input_query, $attrs);
347
348   # Build the final query: Default to the disjunction of the unique queries,
349   # but allow the input query in case the ResultSet defines the query or the
350   # user is abusing find
351   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
352   my $query = @unique_queries
353     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
354     : $self->_add_alias($input_query, $alias);
355
356   # Run the query
357   if (keys %$attrs) {
358     my $rs = $self->search($query, $attrs);
359     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
360   }
361   else {
362     return keys %{$self->_resolved_attrs->{collapse}}
363       ? $self->search($query)->next
364       : $self->single($query);
365   }
366 }
367
368 # _add_alias
369 #
370 # Add the specified alias to the specified query hash. A copy is made so the
371 # original query is not modified.
372
373 sub _add_alias {
374   my ($self, $query, $alias) = @_;
375
376   my %aliased = %$query;
377   foreach my $col (grep { ! m/\./ } keys %aliased) {
378     $aliased{"$alias.$col"} = delete $aliased{$col};
379   }
380
381   return \%aliased;
382 }
383
384 # _unique_queries
385 #
386 # Build a list of queries which satisfy unique constraints.
387
388 sub _unique_queries {
389   my ($self, $query, $attrs) = @_;
390
391   my @constraint_names = exists $attrs->{key}
392     ? ($attrs->{key})
393     : $self->result_source->unique_constraint_names;
394
395   my @unique_queries;
396   foreach my $name (@constraint_names) {
397     my @unique_cols = $self->result_source->unique_constraint_columns($name);
398     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
399
400     my $num_query = scalar keys %$unique_query;
401     next unless $num_query;
402
403     # XXX: Assuming quite a bit about $self->{attrs}{where}
404     my $num_cols = scalar @unique_cols;
405     my $num_where = exists $self->{attrs}{where}
406       ? scalar keys %{ $self->{attrs}{where} }
407       : 0;
408     push @unique_queries, $unique_query
409       if $num_query + $num_where == $num_cols;
410   }
411
412   return @unique_queries;
413 }
414
415 # _build_unique_query
416 #
417 # Constrain the specified query hash based on the specified column names.
418
419 sub _build_unique_query {
420   my ($self, $query, $unique_cols) = @_;
421
422   return {
423     map  { $_ => $query->{$_} }
424     grep { exists $query->{$_} }
425       @$unique_cols
426   };
427 }
428
429 =head2 search_related
430
431 =over 4
432
433 =item Arguments: $rel, $cond, \%attrs?
434
435 =item Return Value: $new_resultset
436
437 =back
438
439   $new_rs = $cd_rs->search_related('artist', {
440     name => 'Emo-R-Us',
441   });
442
443 Searches the specified relationship, optionally specifying a condition and
444 attributes for matching records. See L</ATTRIBUTES> for more information.
445
446 =cut
447
448 sub search_related {
449   return shift->related_resultset(shift)->search(@_);
450 }
451
452 =head2 cursor
453
454 =over 4
455
456 =item Arguments: none
457
458 =item Return Value: $cursor
459
460 =back
461
462 Returns a storage-driven cursor to the given resultset. See
463 L<DBIx::Class::Cursor> for more information.
464
465 =cut
466
467 sub cursor {
468   my ($self) = @_;
469
470   my $attrs = { %{$self->_resolved_attrs} };
471   return $self->{cursor}
472     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
473           $attrs->{where},$attrs);
474 }
475
476 =head2 single
477
478 =over 4
479
480 =item Arguments: $cond?
481
482 =item Return Value: $row_object?
483
484 =back
485
486   my $cd = $schema->resultset('CD')->single({ year => 2001 });
487
488 Inflates the first result without creating a cursor if the resultset has
489 any records in it; if not returns nothing. Used by L</find> as an optimisation.
490
491 Can optionally take an additional condition *only* - this is a fast-code-path
492 method; if you need to add extra joins or similar call ->search and then
493 ->single without a condition on the $rs returned from that.
494
495 =cut
496
497 sub single {
498   my ($self, $where) = @_;
499   my $attrs = { %{$self->_resolved_attrs} };
500   if ($where) {
501     if (defined $attrs->{where}) {
502       $attrs->{where} = {
503         '-and' =>
504             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
505                $where, delete $attrs->{where} ]
506       };
507     } else {
508       $attrs->{where} = $where;
509     }
510   }
511
512 #  XXX: Disabled since it doesn't infer uniqueness in all cases
513 #  unless ($self->_is_unique_query($attrs->{where})) {
514 #    carp "Query not guaranteed to return a single row"
515 #      . "; please declare your unique constraints or use search instead";
516 #  }
517
518   my @data = $self->result_source->storage->select_single(
519     $attrs->{from}, $attrs->{select},
520     $attrs->{where}, $attrs
521   );
522
523   return (@data ? $self->_construct_object(@data) : ());
524 }
525
526 # _is_unique_query
527 #
528 # Try to determine if the specified query is guaranteed to be unique, based on
529 # the declared unique constraints.
530
531 sub _is_unique_query {
532   my ($self, $query) = @_;
533
534   my $collapsed = $self->_collapse_query($query);
535   my $alias = $self->{attrs}{alias};
536
537   foreach my $name ($self->result_source->unique_constraint_names) {
538     my @unique_cols = map {
539       "$alias.$_"
540     } $self->result_source->unique_constraint_columns($name);
541
542     # Count the values for each unique column
543     my %seen = map { $_ => 0 } @unique_cols;
544
545     foreach my $key (keys %$collapsed) {
546       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
547       next unless exists $seen{$aliased};  # Additional constraints are okay
548       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
549     }
550
551     # If we get 0 or more than 1 value for a column, it's not necessarily unique
552     return 1 unless grep { $_ != 1 } values %seen;
553   }
554
555   return 0;
556 }
557
558 # _collapse_query
559 #
560 # Recursively collapse the query, accumulating values for each column.
561
562 sub _collapse_query {
563   my ($self, $query, $collapsed) = @_;
564
565   $collapsed ||= {};
566
567   if (ref $query eq 'ARRAY') {
568     foreach my $subquery (@$query) {
569       next unless ref $subquery;  # -or
570 #      warn "ARRAY: " . Dumper $subquery;
571       $collapsed = $self->_collapse_query($subquery, $collapsed);
572     }
573   }
574   elsif (ref $query eq 'HASH') {
575     if (keys %$query and (keys %$query)[0] eq '-and') {
576       foreach my $subquery (@{$query->{-and}}) {
577 #        warn "HASH: " . Dumper $subquery;
578         $collapsed = $self->_collapse_query($subquery, $collapsed);
579       }
580     }
581     else {
582 #      warn "LEAF: " . Dumper $query;
583       foreach my $col (keys %$query) {
584         my $value = $query->{$col};
585         $collapsed->{$col}{$value}++;
586       }
587     }
588   }
589
590   return $collapsed;
591 }
592
593 =head2 get_column
594
595 =over 4
596
597 =item Arguments: $cond?
598
599 =item Return Value: $resultsetcolumn
600
601 =back
602
603   my $max_length = $rs->get_column('length')->max;
604
605 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
606
607 =cut
608
609 sub get_column {
610   my ($self, $column) = @_;
611   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
612   return $new;
613 }
614
615 =head2 search_like
616
617 =over 4
618
619 =item Arguments: $cond, \%attrs?
620
621 =item Return Value: $resultset (scalar context), @row_objs (list context)
622
623 =back
624
625   # WHERE title LIKE '%blue%'
626   $cd_rs = $rs->search_like({ title => '%blue%'});
627
628 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
629 that this is simply a convenience method. You most likely want to use
630 L</search> with specific operators.
631
632 For more information, see L<DBIx::Class::Manual::Cookbook>.
633
634 =cut
635
636 sub search_like {
637   my $class = shift;
638   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
639   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
640   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
641   return $class->search($query, { %$attrs });
642 }
643
644 =head2 slice
645
646 =over 4
647
648 =item Arguments: $first, $last
649
650 =item Return Value: $resultset (scalar context), @row_objs (list context)
651
652 =back
653
654 Returns a resultset or object list representing a subset of elements from the
655 resultset slice is called on. Indexes are from 0, i.e., to get the first
656 three records, call:
657
658   my ($one, $two, $three) = $rs->slice(0, 2);
659
660 =cut
661
662 sub slice {
663   my ($self, $min, $max) = @_;
664   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
665   $attrs->{offset} = $self->{attrs}{offset} || 0;
666   $attrs->{offset} += $min;
667   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
668   return $self->search(undef(), $attrs);
669   #my $slice = (ref $self)->new($self->result_source, $attrs);
670   #return (wantarray ? $slice->all : $slice);
671 }
672
673 =head2 next
674
675 =over 4
676
677 =item Arguments: none
678
679 =item Return Value: $result?
680
681 =back
682
683 Returns the next element in the resultset (C<undef> is there is none).
684
685 Can be used to efficiently iterate over records in the resultset:
686
687   my $rs = $schema->resultset('CD')->search;
688   while (my $cd = $rs->next) {
689     print $cd->title;
690   }
691
692 Note that you need to store the resultset object, and call C<next> on it.
693 Calling C<< resultset('Table')->next >> repeatedly will always return the
694 first record from the resultset.
695
696 =cut
697
698 sub next {
699   my ($self) = @_;
700   if (my $cache = $self->get_cache) {
701     $self->{all_cache_position} ||= 0;
702     return $cache->[$self->{all_cache_position}++];
703   }
704   if ($self->{attrs}{cache}) {
705     $self->{all_cache_position} = 1;
706     return ($self->all)[0];
707   }
708   my @row = (
709     exists $self->{stashed_row}
710       ? @{delete $self->{stashed_row}}
711       : $self->cursor->next
712   );
713   return unless (@row);
714   return $self->_construct_object(@row);
715 }
716
717 sub _construct_object {
718   my ($self, @row) = @_;
719   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
720   my $new = $self->result_class->inflate_result($self->result_source, @$info);
721   $new = $self->{_attrs}{record_filter}->($new)
722     if exists $self->{_attrs}{record_filter};
723   return $new;
724 }
725
726 sub _collapse_result {
727   my ($self, $as, $row, $prefix) = @_;
728
729   my %const;
730   my @copy = @$row;
731   
732   foreach my $this_as (@$as) {
733     my $val = shift @copy;
734     if (defined $prefix) {
735       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
736         my $remain = $1;
737         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
738         $const{$1||''}{$2} = $val;
739       }
740     } else {
741       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
742       $const{$1||''}{$2} = $val;
743     }
744   }
745
746   my $alias = $self->{attrs}{alias};
747   my $info = [ {}, {} ];
748   foreach my $key (keys %const) {
749     if (length $key && $key ne $alias) {
750       my $target = $info;
751       my @parts = split(/\./, $key);
752       foreach my $p (@parts) {
753         $target = $target->[1]->{$p} ||= [];
754       }
755       $target->[0] = $const{$key};
756     } else {
757       $info->[0] = $const{$key};
758     }
759   }
760   
761   my @collapse;
762   if (defined $prefix) {
763     @collapse = map {
764         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
765     } keys %{$self->{_attrs}{collapse}}
766   } else {
767     @collapse = keys %{$self->{_attrs}{collapse}};
768   };
769
770   if (@collapse) {
771     my ($c) = sort { length $a <=> length $b } @collapse;
772     my $target = $info;
773     foreach my $p (split(/\./, $c)) {
774       $target = $target->[1]->{$p} ||= [];
775     }
776     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
777     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
778     my $tree = $self->_collapse_result($as, $row, $c_prefix);
779     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
780     my (@final, @raw);
781
782     while (
783       !(
784         grep {
785           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
786         } @co_key
787         )
788     ) {
789       push(@final, $tree);
790       last unless (@raw = $self->cursor->next);
791       $row = $self->{stashed_row} = \@raw;
792       $tree = $self->_collapse_result($as, $row, $c_prefix);
793     }
794     @$target = (@final ? @final : [ {}, {} ]);
795       # single empty result to indicate an empty prefetched has_many
796   }
797
798   #print "final info: " . Dumper($info);
799   return $info;
800 }
801
802 =head2 result_source
803
804 =over 4
805
806 =item Arguments: $result_source?
807
808 =item Return Value: $result_source
809
810 =back
811
812 An accessor for the primary ResultSource object from which this ResultSet
813 is derived.
814
815 =head2 result_class
816
817 =over 4
818
819 =item Arguments: $result_class?
820
821 =item Return Value: $result_class
822
823 =back
824
825 An accessor for the class to use when creating row objects. Defaults to 
826 C<< result_source->result_class >> - which in most cases is the name of the 
827 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
828
829 =cut
830
831
832 =head2 count
833
834 =over 4
835
836 =item Arguments: $cond, \%attrs??
837
838 =item Return Value: $count
839
840 =back
841
842 Performs an SQL C<COUNT> with the same query as the resultset was built
843 with to find the number of elements. If passed arguments, does a search
844 on the resultset and counts the results of that.
845
846 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
847 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
848 not support C<DISTINCT> with multiple columns. If you are using such a
849 database, you should only use columns from the main table in your C<group_by>
850 clause.
851
852 =cut
853
854 sub count {
855   my $self = shift;
856   return $self->search(@_)->count if @_ and defined $_[0];
857   return scalar @{ $self->get_cache } if $self->get_cache;
858   my $count = $self->_count;
859   return 0 unless $count;
860
861   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
862   $count = $self->{attrs}{rows} if
863     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
864   return $count;
865 }
866
867 sub _count { # Separated out so pager can get the full count
868   my $self = shift;
869   my $select = { count => '*' };
870
871   my $attrs = { %{$self->_resolved_attrs} };
872   if (my $group_by = delete $attrs->{group_by}) {
873     delete $attrs->{having};
874     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
875     # todo: try CONCAT for multi-column pk
876     my @pk = $self->result_source->primary_columns;
877     if (@pk == 1) {
878       my $alias = $attrs->{alias};
879       foreach my $column (@distinct) {
880         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
881           @distinct = ($column);
882           last;
883         }
884       }
885     }
886
887     $select = { count => { distinct => \@distinct } };
888   }
889
890   $attrs->{select} = $select;
891   $attrs->{as} = [qw/count/];
892
893   # offset, order by and page are not needed to count. record_filter is cdbi
894   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
895
896   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
897   my ($count) = $tmp_rs->cursor->next;
898   return $count;
899 }
900
901 =head2 count_literal
902
903 =over 4
904
905 =item Arguments: $sql_fragment, @bind_values
906
907 =item Return Value: $count
908
909 =back
910
911 Counts the results in a literal query. Equivalent to calling L</search_literal>
912 with the passed arguments, then L</count>.
913
914 =cut
915
916 sub count_literal { shift->search_literal(@_)->count; }
917
918 =head2 all
919
920 =over 4
921
922 =item Arguments: none
923
924 =item Return Value: @objects
925
926 =back
927
928 Returns all elements in the resultset. Called implicitly if the resultset
929 is returned in list context.
930
931 =cut
932
933 sub all {
934   my ($self) = @_;
935   return @{ $self->get_cache } if $self->get_cache;
936
937   my @obj;
938
939   # TODO: don't call resolve here
940   if (keys %{$self->_resolved_attrs->{collapse}}) {
941 #  if ($self->{attrs}{prefetch}) {
942       # Using $self->cursor->all is really just an optimisation.
943       # If we're collapsing has_many prefetches it probably makes
944       # very little difference, and this is cleaner than hacking
945       # _construct_object to survive the approach
946     my @row = $self->cursor->next;
947     while (@row) {
948       push(@obj, $self->_construct_object(@row));
949       @row = (exists $self->{stashed_row}
950                ? @{delete $self->{stashed_row}}
951                : $self->cursor->next);
952     }
953   } else {
954     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
955   }
956
957   $self->set_cache(\@obj) if $self->{attrs}{cache};
958   return @obj;
959 }
960
961 =head2 reset
962
963 =over 4
964
965 =item Arguments: none
966
967 =item Return Value: $self
968
969 =back
970
971 Resets the resultset's cursor, so you can iterate through the elements again.
972
973 =cut
974
975 sub reset {
976   my ($self) = @_;
977   delete $self->{_attrs} if exists $self->{_attrs};
978   $self->{all_cache_position} = 0;
979   $self->cursor->reset;
980   return $self;
981 }
982
983 =head2 first
984
985 =over 4
986
987 =item Arguments: none
988
989 =item Return Value: $object?
990
991 =back
992
993 Resets the resultset and returns an object for the first result (if the
994 resultset returns anything).
995
996 =cut
997
998 sub first {
999   return $_[0]->reset->next;
1000 }
1001
1002 # _cond_for_update_delete
1003 #
1004 # update/delete require the condition to be modified to handle
1005 # the differing SQL syntax available.  This transforms the $self->{cond}
1006 # appropriately, returning the new condition.
1007
1008 sub _cond_for_update_delete {
1009   my ($self, $full_cond) = @_;
1010   my $cond = {};
1011
1012   $full_cond ||= $self->{cond};
1013   # No-op. No condition, we're updating/deleting everything
1014   return $cond unless ref $full_cond;
1015
1016   if (ref $full_cond eq 'ARRAY') {
1017     $cond = [
1018       map {
1019         my %hash;
1020         foreach my $key (keys %{$_}) {
1021           $key =~ /([^.]+)$/;
1022           $hash{$1} = $_->{$key};
1023         }
1024         \%hash;
1025       } @{$full_cond}
1026     ];
1027   }
1028   elsif (ref $full_cond eq 'HASH') {
1029     if ((keys %{$full_cond})[0] eq '-and') {
1030       $cond->{-and} = [];
1031
1032       my @cond = @{$full_cond->{-and}};
1033       for (my $i = 0; $i < @cond; $i++) {
1034         my $entry = $cond[$i];
1035
1036         my $hash;
1037         if (ref $entry eq 'HASH') {
1038           $hash = $self->_cond_for_update_delete($entry);
1039         }
1040         else {
1041           $entry =~ /([^.]+)$/;
1042           $hash->{$1} = $cond[++$i];
1043         }
1044
1045         push @{$cond->{-and}}, $hash;
1046       }
1047     }
1048     else {
1049       foreach my $key (keys %{$full_cond}) {
1050         $key =~ /([^.]+)$/;
1051         $cond->{$1} = $full_cond->{$key};
1052       }
1053     }
1054   }
1055   else {
1056     $self->throw_exception(
1057       "Can't update/delete on resultset with condition unless hash or array"
1058     );
1059   }
1060
1061   return $cond;
1062 }
1063
1064
1065 =head2 update
1066
1067 =over 4
1068
1069 =item Arguments: \%values
1070
1071 =item Return Value: $storage_rv
1072
1073 =back
1074
1075 Sets the specified columns in the resultset to the supplied values in a
1076 single query. Return value will be true if the update succeeded or false
1077 if no records were updated; exact type of success value is storage-dependent.
1078
1079 =cut
1080
1081 sub update {
1082   my ($self, $values) = @_;
1083   $self->throw_exception("Values for update must be a hash")
1084     unless ref $values eq 'HASH';
1085
1086   my $cond = $self->_cond_for_update_delete;
1087
1088   return $self->result_source->storage->update(
1089     $self->result_source->from, $values, $cond
1090   );
1091 }
1092
1093 =head2 update_all
1094
1095 =over 4
1096
1097 =item Arguments: \%values
1098
1099 =item Return Value: 1
1100
1101 =back
1102
1103 Fetches all objects and updates them one at a time. Note that C<update_all>
1104 will run DBIC cascade triggers, while L</update> will not.
1105
1106 =cut
1107
1108 sub update_all {
1109   my ($self, $values) = @_;
1110   $self->throw_exception("Values for update must be a hash")
1111     unless ref $values eq 'HASH';
1112   foreach my $obj ($self->all) {
1113     $obj->set_columns($values)->update;
1114   }
1115   return 1;
1116 }
1117
1118 =head2 delete
1119
1120 =over 4
1121
1122 =item Arguments: none
1123
1124 =item Return Value: 1
1125
1126 =back
1127
1128 Deletes the contents of the resultset from its result source. Note that this
1129 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1130 to run. See also L<DBIx::Class::Row/delete>.
1131
1132 =cut
1133
1134 sub delete {
1135   my ($self) = @_;
1136
1137   my $cond = $self->_cond_for_update_delete;
1138
1139   $self->result_source->storage->delete($self->result_source->from, $cond);
1140   return 1;
1141 }
1142
1143 =head2 delete_all
1144
1145 =over 4
1146
1147 =item Arguments: none
1148
1149 =item Return Value: 1
1150
1151 =back
1152
1153 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1154 will run DBIC cascade triggers, while L</delete> will not.
1155
1156 =cut
1157
1158 sub delete_all {
1159   my ($self) = @_;
1160   $_->delete for $self->all;
1161   return 1;
1162 }
1163
1164 =head2 pager
1165
1166 =over 4
1167
1168 =item Arguments: none
1169
1170 =item Return Value: $pager
1171
1172 =back
1173
1174 Return Value a L<Data::Page> object for the current resultset. Only makes
1175 sense for queries with a C<page> attribute.
1176
1177 =cut
1178
1179 sub pager {
1180   my ($self) = @_;
1181   my $attrs = $self->{attrs};
1182   $self->throw_exception("Can't create pager for non-paged rs")
1183     unless $self->{attrs}{page};
1184   $attrs->{rows} ||= 10;
1185   return $self->{pager} ||= Data::Page->new(
1186     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1187 }
1188
1189 =head2 page
1190
1191 =over 4
1192
1193 =item Arguments: $page_number
1194
1195 =item Return Value: $rs
1196
1197 =back
1198
1199 Returns a resultset for the $page_number page of the resultset on which page
1200 is called, where each page contains a number of rows equal to the 'rows'
1201 attribute set on the resultset (10 by default).
1202
1203 =cut
1204
1205 sub page {
1206   my ($self, $page) = @_;
1207   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1208 }
1209
1210 =head2 new_result
1211
1212 =over 4
1213
1214 =item Arguments: \%vals
1215
1216 =item Return Value: $object
1217
1218 =back
1219
1220 Creates an object in the resultset's result class and returns it.
1221
1222 =cut
1223
1224 sub new_result {
1225   my ($self, $values) = @_;
1226   $self->throw_exception( "new_result needs a hash" )
1227     unless (ref $values eq 'HASH');
1228   $self->throw_exception(
1229     "Can't abstract implicit construct, condition not a hash"
1230   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1231
1232   my $alias = $self->{attrs}{alias};
1233   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1234   my %new = (
1235     %{ $self->_remove_alias($values, $alias) },
1236     %{ $self->_remove_alias($collapsed_cond, $alias) },
1237   );
1238
1239   my $obj = $self->result_class->new(\%new);
1240   $obj->result_source($self->result_source) if $obj->can('result_source');
1241   return $obj;
1242 }
1243
1244 # _collapse_cond
1245 #
1246 # Recursively collapse the condition.
1247
1248 sub _collapse_cond {
1249   my ($self, $cond, $collapsed) = @_;
1250
1251   $collapsed ||= {};
1252
1253   if (ref $cond eq 'ARRAY') {
1254     foreach my $subcond (@$cond) {
1255       next unless ref $subcond;  # -or
1256 #      warn "ARRAY: " . Dumper $subcond;
1257       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1258     }
1259   }
1260   elsif (ref $cond eq 'HASH') {
1261     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1262       foreach my $subcond (@{$cond->{-and}}) {
1263 #        warn "HASH: " . Dumper $subcond;
1264         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1265       }
1266     }
1267     else {
1268 #      warn "LEAF: " . Dumper $cond;
1269       foreach my $col (keys %$cond) {
1270         my $value = $cond->{$col};
1271         $collapsed->{$col} = $value;
1272       }
1273     }
1274   }
1275
1276   return $collapsed;
1277 }
1278
1279 # _remove_alias
1280 #
1281 # Remove the specified alias from the specified query hash. A copy is made so
1282 # the original query is not modified.
1283
1284 sub _remove_alias {
1285   my ($self, $query, $alias) = @_;
1286
1287   my %orig = %{ $query || {} };
1288   my %unaliased;
1289
1290   foreach my $key (keys %orig) {
1291     if ($key !~ /\./) {
1292       $unaliased{$key} = $orig{$key};
1293       next;
1294     }
1295     $unaliased{$1} = $orig{$key}
1296       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1297   }
1298
1299   return \%unaliased;
1300 }
1301
1302 =head2 find_or_new
1303
1304 =over 4
1305
1306 =item Arguments: \%vals, \%attrs?
1307
1308 =item Return Value: $object
1309
1310 =back
1311
1312 Find an existing record from this resultset. If none exists, instantiate a new
1313 result object and return it. The object will not be saved into your storage
1314 until you call L<DBIx::Class::Row/insert> on it.
1315
1316 If you want objects to be saved immediately, use L</find_or_create> instead.
1317
1318 =cut
1319
1320 sub find_or_new {
1321   my $self     = shift;
1322   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1323   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1324   my $exists   = $self->find($hash, $attrs);
1325   return defined $exists ? $exists : $self->new_result($hash);
1326 }
1327
1328 =head2 create
1329
1330 =over 4
1331
1332 =item Arguments: \%vals
1333
1334 =item Return Value: $object
1335
1336 =back
1337
1338 Inserts a record into the resultset and returns the object representing it.
1339
1340 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1341
1342 =cut
1343
1344 sub create {
1345   my ($self, $attrs) = @_;
1346   $self->throw_exception( "create needs a hashref" )
1347     unless ref $attrs eq 'HASH';
1348   return $self->new_result($attrs)->insert;
1349 }
1350
1351 =head2 find_or_create
1352
1353 =over 4
1354
1355 =item Arguments: \%vals, \%attrs?
1356
1357 =item Return Value: $object
1358
1359 =back
1360
1361   $class->find_or_create({ key => $val, ... });
1362
1363 Tries to find a record based on its primary key or unique constraint; if none
1364 is found, creates one and returns that instead.
1365
1366   my $cd = $schema->resultset('CD')->find_or_create({
1367     cdid   => 5,
1368     artist => 'Massive Attack',
1369     title  => 'Mezzanine',
1370     year   => 2005,
1371   });
1372
1373 Also takes an optional C<key> attribute, to search by a specific key or unique
1374 constraint. For example:
1375
1376   my $cd = $schema->resultset('CD')->find_or_create(
1377     {
1378       artist => 'Massive Attack',
1379       title  => 'Mezzanine',
1380     },
1381     { key => 'cd_artist_title' }
1382   );
1383
1384 See also L</find> and L</update_or_create>. For information on how to declare
1385 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1386
1387 =cut
1388
1389 sub find_or_create {
1390   my $self     = shift;
1391   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1392   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1393   my $exists   = $self->find($hash, $attrs);
1394   return defined $exists ? $exists : $self->create($hash);
1395 }
1396
1397 =head2 update_or_create
1398
1399 =over 4
1400
1401 =item Arguments: \%col_values, { key => $unique_constraint }?
1402
1403 =item Return Value: $object
1404
1405 =back
1406
1407   $class->update_or_create({ col => $val, ... });
1408
1409 First, searches for an existing row matching one of the unique constraints
1410 (including the primary key) on the source of this resultset. If a row is
1411 found, updates it with the other given column values. Otherwise, creates a new
1412 row.
1413
1414 Takes an optional C<key> attribute to search on a specific unique constraint.
1415 For example:
1416
1417   # In your application
1418   my $cd = $schema->resultset('CD')->update_or_create(
1419     {
1420       artist => 'Massive Attack',
1421       title  => 'Mezzanine',
1422       year   => 1998,
1423     },
1424     { key => 'cd_artist_title' }
1425   );
1426
1427 If no C<key> is specified, it searches on all unique constraints defined on the
1428 source, including the primary key.
1429
1430 If the C<key> is specified as C<primary>, it searches only on the primary key.
1431
1432 See also L</find> and L</find_or_create>. For information on how to declare
1433 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1434
1435 =cut
1436
1437 sub update_or_create {
1438   my $self = shift;
1439   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1440   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1441
1442   my $row = $self->find($cond, $attrs);
1443   if (defined $row) {
1444     $row->update($cond);
1445     return $row;
1446   }
1447
1448   return $self->create($cond);
1449 }
1450
1451 =head2 get_cache
1452
1453 =over 4
1454
1455 =item Arguments: none
1456
1457 =item Return Value: \@cache_objects?
1458
1459 =back
1460
1461 Gets the contents of the cache for the resultset, if the cache is set.
1462
1463 =cut
1464
1465 sub get_cache {
1466   shift->{all_cache};
1467 }
1468
1469 =head2 set_cache
1470
1471 =over 4
1472
1473 =item Arguments: \@cache_objects
1474
1475 =item Return Value: \@cache_objects
1476
1477 =back
1478
1479 Sets the contents of the cache for the resultset. Expects an arrayref
1480 of objects of the same class as those produced by the resultset. Note that
1481 if the cache is set the resultset will return the cached objects rather
1482 than re-querying the database even if the cache attr is not set.
1483
1484 =cut
1485
1486 sub set_cache {
1487   my ( $self, $data ) = @_;
1488   $self->throw_exception("set_cache requires an arrayref")
1489       if defined($data) && (ref $data ne 'ARRAY');
1490   $self->{all_cache} = $data;
1491 }
1492
1493 =head2 clear_cache
1494
1495 =over 4
1496
1497 =item Arguments: none
1498
1499 =item Return Value: []
1500
1501 =back
1502
1503 Clears the cache for the resultset.
1504
1505 =cut
1506
1507 sub clear_cache {
1508   shift->set_cache(undef);
1509 }
1510
1511 =head2 related_resultset
1512
1513 =over 4
1514
1515 =item Arguments: $relationship_name
1516
1517 =item Return Value: $resultset
1518
1519 =back
1520
1521 Returns a related resultset for the supplied relationship name.
1522
1523   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1524
1525 =cut
1526
1527 sub related_resultset {
1528   my ($self, $rel) = @_;
1529
1530   $self->{related_resultsets} ||= {};
1531   return $self->{related_resultsets}{$rel} ||= do {
1532     my $rel_obj = $self->result_source->relationship_info($rel);
1533
1534     $self->throw_exception(
1535       "search_related: result source '" . $self->result_source->name .
1536         "' has no such relationship $rel")
1537       unless $rel_obj;
1538     
1539     my ($from,$seen) = $self->_resolve_from($rel);
1540
1541     my $join_count = $seen->{$rel};
1542     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1543
1544     $self->result_source->schema->resultset($rel_obj->{class})->search_rs(
1545       undef, {
1546         %{$self->{attrs}||{}},
1547         join => undef,
1548         prefetch => undef,
1549         select => undef,
1550         as => undef,
1551         alias => $alias,
1552         where => $self->{cond},
1553         seen_join => $seen,
1554         from => $from,
1555     });
1556   };
1557 }
1558
1559 sub _resolve_from {
1560   my ($self, $extra_join) = @_;
1561   my $source = $self->result_source;
1562   my $attrs = $self->{attrs};
1563   
1564   my $from = $attrs->{from}
1565     || [ { $attrs->{alias} => $source->from } ];
1566     
1567   my $seen = { %{$attrs->{seen_join}||{}} };
1568
1569   my $join = ($attrs->{join}
1570                ? [ $attrs->{join}, $extra_join ]
1571                : $extra_join);
1572   $from = [
1573     @$from,
1574     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1575   ];
1576
1577   return ($from,$seen);
1578 }
1579
1580 sub _resolved_attrs {
1581   my $self = shift;
1582   return $self->{_attrs} if $self->{_attrs};
1583
1584   my $attrs = { %{$self->{attrs}||{}} };
1585   my $source = $self->{result_source};
1586   my $alias = $attrs->{alias};
1587
1588   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1589   if ($attrs->{columns}) {
1590     delete $attrs->{as};
1591   } elsif (!$attrs->{select}) {
1592     $attrs->{columns} = [ $source->columns ];
1593   }
1594  
1595   $attrs->{select} = 
1596     ($attrs->{select}
1597       ? (ref $attrs->{select} eq 'ARRAY'
1598           ? [ @{$attrs->{select}} ]
1599           : [ $attrs->{select} ])
1600       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1601     );
1602   $attrs->{as} =
1603     ($attrs->{as}
1604       ? (ref $attrs->{as} eq 'ARRAY'
1605           ? [ @{$attrs->{as}} ]
1606           : [ $attrs->{as} ])
1607       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1608     );
1609   
1610   my $adds;
1611   if ($adds = delete $attrs->{include_columns}) {
1612     $adds = [$adds] unless ref $adds eq 'ARRAY';
1613     push(@{$attrs->{select}}, @$adds);
1614     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1615   }
1616   if ($adds = delete $attrs->{'+select'}) {
1617     $adds = [$adds] unless ref $adds eq 'ARRAY';
1618     push(@{$attrs->{select}},
1619            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1620   }
1621   if (my $adds = delete $attrs->{'+as'}) {
1622     $adds = [$adds] unless ref $adds eq 'ARRAY';
1623     push(@{$attrs->{as}}, @$adds);
1624   }
1625
1626   $attrs->{from} ||= [ { 'me' => $source->from } ];
1627
1628   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1629     my $join = delete $attrs->{join} || {};
1630
1631     if (defined $attrs->{prefetch}) {
1632       $join = $self->_merge_attr(
1633         $join, $attrs->{prefetch}
1634       );
1635     }
1636
1637     $attrs->{from} =   # have to copy here to avoid corrupting the original
1638       [
1639         @{$attrs->{from}}, 
1640         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1641       ];
1642   }
1643
1644   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1645   if ($attrs->{order_by}) {
1646     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1647                            ? [ @{$attrs->{order_by}} ]
1648                            : [ $attrs->{order_by} ]);
1649   } else {
1650     $attrs->{order_by} = [];    
1651   }
1652
1653   my $collapse = $attrs->{collapse} || {};
1654   if (my $prefetch = delete $attrs->{prefetch}) {
1655     $prefetch = $self->_merge_attr({}, $prefetch);
1656     my @pre_order;
1657     my $seen = $attrs->{seen_join} || {};
1658     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1659       # bring joins back to level of current class
1660       my @prefetch = $source->resolve_prefetch(
1661         $p, $alias, $seen, \@pre_order, $collapse
1662       );
1663       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1664       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1665     }
1666     push(@{$attrs->{order_by}}, @pre_order);
1667   }
1668   $attrs->{collapse} = $collapse;
1669
1670   return $self->{_attrs} = $attrs;
1671 }
1672
1673 sub _merge_attr {
1674   my ($self, $a, $b) = @_;
1675   return $b unless defined($a);
1676   return $a unless defined($b);
1677   
1678   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1679     foreach my $key (keys %{$b}) {
1680       if (exists $a->{$key}) {
1681         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1682       } else {
1683         $a->{$key} = $b->{$key};
1684       }
1685     }
1686     return $a;
1687   } else {
1688     $a = [$a] unless ref $a eq 'ARRAY';
1689     $b = [$b] unless ref $b eq 'ARRAY';
1690
1691     my $hash = {};
1692     my @array;
1693     foreach my $x ($a, $b) {
1694       foreach my $element (@{$x}) {
1695         if (ref $element eq 'HASH') {
1696           $hash = $self->_merge_attr($hash, $element);
1697         } elsif (ref $element eq 'ARRAY') {
1698           push(@array, @{$element});
1699         } else {
1700           push(@array, $element) unless $b == $x
1701             && grep { $_ eq $element } @array;
1702         }
1703       }
1704     }
1705     
1706     @array = grep { !exists $hash->{$_} } @array;
1707
1708     return keys %{$hash}
1709       ? ( scalar(@array)
1710             ? [$hash, @array]
1711             : $hash
1712         )
1713       : \@array;
1714   }
1715 }
1716
1717 =head2 throw_exception
1718
1719 See L<DBIx::Class::Schema/throw_exception> for details.
1720
1721 =cut
1722
1723 sub throw_exception {
1724   my $self=shift;
1725   $self->result_source->schema->throw_exception(@_);
1726 }
1727
1728 # XXX: FIXME: Attributes docs need clearing up
1729
1730 =head1 ATTRIBUTES
1731
1732 The resultset takes various attributes that modify its behavior. Here's an
1733 overview of them:
1734
1735 =head2 order_by
1736
1737 =over 4
1738
1739 =item Value: ($order_by | \@order_by)
1740
1741 =back
1742
1743 Which column(s) to order the results by. This is currently passed
1744 through directly to SQL, so you can give e.g. C<year DESC> for a
1745 descending order on the column `year'.
1746
1747 Please note that if you have quoting enabled (see
1748 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1749 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1750 so you will need to manually quote things as appropriate.)
1751
1752 =head2 columns
1753
1754 =over 4
1755
1756 =item Value: \@columns
1757
1758 =back
1759
1760 Shortcut to request a particular set of columns to be retrieved.  Adds
1761 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1762 from that, then auto-populates C<as> from C<select> as normal. (You may also
1763 use the C<cols> attribute, as in earlier versions of DBIC.)
1764
1765 =head2 include_columns
1766
1767 =over 4
1768
1769 =item Value: \@columns
1770
1771 =back
1772
1773 Shortcut to include additional columns in the returned results - for example
1774
1775   $schema->resultset('CD')->search(undef, {
1776     include_columns => ['artist.name'],
1777     join => ['artist']
1778   });
1779
1780 would return all CDs and include a 'name' column to the information
1781 passed to object inflation
1782
1783 =head2 select
1784
1785 =over 4
1786
1787 =item Value: \@select_columns
1788
1789 =back
1790
1791 Indicates which columns should be selected from the storage. You can use
1792 column names, or in the case of RDBMS back ends, function or stored procedure
1793 names:
1794
1795   $rs = $schema->resultset('Employee')->search(undef, {
1796     select => [
1797       'name',
1798       { count => 'employeeid' },
1799       { sum => 'salary' }
1800     ]
1801   });
1802
1803 When you use function/stored procedure names and do not supply an C<as>
1804 attribute, the column names returned are storage-dependent. E.g. MySQL would
1805 return a column named C<count(employeeid)> in the above example.
1806
1807 =head2 +select
1808
1809 =over 4
1810
1811 Indicates additional columns to be selected from storage.  Works the same as
1812 L<select> but adds columns to the selection.
1813
1814 =back
1815
1816 =head2 +as
1817
1818 =over 4
1819
1820 Indicates additional column names for those added via L<+select>.
1821
1822 =back
1823
1824 =head2 as
1825
1826 =over 4
1827
1828 =item Value: \@inflation_names
1829
1830 =back
1831
1832 Indicates column names for object inflation. This is used in conjunction with
1833 C<select>, usually when C<select> contains one or more function or stored
1834 procedure names:
1835
1836   $rs = $schema->resultset('Employee')->search(undef, {
1837     select => [
1838       'name',
1839       { count => 'employeeid' }
1840     ],
1841     as => ['name', 'employee_count'],
1842   });
1843
1844   my $employee = $rs->first(); # get the first Employee
1845
1846 If the object against which the search is performed already has an accessor
1847 matching a column name specified in C<as>, the value can be retrieved using
1848 the accessor as normal:
1849
1850   my $name = $employee->name();
1851
1852 If on the other hand an accessor does not exist in the object, you need to
1853 use C<get_column> instead:
1854
1855   my $employee_count = $employee->get_column('employee_count');
1856
1857 You can create your own accessors if required - see
1858 L<DBIx::Class::Manual::Cookbook> for details.
1859
1860 Please note: This will NOT insert an C<AS employee_count> into the SQL
1861 statement produced, it is used for internal access only. Thus
1862 attempting to use the accessor in an C<order_by> clause or similar
1863 will fail miserably.
1864
1865 To get around this limitation, you can supply literal SQL to your
1866 C<select> attibute that contains the C<AS alias> text, eg:
1867
1868   select => [\'myfield AS alias']
1869
1870 =head2 join
1871
1872 =over 4
1873
1874 =item Value: ($rel_name | \@rel_names | \%rel_names)
1875
1876 =back
1877
1878 Contains a list of relationships that should be joined for this query.  For
1879 example:
1880
1881   # Get CDs by Nine Inch Nails
1882   my $rs = $schema->resultset('CD')->search(
1883     { 'artist.name' => 'Nine Inch Nails' },
1884     { join => 'artist' }
1885   );
1886
1887 Can also contain a hash reference to refer to the other relation's relations.
1888 For example:
1889
1890   package MyApp::Schema::Track;
1891   use base qw/DBIx::Class/;
1892   __PACKAGE__->table('track');
1893   __PACKAGE__->add_columns(qw/trackid cd position title/);
1894   __PACKAGE__->set_primary_key('trackid');
1895   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1896   1;
1897
1898   # In your application
1899   my $rs = $schema->resultset('Artist')->search(
1900     { 'track.title' => 'Teardrop' },
1901     {
1902       join     => { cd => 'track' },
1903       order_by => 'artist.name',
1904     }
1905   );
1906
1907 You need to use the relationship (not the table) name in  conditions, 
1908 because they are aliased as such. The current table is aliased as "me", so 
1909 you need to use me.column_name in order to avoid ambiguity. For example:
1910
1911   # Get CDs from 1984 with a 'Foo' track 
1912   my $rs = $schema->resultset('CD')->search(
1913     { 
1914       'me.year' => 1984,
1915       'tracks.name' => 'Foo'
1916     },
1917     { join => 'tracks' }
1918   );
1919   
1920 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1921 similarly for a third time). For e.g.
1922
1923   my $rs = $schema->resultset('Artist')->search({
1924     'cds.title'   => 'Down to Earth',
1925     'cds_2.title' => 'Popular',
1926   }, {
1927     join => [ qw/cds cds/ ],
1928   });
1929
1930 will return a set of all artists that have both a cd with title 'Down
1931 to Earth' and a cd with title 'Popular'.
1932
1933 If you want to fetch related objects from other tables as well, see C<prefetch>
1934 below.
1935
1936 =head2 prefetch
1937
1938 =over 4
1939
1940 =item Value: ($rel_name | \@rel_names | \%rel_names)
1941
1942 =back
1943
1944 Contains one or more relationships that should be fetched along with the main
1945 query (when they are accessed afterwards they will have already been
1946 "prefetched").  This is useful for when you know you will need the related
1947 objects, because it saves at least one query:
1948
1949   my $rs = $schema->resultset('Tag')->search(
1950     undef,
1951     {
1952       prefetch => {
1953         cd => 'artist'
1954       }
1955     }
1956   );
1957
1958 The initial search results in SQL like the following:
1959
1960   SELECT tag.*, cd.*, artist.* FROM tag
1961   JOIN cd ON tag.cd = cd.cdid
1962   JOIN artist ON cd.artist = artist.artistid
1963
1964 L<DBIx::Class> has no need to go back to the database when we access the
1965 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1966 case.
1967
1968 Simple prefetches will be joined automatically, so there is no need
1969 for a C<join> attribute in the above search. If you're prefetching to
1970 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1971 specify the join as well.
1972
1973 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1974 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1975 with an accessor type of 'single' or 'filter').
1976
1977 =head2 page
1978
1979 =over 4
1980
1981 =item Value: $page
1982
1983 =back
1984
1985 Makes the resultset paged and specifies the page to retrieve. Effectively
1986 identical to creating a non-pages resultset and then calling ->page($page)
1987 on it.
1988
1989 If L<rows> attribute is not specified it defualts to 10 rows per page.
1990
1991 =head2 rows
1992
1993 =over 4
1994
1995 =item Value: $rows
1996
1997 =back
1998
1999 Specifes the maximum number of rows for direct retrieval or the number of
2000 rows per page if the page attribute or method is used.
2001
2002 =head2 offset
2003
2004 =over 4
2005
2006 =item Value: $offset
2007
2008 =back
2009
2010 Specifies the (zero-based) row number for the  first row to be returned, or the
2011 of the first row of the first page if paging is used.
2012
2013 =head2 group_by
2014
2015 =over 4
2016
2017 =item Value: \@columns
2018
2019 =back
2020
2021 A arrayref of columns to group by. Can include columns of joined tables.
2022
2023   group_by => [qw/ column1 column2 ... /]
2024
2025 =head2 having
2026
2027 =over 4
2028
2029 =item Value: $condition
2030
2031 =back
2032
2033 HAVING is a select statement attribute that is applied between GROUP BY and
2034 ORDER BY. It is applied to the after the grouping calculations have been
2035 done.
2036
2037   having => { 'count(employee)' => { '>=', 100 } }
2038
2039 =head2 distinct
2040
2041 =over 4
2042
2043 =item Value: (0 | 1)
2044
2045 =back
2046
2047 Set to 1 to group by all columns.
2048
2049 =head2 where
2050
2051 =over 4
2052
2053 Adds to the WHERE clause.
2054
2055   # only return rows WHERE deleted IS NULL for all searches
2056   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2057
2058 Can be overridden by passing C<{ where => undef }> as an attribute
2059 to a resulset.
2060
2061 =back
2062
2063 =head2 cache
2064
2065 Set to 1 to cache search results. This prevents extra SQL queries if you
2066 revisit rows in your ResultSet:
2067
2068   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2069
2070   while( my $artist = $resultset->next ) {
2071     ... do stuff ...
2072   }
2073
2074   $rs->first; # without cache, this would issue a query
2075
2076 By default, searches are not cached.
2077
2078 For more examples of using these attributes, see
2079 L<DBIx::Class::Manual::Cookbook>.
2080
2081 =head2 from
2082
2083 =over 4
2084
2085 =item Value: \@from_clause
2086
2087 =back
2088
2089 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2090 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2091 clauses.
2092
2093 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2094
2095 C<join> will usually do what you need and it is strongly recommended that you
2096 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2097 And we really do mean "cannot", not just tried and failed. Attempting to use
2098 this because you're having problems with C<join> is like trying to use x86
2099 ASM because you've got a syntax error in your C. Trust us on this.
2100
2101 Now, if you're still really, really sure you need to use this (and if you're
2102 not 100% sure, ask the mailing list first), here's an explanation of how this
2103 works.
2104
2105 The syntax is as follows -
2106
2107   [
2108     { <alias1> => <table1> },
2109     [
2110       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2111       [], # nested JOIN (optional)
2112       { <table1.column1> => <table2.column2>, ... (more conditions) },
2113     ],
2114     # More of the above [ ] may follow for additional joins
2115   ]
2116
2117   <table1> <alias1>
2118   JOIN
2119     <table2> <alias2>
2120     [JOIN ...]
2121   ON <table1.column1> = <table2.column2>
2122   <more joins may follow>
2123
2124 An easy way to follow the examples below is to remember the following:
2125
2126     Anything inside "[]" is a JOIN
2127     Anything inside "{}" is a condition for the enclosing JOIN
2128
2129 The following examples utilize a "person" table in a family tree application.
2130 In order to express parent->child relationships, this table is self-joined:
2131
2132     # Person->belongs_to('father' => 'Person');
2133     # Person->belongs_to('mother' => 'Person');
2134
2135 C<from> can be used to nest joins. Here we return all children with a father,
2136 then search against all mothers of those children:
2137
2138   $rs = $schema->resultset('Person')->search(
2139       undef,
2140       {
2141           alias => 'mother', # alias columns in accordance with "from"
2142           from => [
2143               { mother => 'person' },
2144               [
2145                   [
2146                       { child => 'person' },
2147                       [
2148                           { father => 'person' },
2149                           { 'father.person_id' => 'child.father_id' }
2150                       ]
2151                   ],
2152                   { 'mother.person_id' => 'child.mother_id' }
2153               ],
2154           ]
2155       },
2156   );
2157
2158   # Equivalent SQL:
2159   # SELECT mother.* FROM person mother
2160   # JOIN (
2161   #   person child
2162   #   JOIN person father
2163   #   ON ( father.person_id = child.father_id )
2164   # )
2165   # ON ( mother.person_id = child.mother_id )
2166
2167 The type of any join can be controlled manually. To search against only people
2168 with a father in the person table, we could explicitly use C<INNER JOIN>:
2169
2170     $rs = $schema->resultset('Person')->search(
2171         undef,
2172         {
2173             alias => 'child', # alias columns in accordance with "from"
2174             from => [
2175                 { child => 'person' },
2176                 [
2177                     { father => 'person', -join_type => 'inner' },
2178                     { 'father.id' => 'child.father_id' }
2179                 ],
2180             ]
2181         },
2182     );
2183
2184     # Equivalent SQL:
2185     # SELECT child.* FROM person child
2186     # INNER JOIN person father ON child.father_id = father.id
2187
2188 =cut
2189
2190 1;