fixed set_$rel (from many_to_many) to accept a listref
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use base qw/DBIx::Class/;
14
15 __PACKAGE__->load_components(qw/AccessorGroup/);
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   #weaken $source;
89
90   if ($attrs->{page}) {
91     $attrs->{rows} ||= 10;
92     $attrs->{offset} ||= 0;
93     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
94   }
95
96   $attrs->{alias} ||= 'me';
97
98   my $self = {
99     result_source => $source,
100     result_class => $attrs->{result_class} || $source->result_class,
101     cond => $attrs->{where},
102     count => undef,
103     pager => undef,
104     attrs => $attrs
105   };
106
107   bless $self, $class;
108
109   return $self;
110 }
111
112 =head2 search
113
114 =over 4
115
116 =item Arguments: $cond, \%attrs?
117
118 =item Return Value: $resultset (scalar context), @row_objs (list context)
119
120 =back
121
122   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
123   my $new_rs = $cd_rs->search({ year => 2005 });
124
125   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
126                  # year = 2005 OR year = 2004
127
128 If you need to pass in additional attributes but no additional condition,
129 call it as C<search(undef, \%attrs)>.
130
131   # "SELECT name, artistid FROM $artist_table"
132   my @all_artists = $schema->resultset('Artist')->search(undef, {
133     columns => [qw/name artistid/],
134   });
135
136 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
137
138 =cut
139
140 sub search {
141   my $self = shift;
142   my $rs = $self->search_rs( @_ );
143   return (wantarray ? $rs->all : $rs);
144 }
145
146 =head2 search_rs
147
148 =over 4
149
150 =item Arguments: $cond, \%attrs?
151
152 =item Return Value: $resultset
153
154 =back
155
156 This method does the same exact thing as search() except it will
157 always return a resultset, even in list context.
158
159 =cut
160
161 sub search_rs {
162   my $self = shift;
163
164   my $rows;
165
166   unless (@_) {                 # no search, effectively just a clone
167     $rows = $self->get_cache;
168   }
169
170   my $attrs = {};
171   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
172   my $our_attrs = { %{$self->{attrs}} };
173   my $having = delete $our_attrs->{having};
174   my $where = delete $our_attrs->{where};
175
176   my $new_attrs = { %{$our_attrs}, %{$attrs} };
177
178   # merge new attrs into inherited
179   foreach my $key (qw/join prefetch/) {
180     next unless exists $attrs->{$key};
181     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
182   }
183
184   my $cond = (@_
185     ? (
186         (@_ == 1 || ref $_[0] eq "HASH")
187           ? shift
188           : (
189               (@_ % 2)
190                 ? $self->throw_exception("Odd number of arguments to search")
191                 : {@_}
192              )
193       )
194     : undef
195   );
196
197   if (defined $where) {
198     $new_attrs->{where} = (
199       defined $new_attrs->{where}
200         ? { '-and' => [
201               map {
202                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
203               } $where, $new_attrs->{where}
204             ]
205           }
206         : $where);
207   }
208   if (defined $cond) {
209     $new_attrs->{where} = (
210       defined $new_attrs->{where}
211         ? { '-and' => [
212               map {
213                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
214               } $cond, $new_attrs->{where}
215             ]
216           }
217         : $cond);
218   }
219
220   if (defined $having) {
221     $new_attrs->{having} = (
222       defined $new_attrs->{having}
223         ? { '-and' => [
224               map {
225                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
226               } $having, $new_attrs->{having}
227             ]
228           }
229         : $having);
230   }
231
232   my $rs = (ref $self)->new($self->result_source, $new_attrs);
233   if ($rows) {
234     $rs->set_cache($rows);
235   }
236   return $rs;
237 }
238
239 =head2 search_literal
240
241 =over 4
242
243 =item Arguments: $sql_fragment, @bind_values
244
245 =item Return Value: $resultset (scalar context), @row_objs (list context)
246
247 =back
248
249   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
250   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
251
252 Pass a literal chunk of SQL to be added to the conditional part of the
253 resultset query.
254
255 =cut
256
257 sub search_literal {
258   my ($self, $cond, @vals) = @_;
259   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
260   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
261   return $self->search(\$cond, $attrs);
262 }
263
264 =head2 find
265
266 =over 4
267
268 =item Arguments: @values | \%cols, \%attrs?
269
270 =item Return Value: $row_object
271
272 =back
273
274 Finds a row based on its primary key or unique constraint. For example, to find
275 a row by its primary key:
276
277   my $cd = $schema->resultset('CD')->find(5);
278
279 You can also find a row by a specific unique constraint using the C<key>
280 attribute. For example:
281
282   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
283     key => 'cd_artist_title'
284   });
285
286 Additionally, you can specify the columns explicitly by name:
287
288   my $cd = $schema->resultset('CD')->find(
289     {
290       artist => 'Massive Attack',
291       title  => 'Mezzanine',
292     },
293     { key => 'cd_artist_title' }
294   );
295
296 If the C<key> is specified as C<primary>, it searches only on the primary key.
297
298 If no C<key> is specified, it searches on all unique constraints defined on the
299 source, including the primary key.
300
301 If your table does not have a primary key, you B<must> provide a value for the
302 C<key> attribute matching one of the unique constraints on the source.
303
304 See also L</find_or_create> and L</update_or_create>. For information on how to
305 declare unique constraints, see
306 L<DBIx::Class::ResultSource/add_unique_constraint>.
307
308 =cut
309
310 sub find {
311   my $self = shift;
312   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
313
314   # Default to the primary key, but allow a specific key
315   my @cols = exists $attrs->{key}
316     ? $self->result_source->unique_constraint_columns($attrs->{key})
317     : $self->result_source->primary_columns;
318   $self->throw_exception(
319     "Can't find unless a primary key is defined or unique constraint is specified"
320   ) unless @cols;
321
322   # Parse out a hashref from input
323   my $input_query;
324   if (ref $_[0] eq 'HASH') {
325     $input_query = { %{$_[0]} };
326   }
327   elsif (@_ == @cols) {
328     $input_query = {};
329     @{$input_query}{@cols} = @_;
330   }
331   else {
332     # Compatibility: Allow e.g. find(id => $value)
333     carp "Find by key => value deprecated; please use a hashref instead";
334     $input_query = {@_};
335   }
336
337   my @unique_queries = $self->_unique_queries($input_query, $attrs);
338
339   # Build the final query: Default to the disjunction of the unique queries,
340   # but allow the input query in case the ResultSet defines the query or the
341   # user is abusing find
342   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
343   my $query = @unique_queries
344     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
345     : $self->_add_alias($input_query, $alias);
346
347   # Run the query
348   if (keys %$attrs) {
349     my $rs = $self->search($query, $attrs);
350     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
351   }
352   else {
353     return keys %{$self->_resolved_attrs->{collapse}}
354       ? $self->search($query)->next
355       : $self->single($query);
356   }
357 }
358
359 # _add_alias
360 #
361 # Add the specified alias to the specified query hash. A copy is made so the
362 # original query is not modified.
363
364 sub _add_alias {
365   my ($self, $query, $alias) = @_;
366
367   my %aliased = %$query;
368   foreach my $col (grep { ! m/\./ } keys %aliased) {
369     $aliased{"$alias.$col"} = delete $aliased{$col};
370   }
371
372   return \%aliased;
373 }
374
375 # _unique_queries
376 #
377 # Build a list of queries which satisfy unique constraints.
378
379 sub _unique_queries {
380   my ($self, $query, $attrs) = @_;
381
382   my @constraint_names = exists $attrs->{key}
383     ? ($attrs->{key})
384     : $self->result_source->unique_constraint_names;
385
386   my @unique_queries;
387   foreach my $name (@constraint_names) {
388     my @unique_cols = $self->result_source->unique_constraint_columns($name);
389     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
390
391     my $num_query = scalar keys %$unique_query;
392     next unless $num_query;
393
394     # XXX: Assuming quite a bit about $self->{attrs}{where}
395     my $num_cols = scalar @unique_cols;
396     my $num_where = exists $self->{attrs}{where}
397       ? scalar keys %{ $self->{attrs}{where} }
398       : 0;
399     push @unique_queries, $unique_query
400       if $num_query + $num_where == $num_cols;
401   }
402
403   return @unique_queries;
404 }
405
406 # _build_unique_query
407 #
408 # Constrain the specified query hash based on the specified column names.
409
410 sub _build_unique_query {
411   my ($self, $query, $unique_cols) = @_;
412
413   return {
414     map  { $_ => $query->{$_} }
415     grep { exists $query->{$_} }
416       @$unique_cols
417   };
418 }
419
420 =head2 search_related
421
422 =over 4
423
424 =item Arguments: $rel, $cond, \%attrs?
425
426 =item Return Value: $new_resultset
427
428 =back
429
430   $new_rs = $cd_rs->search_related('artist', {
431     name => 'Emo-R-Us',
432   });
433
434 Searches the specified relationship, optionally specifying a condition and
435 attributes for matching records. See L</ATTRIBUTES> for more information.
436
437 =cut
438
439 sub search_related {
440   return shift->related_resultset(shift)->search(@_);
441 }
442
443 =head2 cursor
444
445 =over 4
446
447 =item Arguments: none
448
449 =item Return Value: $cursor
450
451 =back
452
453 Returns a storage-driven cursor to the given resultset. See
454 L<DBIx::Class::Cursor> for more information.
455
456 =cut
457
458 sub cursor {
459   my ($self) = @_;
460
461   my $attrs = { %{$self->_resolved_attrs} };
462   return $self->{cursor}
463     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
464           $attrs->{where},$attrs);
465 }
466
467 =head2 single
468
469 =over 4
470
471 =item Arguments: $cond?
472
473 =item Return Value: $row_object?
474
475 =back
476
477   my $cd = $schema->resultset('CD')->single({ year => 2001 });
478
479 Inflates the first result without creating a cursor if the resultset has
480 any records in it; if not returns nothing. Used by L</find> as an optimisation.
481
482 Can optionally take an additional condition *only* - this is a fast-code-path
483 method; if you need to add extra joins or similar call ->search and then
484 ->single without a condition on the $rs returned from that.
485
486 =cut
487
488 sub single {
489   my ($self, $where) = @_;
490   my $attrs = { %{$self->_resolved_attrs} };
491   if ($where) {
492     if (defined $attrs->{where}) {
493       $attrs->{where} = {
494         '-and' =>
495             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
496                $where, delete $attrs->{where} ]
497       };
498     } else {
499       $attrs->{where} = $where;
500     }
501   }
502
503 #  XXX: Disabled since it doesn't infer uniqueness in all cases
504 #  unless ($self->_is_unique_query($attrs->{where})) {
505 #    carp "Query not guaranteed to return a single row"
506 #      . "; please declare your unique constraints or use search instead";
507 #  }
508
509   my @data = $self->result_source->storage->select_single(
510     $attrs->{from}, $attrs->{select},
511     $attrs->{where}, $attrs
512   );
513
514   return (@data ? $self->_construct_object(@data) : ());
515 }
516
517 # _is_unique_query
518 #
519 # Try to determine if the specified query is guaranteed to be unique, based on
520 # the declared unique constraints.
521
522 sub _is_unique_query {
523   my ($self, $query) = @_;
524
525   my $collapsed = $self->_collapse_query($query);
526   my $alias = $self->{attrs}{alias};
527
528   foreach my $name ($self->result_source->unique_constraint_names) {
529     my @unique_cols = map {
530       "$alias.$_"
531     } $self->result_source->unique_constraint_columns($name);
532
533     # Count the values for each unique column
534     my %seen = map { $_ => 0 } @unique_cols;
535
536     foreach my $key (keys %$collapsed) {
537       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
538       next unless exists $seen{$aliased};  # Additional constraints are okay
539       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
540     }
541
542     # If we get 0 or more than 1 value for a column, it's not necessarily unique
543     return 1 unless grep { $_ != 1 } values %seen;
544   }
545
546   return 0;
547 }
548
549 # _collapse_query
550 #
551 # Recursively collapse the query, accumulating values for each column.
552
553 sub _collapse_query {
554   my ($self, $query, $collapsed) = @_;
555
556   $collapsed ||= {};
557
558   if (ref $query eq 'ARRAY') {
559     foreach my $subquery (@$query) {
560       next unless ref $subquery;  # -or
561 #      warn "ARRAY: " . Dumper $subquery;
562       $collapsed = $self->_collapse_query($subquery, $collapsed);
563     }
564   }
565   elsif (ref $query eq 'HASH') {
566     if (keys %$query and (keys %$query)[0] eq '-and') {
567       foreach my $subquery (@{$query->{-and}}) {
568 #        warn "HASH: " . Dumper $subquery;
569         $collapsed = $self->_collapse_query($subquery, $collapsed);
570       }
571     }
572     else {
573 #      warn "LEAF: " . Dumper $query;
574       foreach my $col (keys %$query) {
575         my $value = $query->{$col};
576         $collapsed->{$col}{$value}++;
577       }
578     }
579   }
580
581   return $collapsed;
582 }
583
584 =head2 get_column
585
586 =over 4
587
588 =item Arguments: $cond?
589
590 =item Return Value: $resultsetcolumn
591
592 =back
593
594   my $max_length = $rs->get_column('length')->max;
595
596 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
597
598 =cut
599
600 sub get_column {
601   my ($self, $column) = @_;
602   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
603   return $new;
604 }
605
606 =head2 search_like
607
608 =over 4
609
610 =item Arguments: $cond, \%attrs?
611
612 =item Return Value: $resultset (scalar context), @row_objs (list context)
613
614 =back
615
616   # WHERE title LIKE '%blue%'
617   $cd_rs = $rs->search_like({ title => '%blue%'});
618
619 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
620 that this is simply a convenience method. You most likely want to use
621 L</search> with specific operators.
622
623 For more information, see L<DBIx::Class::Manual::Cookbook>.
624
625 =cut
626
627 sub search_like {
628   my $class = shift;
629   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
630   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
631   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
632   return $class->search($query, { %$attrs });
633 }
634
635 =head2 slice
636
637 =over 4
638
639 =item Arguments: $first, $last
640
641 =item Return Value: $resultset (scalar context), @row_objs (list context)
642
643 =back
644
645 Returns a resultset or object list representing a subset of elements from the
646 resultset slice is called on. Indexes are from 0, i.e., to get the first
647 three records, call:
648
649   my ($one, $two, $three) = $rs->slice(0, 2);
650
651 =cut
652
653 sub slice {
654   my ($self, $min, $max) = @_;
655   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
656   $attrs->{offset} = $self->{attrs}{offset} || 0;
657   $attrs->{offset} += $min;
658   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
659   return $self->search(undef(), $attrs);
660   #my $slice = (ref $self)->new($self->result_source, $attrs);
661   #return (wantarray ? $slice->all : $slice);
662 }
663
664 =head2 next
665
666 =over 4
667
668 =item Arguments: none
669
670 =item Return Value: $result?
671
672 =back
673
674 Returns the next element in the resultset (C<undef> is there is none).
675
676 Can be used to efficiently iterate over records in the resultset:
677
678   my $rs = $schema->resultset('CD')->search;
679   while (my $cd = $rs->next) {
680     print $cd->title;
681   }
682
683 Note that you need to store the resultset object, and call C<next> on it.
684 Calling C<< resultset('Table')->next >> repeatedly will always return the
685 first record from the resultset.
686
687 =cut
688
689 sub next {
690   my ($self) = @_;
691   if (my $cache = $self->get_cache) {
692     $self->{all_cache_position} ||= 0;
693     return $cache->[$self->{all_cache_position}++];
694   }
695   if ($self->{attrs}{cache}) {
696     $self->{all_cache_position} = 1;
697     return ($self->all)[0];
698   }
699   my @row = (
700     exists $self->{stashed_row}
701       ? @{delete $self->{stashed_row}}
702       : $self->cursor->next
703   );
704   return unless (@row);
705   return $self->_construct_object(@row);
706 }
707
708 sub _construct_object {
709   my ($self, @row) = @_;
710   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
711   my $new = $self->result_class->inflate_result($self->result_source, @$info);
712   $new = $self->{_attrs}{record_filter}->($new)
713     if exists $self->{_attrs}{record_filter};
714   return $new;
715 }
716
717 sub _collapse_result {
718   my ($self, $as, $row, $prefix) = @_;
719
720   my %const;
721   my @copy = @$row;
722   
723   foreach my $this_as (@$as) {
724     my $val = shift @copy;
725     if (defined $prefix) {
726       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
727         my $remain = $1;
728         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
729         $const{$1||''}{$2} = $val;
730       }
731     } else {
732       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
733       $const{$1||''}{$2} = $val;
734     }
735   }
736
737   my $alias = $self->{attrs}{alias};
738   my $info = [ {}, {} ];
739   foreach my $key (keys %const) {
740     if (length $key && $key ne $alias) {
741       my $target = $info;
742       my @parts = split(/\./, $key);
743       foreach my $p (@parts) {
744         $target = $target->[1]->{$p} ||= [];
745       }
746       $target->[0] = $const{$key};
747     } else {
748       $info->[0] = $const{$key};
749     }
750   }
751   
752   my @collapse;
753   if (defined $prefix) {
754     @collapse = map {
755         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
756     } keys %{$self->{_attrs}{collapse}}
757   } else {
758     @collapse = keys %{$self->{_attrs}{collapse}};
759   };
760
761   if (@collapse) {
762     my ($c) = sort { length $a <=> length $b } @collapse;
763     my $target = $info;
764     foreach my $p (split(/\./, $c)) {
765       $target = $target->[1]->{$p} ||= [];
766     }
767     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
768     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
769     my $tree = $self->_collapse_result($as, $row, $c_prefix);
770     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
771     my (@final, @raw);
772
773     while (
774       !(
775         grep {
776           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
777         } @co_key
778         )
779     ) {
780       push(@final, $tree);
781       last unless (@raw = $self->cursor->next);
782       $row = $self->{stashed_row} = \@raw;
783       $tree = $self->_collapse_result($as, $row, $c_prefix);
784     }
785     @$target = (@final ? @final : [ {}, {} ]);
786       # single empty result to indicate an empty prefetched has_many
787   }
788
789   #print "final info: " . Dumper($info);
790   return $info;
791 }
792
793 =head2 result_source
794
795 =over 4
796
797 =item Arguments: $result_source?
798
799 =item Return Value: $result_source
800
801 =back
802
803 An accessor for the primary ResultSource object from which this ResultSet
804 is derived.
805
806 =head2 result_class
807
808 =over 4
809
810 =item Arguments: $result_class?
811
812 =item Return Value: $result_class
813
814 =back
815
816 An accessor for the class to use when creating row objects. Defaults to 
817 C<< result_source->result_class >> - which in most cases is the name of the 
818 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
819
820 =cut
821
822
823 =head2 count
824
825 =over 4
826
827 =item Arguments: $cond, \%attrs??
828
829 =item Return Value: $count
830
831 =back
832
833 Performs an SQL C<COUNT> with the same query as the resultset was built
834 with to find the number of elements. If passed arguments, does a search
835 on the resultset and counts the results of that.
836
837 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
838 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
839 not support C<DISTINCT> with multiple columns. If you are using such a
840 database, you should only use columns from the main table in your C<group_by>
841 clause.
842
843 =cut
844
845 sub count {
846   my $self = shift;
847   return $self->search(@_)->count if @_ and defined $_[0];
848   return scalar @{ $self->get_cache } if $self->get_cache;
849   my $count = $self->_count;
850   return 0 unless $count;
851
852   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
853   $count = $self->{attrs}{rows} if
854     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
855   return $count;
856 }
857
858 sub _count { # Separated out so pager can get the full count
859   my $self = shift;
860   my $select = { count => '*' };
861
862   my $attrs = { %{$self->_resolved_attrs} };
863   if (my $group_by = delete $attrs->{group_by}) {
864     delete $attrs->{having};
865     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
866     # todo: try CONCAT for multi-column pk
867     my @pk = $self->result_source->primary_columns;
868     if (@pk == 1) {
869       my $alias = $attrs->{alias};
870       foreach my $column (@distinct) {
871         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
872           @distinct = ($column);
873           last;
874         }
875       }
876     }
877
878     $select = { count => { distinct => \@distinct } };
879   }
880
881   $attrs->{select} = $select;
882   $attrs->{as} = [qw/count/];
883
884   # offset, order by and page are not needed to count. record_filter is cdbi
885   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
886
887   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
888   my ($count) = $tmp_rs->cursor->next;
889   return $count;
890 }
891
892 =head2 count_literal
893
894 =over 4
895
896 =item Arguments: $sql_fragment, @bind_values
897
898 =item Return Value: $count
899
900 =back
901
902 Counts the results in a literal query. Equivalent to calling L</search_literal>
903 with the passed arguments, then L</count>.
904
905 =cut
906
907 sub count_literal { shift->search_literal(@_)->count; }
908
909 =head2 all
910
911 =over 4
912
913 =item Arguments: none
914
915 =item Return Value: @objects
916
917 =back
918
919 Returns all elements in the resultset. Called implicitly if the resultset
920 is returned in list context.
921
922 =cut
923
924 sub all {
925   my ($self) = @_;
926   return @{ $self->get_cache } if $self->get_cache;
927
928   my @obj;
929
930   # TODO: don't call resolve here
931   if (keys %{$self->_resolved_attrs->{collapse}}) {
932 #  if ($self->{attrs}{prefetch}) {
933       # Using $self->cursor->all is really just an optimisation.
934       # If we're collapsing has_many prefetches it probably makes
935       # very little difference, and this is cleaner than hacking
936       # _construct_object to survive the approach
937     my @row = $self->cursor->next;
938     while (@row) {
939       push(@obj, $self->_construct_object(@row));
940       @row = (exists $self->{stashed_row}
941                ? @{delete $self->{stashed_row}}
942                : $self->cursor->next);
943     }
944   } else {
945     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
946   }
947
948   $self->set_cache(\@obj) if $self->{attrs}{cache};
949   return @obj;
950 }
951
952 =head2 reset
953
954 =over 4
955
956 =item Arguments: none
957
958 =item Return Value: $self
959
960 =back
961
962 Resets the resultset's cursor, so you can iterate through the elements again.
963
964 =cut
965
966 sub reset {
967   my ($self) = @_;
968   delete $self->{_attrs} if exists $self->{_attrs};
969   $self->{all_cache_position} = 0;
970   $self->cursor->reset;
971   return $self;
972 }
973
974 =head2 first
975
976 =over 4
977
978 =item Arguments: none
979
980 =item Return Value: $object?
981
982 =back
983
984 Resets the resultset and returns an object for the first result (if the
985 resultset returns anything).
986
987 =cut
988
989 sub first {
990   return $_[0]->reset->next;
991 }
992
993 # _cond_for_update_delete
994 #
995 # update/delete require the condition to be modified to handle
996 # the differing SQL syntax available.  This transforms the $self->{cond}
997 # appropriately, returning the new condition.
998
999 sub _cond_for_update_delete {
1000   my ($self, $full_cond) = @_;
1001   my $cond = {};
1002
1003   $full_cond ||= $self->{cond};
1004   # No-op. No condition, we're updating/deleting everything
1005   return $cond unless ref $full_cond;
1006
1007   if (ref $full_cond eq 'ARRAY') {
1008     $cond = [
1009       map {
1010         my %hash;
1011         foreach my $key (keys %{$_}) {
1012           $key =~ /([^.]+)$/;
1013           $hash{$1} = $_->{$key};
1014         }
1015         \%hash;
1016       } @{$full_cond}
1017     ];
1018   }
1019   elsif (ref $full_cond eq 'HASH') {
1020     if ((keys %{$full_cond})[0] eq '-and') {
1021       $cond->{-and} = [];
1022
1023       my @cond = @{$full_cond->{-and}};
1024       for (my $i = 0; $i < @cond; $i++) {
1025         my $entry = $cond[$i];
1026
1027         my $hash;
1028         if (ref $entry eq 'HASH') {
1029           $hash = $self->_cond_for_update_delete($entry);
1030         }
1031         else {
1032           $entry =~ /([^.]+)$/;
1033           $hash->{$1} = $cond[++$i];
1034         }
1035
1036         push @{$cond->{-and}}, $hash;
1037       }
1038     }
1039     else {
1040       foreach my $key (keys %{$full_cond}) {
1041         $key =~ /([^.]+)$/;
1042         $cond->{$1} = $full_cond->{$key};
1043       }
1044     }
1045   }
1046   else {
1047     $self->throw_exception(
1048       "Can't update/delete on resultset with condition unless hash or array"
1049     );
1050   }
1051
1052   return $cond;
1053 }
1054
1055
1056 =head2 update
1057
1058 =over 4
1059
1060 =item Arguments: \%values
1061
1062 =item Return Value: $storage_rv
1063
1064 =back
1065
1066 Sets the specified columns in the resultset to the supplied values in a
1067 single query. Return value will be true if the update succeeded or false
1068 if no records were updated; exact type of success value is storage-dependent.
1069
1070 =cut
1071
1072 sub update {
1073   my ($self, $values) = @_;
1074   $self->throw_exception("Values for update must be a hash")
1075     unless ref $values eq 'HASH';
1076
1077   my $cond = $self->_cond_for_update_delete;
1078
1079   return $self->result_source->storage->update(
1080     $self->result_source->from, $values, $cond
1081   );
1082 }
1083
1084 =head2 update_all
1085
1086 =over 4
1087
1088 =item Arguments: \%values
1089
1090 =item Return Value: 1
1091
1092 =back
1093
1094 Fetches all objects and updates them one at a time. Note that C<update_all>
1095 will run DBIC cascade triggers, while L</update> will not.
1096
1097 =cut
1098
1099 sub update_all {
1100   my ($self, $values) = @_;
1101   $self->throw_exception("Values for update must be a hash")
1102     unless ref $values eq 'HASH';
1103   foreach my $obj ($self->all) {
1104     $obj->set_columns($values)->update;
1105   }
1106   return 1;
1107 }
1108
1109 =head2 delete
1110
1111 =over 4
1112
1113 =item Arguments: none
1114
1115 =item Return Value: 1
1116
1117 =back
1118
1119 Deletes the contents of the resultset from its result source. Note that this
1120 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1121 to run. See also L<DBIx::Class::Row/delete>.
1122
1123 =cut
1124
1125 sub delete {
1126   my ($self) = @_;
1127
1128   my $cond = $self->_cond_for_update_delete;
1129
1130   $self->result_source->storage->delete($self->result_source->from, $cond);
1131   return 1;
1132 }
1133
1134 =head2 delete_all
1135
1136 =over 4
1137
1138 =item Arguments: none
1139
1140 =item Return Value: 1
1141
1142 =back
1143
1144 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1145 will run DBIC cascade triggers, while L</delete> will not.
1146
1147 =cut
1148
1149 sub delete_all {
1150   my ($self) = @_;
1151   $_->delete for $self->all;
1152   return 1;
1153 }
1154
1155 =head2 pager
1156
1157 =over 4
1158
1159 =item Arguments: none
1160
1161 =item Return Value: $pager
1162
1163 =back
1164
1165 Return Value a L<Data::Page> object for the current resultset. Only makes
1166 sense for queries with a C<page> attribute.
1167
1168 =cut
1169
1170 sub pager {
1171   my ($self) = @_;
1172   my $attrs = $self->{attrs};
1173   $self->throw_exception("Can't create pager for non-paged rs")
1174     unless $self->{attrs}{page};
1175   $attrs->{rows} ||= 10;
1176   return $self->{pager} ||= Data::Page->new(
1177     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1178 }
1179
1180 =head2 page
1181
1182 =over 4
1183
1184 =item Arguments: $page_number
1185
1186 =item Return Value: $rs
1187
1188 =back
1189
1190 Returns a resultset for the $page_number page of the resultset on which page
1191 is called, where each page contains a number of rows equal to the 'rows'
1192 attribute set on the resultset (10 by default).
1193
1194 =cut
1195
1196 sub page {
1197   my ($self, $page) = @_;
1198   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1199 }
1200
1201 =head2 new_result
1202
1203 =over 4
1204
1205 =item Arguments: \%vals
1206
1207 =item Return Value: $object
1208
1209 =back
1210
1211 Creates an object in the resultset's result class and returns it.
1212
1213 =cut
1214
1215 sub new_result {
1216   my ($self, $values) = @_;
1217   $self->throw_exception( "new_result needs a hash" )
1218     unless (ref $values eq 'HASH');
1219   $self->throw_exception(
1220     "Can't abstract implicit construct, condition not a hash"
1221   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1222
1223   my $alias = $self->{attrs}{alias};
1224   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1225   my %new = (
1226     %{ $self->_remove_alias($values, $alias) },
1227     %{ $self->_remove_alias($collapsed_cond, $alias) },
1228   );
1229
1230   my $obj = $self->result_class->new(\%new);
1231   $obj->result_source($self->result_source) if $obj->can('result_source');
1232   return $obj;
1233 }
1234
1235 # _collapse_cond
1236 #
1237 # Recursively collapse the condition.
1238
1239 sub _collapse_cond {
1240   my ($self, $cond, $collapsed) = @_;
1241
1242   $collapsed ||= {};
1243
1244   if (ref $cond eq 'ARRAY') {
1245     foreach my $subcond (@$cond) {
1246       next unless ref $subcond;  # -or
1247 #      warn "ARRAY: " . Dumper $subcond;
1248       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1249     }
1250   }
1251   elsif (ref $cond eq 'HASH') {
1252     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1253       foreach my $subcond (@{$cond->{-and}}) {
1254 #        warn "HASH: " . Dumper $subcond;
1255         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1256       }
1257     }
1258     else {
1259 #      warn "LEAF: " . Dumper $cond;
1260       foreach my $col (keys %$cond) {
1261         my $value = $cond->{$col};
1262         $collapsed->{$col} = $value;
1263       }
1264     }
1265   }
1266
1267   return $collapsed;
1268 }
1269
1270 # _remove_alias
1271 #
1272 # Remove the specified alias from the specified query hash. A copy is made so
1273 # the original query is not modified.
1274
1275 sub _remove_alias {
1276   my ($self, $query, $alias) = @_;
1277
1278   my %unaliased = %{ $query || {} };
1279   foreach my $key (keys %unaliased) {
1280     $unaliased{$1} = delete $unaliased{$key}
1281       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1282   }
1283
1284   return \%unaliased;
1285 }
1286
1287 =head2 find_or_new
1288
1289 =over 4
1290
1291 =item Arguments: \%vals, \%attrs?
1292
1293 =item Return Value: $object
1294
1295 =back
1296
1297 Find an existing record from this resultset. If none exists, instantiate a new
1298 result object and return it. The object will not be saved into your storage
1299 until you call L<DBIx::Class::Row/insert> on it.
1300
1301 If you want objects to be saved immediately, use L</find_or_create> instead.
1302
1303 =cut
1304
1305 sub find_or_new {
1306   my $self     = shift;
1307   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1308   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1309   my $exists   = $self->find($hash, $attrs);
1310   return defined $exists ? $exists : $self->new_result($hash);
1311 }
1312
1313 =head2 create
1314
1315 =over 4
1316
1317 =item Arguments: \%vals
1318
1319 =item Return Value: $object
1320
1321 =back
1322
1323 Inserts a record into the resultset and returns the object representing it.
1324
1325 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1326
1327 =cut
1328
1329 sub create {
1330   my ($self, $attrs) = @_;
1331   $self->throw_exception( "create needs a hashref" )
1332     unless ref $attrs eq 'HASH';
1333   return $self->new_result($attrs)->insert;
1334 }
1335
1336 =head2 find_or_create
1337
1338 =over 4
1339
1340 =item Arguments: \%vals, \%attrs?
1341
1342 =item Return Value: $object
1343
1344 =back
1345
1346   $class->find_or_create({ key => $val, ... });
1347
1348 Tries to find a record based on its primary key or unique constraint; if none
1349 is found, creates one and returns that instead.
1350
1351   my $cd = $schema->resultset('CD')->find_or_create({
1352     cdid   => 5,
1353     artist => 'Massive Attack',
1354     title  => 'Mezzanine',
1355     year   => 2005,
1356   });
1357
1358 Also takes an optional C<key> attribute, to search by a specific key or unique
1359 constraint. For example:
1360
1361   my $cd = $schema->resultset('CD')->find_or_create(
1362     {
1363       artist => 'Massive Attack',
1364       title  => 'Mezzanine',
1365     },
1366     { key => 'cd_artist_title' }
1367   );
1368
1369 See also L</find> and L</update_or_create>. For information on how to declare
1370 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1371
1372 =cut
1373
1374 sub find_or_create {
1375   my $self     = shift;
1376   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1377   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1378   my $exists   = $self->find($hash, $attrs);
1379   return defined $exists ? $exists : $self->create($hash);
1380 }
1381
1382 =head2 update_or_create
1383
1384 =over 4
1385
1386 =item Arguments: \%col_values, { key => $unique_constraint }?
1387
1388 =item Return Value: $object
1389
1390 =back
1391
1392   $class->update_or_create({ col => $val, ... });
1393
1394 First, searches for an existing row matching one of the unique constraints
1395 (including the primary key) on the source of this resultset. If a row is
1396 found, updates it with the other given column values. Otherwise, creates a new
1397 row.
1398
1399 Takes an optional C<key> attribute to search on a specific unique constraint.
1400 For example:
1401
1402   # In your application
1403   my $cd = $schema->resultset('CD')->update_or_create(
1404     {
1405       artist => 'Massive Attack',
1406       title  => 'Mezzanine',
1407       year   => 1998,
1408     },
1409     { key => 'cd_artist_title' }
1410   );
1411
1412 If no C<key> is specified, it searches on all unique constraints defined on the
1413 source, including the primary key.
1414
1415 If the C<key> is specified as C<primary>, it searches only on the primary key.
1416
1417 See also L</find> and L</find_or_create>. For information on how to declare
1418 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1419
1420 =cut
1421
1422 sub update_or_create {
1423   my $self = shift;
1424   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1425   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1426
1427   my $row = $self->find($cond, $attrs);
1428   if (defined $row) {
1429     $row->update($cond);
1430     return $row;
1431   }
1432
1433   return $self->create($cond);
1434 }
1435
1436 =head2 get_cache
1437
1438 =over 4
1439
1440 =item Arguments: none
1441
1442 =item Return Value: \@cache_objects?
1443
1444 =back
1445
1446 Gets the contents of the cache for the resultset, if the cache is set.
1447
1448 =cut
1449
1450 sub get_cache {
1451   shift->{all_cache};
1452 }
1453
1454 =head2 set_cache
1455
1456 =over 4
1457
1458 =item Arguments: \@cache_objects
1459
1460 =item Return Value: \@cache_objects
1461
1462 =back
1463
1464 Sets the contents of the cache for the resultset. Expects an arrayref
1465 of objects of the same class as those produced by the resultset. Note that
1466 if the cache is set the resultset will return the cached objects rather
1467 than re-querying the database even if the cache attr is not set.
1468
1469 =cut
1470
1471 sub set_cache {
1472   my ( $self, $data ) = @_;
1473   $self->throw_exception("set_cache requires an arrayref")
1474       if defined($data) && (ref $data ne 'ARRAY');
1475   $self->{all_cache} = $data;
1476 }
1477
1478 =head2 clear_cache
1479
1480 =over 4
1481
1482 =item Arguments: none
1483
1484 =item Return Value: []
1485
1486 =back
1487
1488 Clears the cache for the resultset.
1489
1490 =cut
1491
1492 sub clear_cache {
1493   shift->set_cache(undef);
1494 }
1495
1496 =head2 related_resultset
1497
1498 =over 4
1499
1500 =item Arguments: $relationship_name
1501
1502 =item Return Value: $resultset
1503
1504 =back
1505
1506 Returns a related resultset for the supplied relationship name.
1507
1508   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1509
1510 =cut
1511
1512 sub related_resultset {
1513   my ($self, $rel) = @_;
1514
1515   $self->{related_resultsets} ||= {};
1516   return $self->{related_resultsets}{$rel} ||= do {
1517     my $rel_obj = $self->result_source->relationship_info($rel);
1518
1519     $self->throw_exception(
1520       "search_related: result source '" . $self->result_source->name .
1521         "' has no such relationship $rel")
1522       unless $rel_obj;
1523     
1524     my ($from,$seen) = $self->_resolve_from($rel);
1525
1526     my $join_count = $seen->{$rel};
1527     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1528
1529     $self->result_source->schema->resultset($rel_obj->{class})->search_rs(
1530       undef, {
1531         %{$self->{attrs}||{}},
1532         join => undef,
1533         prefetch => undef,
1534         select => undef,
1535         as => undef,
1536         alias => $alias,
1537         where => $self->{cond},
1538         seen_join => $seen,
1539         from => $from,
1540     });
1541   };
1542 }
1543
1544 sub _resolve_from {
1545   my ($self, $extra_join) = @_;
1546   my $source = $self->result_source;
1547   my $attrs = $self->{attrs};
1548   
1549   my $from = $attrs->{from}
1550     || [ { $attrs->{alias} => $source->from } ];
1551     
1552   my $seen = { %{$attrs->{seen_join}||{}} };
1553
1554   my $join = ($attrs->{join}
1555                ? [ $attrs->{join}, $extra_join ]
1556                : $extra_join);
1557   $from = [
1558     @$from,
1559     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1560   ];
1561
1562   return ($from,$seen);
1563 }
1564
1565 sub _resolved_attrs {
1566   my $self = shift;
1567   return $self->{_attrs} if $self->{_attrs};
1568
1569   my $attrs = { %{$self->{attrs}||{}} };
1570   my $source = $self->{result_source};
1571   my $alias = $attrs->{alias};
1572
1573   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1574   if ($attrs->{columns}) {
1575     delete $attrs->{as};
1576   } elsif (!$attrs->{select}) {
1577     $attrs->{columns} = [ $source->columns ];
1578   }
1579  
1580   $attrs->{select} = 
1581     ($attrs->{select}
1582       ? (ref $attrs->{select} eq 'ARRAY'
1583           ? [ @{$attrs->{select}} ]
1584           : [ $attrs->{select} ])
1585       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1586     );
1587   $attrs->{as} =
1588     ($attrs->{as}
1589       ? (ref $attrs->{as} eq 'ARRAY'
1590           ? [ @{$attrs->{as}} ]
1591           : [ $attrs->{as} ])
1592       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1593     );
1594   
1595   my $adds;
1596   if ($adds = delete $attrs->{include_columns}) {
1597     $adds = [$adds] unless ref $adds eq 'ARRAY';
1598     push(@{$attrs->{select}}, @$adds);
1599     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1600   }
1601   if ($adds = delete $attrs->{'+select'}) {
1602     $adds = [$adds] unless ref $adds eq 'ARRAY';
1603     push(@{$attrs->{select}},
1604            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1605   }
1606   if (my $adds = delete $attrs->{'+as'}) {
1607     $adds = [$adds] unless ref $adds eq 'ARRAY';
1608     push(@{$attrs->{as}}, @$adds);
1609   }
1610
1611   $attrs->{from} ||= [ { 'me' => $source->from } ];
1612
1613   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1614     my $join = delete $attrs->{join} || {};
1615
1616     if (defined $attrs->{prefetch}) {
1617       $join = $self->_merge_attr(
1618         $join, $attrs->{prefetch}
1619       );
1620     }
1621
1622     $attrs->{from} =   # have to copy here to avoid corrupting the original
1623       [
1624         @{$attrs->{from}}, 
1625         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1626       ];
1627   }
1628
1629   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1630   if ($attrs->{order_by}) {
1631     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1632                            ? [ @{$attrs->{order_by}} ]
1633                            : [ $attrs->{order_by} ]);
1634   } else {
1635     $attrs->{order_by} = [];    
1636   }
1637
1638   my $collapse = $attrs->{collapse} || {};
1639   if (my $prefetch = delete $attrs->{prefetch}) {
1640     $prefetch = $self->_merge_attr({}, $prefetch);
1641     my @pre_order;
1642     my $seen = $attrs->{seen_join} || {};
1643     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1644       # bring joins back to level of current class
1645       my @prefetch = $source->resolve_prefetch(
1646         $p, $alias, $seen, \@pre_order, $collapse
1647       );
1648       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1649       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1650     }
1651     push(@{$attrs->{order_by}}, @pre_order);
1652   }
1653   $attrs->{collapse} = $collapse;
1654
1655   return $self->{_attrs} = $attrs;
1656 }
1657
1658 sub _merge_attr {
1659   my ($self, $a, $b) = @_;
1660   return $b unless defined($a);
1661   return $a unless defined($b);
1662   
1663   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1664     foreach my $key (keys %{$b}) {
1665       if (exists $a->{$key}) {
1666         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1667       } else {
1668         $a->{$key} = $b->{$key};
1669       }
1670     }
1671     return $a;
1672   } else {
1673     $a = [$a] unless ref $a eq 'ARRAY';
1674     $b = [$b] unless ref $b eq 'ARRAY';
1675
1676     my $hash = {};
1677     my @array;
1678     foreach my $x ($a, $b) {
1679       foreach my $element (@{$x}) {
1680         if (ref $element eq 'HASH') {
1681           $hash = $self->_merge_attr($hash, $element);
1682         } elsif (ref $element eq 'ARRAY') {
1683           push(@array, @{$element});
1684         } else {
1685           push(@array, $element) unless $b == $x
1686             && grep { $_ eq $element } @array;
1687         }
1688       }
1689     }
1690     
1691     @array = grep { !exists $hash->{$_} } @array;
1692
1693     return keys %{$hash}
1694       ? ( scalar(@array)
1695             ? [$hash, @array]
1696             : $hash
1697         )
1698       : \@array;
1699   }
1700 }
1701
1702 =head2 throw_exception
1703
1704 See L<DBIx::Class::Schema/throw_exception> for details.
1705
1706 =cut
1707
1708 sub throw_exception {
1709   my $self=shift;
1710   $self->result_source->schema->throw_exception(@_);
1711 }
1712
1713 # XXX: FIXME: Attributes docs need clearing up
1714
1715 =head1 ATTRIBUTES
1716
1717 The resultset takes various attributes that modify its behavior. Here's an
1718 overview of them:
1719
1720 =head2 order_by
1721
1722 =over 4
1723
1724 =item Value: ($order_by | \@order_by)
1725
1726 =back
1727
1728 Which column(s) to order the results by. This is currently passed
1729 through directly to SQL, so you can give e.g. C<year DESC> for a
1730 descending order on the column `year'.
1731
1732 Please note that if you have quoting enabled (see
1733 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1734 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1735 so you will need to manually quote things as appropriate.)
1736
1737 =head2 columns
1738
1739 =over 4
1740
1741 =item Value: \@columns
1742
1743 =back
1744
1745 Shortcut to request a particular set of columns to be retrieved.  Adds
1746 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1747 from that, then auto-populates C<as> from C<select> as normal. (You may also
1748 use the C<cols> attribute, as in earlier versions of DBIC.)
1749
1750 =head2 include_columns
1751
1752 =over 4
1753
1754 =item Value: \@columns
1755
1756 =back
1757
1758 Shortcut to include additional columns in the returned results - for example
1759
1760   $schema->resultset('CD')->search(undef, {
1761     include_columns => ['artist.name'],
1762     join => ['artist']
1763   });
1764
1765 would return all CDs and include a 'name' column to the information
1766 passed to object inflation
1767
1768 =head2 select
1769
1770 =over 4
1771
1772 =item Value: \@select_columns
1773
1774 =back
1775
1776 Indicates which columns should be selected from the storage. You can use
1777 column names, or in the case of RDBMS back ends, function or stored procedure
1778 names:
1779
1780   $rs = $schema->resultset('Employee')->search(undef, {
1781     select => [
1782       'name',
1783       { count => 'employeeid' },
1784       { sum => 'salary' }
1785     ]
1786   });
1787
1788 When you use function/stored procedure names and do not supply an C<as>
1789 attribute, the column names returned are storage-dependent. E.g. MySQL would
1790 return a column named C<count(employeeid)> in the above example.
1791
1792 =head2 +select
1793
1794 =over 4
1795
1796 Indicates additional columns to be selected from storage.  Works the same as
1797 L<select> but adds columns to the selection.
1798
1799 =back
1800
1801 =head2 +as
1802
1803 =over 4
1804
1805 Indicates additional column names for those added via L<+select>.
1806
1807 =back
1808
1809 =head2 as
1810
1811 =over 4
1812
1813 =item Value: \@inflation_names
1814
1815 =back
1816
1817 Indicates column names for object inflation. This is used in conjunction with
1818 C<select>, usually when C<select> contains one or more function or stored
1819 procedure names:
1820
1821   $rs = $schema->resultset('Employee')->search(undef, {
1822     select => [
1823       'name',
1824       { count => 'employeeid' }
1825     ],
1826     as => ['name', 'employee_count'],
1827   });
1828
1829   my $employee = $rs->first(); # get the first Employee
1830
1831 If the object against which the search is performed already has an accessor
1832 matching a column name specified in C<as>, the value can be retrieved using
1833 the accessor as normal:
1834
1835   my $name = $employee->name();
1836
1837 If on the other hand an accessor does not exist in the object, you need to
1838 use C<get_column> instead:
1839
1840   my $employee_count = $employee->get_column('employee_count');
1841
1842 You can create your own accessors if required - see
1843 L<DBIx::Class::Manual::Cookbook> for details.
1844
1845 Please note: This will NOT insert an C<AS employee_count> into the SQL
1846 statement produced, it is used for internal access only. Thus
1847 attempting to use the accessor in an C<order_by> clause or similar
1848 will fail miserably.
1849
1850 To get around this limitation, you can supply literal SQL to your
1851 C<select> attibute that contains the C<AS alias> text, eg:
1852
1853   select => [\'myfield AS alias']
1854
1855 =head2 join
1856
1857 =over 4
1858
1859 =item Value: ($rel_name | \@rel_names | \%rel_names)
1860
1861 =back
1862
1863 Contains a list of relationships that should be joined for this query.  For
1864 example:
1865
1866   # Get CDs by Nine Inch Nails
1867   my $rs = $schema->resultset('CD')->search(
1868     { 'artist.name' => 'Nine Inch Nails' },
1869     { join => 'artist' }
1870   );
1871
1872 Can also contain a hash reference to refer to the other relation's relations.
1873 For example:
1874
1875   package MyApp::Schema::Track;
1876   use base qw/DBIx::Class/;
1877   __PACKAGE__->table('track');
1878   __PACKAGE__->add_columns(qw/trackid cd position title/);
1879   __PACKAGE__->set_primary_key('trackid');
1880   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1881   1;
1882
1883   # In your application
1884   my $rs = $schema->resultset('Artist')->search(
1885     { 'track.title' => 'Teardrop' },
1886     {
1887       join     => { cd => 'track' },
1888       order_by => 'artist.name',
1889     }
1890   );
1891
1892 You need to use the relationship (not the table) name in  conditions, 
1893 because they are aliased as such. The current table is aliased as "me", so 
1894 you need to use me.column_name in order to avoid ambiguity. For example:
1895
1896   # Get CDs from 1984 with a 'Foo' track 
1897   my $rs = $schema->resultset('CD')->search(
1898     { 
1899       'me.year' => 1984,
1900       'tracks.name' => 'Foo'
1901     },
1902     { join => 'tracks' }
1903   );
1904   
1905 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1906 similarly for a third time). For e.g.
1907
1908   my $rs = $schema->resultset('Artist')->search({
1909     'cds.title'   => 'Down to Earth',
1910     'cds_2.title' => 'Popular',
1911   }, {
1912     join => [ qw/cds cds/ ],
1913   });
1914
1915 will return a set of all artists that have both a cd with title 'Down
1916 to Earth' and a cd with title 'Popular'.
1917
1918 If you want to fetch related objects from other tables as well, see C<prefetch>
1919 below.
1920
1921 =head2 prefetch
1922
1923 =over 4
1924
1925 =item Value: ($rel_name | \@rel_names | \%rel_names)
1926
1927 =back
1928
1929 Contains one or more relationships that should be fetched along with the main
1930 query (when they are accessed afterwards they will have already been
1931 "prefetched").  This is useful for when you know you will need the related
1932 objects, because it saves at least one query:
1933
1934   my $rs = $schema->resultset('Tag')->search(
1935     undef,
1936     {
1937       prefetch => {
1938         cd => 'artist'
1939       }
1940     }
1941   );
1942
1943 The initial search results in SQL like the following:
1944
1945   SELECT tag.*, cd.*, artist.* FROM tag
1946   JOIN cd ON tag.cd = cd.cdid
1947   JOIN artist ON cd.artist = artist.artistid
1948
1949 L<DBIx::Class> has no need to go back to the database when we access the
1950 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1951 case.
1952
1953 Simple prefetches will be joined automatically, so there is no need
1954 for a C<join> attribute in the above search. If you're prefetching to
1955 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1956 specify the join as well.
1957
1958 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1959 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1960 with an accessor type of 'single' or 'filter').
1961
1962 =head2 page
1963
1964 =over 4
1965
1966 =item Value: $page
1967
1968 =back
1969
1970 Makes the resultset paged and specifies the page to retrieve. Effectively
1971 identical to creating a non-pages resultset and then calling ->page($page)
1972 on it.
1973
1974 If L<rows> attribute is not specified it defualts to 10 rows per page.
1975
1976 =head2 rows
1977
1978 =over 4
1979
1980 =item Value: $rows
1981
1982 =back
1983
1984 Specifes the maximum number of rows for direct retrieval or the number of
1985 rows per page if the page attribute or method is used.
1986
1987 =head2 offset
1988
1989 =over 4
1990
1991 =item Value: $offset
1992
1993 =back
1994
1995 Specifies the (zero-based) row number for the  first row to be returned, or the
1996 of the first row of the first page if paging is used.
1997
1998 =head2 group_by
1999
2000 =over 4
2001
2002 =item Value: \@columns
2003
2004 =back
2005
2006 A arrayref of columns to group by. Can include columns of joined tables.
2007
2008   group_by => [qw/ column1 column2 ... /]
2009
2010 =head2 having
2011
2012 =over 4
2013
2014 =item Value: $condition
2015
2016 =back
2017
2018 HAVING is a select statement attribute that is applied between GROUP BY and
2019 ORDER BY. It is applied to the after the grouping calculations have been
2020 done.
2021
2022   having => { 'count(employee)' => { '>=', 100 } }
2023
2024 =head2 distinct
2025
2026 =over 4
2027
2028 =item Value: (0 | 1)
2029
2030 =back
2031
2032 Set to 1 to group by all columns.
2033
2034 =head2 where
2035
2036 =over 4
2037
2038 Adds to the WHERE clause.
2039
2040   # only return rows WHERE deleted IS NULL for all searches
2041   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2042
2043 Can be overridden by passing C<{ where => undef }> as an attribute
2044 to a resulset.
2045
2046 =back
2047
2048 =head2 cache
2049
2050 Set to 1 to cache search results. This prevents extra SQL queries if you
2051 revisit rows in your ResultSet:
2052
2053   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2054
2055   while( my $artist = $resultset->next ) {
2056     ... do stuff ...
2057   }
2058
2059   $rs->first; # without cache, this would issue a query
2060
2061 By default, searches are not cached.
2062
2063 For more examples of using these attributes, see
2064 L<DBIx::Class::Manual::Cookbook>.
2065
2066 =head2 from
2067
2068 =over 4
2069
2070 =item Value: \@from_clause
2071
2072 =back
2073
2074 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2075 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2076 clauses.
2077
2078 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2079
2080 C<join> will usually do what you need and it is strongly recommended that you
2081 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2082 And we really do mean "cannot", not just tried and failed. Attempting to use
2083 this because you're having problems with C<join> is like trying to use x86
2084 ASM because you've got a syntax error in your C. Trust us on this.
2085
2086 Now, if you're still really, really sure you need to use this (and if you're
2087 not 100% sure, ask the mailing list first), here's an explanation of how this
2088 works.
2089
2090 The syntax is as follows -
2091
2092   [
2093     { <alias1> => <table1> },
2094     [
2095       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2096       [], # nested JOIN (optional)
2097       { <table1.column1> => <table2.column2>, ... (more conditions) },
2098     ],
2099     # More of the above [ ] may follow for additional joins
2100   ]
2101
2102   <table1> <alias1>
2103   JOIN
2104     <table2> <alias2>
2105     [JOIN ...]
2106   ON <table1.column1> = <table2.column2>
2107   <more joins may follow>
2108
2109 An easy way to follow the examples below is to remember the following:
2110
2111     Anything inside "[]" is a JOIN
2112     Anything inside "{}" is a condition for the enclosing JOIN
2113
2114 The following examples utilize a "person" table in a family tree application.
2115 In order to express parent->child relationships, this table is self-joined:
2116
2117     # Person->belongs_to('father' => 'Person');
2118     # Person->belongs_to('mother' => 'Person');
2119
2120 C<from> can be used to nest joins. Here we return all children with a father,
2121 then search against all mothers of those children:
2122
2123   $rs = $schema->resultset('Person')->search(
2124       undef,
2125       {
2126           alias => 'mother', # alias columns in accordance with "from"
2127           from => [
2128               { mother => 'person' },
2129               [
2130                   [
2131                       { child => 'person' },
2132                       [
2133                           { father => 'person' },
2134                           { 'father.person_id' => 'child.father_id' }
2135                       ]
2136                   ],
2137                   { 'mother.person_id' => 'child.mother_id' }
2138               ],
2139           ]
2140       },
2141   );
2142
2143   # Equivalent SQL:
2144   # SELECT mother.* FROM person mother
2145   # JOIN (
2146   #   person child
2147   #   JOIN person father
2148   #   ON ( father.person_id = child.father_id )
2149   # )
2150   # ON ( mother.person_id = child.mother_id )
2151
2152 The type of any join can be controlled manually. To search against only people
2153 with a father in the person table, we could explicitly use C<INNER JOIN>:
2154
2155     $rs = $schema->resultset('Person')->search(
2156         undef,
2157         {
2158             alias => 'child', # alias columns in accordance with "from"
2159             from => [
2160                 { child => 'person' },
2161                 [
2162                     { father => 'person', -join_type => 'inner' },
2163                     { 'father.id' => 'child.father_id' }
2164                 ],
2165             ]
2166         },
2167     );
2168
2169     # Equivalent SQL:
2170     # SELECT child.* FROM person child
2171     # INNER JOIN person father ON child.father_id = father.id
2172
2173 =cut
2174
2175 1;