cf4f89a94a77382cb1ddaf1fd28578bd047f7e2f
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use base qw/DBIx::Class/;
14
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('CD')->search(year => 2005);
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =over 4
57
58 =item Arguments: $source, \%$attrs
59
60 =item Return Value: $rs
61
62 =back
63
64 The resultset constructor. Takes a source object (usually a
65 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
66 L</ATTRIBUTES> below).  Does not perform any queries -- these are
67 executed as needed by the other methods.
68
69 Generally you won't need to construct a resultset manually.  You'll
70 automatically get one from e.g. a L</search> called in scalar context:
71
72   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
73
74 IMPORTANT: If called on an object, proxies to new_result instead so
75
76   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
77
78 will return a CD object, not a ResultSet.
79
80 =cut
81
82 sub new {
83   my $class = shift;
84   return $class->new_result(@_) if ref $class;
85
86   my ($source, $attrs) = @_;
87   #weaken $source;
88   $attrs = { %{$attrs||{}} };
89
90   if ($attrs->{page}) {
91     $attrs->{rows} ||= 10;
92     $attrs->{offset} ||= 0;
93     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
94   }
95
96   $attrs->{alias} ||= 'me';
97
98   my $self = {
99     result_source => $source,
100     result_class => $attrs->{result_class} || $source->result_class,
101     cond => $attrs->{where},
102     count => undef,
103     pager => undef,
104     attrs => $attrs
105   };
106
107   bless $self, $class;
108
109   return $self;
110 }
111
112 =head2 search
113
114 =over 4
115
116 =item Arguments: $cond, \%attrs?
117
118 =item Return Value: $resultset (scalar context), @row_objs (list context)
119
120 =back
121
122   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
123   my $new_rs = $cd_rs->search({ year => 2005 });
124
125   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
126                  # year = 2005 OR year = 2004
127
128 If you need to pass in additional attributes but no additional condition,
129 call it as C<search(undef, \%attrs)>.
130
131   # "SELECT name, artistid FROM $artist_table"
132   my @all_artists = $schema->resultset('Artist')->search(undef, {
133     columns => [qw/name artistid/],
134   });
135
136 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
137
138 =cut
139
140 sub search {
141   my $self = shift;
142   my $rs = $self->search_rs( @_ );
143   return (wantarray ? $rs->all : $rs);
144 }
145
146 =head2 search_rs
147
148 =over 4
149
150 =item Arguments: $cond, \%attrs?
151
152 =item Return Value: $resultset
153
154 =back
155
156 This method does the same exact thing as search() except it will
157 always return a resultset, even in list context.
158
159 =cut
160
161 sub search_rs {
162   my $self = shift;
163
164   my $rows;
165
166   unless (@_) {                 # no search, effectively just a clone
167     $rows = $self->get_cache;
168   }
169
170   my $attrs = {};
171   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
172   my $our_attrs = { %{$self->{attrs}} };
173   my $having = delete $our_attrs->{having};
174   my $where = delete $our_attrs->{where};
175
176   my $new_attrs = { %{$our_attrs}, %{$attrs} };
177
178   # merge new attrs into inherited
179   foreach my $key (qw/join prefetch/) {
180     next unless exists $attrs->{$key};
181     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
182   }
183
184   my $cond = (@_
185     ? (
186         (@_ == 1 || ref $_[0] eq "HASH")
187           ? (
188               (ref $_[0] eq 'HASH')
189                 ? (
190                     (keys %{ $_[0] }  > 0)
191                       ? shift
192                       : undef
193                    )
194                 :  shift
195              )
196           : (
197               (@_ % 2)
198                 ? $self->throw_exception("Odd number of arguments to search")
199                 : {@_}
200              )
201       )
202     : undef
203   );
204
205   if (defined $where) {
206     $new_attrs->{where} = (
207       defined $new_attrs->{where}
208         ? { '-and' => [
209               map {
210                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
211               } $where, $new_attrs->{where}
212             ]
213           }
214         : $where);
215   }
216
217   if (defined $cond) {
218     $new_attrs->{where} = (
219       defined $new_attrs->{where}
220         ? { '-and' => [
221               map {
222                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
223               } $cond, $new_attrs->{where}
224             ]
225           }
226         : $cond);
227   }
228
229   if (defined $having) {
230     $new_attrs->{having} = (
231       defined $new_attrs->{having}
232         ? { '-and' => [
233               map {
234                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
235               } $having, $new_attrs->{having}
236             ]
237           }
238         : $having);
239   }
240
241   my $rs = (ref $self)->new($self->result_source, $new_attrs);
242   if ($rows) {
243     $rs->set_cache($rows);
244   }
245   return $rs;
246 }
247
248 =head2 search_literal
249
250 =over 4
251
252 =item Arguments: $sql_fragment, @bind_values
253
254 =item Return Value: $resultset (scalar context), @row_objs (list context)
255
256 =back
257
258   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
259   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
260
261 Pass a literal chunk of SQL to be added to the conditional part of the
262 resultset query.
263
264 =cut
265
266 sub search_literal {
267   my ($self, $cond, @vals) = @_;
268   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
269   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
270   return $self->search(\$cond, $attrs);
271 }
272
273 =head2 find
274
275 =over 4
276
277 =item Arguments: @values | \%cols, \%attrs?
278
279 =item Return Value: $row_object
280
281 =back
282
283 Finds a row based on its primary key or unique constraint. For example, to find
284 a row by its primary key:
285
286   my $cd = $schema->resultset('CD')->find(5);
287
288 You can also find a row by a specific unique constraint using the C<key>
289 attribute. For example:
290
291   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
292     key => 'cd_artist_title'
293   });
294
295 Additionally, you can specify the columns explicitly by name:
296
297   my $cd = $schema->resultset('CD')->find(
298     {
299       artist => 'Massive Attack',
300       title  => 'Mezzanine',
301     },
302     { key => 'cd_artist_title' }
303   );
304
305 If the C<key> is specified as C<primary>, it searches only on the primary key.
306
307 If no C<key> is specified, it searches on all unique constraints defined on the
308 source, including the primary key.
309
310 If your table does not have a primary key, you B<must> provide a value for the
311 C<key> attribute matching one of the unique constraints on the source.
312
313 See also L</find_or_create> and L</update_or_create>. For information on how to
314 declare unique constraints, see
315 L<DBIx::Class::ResultSource/add_unique_constraint>.
316
317 =cut
318
319 sub find {
320   my $self = shift;
321   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
322
323   # Default to the primary key, but allow a specific key
324   my @cols = exists $attrs->{key}
325     ? $self->result_source->unique_constraint_columns($attrs->{key})
326     : $self->result_source->primary_columns;
327   $self->throw_exception(
328     "Can't find unless a primary key is defined or unique constraint is specified"
329   ) unless @cols;
330
331   # Parse out a hashref from input
332   my $input_query;
333   if (ref $_[0] eq 'HASH') {
334     $input_query = { %{$_[0]} };
335   }
336   elsif (@_ == @cols) {
337     $input_query = {};
338     @{$input_query}{@cols} = @_;
339   }
340   else {
341     # Compatibility: Allow e.g. find(id => $value)
342     carp "Find by key => value deprecated; please use a hashref instead";
343     $input_query = {@_};
344   }
345
346   my (%related, $info);
347
348   foreach my $key (keys %$input_query) {
349     if (ref($input_query->{$key})
350         && ($info = $self->result_source->relationship_info($key))) {
351       my $rel_q = $self->result_source->resolve_condition(
352                     $info->{cond}, delete $input_query->{$key}, $key
353                   );
354       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
355       @related{keys %$rel_q} = values %$rel_q;
356     }
357   }
358   if (my @keys = keys %related) {
359     @{$input_query}{@keys} = values %related;
360   }
361
362   my @unique_queries = $self->_unique_queries($input_query, $attrs);
363
364   # Build the final query: Default to the disjunction of the unique queries,
365   # but allow the input query in case the ResultSet defines the query or the
366   # user is abusing find
367   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
368   my $query = @unique_queries
369     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
370     : $self->_add_alias($input_query, $alias);
371
372   # Run the query
373   if (keys %$attrs) {
374     my $rs = $self->search($query, $attrs);
375     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
376   }
377   else {
378     return keys %{$self->_resolved_attrs->{collapse}}
379       ? $self->search($query)->next
380       : $self->single($query);
381   }
382 }
383
384 # _add_alias
385 #
386 # Add the specified alias to the specified query hash. A copy is made so the
387 # original query is not modified.
388
389 sub _add_alias {
390   my ($self, $query, $alias) = @_;
391
392   my %aliased = %$query;
393   foreach my $col (grep { ! m/\./ } keys %aliased) {
394     $aliased{"$alias.$col"} = delete $aliased{$col};
395   }
396
397   return \%aliased;
398 }
399
400 # _unique_queries
401 #
402 # Build a list of queries which satisfy unique constraints.
403
404 sub _unique_queries {
405   my ($self, $query, $attrs) = @_;
406
407   my @constraint_names = exists $attrs->{key}
408     ? ($attrs->{key})
409     : $self->result_source->unique_constraint_names;
410
411   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
412   my $num_where = scalar keys %$where;
413
414   my @unique_queries;
415   foreach my $name (@constraint_names) {
416     my @unique_cols = $self->result_source->unique_constraint_columns($name);
417     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
418
419     my $num_cols = scalar @unique_cols;
420     my $num_query = scalar keys %$unique_query;
421
422     my $total = $num_query + $num_where;
423     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
424       # The query is either unique on its own or is unique in combination with
425       # the existing where clause
426       push @unique_queries, $unique_query;
427     }
428   }
429
430   return @unique_queries;
431 }
432
433 # _build_unique_query
434 #
435 # Constrain the specified query hash based on the specified column names.
436
437 sub _build_unique_query {
438   my ($self, $query, $unique_cols) = @_;
439
440   return {
441     map  { $_ => $query->{$_} }
442     grep { exists $query->{$_} }
443       @$unique_cols
444   };
445 }
446
447 =head2 search_related
448
449 =over 4
450
451 =item Arguments: $rel, $cond, \%attrs?
452
453 =item Return Value: $new_resultset
454
455 =back
456
457   $new_rs = $cd_rs->search_related('artist', {
458     name => 'Emo-R-Us',
459   });
460
461 Searches the specified relationship, optionally specifying a condition and
462 attributes for matching records. See L</ATTRIBUTES> for more information.
463
464 =cut
465
466 sub search_related {
467   return shift->related_resultset(shift)->search(@_);
468 }
469
470 =head2 cursor
471
472 =over 4
473
474 =item Arguments: none
475
476 =item Return Value: $cursor
477
478 =back
479
480 Returns a storage-driven cursor to the given resultset. See
481 L<DBIx::Class::Cursor> for more information.
482
483 =cut
484
485 sub cursor {
486   my ($self) = @_;
487
488   my $attrs = { %{$self->_resolved_attrs} };
489   return $self->{cursor}
490     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
491           $attrs->{where},$attrs);
492 }
493
494 =head2 single
495
496 =over 4
497
498 =item Arguments: $cond?
499
500 =item Return Value: $row_object?
501
502 =back
503
504   my $cd = $schema->resultset('CD')->single({ year => 2001 });
505
506 Inflates the first result without creating a cursor if the resultset has
507 any records in it; if not returns nothing. Used by L</find> as an optimisation.
508
509 Can optionally take an additional condition *only* - this is a fast-code-path
510 method; if you need to add extra joins or similar call ->search and then
511 ->single without a condition on the $rs returned from that.
512
513 =cut
514
515 sub single {
516   my ($self, $where) = @_;
517   my $attrs = { %{$self->_resolved_attrs} };
518   if ($where) {
519     if (defined $attrs->{where}) {
520       $attrs->{where} = {
521         '-and' =>
522             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
523                $where, delete $attrs->{where} ]
524       };
525     } else {
526       $attrs->{where} = $where;
527     }
528   }
529
530 #  XXX: Disabled since it doesn't infer uniqueness in all cases
531 #  unless ($self->_is_unique_query($attrs->{where})) {
532 #    carp "Query not guaranteed to return a single row"
533 #      . "; please declare your unique constraints or use search instead";
534 #  }
535
536   my @data = $self->result_source->storage->select_single(
537     $attrs->{from}, $attrs->{select},
538     $attrs->{where}, $attrs
539   );
540
541   return (@data ? ($self->_construct_object(@data))[0] : ());
542 }
543
544 # _is_unique_query
545 #
546 # Try to determine if the specified query is guaranteed to be unique, based on
547 # the declared unique constraints.
548
549 sub _is_unique_query {
550   my ($self, $query) = @_;
551
552   my $collapsed = $self->_collapse_query($query);
553   my $alias = $self->{attrs}{alias};
554
555   foreach my $name ($self->result_source->unique_constraint_names) {
556     my @unique_cols = map {
557       "$alias.$_"
558     } $self->result_source->unique_constraint_columns($name);
559
560     # Count the values for each unique column
561     my %seen = map { $_ => 0 } @unique_cols;
562
563     foreach my $key (keys %$collapsed) {
564       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
565       next unless exists $seen{$aliased};  # Additional constraints are okay
566       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
567     }
568
569     # If we get 0 or more than 1 value for a column, it's not necessarily unique
570     return 1 unless grep { $_ != 1 } values %seen;
571   }
572
573   return 0;
574 }
575
576 # _collapse_query
577 #
578 # Recursively collapse the query, accumulating values for each column.
579
580 sub _collapse_query {
581   my ($self, $query, $collapsed) = @_;
582
583   $collapsed ||= {};
584
585   if (ref $query eq 'ARRAY') {
586     foreach my $subquery (@$query) {
587       next unless ref $subquery;  # -or
588 #      warn "ARRAY: " . Dumper $subquery;
589       $collapsed = $self->_collapse_query($subquery, $collapsed);
590     }
591   }
592   elsif (ref $query eq 'HASH') {
593     if (keys %$query and (keys %$query)[0] eq '-and') {
594       foreach my $subquery (@{$query->{-and}}) {
595 #        warn "HASH: " . Dumper $subquery;
596         $collapsed = $self->_collapse_query($subquery, $collapsed);
597       }
598     }
599     else {
600 #      warn "LEAF: " . Dumper $query;
601       foreach my $col (keys %$query) {
602         my $value = $query->{$col};
603         $collapsed->{$col}{$value}++;
604       }
605     }
606   }
607
608   return $collapsed;
609 }
610
611 =head2 get_column
612
613 =over 4
614
615 =item Arguments: $cond?
616
617 =item Return Value: $resultsetcolumn
618
619 =back
620
621   my $max_length = $rs->get_column('length')->max;
622
623 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
624
625 =cut
626
627 sub get_column {
628   my ($self, $column) = @_;
629   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
630   return $new;
631 }
632
633 =head2 search_like
634
635 =over 4
636
637 =item Arguments: $cond, \%attrs?
638
639 =item Return Value: $resultset (scalar context), @row_objs (list context)
640
641 =back
642
643   # WHERE title LIKE '%blue%'
644   $cd_rs = $rs->search_like({ title => '%blue%'});
645
646 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
647 that this is simply a convenience method. You most likely want to use
648 L</search> with specific operators.
649
650 For more information, see L<DBIx::Class::Manual::Cookbook>.
651
652 =cut
653
654 sub search_like {
655   my $class = shift;
656   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
657   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
658   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
659   return $class->search($query, { %$attrs });
660 }
661
662 =head2 slice
663
664 =over 4
665
666 =item Arguments: $first, $last
667
668 =item Return Value: $resultset (scalar context), @row_objs (list context)
669
670 =back
671
672 Returns a resultset or object list representing a subset of elements from the
673 resultset slice is called on. Indexes are from 0, i.e., to get the first
674 three records, call:
675
676   my ($one, $two, $three) = $rs->slice(0, 2);
677
678 =cut
679
680 sub slice {
681   my ($self, $min, $max) = @_;
682   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
683   $attrs->{offset} = $self->{attrs}{offset} || 0;
684   $attrs->{offset} += $min;
685   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
686   return $self->search(undef(), $attrs);
687   #my $slice = (ref $self)->new($self->result_source, $attrs);
688   #return (wantarray ? $slice->all : $slice);
689 }
690
691 =head2 next
692
693 =over 4
694
695 =item Arguments: none
696
697 =item Return Value: $result?
698
699 =back
700
701 Returns the next element in the resultset (C<undef> is there is none).
702
703 Can be used to efficiently iterate over records in the resultset:
704
705   my $rs = $schema->resultset('CD')->search;
706   while (my $cd = $rs->next) {
707     print $cd->title;
708   }
709
710 Note that you need to store the resultset object, and call C<next> on it.
711 Calling C<< resultset('Table')->next >> repeatedly will always return the
712 first record from the resultset.
713
714 =cut
715
716 sub next {
717   my ($self) = @_;
718   if (my $cache = $self->get_cache) {
719     $self->{all_cache_position} ||= 0;
720     return $cache->[$self->{all_cache_position}++];
721   }
722   if ($self->{attrs}{cache}) {
723     $self->{all_cache_position} = 1;
724     return ($self->all)[0];
725   }
726   if ($self->{stashed_objects}) {
727     my $obj = shift(@{$self->{stashed_objects}});
728     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
729     return $obj;
730   }
731   my @row = (
732     exists $self->{stashed_row}
733       ? @{delete $self->{stashed_row}}
734       : $self->cursor->next
735   );
736   return unless (@row);
737   my ($row, @more) = $self->_construct_object(@row);
738   $self->{stashed_objects} = \@more if @more;
739   return $row;
740 }
741
742 sub _construct_object {
743   my ($self, @row) = @_;
744   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
745   my @new = $self->result_class->inflate_result($self->result_source, @$info);
746   @new = $self->{_attrs}{record_filter}->(@new)
747     if exists $self->{_attrs}{record_filter};
748   return @new;
749 }
750
751 sub _collapse_result {
752   my ($self, $as, $row, $prefix) = @_;
753
754   my %const;
755   my @copy = @$row;
756   
757   foreach my $this_as (@$as) {
758     my $val = shift @copy;
759     if (defined $prefix) {
760       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
761         my $remain = $1;
762         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
763         $const{$1||''}{$2} = $val;
764       }
765     } else {
766       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
767       $const{$1||''}{$2} = $val;
768     }
769   }
770
771   my $alias = $self->{attrs}{alias};
772   my $info = [ {}, {} ];
773   foreach my $key (keys %const) {
774     if (length $key && $key ne $alias) {
775       my $target = $info;
776       my @parts = split(/\./, $key);
777       foreach my $p (@parts) {
778         $target = $target->[1]->{$p} ||= [];
779       }
780       $target->[0] = $const{$key};
781     } else {
782       $info->[0] = $const{$key};
783     }
784   }
785   
786   my @collapse;
787   if (defined $prefix) {
788     @collapse = map {
789         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
790     } keys %{$self->{_attrs}{collapse}}
791   } else {
792     @collapse = keys %{$self->{_attrs}{collapse}};
793   };
794
795   if (@collapse) {
796     my ($c) = sort { length $a <=> length $b } @collapse;
797     my $target = $info;
798     foreach my $p (split(/\./, $c)) {
799       $target = $target->[1]->{$p} ||= [];
800     }
801     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
802     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
803     my $tree = $self->_collapse_result($as, $row, $c_prefix);
804     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
805     my (@final, @raw);
806
807     while (
808       !(
809         grep {
810           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
811         } @co_key
812         )
813     ) {
814       push(@final, $tree);
815       last unless (@raw = $self->cursor->next);
816       $row = $self->{stashed_row} = \@raw;
817       $tree = $self->_collapse_result($as, $row, $c_prefix);
818     }
819     @$target = (@final ? @final : [ {}, {} ]);
820       # single empty result to indicate an empty prefetched has_many
821   }
822
823   #print "final info: " . Dumper($info);
824   return $info;
825 }
826
827 =head2 result_source
828
829 =over 4
830
831 =item Arguments: $result_source?
832
833 =item Return Value: $result_source
834
835 =back
836
837 An accessor for the primary ResultSource object from which this ResultSet
838 is derived.
839
840 =head2 result_class
841
842 =over 4
843
844 =item Arguments: $result_class?
845
846 =item Return Value: $result_class
847
848 =back
849
850 An accessor for the class to use when creating row objects. Defaults to 
851 C<< result_source->result_class >> - which in most cases is the name of the 
852 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
853
854 =cut
855
856
857 =head2 count
858
859 =over 4
860
861 =item Arguments: $cond, \%attrs??
862
863 =item Return Value: $count
864
865 =back
866
867 Performs an SQL C<COUNT> with the same query as the resultset was built
868 with to find the number of elements. If passed arguments, does a search
869 on the resultset and counts the results of that.
870
871 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
872 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
873 not support C<DISTINCT> with multiple columns. If you are using such a
874 database, you should only use columns from the main table in your C<group_by>
875 clause.
876
877 =cut
878
879 sub count {
880   my $self = shift;
881   return $self->search(@_)->count if @_ and defined $_[0];
882   return scalar @{ $self->get_cache } if $self->get_cache;
883   my $count = $self->_count;
884   return 0 unless $count;
885
886   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
887   $count = $self->{attrs}{rows} if
888     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
889   return $count;
890 }
891
892 sub _count { # Separated out so pager can get the full count
893   my $self = shift;
894   my $select = { count => '*' };
895
896   my $attrs = { %{$self->_resolved_attrs} };
897   if (my $group_by = delete $attrs->{group_by}) {
898     delete $attrs->{having};
899     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
900     # todo: try CONCAT for multi-column pk
901     my @pk = $self->result_source->primary_columns;
902     if (@pk == 1) {
903       my $alias = $attrs->{alias};
904       foreach my $column (@distinct) {
905         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
906           @distinct = ($column);
907           last;
908         }
909       }
910     }
911
912     $select = { count => { distinct => \@distinct } };
913   }
914
915   $attrs->{select} = $select;
916   $attrs->{as} = [qw/count/];
917
918   # offset, order by and page are not needed to count. record_filter is cdbi
919   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
920
921   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
922   my ($count) = $tmp_rs->cursor->next;
923   return $count;
924 }
925
926 =head2 count_literal
927
928 =over 4
929
930 =item Arguments: $sql_fragment, @bind_values
931
932 =item Return Value: $count
933
934 =back
935
936 Counts the results in a literal query. Equivalent to calling L</search_literal>
937 with the passed arguments, then L</count>.
938
939 =cut
940
941 sub count_literal { shift->search_literal(@_)->count; }
942
943 =head2 all
944
945 =over 4
946
947 =item Arguments: none
948
949 =item Return Value: @objects
950
951 =back
952
953 Returns all elements in the resultset. Called implicitly if the resultset
954 is returned in list context.
955
956 =cut
957
958 sub all {
959   my ($self) = @_;
960   return @{ $self->get_cache } if $self->get_cache;
961
962   my @obj;
963
964   # TODO: don't call resolve here
965   if (keys %{$self->_resolved_attrs->{collapse}}) {
966 #  if ($self->{attrs}{prefetch}) {
967       # Using $self->cursor->all is really just an optimisation.
968       # If we're collapsing has_many prefetches it probably makes
969       # very little difference, and this is cleaner than hacking
970       # _construct_object to survive the approach
971     my @row = $self->cursor->next;
972     while (@row) {
973       push(@obj, $self->_construct_object(@row));
974       @row = (exists $self->{stashed_row}
975                ? @{delete $self->{stashed_row}}
976                : $self->cursor->next);
977     }
978   } else {
979     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
980   }
981
982   $self->set_cache(\@obj) if $self->{attrs}{cache};
983   return @obj;
984 }
985
986 =head2 reset
987
988 =over 4
989
990 =item Arguments: none
991
992 =item Return Value: $self
993
994 =back
995
996 Resets the resultset's cursor, so you can iterate through the elements again.
997
998 =cut
999
1000 sub reset {
1001   my ($self) = @_;
1002   delete $self->{_attrs} if exists $self->{_attrs};
1003   $self->{all_cache_position} = 0;
1004   $self->cursor->reset;
1005   return $self;
1006 }
1007
1008 =head2 first
1009
1010 =over 4
1011
1012 =item Arguments: none
1013
1014 =item Return Value: $object?
1015
1016 =back
1017
1018 Resets the resultset and returns an object for the first result (if the
1019 resultset returns anything).
1020
1021 =cut
1022
1023 sub first {
1024   return $_[0]->reset->next;
1025 }
1026
1027 # _cond_for_update_delete
1028 #
1029 # update/delete require the condition to be modified to handle
1030 # the differing SQL syntax available.  This transforms the $self->{cond}
1031 # appropriately, returning the new condition.
1032
1033 sub _cond_for_update_delete {
1034   my ($self, $full_cond) = @_;
1035   my $cond = {};
1036
1037   $full_cond ||= $self->{cond};
1038   # No-op. No condition, we're updating/deleting everything
1039   return $cond unless ref $full_cond;
1040
1041   if (ref $full_cond eq 'ARRAY') {
1042     $cond = [
1043       map {
1044         my %hash;
1045         foreach my $key (keys %{$_}) {
1046           $key =~ /([^.]+)$/;
1047           $hash{$1} = $_->{$key};
1048         }
1049         \%hash;
1050       } @{$full_cond}
1051     ];
1052   }
1053   elsif (ref $full_cond eq 'HASH') {
1054     if ((keys %{$full_cond})[0] eq '-and') {
1055       $cond->{-and} = [];
1056
1057       my @cond = @{$full_cond->{-and}};
1058       for (my $i = 0; $i < @cond; $i++) {
1059         my $entry = $cond[$i];
1060
1061         my $hash;
1062         if (ref $entry eq 'HASH') {
1063           $hash = $self->_cond_for_update_delete($entry);
1064         }
1065         else {
1066           $entry =~ /([^.]+)$/;
1067           $hash->{$1} = $cond[++$i];
1068         }
1069
1070         push @{$cond->{-and}}, $hash;
1071       }
1072     }
1073     else {
1074       foreach my $key (keys %{$full_cond}) {
1075         $key =~ /([^.]+)$/;
1076         $cond->{$1} = $full_cond->{$key};
1077       }
1078     }
1079   }
1080   else {
1081     $self->throw_exception(
1082       "Can't update/delete on resultset with condition unless hash or array"
1083     );
1084   }
1085
1086   return $cond;
1087 }
1088
1089
1090 =head2 update
1091
1092 =over 4
1093
1094 =item Arguments: \%values
1095
1096 =item Return Value: $storage_rv
1097
1098 =back
1099
1100 Sets the specified columns in the resultset to the supplied values in a
1101 single query. Return value will be true if the update succeeded or false
1102 if no records were updated; exact type of success value is storage-dependent.
1103
1104 =cut
1105
1106 sub update {
1107   my ($self, $values) = @_;
1108   $self->throw_exception("Values for update must be a hash")
1109     unless ref $values eq 'HASH';
1110
1111   my $cond = $self->_cond_for_update_delete;
1112
1113   return $self->result_source->storage->update(
1114     $self->result_source->from, $values, $cond
1115   );
1116 }
1117
1118 =head2 update_all
1119
1120 =over 4
1121
1122 =item Arguments: \%values
1123
1124 =item Return Value: 1
1125
1126 =back
1127
1128 Fetches all objects and updates them one at a time. Note that C<update_all>
1129 will run DBIC cascade triggers, while L</update> will not.
1130
1131 =cut
1132
1133 sub update_all {
1134   my ($self, $values) = @_;
1135   $self->throw_exception("Values for update must be a hash")
1136     unless ref $values eq 'HASH';
1137   foreach my $obj ($self->all) {
1138     $obj->set_columns($values)->update;
1139   }
1140   return 1;
1141 }
1142
1143 =head2 delete
1144
1145 =over 4
1146
1147 =item Arguments: none
1148
1149 =item Return Value: 1
1150
1151 =back
1152
1153 Deletes the contents of the resultset from its result source. Note that this
1154 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1155 to run. See also L<DBIx::Class::Row/delete>.
1156
1157 =cut
1158
1159 sub delete {
1160   my ($self) = @_;
1161
1162   my $cond = $self->_cond_for_update_delete;
1163
1164   $self->result_source->storage->delete($self->result_source->from, $cond);
1165   return 1;
1166 }
1167
1168 =head2 delete_all
1169
1170 =over 4
1171
1172 =item Arguments: none
1173
1174 =item Return Value: 1
1175
1176 =back
1177
1178 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1179 will run DBIC cascade triggers, while L</delete> will not.
1180
1181 =cut
1182
1183 sub delete_all {
1184   my ($self) = @_;
1185   $_->delete for $self->all;
1186   return 1;
1187 }
1188
1189 =head2 pager
1190
1191 =over 4
1192
1193 =item Arguments: none
1194
1195 =item Return Value: $pager
1196
1197 =back
1198
1199 Return Value a L<Data::Page> object for the current resultset. Only makes
1200 sense for queries with a C<page> attribute.
1201
1202 =cut
1203
1204 sub pager {
1205   my ($self) = @_;
1206   my $attrs = $self->{attrs};
1207   $self->throw_exception("Can't create pager for non-paged rs")
1208     unless $self->{attrs}{page};
1209   $attrs->{rows} ||= 10;
1210   return $self->{pager} ||= Data::Page->new(
1211     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1212 }
1213
1214 =head2 page
1215
1216 =over 4
1217
1218 =item Arguments: $page_number
1219
1220 =item Return Value: $rs
1221
1222 =back
1223
1224 Returns a resultset for the $page_number page of the resultset on which page
1225 is called, where each page contains a number of rows equal to the 'rows'
1226 attribute set on the resultset (10 by default).
1227
1228 =cut
1229
1230 sub page {
1231   my ($self, $page) = @_;
1232   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1233 }
1234
1235 =head2 new_result
1236
1237 =over 4
1238
1239 =item Arguments: \%vals
1240
1241 =item Return Value: $object
1242
1243 =back
1244
1245 Creates an object in the resultset's result class and returns it.
1246
1247 =cut
1248
1249 sub new_result {
1250   my ($self, $values) = @_;
1251   $self->throw_exception( "new_result needs a hash" )
1252     unless (ref $values eq 'HASH');
1253   $self->throw_exception(
1254     "Can't abstract implicit construct, condition not a hash"
1255   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1256
1257   my $alias = $self->{attrs}{alias};
1258   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1259   my %new = (
1260     %{ $self->_remove_alias($values, $alias) },
1261     %{ $self->_remove_alias($collapsed_cond, $alias) },
1262     -result_source => $self->result_source,
1263   );
1264
1265   my $obj = $self->result_class->new(\%new);
1266   return $obj;
1267 }
1268
1269 # _collapse_cond
1270 #
1271 # Recursively collapse the condition.
1272
1273 sub _collapse_cond {
1274   my ($self, $cond, $collapsed) = @_;
1275
1276   $collapsed ||= {};
1277
1278   if (ref $cond eq 'ARRAY') {
1279     foreach my $subcond (@$cond) {
1280       next unless ref $subcond;  # -or
1281 #      warn "ARRAY: " . Dumper $subcond;
1282       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1283     }
1284   }
1285   elsif (ref $cond eq 'HASH') {
1286     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1287       foreach my $subcond (@{$cond->{-and}}) {
1288 #        warn "HASH: " . Dumper $subcond;
1289         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1290       }
1291     }
1292     else {
1293 #      warn "LEAF: " . Dumper $cond;
1294       foreach my $col (keys %$cond) {
1295         my $value = $cond->{$col};
1296         $collapsed->{$col} = $value;
1297       }
1298     }
1299   }
1300
1301   return $collapsed;
1302 }
1303
1304 # _remove_alias
1305 #
1306 # Remove the specified alias from the specified query hash. A copy is made so
1307 # the original query is not modified.
1308
1309 sub _remove_alias {
1310   my ($self, $query, $alias) = @_;
1311
1312   my %orig = %{ $query || {} };
1313   my %unaliased;
1314
1315   foreach my $key (keys %orig) {
1316     if ($key !~ /\./) {
1317       $unaliased{$key} = $orig{$key};
1318       next;
1319     }
1320     $unaliased{$1} = $orig{$key}
1321       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1322   }
1323
1324   return \%unaliased;
1325 }
1326
1327 =head2 find_or_new
1328
1329 =over 4
1330
1331 =item Arguments: \%vals, \%attrs?
1332
1333 =item Return Value: $object
1334
1335 =back
1336
1337 Find an existing record from this resultset. If none exists, instantiate a new
1338 result object and return it. The object will not be saved into your storage
1339 until you call L<DBIx::Class::Row/insert> on it.
1340
1341 If you want objects to be saved immediately, use L</find_or_create> instead.
1342
1343 =cut
1344
1345 sub find_or_new {
1346   my $self     = shift;
1347   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1348   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1349   my $exists   = $self->find($hash, $attrs);
1350   return defined $exists ? $exists : $self->new_result($hash);
1351 }
1352
1353 =head2 create
1354
1355 =over 4
1356
1357 =item Arguments: \%vals
1358
1359 =item Return Value: $object
1360
1361 =back
1362
1363 Inserts a record into the resultset and returns the object representing it.
1364
1365 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1366
1367 =cut
1368
1369 sub create {
1370   my ($self, $attrs) = @_;
1371   $self->throw_exception( "create needs a hashref" )
1372     unless ref $attrs eq 'HASH';
1373   return $self->new_result($attrs)->insert;
1374 }
1375
1376 =head2 find_or_create
1377
1378 =over 4
1379
1380 =item Arguments: \%vals, \%attrs?
1381
1382 =item Return Value: $object
1383
1384 =back
1385
1386   $class->find_or_create({ key => $val, ... });
1387
1388 Tries to find a record based on its primary key or unique constraint; if none
1389 is found, creates one and returns that instead.
1390
1391   my $cd = $schema->resultset('CD')->find_or_create({
1392     cdid   => 5,
1393     artist => 'Massive Attack',
1394     title  => 'Mezzanine',
1395     year   => 2005,
1396   });
1397
1398 Also takes an optional C<key> attribute, to search by a specific key or unique
1399 constraint. For example:
1400
1401   my $cd = $schema->resultset('CD')->find_or_create(
1402     {
1403       artist => 'Massive Attack',
1404       title  => 'Mezzanine',
1405     },
1406     { key => 'cd_artist_title' }
1407   );
1408
1409 See also L</find> and L</update_or_create>. For information on how to declare
1410 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1411
1412 =cut
1413
1414 sub find_or_create {
1415   my $self     = shift;
1416   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1417   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1418   my $exists   = $self->find($hash, $attrs);
1419   return defined $exists ? $exists : $self->create($hash);
1420 }
1421
1422 =head2 update_or_create
1423
1424 =over 4
1425
1426 =item Arguments: \%col_values, { key => $unique_constraint }?
1427
1428 =item Return Value: $object
1429
1430 =back
1431
1432   $class->update_or_create({ col => $val, ... });
1433
1434 First, searches for an existing row matching one of the unique constraints
1435 (including the primary key) on the source of this resultset. If a row is
1436 found, updates it with the other given column values. Otherwise, creates a new
1437 row.
1438
1439 Takes an optional C<key> attribute to search on a specific unique constraint.
1440 For example:
1441
1442   # In your application
1443   my $cd = $schema->resultset('CD')->update_or_create(
1444     {
1445       artist => 'Massive Attack',
1446       title  => 'Mezzanine',
1447       year   => 1998,
1448     },
1449     { key => 'cd_artist_title' }
1450   );
1451
1452 If no C<key> is specified, it searches on all unique constraints defined on the
1453 source, including the primary key.
1454
1455 If the C<key> is specified as C<primary>, it searches only on the primary key.
1456
1457 See also L</find> and L</find_or_create>. For information on how to declare
1458 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1459
1460 =cut
1461
1462 sub update_or_create {
1463   my $self = shift;
1464   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1465   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1466
1467   my $row = $self->find($cond, $attrs);
1468   if (defined $row) {
1469     $row->update($cond);
1470     return $row;
1471   }
1472
1473   return $self->create($cond);
1474 }
1475
1476 =head2 get_cache
1477
1478 =over 4
1479
1480 =item Arguments: none
1481
1482 =item Return Value: \@cache_objects?
1483
1484 =back
1485
1486 Gets the contents of the cache for the resultset, if the cache is set.
1487
1488 =cut
1489
1490 sub get_cache {
1491   shift->{all_cache};
1492 }
1493
1494 =head2 set_cache
1495
1496 =over 4
1497
1498 =item Arguments: \@cache_objects
1499
1500 =item Return Value: \@cache_objects
1501
1502 =back
1503
1504 Sets the contents of the cache for the resultset. Expects an arrayref
1505 of objects of the same class as those produced by the resultset. Note that
1506 if the cache is set the resultset will return the cached objects rather
1507 than re-querying the database even if the cache attr is not set.
1508
1509 =cut
1510
1511 sub set_cache {
1512   my ( $self, $data ) = @_;
1513   $self->throw_exception("set_cache requires an arrayref")
1514       if defined($data) && (ref $data ne 'ARRAY');
1515   $self->{all_cache} = $data;
1516 }
1517
1518 =head2 clear_cache
1519
1520 =over 4
1521
1522 =item Arguments: none
1523
1524 =item Return Value: []
1525
1526 =back
1527
1528 Clears the cache for the resultset.
1529
1530 =cut
1531
1532 sub clear_cache {
1533   shift->set_cache(undef);
1534 }
1535
1536 =head2 related_resultset
1537
1538 =over 4
1539
1540 =item Arguments: $relationship_name
1541
1542 =item Return Value: $resultset
1543
1544 =back
1545
1546 Returns a related resultset for the supplied relationship name.
1547
1548   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1549
1550 =cut
1551
1552 sub related_resultset {
1553   my ($self, $rel) = @_;
1554
1555   $self->{related_resultsets} ||= {};
1556   return $self->{related_resultsets}{$rel} ||= do {
1557     my $rel_obj = $self->result_source->relationship_info($rel);
1558
1559     $self->throw_exception(
1560       "search_related: result source '" . $self->result_source->name .
1561         "' has no such relationship $rel")
1562       unless $rel_obj;
1563     
1564     my ($from,$seen) = $self->_resolve_from($rel);
1565
1566     my $join_count = $seen->{$rel};
1567     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1568
1569     $self->result_source->schema->resultset($rel_obj->{class})->search_rs(
1570       undef, {
1571         %{$self->{attrs}||{}},
1572         join => undef,
1573         prefetch => undef,
1574         select => undef,
1575         as => undef,
1576         alias => $alias,
1577         where => $self->{cond},
1578         seen_join => $seen,
1579         from => $from,
1580     });
1581   };
1582 }
1583
1584 sub _resolve_from {
1585   my ($self, $extra_join) = @_;
1586   my $source = $self->result_source;
1587   my $attrs = $self->{attrs};
1588   
1589   my $from = $attrs->{from}
1590     || [ { $attrs->{alias} => $source->from } ];
1591     
1592   my $seen = { %{$attrs->{seen_join}||{}} };
1593
1594   my $join = ($attrs->{join}
1595                ? [ $attrs->{join}, $extra_join ]
1596                : $extra_join);
1597   $from = [
1598     @$from,
1599     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1600   ];
1601
1602   return ($from,$seen);
1603 }
1604
1605 sub _resolved_attrs {
1606   my $self = shift;
1607   return $self->{_attrs} if $self->{_attrs};
1608
1609   my $attrs = { %{$self->{attrs}||{}} };
1610   my $source = $self->{result_source};
1611   my $alias = $attrs->{alias};
1612
1613   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1614   if ($attrs->{columns}) {
1615     delete $attrs->{as};
1616   } elsif (!$attrs->{select}) {
1617     $attrs->{columns} = [ $source->columns ];
1618   }
1619  
1620   $attrs->{select} = 
1621     ($attrs->{select}
1622       ? (ref $attrs->{select} eq 'ARRAY'
1623           ? [ @{$attrs->{select}} ]
1624           : [ $attrs->{select} ])
1625       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1626     );
1627   $attrs->{as} =
1628     ($attrs->{as}
1629       ? (ref $attrs->{as} eq 'ARRAY'
1630           ? [ @{$attrs->{as}} ]
1631           : [ $attrs->{as} ])
1632       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1633     );
1634   
1635   my $adds;
1636   if ($adds = delete $attrs->{include_columns}) {
1637     $adds = [$adds] unless ref $adds eq 'ARRAY';
1638     push(@{$attrs->{select}}, @$adds);
1639     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1640   }
1641   if ($adds = delete $attrs->{'+select'}) {
1642     $adds = [$adds] unless ref $adds eq 'ARRAY';
1643     push(@{$attrs->{select}},
1644            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1645   }
1646   if (my $adds = delete $attrs->{'+as'}) {
1647     $adds = [$adds] unless ref $adds eq 'ARRAY';
1648     push(@{$attrs->{as}}, @$adds);
1649   }
1650
1651   $attrs->{from} ||= [ { 'me' => $source->from } ];
1652
1653   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1654     my $join = delete $attrs->{join} || {};
1655
1656     if (defined $attrs->{prefetch}) {
1657       $join = $self->_merge_attr(
1658         $join, $attrs->{prefetch}
1659       );
1660     }
1661
1662     $attrs->{from} =   # have to copy here to avoid corrupting the original
1663       [
1664         @{$attrs->{from}}, 
1665         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1666       ];
1667   }
1668
1669   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1670   if ($attrs->{order_by}) {
1671     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1672                            ? [ @{$attrs->{order_by}} ]
1673                            : [ $attrs->{order_by} ]);
1674   } else {
1675     $attrs->{order_by} = [];    
1676   }
1677
1678   my $collapse = $attrs->{collapse} || {};
1679   if (my $prefetch = delete $attrs->{prefetch}) {
1680     $prefetch = $self->_merge_attr({}, $prefetch);
1681     my @pre_order;
1682     my $seen = $attrs->{seen_join} || {};
1683     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1684       # bring joins back to level of current class
1685       my @prefetch = $source->resolve_prefetch(
1686         $p, $alias, $seen, \@pre_order, $collapse
1687       );
1688       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1689       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1690     }
1691     push(@{$attrs->{order_by}}, @pre_order);
1692   }
1693   $attrs->{collapse} = $collapse;
1694
1695   return $self->{_attrs} = $attrs;
1696 }
1697
1698 sub _merge_attr {
1699   my ($self, $a, $b) = @_;
1700   return $b unless defined($a);
1701   return $a unless defined($b);
1702   
1703   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1704     foreach my $key (keys %{$b}) {
1705       if (exists $a->{$key}) {
1706         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1707       } else {
1708         $a->{$key} = $b->{$key};
1709       }
1710     }
1711     return $a;
1712   } else {
1713     $a = [$a] unless ref $a eq 'ARRAY';
1714     $b = [$b] unless ref $b eq 'ARRAY';
1715
1716     my $hash = {};
1717     my @array;
1718     foreach my $x ($a, $b) {
1719       foreach my $element (@{$x}) {
1720         if (ref $element eq 'HASH') {
1721           $hash = $self->_merge_attr($hash, $element);
1722         } elsif (ref $element eq 'ARRAY') {
1723           push(@array, @{$element});
1724         } else {
1725           push(@array, $element) unless $b == $x
1726             && grep { $_ eq $element } @array;
1727         }
1728       }
1729     }
1730     
1731     @array = grep { !exists $hash->{$_} } @array;
1732
1733     return keys %{$hash}
1734       ? ( scalar(@array)
1735             ? [$hash, @array]
1736             : $hash
1737         )
1738       : \@array;
1739   }
1740 }
1741
1742 =head2 throw_exception
1743
1744 See L<DBIx::Class::Schema/throw_exception> for details.
1745
1746 =cut
1747
1748 sub throw_exception {
1749   my $self=shift;
1750   $self->result_source->schema->throw_exception(@_);
1751 }
1752
1753 # XXX: FIXME: Attributes docs need clearing up
1754
1755 =head1 ATTRIBUTES
1756
1757 The resultset takes various attributes that modify its behavior. Here's an
1758 overview of them:
1759
1760 =head2 order_by
1761
1762 =over 4
1763
1764 =item Value: ($order_by | \@order_by)
1765
1766 =back
1767
1768 Which column(s) to order the results by. This is currently passed
1769 through directly to SQL, so you can give e.g. C<year DESC> for a
1770 descending order on the column `year'.
1771
1772 Please note that if you have C<quote_char> enabled (see
1773 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1774 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1775 so you will need to manually quote things as appropriate.)
1776
1777 =head2 columns
1778
1779 =over 4
1780
1781 =item Value: \@columns
1782
1783 =back
1784
1785 Shortcut to request a particular set of columns to be retrieved.  Adds
1786 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1787 from that, then auto-populates C<as> from C<select> as normal. (You may also
1788 use the C<cols> attribute, as in earlier versions of DBIC.)
1789
1790 =head2 include_columns
1791
1792 =over 4
1793
1794 =item Value: \@columns
1795
1796 =back
1797
1798 Shortcut to include additional columns in the returned results - for example
1799
1800   $schema->resultset('CD')->search(undef, {
1801     include_columns => ['artist.name'],
1802     join => ['artist']
1803   });
1804
1805 would return all CDs and include a 'name' column to the information
1806 passed to object inflation
1807
1808 =head2 select
1809
1810 =over 4
1811
1812 =item Value: \@select_columns
1813
1814 =back
1815
1816 Indicates which columns should be selected from the storage. You can use
1817 column names, or in the case of RDBMS back ends, function or stored procedure
1818 names:
1819
1820   $rs = $schema->resultset('Employee')->search(undef, {
1821     select => [
1822       'name',
1823       { count => 'employeeid' },
1824       { sum => 'salary' }
1825     ]
1826   });
1827
1828 When you use function/stored procedure names and do not supply an C<as>
1829 attribute, the column names returned are storage-dependent. E.g. MySQL would
1830 return a column named C<count(employeeid)> in the above example.
1831
1832 =head2 +select
1833
1834 =over 4
1835
1836 Indicates additional columns to be selected from storage.  Works the same as
1837 L<select> but adds columns to the selection.
1838
1839 =back
1840
1841 =head2 +as
1842
1843 =over 4
1844
1845 Indicates additional column names for those added via L<+select>.
1846
1847 =back
1848
1849 =head2 as
1850
1851 =over 4
1852
1853 =item Value: \@inflation_names
1854
1855 =back
1856
1857 Indicates column names for object inflation. This is used in conjunction with
1858 C<select>, usually when C<select> contains one or more function or stored
1859 procedure names:
1860
1861   $rs = $schema->resultset('Employee')->search(undef, {
1862     select => [
1863       'name',
1864       { count => 'employeeid' }
1865     ],
1866     as => ['name', 'employee_count'],
1867   });
1868
1869   my $employee = $rs->first(); # get the first Employee
1870
1871 If the object against which the search is performed already has an accessor
1872 matching a column name specified in C<as>, the value can be retrieved using
1873 the accessor as normal:
1874
1875   my $name = $employee->name();
1876
1877 If on the other hand an accessor does not exist in the object, you need to
1878 use C<get_column> instead:
1879
1880   my $employee_count = $employee->get_column('employee_count');
1881
1882 You can create your own accessors if required - see
1883 L<DBIx::Class::Manual::Cookbook> for details.
1884
1885 Please note: This will NOT insert an C<AS employee_count> into the SQL
1886 statement produced, it is used for internal access only. Thus
1887 attempting to use the accessor in an C<order_by> clause or similar
1888 will fail miserably.
1889
1890 To get around this limitation, you can supply literal SQL to your
1891 C<select> attibute that contains the C<AS alias> text, eg:
1892
1893   select => [\'myfield AS alias']
1894
1895 =head2 join
1896
1897 =over 4
1898
1899 =item Value: ($rel_name | \@rel_names | \%rel_names)
1900
1901 =back
1902
1903 Contains a list of relationships that should be joined for this query.  For
1904 example:
1905
1906   # Get CDs by Nine Inch Nails
1907   my $rs = $schema->resultset('CD')->search(
1908     { 'artist.name' => 'Nine Inch Nails' },
1909     { join => 'artist' }
1910   );
1911
1912 Can also contain a hash reference to refer to the other relation's relations.
1913 For example:
1914
1915   package MyApp::Schema::Track;
1916   use base qw/DBIx::Class/;
1917   __PACKAGE__->table('track');
1918   __PACKAGE__->add_columns(qw/trackid cd position title/);
1919   __PACKAGE__->set_primary_key('trackid');
1920   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1921   1;
1922
1923   # In your application
1924   my $rs = $schema->resultset('Artist')->search(
1925     { 'track.title' => 'Teardrop' },
1926     {
1927       join     => { cd => 'track' },
1928       order_by => 'artist.name',
1929     }
1930   );
1931
1932 You need to use the relationship (not the table) name in  conditions, 
1933 because they are aliased as such. The current table is aliased as "me", so 
1934 you need to use me.column_name in order to avoid ambiguity. For example:
1935
1936   # Get CDs from 1984 with a 'Foo' track 
1937   my $rs = $schema->resultset('CD')->search(
1938     { 
1939       'me.year' => 1984,
1940       'tracks.name' => 'Foo'
1941     },
1942     { join => 'tracks' }
1943   );
1944   
1945 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1946 similarly for a third time). For e.g.
1947
1948   my $rs = $schema->resultset('Artist')->search({
1949     'cds.title'   => 'Down to Earth',
1950     'cds_2.title' => 'Popular',
1951   }, {
1952     join => [ qw/cds cds/ ],
1953   });
1954
1955 will return a set of all artists that have both a cd with title 'Down
1956 to Earth' and a cd with title 'Popular'.
1957
1958 If you want to fetch related objects from other tables as well, see C<prefetch>
1959 below.
1960
1961 =head2 prefetch
1962
1963 =over 4
1964
1965 =item Value: ($rel_name | \@rel_names | \%rel_names)
1966
1967 =back
1968
1969 Contains one or more relationships that should be fetched along with the main
1970 query (when they are accessed afterwards they will have already been
1971 "prefetched").  This is useful for when you know you will need the related
1972 objects, because it saves at least one query:
1973
1974   my $rs = $schema->resultset('Tag')->search(
1975     undef,
1976     {
1977       prefetch => {
1978         cd => 'artist'
1979       }
1980     }
1981   );
1982
1983 The initial search results in SQL like the following:
1984
1985   SELECT tag.*, cd.*, artist.* FROM tag
1986   JOIN cd ON tag.cd = cd.cdid
1987   JOIN artist ON cd.artist = artist.artistid
1988
1989 L<DBIx::Class> has no need to go back to the database when we access the
1990 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1991 case.
1992
1993 Simple prefetches will be joined automatically, so there is no need
1994 for a C<join> attribute in the above search. If you're prefetching to
1995 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1996 specify the join as well.
1997
1998 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1999 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2000 with an accessor type of 'single' or 'filter').
2001
2002 =head2 page
2003
2004 =over 4
2005
2006 =item Value: $page
2007
2008 =back
2009
2010 Makes the resultset paged and specifies the page to retrieve. Effectively
2011 identical to creating a non-pages resultset and then calling ->page($page)
2012 on it.
2013
2014 If L<rows> attribute is not specified it defualts to 10 rows per page.
2015
2016 =head2 rows
2017
2018 =over 4
2019
2020 =item Value: $rows
2021
2022 =back
2023
2024 Specifes the maximum number of rows for direct retrieval or the number of
2025 rows per page if the page attribute or method is used.
2026
2027 =head2 offset
2028
2029 =over 4
2030
2031 =item Value: $offset
2032
2033 =back
2034
2035 Specifies the (zero-based) row number for the  first row to be returned, or the
2036 of the first row of the first page if paging is used.
2037
2038 =head2 group_by
2039
2040 =over 4
2041
2042 =item Value: \@columns
2043
2044 =back
2045
2046 A arrayref of columns to group by. Can include columns of joined tables.
2047
2048   group_by => [qw/ column1 column2 ... /]
2049
2050 =head2 having
2051
2052 =over 4
2053
2054 =item Value: $condition
2055
2056 =back
2057
2058 HAVING is a select statement attribute that is applied between GROUP BY and
2059 ORDER BY. It is applied to the after the grouping calculations have been
2060 done.
2061
2062   having => { 'count(employee)' => { '>=', 100 } }
2063
2064 =head2 distinct
2065
2066 =over 4
2067
2068 =item Value: (0 | 1)
2069
2070 =back
2071
2072 Set to 1 to group by all columns.
2073
2074 =head2 where
2075
2076 =over 4
2077
2078 Adds to the WHERE clause.
2079
2080   # only return rows WHERE deleted IS NULL for all searches
2081   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2082
2083 Can be overridden by passing C<{ where => undef }> as an attribute
2084 to a resulset.
2085
2086 =back
2087
2088 =head2 cache
2089
2090 Set to 1 to cache search results. This prevents extra SQL queries if you
2091 revisit rows in your ResultSet:
2092
2093   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2094
2095   while( my $artist = $resultset->next ) {
2096     ... do stuff ...
2097   }
2098
2099   $rs->first; # without cache, this would issue a query
2100
2101 By default, searches are not cached.
2102
2103 For more examples of using these attributes, see
2104 L<DBIx::Class::Manual::Cookbook>.
2105
2106 =head2 from
2107
2108 =over 4
2109
2110 =item Value: \@from_clause
2111
2112 =back
2113
2114 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2115 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2116 clauses.
2117
2118 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2119
2120 C<join> will usually do what you need and it is strongly recommended that you
2121 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2122 And we really do mean "cannot", not just tried and failed. Attempting to use
2123 this because you're having problems with C<join> is like trying to use x86
2124 ASM because you've got a syntax error in your C. Trust us on this.
2125
2126 Now, if you're still really, really sure you need to use this (and if you're
2127 not 100% sure, ask the mailing list first), here's an explanation of how this
2128 works.
2129
2130 The syntax is as follows -
2131
2132   [
2133     { <alias1> => <table1> },
2134     [
2135       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2136       [], # nested JOIN (optional)
2137       { <table1.column1> => <table2.column2>, ... (more conditions) },
2138     ],
2139     # More of the above [ ] may follow for additional joins
2140   ]
2141
2142   <table1> <alias1>
2143   JOIN
2144     <table2> <alias2>
2145     [JOIN ...]
2146   ON <table1.column1> = <table2.column2>
2147   <more joins may follow>
2148
2149 An easy way to follow the examples below is to remember the following:
2150
2151     Anything inside "[]" is a JOIN
2152     Anything inside "{}" is a condition for the enclosing JOIN
2153
2154 The following examples utilize a "person" table in a family tree application.
2155 In order to express parent->child relationships, this table is self-joined:
2156
2157     # Person->belongs_to('father' => 'Person');
2158     # Person->belongs_to('mother' => 'Person');
2159
2160 C<from> can be used to nest joins. Here we return all children with a father,
2161 then search against all mothers of those children:
2162
2163   $rs = $schema->resultset('Person')->search(
2164       undef,
2165       {
2166           alias => 'mother', # alias columns in accordance with "from"
2167           from => [
2168               { mother => 'person' },
2169               [
2170                   [
2171                       { child => 'person' },
2172                       [
2173                           { father => 'person' },
2174                           { 'father.person_id' => 'child.father_id' }
2175                       ]
2176                   ],
2177                   { 'mother.person_id' => 'child.mother_id' }
2178               ],
2179           ]
2180       },
2181   );
2182
2183   # Equivalent SQL:
2184   # SELECT mother.* FROM person mother
2185   # JOIN (
2186   #   person child
2187   #   JOIN person father
2188   #   ON ( father.person_id = child.father_id )
2189   # )
2190   # ON ( mother.person_id = child.mother_id )
2191
2192 The type of any join can be controlled manually. To search against only people
2193 with a father in the person table, we could explicitly use C<INNER JOIN>:
2194
2195     $rs = $schema->resultset('Person')->search(
2196         undef,
2197         {
2198             alias => 'child', # alias columns in accordance with "from"
2199             from => [
2200                 { child => 'person' },
2201                 [
2202                     { father => 'person', -join_type => 'inner' },
2203                     { 'father.id' => 'child.father_id' }
2204                 ],
2205             ]
2206         },
2207     );
2208
2209     # Equivalent SQL:
2210     # SELECT child.* FROM person child
2211     # INNER JOIN person father ON child.father_id = father.id
2212
2213 =cut
2214
2215 1;