tweaked some stuff. appears no more broken.
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use base qw/DBIx::Class/;
14 __PACKAGE__->load_components(qw/AccessorGroup/);
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('CD')->search(year => 2005);
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =over 4
57
58 =item Arguments: $source, \%$attrs
59
60 =item Return Value: $rs
61
62 =back
63
64 The resultset constructor. Takes a source object (usually a
65 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
66 L</ATTRIBUTES> below).  Does not perform any queries -- these are
67 executed as needed by the other methods.
68
69 Generally you won't need to construct a resultset manually.  You'll
70 automatically get one from e.g. a L</search> called in scalar context:
71
72   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
73
74 IMPORTANT: If called on an object, proxies to new_result instead so
75
76   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
77
78 will return a CD object, not a ResultSet.
79
80 =cut
81
82 sub new {
83   my $class = shift;
84   return $class->new_result(@_) if ref $class;
85
86   my ($source, $attrs) = @_;
87   #weaken $source;
88
89   if ($attrs->{page}) {
90     $attrs->{rows} ||= 10;
91     $attrs->{offset} ||= 0;
92     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
93   }
94
95   $attrs->{alias} ||= 'me';
96   $attrs->{_orig_alias} ||= $attrs->{alias};
97
98   bless {
99     result_source => $source,
100     result_class => $attrs->{result_class} || $source->result_class,
101     cond => $attrs->{where},
102 #    from => $attrs->{from},
103 #    collapse => $collapse,
104     count => undef,
105     pager => undef,
106     attrs => $attrs
107   }, $class;
108 }
109
110 =head2 search
111
112 =over 4
113
114 =item Arguments: $cond, \%attrs?
115
116 =item Return Value: $resultset (scalar context), @row_objs (list context)
117
118 =back
119
120   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
121   my $new_rs = $cd_rs->search({ year => 2005 });
122
123   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
124                  # year = 2005 OR year = 2004
125
126 If you need to pass in additional attributes but no additional condition,
127 call it as C<search(undef, \%attrs)>.
128
129   # "SELECT name, artistid FROM $artist_table"
130   my @all_artists = $schema->resultset('Artist')->search(undef, {
131     columns => [qw/name artistid/],
132   });
133
134 =cut
135
136 sub search {
137   my $self = shift;
138   my $rs = $self->search_rs( @_ );
139   return (wantarray ? $rs->all : $rs);
140 }
141
142 =head2 search_rs
143
144 =over 4
145
146 =item Arguments: $cond, \%attrs?
147
148 =item Return Value: $resultset
149
150 =back
151
152 This method does the same exact thing as search() except it will
153 always return a resultset, even in list context.
154
155 =cut
156
157 sub search_rs {
158   my $self = shift;
159
160   my $rows;
161
162   unless (@_) {                 # no search, effectively just a clone
163     $rows = $self->get_cache;
164   }
165
166   my $attrs = {};
167   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
168   my $our_attrs = exists $attrs->{_parent_attrs}
169     ? { %{delete $attrs->{_parent_attrs}} }
170     : { %{$self->{attrs}} };
171   my $having = delete $our_attrs->{having};
172
173   # XXX should only maintain _live_join_stack and generate _live_join_h from that
174   if ($attrs->{_live_join_stack}) {
175     foreach my $join (reverse @{$attrs->{_live_join_stack}}) {
176       $attrs->{_live_join_h} = defined $attrs->{_live_join_h}
177         ? { $join => $attrs->{_live_join_h} }
178         : $join;
179     }
180   }
181
182   # merge new attrs into inherited
183   foreach my $key (qw/join prefetch/) {
184     next unless exists $attrs->{$key};
185     if (my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
186       foreach my $join (reverse @{$live_join}) {
187         $attrs->{$key} = { $join => $attrs->{$key} };
188       }
189     }
190
191     $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, delete $attrs->{$key});
192   }
193
194   $our_attrs->{join} = $self->_merge_attr(
195     $our_attrs->{join}, $attrs->{_live_join_h}
196   ) if ($attrs->{_live_join_h});
197
198   if (defined $our_attrs->{prefetch}) {
199     $our_attrs->{join} = $self->_merge_attr(
200       $our_attrs->{join}, $our_attrs->{prefetch}
201     );
202   }
203
204   my $new_attrs = { %{$our_attrs}, %{$attrs} };
205   my $where = (@_
206     ? (
207         (@_ == 1 || ref $_[0] eq "HASH")
208           ? shift
209           : (
210               (@_ % 2)
211                 ? $self->throw_exception("Odd number of arguments to search")
212                 : {@_}
213              )
214       )
215     : undef
216   );
217
218   if (defined $where) {
219     $new_attrs->{where} = (
220       defined $new_attrs->{where}
221         ? { '-and' => [
222               map {
223                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
224               } $where, $new_attrs->{where}
225             ]
226           }
227         : $where);
228   }
229
230   if (defined $having) {
231     $new_attrs->{having} = (
232       defined $new_attrs->{having}
233         ? { '-and' => [
234               map {
235                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
236               } $having, $new_attrs->{having}
237             ]
238           }
239         : $having);
240   }
241
242   my $rs = (ref $self)->new($self->result_source, $new_attrs);
243   $rs->{_parent_source} = $self->{_parent_source} if $self->{_parent_source};
244
245   if ($rows) {
246     $rs->set_cache($rows);
247   }
248   return $rs;
249 }
250
251 =head2 search_literal
252
253 =over 4
254
255 =item Arguments: $sql_fragment, @bind_values
256
257 =item Return Value: $resultset (scalar context), @row_objs (list context)
258
259 =back
260
261   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
262   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
263
264 Pass a literal chunk of SQL to be added to the conditional part of the
265 resultset query.
266
267 =cut
268
269 sub search_literal {
270   my ($self, $cond, @vals) = @_;
271   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
272   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
273   return $self->search(\$cond, $attrs);
274 }
275
276 =head2 find
277
278 =over 4
279
280 =item Arguments: @values | \%cols, \%attrs?
281
282 =item Return Value: $row_object
283
284 =back
285
286 Finds a row based on its primary key or unique constraint. For example, to find
287 a row by its primary key:
288
289   my $cd = $schema->resultset('CD')->find(5);
290
291 You can also find a row by a specific unique constraint using the C<key>
292 attribute. For example:
293
294   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
295     key => 'cd_artist_title'
296   });
297
298 Additionally, you can specify the columns explicitly by name:
299
300   my $cd = $schema->resultset('CD')->find(
301     {
302       artist => 'Massive Attack',
303       title  => 'Mezzanine',
304     },
305     { key => 'cd_artist_title' }
306   );
307
308 If the C<key> is specified as C<primary>, it searches only on the primary key.
309
310 If no C<key> is specified, it searches on all unique constraints defined on the
311 source, including the primary key.
312
313 See also L</find_or_create> and L</update_or_create>. For information on how to
314 declare unique constraints, see
315 L<DBIx::Class::ResultSource/add_unique_constraint>.
316
317 =cut
318
319 sub find {
320   my $self = shift;
321   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
322
323   # Default to the primary key, but allow a specific key
324   my @cols = exists $attrs->{key}
325     ? $self->result_source->unique_constraint_columns($attrs->{key})
326     : $self->result_source->primary_columns;
327   $self->throw_exception(
328     "Can't find unless a primary key or unique constraint is defined"
329   ) unless @cols;
330
331   # Parse out a hashref from input
332   my $input_query;
333   if (ref $_[0] eq 'HASH') {
334     $input_query = { %{$_[0]} };
335   }
336   elsif (@_ == @cols) {
337     $input_query = {};
338     @{$input_query}{@cols} = @_;
339   }
340   else {
341     # Compatibility: Allow e.g. find(id => $value)
342     carp "Find by key => value deprecated; please use a hashref instead";
343     $input_query = {@_};
344   }
345
346   my @unique_queries = $self->_unique_queries($input_query, $attrs);
347
348   # Handle cases where the ResultSet defines the query, or where the user is
349   # abusing find
350   my $query = @unique_queries ? \@unique_queries : $input_query;
351
352   # Run the query
353   if (keys %$attrs) {
354     my $rs = $self->search($query, $attrs);
355     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
356   }
357   else {
358     return keys %{$self->_resolved_attrs->{collapse}}
359       ? $self->search($query)->next
360       : $self->single($query);
361   }
362 }
363
364 # _unique_queries
365 #
366 # Build a list of queries which satisfy unique constraints.
367
368 sub _unique_queries {
369   my ($self, $query, $attrs) = @_;
370
371   my $alias = $self->{attrs}{alias};
372   my @constraint_names = exists $attrs->{key}
373     ? ($attrs->{key})
374     : $self->result_source->unique_constraint_names;
375
376   my @unique_queries;
377   foreach my $name (@constraint_names) {
378     my @unique_cols = $self->result_source->unique_constraint_columns($name);
379     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
380
381     my $num_query = scalar keys %$unique_query;
382     next unless $num_query;
383
384     # Add the ResultSet's alias
385     foreach my $col (grep { ! m/\./ } keys %$unique_query) {
386       $unique_query->{"$alias.$col"} = delete $unique_query->{$col};
387     }
388
389     # XXX: Assuming quite a bit about $self->{attrs}{where}
390     my $num_cols = scalar @unique_cols;
391     my $num_where = exists $self->{attrs}{where}
392       ? scalar keys %{ $self->{attrs}{where} }
393       : 0;
394     push @unique_queries, $unique_query
395       if $num_query + $num_where == $num_cols;
396   }
397
398   return @unique_queries;
399 }
400
401 # _build_unique_query
402 #
403 # Constrain the specified query hash based on the specified column names.
404
405 sub _build_unique_query {
406   my ($self, $query, $unique_cols) = @_;
407
408   return {
409     map  { $_ => $query->{$_} }
410     grep { exists $query->{$_} }
411       @$unique_cols
412   };
413 }
414
415 =head2 search_related
416
417 =over 4
418
419 =item Arguments: $rel, $cond, \%attrs?
420
421 =item Return Value: $new_resultset
422
423 =back
424
425   $new_rs = $cd_rs->search_related('artist', {
426     name => 'Emo-R-Us',
427   });
428
429 Searches the specified relationship, optionally specifying a condition and
430 attributes for matching records. See L</ATTRIBUTES> for more information.
431
432 =cut
433
434 sub search_related {
435   return shift->related_resultset(shift)->search(@_);
436 }
437
438 =head2 cursor
439
440 =over 4
441
442 =item Arguments: none
443
444 =item Return Value: $cursor
445
446 =back
447
448 Returns a storage-driven cursor to the given resultset. See
449 L<DBIx::Class::Cursor> for more information.
450
451 =cut
452
453 sub cursor {
454   my ($self) = @_;
455
456   my $attrs = { %{$self->_resolved_attrs} };
457   return $self->{cursor}
458     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
459           $attrs->{where},$attrs);
460 }
461
462 =head2 single
463
464 =over 4
465
466 =item Arguments: $cond?
467
468 =item Return Value: $row_object?
469
470 =back
471
472   my $cd = $schema->resultset('CD')->single({ year => 2001 });
473
474 Inflates the first result without creating a cursor if the resultset has
475 any records in it; if not returns nothing. Used by L</find> as an optimisation.
476
477 Can optionally take an additional condition *only* - this is a fast-code-path
478 method; if you need to add extra joins or similar call ->search and then
479 ->single without a condition on the $rs returned from that.
480
481 =cut
482
483 sub single {
484   my ($self, $where) = @_;
485   my $attrs = { %{$self->_resolved_attrs} };
486   if ($where) {
487     if (defined $attrs->{where}) {
488       $attrs->{where} = {
489         '-and' =>
490             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
491                $where, delete $attrs->{where} ]
492       };
493     } else {
494       $attrs->{where} = $where;
495     }
496   }
497
498   unless ($self->_is_unique_query($attrs->{where})) {
499     carp "Query not guaranteed to return a single row"
500       . "; please declare your unique constraints or use search instead";
501   }
502
503   my @data = $self->result_source->storage->select_single(
504     $attrs->{from}, $attrs->{select},
505     $attrs->{where}, $attrs
506   );
507
508   return (@data ? $self->_construct_object(@data) : ());
509 }
510
511 # _is_unique_query
512 #
513 # Try to determine if the specified query is guaranteed to be unique, based on
514 # the declared unique constraints.
515
516 sub _is_unique_query {
517   my ($self, $query) = @_;
518
519   my $collapsed = $self->_collapse_query($query);
520   my $alias = $self->{attrs}{alias};
521
522   foreach my $name ($self->result_source->unique_constraint_names) {
523     my @unique_cols = map {
524       "$alias.$_"
525     } $self->result_source->unique_constraint_columns($name);
526
527     # Count the values for each unique column
528     my %seen = map { $_ => 0 } @unique_cols;
529
530     foreach my $key (keys %$collapsed) {
531       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
532       next unless exists $seen{$aliased};  # Additional constraints are okay
533       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
534     }
535
536     # If we get 0 or more than 1 value for a column, it's not necessarily unique
537     return 1 unless grep { $_ != 1 } values %seen;
538   }
539
540   return 0;
541 }
542
543 # _collapse_query
544 #
545 # Recursively collapse the query, accumulating values for each column.
546
547 sub _collapse_query {
548   my ($self, $query, $collapsed) = @_;
549
550   $collapsed ||= {};
551
552   if (ref $query eq 'ARRAY') {
553     foreach my $subquery (@$query) {
554       next unless ref $subquery;  # -or
555 #      warn "ARRAY: " . Dumper $subquery;
556       $collapsed = $self->_collapse_query($subquery, $collapsed);
557     }
558   }
559   elsif (ref $query eq 'HASH') {
560     if (keys %$query and (keys %$query)[0] eq '-and') {
561       foreach my $subquery (@{$query->{-and}}) {
562 #        warn "HASH: " . Dumper $subquery;
563         $collapsed = $self->_collapse_query($subquery, $collapsed);
564       }
565     }
566     else {
567 #      warn "LEAF: " . Dumper $query;
568       foreach my $col (keys %$query) {
569         my $value = $query->{$col};
570         $collapsed->{$col}{$value}++;
571       }
572     }
573   }
574
575   return $collapsed;
576 }
577
578 =head2 get_column
579
580 =over 4
581
582 =item Arguments: $cond?
583
584 =item Return Value: $resultsetcolumn
585
586 =back
587
588   my $max_length = $rs->get_column('length')->max;
589
590 Returns a ResultSetColumn instance for $column based on $self
591
592 =cut
593
594 sub get_column {
595   my ($self, $column) = @_;
596   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
597   return $new;
598 }
599
600 =head2 search_like
601
602 =over 4
603
604 =item Arguments: $cond, \%attrs?
605
606 =item Return Value: $resultset (scalar context), @row_objs (list context)
607
608 =back
609
610   # WHERE title LIKE '%blue%'
611   $cd_rs = $rs->search_like({ title => '%blue%'});
612
613 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
614 that this is simply a convenience method. You most likely want to use
615 L</search> with specific operators.
616
617 For more information, see L<DBIx::Class::Manual::Cookbook>.
618
619 =cut
620
621 sub search_like {
622   my $class = shift;
623   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
624   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
625   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
626   return $class->search($query, { %$attrs });
627 }
628
629 =head2 slice
630
631 =over 4
632
633 =item Arguments: $first, $last
634
635 =item Return Value: $resultset (scalar context), @row_objs (list context)
636
637 =back
638
639 Returns a resultset or object list representing a subset of elements from the
640 resultset slice is called on. Indexes are from 0, i.e., to get the first
641 three records, call:
642
643   my ($one, $two, $three) = $rs->slice(0, 2);
644
645 =cut
646
647 sub slice {
648   my ($self, $min, $max) = @_;
649   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
650   $attrs->{offset} = $self->{attrs}{offset} || 0;
651   $attrs->{offset} += $min;
652   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
653   return $self->search(undef(), $attrs);
654   #my $slice = (ref $self)->new($self->result_source, $attrs);
655   #return (wantarray ? $slice->all : $slice);
656 }
657
658 =head2 next
659
660 =over 4
661
662 =item Arguments: none
663
664 =item Return Value: $result?
665
666 =back
667
668 Returns the next element in the resultset (C<undef> is there is none).
669
670 Can be used to efficiently iterate over records in the resultset:
671
672   my $rs = $schema->resultset('CD')->search;
673   while (my $cd = $rs->next) {
674     print $cd->title;
675   }
676
677 Note that you need to store the resultset object, and call C<next> on it.
678 Calling C<< resultset('Table')->next >> repeatedly will always return the
679 first record from the resultset.
680
681 =cut
682
683 sub next {
684   my ($self) = @_;
685   if (my $cache = $self->get_cache) {
686     $self->{all_cache_position} ||= 0;
687     return $cache->[$self->{all_cache_position}++];
688   }
689   if ($self->{attrs}{cache}) {
690     $self->{all_cache_position} = 1;
691     return ($self->all)[0];
692   }
693   my @row = (
694     exists $self->{stashed_row}
695       ? @{delete $self->{stashed_row}}
696       : $self->cursor->next
697   );
698   return unless (@row);
699   return $self->_construct_object(@row);
700 }
701
702 sub _resolved_attrs {
703   my $self = shift;
704   return $self->{_attrs} if $self->{_attrs};
705
706   my $attrs = { %{$self->{attrs}||{}} };
707   my $source = $self->{_parent_source} || $self->{result_source};
708   my $alias = $attrs->{_orig_alias};
709
710   # XXX - lose storable dclone
711   my $record_filter = delete $attrs->{record_filter};
712   #$attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
713   $attrs = { %{ $attrs || {} } };
714   $attrs->{record_filter} = $record_filter if $record_filter;
715
716   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
717   if ($attrs->{columns}) {
718     delete $attrs->{as};
719   } elsif (!$attrs->{select}) {
720     $attrs->{columns} = [ $self->{result_source}->columns ];
721   }
722   
723   my $select_alias = $self->{attrs}{alias};
724   $attrs->{select} ||= [
725     map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
726   ];
727   $attrs->{as} ||= [
728     map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
729   ];
730   
731   my $adds;
732   if ($adds = delete $attrs->{include_columns}) {
733     $adds = [$adds] unless ref $adds eq 'ARRAY';
734     push(@{$attrs->{select}}, @$adds);
735     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
736   }
737   if ($adds = delete $attrs->{'+select'}) {
738     $adds = [$adds] unless ref $adds eq 'ARRAY';
739     push(@{$attrs->{select}}, map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
740   }
741   if (my $adds = delete $attrs->{'+as'}) {
742     $adds = [$adds] unless ref $adds eq 'ARRAY';
743     push(@{$attrs->{as}}, @$adds);
744   }
745
746   $attrs->{from} ||= [ { $alias => $source->from } ];
747   $attrs->{seen_join} ||= {};
748   if (my $join = delete $attrs->{join}) {
749     push(@{$attrs->{from}},
750       $source->resolve_join($join, $alias, $attrs->{seen_join})
751     );
752   }
753
754   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
755   if ($attrs->{order_by}) {
756     $attrs->{order_by} = [ $attrs->{order_by} ] unless ref $attrs->{order_by};    
757   } else {
758     $attrs->{order_by} ||= [];    
759   }
760
761   my $collapse = $attrs->{collapse} || {};
762   if (my $prefetch = delete $attrs->{prefetch}) {
763     my @pre_order;
764     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
765       # bring joins back to level of current class
766       $p = $self->_reduce_joins($p, $attrs) if $attrs->{_live_join_stack};
767       if ($p) {
768         my @prefetch = $self->result_source->resolve_prefetch(
769           $p, $alias, {}, \@pre_order, $collapse
770         );
771         push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
772         push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
773       }
774     }
775     push(@{$attrs->{order_by}}, @pre_order);
776   }
777   $attrs->{collapse} = $collapse;
778
779   return $self->{_attrs} = $attrs;
780 }
781
782 sub _merge_attr {
783   my ($self, $a, $b) = @_;
784   return $b unless $a;
785   
786   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
787     foreach my $key (keys %{$b}) {
788       if (exists $a->{$key}) {
789         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
790       } else {
791         $a->{$key} = $b->{$key};
792       }
793     }
794     return $a;
795   } else {
796     $a = [$a] unless ref $a eq 'ARRAY';
797     $b = [$b] unless ref $b eq 'ARRAY';
798
799     my $hash = {};
800     my @array;
801     foreach my $x ($a, $b) {
802       foreach my $element (@{$x}) {
803         if (ref $element eq 'HASH') {
804           $hash = $self->_merge_attr($hash, $element);
805         } elsif (ref $element eq 'ARRAY') {
806           push(@array, @{$element});
807         } else {
808           push(@array, $element) unless $b == $x
809             && grep { $_ eq $element } @array;
810         }
811       }
812     }
813     
814     @array = grep { !exists $hash->{$_} } @array;
815
816     return keys %{$hash}
817       ? ( scalar(@array)
818             ? [$hash, @array]
819             : $hash
820         )
821       : \@array;
822   }
823 }
824
825 # bring the joins (which are from the original class) to the level
826 # of the current class so that we can resolve them properly
827 sub _reduce_joins {
828   my ($self, $p, $attrs) = @_;
829
830   STACK:
831   foreach my $join (@{$attrs->{_live_join_stack}}) {
832     if (ref $p eq 'HASH') {
833       return undef unless exists $p->{$join};
834       $p = $p->{$join};
835     } elsif (ref $p eq 'ARRAY') {
836       foreach my $pe (@{$p}) {
837         return undef if $pe eq $join;
838         if (ref $pe eq 'HASH' && exists $pe->{$join}) {
839           $p = $pe->{$join};
840           next STACK;
841         }
842       }
843       return undef;
844     } else {
845       return undef;
846     }
847   }
848   return $p;
849 }
850
851 sub _construct_object {
852   my ($self, @row) = @_;
853   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
854   my $new = $self->result_class->inflate_result($self->result_source, @$info);
855   $new = $self->{_attrs}{record_filter}->($new)
856     if exists $self->{_attrs}{record_filter};
857   return $new;
858 }
859
860 sub _collapse_result {
861   my ($self, $as, $row, $prefix) = @_;
862
863   my %const;
864   my @copy = @$row;
865   
866   foreach my $this_as (@$as) {
867     my $val = shift @copy;
868     if (defined $prefix) {
869       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
870         my $remain = $1;
871         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
872         $const{$1||''}{$2} = $val;
873       }
874     } else {
875       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
876       $const{$1||''}{$2} = $val;
877     }
878   }
879
880   my $alias = $self->{attrs}{alias};
881   my $info = [ {}, {} ];
882   foreach my $key (keys %const) {
883     if (length $key && $key ne $alias) {
884       my $target = $info;
885       my @parts = split(/\./, $key);
886       foreach my $p (@parts) {
887         $target = $target->[1]->{$p} ||= [];
888       }
889       $target->[0] = $const{$key};
890     } else {
891       $info->[0] = $const{$key};
892     }
893   }
894   
895   my @collapse;
896   if (defined $prefix) {
897     @collapse = map {
898         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
899     } keys %{$self->{_attrs}{collapse}}
900   } else {
901     @collapse = keys %{$self->{_attrs}{collapse}};
902   };
903
904   if (@collapse) {
905     my ($c) = sort { length $a <=> length $b } @collapse;
906     my $target = $info;
907     foreach my $p (split(/\./, $c)) {
908       $target = $target->[1]->{$p} ||= [];
909     }
910     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
911     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
912     my $tree = $self->_collapse_result($as, $row, $c_prefix);
913     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
914     my (@final, @raw);
915
916     while (
917       !(
918         grep {
919           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
920         } @co_key
921         )
922     ) {
923       push(@final, $tree);
924       last unless (@raw = $self->cursor->next);
925       $row = $self->{stashed_row} = \@raw;
926       $tree = $self->_collapse_result($as, $row, $c_prefix);
927     }
928     @$target = (@final ? @final : [ {}, {} ]);
929       # single empty result to indicate an empty prefetched has_many
930   }
931
932   #print "final info: " . Dumper($info);
933   return $info;
934 }
935
936 =head2 result_source
937
938 =over 4
939
940 =item Arguments: $result_source?
941
942 =item Return Value: $result_source
943
944 =back
945
946 An accessor for the primary ResultSource object from which this ResultSet
947 is derived.
948
949 =cut
950
951
952 =head2 count
953
954 =over 4
955
956 =item Arguments: $cond, \%attrs??
957
958 =item Return Value: $count
959
960 =back
961
962 Performs an SQL C<COUNT> with the same query as the resultset was built
963 with to find the number of elements. If passed arguments, does a search
964 on the resultset and counts the results of that.
965
966 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
967 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
968 not support C<DISTINCT> with multiple columns. If you are using such a
969 database, you should only use columns from the main table in your C<group_by>
970 clause.
971
972 =cut
973
974 sub count {
975   my $self = shift;
976   return $self->search(@_)->count if @_ and defined $_[0];
977   return scalar @{ $self->get_cache } if $self->get_cache;
978   my $count = $self->_count;
979   return 0 unless $count;
980
981   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
982   $count = $self->{attrs}{rows} if
983     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
984   return $count;
985 }
986
987 sub _count { # Separated out so pager can get the full count
988   my $self = shift;
989   my $select = { count => '*' };
990
991   my $attrs = { %{$self->_resolved_attrs} };
992   if (my $group_by = delete $attrs->{group_by}) {
993     delete $attrs->{having};
994     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
995     # todo: try CONCAT for multi-column pk
996     my @pk = $self->result_source->primary_columns;
997     if (@pk == 1) {
998       my $alias = $attrs->{_orig_alias};
999       foreach my $column (@distinct) {
1000         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1001           @distinct = ($column);
1002           last;
1003         }
1004       }
1005     }
1006
1007     $select = { count => { distinct => \@distinct } };
1008   }
1009
1010   $attrs->{select} = $select;
1011   $attrs->{as} = [qw/count/];
1012
1013   # offset, order by and page are not needed to count. record_filter is cdbi
1014   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1015   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1016   $tmp_rs->{_parent_source} = $self->{_parent_source} if $self->{_parent_source};
1017        #XXX - hack to pass through parent of related resultsets
1018
1019   my ($count) = $tmp_rs->cursor->next;
1020   return $count;
1021 }
1022
1023 =head2 count_literal
1024
1025 =over 4
1026
1027 =item Arguments: $sql_fragment, @bind_values
1028
1029 =item Return Value: $count
1030
1031 =back
1032
1033 Counts the results in a literal query. Equivalent to calling L</search_literal>
1034 with the passed arguments, then L</count>.
1035
1036 =cut
1037
1038 sub count_literal { shift->search_literal(@_)->count; }
1039
1040 =head2 all
1041
1042 =over 4
1043
1044 =item Arguments: none
1045
1046 =item Return Value: @objects
1047
1048 =back
1049
1050 Returns all elements in the resultset. Called implicitly if the resultset
1051 is returned in list context.
1052
1053 =cut
1054
1055 sub all {
1056   my ($self) = @_;
1057   return @{ $self->get_cache } if $self->get_cache;
1058
1059   my @obj;
1060
1061   # TODO: don't call resolve here
1062   if (keys %{$self->_resolved_attrs->{collapse}}) {
1063 #  if ($self->{attrs}{prefetch}) {
1064       # Using $self->cursor->all is really just an optimisation.
1065       # If we're collapsing has_many prefetches it probably makes
1066       # very little difference, and this is cleaner than hacking
1067       # _construct_object to survive the approach
1068     my @row = $self->cursor->next;
1069     while (@row) {
1070       push(@obj, $self->_construct_object(@row));
1071       @row = (exists $self->{stashed_row}
1072                ? @{delete $self->{stashed_row}}
1073                : $self->cursor->next);
1074     }
1075   } else {
1076     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1077   }
1078
1079   $self->set_cache(\@obj) if $self->{attrs}{cache};
1080   return @obj;
1081 }
1082
1083 =head2 reset
1084
1085 =over 4
1086
1087 =item Arguments: none
1088
1089 =item Return Value: $self
1090
1091 =back
1092
1093 Resets the resultset's cursor, so you can iterate through the elements again.
1094
1095 =cut
1096
1097 sub reset {
1098   my ($self) = @_;
1099   delete $self->{_attrs} if exists $self->{_attrs};
1100   $self->{all_cache_position} = 0;
1101   $self->cursor->reset;
1102   return $self;
1103 }
1104
1105 =head2 first
1106
1107 =over 4
1108
1109 =item Arguments: none
1110
1111 =item Return Value: $object?
1112
1113 =back
1114
1115 Resets the resultset and returns an object for the first result (if the
1116 resultset returns anything).
1117
1118 =cut
1119
1120 sub first {
1121   return $_[0]->reset->next;
1122 }
1123
1124 # _cond_for_update_delete
1125 #
1126 # update/delete require the condition to be modified to handle
1127 # the differing SQL syntax available.  This transforms the $self->{cond}
1128 # appropriately, returning the new condition.
1129
1130 sub _cond_for_update_delete {
1131   my ($self) = @_;
1132   my $cond = {};
1133
1134   # No-op. No condition, we're updating/deleting everything
1135   return $cond unless ref $self->{cond};
1136
1137   if (ref $self->{cond} eq 'ARRAY') {
1138     $cond = [
1139       map {
1140         my %hash;
1141         foreach my $key (keys %{$_}) {
1142           $key =~ /([^.]+)$/;
1143           $hash{$1} = $_->{$key};
1144         }
1145         \%hash;
1146       } @{$self->{cond}}
1147     ];
1148   }
1149   elsif (ref $self->{cond} eq 'HASH') {
1150     if ((keys %{$self->{cond}})[0] eq '-and') {
1151       $cond->{-and} = [];
1152
1153       my @cond = @{$self->{cond}{-and}};
1154       for (my $i = 0; $i < @cond; $i++) {
1155         my $entry = $cond[$i];
1156
1157         my %hash;
1158         if (ref $entry eq 'HASH') {
1159           foreach my $key (keys %{$entry}) {
1160             $key =~ /([^.]+)$/;
1161             $hash{$1} = $entry->{$key};
1162           }
1163         }
1164         else {
1165           $entry =~ /([^.]+)$/;
1166           $hash{$1} = $cond[++$i];
1167         }
1168
1169         push @{$cond->{-and}}, \%hash;
1170       }
1171     }
1172     else {
1173       foreach my $key (keys %{$self->{cond}}) {
1174         $key =~ /([^.]+)$/;
1175         $cond->{$1} = $self->{cond}{$key};
1176       }
1177     }
1178   }
1179   else {
1180     $self->throw_exception(
1181       "Can't update/delete on resultset with condition unless hash or array"
1182     );
1183   }
1184
1185   return $cond;
1186 }
1187
1188
1189 =head2 update
1190
1191 =over 4
1192
1193 =item Arguments: \%values
1194
1195 =item Return Value: $storage_rv
1196
1197 =back
1198
1199 Sets the specified columns in the resultset to the supplied values in a
1200 single query. Return value will be true if the update succeeded or false
1201 if no records were updated; exact type of success value is storage-dependent.
1202
1203 =cut
1204
1205 sub update {
1206   my ($self, $values) = @_;
1207   $self->throw_exception("Values for update must be a hash")
1208     unless ref $values eq 'HASH';
1209
1210   my $cond = $self->_cond_for_update_delete;
1211
1212   return $self->result_source->storage->update(
1213     $self->result_source->from, $values, $cond
1214   );
1215 }
1216
1217 =head2 update_all
1218
1219 =over 4
1220
1221 =item Arguments: \%values
1222
1223 =item Return Value: 1
1224
1225 =back
1226
1227 Fetches all objects and updates them one at a time. Note that C<update_all>
1228 will run DBIC cascade triggers, while L</update> will not.
1229
1230 =cut
1231
1232 sub update_all {
1233   my ($self, $values) = @_;
1234   $self->throw_exception("Values for update must be a hash")
1235     unless ref $values eq 'HASH';
1236   foreach my $obj ($self->all) {
1237     $obj->set_columns($values)->update;
1238   }
1239   return 1;
1240 }
1241
1242 =head2 delete
1243
1244 =over 4
1245
1246 =item Arguments: none
1247
1248 =item Return Value: 1
1249
1250 =back
1251
1252 Deletes the contents of the resultset from its result source. Note that this
1253 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1254 to run.
1255
1256 =cut
1257
1258 sub delete {
1259   my ($self) = @_;
1260
1261   my $cond = $self->_cond_for_update_delete;
1262
1263   $self->result_source->storage->delete($self->result_source->from, $cond);
1264   return 1;
1265 }
1266
1267 =head2 delete_all
1268
1269 =over 4
1270
1271 =item Arguments: none
1272
1273 =item Return Value: 1
1274
1275 =back
1276
1277 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1278 will run DBIC cascade triggers, while L</delete> will not.
1279
1280 =cut
1281
1282 sub delete_all {
1283   my ($self) = @_;
1284   $_->delete for $self->all;
1285   return 1;
1286 }
1287
1288 =head2 pager
1289
1290 =over 4
1291
1292 =item Arguments: none
1293
1294 =item Return Value: $pager
1295
1296 =back
1297
1298 Return Value a L<Data::Page> object for the current resultset. Only makes
1299 sense for queries with a C<page> attribute.
1300
1301 =cut
1302
1303 sub pager {
1304   my ($self) = @_;
1305   my $attrs = $self->{attrs};
1306   $self->throw_exception("Can't create pager for non-paged rs")
1307     unless $self->{attrs}{page};
1308   $attrs->{rows} ||= 10;
1309   return $self->{pager} ||= Data::Page->new(
1310     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1311 }
1312
1313 =head2 page
1314
1315 =over 4
1316
1317 =item Arguments: $page_number
1318
1319 =item Return Value: $rs
1320
1321 =back
1322
1323 Returns a resultset for the $page_number page of the resultset on which page
1324 is called, where each page contains a number of rows equal to the 'rows'
1325 attribute set on the resultset (10 by default).
1326
1327 =cut
1328
1329 sub page {
1330   my ($self, $page) = @_;
1331   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1332 }
1333
1334 =head2 new_result
1335
1336 =over 4
1337
1338 =item Arguments: \%vals
1339
1340 =item Return Value: $object
1341
1342 =back
1343
1344 Creates an object in the resultset's result class and returns it.
1345
1346 =cut
1347
1348 sub new_result {
1349   my ($self, $values) = @_;
1350   $self->throw_exception( "new_result needs a hash" )
1351     unless (ref $values eq 'HASH');
1352   $self->throw_exception(
1353     "Can't abstract implicit construct, condition not a hash"
1354   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1355   my %new = %$values;
1356   my $alias = $self->{attrs}{_orig_alias};
1357   foreach my $key (keys %{$self->{cond}||{}}) {
1358     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1359   }
1360   my $obj = $self->result_class->new(\%new);
1361   $obj->result_source($self->result_source) if $obj->can('result_source');
1362   return $obj;
1363 }
1364
1365 =head2 find_or_new
1366
1367 =over 4
1368
1369 =item Arguments: \%vals, \%attrs?
1370
1371 =item Return Value: $object
1372
1373 =back
1374
1375 Find an existing record from this resultset. If none exists, instantiate a new
1376 result object and return it. The object will not be saved into your storage
1377 until you call L<DBIx::Class::Row/insert> on it.
1378
1379 If you want objects to be saved immediately, use L</find_or_create> instead.
1380
1381 =cut
1382
1383 sub find_or_new {
1384   my $self     = shift;
1385   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1386   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1387   my $exists   = $self->find($hash, $attrs);
1388   return defined $exists ? $exists : $self->new_result($hash);
1389 }
1390
1391 =head2 create
1392
1393 =over 4
1394
1395 =item Arguments: \%vals
1396
1397 =item Return Value: $object
1398
1399 =back
1400
1401 Inserts a record into the resultset and returns the object representing it.
1402
1403 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1404
1405 =cut
1406
1407 sub create {
1408   my ($self, $attrs) = @_;
1409   $self->throw_exception( "create needs a hashref" )
1410     unless ref $attrs eq 'HASH';
1411   return $self->new_result($attrs)->insert;
1412 }
1413
1414 =head2 find_or_create
1415
1416 =over 4
1417
1418 =item Arguments: \%vals, \%attrs?
1419
1420 =item Return Value: $object
1421
1422 =back
1423
1424   $class->find_or_create({ key => $val, ... });
1425
1426 Tries to find a record based on its primary key or unique constraint; if none
1427 is found, creates one and returns that instead.
1428
1429   my $cd = $schema->resultset('CD')->find_or_create({
1430     cdid   => 5,
1431     artist => 'Massive Attack',
1432     title  => 'Mezzanine',
1433     year   => 2005,
1434   });
1435
1436 Also takes an optional C<key> attribute, to search by a specific key or unique
1437 constraint. For example:
1438
1439   my $cd = $schema->resultset('CD')->find_or_create(
1440     {
1441       artist => 'Massive Attack',
1442       title  => 'Mezzanine',
1443     },
1444     { key => 'cd_artist_title' }
1445   );
1446
1447 See also L</find> and L</update_or_create>. For information on how to declare
1448 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1449
1450 =cut
1451
1452 sub find_or_create {
1453   my $self     = shift;
1454   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1455   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1456   my $exists   = $self->find($hash, $attrs);
1457   return defined $exists ? $exists : $self->create($hash);
1458 }
1459
1460 =head2 update_or_create
1461
1462 =over 4
1463
1464 =item Arguments: \%col_values, { key => $unique_constraint }?
1465
1466 =item Return Value: $object
1467
1468 =back
1469
1470   $class->update_or_create({ col => $val, ... });
1471
1472 First, searches for an existing row matching one of the unique constraints
1473 (including the primary key) on the source of this resultset. If a row is
1474 found, updates it with the other given column values. Otherwise, creates a new
1475 row.
1476
1477 Takes an optional C<key> attribute to search on a specific unique constraint.
1478 For example:
1479
1480   # In your application
1481   my $cd = $schema->resultset('CD')->update_or_create(
1482     {
1483       artist => 'Massive Attack',
1484       title  => 'Mezzanine',
1485       year   => 1998,
1486     },
1487     { key => 'cd_artist_title' }
1488   );
1489
1490 If no C<key> is specified, it searches on all unique constraints defined on the
1491 source, including the primary key.
1492
1493 If the C<key> is specified as C<primary>, it searches only on the primary key.
1494
1495 See also L</find> and L</find_or_create>. For information on how to declare
1496 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1497
1498 =cut
1499
1500 sub update_or_create {
1501   my $self = shift;
1502   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1503   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1504
1505   my $row = $self->find($cond);
1506   if (defined $row) {
1507     $row->update($cond);
1508     return $row;
1509   }
1510
1511   return $self->create($cond);
1512 }
1513
1514 =head2 get_cache
1515
1516 =over 4
1517
1518 =item Arguments: none
1519
1520 =item Return Value: \@cache_objects?
1521
1522 =back
1523
1524 Gets the contents of the cache for the resultset, if the cache is set.
1525
1526 =cut
1527
1528 sub get_cache {
1529   shift->{all_cache};
1530 }
1531
1532 =head2 set_cache
1533
1534 =over 4
1535
1536 =item Arguments: \@cache_objects
1537
1538 =item Return Value: \@cache_objects
1539
1540 =back
1541
1542 Sets the contents of the cache for the resultset. Expects an arrayref
1543 of objects of the same class as those produced by the resultset. Note that
1544 if the cache is set the resultset will return the cached objects rather
1545 than re-querying the database even if the cache attr is not set.
1546
1547 =cut
1548
1549 sub set_cache {
1550   my ( $self, $data ) = @_;
1551   $self->throw_exception("set_cache requires an arrayref")
1552       if defined($data) && (ref $data ne 'ARRAY');
1553   $self->{all_cache} = $data;
1554 }
1555
1556 =head2 clear_cache
1557
1558 =over 4
1559
1560 =item Arguments: none
1561
1562 =item Return Value: []
1563
1564 =back
1565
1566 Clears the cache for the resultset.
1567
1568 =cut
1569
1570 sub clear_cache {
1571   shift->set_cache(undef);
1572 }
1573
1574 =head2 related_resultset
1575
1576 =over 4
1577
1578 =item Arguments: $relationship_name
1579
1580 =item Return Value: $resultset
1581
1582 =back
1583
1584 Returns a related resultset for the supplied relationship name.
1585
1586   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1587
1588 =cut
1589
1590 sub related_resultset {
1591   my ($self, $rel) = @_;
1592
1593   $self->{related_resultsets} ||= {};
1594   return $self->{related_resultsets}{$rel} ||= do {
1595     my $rel_obj = $self->result_source->relationship_info($rel);
1596
1597     $self->throw_exception(
1598       "search_related: result source '" . $self->result_source->name .
1599         "' has no such relationship $rel")
1600       unless $rel_obj;
1601
1602     my @live_join_stack = @{$self->{attrs}{_live_join_stack}||[]};
1603
1604     # XXX mst: I'm sure this is wrong, somehow
1605     #          something with complex joins early on could die on search_rel
1606     #          followed by a prefetch. I think. need a test case.
1607
1608     my $join_count = scalar(grep { $_ eq $rel } @live_join_stack);
1609     my $alias = $join_count ? join('_', $rel, $join_count+1) : $rel;
1610
1611     push(@live_join_stack, $rel);
1612
1613     my $rs = $self->result_source->schema->resultset($rel_obj->{class})->search(
1614       undef, {
1615         select => undef,
1616         as => undef,
1617         alias => $alias,
1618         _live_join_stack => \@live_join_stack,
1619         _parent_attrs => $self->{attrs}}
1620     );
1621
1622     # keep reference of the original resultset
1623     $rs->{_parent_source} = $self->{_parent_source} || $self->result_source;
1624
1625     return $rs;
1626   };
1627 }
1628
1629 =head2 throw_exception
1630
1631 See L<DBIx::Class::Schema/throw_exception> for details.
1632
1633 =cut
1634
1635 sub throw_exception {
1636   my $self=shift;
1637   $self->result_source->schema->throw_exception(@_);
1638 }
1639
1640 # XXX: FIXME: Attributes docs need clearing up
1641
1642 =head1 ATTRIBUTES
1643
1644 The resultset takes various attributes that modify its behavior. Here's an
1645 overview of them:
1646
1647 =head2 order_by
1648
1649 =over 4
1650
1651 =item Value: ($order_by | \@order_by)
1652
1653 =back
1654
1655 Which column(s) to order the results by. This is currently passed
1656 through directly to SQL, so you can give e.g. C<year DESC> for a
1657 descending order on the column `year'.
1658
1659 Please note that if you have quoting enabled (see
1660 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1661 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1662 so you will need to manually quote things as appropriate.)
1663
1664 =head2 columns
1665
1666 =over 4
1667
1668 =item Value: \@columns
1669
1670 =back
1671
1672 Shortcut to request a particular set of columns to be retrieved.  Adds
1673 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1674 from that, then auto-populates C<as> from C<select> as normal. (You may also
1675 use the C<cols> attribute, as in earlier versions of DBIC.)
1676
1677 =head2 include_columns
1678
1679 =over 4
1680
1681 =item Value: \@columns
1682
1683 =back
1684
1685 Shortcut to include additional columns in the returned results - for example
1686
1687   $schema->resultset('CD')->search(undef, {
1688     include_columns => ['artist.name'],
1689     join => ['artist']
1690   });
1691
1692 would return all CDs and include a 'name' column to the information
1693 passed to object inflation
1694
1695 =head2 select
1696
1697 =over 4
1698
1699 =item Value: \@select_columns
1700
1701 =back
1702
1703 Indicates which columns should be selected from the storage. You can use
1704 column names, or in the case of RDBMS back ends, function or stored procedure
1705 names:
1706
1707   $rs = $schema->resultset('Employee')->search(undef, {
1708     select => [
1709       'name',
1710       { count => 'employeeid' },
1711       { sum => 'salary' }
1712     ]
1713   });
1714
1715 When you use function/stored procedure names and do not supply an C<as>
1716 attribute, the column names returned are storage-dependent. E.g. MySQL would
1717 return a column named C<count(employeeid)> in the above example.
1718
1719 =head2 +select
1720
1721 =over 4
1722
1723 Indicates additional columns to be selected from storage.  Works the same as
1724 L<select> but adds columns to the selection.
1725
1726 =back
1727
1728 =head2 +as
1729
1730 =over 4
1731
1732 Indicates additional column names for those added via L<+select>.
1733
1734 =back
1735
1736 =head2 as
1737
1738 =over 4
1739
1740 =item Value: \@inflation_names
1741
1742 =back
1743
1744 Indicates column names for object inflation. This is used in conjunction with
1745 C<select>, usually when C<select> contains one or more function or stored
1746 procedure names:
1747
1748   $rs = $schema->resultset('Employee')->search(undef, {
1749     select => [
1750       'name',
1751       { count => 'employeeid' }
1752     ],
1753     as => ['name', 'employee_count'],
1754   });
1755
1756   my $employee = $rs->first(); # get the first Employee
1757
1758 If the object against which the search is performed already has an accessor
1759 matching a column name specified in C<as>, the value can be retrieved using
1760 the accessor as normal:
1761
1762   my $name = $employee->name();
1763
1764 If on the other hand an accessor does not exist in the object, you need to
1765 use C<get_column> instead:
1766
1767   my $employee_count = $employee->get_column('employee_count');
1768
1769 You can create your own accessors if required - see
1770 L<DBIx::Class::Manual::Cookbook> for details.
1771
1772 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1773 produced, it is used for internal access only. Thus attempting to use the accessor
1774 in an C<order_by> clause or similar will fail misrably.
1775
1776 =head2 join
1777
1778 =over 4
1779
1780 =item Value: ($rel_name | \@rel_names | \%rel_names)
1781
1782 =back
1783
1784 Contains a list of relationships that should be joined for this query.  For
1785 example:
1786
1787   # Get CDs by Nine Inch Nails
1788   my $rs = $schema->resultset('CD')->search(
1789     { 'artist.name' => 'Nine Inch Nails' },
1790     { join => 'artist' }
1791   );
1792
1793 Can also contain a hash reference to refer to the other relation's relations.
1794 For example:
1795
1796   package MyApp::Schema::Track;
1797   use base qw/DBIx::Class/;
1798   __PACKAGE__->table('track');
1799   __PACKAGE__->add_columns(qw/trackid cd position title/);
1800   __PACKAGE__->set_primary_key('trackid');
1801   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1802   1;
1803
1804   # In your application
1805   my $rs = $schema->resultset('Artist')->search(
1806     { 'track.title' => 'Teardrop' },
1807     {
1808       join     => { cd => 'track' },
1809       order_by => 'artist.name',
1810     }
1811   );
1812
1813 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1814 similarly for a third time). For e.g.
1815
1816   my $rs = $schema->resultset('Artist')->search({
1817     'cds.title'   => 'Down to Earth',
1818     'cds_2.title' => 'Popular',
1819   }, {
1820     join => [ qw/cds cds/ ],
1821   });
1822
1823 will return a set of all artists that have both a cd with title 'Down
1824 to Earth' and a cd with title 'Popular'.
1825
1826 If you want to fetch related objects from other tables as well, see C<prefetch>
1827 below.
1828
1829 =head2 prefetch
1830
1831 =over 4
1832
1833 =item Value: ($rel_name | \@rel_names | \%rel_names)
1834
1835 =back
1836
1837 Contains one or more relationships that should be fetched along with the main
1838 query (when they are accessed afterwards they will have already been
1839 "prefetched").  This is useful for when you know you will need the related
1840 objects, because it saves at least one query:
1841
1842   my $rs = $schema->resultset('Tag')->search(
1843     undef,
1844     {
1845       prefetch => {
1846         cd => 'artist'
1847       }
1848     }
1849   );
1850
1851 The initial search results in SQL like the following:
1852
1853   SELECT tag.*, cd.*, artist.* FROM tag
1854   JOIN cd ON tag.cd = cd.cdid
1855   JOIN artist ON cd.artist = artist.artistid
1856
1857 L<DBIx::Class> has no need to go back to the database when we access the
1858 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1859 case.
1860
1861 Simple prefetches will be joined automatically, so there is no need
1862 for a C<join> attribute in the above search. If you're prefetching to
1863 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1864 specify the join as well.
1865
1866 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1867 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1868 with an accessor type of 'single' or 'filter').
1869
1870 =head2 page
1871
1872 =over 4
1873
1874 =item Value: $page
1875
1876 =back
1877
1878 Makes the resultset paged and specifies the page to retrieve. Effectively
1879 identical to creating a non-pages resultset and then calling ->page($page)
1880 on it.
1881
1882 If L<rows> attribute is not specified it defualts to 10 rows per page.
1883
1884 =head2 rows
1885
1886 =over 4
1887
1888 =item Value: $rows
1889
1890 =back
1891
1892 Specifes the maximum number of rows for direct retrieval or the number of
1893 rows per page if the page attribute or method is used.
1894
1895 =head2 offset
1896
1897 =over 4
1898
1899 =item Value: $offset
1900
1901 =back
1902
1903 Specifies the (zero-based) row number for the  first row to be returned, or the
1904 of the first row of the first page if paging is used.
1905
1906 =head2 group_by
1907
1908 =over 4
1909
1910 =item Value: \@columns
1911
1912 =back
1913
1914 A arrayref of columns to group by. Can include columns of joined tables.
1915
1916   group_by => [qw/ column1 column2 ... /]
1917
1918 =head2 having
1919
1920 =over 4
1921
1922 =item Value: $condition
1923
1924 =back
1925
1926 HAVING is a select statement attribute that is applied between GROUP BY and
1927 ORDER BY. It is applied to the after the grouping calculations have been
1928 done.
1929
1930   having => { 'count(employee)' => { '>=', 100 } }
1931
1932 =head2 distinct
1933
1934 =over 4
1935
1936 =item Value: (0 | 1)
1937
1938 =back
1939
1940 Set to 1 to group by all columns.
1941
1942 =head2 where
1943
1944 =over 4
1945
1946 Adds to the WHERE clause.
1947
1948   # only return rows WHERE deleted IS NULL for all searches
1949   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
1950
1951 Can be overridden by passing C<{ where => undef }> as an attribute
1952 to a resulset.
1953
1954 =back
1955
1956 =head2 cache
1957
1958 Set to 1 to cache search results. This prevents extra SQL queries if you
1959 revisit rows in your ResultSet:
1960
1961   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1962
1963   while( my $artist = $resultset->next ) {
1964     ... do stuff ...
1965   }
1966
1967   $rs->first; # without cache, this would issue a query
1968
1969 By default, searches are not cached.
1970
1971 For more examples of using these attributes, see
1972 L<DBIx::Class::Manual::Cookbook>.
1973
1974 =head2 from
1975
1976 =over 4
1977
1978 =item Value: \@from_clause
1979
1980 =back
1981
1982 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1983 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1984 clauses.
1985
1986 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1987
1988 C<join> will usually do what you need and it is strongly recommended that you
1989 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1990 And we really do mean "cannot", not just tried and failed. Attempting to use
1991 this because you're having problems with C<join> is like trying to use x86
1992 ASM because you've got a syntax error in your C. Trust us on this.
1993
1994 Now, if you're still really, really sure you need to use this (and if you're
1995 not 100% sure, ask the mailing list first), here's an explanation of how this
1996 works.
1997
1998 The syntax is as follows -
1999
2000   [
2001     { <alias1> => <table1> },
2002     [
2003       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2004       [], # nested JOIN (optional)
2005       { <table1.column1> => <table2.column2>, ... (more conditions) },
2006     ],
2007     # More of the above [ ] may follow for additional joins
2008   ]
2009
2010   <table1> <alias1>
2011   JOIN
2012     <table2> <alias2>
2013     [JOIN ...]
2014   ON <table1.column1> = <table2.column2>
2015   <more joins may follow>
2016
2017 An easy way to follow the examples below is to remember the following:
2018
2019     Anything inside "[]" is a JOIN
2020     Anything inside "{}" is a condition for the enclosing JOIN
2021
2022 The following examples utilize a "person" table in a family tree application.
2023 In order to express parent->child relationships, this table is self-joined:
2024
2025     # Person->belongs_to('father' => 'Person');
2026     # Person->belongs_to('mother' => 'Person');
2027
2028 C<from> can be used to nest joins. Here we return all children with a father,
2029 then search against all mothers of those children:
2030
2031   $rs = $schema->resultset('Person')->search(
2032       undef,
2033       {
2034           alias => 'mother', # alias columns in accordance with "from"
2035           from => [
2036               { mother => 'person' },
2037               [
2038                   [
2039                       { child => 'person' },
2040                       [
2041                           { father => 'person' },
2042                           { 'father.person_id' => 'child.father_id' }
2043                       ]
2044                   ],
2045                   { 'mother.person_id' => 'child.mother_id' }
2046               ],
2047           ]
2048       },
2049   );
2050
2051   # Equivalent SQL:
2052   # SELECT mother.* FROM person mother
2053   # JOIN (
2054   #   person child
2055   #   JOIN person father
2056   #   ON ( father.person_id = child.father_id )
2057   # )
2058   # ON ( mother.person_id = child.mother_id )
2059
2060 The type of any join can be controlled manually. To search against only people
2061 with a father in the person table, we could explicitly use C<INNER JOIN>:
2062
2063     $rs = $schema->resultset('Person')->search(
2064         undef,
2065         {
2066             alias => 'child', # alias columns in accordance with "from"
2067             from => [
2068                 { child => 'person' },
2069                 [
2070                     { father => 'person', -join_type => 'inner' },
2071                     { 'father.id' => 'child.father_id' }
2072                 ],
2073             ]
2074         },
2075     );
2076
2077     # Equivalent SQL:
2078     # SELECT child.* FROM person child
2079     # INNER JOIN person father ON child.father_id = father.id
2080
2081 =cut
2082
2083 1;