9367b3cb38f317ecfb025e8519329d86037f03fb
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use base qw/DBIx::Class/;
15
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   $source = $source->handle 
89     unless $source->isa('DBIx::Class::ResultSourceHandle');
90   $attrs = { %{$attrs||{}} };
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   my $self = {
101     result_source => $source,
102     result_class => $attrs->{result_class} || $source->result_class,
103     cond => $attrs->{where},
104     count => undef,
105     pager => undef,
106     attrs => $attrs
107   };
108
109   bless $self, $class;
110
111   return $self;
112 }
113
114 =head2 search
115
116 =over 4
117
118 =item Arguments: $cond, \%attrs?
119
120 =item Return Value: $resultset (scalar context), @row_objs (list context)
121
122 =back
123
124   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
125   my $new_rs = $cd_rs->search({ year => 2005 });
126
127   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
128                  # year = 2005 OR year = 2004
129
130 If you need to pass in additional attributes but no additional condition,
131 call it as C<search(undef, \%attrs)>.
132
133   # "SELECT name, artistid FROM $artist_table"
134   my @all_artists = $schema->resultset('Artist')->search(undef, {
135     columns => [qw/name artistid/],
136   });
137
138 For a list of attributes that can be passed to C<search>, see
139 L</ATTRIBUTES>. For more examples of using this function, see
140 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
141 documentation for the first argument, see L<SQL::Abstract>.
142
143 =cut
144
145 sub search {
146   my $self = shift;
147   my $rs = $self->search_rs( @_ );
148   return (wantarray ? $rs->all : $rs);
149 }
150
151 =head2 search_rs
152
153 =over 4
154
155 =item Arguments: $cond, \%attrs?
156
157 =item Return Value: $resultset
158
159 =back
160
161 This method does the same exact thing as search() except it will
162 always return a resultset, even in list context.
163
164 =cut
165
166 sub search_rs {
167   my $self = shift;
168
169   my $rows;
170
171   unless (@_) {                 # no search, effectively just a clone
172     $rows = $self->get_cache;
173   }
174
175   my $attrs = {};
176   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
177   my $our_attrs = { %{$self->{attrs}} };
178   my $having = delete $our_attrs->{having};
179   my $where = delete $our_attrs->{where};
180
181   my $new_attrs = { %{$our_attrs}, %{$attrs} };
182
183   # merge new attrs into inherited
184   foreach my $key (qw/join prefetch/) {
185     next unless exists $attrs->{$key};
186     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
187   }
188
189   my $cond = (@_
190     ? (
191         (@_ == 1 || ref $_[0] eq "HASH")
192           ? (
193               (ref $_[0] eq 'HASH')
194                 ? (
195                     (keys %{ $_[0] }  > 0)
196                       ? shift
197                       : undef
198                    )
199                 :  shift
200              )
201           : (
202               (@_ % 2)
203                 ? $self->throw_exception("Odd number of arguments to search")
204                 : {@_}
205              )
206       )
207     : undef
208   );
209
210   if (defined $where) {
211     $new_attrs->{where} = (
212       defined $new_attrs->{where}
213         ? { '-and' => [
214               map {
215                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
216               } $where, $new_attrs->{where}
217             ]
218           }
219         : $where);
220   }
221
222   if (defined $cond) {
223     $new_attrs->{where} = (
224       defined $new_attrs->{where}
225         ? { '-and' => [
226               map {
227                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
228               } $cond, $new_attrs->{where}
229             ]
230           }
231         : $cond);
232   }
233
234   if (defined $having) {
235     $new_attrs->{having} = (
236       defined $new_attrs->{having}
237         ? { '-and' => [
238               map {
239                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
240               } $having, $new_attrs->{having}
241             ]
242           }
243         : $having);
244   }
245
246   my $rs = (ref $self)->new($self->result_source, $new_attrs);
247   if ($rows) {
248     $rs->set_cache($rows);
249   }
250   return $rs;
251 }
252
253 =head2 search_literal
254
255 =over 4
256
257 =item Arguments: $sql_fragment, @bind_values
258
259 =item Return Value: $resultset (scalar context), @row_objs (list context)
260
261 =back
262
263   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
264   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
265
266 Pass a literal chunk of SQL to be added to the conditional part of the
267 resultset query.
268
269 =cut
270
271 sub search_literal {
272   my ($self, $cond, @vals) = @_;
273   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
274   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
275   return $self->search(\$cond, $attrs);
276 }
277
278 =head2 find
279
280 =over 4
281
282 =item Arguments: @values | \%cols, \%attrs?
283
284 =item Return Value: $row_object
285
286 =back
287
288 Finds a row based on its primary key or unique constraint. For example, to find
289 a row by its primary key:
290
291   my $cd = $schema->resultset('CD')->find(5);
292
293 You can also find a row by a specific unique constraint using the C<key>
294 attribute. For example:
295
296   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
297     key => 'cd_artist_title'
298   });
299
300 Additionally, you can specify the columns explicitly by name:
301
302   my $cd = $schema->resultset('CD')->find(
303     {
304       artist => 'Massive Attack',
305       title  => 'Mezzanine',
306     },
307     { key => 'cd_artist_title' }
308   );
309
310 If the C<key> is specified as C<primary>, it searches only on the primary key.
311
312 If no C<key> is specified, it searches on all unique constraints defined on the
313 source, including the primary key.
314
315 If your table does not have a primary key, you B<must> provide a value for the
316 C<key> attribute matching one of the unique constraints on the source.
317
318 See also L</find_or_create> and L</update_or_create>. For information on how to
319 declare unique constraints, see
320 L<DBIx::Class::ResultSource/add_unique_constraint>.
321
322 =cut
323
324 sub find {
325   my $self = shift;
326   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
327
328   # Default to the primary key, but allow a specific key
329   my @cols = exists $attrs->{key}
330     ? $self->result_source->unique_constraint_columns($attrs->{key})
331     : $self->result_source->primary_columns;
332   $self->throw_exception(
333     "Can't find unless a primary key is defined or unique constraint is specified"
334   ) unless @cols;
335
336   # Parse out a hashref from input
337   my $input_query;
338   if (ref $_[0] eq 'HASH') {
339     $input_query = { %{$_[0]} };
340   }
341   elsif (@_ == @cols) {
342     $input_query = {};
343     @{$input_query}{@cols} = @_;
344   }
345   else {
346     # Compatibility: Allow e.g. find(id => $value)
347     carp "Find by key => value deprecated; please use a hashref instead";
348     $input_query = {@_};
349   }
350
351   my (%related, $info);
352
353   foreach my $key (keys %$input_query) {
354     if (ref($input_query->{$key})
355         && ($info = $self->result_source->relationship_info($key))) {
356       my $rel_q = $self->result_source->resolve_condition(
357                     $info->{cond}, delete $input_query->{$key}, $key
358                   );
359       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
360       @related{keys %$rel_q} = values %$rel_q;
361     }
362   }
363   if (my @keys = keys %related) {
364     @{$input_query}{@keys} = values %related;
365   }
366
367   my @unique_queries = $self->_unique_queries($input_query, $attrs);
368
369   # Build the final query: Default to the disjunction of the unique queries,
370   # but allow the input query in case the ResultSet defines the query or the
371   # user is abusing find
372   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
373   my $query = @unique_queries
374     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
375     : $self->_add_alias($input_query, $alias);
376
377   # Run the query
378   if (keys %$attrs) {
379     my $rs = $self->search($query, $attrs);
380     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
381   }
382   else {
383     return keys %{$self->_resolved_attrs->{collapse}}
384       ? $self->search($query)->next
385       : $self->single($query);
386   }
387 }
388
389 # _add_alias
390 #
391 # Add the specified alias to the specified query hash. A copy is made so the
392 # original query is not modified.
393
394 sub _add_alias {
395   my ($self, $query, $alias) = @_;
396
397   my %aliased = %$query;
398   foreach my $col (grep { ! m/\./ } keys %aliased) {
399     $aliased{"$alias.$col"} = delete $aliased{$col};
400   }
401
402   return \%aliased;
403 }
404
405 # _unique_queries
406 #
407 # Build a list of queries which satisfy unique constraints.
408
409 sub _unique_queries {
410   my ($self, $query, $attrs) = @_;
411
412   my @constraint_names = exists $attrs->{key}
413     ? ($attrs->{key})
414     : $self->result_source->unique_constraint_names;
415
416   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
417   my $num_where = scalar keys %$where;
418
419   my @unique_queries;
420   foreach my $name (@constraint_names) {
421     my @unique_cols = $self->result_source->unique_constraint_columns($name);
422     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
423
424     my $num_cols = scalar @unique_cols;
425     my $num_query = scalar keys %$unique_query;
426
427     my $total = $num_query + $num_where;
428     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
429       # The query is either unique on its own or is unique in combination with
430       # the existing where clause
431       push @unique_queries, $unique_query;
432     }
433   }
434
435   return @unique_queries;
436 }
437
438 # _build_unique_query
439 #
440 # Constrain the specified query hash based on the specified column names.
441
442 sub _build_unique_query {
443   my ($self, $query, $unique_cols) = @_;
444
445   return {
446     map  { $_ => $query->{$_} }
447     grep { exists $query->{$_} }
448       @$unique_cols
449   };
450 }
451
452 =head2 search_related
453
454 =over 4
455
456 =item Arguments: $rel, $cond, \%attrs?
457
458 =item Return Value: $new_resultset
459
460 =back
461
462   $new_rs = $cd_rs->search_related('artist', {
463     name => 'Emo-R-Us',
464   });
465
466 Searches the specified relationship, optionally specifying a condition and
467 attributes for matching records. See L</ATTRIBUTES> for more information.
468
469 =cut
470
471 sub search_related {
472   return shift->related_resultset(shift)->search(@_);
473 }
474
475 =head2 cursor
476
477 =over 4
478
479 =item Arguments: none
480
481 =item Return Value: $cursor
482
483 =back
484
485 Returns a storage-driven cursor to the given resultset. See
486 L<DBIx::Class::Cursor> for more information.
487
488 =cut
489
490 sub cursor {
491   my ($self) = @_;
492
493   my $attrs = { %{$self->_resolved_attrs} };
494   return $self->{cursor}
495     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
496           $attrs->{where},$attrs);
497 }
498
499 =head2 single
500
501 =over 4
502
503 =item Arguments: $cond?
504
505 =item Return Value: $row_object?
506
507 =back
508
509   my $cd = $schema->resultset('CD')->single({ year => 2001 });
510
511 Inflates the first result without creating a cursor if the resultset has
512 any records in it; if not returns nothing. Used by L</find> as an optimisation.
513
514 Can optionally take an additional condition *only* - this is a fast-code-path
515 method; if you need to add extra joins or similar call ->search and then
516 ->single without a condition on the $rs returned from that.
517
518 =cut
519
520 sub single {
521   my ($self, $where) = @_;
522   my $attrs = { %{$self->_resolved_attrs} };
523   if ($where) {
524     if (defined $attrs->{where}) {
525       $attrs->{where} = {
526         '-and' =>
527             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
528                $where, delete $attrs->{where} ]
529       };
530     } else {
531       $attrs->{where} = $where;
532     }
533   }
534
535 #  XXX: Disabled since it doesn't infer uniqueness in all cases
536 #  unless ($self->_is_unique_query($attrs->{where})) {
537 #    carp "Query not guaranteed to return a single row"
538 #      . "; please declare your unique constraints or use search instead";
539 #  }
540
541   my @data = $self->result_source->storage->select_single(
542     $attrs->{from}, $attrs->{select},
543     $attrs->{where}, $attrs
544   );
545
546   return (@data ? ($self->_construct_object(@data))[0] : ());
547 }
548
549 # _is_unique_query
550 #
551 # Try to determine if the specified query is guaranteed to be unique, based on
552 # the declared unique constraints.
553
554 sub _is_unique_query {
555   my ($self, $query) = @_;
556
557   my $collapsed = $self->_collapse_query($query);
558   my $alias = $self->{attrs}{alias};
559
560   foreach my $name ($self->result_source->unique_constraint_names) {
561     my @unique_cols = map {
562       "$alias.$_"
563     } $self->result_source->unique_constraint_columns($name);
564
565     # Count the values for each unique column
566     my %seen = map { $_ => 0 } @unique_cols;
567
568     foreach my $key (keys %$collapsed) {
569       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
570       next unless exists $seen{$aliased};  # Additional constraints are okay
571       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
572     }
573
574     # If we get 0 or more than 1 value for a column, it's not necessarily unique
575     return 1 unless grep { $_ != 1 } values %seen;
576   }
577
578   return 0;
579 }
580
581 # _collapse_query
582 #
583 # Recursively collapse the query, accumulating values for each column.
584
585 sub _collapse_query {
586   my ($self, $query, $collapsed) = @_;
587
588   $collapsed ||= {};
589
590   if (ref $query eq 'ARRAY') {
591     foreach my $subquery (@$query) {
592       next unless ref $subquery;  # -or
593 #      warn "ARRAY: " . Dumper $subquery;
594       $collapsed = $self->_collapse_query($subquery, $collapsed);
595     }
596   }
597   elsif (ref $query eq 'HASH') {
598     if (keys %$query and (keys %$query)[0] eq '-and') {
599       foreach my $subquery (@{$query->{-and}}) {
600 #        warn "HASH: " . Dumper $subquery;
601         $collapsed = $self->_collapse_query($subquery, $collapsed);
602       }
603     }
604     else {
605 #      warn "LEAF: " . Dumper $query;
606       foreach my $col (keys %$query) {
607         my $value = $query->{$col};
608         $collapsed->{$col}{$value}++;
609       }
610     }
611   }
612
613   return $collapsed;
614 }
615
616 =head2 get_column
617
618 =over 4
619
620 =item Arguments: $cond?
621
622 =item Return Value: $resultsetcolumn
623
624 =back
625
626   my $max_length = $rs->get_column('length')->max;
627
628 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
629
630 =cut
631
632 sub get_column {
633   my ($self, $column) = @_;
634   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
635   return $new;
636 }
637
638 =head2 search_like
639
640 =over 4
641
642 =item Arguments: $cond, \%attrs?
643
644 =item Return Value: $resultset (scalar context), @row_objs (list context)
645
646 =back
647
648   # WHERE title LIKE '%blue%'
649   $cd_rs = $rs->search_like({ title => '%blue%'});
650
651 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
652 that this is simply a convenience method. You most likely want to use
653 L</search> with specific operators.
654
655 For more information, see L<DBIx::Class::Manual::Cookbook>.
656
657 =cut
658
659 sub search_like {
660   my $class = shift;
661   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
662   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
663   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
664   return $class->search($query, { %$attrs });
665 }
666
667 =head2 slice
668
669 =over 4
670
671 =item Arguments: $first, $last
672
673 =item Return Value: $resultset (scalar context), @row_objs (list context)
674
675 =back
676
677 Returns a resultset or object list representing a subset of elements from the
678 resultset slice is called on. Indexes are from 0, i.e., to get the first
679 three records, call:
680
681   my ($one, $two, $three) = $rs->slice(0, 2);
682
683 =cut
684
685 sub slice {
686   my ($self, $min, $max) = @_;
687   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
688   $attrs->{offset} = $self->{attrs}{offset} || 0;
689   $attrs->{offset} += $min;
690   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
691   return $self->search(undef(), $attrs);
692   #my $slice = (ref $self)->new($self->result_source, $attrs);
693   #return (wantarray ? $slice->all : $slice);
694 }
695
696 =head2 next
697
698 =over 4
699
700 =item Arguments: none
701
702 =item Return Value: $result?
703
704 =back
705
706 Returns the next element in the resultset (C<undef> is there is none).
707
708 Can be used to efficiently iterate over records in the resultset:
709
710   my $rs = $schema->resultset('CD')->search;
711   while (my $cd = $rs->next) {
712     print $cd->title;
713   }
714
715 Note that you need to store the resultset object, and call C<next> on it.
716 Calling C<< resultset('Table')->next >> repeatedly will always return the
717 first record from the resultset.
718
719 =cut
720
721 sub next {
722   my ($self) = @_;
723   if (my $cache = $self->get_cache) {
724     $self->{all_cache_position} ||= 0;
725     return $cache->[$self->{all_cache_position}++];
726   }
727   if ($self->{attrs}{cache}) {
728     $self->{all_cache_position} = 1;
729     return ($self->all)[0];
730   }
731   if ($self->{stashed_objects}) {
732     my $obj = shift(@{$self->{stashed_objects}});
733     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
734     return $obj;
735   }
736   my @row = (
737     exists $self->{stashed_row}
738       ? @{delete $self->{stashed_row}}
739       : $self->cursor->next
740   );
741   return unless (@row);
742   my ($row, @more) = $self->_construct_object(@row);
743   $self->{stashed_objects} = \@more if @more;
744   return $row;
745 }
746
747 sub _construct_object {
748   my ($self, @row) = @_;
749   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
750   my @new = $self->result_class->inflate_result($self->result_source, @$info);
751   @new = $self->{_attrs}{record_filter}->(@new)
752     if exists $self->{_attrs}{record_filter};
753   return @new;
754 }
755
756 sub _collapse_result {
757   my ($self, $as_proto, $row) = @_;
758
759   my @copy = @$row;
760
761   # 'foo'         => [ undef, 'foo' ]
762   # 'foo.bar'     => [ 'foo', 'bar' ]
763   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
764
765   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
766
767   my %collapse = %{$self->{_attrs}{collapse}||{}};
768
769   my @pri_index;
770
771   # if we're doing collapsing (has_many prefetch) we need to grab records
772   # until the PK changes, so fill @pri_index. if not, we leave it empty so
773   # we know we don't have to bother.
774
775   # the reason for not using the collapse stuff directly is because if you
776   # had for e.g. two artists in a row with no cds, the collapse info for
777   # both would be NULL (undef) so you'd lose the second artist
778
779   # store just the index so we can check the array positions from the row
780   # without having to contruct the full hash
781
782   if (keys %collapse) {
783     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
784     foreach my $i (0 .. $#construct_as) {
785       if (delete $pri{$construct_as[$i]}) {
786         push(@pri_index, $i);
787       }
788       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
789     }
790   }
791
792   # no need to do an if, it'll be empty if @pri_index is empty anyway
793
794   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
795
796   my %const;
797
798   do { # no need to check anything at the front, we always want the first row
799   
800     foreach my $this_as (@construct_as) {
801       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
802     }
803
804   } until ( # no pri_index => no collapse => drop straight out
805       !@pri_index
806     or
807       do { # get another row, stash it, drop out if different PK
808
809         @copy = $self->cursor->next;
810         $self->{stashed_row} = \@copy;
811
812         # last thing in do block, counts as true if anything doesn't match
813
814         # check xor defined first for NULL vs. NOT NULL then if one is
815         # defined the other must be so check string equality
816
817         grep {
818           (defined $pri_vals{$_} ^ defined $copy[$_])
819           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
820         } @pri_index;
821       }
822   );
823
824   # THIS BIT STILL NEEDS TO DO THE COLLAPSE
825
826   my $alias = $self->{attrs}{alias};
827   my $info = [ {}, {} ];
828   foreach my $key (keys %const) {
829     if (length $key && $key ne $alias) {
830       my $target = $info;
831       my @parts = split(/\./, $key);
832       foreach my $p (@parts) {
833         $target = $target->[1]->{$p} ||= [];
834       }
835       $target->[0] = $const{$key};
836     } else {
837       $info->[0] = $const{$key};
838     }
839   }
840
841   return $info;
842 }
843
844 =head2 result_source
845
846 =over 4
847
848 =item Arguments: $result_source?
849
850 =item Return Value: $result_source
851
852 =back
853
854 An accessor for the primary ResultSource object from which this ResultSet
855 is derived.
856
857 =head2 result_class
858
859 =over 4
860
861 =item Arguments: $result_class?
862
863 =item Return Value: $result_class
864
865 =back
866
867 An accessor for the class to use when creating row objects. Defaults to 
868 C<< result_source->result_class >> - which in most cases is the name of the 
869 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
870
871 =cut
872
873
874 =head2 count
875
876 =over 4
877
878 =item Arguments: $cond, \%attrs??
879
880 =item Return Value: $count
881
882 =back
883
884 Performs an SQL C<COUNT> with the same query as the resultset was built
885 with to find the number of elements. If passed arguments, does a search
886 on the resultset and counts the results of that.
887
888 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
889 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
890 not support C<DISTINCT> with multiple columns. If you are using such a
891 database, you should only use columns from the main table in your C<group_by>
892 clause.
893
894 =cut
895
896 sub count {
897   my $self = shift;
898   return $self->search(@_)->count if @_ and defined $_[0];
899   return scalar @{ $self->get_cache } if $self->get_cache;
900   my $count = $self->_count;
901   return 0 unless $count;
902
903   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
904   $count = $self->{attrs}{rows} if
905     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
906   return $count;
907 }
908
909 sub _count { # Separated out so pager can get the full count
910   my $self = shift;
911   my $select = { count => '*' };
912
913   my $attrs = { %{$self->_resolved_attrs} };
914   if (my $group_by = delete $attrs->{group_by}) {
915     delete $attrs->{having};
916     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
917     # todo: try CONCAT for multi-column pk
918     my @pk = $self->result_source->primary_columns;
919     if (@pk == 1) {
920       my $alias = $attrs->{alias};
921       foreach my $column (@distinct) {
922         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
923           @distinct = ($column);
924           last;
925         }
926       }
927     }
928
929     $select = { count => { distinct => \@distinct } };
930   }
931
932   $attrs->{select} = $select;
933   $attrs->{as} = [qw/count/];
934
935   # offset, order by and page are not needed to count. record_filter is cdbi
936   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
937
938   my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
939   my ($count) = $tmp_rs->cursor->next;
940   return $count;
941 }
942
943 =head2 count_literal
944
945 =over 4
946
947 =item Arguments: $sql_fragment, @bind_values
948
949 =item Return Value: $count
950
951 =back
952
953 Counts the results in a literal query. Equivalent to calling L</search_literal>
954 with the passed arguments, then L</count>.
955
956 =cut
957
958 sub count_literal { shift->search_literal(@_)->count; }
959
960 =head2 all
961
962 =over 4
963
964 =item Arguments: none
965
966 =item Return Value: @objects
967
968 =back
969
970 Returns all elements in the resultset. Called implicitly if the resultset
971 is returned in list context.
972
973 =cut
974
975 sub all {
976   my ($self) = @_;
977   return @{ $self->get_cache } if $self->get_cache;
978
979   my @obj;
980
981   # TODO: don't call resolve here
982   if (keys %{$self->_resolved_attrs->{collapse}}) {
983 #  if ($self->{attrs}{prefetch}) {
984       # Using $self->cursor->all is really just an optimisation.
985       # If we're collapsing has_many prefetches it probably makes
986       # very little difference, and this is cleaner than hacking
987       # _construct_object to survive the approach
988     my @row = $self->cursor->next;
989     while (@row) {
990       push(@obj, $self->_construct_object(@row));
991       @row = (exists $self->{stashed_row}
992                ? @{delete $self->{stashed_row}}
993                : $self->cursor->next);
994     }
995   } else {
996     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
997   }
998
999   $self->set_cache(\@obj) if $self->{attrs}{cache};
1000   return @obj;
1001 }
1002
1003 =head2 reset
1004
1005 =over 4
1006
1007 =item Arguments: none
1008
1009 =item Return Value: $self
1010
1011 =back
1012
1013 Resets the resultset's cursor, so you can iterate through the elements again.
1014
1015 =cut
1016
1017 sub reset {
1018   my ($self) = @_;
1019   delete $self->{_attrs} if exists $self->{_attrs};
1020   $self->{all_cache_position} = 0;
1021   $self->cursor->reset;
1022   return $self;
1023 }
1024
1025 =head2 first
1026
1027 =over 4
1028
1029 =item Arguments: none
1030
1031 =item Return Value: $object?
1032
1033 =back
1034
1035 Resets the resultset and returns an object for the first result (if the
1036 resultset returns anything).
1037
1038 =cut
1039
1040 sub first {
1041   return $_[0]->reset->next;
1042 }
1043
1044 # _cond_for_update_delete
1045 #
1046 # update/delete require the condition to be modified to handle
1047 # the differing SQL syntax available.  This transforms the $self->{cond}
1048 # appropriately, returning the new condition.
1049
1050 sub _cond_for_update_delete {
1051   my ($self, $full_cond) = @_;
1052   my $cond = {};
1053
1054   $full_cond ||= $self->{cond};
1055   # No-op. No condition, we're updating/deleting everything
1056   return $cond unless ref $full_cond;
1057
1058   if (ref $full_cond eq 'ARRAY') {
1059     $cond = [
1060       map {
1061         my %hash;
1062         foreach my $key (keys %{$_}) {
1063           $key =~ /([^.]+)$/;
1064           $hash{$1} = $_->{$key};
1065         }
1066         \%hash;
1067       } @{$full_cond}
1068     ];
1069   }
1070   elsif (ref $full_cond eq 'HASH') {
1071     if ((keys %{$full_cond})[0] eq '-and') {
1072       $cond->{-and} = [];
1073
1074       my @cond = @{$full_cond->{-and}};
1075       for (my $i = 0; $i < @cond; $i++) {
1076         my $entry = $cond[$i];
1077
1078         my $hash;
1079         if (ref $entry eq 'HASH') {
1080           $hash = $self->_cond_for_update_delete($entry);
1081         }
1082         else {
1083           $entry =~ /([^.]+)$/;
1084           $hash->{$1} = $cond[++$i];
1085         }
1086
1087         push @{$cond->{-and}}, $hash;
1088       }
1089     }
1090     else {
1091       foreach my $key (keys %{$full_cond}) {
1092         $key =~ /([^.]+)$/;
1093         $cond->{$1} = $full_cond->{$key};
1094       }
1095     }
1096   }
1097   else {
1098     $self->throw_exception(
1099       "Can't update/delete on resultset with condition unless hash or array"
1100     );
1101   }
1102
1103   return $cond;
1104 }
1105
1106
1107 =head2 update
1108
1109 =over 4
1110
1111 =item Arguments: \%values
1112
1113 =item Return Value: $storage_rv
1114
1115 =back
1116
1117 Sets the specified columns in the resultset to the supplied values in a
1118 single query. Return value will be true if the update succeeded or false
1119 if no records were updated; exact type of success value is storage-dependent.
1120
1121 =cut
1122
1123 sub update {
1124   my ($self, $values) = @_;
1125   $self->throw_exception("Values for update must be a hash")
1126     unless ref $values eq 'HASH';
1127
1128   my $cond = $self->_cond_for_update_delete;
1129    
1130   return $self->result_source->storage->update(
1131     $self->result_source, $values, $cond
1132   );
1133 }
1134
1135 =head2 update_all
1136
1137 =over 4
1138
1139 =item Arguments: \%values
1140
1141 =item Return Value: 1
1142
1143 =back
1144
1145 Fetches all objects and updates them one at a time. Note that C<update_all>
1146 will run DBIC cascade triggers, while L</update> will not.
1147
1148 =cut
1149
1150 sub update_all {
1151   my ($self, $values) = @_;
1152   $self->throw_exception("Values for update must be a hash")
1153     unless ref $values eq 'HASH';
1154   foreach my $obj ($self->all) {
1155     $obj->set_columns($values)->update;
1156   }
1157   return 1;
1158 }
1159
1160 =head2 delete
1161
1162 =over 4
1163
1164 =item Arguments: none
1165
1166 =item Return Value: 1
1167
1168 =back
1169
1170 Deletes the contents of the resultset from its result source. Note that this
1171 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1172 to run. See also L<DBIx::Class::Row/delete>.
1173
1174 =cut
1175
1176 sub delete {
1177   my ($self) = @_;
1178
1179   my $cond = $self->_cond_for_update_delete;
1180
1181   $self->result_source->storage->delete($self->result_source, $cond);
1182   return 1;
1183 }
1184
1185 =head2 delete_all
1186
1187 =over 4
1188
1189 =item Arguments: none
1190
1191 =item Return Value: 1
1192
1193 =back
1194
1195 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1196 will run DBIC cascade triggers, while L</delete> will not.
1197
1198 =cut
1199
1200 sub delete_all {
1201   my ($self) = @_;
1202   $_->delete for $self->all;
1203   return 1;
1204 }
1205
1206 =head2 pager
1207
1208 =over 4
1209
1210 =item Arguments: none
1211
1212 =item Return Value: $pager
1213
1214 =back
1215
1216 Return Value a L<Data::Page> object for the current resultset. Only makes
1217 sense for queries with a C<page> attribute.
1218
1219 =cut
1220
1221 sub pager {
1222   my ($self) = @_;
1223   my $attrs = $self->{attrs};
1224   $self->throw_exception("Can't create pager for non-paged rs")
1225     unless $self->{attrs}{page};
1226   $attrs->{rows} ||= 10;
1227   return $self->{pager} ||= Data::Page->new(
1228     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1229 }
1230
1231 =head2 page
1232
1233 =over 4
1234
1235 =item Arguments: $page_number
1236
1237 =item Return Value: $rs
1238
1239 =back
1240
1241 Returns a resultset for the $page_number page of the resultset on which page
1242 is called, where each page contains a number of rows equal to the 'rows'
1243 attribute set on the resultset (10 by default).
1244
1245 =cut
1246
1247 sub page {
1248   my ($self, $page) = @_;
1249   return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
1250 }
1251
1252 =head2 new_result
1253
1254 =over 4
1255
1256 =item Arguments: \%vals
1257
1258 =item Return Value: $object
1259
1260 =back
1261
1262 Creates an object in the resultset's result class and returns it.
1263
1264 =cut
1265
1266 sub new_result {
1267   my ($self, $values) = @_;
1268   $self->throw_exception( "new_result needs a hash" )
1269     unless (ref $values eq 'HASH');
1270   $self->throw_exception(
1271     "Can't abstract implicit construct, condition not a hash"
1272   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1273
1274   my $alias = $self->{attrs}{alias};
1275   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1276   my %new = (
1277     %{ $self->_remove_alias($values, $alias) },
1278     %{ $self->_remove_alias($collapsed_cond, $alias) },
1279     -source_handle => $self->_source_handle
1280   );
1281
1282   return $self->result_class->new(\%new);
1283 }
1284
1285 # _collapse_cond
1286 #
1287 # Recursively collapse the condition.
1288
1289 sub _collapse_cond {
1290   my ($self, $cond, $collapsed) = @_;
1291
1292   $collapsed ||= {};
1293
1294   if (ref $cond eq 'ARRAY') {
1295     foreach my $subcond (@$cond) {
1296       next unless ref $subcond;  # -or
1297 #      warn "ARRAY: " . Dumper $subcond;
1298       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1299     }
1300   }
1301   elsif (ref $cond eq 'HASH') {
1302     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1303       foreach my $subcond (@{$cond->{-and}}) {
1304 #        warn "HASH: " . Dumper $subcond;
1305         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1306       }
1307     }
1308     else {
1309 #      warn "LEAF: " . Dumper $cond;
1310       foreach my $col (keys %$cond) {
1311         my $value = $cond->{$col};
1312         $collapsed->{$col} = $value;
1313       }
1314     }
1315   }
1316
1317   return $collapsed;
1318 }
1319
1320 # _remove_alias
1321 #
1322 # Remove the specified alias from the specified query hash. A copy is made so
1323 # the original query is not modified.
1324
1325 sub _remove_alias {
1326   my ($self, $query, $alias) = @_;
1327
1328   my %orig = %{ $query || {} };
1329   my %unaliased;
1330
1331   foreach my $key (keys %orig) {
1332     if ($key !~ /\./) {
1333       $unaliased{$key} = $orig{$key};
1334       next;
1335     }
1336     $unaliased{$1} = $orig{$key}
1337       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1338   }
1339
1340   return \%unaliased;
1341 }
1342
1343 =head2 find_or_new
1344
1345 =over 4
1346
1347 =item Arguments: \%vals, \%attrs?
1348
1349 =item Return Value: $object
1350
1351 =back
1352
1353 Find an existing record from this resultset. If none exists, instantiate a new
1354 result object and return it. The object will not be saved into your storage
1355 until you call L<DBIx::Class::Row/insert> on it.
1356
1357 If you want objects to be saved immediately, use L</find_or_create> instead.
1358
1359 =cut
1360
1361 sub find_or_new {
1362   my $self     = shift;
1363   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1364   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1365   my $exists   = $self->find($hash, $attrs);
1366   return defined $exists ? $exists : $self->new_result($hash);
1367 }
1368
1369 =head2 create
1370
1371 =over 4
1372
1373 =item Arguments: \%vals
1374
1375 =item Return Value: $object
1376
1377 =back
1378
1379 Inserts a record into the resultset and returns the object representing it.
1380
1381 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1382
1383 =cut
1384
1385 sub create {
1386   my ($self, $attrs) = @_;
1387   $self->throw_exception( "create needs a hashref" )
1388     unless ref $attrs eq 'HASH';
1389   return $self->new_result($attrs)->insert;
1390 }
1391
1392 =head2 find_or_create
1393
1394 =over 4
1395
1396 =item Arguments: \%vals, \%attrs?
1397
1398 =item Return Value: $object
1399
1400 =back
1401
1402   $class->find_or_create({ key => $val, ... });
1403
1404 Tries to find a record based on its primary key or unique constraint; if none
1405 is found, creates one and returns that instead.
1406
1407   my $cd = $schema->resultset('CD')->find_or_create({
1408     cdid   => 5,
1409     artist => 'Massive Attack',
1410     title  => 'Mezzanine',
1411     year   => 2005,
1412   });
1413
1414 Also takes an optional C<key> attribute, to search by a specific key or unique
1415 constraint. For example:
1416
1417   my $cd = $schema->resultset('CD')->find_or_create(
1418     {
1419       artist => 'Massive Attack',
1420       title  => 'Mezzanine',
1421     },
1422     { key => 'cd_artist_title' }
1423   );
1424
1425 See also L</find> and L</update_or_create>. For information on how to declare
1426 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1427
1428 =cut
1429
1430 sub find_or_create {
1431   my $self     = shift;
1432   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1433   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1434   my $exists   = $self->find($hash, $attrs);
1435   return defined $exists ? $exists : $self->create($hash);
1436 }
1437
1438 =head2 update_or_create
1439
1440 =over 4
1441
1442 =item Arguments: \%col_values, { key => $unique_constraint }?
1443
1444 =item Return Value: $object
1445
1446 =back
1447
1448   $class->update_or_create({ col => $val, ... });
1449
1450 First, searches for an existing row matching one of the unique constraints
1451 (including the primary key) on the source of this resultset. If a row is
1452 found, updates it with the other given column values. Otherwise, creates a new
1453 row.
1454
1455 Takes an optional C<key> attribute to search on a specific unique constraint.
1456 For example:
1457
1458   # In your application
1459   my $cd = $schema->resultset('CD')->update_or_create(
1460     {
1461       artist => 'Massive Attack',
1462       title  => 'Mezzanine',
1463       year   => 1998,
1464     },
1465     { key => 'cd_artist_title' }
1466   );
1467
1468 If no C<key> is specified, it searches on all unique constraints defined on the
1469 source, including the primary key.
1470
1471 If the C<key> is specified as C<primary>, it searches only on the primary key.
1472
1473 See also L</find> and L</find_or_create>. For information on how to declare
1474 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1475
1476 =cut
1477
1478 sub update_or_create {
1479   my $self = shift;
1480   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1481   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1482
1483   my $row = $self->find($cond, $attrs);
1484   if (defined $row) {
1485     $row->update($cond);
1486     return $row;
1487   }
1488
1489   return $self->create($cond);
1490 }
1491
1492 =head2 get_cache
1493
1494 =over 4
1495
1496 =item Arguments: none
1497
1498 =item Return Value: \@cache_objects?
1499
1500 =back
1501
1502 Gets the contents of the cache for the resultset, if the cache is set.
1503
1504 =cut
1505
1506 sub get_cache {
1507   shift->{all_cache};
1508 }
1509
1510 =head2 set_cache
1511
1512 =over 4
1513
1514 =item Arguments: \@cache_objects
1515
1516 =item Return Value: \@cache_objects
1517
1518 =back
1519
1520 Sets the contents of the cache for the resultset. Expects an arrayref
1521 of objects of the same class as those produced by the resultset. Note that
1522 if the cache is set the resultset will return the cached objects rather
1523 than re-querying the database even if the cache attr is not set.
1524
1525 =cut
1526
1527 sub set_cache {
1528   my ( $self, $data ) = @_;
1529   $self->throw_exception("set_cache requires an arrayref")
1530       if defined($data) && (ref $data ne 'ARRAY');
1531   $self->{all_cache} = $data;
1532 }
1533
1534 =head2 clear_cache
1535
1536 =over 4
1537
1538 =item Arguments: none
1539
1540 =item Return Value: []
1541
1542 =back
1543
1544 Clears the cache for the resultset.
1545
1546 =cut
1547
1548 sub clear_cache {
1549   shift->set_cache(undef);
1550 }
1551
1552 =head2 related_resultset
1553
1554 =over 4
1555
1556 =item Arguments: $relationship_name
1557
1558 =item Return Value: $resultset
1559
1560 =back
1561
1562 Returns a related resultset for the supplied relationship name.
1563
1564   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1565
1566 =cut
1567
1568 sub related_resultset {
1569   my ($self, $rel) = @_;
1570
1571   $self->{related_resultsets} ||= {};
1572   return $self->{related_resultsets}{$rel} ||= do {
1573     my $rel_obj = $self->result_source->relationship_info($rel);
1574
1575     $self->throw_exception(
1576       "search_related: result source '" . $self->_source_handle->source_moniker .
1577         "' has no such relationship $rel")
1578       unless $rel_obj;
1579     
1580     my ($from,$seen) = $self->_resolve_from($rel);
1581
1582     my $join_count = $seen->{$rel};
1583     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1584
1585     $self->_source_handle->schema->resultset($rel_obj->{class})->search_rs(
1586       undef, {
1587         %{$self->{attrs}||{}},
1588         join => undef,
1589         prefetch => undef,
1590         select => undef,
1591         as => undef,
1592         alias => $alias,
1593         where => $self->{cond},
1594         seen_join => $seen,
1595         from => $from,
1596     });
1597   };
1598 }
1599
1600 sub _resolve_from {
1601   my ($self, $extra_join) = @_;
1602   my $source = $self->result_source;
1603   my $attrs = $self->{attrs};
1604   
1605   my $from = $attrs->{from}
1606     || [ { $attrs->{alias} => $source->from } ];
1607     
1608   my $seen = { %{$attrs->{seen_join}||{}} };
1609
1610   my $join = ($attrs->{join}
1611                ? [ $attrs->{join}, $extra_join ]
1612                : $extra_join);
1613   $from = [
1614     @$from,
1615     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1616   ];
1617
1618   return ($from,$seen);
1619 }
1620
1621 sub _resolved_attrs {
1622   my $self = shift;
1623   return $self->{_attrs} if $self->{_attrs};
1624
1625   my $attrs = { %{$self->{attrs}||{}} };
1626   my $source = $self->result_source;
1627   my $alias = $attrs->{alias};
1628
1629   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1630   if ($attrs->{columns}) {
1631     delete $attrs->{as};
1632   } elsif (!$attrs->{select}) {
1633     $attrs->{columns} = [ $source->columns ];
1634   }
1635  
1636   $attrs->{select} = 
1637     ($attrs->{select}
1638       ? (ref $attrs->{select} eq 'ARRAY'
1639           ? [ @{$attrs->{select}} ]
1640           : [ $attrs->{select} ])
1641       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1642     );
1643   $attrs->{as} =
1644     ($attrs->{as}
1645       ? (ref $attrs->{as} eq 'ARRAY'
1646           ? [ @{$attrs->{as}} ]
1647           : [ $attrs->{as} ])
1648       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1649     );
1650   
1651   my $adds;
1652   if ($adds = delete $attrs->{include_columns}) {
1653     $adds = [$adds] unless ref $adds eq 'ARRAY';
1654     push(@{$attrs->{select}}, @$adds);
1655     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1656   }
1657   if ($adds = delete $attrs->{'+select'}) {
1658     $adds = [$adds] unless ref $adds eq 'ARRAY';
1659     push(@{$attrs->{select}},
1660            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1661   }
1662   if (my $adds = delete $attrs->{'+as'}) {
1663     $adds = [$adds] unless ref $adds eq 'ARRAY';
1664     push(@{$attrs->{as}}, @$adds);
1665   }
1666
1667   $attrs->{from} ||= [ { 'me' => $source->from } ];
1668
1669   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1670     my $join = delete $attrs->{join} || {};
1671
1672     if (defined $attrs->{prefetch}) {
1673       $join = $self->_merge_attr(
1674         $join, $attrs->{prefetch}
1675       );
1676     }
1677
1678     $attrs->{from} =   # have to copy here to avoid corrupting the original
1679       [
1680         @{$attrs->{from}}, 
1681         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1682       ];
1683   }
1684
1685   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1686   if ($attrs->{order_by}) {
1687     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1688                            ? [ @{$attrs->{order_by}} ]
1689                            : [ $attrs->{order_by} ]);
1690   } else {
1691     $attrs->{order_by} = [];    
1692   }
1693
1694   my $collapse = $attrs->{collapse} || {};
1695   if (my $prefetch = delete $attrs->{prefetch}) {
1696     $prefetch = $self->_merge_attr({}, $prefetch);
1697     my @pre_order;
1698     my $seen = $attrs->{seen_join} || {};
1699     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1700       # bring joins back to level of current class
1701       my @prefetch = $source->resolve_prefetch(
1702         $p, $alias, $seen, \@pre_order, $collapse
1703       );
1704       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1705       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1706     }
1707     push(@{$attrs->{order_by}}, @pre_order);
1708   }
1709   $attrs->{collapse} = $collapse;
1710
1711   return $self->{_attrs} = $attrs;
1712 }
1713
1714 sub _merge_attr {
1715   my ($self, $a, $b) = @_;
1716   return $b unless defined($a);
1717   return $a unless defined($b);
1718   
1719   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1720     foreach my $key (keys %{$b}) {
1721       if (exists $a->{$key}) {
1722         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1723       } else {
1724         $a->{$key} = $b->{$key};
1725       }
1726     }
1727     return $a;
1728   } else {
1729     $a = [$a] unless ref $a eq 'ARRAY';
1730     $b = [$b] unless ref $b eq 'ARRAY';
1731
1732     my $hash = {};
1733     my @array;
1734     foreach my $x ($a, $b) {
1735       foreach my $element (@{$x}) {
1736         if (ref $element eq 'HASH') {
1737           $hash = $self->_merge_attr($hash, $element);
1738         } elsif (ref $element eq 'ARRAY') {
1739           push(@array, @{$element});
1740         } else {
1741           push(@array, $element) unless $b == $x
1742             && grep { $_ eq $element } @array;
1743         }
1744       }
1745     }
1746     
1747     @array = grep { !exists $hash->{$_} } @array;
1748
1749     return keys %{$hash}
1750       ? ( scalar(@array)
1751             ? [$hash, @array]
1752             : $hash
1753         )
1754       : \@array;
1755   }
1756 }
1757
1758 sub result_source {
1759     my $self = shift;
1760
1761     if (@_) {
1762         $self->_source_handle($_[0]->handle);
1763     } else {
1764         $self->_source_handle->resolve;
1765     }
1766 }
1767
1768 =head2 throw_exception
1769
1770 See L<DBIx::Class::Schema/throw_exception> for details.
1771
1772 =cut
1773
1774 sub throw_exception {
1775   my $self=shift;
1776   $self->_source_handle->schema->throw_exception(@_);
1777 }
1778
1779 # XXX: FIXME: Attributes docs need clearing up
1780
1781 =head1 ATTRIBUTES
1782
1783 The resultset takes various attributes that modify its behavior. Here's an
1784 overview of them:
1785
1786 =head2 order_by
1787
1788 =over 4
1789
1790 =item Value: ($order_by | \@order_by)
1791
1792 =back
1793
1794 Which column(s) to order the results by. This is currently passed
1795 through directly to SQL, so you can give e.g. C<year DESC> for a
1796 descending order on the column `year'.
1797
1798 Please note that if you have C<quote_char> enabled (see
1799 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1800 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1801 so you will need to manually quote things as appropriate.)
1802
1803 =head2 columns
1804
1805 =over 4
1806
1807 =item Value: \@columns
1808
1809 =back
1810
1811 Shortcut to request a particular set of columns to be retrieved.  Adds
1812 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1813 from that, then auto-populates C<as> from C<select> as normal. (You may also
1814 use the C<cols> attribute, as in earlier versions of DBIC.)
1815
1816 =head2 include_columns
1817
1818 =over 4
1819
1820 =item Value: \@columns
1821
1822 =back
1823
1824 Shortcut to include additional columns in the returned results - for example
1825
1826   $schema->resultset('CD')->search(undef, {
1827     include_columns => ['artist.name'],
1828     join => ['artist']
1829   });
1830
1831 would return all CDs and include a 'name' column to the information
1832 passed to object inflation. Note that the 'artist' is the name of the
1833 column (or relationship) accessor, and 'name' is the name of the column
1834 accessor in the related table.
1835
1836 =head2 select
1837
1838 =over 4
1839
1840 =item Value: \@select_columns
1841
1842 =back
1843
1844 Indicates which columns should be selected from the storage. You can use
1845 column names, or in the case of RDBMS back ends, function or stored procedure
1846 names:
1847
1848   $rs = $schema->resultset('Employee')->search(undef, {
1849     select => [
1850       'name',
1851       { count => 'employeeid' },
1852       { sum => 'salary' }
1853     ]
1854   });
1855
1856 When you use function/stored procedure names and do not supply an C<as>
1857 attribute, the column names returned are storage-dependent. E.g. MySQL would
1858 return a column named C<count(employeeid)> in the above example.
1859
1860 =head2 +select
1861
1862 =over 4
1863
1864 Indicates additional columns to be selected from storage.  Works the same as
1865 L<select> but adds columns to the selection.
1866
1867 =back
1868
1869 =head2 +as
1870
1871 =over 4
1872
1873 Indicates additional column names for those added via L<+select>.
1874
1875 =back
1876
1877 =head2 as
1878
1879 =over 4
1880
1881 =item Value: \@inflation_names
1882
1883 =back
1884
1885 Indicates column names for object inflation. That is, c< as >
1886 indicates the name that the column can be accessed as via the
1887 C<get_column> method (or via the object accessor, B<if one already
1888 exists>).  It has nothing to do with the SQL code C< SELECT foo AS bar
1889 >.
1890
1891 The C< as > attribute is used in conjunction with C<select>,
1892 usually when C<select> contains one or more function or stored
1893 procedure names:
1894
1895   $rs = $schema->resultset('Employee')->search(undef, {
1896     select => [
1897       'name',
1898       { count => 'employeeid' }
1899     ],
1900     as => ['name', 'employee_count'],
1901   });
1902
1903   my $employee = $rs->first(); # get the first Employee
1904
1905 If the object against which the search is performed already has an accessor
1906 matching a column name specified in C<as>, the value can be retrieved using
1907 the accessor as normal:
1908
1909   my $name = $employee->name();
1910
1911 If on the other hand an accessor does not exist in the object, you need to
1912 use C<get_column> instead:
1913
1914   my $employee_count = $employee->get_column('employee_count');
1915
1916 You can create your own accessors if required - see
1917 L<DBIx::Class::Manual::Cookbook> for details.
1918
1919 Please note: This will NOT insert an C<AS employee_count> into the SQL
1920 statement produced, it is used for internal access only. Thus
1921 attempting to use the accessor in an C<order_by> clause or similar
1922 will fail miserably.
1923
1924 To get around this limitation, you can supply literal SQL to your
1925 C<select> attibute that contains the C<AS alias> text, eg:
1926
1927   select => [\'myfield AS alias']
1928
1929 =head2 join
1930
1931 =over 4
1932
1933 =item Value: ($rel_name | \@rel_names | \%rel_names)
1934
1935 =back
1936
1937 Contains a list of relationships that should be joined for this query.  For
1938 example:
1939
1940   # Get CDs by Nine Inch Nails
1941   my $rs = $schema->resultset('CD')->search(
1942     { 'artist.name' => 'Nine Inch Nails' },
1943     { join => 'artist' }
1944   );
1945
1946 Can also contain a hash reference to refer to the other relation's relations.
1947 For example:
1948
1949   package MyApp::Schema::Track;
1950   use base qw/DBIx::Class/;
1951   __PACKAGE__->table('track');
1952   __PACKAGE__->add_columns(qw/trackid cd position title/);
1953   __PACKAGE__->set_primary_key('trackid');
1954   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1955   1;
1956
1957   # In your application
1958   my $rs = $schema->resultset('Artist')->search(
1959     { 'track.title' => 'Teardrop' },
1960     {
1961       join     => { cd => 'track' },
1962       order_by => 'artist.name',
1963     }
1964   );
1965
1966 You need to use the relationship (not the table) name in  conditions, 
1967 because they are aliased as such. The current table is aliased as "me", so 
1968 you need to use me.column_name in order to avoid ambiguity. For example:
1969
1970   # Get CDs from 1984 with a 'Foo' track 
1971   my $rs = $schema->resultset('CD')->search(
1972     { 
1973       'me.year' => 1984,
1974       'tracks.name' => 'Foo'
1975     },
1976     { join => 'tracks' }
1977   );
1978   
1979 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1980 similarly for a third time). For e.g.
1981
1982   my $rs = $schema->resultset('Artist')->search({
1983     'cds.title'   => 'Down to Earth',
1984     'cds_2.title' => 'Popular',
1985   }, {
1986     join => [ qw/cds cds/ ],
1987   });
1988
1989 will return a set of all artists that have both a cd with title 'Down
1990 to Earth' and a cd with title 'Popular'.
1991
1992 If you want to fetch related objects from other tables as well, see C<prefetch>
1993 below.
1994
1995 =head2 prefetch
1996
1997 =over 4
1998
1999 =item Value: ($rel_name | \@rel_names | \%rel_names)
2000
2001 =back
2002
2003 Contains one or more relationships that should be fetched along with the main
2004 query (when they are accessed afterwards they will have already been
2005 "prefetched").  This is useful for when you know you will need the related
2006 objects, because it saves at least one query:
2007
2008   my $rs = $schema->resultset('Tag')->search(
2009     undef,
2010     {
2011       prefetch => {
2012         cd => 'artist'
2013       }
2014     }
2015   );
2016
2017 The initial search results in SQL like the following:
2018
2019   SELECT tag.*, cd.*, artist.* FROM tag
2020   JOIN cd ON tag.cd = cd.cdid
2021   JOIN artist ON cd.artist = artist.artistid
2022
2023 L<DBIx::Class> has no need to go back to the database when we access the
2024 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2025 case.
2026
2027 Simple prefetches will be joined automatically, so there is no need
2028 for a C<join> attribute in the above search. If you're prefetching to
2029 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2030 specify the join as well.
2031
2032 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2033 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2034 with an accessor type of 'single' or 'filter').
2035
2036 =head2 page
2037
2038 =over 4
2039
2040 =item Value: $page
2041
2042 =back
2043
2044 Makes the resultset paged and specifies the page to retrieve. Effectively
2045 identical to creating a non-pages resultset and then calling ->page($page)
2046 on it.
2047
2048 If L<rows> attribute is not specified it defualts to 10 rows per page.
2049
2050 =head2 rows
2051
2052 =over 4
2053
2054 =item Value: $rows
2055
2056 =back
2057
2058 Specifes the maximum number of rows for direct retrieval or the number of
2059 rows per page if the page attribute or method is used.
2060
2061 =head2 offset
2062
2063 =over 4
2064
2065 =item Value: $offset
2066
2067 =back
2068
2069 Specifies the (zero-based) row number for the  first row to be returned, or the
2070 of the first row of the first page if paging is used.
2071
2072 =head2 group_by
2073
2074 =over 4
2075
2076 =item Value: \@columns
2077
2078 =back
2079
2080 A arrayref of columns to group by. Can include columns of joined tables.
2081
2082   group_by => [qw/ column1 column2 ... /]
2083
2084 =head2 having
2085
2086 =over 4
2087
2088 =item Value: $condition
2089
2090 =back
2091
2092 HAVING is a select statement attribute that is applied between GROUP BY and
2093 ORDER BY. It is applied to the after the grouping calculations have been
2094 done.
2095
2096   having => { 'count(employee)' => { '>=', 100 } }
2097
2098 =head2 distinct
2099
2100 =over 4
2101
2102 =item Value: (0 | 1)
2103
2104 =back
2105
2106 Set to 1 to group by all columns.
2107
2108 =head2 where
2109
2110 =over 4
2111
2112 Adds to the WHERE clause.
2113
2114   # only return rows WHERE deleted IS NULL for all searches
2115   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2116
2117 Can be overridden by passing C<{ where => undef }> as an attribute
2118 to a resulset.
2119
2120 =back
2121
2122 =head2 cache
2123
2124 Set to 1 to cache search results. This prevents extra SQL queries if you
2125 revisit rows in your ResultSet:
2126
2127   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2128
2129   while( my $artist = $resultset->next ) {
2130     ... do stuff ...
2131   }
2132
2133   $rs->first; # without cache, this would issue a query
2134
2135 By default, searches are not cached.
2136
2137 For more examples of using these attributes, see
2138 L<DBIx::Class::Manual::Cookbook>.
2139
2140 =head2 from
2141
2142 =over 4
2143
2144 =item Value: \@from_clause
2145
2146 =back
2147
2148 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2149 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2150 clauses.
2151
2152 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2153
2154 C<join> will usually do what you need and it is strongly recommended that you
2155 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2156 And we really do mean "cannot", not just tried and failed. Attempting to use
2157 this because you're having problems with C<join> is like trying to use x86
2158 ASM because you've got a syntax error in your C. Trust us on this.
2159
2160 Now, if you're still really, really sure you need to use this (and if you're
2161 not 100% sure, ask the mailing list first), here's an explanation of how this
2162 works.
2163
2164 The syntax is as follows -
2165
2166   [
2167     { <alias1> => <table1> },
2168     [
2169       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2170       [], # nested JOIN (optional)
2171       { <table1.column1> => <table2.column2>, ... (more conditions) },
2172     ],
2173     # More of the above [ ] may follow for additional joins
2174   ]
2175
2176   <table1> <alias1>
2177   JOIN
2178     <table2> <alias2>
2179     [JOIN ...]
2180   ON <table1.column1> = <table2.column2>
2181   <more joins may follow>
2182
2183 An easy way to follow the examples below is to remember the following:
2184
2185     Anything inside "[]" is a JOIN
2186     Anything inside "{}" is a condition for the enclosing JOIN
2187
2188 The following examples utilize a "person" table in a family tree application.
2189 In order to express parent->child relationships, this table is self-joined:
2190
2191     # Person->belongs_to('father' => 'Person');
2192     # Person->belongs_to('mother' => 'Person');
2193
2194 C<from> can be used to nest joins. Here we return all children with a father,
2195 then search against all mothers of those children:
2196
2197   $rs = $schema->resultset('Person')->search(
2198       undef,
2199       {
2200           alias => 'mother', # alias columns in accordance with "from"
2201           from => [
2202               { mother => 'person' },
2203               [
2204                   [
2205                       { child => 'person' },
2206                       [
2207                           { father => 'person' },
2208                           { 'father.person_id' => 'child.father_id' }
2209                       ]
2210                   ],
2211                   { 'mother.person_id' => 'child.mother_id' }
2212               ],
2213           ]
2214       },
2215   );
2216
2217   # Equivalent SQL:
2218   # SELECT mother.* FROM person mother
2219   # JOIN (
2220   #   person child
2221   #   JOIN person father
2222   #   ON ( father.person_id = child.father_id )
2223   # )
2224   # ON ( mother.person_id = child.mother_id )
2225
2226 The type of any join can be controlled manually. To search against only people
2227 with a father in the person table, we could explicitly use C<INNER JOIN>:
2228
2229     $rs = $schema->resultset('Person')->search(
2230         undef,
2231         {
2232             alias => 'child', # alias columns in accordance with "from"
2233             from => [
2234                 { child => 'person' },
2235                 [
2236                     { father => 'person', -join_type => 'inner' },
2237                     { 'father.id' => 'child.father_id' }
2238                 ],
2239             ]
2240         },
2241     );
2242
2243     # Equivalent SQL:
2244     # SELECT child.* FROM person child
2245     # INNER JOIN person father ON child.father_id = father.id
2246
2247 =cut
2248
2249 1;