half-finished collapse code
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use base qw/DBIx::Class/;
15
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   $source = $source->handle 
89     unless $source->isa('DBIx::Class::ResultSourceHandle');
90   $attrs = { %{$attrs||{}} };
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   my $self = {
101     result_source => $source,
102     result_class => $attrs->{result_class} || $source->result_class,
103     cond => $attrs->{where},
104     count => undef,
105     pager => undef,
106     attrs => $attrs
107   };
108
109   bless $self, $class;
110
111   return $self;
112 }
113
114 =head2 search
115
116 =over 4
117
118 =item Arguments: $cond, \%attrs?
119
120 =item Return Value: $resultset (scalar context), @row_objs (list context)
121
122 =back
123
124   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
125   my $new_rs = $cd_rs->search({ year => 2005 });
126
127   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
128                  # year = 2005 OR year = 2004
129
130 If you need to pass in additional attributes but no additional condition,
131 call it as C<search(undef, \%attrs)>.
132
133   # "SELECT name, artistid FROM $artist_table"
134   my @all_artists = $schema->resultset('Artist')->search(undef, {
135     columns => [qw/name artistid/],
136   });
137
138 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
139
140 =cut
141
142 sub search {
143   my $self = shift;
144   my $rs = $self->search_rs( @_ );
145   return (wantarray ? $rs->all : $rs);
146 }
147
148 =head2 search_rs
149
150 =over 4
151
152 =item Arguments: $cond, \%attrs?
153
154 =item Return Value: $resultset
155
156 =back
157
158 This method does the same exact thing as search() except it will
159 always return a resultset, even in list context.
160
161 =cut
162
163 sub search_rs {
164   my $self = shift;
165
166   my $rows;
167
168   unless (@_) {                 # no search, effectively just a clone
169     $rows = $self->get_cache;
170   }
171
172   my $attrs = {};
173   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
174   my $our_attrs = { %{$self->{attrs}} };
175   my $having = delete $our_attrs->{having};
176   my $where = delete $our_attrs->{where};
177
178   my $new_attrs = { %{$our_attrs}, %{$attrs} };
179
180   # merge new attrs into inherited
181   foreach my $key (qw/join prefetch/) {
182     next unless exists $attrs->{$key};
183     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
184   }
185
186   my $cond = (@_
187     ? (
188         (@_ == 1 || ref $_[0] eq "HASH")
189           ? (
190               (ref $_[0] eq 'HASH')
191                 ? (
192                     (keys %{ $_[0] }  > 0)
193                       ? shift
194                       : undef
195                    )
196                 :  shift
197              )
198           : (
199               (@_ % 2)
200                 ? $self->throw_exception("Odd number of arguments to search")
201                 : {@_}
202              )
203       )
204     : undef
205   );
206
207   if (defined $where) {
208     $new_attrs->{where} = (
209       defined $new_attrs->{where}
210         ? { '-and' => [
211               map {
212                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
213               } $where, $new_attrs->{where}
214             ]
215           }
216         : $where);
217   }
218
219   if (defined $cond) {
220     $new_attrs->{where} = (
221       defined $new_attrs->{where}
222         ? { '-and' => [
223               map {
224                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
225               } $cond, $new_attrs->{where}
226             ]
227           }
228         : $cond);
229   }
230
231   if (defined $having) {
232     $new_attrs->{having} = (
233       defined $new_attrs->{having}
234         ? { '-and' => [
235               map {
236                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
237               } $having, $new_attrs->{having}
238             ]
239           }
240         : $having);
241   }
242
243   my $rs = (ref $self)->new($self->result_source, $new_attrs);
244   if ($rows) {
245     $rs->set_cache($rows);
246   }
247   return $rs;
248 }
249
250 =head2 search_literal
251
252 =over 4
253
254 =item Arguments: $sql_fragment, @bind_values
255
256 =item Return Value: $resultset (scalar context), @row_objs (list context)
257
258 =back
259
260   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
261   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
262
263 Pass a literal chunk of SQL to be added to the conditional part of the
264 resultset query.
265
266 =cut
267
268 sub search_literal {
269   my ($self, $cond, @vals) = @_;
270   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
271   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
272   return $self->search(\$cond, $attrs);
273 }
274
275 =head2 find
276
277 =over 4
278
279 =item Arguments: @values | \%cols, \%attrs?
280
281 =item Return Value: $row_object
282
283 =back
284
285 Finds a row based on its primary key or unique constraint. For example, to find
286 a row by its primary key:
287
288   my $cd = $schema->resultset('CD')->find(5);
289
290 You can also find a row by a specific unique constraint using the C<key>
291 attribute. For example:
292
293   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
294     key => 'cd_artist_title'
295   });
296
297 Additionally, you can specify the columns explicitly by name:
298
299   my $cd = $schema->resultset('CD')->find(
300     {
301       artist => 'Massive Attack',
302       title  => 'Mezzanine',
303     },
304     { key => 'cd_artist_title' }
305   );
306
307 If the C<key> is specified as C<primary>, it searches only on the primary key.
308
309 If no C<key> is specified, it searches on all unique constraints defined on the
310 source, including the primary key.
311
312 If your table does not have a primary key, you B<must> provide a value for the
313 C<key> attribute matching one of the unique constraints on the source.
314
315 See also L</find_or_create> and L</update_or_create>. For information on how to
316 declare unique constraints, see
317 L<DBIx::Class::ResultSource/add_unique_constraint>.
318
319 =cut
320
321 sub find {
322   my $self = shift;
323   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
324
325   # Default to the primary key, but allow a specific key
326   my @cols = exists $attrs->{key}
327     ? $self->result_source->unique_constraint_columns($attrs->{key})
328     : $self->result_source->primary_columns;
329   $self->throw_exception(
330     "Can't find unless a primary key is defined or unique constraint is specified"
331   ) unless @cols;
332
333   # Parse out a hashref from input
334   my $input_query;
335   if (ref $_[0] eq 'HASH') {
336     $input_query = { %{$_[0]} };
337   }
338   elsif (@_ == @cols) {
339     $input_query = {};
340     @{$input_query}{@cols} = @_;
341   }
342   else {
343     # Compatibility: Allow e.g. find(id => $value)
344     carp "Find by key => value deprecated; please use a hashref instead";
345     $input_query = {@_};
346   }
347
348   my (%related, $info);
349
350   foreach my $key (keys %$input_query) {
351     if (ref($input_query->{$key})
352         && ($info = $self->result_source->relationship_info($key))) {
353       my $rel_q = $self->result_source->resolve_condition(
354                     $info->{cond}, delete $input_query->{$key}, $key
355                   );
356       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
357       @related{keys %$rel_q} = values %$rel_q;
358     }
359   }
360   if (my @keys = keys %related) {
361     @{$input_query}{@keys} = values %related;
362   }
363
364   my @unique_queries = $self->_unique_queries($input_query, $attrs);
365
366   # Build the final query: Default to the disjunction of the unique queries,
367   # but allow the input query in case the ResultSet defines the query or the
368   # user is abusing find
369   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
370   my $query = @unique_queries
371     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
372     : $self->_add_alias($input_query, $alias);
373
374   # Run the query
375   if (keys %$attrs) {
376     my $rs = $self->search($query, $attrs);
377     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
378   }
379   else {
380     return keys %{$self->_resolved_attrs->{collapse}}
381       ? $self->search($query)->next
382       : $self->single($query);
383   }
384 }
385
386 # _add_alias
387 #
388 # Add the specified alias to the specified query hash. A copy is made so the
389 # original query is not modified.
390
391 sub _add_alias {
392   my ($self, $query, $alias) = @_;
393
394   my %aliased = %$query;
395   foreach my $col (grep { ! m/\./ } keys %aliased) {
396     $aliased{"$alias.$col"} = delete $aliased{$col};
397   }
398
399   return \%aliased;
400 }
401
402 # _unique_queries
403 #
404 # Build a list of queries which satisfy unique constraints.
405
406 sub _unique_queries {
407   my ($self, $query, $attrs) = @_;
408
409   my @constraint_names = exists $attrs->{key}
410     ? ($attrs->{key})
411     : $self->result_source->unique_constraint_names;
412
413   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
414   my $num_where = scalar keys %$where;
415
416   my @unique_queries;
417   foreach my $name (@constraint_names) {
418     my @unique_cols = $self->result_source->unique_constraint_columns($name);
419     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
420
421     my $num_cols = scalar @unique_cols;
422     my $num_query = scalar keys %$unique_query;
423
424     my $total = $num_query + $num_where;
425     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
426       # The query is either unique on its own or is unique in combination with
427       # the existing where clause
428       push @unique_queries, $unique_query;
429     }
430   }
431
432   return @unique_queries;
433 }
434
435 # _build_unique_query
436 #
437 # Constrain the specified query hash based on the specified column names.
438
439 sub _build_unique_query {
440   my ($self, $query, $unique_cols) = @_;
441
442   return {
443     map  { $_ => $query->{$_} }
444     grep { exists $query->{$_} }
445       @$unique_cols
446   };
447 }
448
449 =head2 search_related
450
451 =over 4
452
453 =item Arguments: $rel, $cond, \%attrs?
454
455 =item Return Value: $new_resultset
456
457 =back
458
459   $new_rs = $cd_rs->search_related('artist', {
460     name => 'Emo-R-Us',
461   });
462
463 Searches the specified relationship, optionally specifying a condition and
464 attributes for matching records. See L</ATTRIBUTES> for more information.
465
466 =cut
467
468 sub search_related {
469   return shift->related_resultset(shift)->search(@_);
470 }
471
472 =head2 cursor
473
474 =over 4
475
476 =item Arguments: none
477
478 =item Return Value: $cursor
479
480 =back
481
482 Returns a storage-driven cursor to the given resultset. See
483 L<DBIx::Class::Cursor> for more information.
484
485 =cut
486
487 sub cursor {
488   my ($self) = @_;
489
490   my $attrs = { %{$self->_resolved_attrs} };
491   return $self->{cursor}
492     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
493           $attrs->{where},$attrs);
494 }
495
496 =head2 single
497
498 =over 4
499
500 =item Arguments: $cond?
501
502 =item Return Value: $row_object?
503
504 =back
505
506   my $cd = $schema->resultset('CD')->single({ year => 2001 });
507
508 Inflates the first result without creating a cursor if the resultset has
509 any records in it; if not returns nothing. Used by L</find> as an optimisation.
510
511 Can optionally take an additional condition *only* - this is a fast-code-path
512 method; if you need to add extra joins or similar call ->search and then
513 ->single without a condition on the $rs returned from that.
514
515 =cut
516
517 sub single {
518   my ($self, $where) = @_;
519   my $attrs = { %{$self->_resolved_attrs} };
520   if ($where) {
521     if (defined $attrs->{where}) {
522       $attrs->{where} = {
523         '-and' =>
524             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
525                $where, delete $attrs->{where} ]
526       };
527     } else {
528       $attrs->{where} = $where;
529     }
530   }
531
532 #  XXX: Disabled since it doesn't infer uniqueness in all cases
533 #  unless ($self->_is_unique_query($attrs->{where})) {
534 #    carp "Query not guaranteed to return a single row"
535 #      . "; please declare your unique constraints or use search instead";
536 #  }
537
538   my @data = $self->result_source->storage->select_single(
539     $attrs->{from}, $attrs->{select},
540     $attrs->{where}, $attrs
541   );
542
543   return (@data ? ($self->_construct_object(@data))[0] : ());
544 }
545
546 # _is_unique_query
547 #
548 # Try to determine if the specified query is guaranteed to be unique, based on
549 # the declared unique constraints.
550
551 sub _is_unique_query {
552   my ($self, $query) = @_;
553
554   my $collapsed = $self->_collapse_query($query);
555   my $alias = $self->{attrs}{alias};
556
557   foreach my $name ($self->result_source->unique_constraint_names) {
558     my @unique_cols = map {
559       "$alias.$_"
560     } $self->result_source->unique_constraint_columns($name);
561
562     # Count the values for each unique column
563     my %seen = map { $_ => 0 } @unique_cols;
564
565     foreach my $key (keys %$collapsed) {
566       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
567       next unless exists $seen{$aliased};  # Additional constraints are okay
568       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
569     }
570
571     # If we get 0 or more than 1 value for a column, it's not necessarily unique
572     return 1 unless grep { $_ != 1 } values %seen;
573   }
574
575   return 0;
576 }
577
578 # _collapse_query
579 #
580 # Recursively collapse the query, accumulating values for each column.
581
582 sub _collapse_query {
583   my ($self, $query, $collapsed) = @_;
584
585   $collapsed ||= {};
586
587   if (ref $query eq 'ARRAY') {
588     foreach my $subquery (@$query) {
589       next unless ref $subquery;  # -or
590 #      warn "ARRAY: " . Dumper $subquery;
591       $collapsed = $self->_collapse_query($subquery, $collapsed);
592     }
593   }
594   elsif (ref $query eq 'HASH') {
595     if (keys %$query and (keys %$query)[0] eq '-and') {
596       foreach my $subquery (@{$query->{-and}}) {
597 #        warn "HASH: " . Dumper $subquery;
598         $collapsed = $self->_collapse_query($subquery, $collapsed);
599       }
600     }
601     else {
602 #      warn "LEAF: " . Dumper $query;
603       foreach my $col (keys %$query) {
604         my $value = $query->{$col};
605         $collapsed->{$col}{$value}++;
606       }
607     }
608   }
609
610   return $collapsed;
611 }
612
613 =head2 get_column
614
615 =over 4
616
617 =item Arguments: $cond?
618
619 =item Return Value: $resultsetcolumn
620
621 =back
622
623   my $max_length = $rs->get_column('length')->max;
624
625 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
626
627 =cut
628
629 sub get_column {
630   my ($self, $column) = @_;
631   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
632   return $new;
633 }
634
635 =head2 search_like
636
637 =over 4
638
639 =item Arguments: $cond, \%attrs?
640
641 =item Return Value: $resultset (scalar context), @row_objs (list context)
642
643 =back
644
645   # WHERE title LIKE '%blue%'
646   $cd_rs = $rs->search_like({ title => '%blue%'});
647
648 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
649 that this is simply a convenience method. You most likely want to use
650 L</search> with specific operators.
651
652 For more information, see L<DBIx::Class::Manual::Cookbook>.
653
654 =cut
655
656 sub search_like {
657   my $class = shift;
658   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
659   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
660   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
661   return $class->search($query, { %$attrs });
662 }
663
664 =head2 slice
665
666 =over 4
667
668 =item Arguments: $first, $last
669
670 =item Return Value: $resultset (scalar context), @row_objs (list context)
671
672 =back
673
674 Returns a resultset or object list representing a subset of elements from the
675 resultset slice is called on. Indexes are from 0, i.e., to get the first
676 three records, call:
677
678   my ($one, $two, $three) = $rs->slice(0, 2);
679
680 =cut
681
682 sub slice {
683   my ($self, $min, $max) = @_;
684   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
685   $attrs->{offset} = $self->{attrs}{offset} || 0;
686   $attrs->{offset} += $min;
687   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
688   return $self->search(undef(), $attrs);
689   #my $slice = (ref $self)->new($self->result_source, $attrs);
690   #return (wantarray ? $slice->all : $slice);
691 }
692
693 =head2 next
694
695 =over 4
696
697 =item Arguments: none
698
699 =item Return Value: $result?
700
701 =back
702
703 Returns the next element in the resultset (C<undef> is there is none).
704
705 Can be used to efficiently iterate over records in the resultset:
706
707   my $rs = $schema->resultset('CD')->search;
708   while (my $cd = $rs->next) {
709     print $cd->title;
710   }
711
712 Note that you need to store the resultset object, and call C<next> on it.
713 Calling C<< resultset('Table')->next >> repeatedly will always return the
714 first record from the resultset.
715
716 =cut
717
718 sub next {
719   my ($self) = @_;
720   if (my $cache = $self->get_cache) {
721     $self->{all_cache_position} ||= 0;
722     return $cache->[$self->{all_cache_position}++];
723   }
724   if ($self->{attrs}{cache}) {
725     $self->{all_cache_position} = 1;
726     return ($self->all)[0];
727   }
728   if ($self->{stashed_objects}) {
729     my $obj = shift(@{$self->{stashed_objects}});
730     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
731     return $obj;
732   }
733   my @row = (
734     exists $self->{stashed_row}
735       ? @{delete $self->{stashed_row}}
736       : $self->cursor->next
737   );
738   return unless (@row);
739   my ($row, @more) = $self->_construct_object(@row);
740   $self->{stashed_objects} = \@more if @more;
741   return $row;
742 }
743
744 sub _construct_object {
745   my ($self, @row) = @_;
746   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
747   my @new = $self->result_class->inflate_result($self->result_source, @$info);
748   @new = $self->{_attrs}{record_filter}->(@new)
749     if exists $self->{_attrs}{record_filter};
750   return @new;
751 }
752
753 sub _collapse_result {
754   my ($self, $as_proto, $row) = @_;
755
756   my @copy = @$row;
757
758   # 'foo'         => [ undef, 'foo' ]
759   # 'foo.bar'     => [ 'foo', 'bar' ]
760   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
761
762   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
763
764   my %collapse = %{$self->{_attrs}{collapse}||{}};
765
766   my @pri_index;
767
768   # if we're doing collapsing (has_many prefetch) we need to grab records
769   # until the PK changes, so fill @pri_index. if not, we leave it empty so
770   # we know we don't have to bother.
771
772   # the reason for not using the collapse stuff directly is because if you
773   # had for e.g. two artists in a row with no cds, the collapse info for
774   # both would be NULL (undef) so you'd lose the second artist
775
776   # store just the index so we can check the array positions from the row
777   # without having to contruct the full hash
778
779   if (keys %collapse) {
780     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
781     foreach my $i (0 .. $#construct_as) {
782       if (delete $pri{$construct_as[$i]}) {
783         push(@pri_index, $i);
784       }
785       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
786     }
787   }
788
789   # no need to do an if, it'll be empty if @pri_index is empty anyway
790
791   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
792
793   my %const;
794
795   do { # no need to check anything at the front, we always want the first row
796   
797     foreach my $this_as (@construct_as) {
798       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
799     }
800
801   } until ( # no pri_index => no collapse => drop straight out
802       !@pri_index
803     or
804       do { # get another row, stash it, drop out if different PK
805
806         @copy = $self->cursor->next;
807         $self->{stashed_row} = \@copy;
808
809         # last thing in do block, counts as true if anything doesn't match
810
811         # check xor defined first for NULL vs. NOT NULL then if one is
812         # defined the other must be so check string equality
813
814         grep {
815           (defined $pri_vals{$_} ^ defined $copy[$_])
816           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
817         } @pri_index;
818       }
819   );
820
821   # THIS BIT STILL NEEDS TO DO THE COLLAPSE
822
823   my $alias = $self->{attrs}{alias};
824   my $info = [ {}, {} ];
825   foreach my $key (keys %const) {
826     if (length $key && $key ne $alias) {
827       my $target = $info;
828       my @parts = split(/\./, $key);
829       foreach my $p (@parts) {
830         $target = $target->[1]->{$p} ||= [];
831       }
832       $target->[0] = $const{$key};
833     } else {
834       $info->[0] = $const{$key};
835     }
836   }
837
838   return $info;
839 }
840
841 =head2 result_source
842
843 =over 4
844
845 =item Arguments: $result_source?
846
847 =item Return Value: $result_source
848
849 =back
850
851 An accessor for the primary ResultSource object from which this ResultSet
852 is derived.
853
854 =head2 result_class
855
856 =over 4
857
858 =item Arguments: $result_class?
859
860 =item Return Value: $result_class
861
862 =back
863
864 An accessor for the class to use when creating row objects. Defaults to 
865 C<< result_source->result_class >> - which in most cases is the name of the 
866 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
867
868 =cut
869
870
871 =head2 count
872
873 =over 4
874
875 =item Arguments: $cond, \%attrs??
876
877 =item Return Value: $count
878
879 =back
880
881 Performs an SQL C<COUNT> with the same query as the resultset was built
882 with to find the number of elements. If passed arguments, does a search
883 on the resultset and counts the results of that.
884
885 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
886 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
887 not support C<DISTINCT> with multiple columns. If you are using such a
888 database, you should only use columns from the main table in your C<group_by>
889 clause.
890
891 =cut
892
893 sub count {
894   my $self = shift;
895   return $self->search(@_)->count if @_ and defined $_[0];
896   return scalar @{ $self->get_cache } if $self->get_cache;
897   my $count = $self->_count;
898   return 0 unless $count;
899
900   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
901   $count = $self->{attrs}{rows} if
902     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
903   return $count;
904 }
905
906 sub _count { # Separated out so pager can get the full count
907   my $self = shift;
908   my $select = { count => '*' };
909
910   my $attrs = { %{$self->_resolved_attrs} };
911   if (my $group_by = delete $attrs->{group_by}) {
912     delete $attrs->{having};
913     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
914     # todo: try CONCAT for multi-column pk
915     my @pk = $self->result_source->primary_columns;
916     if (@pk == 1) {
917       my $alias = $attrs->{alias};
918       foreach my $column (@distinct) {
919         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
920           @distinct = ($column);
921           last;
922         }
923       }
924     }
925
926     $select = { count => { distinct => \@distinct } };
927   }
928
929   $attrs->{select} = $select;
930   $attrs->{as} = [qw/count/];
931
932   # offset, order by and page are not needed to count. record_filter is cdbi
933   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
934
935   my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
936   my ($count) = $tmp_rs->cursor->next;
937   return $count;
938 }
939
940 =head2 count_literal
941
942 =over 4
943
944 =item Arguments: $sql_fragment, @bind_values
945
946 =item Return Value: $count
947
948 =back
949
950 Counts the results in a literal query. Equivalent to calling L</search_literal>
951 with the passed arguments, then L</count>.
952
953 =cut
954
955 sub count_literal { shift->search_literal(@_)->count; }
956
957 =head2 all
958
959 =over 4
960
961 =item Arguments: none
962
963 =item Return Value: @objects
964
965 =back
966
967 Returns all elements in the resultset. Called implicitly if the resultset
968 is returned in list context.
969
970 =cut
971
972 sub all {
973   my ($self) = @_;
974   return @{ $self->get_cache } if $self->get_cache;
975
976   my @obj;
977
978   # TODO: don't call resolve here
979   if (keys %{$self->_resolved_attrs->{collapse}}) {
980 #  if ($self->{attrs}{prefetch}) {
981       # Using $self->cursor->all is really just an optimisation.
982       # If we're collapsing has_many prefetches it probably makes
983       # very little difference, and this is cleaner than hacking
984       # _construct_object to survive the approach
985     my @row = $self->cursor->next;
986     while (@row) {
987       push(@obj, $self->_construct_object(@row));
988       @row = (exists $self->{stashed_row}
989                ? @{delete $self->{stashed_row}}
990                : $self->cursor->next);
991     }
992   } else {
993     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
994   }
995
996   $self->set_cache(\@obj) if $self->{attrs}{cache};
997   return @obj;
998 }
999
1000 =head2 reset
1001
1002 =over 4
1003
1004 =item Arguments: none
1005
1006 =item Return Value: $self
1007
1008 =back
1009
1010 Resets the resultset's cursor, so you can iterate through the elements again.
1011
1012 =cut
1013
1014 sub reset {
1015   my ($self) = @_;
1016   delete $self->{_attrs} if exists $self->{_attrs};
1017   $self->{all_cache_position} = 0;
1018   $self->cursor->reset;
1019   return $self;
1020 }
1021
1022 =head2 first
1023
1024 =over 4
1025
1026 =item Arguments: none
1027
1028 =item Return Value: $object?
1029
1030 =back
1031
1032 Resets the resultset and returns an object for the first result (if the
1033 resultset returns anything).
1034
1035 =cut
1036
1037 sub first {
1038   return $_[0]->reset->next;
1039 }
1040
1041 # _cond_for_update_delete
1042 #
1043 # update/delete require the condition to be modified to handle
1044 # the differing SQL syntax available.  This transforms the $self->{cond}
1045 # appropriately, returning the new condition.
1046
1047 sub _cond_for_update_delete {
1048   my ($self, $full_cond) = @_;
1049   my $cond = {};
1050
1051   $full_cond ||= $self->{cond};
1052   # No-op. No condition, we're updating/deleting everything
1053   return $cond unless ref $full_cond;
1054
1055   if (ref $full_cond eq 'ARRAY') {
1056     $cond = [
1057       map {
1058         my %hash;
1059         foreach my $key (keys %{$_}) {
1060           $key =~ /([^.]+)$/;
1061           $hash{$1} = $_->{$key};
1062         }
1063         \%hash;
1064       } @{$full_cond}
1065     ];
1066   }
1067   elsif (ref $full_cond eq 'HASH') {
1068     if ((keys %{$full_cond})[0] eq '-and') {
1069       $cond->{-and} = [];
1070
1071       my @cond = @{$full_cond->{-and}};
1072       for (my $i = 0; $i < @cond; $i++) {
1073         my $entry = $cond[$i];
1074
1075         my $hash;
1076         if (ref $entry eq 'HASH') {
1077           $hash = $self->_cond_for_update_delete($entry);
1078         }
1079         else {
1080           $entry =~ /([^.]+)$/;
1081           $hash->{$1} = $cond[++$i];
1082         }
1083
1084         push @{$cond->{-and}}, $hash;
1085       }
1086     }
1087     else {
1088       foreach my $key (keys %{$full_cond}) {
1089         $key =~ /([^.]+)$/;
1090         $cond->{$1} = $full_cond->{$key};
1091       }
1092     }
1093   }
1094   else {
1095     $self->throw_exception(
1096       "Can't update/delete on resultset with condition unless hash or array"
1097     );
1098   }
1099
1100   return $cond;
1101 }
1102
1103
1104 =head2 update
1105
1106 =over 4
1107
1108 =item Arguments: \%values
1109
1110 =item Return Value: $storage_rv
1111
1112 =back
1113
1114 Sets the specified columns in the resultset to the supplied values in a
1115 single query. Return value will be true if the update succeeded or false
1116 if no records were updated; exact type of success value is storage-dependent.
1117
1118 =cut
1119
1120 sub update {
1121   my ($self, $values) = @_;
1122   $self->throw_exception("Values for update must be a hash")
1123     unless ref $values eq 'HASH';
1124
1125   my $cond = $self->_cond_for_update_delete;
1126    
1127   return $self->result_source->storage->update(
1128     $self->result_source, $values, $cond
1129   );
1130 }
1131
1132 =head2 update_all
1133
1134 =over 4
1135
1136 =item Arguments: \%values
1137
1138 =item Return Value: 1
1139
1140 =back
1141
1142 Fetches all objects and updates them one at a time. Note that C<update_all>
1143 will run DBIC cascade triggers, while L</update> will not.
1144
1145 =cut
1146
1147 sub update_all {
1148   my ($self, $values) = @_;
1149   $self->throw_exception("Values for update must be a hash")
1150     unless ref $values eq 'HASH';
1151   foreach my $obj ($self->all) {
1152     $obj->set_columns($values)->update;
1153   }
1154   return 1;
1155 }
1156
1157 =head2 delete
1158
1159 =over 4
1160
1161 =item Arguments: none
1162
1163 =item Return Value: 1
1164
1165 =back
1166
1167 Deletes the contents of the resultset from its result source. Note that this
1168 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1169 to run. See also L<DBIx::Class::Row/delete>.
1170
1171 =cut
1172
1173 sub delete {
1174   my ($self) = @_;
1175
1176   my $cond = $self->_cond_for_update_delete;
1177
1178   $self->result_source->storage->delete($self->result_source, $cond);
1179   return 1;
1180 }
1181
1182 =head2 delete_all
1183
1184 =over 4
1185
1186 =item Arguments: none
1187
1188 =item Return Value: 1
1189
1190 =back
1191
1192 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1193 will run DBIC cascade triggers, while L</delete> will not.
1194
1195 =cut
1196
1197 sub delete_all {
1198   my ($self) = @_;
1199   $_->delete for $self->all;
1200   return 1;
1201 }
1202
1203 =head2 pager
1204
1205 =over 4
1206
1207 =item Arguments: none
1208
1209 =item Return Value: $pager
1210
1211 =back
1212
1213 Return Value a L<Data::Page> object for the current resultset. Only makes
1214 sense for queries with a C<page> attribute.
1215
1216 =cut
1217
1218 sub pager {
1219   my ($self) = @_;
1220   my $attrs = $self->{attrs};
1221   $self->throw_exception("Can't create pager for non-paged rs")
1222     unless $self->{attrs}{page};
1223   $attrs->{rows} ||= 10;
1224   return $self->{pager} ||= Data::Page->new(
1225     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1226 }
1227
1228 =head2 page
1229
1230 =over 4
1231
1232 =item Arguments: $page_number
1233
1234 =item Return Value: $rs
1235
1236 =back
1237
1238 Returns a resultset for the $page_number page of the resultset on which page
1239 is called, where each page contains a number of rows equal to the 'rows'
1240 attribute set on the resultset (10 by default).
1241
1242 =cut
1243
1244 sub page {
1245   my ($self, $page) = @_;
1246   return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
1247 }
1248
1249 =head2 new_result
1250
1251 =over 4
1252
1253 =item Arguments: \%vals
1254
1255 =item Return Value: $object
1256
1257 =back
1258
1259 Creates an object in the resultset's result class and returns it.
1260
1261 =cut
1262
1263 sub new_result {
1264   my ($self, $values) = @_;
1265   $self->throw_exception( "new_result needs a hash" )
1266     unless (ref $values eq 'HASH');
1267   $self->throw_exception(
1268     "Can't abstract implicit construct, condition not a hash"
1269   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1270
1271   my $alias = $self->{attrs}{alias};
1272   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1273   my %new = (
1274     %{ $self->_remove_alias($values, $alias) },
1275     %{ $self->_remove_alias($collapsed_cond, $alias) },
1276   );
1277
1278   return $self->result_class->new(\%new,$self->_source_handle);
1279 }
1280
1281 # _collapse_cond
1282 #
1283 # Recursively collapse the condition.
1284
1285 sub _collapse_cond {
1286   my ($self, $cond, $collapsed) = @_;
1287
1288   $collapsed ||= {};
1289
1290   if (ref $cond eq 'ARRAY') {
1291     foreach my $subcond (@$cond) {
1292       next unless ref $subcond;  # -or
1293 #      warn "ARRAY: " . Dumper $subcond;
1294       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1295     }
1296   }
1297   elsif (ref $cond eq 'HASH') {
1298     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1299       foreach my $subcond (@{$cond->{-and}}) {
1300 #        warn "HASH: " . Dumper $subcond;
1301         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1302       }
1303     }
1304     else {
1305 #      warn "LEAF: " . Dumper $cond;
1306       foreach my $col (keys %$cond) {
1307         my $value = $cond->{$col};
1308         $collapsed->{$col} = $value;
1309       }
1310     }
1311   }
1312
1313   return $collapsed;
1314 }
1315
1316 # _remove_alias
1317 #
1318 # Remove the specified alias from the specified query hash. A copy is made so
1319 # the original query is not modified.
1320
1321 sub _remove_alias {
1322   my ($self, $query, $alias) = @_;
1323
1324   my %orig = %{ $query || {} };
1325   my %unaliased;
1326
1327   foreach my $key (keys %orig) {
1328     if ($key !~ /\./) {
1329       $unaliased{$key} = $orig{$key};
1330       next;
1331     }
1332     $unaliased{$1} = $orig{$key}
1333       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1334   }
1335
1336   return \%unaliased;
1337 }
1338
1339 =head2 find_or_new
1340
1341 =over 4
1342
1343 =item Arguments: \%vals, \%attrs?
1344
1345 =item Return Value: $object
1346
1347 =back
1348
1349 Find an existing record from this resultset. If none exists, instantiate a new
1350 result object and return it. The object will not be saved into your storage
1351 until you call L<DBIx::Class::Row/insert> on it.
1352
1353 If you want objects to be saved immediately, use L</find_or_create> instead.
1354
1355 =cut
1356
1357 sub find_or_new {
1358   my $self     = shift;
1359   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1360   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1361   my $exists   = $self->find($hash, $attrs);
1362   return defined $exists ? $exists : $self->new_result($hash);
1363 }
1364
1365 =head2 create
1366
1367 =over 4
1368
1369 =item Arguments: \%vals
1370
1371 =item Return Value: $object
1372
1373 =back
1374
1375 Inserts a record into the resultset and returns the object representing it.
1376
1377 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1378
1379 =cut
1380
1381 sub create {
1382   my ($self, $attrs) = @_;
1383   $self->throw_exception( "create needs a hashref" )
1384     unless ref $attrs eq 'HASH';
1385   return $self->new_result($attrs)->insert;
1386 }
1387
1388 =head2 find_or_create
1389
1390 =over 4
1391
1392 =item Arguments: \%vals, \%attrs?
1393
1394 =item Return Value: $object
1395
1396 =back
1397
1398   $class->find_or_create({ key => $val, ... });
1399
1400 Tries to find a record based on its primary key or unique constraint; if none
1401 is found, creates one and returns that instead.
1402
1403   my $cd = $schema->resultset('CD')->find_or_create({
1404     cdid   => 5,
1405     artist => 'Massive Attack',
1406     title  => 'Mezzanine',
1407     year   => 2005,
1408   });
1409
1410 Also takes an optional C<key> attribute, to search by a specific key or unique
1411 constraint. For example:
1412
1413   my $cd = $schema->resultset('CD')->find_or_create(
1414     {
1415       artist => 'Massive Attack',
1416       title  => 'Mezzanine',
1417     },
1418     { key => 'cd_artist_title' }
1419   );
1420
1421 See also L</find> and L</update_or_create>. For information on how to declare
1422 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1423
1424 =cut
1425
1426 sub find_or_create {
1427   my $self     = shift;
1428   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1429   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1430   my $exists   = $self->find($hash, $attrs);
1431   return defined $exists ? $exists : $self->create($hash);
1432 }
1433
1434 =head2 update_or_create
1435
1436 =over 4
1437
1438 =item Arguments: \%col_values, { key => $unique_constraint }?
1439
1440 =item Return Value: $object
1441
1442 =back
1443
1444   $class->update_or_create({ col => $val, ... });
1445
1446 First, searches for an existing row matching one of the unique constraints
1447 (including the primary key) on the source of this resultset. If a row is
1448 found, updates it with the other given column values. Otherwise, creates a new
1449 row.
1450
1451 Takes an optional C<key> attribute to search on a specific unique constraint.
1452 For example:
1453
1454   # In your application
1455   my $cd = $schema->resultset('CD')->update_or_create(
1456     {
1457       artist => 'Massive Attack',
1458       title  => 'Mezzanine',
1459       year   => 1998,
1460     },
1461     { key => 'cd_artist_title' }
1462   );
1463
1464 If no C<key> is specified, it searches on all unique constraints defined on the
1465 source, including the primary key.
1466
1467 If the C<key> is specified as C<primary>, it searches only on the primary key.
1468
1469 See also L</find> and L</find_or_create>. For information on how to declare
1470 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1471
1472 =cut
1473
1474 sub update_or_create {
1475   my $self = shift;
1476   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1477   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1478
1479   my $row = $self->find($cond, $attrs);
1480   if (defined $row) {
1481     $row->update($cond);
1482     return $row;
1483   }
1484
1485   return $self->create($cond);
1486 }
1487
1488 =head2 get_cache
1489
1490 =over 4
1491
1492 =item Arguments: none
1493
1494 =item Return Value: \@cache_objects?
1495
1496 =back
1497
1498 Gets the contents of the cache for the resultset, if the cache is set.
1499
1500 =cut
1501
1502 sub get_cache {
1503   shift->{all_cache};
1504 }
1505
1506 =head2 set_cache
1507
1508 =over 4
1509
1510 =item Arguments: \@cache_objects
1511
1512 =item Return Value: \@cache_objects
1513
1514 =back
1515
1516 Sets the contents of the cache for the resultset. Expects an arrayref
1517 of objects of the same class as those produced by the resultset. Note that
1518 if the cache is set the resultset will return the cached objects rather
1519 than re-querying the database even if the cache attr is not set.
1520
1521 =cut
1522
1523 sub set_cache {
1524   my ( $self, $data ) = @_;
1525   $self->throw_exception("set_cache requires an arrayref")
1526       if defined($data) && (ref $data ne 'ARRAY');
1527   $self->{all_cache} = $data;
1528 }
1529
1530 =head2 clear_cache
1531
1532 =over 4
1533
1534 =item Arguments: none
1535
1536 =item Return Value: []
1537
1538 =back
1539
1540 Clears the cache for the resultset.
1541
1542 =cut
1543
1544 sub clear_cache {
1545   shift->set_cache(undef);
1546 }
1547
1548 =head2 related_resultset
1549
1550 =over 4
1551
1552 =item Arguments: $relationship_name
1553
1554 =item Return Value: $resultset
1555
1556 =back
1557
1558 Returns a related resultset for the supplied relationship name.
1559
1560   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1561
1562 =cut
1563
1564 sub related_resultset {
1565   my ($self, $rel) = @_;
1566
1567   $self->{related_resultsets} ||= {};
1568   return $self->{related_resultsets}{$rel} ||= do {
1569     my $rel_obj = $self->result_source->relationship_info($rel);
1570
1571     $self->throw_exception(
1572       "search_related: result source '" . $self->_source_handle->source_moniker .
1573         "' has no such relationship $rel")
1574       unless $rel_obj;
1575     
1576     my ($from,$seen) = $self->_resolve_from($rel);
1577
1578     my $join_count = $seen->{$rel};
1579     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1580
1581     $self->_source_handle->schema->resultset($rel_obj->{class})->search_rs(
1582       undef, {
1583         %{$self->{attrs}||{}},
1584         join => undef,
1585         prefetch => undef,
1586         select => undef,
1587         as => undef,
1588         alias => $alias,
1589         where => $self->{cond},
1590         seen_join => $seen,
1591         from => $from,
1592     });
1593   };
1594 }
1595
1596 sub _resolve_from {
1597   my ($self, $extra_join) = @_;
1598   my $source = $self->result_source;
1599   my $attrs = $self->{attrs};
1600   
1601   my $from = $attrs->{from}
1602     || [ { $attrs->{alias} => $source->from } ];
1603     
1604   my $seen = { %{$attrs->{seen_join}||{}} };
1605
1606   my $join = ($attrs->{join}
1607                ? [ $attrs->{join}, $extra_join ]
1608                : $extra_join);
1609   $from = [
1610     @$from,
1611     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1612   ];
1613
1614   return ($from,$seen);
1615 }
1616
1617 sub _resolved_attrs {
1618   my $self = shift;
1619   return $self->{_attrs} if $self->{_attrs};
1620
1621   my $attrs = { %{$self->{attrs}||{}} };
1622   my $source = $self->result_source;
1623   my $alias = $attrs->{alias};
1624
1625   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1626   if ($attrs->{columns}) {
1627     delete $attrs->{as};
1628   } elsif (!$attrs->{select}) {
1629     $attrs->{columns} = [ $source->columns ];
1630   }
1631  
1632   $attrs->{select} = 
1633     ($attrs->{select}
1634       ? (ref $attrs->{select} eq 'ARRAY'
1635           ? [ @{$attrs->{select}} ]
1636           : [ $attrs->{select} ])
1637       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1638     );
1639   $attrs->{as} =
1640     ($attrs->{as}
1641       ? (ref $attrs->{as} eq 'ARRAY'
1642           ? [ @{$attrs->{as}} ]
1643           : [ $attrs->{as} ])
1644       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1645     );
1646   
1647   my $adds;
1648   if ($adds = delete $attrs->{include_columns}) {
1649     $adds = [$adds] unless ref $adds eq 'ARRAY';
1650     push(@{$attrs->{select}}, @$adds);
1651     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1652   }
1653   if ($adds = delete $attrs->{'+select'}) {
1654     $adds = [$adds] unless ref $adds eq 'ARRAY';
1655     push(@{$attrs->{select}},
1656            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1657   }
1658   if (my $adds = delete $attrs->{'+as'}) {
1659     $adds = [$adds] unless ref $adds eq 'ARRAY';
1660     push(@{$attrs->{as}}, @$adds);
1661   }
1662
1663   $attrs->{from} ||= [ { 'me' => $source->from } ];
1664
1665   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1666     my $join = delete $attrs->{join} || {};
1667
1668     if (defined $attrs->{prefetch}) {
1669       $join = $self->_merge_attr(
1670         $join, $attrs->{prefetch}
1671       );
1672     }
1673
1674     $attrs->{from} =   # have to copy here to avoid corrupting the original
1675       [
1676         @{$attrs->{from}}, 
1677         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1678       ];
1679   }
1680
1681   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1682   if ($attrs->{order_by}) {
1683     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1684                            ? [ @{$attrs->{order_by}} ]
1685                            : [ $attrs->{order_by} ]);
1686   } else {
1687     $attrs->{order_by} = [];    
1688   }
1689
1690   my $collapse = $attrs->{collapse} || {};
1691   if (my $prefetch = delete $attrs->{prefetch}) {
1692     $prefetch = $self->_merge_attr({}, $prefetch);
1693     my @pre_order;
1694     my $seen = $attrs->{seen_join} || {};
1695     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1696       # bring joins back to level of current class
1697       my @prefetch = $source->resolve_prefetch(
1698         $p, $alias, $seen, \@pre_order, $collapse
1699       );
1700       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1701       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1702     }
1703     push(@{$attrs->{order_by}}, @pre_order);
1704   }
1705   $attrs->{collapse} = $collapse;
1706
1707   return $self->{_attrs} = $attrs;
1708 }
1709
1710 sub _merge_attr {
1711   my ($self, $a, $b) = @_;
1712   return $b unless defined($a);
1713   return $a unless defined($b);
1714   
1715   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1716     foreach my $key (keys %{$b}) {
1717       if (exists $a->{$key}) {
1718         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1719       } else {
1720         $a->{$key} = $b->{$key};
1721       }
1722     }
1723     return $a;
1724   } else {
1725     $a = [$a] unless ref $a eq 'ARRAY';
1726     $b = [$b] unless ref $b eq 'ARRAY';
1727
1728     my $hash = {};
1729     my @array;
1730     foreach my $x ($a, $b) {
1731       foreach my $element (@{$x}) {
1732         if (ref $element eq 'HASH') {
1733           $hash = $self->_merge_attr($hash, $element);
1734         } elsif (ref $element eq 'ARRAY') {
1735           push(@array, @{$element});
1736         } else {
1737           push(@array, $element) unless $b == $x
1738             && grep { $_ eq $element } @array;
1739         }
1740       }
1741     }
1742     
1743     @array = grep { !exists $hash->{$_} } @array;
1744
1745     return keys %{$hash}
1746       ? ( scalar(@array)
1747             ? [$hash, @array]
1748             : $hash
1749         )
1750       : \@array;
1751   }
1752 }
1753
1754 sub result_source {
1755     my $self = shift;
1756
1757     if (@_) {
1758         $self->_source_handle($_[0]->handle);
1759     } else {
1760         $self->_source_handle->resolve;
1761     }
1762 }
1763
1764 =head2 throw_exception
1765
1766 See L<DBIx::Class::Schema/throw_exception> for details.
1767
1768 =cut
1769
1770 sub throw_exception {
1771   my $self=shift;
1772   $self->_source_handle->schema->throw_exception(@_);
1773 }
1774
1775 # XXX: FIXME: Attributes docs need clearing up
1776
1777 =head1 ATTRIBUTES
1778
1779 The resultset takes various attributes that modify its behavior. Here's an
1780 overview of them:
1781
1782 =head2 order_by
1783
1784 =over 4
1785
1786 =item Value: ($order_by | \@order_by)
1787
1788 =back
1789
1790 Which column(s) to order the results by. This is currently passed
1791 through directly to SQL, so you can give e.g. C<year DESC> for a
1792 descending order on the column `year'.
1793
1794 Please note that if you have C<quote_char> enabled (see
1795 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1796 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1797 so you will need to manually quote things as appropriate.)
1798
1799 =head2 columns
1800
1801 =over 4
1802
1803 =item Value: \@columns
1804
1805 =back
1806
1807 Shortcut to request a particular set of columns to be retrieved.  Adds
1808 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1809 from that, then auto-populates C<as> from C<select> as normal. (You may also
1810 use the C<cols> attribute, as in earlier versions of DBIC.)
1811
1812 =head2 include_columns
1813
1814 =over 4
1815
1816 =item Value: \@columns
1817
1818 =back
1819
1820 Shortcut to include additional columns in the returned results - for example
1821
1822   $schema->resultset('CD')->search(undef, {
1823     include_columns => ['artist.name'],
1824     join => ['artist']
1825   });
1826
1827 would return all CDs and include a 'name' column to the information
1828 passed to object inflation
1829
1830 =head2 select
1831
1832 =over 4
1833
1834 =item Value: \@select_columns
1835
1836 =back
1837
1838 Indicates which columns should be selected from the storage. You can use
1839 column names, or in the case of RDBMS back ends, function or stored procedure
1840 names:
1841
1842   $rs = $schema->resultset('Employee')->search(undef, {
1843     select => [
1844       'name',
1845       { count => 'employeeid' },
1846       { sum => 'salary' }
1847     ]
1848   });
1849
1850 When you use function/stored procedure names and do not supply an C<as>
1851 attribute, the column names returned are storage-dependent. E.g. MySQL would
1852 return a column named C<count(employeeid)> in the above example.
1853
1854 =head2 +select
1855
1856 =over 4
1857
1858 Indicates additional columns to be selected from storage.  Works the same as
1859 L<select> but adds columns to the selection.
1860
1861 =back
1862
1863 =head2 +as
1864
1865 =over 4
1866
1867 Indicates additional column names for those added via L<+select>.
1868
1869 =back
1870
1871 =head2 as
1872
1873 =over 4
1874
1875 =item Value: \@inflation_names
1876
1877 =back
1878
1879 Indicates column names for object inflation. This is used in conjunction with
1880 C<select>, usually when C<select> contains one or more function or stored
1881 procedure names:
1882
1883   $rs = $schema->resultset('Employee')->search(undef, {
1884     select => [
1885       'name',
1886       { count => 'employeeid' }
1887     ],
1888     as => ['name', 'employee_count'],
1889   });
1890
1891   my $employee = $rs->first(); # get the first Employee
1892
1893 If the object against which the search is performed already has an accessor
1894 matching a column name specified in C<as>, the value can be retrieved using
1895 the accessor as normal:
1896
1897   my $name = $employee->name();
1898
1899 If on the other hand an accessor does not exist in the object, you need to
1900 use C<get_column> instead:
1901
1902   my $employee_count = $employee->get_column('employee_count');
1903
1904 You can create your own accessors if required - see
1905 L<DBIx::Class::Manual::Cookbook> for details.
1906
1907 Please note: This will NOT insert an C<AS employee_count> into the SQL
1908 statement produced, it is used for internal access only. Thus
1909 attempting to use the accessor in an C<order_by> clause or similar
1910 will fail miserably.
1911
1912 To get around this limitation, you can supply literal SQL to your
1913 C<select> attibute that contains the C<AS alias> text, eg:
1914
1915   select => [\'myfield AS alias']
1916
1917 =head2 join
1918
1919 =over 4
1920
1921 =item Value: ($rel_name | \@rel_names | \%rel_names)
1922
1923 =back
1924
1925 Contains a list of relationships that should be joined for this query.  For
1926 example:
1927
1928   # Get CDs by Nine Inch Nails
1929   my $rs = $schema->resultset('CD')->search(
1930     { 'artist.name' => 'Nine Inch Nails' },
1931     { join => 'artist' }
1932   );
1933
1934 Can also contain a hash reference to refer to the other relation's relations.
1935 For example:
1936
1937   package MyApp::Schema::Track;
1938   use base qw/DBIx::Class/;
1939   __PACKAGE__->table('track');
1940   __PACKAGE__->add_columns(qw/trackid cd position title/);
1941   __PACKAGE__->set_primary_key('trackid');
1942   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1943   1;
1944
1945   # In your application
1946   my $rs = $schema->resultset('Artist')->search(
1947     { 'track.title' => 'Teardrop' },
1948     {
1949       join     => { cd => 'track' },
1950       order_by => 'artist.name',
1951     }
1952   );
1953
1954 You need to use the relationship (not the table) name in  conditions, 
1955 because they are aliased as such. The current table is aliased as "me", so 
1956 you need to use me.column_name in order to avoid ambiguity. For example:
1957
1958   # Get CDs from 1984 with a 'Foo' track 
1959   my $rs = $schema->resultset('CD')->search(
1960     { 
1961       'me.year' => 1984,
1962       'tracks.name' => 'Foo'
1963     },
1964     { join => 'tracks' }
1965   );
1966   
1967 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1968 similarly for a third time). For e.g.
1969
1970   my $rs = $schema->resultset('Artist')->search({
1971     'cds.title'   => 'Down to Earth',
1972     'cds_2.title' => 'Popular',
1973   }, {
1974     join => [ qw/cds cds/ ],
1975   });
1976
1977 will return a set of all artists that have both a cd with title 'Down
1978 to Earth' and a cd with title 'Popular'.
1979
1980 If you want to fetch related objects from other tables as well, see C<prefetch>
1981 below.
1982
1983 =head2 prefetch
1984
1985 =over 4
1986
1987 =item Value: ($rel_name | \@rel_names | \%rel_names)
1988
1989 =back
1990
1991 Contains one or more relationships that should be fetched along with the main
1992 query (when they are accessed afterwards they will have already been
1993 "prefetched").  This is useful for when you know you will need the related
1994 objects, because it saves at least one query:
1995
1996   my $rs = $schema->resultset('Tag')->search(
1997     undef,
1998     {
1999       prefetch => {
2000         cd => 'artist'
2001       }
2002     }
2003   );
2004
2005 The initial search results in SQL like the following:
2006
2007   SELECT tag.*, cd.*, artist.* FROM tag
2008   JOIN cd ON tag.cd = cd.cdid
2009   JOIN artist ON cd.artist = artist.artistid
2010
2011 L<DBIx::Class> has no need to go back to the database when we access the
2012 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2013 case.
2014
2015 Simple prefetches will be joined automatically, so there is no need
2016 for a C<join> attribute in the above search. If you're prefetching to
2017 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2018 specify the join as well.
2019
2020 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2021 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2022 with an accessor type of 'single' or 'filter').
2023
2024 =head2 page
2025
2026 =over 4
2027
2028 =item Value: $page
2029
2030 =back
2031
2032 Makes the resultset paged and specifies the page to retrieve. Effectively
2033 identical to creating a non-pages resultset and then calling ->page($page)
2034 on it.
2035
2036 If L<rows> attribute is not specified it defualts to 10 rows per page.
2037
2038 =head2 rows
2039
2040 =over 4
2041
2042 =item Value: $rows
2043
2044 =back
2045
2046 Specifes the maximum number of rows for direct retrieval or the number of
2047 rows per page if the page attribute or method is used.
2048
2049 =head2 offset
2050
2051 =over 4
2052
2053 =item Value: $offset
2054
2055 =back
2056
2057 Specifies the (zero-based) row number for the  first row to be returned, or the
2058 of the first row of the first page if paging is used.
2059
2060 =head2 group_by
2061
2062 =over 4
2063
2064 =item Value: \@columns
2065
2066 =back
2067
2068 A arrayref of columns to group by. Can include columns of joined tables.
2069
2070   group_by => [qw/ column1 column2 ... /]
2071
2072 =head2 having
2073
2074 =over 4
2075
2076 =item Value: $condition
2077
2078 =back
2079
2080 HAVING is a select statement attribute that is applied between GROUP BY and
2081 ORDER BY. It is applied to the after the grouping calculations have been
2082 done.
2083
2084   having => { 'count(employee)' => { '>=', 100 } }
2085
2086 =head2 distinct
2087
2088 =over 4
2089
2090 =item Value: (0 | 1)
2091
2092 =back
2093
2094 Set to 1 to group by all columns.
2095
2096 =head2 where
2097
2098 =over 4
2099
2100 Adds to the WHERE clause.
2101
2102   # only return rows WHERE deleted IS NULL for all searches
2103   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2104
2105 Can be overridden by passing C<{ where => undef }> as an attribute
2106 to a resulset.
2107
2108 =back
2109
2110 =head2 cache
2111
2112 Set to 1 to cache search results. This prevents extra SQL queries if you
2113 revisit rows in your ResultSet:
2114
2115   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2116
2117   while( my $artist = $resultset->next ) {
2118     ... do stuff ...
2119   }
2120
2121   $rs->first; # without cache, this would issue a query
2122
2123 By default, searches are not cached.
2124
2125 For more examples of using these attributes, see
2126 L<DBIx::Class::Manual::Cookbook>.
2127
2128 =head2 from
2129
2130 =over 4
2131
2132 =item Value: \@from_clause
2133
2134 =back
2135
2136 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2137 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2138 clauses.
2139
2140 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2141
2142 C<join> will usually do what you need and it is strongly recommended that you
2143 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2144 And we really do mean "cannot", not just tried and failed. Attempting to use
2145 this because you're having problems with C<join> is like trying to use x86
2146 ASM because you've got a syntax error in your C. Trust us on this.
2147
2148 Now, if you're still really, really sure you need to use this (and if you're
2149 not 100% sure, ask the mailing list first), here's an explanation of how this
2150 works.
2151
2152 The syntax is as follows -
2153
2154   [
2155     { <alias1> => <table1> },
2156     [
2157       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2158       [], # nested JOIN (optional)
2159       { <table1.column1> => <table2.column2>, ... (more conditions) },
2160     ],
2161     # More of the above [ ] may follow for additional joins
2162   ]
2163
2164   <table1> <alias1>
2165   JOIN
2166     <table2> <alias2>
2167     [JOIN ...]
2168   ON <table1.column1> = <table2.column2>
2169   <more joins may follow>
2170
2171 An easy way to follow the examples below is to remember the following:
2172
2173     Anything inside "[]" is a JOIN
2174     Anything inside "{}" is a condition for the enclosing JOIN
2175
2176 The following examples utilize a "person" table in a family tree application.
2177 In order to express parent->child relationships, this table is self-joined:
2178
2179     # Person->belongs_to('father' => 'Person');
2180     # Person->belongs_to('mother' => 'Person');
2181
2182 C<from> can be used to nest joins. Here we return all children with a father,
2183 then search against all mothers of those children:
2184
2185   $rs = $schema->resultset('Person')->search(
2186       undef,
2187       {
2188           alias => 'mother', # alias columns in accordance with "from"
2189           from => [
2190               { mother => 'person' },
2191               [
2192                   [
2193                       { child => 'person' },
2194                       [
2195                           { father => 'person' },
2196                           { 'father.person_id' => 'child.father_id' }
2197                       ]
2198                   ],
2199                   { 'mother.person_id' => 'child.mother_id' }
2200               ],
2201           ]
2202       },
2203   );
2204
2205   # Equivalent SQL:
2206   # SELECT mother.* FROM person mother
2207   # JOIN (
2208   #   person child
2209   #   JOIN person father
2210   #   ON ( father.person_id = child.father_id )
2211   # )
2212   # ON ( mother.person_id = child.mother_id )
2213
2214 The type of any join can be controlled manually. To search against only people
2215 with a father in the person table, we could explicitly use C<INNER JOIN>:
2216
2217     $rs = $schema->resultset('Person')->search(
2218         undef,
2219         {
2220             alias => 'child', # alias columns in accordance with "from"
2221             from => [
2222                 { child => 'person' },
2223                 [
2224                     { father => 'person', -join_type => 'inner' },
2225                     { 'father.id' => 'child.father_id' }
2226                 ],
2227             ]
2228         },
2229     );
2230
2231     # Equivalent SQL:
2232     # SELECT child.* FROM person child
2233     # INNER JOIN person father ON child.father_id = father.id
2234
2235 =cut
2236
2237 1;