various additional ResultSet cleanups
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Scalar::Util qw/weaken/;
13 use DBIx::Class::ResultSetColumn;
14 use base qw/DBIx::Class/;
15 __PACKAGE__->load_components(qw/AccessorGroup/);
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   weaken $source;
89
90   if ($attrs->{page}) {
91     $attrs->{rows} ||= 10;
92     $attrs->{offset} ||= 0;
93     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
94   }
95
96   $attrs->{alias} ||= 'me';
97   $attrs->{_orig_alias} ||= $attrs->{alias};
98
99   bless {
100     result_source => $source,
101     result_class => $attrs->{result_class} || $source->result_class,
102     cond => $attrs->{where},
103 #    from => $attrs->{from},
104 #    collapse => $collapse,
105     count => undef,
106     pager => undef,
107     attrs => $attrs
108   }, $class;
109 }
110
111 =head2 search
112
113 =over 4
114
115 =item Arguments: $cond, \%attrs?
116
117 =item Return Value: $resultset (scalar context), @row_objs (list context)
118
119 =back
120
121   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
122   my $new_rs = $cd_rs->search({ year => 2005 });
123
124   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
125                  # year = 2005 OR year = 2004
126
127 If you need to pass in additional attributes but no additional condition,
128 call it as C<search(undef, \%attrs)>.
129
130   # "SELECT name, artistid FROM $artist_table"
131   my @all_artists = $schema->resultset('Artist')->search(undef, {
132     columns => [qw/name artistid/],
133   });
134
135 =cut
136
137 sub search {
138   my $self = shift;
139   my $rs = $self->search_rs( @_ );
140   return (wantarray ? $rs->all : $rs);
141 }
142
143 =head2 search_rs
144
145 =over 4
146
147 =item Arguments: $cond, \%attrs?
148
149 =item Return Value: $resultset
150
151 =back
152
153 This method does the same exact thing as search() except it will
154 always return a resultset, even in list context.
155
156 =cut
157
158 sub search_rs {
159   my $self = shift;
160
161   my $attrs = {};
162   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
163   my $our_attrs = exists $attrs->{_parent_attrs}
164     ? { %{delete $attrs->{_parent_attrs}} }
165     : { %{$self->{attrs}} };
166   my $having = delete $our_attrs->{having};
167
168   # XXX should only maintain _live_join_stack and generate _live_join_h from that
169   if ($attrs->{_live_join_stack}) {
170     foreach my $join (reverse @{$attrs->{_live_join_stack}}) {
171       $attrs->{_live_join_h} = defined $attrs->{_live_join_h}
172         ? { $join => $attrs->{_live_join_h} }
173         : $join;
174     }
175   }
176
177   # merge new attrs into inherited
178   foreach my $key (qw/join prefetch/) {
179     next unless exists $attrs->{$key};
180     if (my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
181       foreach my $join (reverse @{$live_join}) {
182         $attrs->{$key} = { $join => $attrs->{$key} };
183       }
184     }
185
186     $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, delete $attrs->{$key});
187   }
188
189   $our_attrs->{join} = $self->_merge_attr(
190     $our_attrs->{join}, $attrs->{_live_join_h}
191   ) if ($attrs->{_live_join_h});
192
193   if (defined $our_attrs->{prefetch}) {
194     $our_attrs->{join} = $self->_merge_attr(
195       $our_attrs->{join}, $our_attrs->{prefetch}
196     );
197   }
198
199   my $new_attrs = { %{$our_attrs}, %{$attrs} };
200   my $where = (@_
201     ? (
202         (@_ == 1 || ref $_[0] eq "HASH")
203           ? shift
204           : (
205               (@_ % 2)
206                 ? $self->throw_exception("Odd number of arguments to search")
207                 : {@_}
208              )
209       )
210     : undef
211   );
212
213   if (defined $where) {
214     $new_attrs->{where} = (
215       defined $new_attrs->{where}
216         ? { '-and' => [
217               map {
218                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
219               } $where, $new_attrs->{where}
220             ]
221           }
222         : $where);
223   }
224
225   if (defined $having) {
226     $new_attrs->{having} = (
227       defined $new_attrs->{having}
228         ? { '-and' => [
229               map {
230                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
231               } $having, $new_attrs->{having}
232             ]
233           }
234         : $having);
235   }
236
237   my $rs = (ref $self)->new($self->result_source, $new_attrs);
238   $rs->{_parent_rs} = $self->{_parent_rs} if $self->{_parent_rs};
239
240   unless (@_) {                 # no search, effectively just a clone
241     my $rows = $self->get_cache;
242     if ($rows) {
243       $rs->set_cache($rows);
244     }
245   }
246   return $rs;
247 }
248
249 =head2 search_literal
250
251 =over 4
252
253 =item Arguments: $sql_fragment, @bind_values
254
255 =item Return Value: $resultset (scalar context), @row_objs (list context)
256
257 =back
258
259   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
260   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
261
262 Pass a literal chunk of SQL to be added to the conditional part of the
263 resultset query.
264
265 =cut
266
267 sub search_literal {
268   my ($self, $cond, @vals) = @_;
269   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
270   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
271   return $self->search(\$cond, $attrs);
272 }
273
274 =head2 find
275
276 =over 4
277
278 =item Arguments: @values | \%cols, \%attrs?
279
280 =item Return Value: $row_object
281
282 =back
283
284 Finds a row based on its primary key or unique constraint. For example, to find
285 a row by its primary key:
286
287   my $cd = $schema->resultset('CD')->find(5);
288
289 You can also find a row by a specific unique constraint using the C<key>
290 attribute. For example:
291
292   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
293     key => 'cd_artist_title'
294   });
295
296 Additionally, you can specify the columns explicitly by name:
297
298   my $cd = $schema->resultset('CD')->find(
299     {
300       artist => 'Massive Attack',
301       title  => 'Mezzanine',
302     },
303     { key => 'cd_artist_title' }
304   );
305
306 If the C<key> is specified as C<primary>, it searches only on the primary key.
307
308 If no C<key> is specified, it searches on all unique constraints defined on the
309 source, including the primary key.
310
311 See also L</find_or_create> and L</update_or_create>. For information on how to
312 declare unique constraints, see
313 L<DBIx::Class::ResultSource/add_unique_constraint>.
314
315 =cut
316
317 sub find {
318   my $self = shift;
319   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
320
321   # Default to the primary key, but allow a specific key
322   my @cols = exists $attrs->{key}
323     ? $self->result_source->unique_constraint_columns($attrs->{key})
324     : $self->result_source->primary_columns;
325   $self->throw_exception(
326     "Can't find unless a primary key or unique constraint is defined"
327   ) unless @cols;
328
329   # Parse out a hashref from input
330   my $input_query;
331   if (ref $_[0] eq 'HASH') {
332     $input_query = { %{$_[0]} };
333   }
334   elsif (@_ == @cols) {
335     $input_query = {};
336     @{$input_query}{@cols} = @_;
337   }
338   else {
339     # Compatibility: Allow e.g. find(id => $value)
340     carp "Find by key => value deprecated; please use a hashref instead";
341     $input_query = {@_};
342   }
343
344   my @unique_queries = $self->_unique_queries($input_query, $attrs);
345
346   # Handle cases where the ResultSet defines the query, or where the user is
347   # abusing find
348   my $query = @unique_queries ? \@unique_queries : $input_query;
349
350   # Run the query
351   if (keys %$attrs) {
352     my $rs = $self->search($query, $attrs);
353     $rs->_resolve_attr;
354     return keys %{$rs->{_attrs}{collapse}} ? $rs->next : $rs->single;
355   }
356   else {
357     $self->_resolve_attr;
358     return keys %{$self->{_attrs}{collapse}}
359       ? $self->search($query)->next
360       : $self->single($query);
361   }
362 }
363
364 # _unique_queries
365 #
366 # Build a list of queries which satisfy unique constraints.
367
368 sub _unique_queries {
369   my ($self, $query, $attrs) = @_;
370
371   my @constraint_names = exists $attrs->{key}
372     ? ($attrs->{key})
373     : $self->result_source->unique_constraint_names;
374
375   my @unique_queries;
376   foreach my $name (@constraint_names) {
377     my @unique_cols = $self->result_source->unique_constraint_columns($name);
378     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
379
380     next unless scalar keys %$unique_query;
381
382     # Add the ResultSet's alias
383     my $alias = $self->{attrs}{alias};
384     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
385       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
386     }
387
388     push @unique_queries, $unique_query;
389   }
390
391   return @unique_queries;
392 }
393
394 # _build_unique_query
395 #
396 # Constrain the specified query hash based on the specified column names.
397
398 sub _build_unique_query {
399   my ($self, $query, $unique_cols) = @_;
400
401   return {
402     map  { $_ => $query->{$_} }
403     grep { exists $query->{$_} }
404       @$unique_cols
405   };
406 }
407
408 =head2 search_related
409
410 =over 4
411
412 =item Arguments: $rel, $cond, \%attrs?
413
414 =item Return Value: $new_resultset
415
416 =back
417
418   $new_rs = $cd_rs->search_related('artist', {
419     name => 'Emo-R-Us',
420   });
421
422 Searches the specified relationship, optionally specifying a condition and
423 attributes for matching records. See L</ATTRIBUTES> for more information.
424
425 =cut
426
427 sub search_related {
428   return shift->related_resultset(shift)->search(@_);
429 }
430
431 =head2 cursor
432
433 =over 4
434
435 =item Arguments: none
436
437 =item Return Value: $cursor
438
439 =back
440
441 Returns a storage-driven cursor to the given resultset. See
442 L<DBIx::Class::Cursor> for more information.
443
444 =cut
445
446 sub cursor {
447   my ($self) = @_;
448
449   $self->_resolve_attr;
450   my $attrs = { %{$self->{_attrs}} };
451   return $self->{cursor}
452     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
453           $attrs->{where},$attrs);
454 }
455
456 =head2 single
457
458 =over 4
459
460 =item Arguments: $cond?
461
462 =item Return Value: $row_object?
463
464 =back
465
466   my $cd = $schema->resultset('CD')->single({ year => 2001 });
467
468 Inflates the first result without creating a cursor if the resultset has
469 any records in it; if not returns nothing. Used by L</find> as an optimisation.
470
471 Can optionally take an additional condition *only* - this is a fast-code-path
472 method; if you need to add extra joins or similar call ->search and then
473 ->single without a condition on the $rs returned from that.
474
475 =cut
476
477 sub single {
478   my ($self, $where) = @_;
479   $self->_resolve_attr;
480   my $attrs = { %{$self->{_attrs}} };
481   if ($where) {
482     if (defined $attrs->{where}) {
483       $attrs->{where} = {
484         '-and' =>
485             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
486                $where, delete $attrs->{where} ]
487       };
488     } else {
489       $attrs->{where} = $where;
490     }
491   }
492
493   unless ($self->_is_unique_query($attrs->{where})) {
494     carp "Query not guaranteed to return a single row"
495       . "; please declare your unique constraints or use search instead";
496   }
497
498   my @data = $self->result_source->storage->select_single(
499     $attrs->{from}, $attrs->{select},
500     $attrs->{where}, $attrs
501   );
502
503   return (@data ? $self->_construct_object(@data) : ());
504 }
505
506 # _is_unique_query
507 #
508 # Try to determine if the specified query is guaranteed to be unique, based on
509 # the declared unique constraints.
510
511 sub _is_unique_query {
512   my ($self, $query) = @_;
513
514   my $collapsed = $self->_collapse_query($query);
515   my $alias = $self->{attrs}{alias};
516
517   foreach my $name ($self->result_source->unique_constraint_names) {
518     my @unique_cols = map {
519       "$alias.$_"
520     } $self->result_source->unique_constraint_columns($name);
521
522     # Count the values for each unique column
523     my %seen = map { $_ => 0 } @unique_cols;
524
525     foreach my $key (keys %$collapsed) {
526       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
527       next unless exists $seen{$aliased};  # Additional constraints are okay
528       $seen{$aliased} = scalar @{ $collapsed->{$key} };
529     }
530
531     # If we get 0 or more than 1 value for a column, it's not necessarily unique
532     return 1 unless grep { $_ != 1 } values %seen;
533   }
534
535   return 0;
536 }
537
538 # _collapse_query
539 #
540 # Recursively collapse the query, accumulating values for each column.
541
542 sub _collapse_query {
543   my ($self, $query, $collapsed) = @_;
544
545   $collapsed ||= {};
546
547   if (ref $query eq 'ARRAY') {
548     foreach my $subquery (@$query) {
549       next unless ref $subquery;  # -or
550 #      warn "ARRAY: " . Dumper $subquery;
551       $collapsed = $self->_collapse_query($subquery, $collapsed);
552     }
553   }
554   elsif (ref $query eq 'HASH') {
555     if (keys %$query and (keys %$query)[0] eq '-and') {
556       foreach my $subquery (@{$query->{-and}}) {
557 #        warn "HASH: " . Dumper $subquery;
558         $collapsed = $self->_collapse_query($subquery, $collapsed);
559       }
560     }
561     else {
562 #      warn "LEAF: " . Dumper $query;
563       foreach my $key (keys %$query) {
564         push @{$collapsed->{$key}}, $query->{$key};
565       }
566     }
567   }
568
569   return $collapsed;
570 }
571
572 =head2 get_column
573
574 =over 4
575
576 =item Arguments: $cond?
577
578 =item Return Value: $resultsetcolumn
579
580 =back
581
582   my $max_length = $rs->get_column('length')->max;
583
584 Returns a ResultSetColumn instance for $column based on $self
585
586 =cut
587
588 sub get_column {
589   my ($self, $column) = @_;
590   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
591   return $new;
592 }
593
594 =head2 search_like
595
596 =over 4
597
598 =item Arguments: $cond, \%attrs?
599
600 =item Return Value: $resultset (scalar context), @row_objs (list context)
601
602 =back
603
604   # WHERE title LIKE '%blue%'
605   $cd_rs = $rs->search_like({ title => '%blue%'});
606
607 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
608 that this is simply a convenience method. You most likely want to use
609 L</search> with specific operators.
610
611 For more information, see L<DBIx::Class::Manual::Cookbook>.
612
613 =cut
614
615 sub search_like {
616   my $class = shift;
617   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
618   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
619   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
620   return $class->search($query, { %$attrs });
621 }
622
623 =head2 slice
624
625 =over 4
626
627 =item Arguments: $first, $last
628
629 =item Return Value: $resultset (scalar context), @row_objs (list context)
630
631 =back
632
633 Returns a resultset or object list representing a subset of elements from the
634 resultset slice is called on. Indexes are from 0, i.e., to get the first
635 three records, call:
636
637   my ($one, $two, $three) = $rs->slice(0, 2);
638
639 =cut
640
641 sub slice {
642   my ($self, $min, $max) = @_;
643   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
644   $attrs->{offset} = $self->{attrs}{offset} || 0;
645   $attrs->{offset} += $min;
646   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
647   return $self->search(undef(), $attrs);
648   #my $slice = (ref $self)->new($self->result_source, $attrs);
649   #return (wantarray ? $slice->all : $slice);
650 }
651
652 =head2 next
653
654 =over 4
655
656 =item Arguments: none
657
658 =item Return Value: $result?
659
660 =back
661
662 Returns the next element in the resultset (C<undef> is there is none).
663
664 Can be used to efficiently iterate over records in the resultset:
665
666   my $rs = $schema->resultset('CD')->search;
667   while (my $cd = $rs->next) {
668     print $cd->title;
669   }
670
671 Note that you need to store the resultset object, and call C<next> on it.
672 Calling C<< resultset('Table')->next >> repeatedly will always return the
673 first record from the resultset.
674
675 =cut
676
677 sub next {
678   my ($self) = @_;
679   if (my $cache = $self->get_cache) {
680     $self->{all_cache_position} ||= 0;
681     return $cache->[$self->{all_cache_position}++];
682   }
683   if ($self->{attrs}{cache}) {
684     $self->{all_cache_position} = 1;
685     return ($self->all)[0];
686   }
687   my @row = (
688     exists $self->{stashed_row}
689       ? @{delete $self->{stashed_row}}
690       : $self->cursor->next
691   );
692   return unless (@row);
693   return $self->_construct_object(@row);
694 }
695
696 sub _resolve_attr {
697   my $self = shift;
698   return if exists $self->{_attrs}; #return if _resolve_attr has already been called
699
700   my $attrs = $self->{attrs};
701   my $source = $self->{_parent_rs} || $self->{result_source};
702   my $alias = $attrs->{_orig_alias};
703
704   # XXX - lose storable dclone
705   my $record_filter = delete $attrs->{record_filter};
706   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
707   $attrs->{record_filter} = $record_filter if $record_filter;
708
709   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
710   if ($attrs->{columns}) {
711     delete $attrs->{as};
712   } elsif (!$attrs->{select}) {
713     $attrs->{columns} = [ $self->{result_source}->columns ];
714   }
715   
716   my $select_alias = $self->{attrs}{alias};
717   $attrs->{select} ||= [
718     map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
719   ];
720   $attrs->{as} ||= [
721     map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
722   ];
723   
724   my $adds;
725   if ($adds = delete $attrs->{include_columns}) {
726     $adds = [$adds] unless ref $adds eq 'ARRAY';
727     push(@{$attrs->{select}}, @$adds);
728     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
729   }
730   if ($adds = delete $attrs->{'+select'}) {
731     $adds = [$adds] unless ref $adds eq 'ARRAY';
732     push(@{$attrs->{select}}, map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
733   }
734   if (my $adds = delete $attrs->{'+as'}) {
735     $adds = [$adds] unless ref $adds eq 'ARRAY';
736     push(@{$attrs->{as}}, @$adds);
737   }
738
739   $attrs->{from} ||= [ { $alias => $source->from } ];
740   $attrs->{seen_join} ||= {};
741   my %seen;
742   if (my $join = delete $attrs->{join}) {
743     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
744       if (ref $j eq 'HASH') {
745         $seen{$_} = 1 foreach keys %$j;
746       } else {
747         $seen{$j} = 1;
748       }
749     }
750     push(@{$attrs->{from}},
751       $source->resolve_join($join, $alias, $attrs->{seen_join})
752     );
753   }
754
755   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
756   if ($attrs->{order_by}) {
757     $attrs->{order_by} = [ $attrs->{order_by} ] unless ref $attrs->{order_by};    
758   } else {
759     $attrs->{order_by} ||= [];    
760   }
761
762   my $collapse = $attrs->{collapse} || {};
763   if (my $prefetch = delete $attrs->{prefetch}) {
764     my @pre_order;
765     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
766       if ( ref $p eq 'HASH' ) {
767         foreach my $key (keys %$p) {
768           push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
769             unless $seen{$key};
770         }
771       } else {
772         push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
773           unless $seen{$p};
774       }
775       # bring joins back to level of current class
776       $p = $self->_reduce_joins($p, $attrs) if $attrs->{_live_join_stack};
777       if ($p) {
778         my @prefetch = $self->result_source->resolve_prefetch(
779           $p, $alias, {}, \@pre_order, $collapse
780         );
781         push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
782         push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
783       }
784     }
785     push(@{$attrs->{order_by}}, @pre_order);
786   }
787   $attrs->{collapse} = $collapse;
788   $self->{_attrs} = $attrs;
789 }
790
791 sub _merge_attr {
792   my ($self, $a, $b) = @_;
793   return $b unless $a;
794   
795   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
796     foreach my $key (keys %{$b}) {
797       if (exists $a->{$key}) {
798         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
799       } else {
800         $a->{$key} = $b->{$key};
801       }
802     }
803     return $a;
804   } else {
805     $a = [$a] unless ref $a eq 'ARRAY';
806     $b = [$b] unless ref $b eq 'ARRAY';
807
808     my $hash = {};
809     my @array;
810     foreach my $x ($a, $b) {
811       foreach my $element (@{$x}) {
812         if (ref $element eq 'HASH') {
813           $hash = $self->_merge_attr($hash, $element);
814         } elsif (ref $element eq 'ARRAY') {
815           push(@array, @{$element});
816         } else {
817           push(@array, $element) unless $b == $x
818             && grep { $_ eq $element } @array;
819         }
820       }
821     }
822     
823     @array = grep { !exists $hash->{$_} } @array;
824
825     return keys %{$hash}
826       ? ( scalar(@array)
827             ? [$hash, @array]
828             : $hash
829         )
830       : \@array;
831   }
832 }
833
834 # bring the joins (which are from the original class) to the level
835 # of the current class so that we can resolve them properly
836 sub _reduce_joins {
837   my ($self, $p, $attrs) = @_;
838
839   STACK:
840   foreach my $join (@{$attrs->{_live_join_stack}}) {
841     if (ref $p eq 'HASH') {
842       return undef unless exists $p->{$join};
843       $p = $p->{$join};
844     } elsif (ref $p eq 'ARRAY') {
845       foreach my $pe (@{$p}) {
846         return undef if $pe eq $join;
847         if (ref $pe eq 'HASH' && exists $pe->{$join}) {
848           $p = $pe->{$join};
849           next STACK;
850         }
851       }
852       return undef;
853     } else {
854       return undef;
855     }
856   }
857   return $p;
858 }
859
860 sub _construct_object {
861   my ($self, @row) = @_;
862   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
863   my $new = $self->result_class->inflate_result($self->result_source, @$info);
864   $new = $self->{_attrs}{record_filter}->($new)
865     if exists $self->{_attrs}{record_filter};
866   return $new;
867 }
868
869 sub _collapse_result {
870   my ($self, $as, $row, $prefix) = @_;
871
872   my %const;
873   my @copy = @$row;
874   
875   foreach my $this_as (@$as) {
876     my $val = shift @copy;
877     if (defined $prefix) {
878       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
879         my $remain = $1;
880         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
881         $const{$1||''}{$2} = $val;
882       }
883     } else {
884       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
885       $const{$1||''}{$2} = $val;
886     }
887   }
888
889   my $alias = $self->{attrs}{alias};
890   my $info = [ {}, {} ];
891   foreach my $key (keys %const) {
892     if (length $key && $key ne $alias) {
893       my $target = $info;
894       my @parts = split(/\./, $key);
895       foreach my $p (@parts) {
896         $target = $target->[1]->{$p} ||= [];
897       }
898       $target->[0] = $const{$key};
899     } else {
900       $info->[0] = $const{$key};
901     }
902   }
903   
904   my @collapse;
905   if (defined $prefix) {
906     @collapse = map {
907         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
908     } keys %{$self->{_attrs}{collapse}}
909   } else {
910     @collapse = keys %{$self->{_attrs}{collapse}};
911   };
912
913   if (@collapse) {
914     my ($c) = sort { length $a <=> length $b } @collapse;
915     my $target = $info;
916     foreach my $p (split(/\./, $c)) {
917       $target = $target->[1]->{$p} ||= [];
918     }
919     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
920     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
921     my $tree = $self->_collapse_result($as, $row, $c_prefix);
922     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
923     my (@final, @raw);
924
925     while (
926       !(
927         grep {
928           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
929         } @co_key
930         )
931     ) {
932       push(@final, $tree);
933       last unless (@raw = $self->cursor->next);
934       $row = $self->{stashed_row} = \@raw;
935       $tree = $self->_collapse_result($as, $row, $c_prefix);
936     }
937     @$target = (@final ? @final : [ {}, {} ]);
938       # single empty result to indicate an empty prefetched has_many
939   }
940
941   #print "final info: " . Dumper($info);
942   return $info;
943 }
944
945 =head2 result_source
946
947 =over 4
948
949 =item Arguments: $result_source?
950
951 =item Return Value: $result_source
952
953 =back
954
955 An accessor for the primary ResultSource object from which this ResultSet
956 is derived.
957
958 =cut
959
960
961 =head2 count
962
963 =over 4
964
965 =item Arguments: $cond, \%attrs??
966
967 =item Return Value: $count
968
969 =back
970
971 Performs an SQL C<COUNT> with the same query as the resultset was built
972 with to find the number of elements. If passed arguments, does a search
973 on the resultset and counts the results of that.
974
975 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
976 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
977 not support C<DISTINCT> with multiple columns. If you are using such a
978 database, you should only use columns from the main table in your C<group_by>
979 clause.
980
981 =cut
982
983 sub count {
984   my $self = shift;
985   return $self->search(@_)->count if @_ and defined $_[0];
986   return scalar @{ $self->get_cache } if $self->get_cache;
987   my $count = $self->_count;
988   return 0 unless $count;
989
990   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
991   $count = $self->{attrs}{rows} if
992     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
993   return $count;
994 }
995
996 sub _count { # Separated out so pager can get the full count
997   my $self = shift;
998   my $select = { count => '*' };
999
1000   $self->_resolve_attr;
1001   my $attrs = { %{ $self->{_attrs} } };
1002   if (my $group_by = delete $attrs->{group_by}) {
1003     delete $attrs->{having};
1004     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1005     # todo: try CONCAT for multi-column pk
1006     my @pk = $self->result_source->primary_columns;
1007     if (@pk == 1) {
1008       my $alias = $attrs->{_orig_alias};
1009       foreach my $column (@distinct) {
1010         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1011           @distinct = ($column);
1012           last;
1013         }
1014       }
1015     }
1016
1017     $select = { count => { distinct => \@distinct } };
1018   }
1019
1020   $attrs->{select} = $select;
1021   $attrs->{as} = [qw/count/];
1022
1023   # offset, order by and page are not needed to count. record_filter is cdbi
1024   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1025   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1026   $tmp_rs->{_parent_rs} = $self->{_parent_rs} if $self->{_parent_rs};
1027        #XXX - hack to pass through parent of related resultsets
1028
1029   my ($count) = $tmp_rs->cursor->next;
1030   return $count;
1031 }
1032
1033 =head2 count_literal
1034
1035 =over 4
1036
1037 =item Arguments: $sql_fragment, @bind_values
1038
1039 =item Return Value: $count
1040
1041 =back
1042
1043 Counts the results in a literal query. Equivalent to calling L</search_literal>
1044 with the passed arguments, then L</count>.
1045
1046 =cut
1047
1048 sub count_literal { shift->search_literal(@_)->count; }
1049
1050 =head2 all
1051
1052 =over 4
1053
1054 =item Arguments: none
1055
1056 =item Return Value: @objects
1057
1058 =back
1059
1060 Returns all elements in the resultset. Called implicitly if the resultset
1061 is returned in list context.
1062
1063 =cut
1064
1065 sub all {
1066   my ($self) = @_;
1067   return @{ $self->get_cache } if $self->get_cache;
1068
1069   my @obj;
1070
1071   # TODO: don't call resolve here
1072   $self->_resolve_attr;
1073   if (keys %{$self->{_attrs}{collapse}}) {
1074 #  if ($self->{attrs}{prefetch}) {
1075       # Using $self->cursor->all is really just an optimisation.
1076       # If we're collapsing has_many prefetches it probably makes
1077       # very little difference, and this is cleaner than hacking
1078       # _construct_object to survive the approach
1079     my @row = $self->cursor->next;
1080     while (@row) {
1081       push(@obj, $self->_construct_object(@row));
1082       @row = (exists $self->{stashed_row}
1083                ? @{delete $self->{stashed_row}}
1084                : $self->cursor->next);
1085     }
1086   } else {
1087     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1088   }
1089
1090   $self->set_cache(\@obj) if $self->{attrs}{cache};
1091   return @obj;
1092 }
1093
1094 =head2 reset
1095
1096 =over 4
1097
1098 =item Arguments: none
1099
1100 =item Return Value: $self
1101
1102 =back
1103
1104 Resets the resultset's cursor, so you can iterate through the elements again.
1105
1106 =cut
1107
1108 sub reset {
1109   my ($self) = @_;
1110   delete $self->{_attrs} if exists $self->{_attrs};
1111   $self->{all_cache_position} = 0;
1112   $self->cursor->reset;
1113   return $self;
1114 }
1115
1116 =head2 first
1117
1118 =over 4
1119
1120 =item Arguments: none
1121
1122 =item Return Value: $object?
1123
1124 =back
1125
1126 Resets the resultset and returns an object for the first result (if the
1127 resultset returns anything).
1128
1129 =cut
1130
1131 sub first {
1132   return $_[0]->reset->next;
1133 }
1134
1135 # _cond_for_update_delete
1136 #
1137 # update/delete require the condition to be modified to handle
1138 # the differing SQL syntax available.  This transforms the $self->{cond}
1139 # appropriately, returning the new condition.
1140
1141 sub _cond_for_update_delete {
1142   my ($self) = @_;
1143   my $cond = {};
1144
1145   # No-op. No condition, we're updating/deleting everything
1146   return $cond unless ref $self->{cond};
1147
1148   if (ref $self->{cond} eq 'ARRAY') {
1149     $cond = [
1150       map {
1151         my %hash;
1152         foreach my $key (keys %{$_}) {
1153           $key =~ /([^.]+)$/;
1154           $hash{$1} = $_->{$key};
1155         }
1156         \%hash;
1157       } @{$self->{cond}}
1158     ];
1159   }
1160   elsif (ref $self->{cond} eq 'HASH') {
1161     if ((keys %{$self->{cond}})[0] eq '-and') {
1162       $cond->{-and} = [];
1163
1164       my @cond = @{$self->{cond}{-and}};
1165       for (my $i = 0; $i < @cond; $i++) {
1166         my $entry = $cond[$i];
1167
1168         my %hash;
1169         if (ref $entry eq 'HASH') {
1170           foreach my $key (keys %{$entry}) {
1171             $key =~ /([^.]+)$/;
1172             $hash{$1} = $entry->{$key};
1173           }
1174         }
1175         else {
1176           $entry =~ /([^.]+)$/;
1177           $hash{$1} = $cond[++$i];
1178         }
1179
1180         push @{$cond->{-and}}, \%hash;
1181       }
1182     }
1183     else {
1184       foreach my $key (keys %{$self->{cond}}) {
1185         $key =~ /([^.]+)$/;
1186         $cond->{$1} = $self->{cond}{$key};
1187       }
1188     }
1189   }
1190   else {
1191     $self->throw_exception(
1192       "Can't update/delete on resultset with condition unless hash or array"
1193     );
1194   }
1195
1196   return $cond;
1197 }
1198
1199
1200 =head2 update
1201
1202 =over 4
1203
1204 =item Arguments: \%values
1205
1206 =item Return Value: $storage_rv
1207
1208 =back
1209
1210 Sets the specified columns in the resultset to the supplied values in a
1211 single query. Return value will be true if the update succeeded or false
1212 if no records were updated; exact type of success value is storage-dependent.
1213
1214 =cut
1215
1216 sub update {
1217   my ($self, $values) = @_;
1218   $self->throw_exception("Values for update must be a hash")
1219     unless ref $values eq 'HASH';
1220
1221   my $cond = $self->_cond_for_update_delete;
1222
1223   return $self->result_source->storage->update(
1224     $self->result_source->from, $values, $cond
1225   );
1226 }
1227
1228 =head2 update_all
1229
1230 =over 4
1231
1232 =item Arguments: \%values
1233
1234 =item Return Value: 1
1235
1236 =back
1237
1238 Fetches all objects and updates them one at a time. Note that C<update_all>
1239 will run DBIC cascade triggers, while L</update> will not.
1240
1241 =cut
1242
1243 sub update_all {
1244   my ($self, $values) = @_;
1245   $self->throw_exception("Values for update must be a hash")
1246     unless ref $values eq 'HASH';
1247   foreach my $obj ($self->all) {
1248     $obj->set_columns($values)->update;
1249   }
1250   return 1;
1251 }
1252
1253 =head2 delete
1254
1255 =over 4
1256
1257 =item Arguments: none
1258
1259 =item Return Value: 1
1260
1261 =back
1262
1263 Deletes the contents of the resultset from its result source. Note that this
1264 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1265 to run.
1266
1267 =cut
1268
1269 sub delete {
1270   my ($self) = @_;
1271
1272   my $cond = $self->_cond_for_update_delete;
1273
1274   $self->result_source->storage->delete($self->result_source->from, $cond);
1275   return 1;
1276 }
1277
1278 =head2 delete_all
1279
1280 =over 4
1281
1282 =item Arguments: none
1283
1284 =item Return Value: 1
1285
1286 =back
1287
1288 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1289 will run DBIC cascade triggers, while L</delete> will not.
1290
1291 =cut
1292
1293 sub delete_all {
1294   my ($self) = @_;
1295   $_->delete for $self->all;
1296   return 1;
1297 }
1298
1299 =head2 pager
1300
1301 =over 4
1302
1303 =item Arguments: none
1304
1305 =item Return Value: $pager
1306
1307 =back
1308
1309 Return Value a L<Data::Page> object for the current resultset. Only makes
1310 sense for queries with a C<page> attribute.
1311
1312 =cut
1313
1314 sub pager {
1315   my ($self) = @_;
1316   my $attrs = $self->{attrs};
1317   $self->throw_exception("Can't create pager for non-paged rs")
1318     unless $self->{attrs}{page};
1319   $attrs->{rows} ||= 10;
1320   return $self->{pager} ||= Data::Page->new(
1321     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1322 }
1323
1324 =head2 page
1325
1326 =over 4
1327
1328 =item Arguments: $page_number
1329
1330 =item Return Value: $rs
1331
1332 =back
1333
1334 Returns a resultset for the $page_number page of the resultset on which page
1335 is called, where each page contains a number of rows equal to the 'rows'
1336 attribute set on the resultset (10 by default).
1337
1338 =cut
1339
1340 sub page {
1341   my ($self, $page) = @_;
1342   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1343 }
1344
1345 =head2 new_result
1346
1347 =over 4
1348
1349 =item Arguments: \%vals
1350
1351 =item Return Value: $object
1352
1353 =back
1354
1355 Creates an object in the resultset's result class and returns it.
1356
1357 =cut
1358
1359 sub new_result {
1360   my ($self, $values) = @_;
1361   $self->throw_exception( "new_result needs a hash" )
1362     unless (ref $values eq 'HASH');
1363   $self->throw_exception(
1364     "Can't abstract implicit construct, condition not a hash"
1365   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1366   my %new = %$values;
1367   my $alias = $self->{attrs}{_orig_alias};
1368   foreach my $key (keys %{$self->{cond}||{}}) {
1369     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1370   }
1371   my $obj = $self->result_class->new(\%new);
1372   $obj->result_source($self->result_source) if $obj->can('result_source');
1373   return $obj;
1374 }
1375
1376 =head2 find_or_new
1377
1378 =over 4
1379
1380 =item Arguments: \%vals, \%attrs?
1381
1382 =item Return Value: $object
1383
1384 =back
1385
1386 Find an existing record from this resultset. If none exists, instantiate a new
1387 result object and return it. The object will not be saved into your storage
1388 until you call L<DBIx::Class::Row/insert> on it.
1389
1390 If you want objects to be saved immediately, use L</find_or_create> instead.
1391
1392 =cut
1393
1394 sub find_or_new {
1395   my $self     = shift;
1396   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1397   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1398   my $exists   = $self->find($hash, $attrs);
1399   return defined $exists ? $exists : $self->new_result($hash);
1400 }
1401
1402 =head2 create
1403
1404 =over 4
1405
1406 =item Arguments: \%vals
1407
1408 =item Return Value: $object
1409
1410 =back
1411
1412 Inserts a record into the resultset and returns the object representing it.
1413
1414 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1415
1416 =cut
1417
1418 sub create {
1419   my ($self, $attrs) = @_;
1420   $self->throw_exception( "create needs a hashref" )
1421     unless ref $attrs eq 'HASH';
1422   return $self->new_result($attrs)->insert;
1423 }
1424
1425 =head2 find_or_create
1426
1427 =over 4
1428
1429 =item Arguments: \%vals, \%attrs?
1430
1431 =item Return Value: $object
1432
1433 =back
1434
1435   $class->find_or_create({ key => $val, ... });
1436
1437 Tries to find a record based on its primary key or unique constraint; if none
1438 is found, creates one and returns that instead.
1439
1440   my $cd = $schema->resultset('CD')->find_or_create({
1441     cdid   => 5,
1442     artist => 'Massive Attack',
1443     title  => 'Mezzanine',
1444     year   => 2005,
1445   });
1446
1447 Also takes an optional C<key> attribute, to search by a specific key or unique
1448 constraint. For example:
1449
1450   my $cd = $schema->resultset('CD')->find_or_create(
1451     {
1452       artist => 'Massive Attack',
1453       title  => 'Mezzanine',
1454     },
1455     { key => 'cd_artist_title' }
1456   );
1457
1458 See also L</find> and L</update_or_create>. For information on how to declare
1459 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1460
1461 =cut
1462
1463 sub find_or_create {
1464   my $self     = shift;
1465   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1466   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1467   my $exists   = $self->find($hash, $attrs);
1468   return defined $exists ? $exists : $self->create($hash);
1469 }
1470
1471 =head2 update_or_create
1472
1473 =over 4
1474
1475 =item Arguments: \%col_values, { key => $unique_constraint }?
1476
1477 =item Return Value: $object
1478
1479 =back
1480
1481   $class->update_or_create({ col => $val, ... });
1482
1483 First, searches for an existing row matching one of the unique constraints
1484 (including the primary key) on the source of this resultset. If a row is
1485 found, updates it with the other given column values. Otherwise, creates a new
1486 row.
1487
1488 Takes an optional C<key> attribute to search on a specific unique constraint.
1489 For example:
1490
1491   # In your application
1492   my $cd = $schema->resultset('CD')->update_or_create(
1493     {
1494       artist => 'Massive Attack',
1495       title  => 'Mezzanine',
1496       year   => 1998,
1497     },
1498     { key => 'cd_artist_title' }
1499   );
1500
1501 If no C<key> is specified, it searches on all unique constraints defined on the
1502 source, including the primary key.
1503
1504 If the C<key> is specified as C<primary>, it searches only on the primary key.
1505
1506 See also L</find> and L</find_or_create>. For information on how to declare
1507 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1508
1509 =cut
1510
1511 sub update_or_create {
1512   my $self = shift;
1513   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1514   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1515
1516   my $row = $self->find($cond);
1517   if (defined $row) {
1518     $row->update($cond);
1519     return $row;
1520   }
1521
1522   return $self->create($cond);
1523 }
1524
1525 =head2 get_cache
1526
1527 =over 4
1528
1529 =item Arguments: none
1530
1531 =item Return Value: \@cache_objects?
1532
1533 =back
1534
1535 Gets the contents of the cache for the resultset, if the cache is set.
1536
1537 =cut
1538
1539 sub get_cache {
1540   shift->{all_cache};
1541 }
1542
1543 =head2 set_cache
1544
1545 =over 4
1546
1547 =item Arguments: \@cache_objects
1548
1549 =item Return Value: \@cache_objects
1550
1551 =back
1552
1553 Sets the contents of the cache for the resultset. Expects an arrayref
1554 of objects of the same class as those produced by the resultset. Note that
1555 if the cache is set the resultset will return the cached objects rather
1556 than re-querying the database even if the cache attr is not set.
1557
1558 =cut
1559
1560 sub set_cache {
1561   my ( $self, $data ) = @_;
1562   $self->throw_exception("set_cache requires an arrayref")
1563       if defined($data) && (ref $data ne 'ARRAY');
1564   $self->{all_cache} = $data;
1565 }
1566
1567 =head2 clear_cache
1568
1569 =over 4
1570
1571 =item Arguments: none
1572
1573 =item Return Value: []
1574
1575 =back
1576
1577 Clears the cache for the resultset.
1578
1579 =cut
1580
1581 sub clear_cache {
1582   shift->set_cache(undef);
1583 }
1584
1585 =head2 related_resultset
1586
1587 =over 4
1588
1589 =item Arguments: $relationship_name
1590
1591 =item Return Value: $resultset
1592
1593 =back
1594
1595 Returns a related resultset for the supplied relationship name.
1596
1597   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1598
1599 =cut
1600
1601 sub related_resultset {
1602   my ($self, $rel) = @_;
1603
1604   $self->{related_resultsets} ||= {};
1605   return $self->{related_resultsets}{$rel} ||= do {
1606     my $rel_obj = $self->result_source->relationship_info($rel);
1607
1608     $self->throw_exception(
1609       "search_related: result source '" . $self->result_source->name .
1610         "' has no such relationship $rel")
1611       unless $rel_obj;
1612
1613     my @live_join_stack = (@{$self->{attrs}{_live_join_stack}||[]}, $rel);
1614
1615     my $rs = $self->result_source->schema->resultset($rel_obj->{class})->search(
1616       undef, {
1617         select => undef,
1618         as => undef,
1619         alias => $rel, #the most recent
1620         _live_join_stack => \@live_join_stack, #the trail of rels
1621         _parent_attrs => $self->{attrs}}
1622     );
1623
1624     # keep reference of the original resultset
1625     $rs->{_parent_rs} = $self->{_parent_rs} || $self->result_source;
1626
1627     return $rs;
1628   };
1629 }
1630
1631 =head2 throw_exception
1632
1633 See L<DBIx::Class::Schema/throw_exception> for details.
1634
1635 =cut
1636
1637 sub throw_exception {
1638   my $self=shift;
1639   $self->result_source->schema->throw_exception(@_);
1640 }
1641
1642 # XXX: FIXME: Attributes docs need clearing up
1643
1644 =head1 ATTRIBUTES
1645
1646 The resultset takes various attributes that modify its behavior. Here's an
1647 overview of them:
1648
1649 =head2 order_by
1650
1651 =over 4
1652
1653 =item Value: ($order_by | \@order_by)
1654
1655 =back
1656
1657 Which column(s) to order the results by. This is currently passed
1658 through directly to SQL, so you can give e.g. C<year DESC> for a
1659 descending order on the column `year'.
1660
1661 Please note that if you have quoting enabled (see
1662 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1663 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1664 so you will need to manually quote things as appropriate.)
1665
1666 =head2 columns
1667
1668 =over 4
1669
1670 =item Value: \@columns
1671
1672 =back
1673
1674 Shortcut to request a particular set of columns to be retrieved.  Adds
1675 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1676 from that, then auto-populates C<as> from C<select> as normal. (You may also
1677 use the C<cols> attribute, as in earlier versions of DBIC.)
1678
1679 =head2 include_columns
1680
1681 =over 4
1682
1683 =item Value: \@columns
1684
1685 =back
1686
1687 Shortcut to include additional columns in the returned results - for example
1688
1689   $schema->resultset('CD')->search(undef, {
1690     include_columns => ['artist.name'],
1691     join => ['artist']
1692   });
1693
1694 would return all CDs and include a 'name' column to the information
1695 passed to object inflation
1696
1697 =head2 select
1698
1699 =over 4
1700
1701 =item Value: \@select_columns
1702
1703 =back
1704
1705 Indicates which columns should be selected from the storage. You can use
1706 column names, or in the case of RDBMS back ends, function or stored procedure
1707 names:
1708
1709   $rs = $schema->resultset('Employee')->search(undef, {
1710     select => [
1711       'name',
1712       { count => 'employeeid' },
1713       { sum => 'salary' }
1714     ]
1715   });
1716
1717 When you use function/stored procedure names and do not supply an C<as>
1718 attribute, the column names returned are storage-dependent. E.g. MySQL would
1719 return a column named C<count(employeeid)> in the above example.
1720
1721 =head2 +select
1722
1723 =over 4
1724
1725 Indicates additional columns to be selected from storage.  Works the same as
1726 L<select> but adds columns to the selection.
1727
1728 =back
1729
1730 =head2 +as
1731
1732 =over 4
1733
1734 Indicates additional column names for those added via L<+select>.
1735
1736 =back
1737
1738 =head2 as
1739
1740 =over 4
1741
1742 =item Value: \@inflation_names
1743
1744 =back
1745
1746 Indicates column names for object inflation. This is used in conjunction with
1747 C<select>, usually when C<select> contains one or more function or stored
1748 procedure names:
1749
1750   $rs = $schema->resultset('Employee')->search(undef, {
1751     select => [
1752       'name',
1753       { count => 'employeeid' }
1754     ],
1755     as => ['name', 'employee_count'],
1756   });
1757
1758   my $employee = $rs->first(); # get the first Employee
1759
1760 If the object against which the search is performed already has an accessor
1761 matching a column name specified in C<as>, the value can be retrieved using
1762 the accessor as normal:
1763
1764   my $name = $employee->name();
1765
1766 If on the other hand an accessor does not exist in the object, you need to
1767 use C<get_column> instead:
1768
1769   my $employee_count = $employee->get_column('employee_count');
1770
1771 You can create your own accessors if required - see
1772 L<DBIx::Class::Manual::Cookbook> for details.
1773
1774 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1775 produced, it is used for internal access only. Thus attempting to use the accessor
1776 in an C<order_by> clause or similar will fail misrably.
1777
1778 =head2 join
1779
1780 =over 4
1781
1782 =item Value: ($rel_name | \@rel_names | \%rel_names)
1783
1784 =back
1785
1786 Contains a list of relationships that should be joined for this query.  For
1787 example:
1788
1789   # Get CDs by Nine Inch Nails
1790   my $rs = $schema->resultset('CD')->search(
1791     { 'artist.name' => 'Nine Inch Nails' },
1792     { join => 'artist' }
1793   );
1794
1795 Can also contain a hash reference to refer to the other relation's relations.
1796 For example:
1797
1798   package MyApp::Schema::Track;
1799   use base qw/DBIx::Class/;
1800   __PACKAGE__->table('track');
1801   __PACKAGE__->add_columns(qw/trackid cd position title/);
1802   __PACKAGE__->set_primary_key('trackid');
1803   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1804   1;
1805
1806   # In your application
1807   my $rs = $schema->resultset('Artist')->search(
1808     { 'track.title' => 'Teardrop' },
1809     {
1810       join     => { cd => 'track' },
1811       order_by => 'artist.name',
1812     }
1813   );
1814
1815 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1816 similarly for a third time). For e.g.
1817
1818   my $rs = $schema->resultset('Artist')->search({
1819     'cds.title'   => 'Down to Earth',
1820     'cds_2.title' => 'Popular',
1821   }, {
1822     join => [ qw/cds cds/ ],
1823   });
1824
1825 will return a set of all artists that have both a cd with title 'Down
1826 to Earth' and a cd with title 'Popular'.
1827
1828 If you want to fetch related objects from other tables as well, see C<prefetch>
1829 below.
1830
1831 =head2 prefetch
1832
1833 =over 4
1834
1835 =item Value: ($rel_name | \@rel_names | \%rel_names)
1836
1837 =back
1838
1839 Contains one or more relationships that should be fetched along with the main
1840 query (when they are accessed afterwards they will have already been
1841 "prefetched").  This is useful for when you know you will need the related
1842 objects, because it saves at least one query:
1843
1844   my $rs = $schema->resultset('Tag')->search(
1845     undef,
1846     {
1847       prefetch => {
1848         cd => 'artist'
1849       }
1850     }
1851   );
1852
1853 The initial search results in SQL like the following:
1854
1855   SELECT tag.*, cd.*, artist.* FROM tag
1856   JOIN cd ON tag.cd = cd.cdid
1857   JOIN artist ON cd.artist = artist.artistid
1858
1859 L<DBIx::Class> has no need to go back to the database when we access the
1860 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1861 case.
1862
1863 Simple prefetches will be joined automatically, so there is no need
1864 for a C<join> attribute in the above search. If you're prefetching to
1865 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1866 specify the join as well.
1867
1868 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1869 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1870 with an accessor type of 'single' or 'filter').
1871
1872 =head2 page
1873
1874 =over 4
1875
1876 =item Value: $page
1877
1878 =back
1879
1880 Makes the resultset paged and specifies the page to retrieve. Effectively
1881 identical to creating a non-pages resultset and then calling ->page($page)
1882 on it.
1883
1884 If L<rows> attribute is not specified it defualts to 10 rows per page.
1885
1886 =head2 rows
1887
1888 =over 4
1889
1890 =item Value: $rows
1891
1892 =back
1893
1894 Specifes the maximum number of rows for direct retrieval or the number of
1895 rows per page if the page attribute or method is used.
1896
1897 =head2 offset
1898
1899 =over 4
1900
1901 =item Value: $offset
1902
1903 =back
1904
1905 Specifies the (zero-based) row number for the  first row to be returned, or the
1906 of the first row of the first page if paging is used.
1907
1908 =head2 group_by
1909
1910 =over 4
1911
1912 =item Value: \@columns
1913
1914 =back
1915
1916 A arrayref of columns to group by. Can include columns of joined tables.
1917
1918   group_by => [qw/ column1 column2 ... /]
1919
1920 =head2 having
1921
1922 =over 4
1923
1924 =item Value: $condition
1925
1926 =back
1927
1928 HAVING is a select statement attribute that is applied between GROUP BY and
1929 ORDER BY. It is applied to the after the grouping calculations have been
1930 done.
1931
1932   having => { 'count(employee)' => { '>=', 100 } }
1933
1934 =head2 distinct
1935
1936 =over 4
1937
1938 =item Value: (0 | 1)
1939
1940 =back
1941
1942 Set to 1 to group by all columns.
1943
1944 =head2 where
1945
1946 =over 4
1947
1948 Adds to the WHERE clause.
1949
1950   # only return rows WHERE deleted IS NULL for all searches
1951   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
1952
1953 Can be overridden by passing C<{ where => undef }> as an attribute
1954 to a resulset.
1955
1956 =back
1957
1958 =head2 cache
1959
1960 Set to 1 to cache search results. This prevents extra SQL queries if you
1961 revisit rows in your ResultSet:
1962
1963   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1964
1965   while( my $artist = $resultset->next ) {
1966     ... do stuff ...
1967   }
1968
1969   $rs->first; # without cache, this would issue a query
1970
1971 By default, searches are not cached.
1972
1973 For more examples of using these attributes, see
1974 L<DBIx::Class::Manual::Cookbook>.
1975
1976 =head2 from
1977
1978 =over 4
1979
1980 =item Value: \@from_clause
1981
1982 =back
1983
1984 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1985 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1986 clauses.
1987
1988 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1989
1990 C<join> will usually do what you need and it is strongly recommended that you
1991 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1992 And we really do mean "cannot", not just tried and failed. Attempting to use
1993 this because you're having problems with C<join> is like trying to use x86
1994 ASM because you've got a syntax error in your C. Trust us on this.
1995
1996 Now, if you're still really, really sure you need to use this (and if you're
1997 not 100% sure, ask the mailing list first), here's an explanation of how this
1998 works.
1999
2000 The syntax is as follows -
2001
2002   [
2003     { <alias1> => <table1> },
2004     [
2005       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2006       [], # nested JOIN (optional)
2007       { <table1.column1> => <table2.column2>, ... (more conditions) },
2008     ],
2009     # More of the above [ ] may follow for additional joins
2010   ]
2011
2012   <table1> <alias1>
2013   JOIN
2014     <table2> <alias2>
2015     [JOIN ...]
2016   ON <table1.column1> = <table2.column2>
2017   <more joins may follow>
2018
2019 An easy way to follow the examples below is to remember the following:
2020
2021     Anything inside "[]" is a JOIN
2022     Anything inside "{}" is a condition for the enclosing JOIN
2023
2024 The following examples utilize a "person" table in a family tree application.
2025 In order to express parent->child relationships, this table is self-joined:
2026
2027     # Person->belongs_to('father' => 'Person');
2028     # Person->belongs_to('mother' => 'Person');
2029
2030 C<from> can be used to nest joins. Here we return all children with a father,
2031 then search against all mothers of those children:
2032
2033   $rs = $schema->resultset('Person')->search(
2034       undef,
2035       {
2036           alias => 'mother', # alias columns in accordance with "from"
2037           from => [
2038               { mother => 'person' },
2039               [
2040                   [
2041                       { child => 'person' },
2042                       [
2043                           { father => 'person' },
2044                           { 'father.person_id' => 'child.father_id' }
2045                       ]
2046                   ],
2047                   { 'mother.person_id' => 'child.mother_id' }
2048               ],
2049           ]
2050       },
2051   );
2052
2053   # Equivalent SQL:
2054   # SELECT mother.* FROM person mother
2055   # JOIN (
2056   #   person child
2057   #   JOIN person father
2058   #   ON ( father.person_id = child.father_id )
2059   # )
2060   # ON ( mother.person_id = child.mother_id )
2061
2062 The type of any join can be controlled manually. To search against only people
2063 with a father in the person table, we could explicitly use C<INNER JOIN>:
2064
2065     $rs = $schema->resultset('Person')->search(
2066         undef,
2067         {
2068             alias => 'child', # alias columns in accordance with "from"
2069             from => [
2070                 { child => 'person' },
2071                 [
2072                     { father => 'person', -join_type => 'inner' },
2073                     { 'father.id' => 'child.father_id' }
2074                 ],
2075             ]
2076         },
2077     );
2078
2079     # Equivalent SQL:
2080     # SELECT child.* FROM person child
2081     # INNER JOIN person father ON child.father_id = father.id
2082
2083 =cut
2084
2085 1;