Merge 'trunk' into 'DBIx-Class-current'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use base qw/DBIx::Class/;
14
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('CD')->search(year => 2005);
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =over 4
57
58 =item Arguments: $source, \%$attrs
59
60 =item Return Value: $rs
61
62 =back
63
64 The resultset constructor. Takes a source object (usually a
65 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
66 L</ATTRIBUTES> below).  Does not perform any queries -- these are
67 executed as needed by the other methods.
68
69 Generally you won't need to construct a resultset manually.  You'll
70 automatically get one from e.g. a L</search> called in scalar context:
71
72   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
73
74 IMPORTANT: If called on an object, proxies to new_result instead so
75
76   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
77
78 will return a CD object, not a ResultSet.
79
80 =cut
81
82 sub new {
83   my $class = shift;
84   return $class->new_result(@_) if ref $class;
85
86   my ($source, $attrs) = @_;
87   #weaken $source;
88   $attrs = { %{$attrs||{}} };
89
90   if ($attrs->{page}) {
91     $attrs->{rows} ||= 10;
92     $attrs->{offset} ||= 0;
93     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
94   }
95
96   $attrs->{alias} ||= 'me';
97
98   my $self = {
99     result_source => $source,
100     result_class => $attrs->{result_class} || $source->result_class,
101     cond => $attrs->{where},
102     count => undef,
103     pager => undef,
104     attrs => $attrs
105   };
106
107   bless $self, $class;
108
109   return $self;
110 }
111
112 =head2 search
113
114 =over 4
115
116 =item Arguments: $cond, \%attrs?
117
118 =item Return Value: $resultset (scalar context), @row_objs (list context)
119
120 =back
121
122   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
123   my $new_rs = $cd_rs->search({ year => 2005 });
124
125   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
126                  # year = 2005 OR year = 2004
127
128 If you need to pass in additional attributes but no additional condition,
129 call it as C<search(undef, \%attrs)>.
130
131   # "SELECT name, artistid FROM $artist_table"
132   my @all_artists = $schema->resultset('Artist')->search(undef, {
133     columns => [qw/name artistid/],
134   });
135
136 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
137
138 =cut
139
140 sub search {
141   my $self = shift;
142   my $rs = $self->search_rs( @_ );
143   return (wantarray ? $rs->all : $rs);
144 }
145
146 =head2 search_rs
147
148 =over 4
149
150 =item Arguments: $cond, \%attrs?
151
152 =item Return Value: $resultset
153
154 =back
155
156 This method does the same exact thing as search() except it will
157 always return a resultset, even in list context.
158
159 =cut
160
161 sub search_rs {
162   my $self = shift;
163
164   my $rows;
165
166   unless (@_) {                 # no search, effectively just a clone
167     $rows = $self->get_cache;
168   }
169
170   my $attrs = {};
171   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
172   my $our_attrs = { %{$self->{attrs}} };
173   my $having = delete $our_attrs->{having};
174   my $where = delete $our_attrs->{where};
175
176   my $new_attrs = { %{$our_attrs}, %{$attrs} };
177
178   # merge new attrs into inherited
179   foreach my $key (qw/join prefetch/) {
180     next unless exists $attrs->{$key};
181     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
182   }
183
184   my $cond = (@_
185     ? (
186         (@_ == 1 || ref $_[0] eq "HASH")
187           ? (
188               (ref $_[0] eq 'HASH')
189                 ? (
190                     (keys %{ $_[0] }  > 0)
191                       ? shift
192                       : undef
193                    )
194                 :  shift
195              )
196           : (
197               (@_ % 2)
198                 ? $self->throw_exception("Odd number of arguments to search")
199                 : {@_}
200              )
201       )
202     : undef
203   );
204
205   if (defined $where) {
206     $new_attrs->{where} = (
207       defined $new_attrs->{where}
208         ? { '-and' => [
209               map {
210                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
211               } $where, $new_attrs->{where}
212             ]
213           }
214         : $where);
215   }
216
217   if (defined $cond) {
218     $new_attrs->{where} = (
219       defined $new_attrs->{where}
220         ? { '-and' => [
221               map {
222                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
223               } $cond, $new_attrs->{where}
224             ]
225           }
226         : $cond);
227   }
228
229   if (defined $having) {
230     $new_attrs->{having} = (
231       defined $new_attrs->{having}
232         ? { '-and' => [
233               map {
234                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
235               } $having, $new_attrs->{having}
236             ]
237           }
238         : $having);
239   }
240
241   my $rs = (ref $self)->new($self->result_source, $new_attrs);
242   if ($rows) {
243     $rs->set_cache($rows);
244   }
245   return $rs;
246 }
247
248 =head2 search_literal
249
250 =over 4
251
252 =item Arguments: $sql_fragment, @bind_values
253
254 =item Return Value: $resultset (scalar context), @row_objs (list context)
255
256 =back
257
258   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
259   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
260
261 Pass a literal chunk of SQL to be added to the conditional part of the
262 resultset query.
263
264 =cut
265
266 sub search_literal {
267   my ($self, $cond, @vals) = @_;
268   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
269   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
270   return $self->search(\$cond, $attrs);
271 }
272
273 =head2 find
274
275 =over 4
276
277 =item Arguments: @values | \%cols, \%attrs?
278
279 =item Return Value: $row_object
280
281 =back
282
283 Finds a row based on its primary key or unique constraint. For example, to find
284 a row by its primary key:
285
286   my $cd = $schema->resultset('CD')->find(5);
287
288 You can also find a row by a specific unique constraint using the C<key>
289 attribute. For example:
290
291   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
292     key => 'cd_artist_title'
293   });
294
295 Additionally, you can specify the columns explicitly by name:
296
297   my $cd = $schema->resultset('CD')->find(
298     {
299       artist => 'Massive Attack',
300       title  => 'Mezzanine',
301     },
302     { key => 'cd_artist_title' }
303   );
304
305 If the C<key> is specified as C<primary>, it searches only on the primary key.
306
307 If no C<key> is specified, it searches on all unique constraints defined on the
308 source, including the primary key.
309
310 If your table does not have a primary key, you B<must> provide a value for the
311 C<key> attribute matching one of the unique constraints on the source.
312
313 See also L</find_or_create> and L</update_or_create>. For information on how to
314 declare unique constraints, see
315 L<DBIx::Class::ResultSource/add_unique_constraint>.
316
317 =cut
318
319 sub find {
320   my $self = shift;
321   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
322
323   # Default to the primary key, but allow a specific key
324   my @cols = exists $attrs->{key}
325     ? $self->result_source->unique_constraint_columns($attrs->{key})
326     : $self->result_source->primary_columns;
327   $self->throw_exception(
328     "Can't find unless a primary key is defined or unique constraint is specified"
329   ) unless @cols;
330
331   # Parse out a hashref from input
332   my $input_query;
333   if (ref $_[0] eq 'HASH') {
334     $input_query = { %{$_[0]} };
335   }
336   elsif (@_ == @cols) {
337     $input_query = {};
338     @{$input_query}{@cols} = @_;
339   }
340   else {
341     # Compatibility: Allow e.g. find(id => $value)
342     carp "Find by key => value deprecated; please use a hashref instead";
343     $input_query = {@_};
344   }
345
346   my (%related, $info);
347
348   foreach my $key (keys %$input_query) {
349     if (ref($input_query->{$key})
350         && ($info = $self->result_source->relationship_info($key))) {
351       my $rel_q = $self->result_source->resolve_condition(
352                     $info->{cond}, delete $input_query->{$key}, $key
353                   );
354       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
355       @related{keys %$rel_q} = values %$rel_q;
356     }
357   }
358   if (my @keys = keys %related) {
359     @{$input_query}{@keys} = values %related;
360   }
361
362   my @unique_queries = $self->_unique_queries($input_query, $attrs);
363
364   # Build the final query: Default to the disjunction of the unique queries,
365   # but allow the input query in case the ResultSet defines the query or the
366   # user is abusing find
367   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
368   my $query = @unique_queries
369     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
370     : $self->_add_alias($input_query, $alias);
371
372   # Run the query
373   if (keys %$attrs) {
374     my $rs = $self->search($query, $attrs);
375     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
376   }
377   else {
378     return keys %{$self->_resolved_attrs->{collapse}}
379       ? $self->search($query)->next
380       : $self->single($query);
381   }
382 }
383
384 # _add_alias
385 #
386 # Add the specified alias to the specified query hash. A copy is made so the
387 # original query is not modified.
388
389 sub _add_alias {
390   my ($self, $query, $alias) = @_;
391
392   my %aliased = %$query;
393   foreach my $col (grep { ! m/\./ } keys %aliased) {
394     $aliased{"$alias.$col"} = delete $aliased{$col};
395   }
396
397   return \%aliased;
398 }
399
400 # _unique_queries
401 #
402 # Build a list of queries which satisfy unique constraints.
403
404 sub _unique_queries {
405   my ($self, $query, $attrs) = @_;
406
407   my @constraint_names = exists $attrs->{key}
408     ? ($attrs->{key})
409     : $self->result_source->unique_constraint_names;
410
411   my @unique_queries;
412   foreach my $name (@constraint_names) {
413     my @unique_cols = $self->result_source->unique_constraint_columns($name);
414     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
415
416     my $num_query = scalar keys %$unique_query;
417     next unless $num_query;
418
419     # XXX: Assuming quite a bit about $self->{attrs}{where}
420     my $num_cols = scalar @unique_cols;
421     my $num_where = exists $self->{attrs}{where}
422       ? scalar keys %{ $self->{attrs}{where} }
423       : 0;
424     push @unique_queries, $unique_query
425       if $num_query + $num_where == $num_cols;
426   }
427
428   return @unique_queries;
429 }
430
431 # _build_unique_query
432 #
433 # Constrain the specified query hash based on the specified column names.
434
435 sub _build_unique_query {
436   my ($self, $query, $unique_cols) = @_;
437
438   return {
439     map  { $_ => $query->{$_} }
440     grep { exists $query->{$_} }
441       @$unique_cols
442   };
443 }
444
445 =head2 search_related
446
447 =over 4
448
449 =item Arguments: $rel, $cond, \%attrs?
450
451 =item Return Value: $new_resultset
452
453 =back
454
455   $new_rs = $cd_rs->search_related('artist', {
456     name => 'Emo-R-Us',
457   });
458
459 Searches the specified relationship, optionally specifying a condition and
460 attributes for matching records. See L</ATTRIBUTES> for more information.
461
462 =cut
463
464 sub search_related {
465   return shift->related_resultset(shift)->search(@_);
466 }
467
468 =head2 cursor
469
470 =over 4
471
472 =item Arguments: none
473
474 =item Return Value: $cursor
475
476 =back
477
478 Returns a storage-driven cursor to the given resultset. See
479 L<DBIx::Class::Cursor> for more information.
480
481 =cut
482
483 sub cursor {
484   my ($self) = @_;
485
486   my $attrs = { %{$self->_resolved_attrs} };
487   return $self->{cursor}
488     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
489           $attrs->{where},$attrs);
490 }
491
492 =head2 single
493
494 =over 4
495
496 =item Arguments: $cond?
497
498 =item Return Value: $row_object?
499
500 =back
501
502   my $cd = $schema->resultset('CD')->single({ year => 2001 });
503
504 Inflates the first result without creating a cursor if the resultset has
505 any records in it; if not returns nothing. Used by L</find> as an optimisation.
506
507 Can optionally take an additional condition *only* - this is a fast-code-path
508 method; if you need to add extra joins or similar call ->search and then
509 ->single without a condition on the $rs returned from that.
510
511 =cut
512
513 sub single {
514   my ($self, $where) = @_;
515   my $attrs = { %{$self->_resolved_attrs} };
516   if ($where) {
517     if (defined $attrs->{where}) {
518       $attrs->{where} = {
519         '-and' =>
520             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
521                $where, delete $attrs->{where} ]
522       };
523     } else {
524       $attrs->{where} = $where;
525     }
526   }
527
528 #  XXX: Disabled since it doesn't infer uniqueness in all cases
529 #  unless ($self->_is_unique_query($attrs->{where})) {
530 #    carp "Query not guaranteed to return a single row"
531 #      . "; please declare your unique constraints or use search instead";
532 #  }
533
534   my @data = $self->result_source->storage->select_single(
535     $attrs->{from}, $attrs->{select},
536     $attrs->{where}, $attrs
537   );
538
539   return (@data ? ($self->_construct_object(@data))[0] : ());
540 }
541
542 # _is_unique_query
543 #
544 # Try to determine if the specified query is guaranteed to be unique, based on
545 # the declared unique constraints.
546
547 sub _is_unique_query {
548   my ($self, $query) = @_;
549
550   my $collapsed = $self->_collapse_query($query);
551   my $alias = $self->{attrs}{alias};
552
553   foreach my $name ($self->result_source->unique_constraint_names) {
554     my @unique_cols = map {
555       "$alias.$_"
556     } $self->result_source->unique_constraint_columns($name);
557
558     # Count the values for each unique column
559     my %seen = map { $_ => 0 } @unique_cols;
560
561     foreach my $key (keys %$collapsed) {
562       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
563       next unless exists $seen{$aliased};  # Additional constraints are okay
564       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
565     }
566
567     # If we get 0 or more than 1 value for a column, it's not necessarily unique
568     return 1 unless grep { $_ != 1 } values %seen;
569   }
570
571   return 0;
572 }
573
574 # _collapse_query
575 #
576 # Recursively collapse the query, accumulating values for each column.
577
578 sub _collapse_query {
579   my ($self, $query, $collapsed) = @_;
580
581   $collapsed ||= {};
582
583   if (ref $query eq 'ARRAY') {
584     foreach my $subquery (@$query) {
585       next unless ref $subquery;  # -or
586 #      warn "ARRAY: " . Dumper $subquery;
587       $collapsed = $self->_collapse_query($subquery, $collapsed);
588     }
589   }
590   elsif (ref $query eq 'HASH') {
591     if (keys %$query and (keys %$query)[0] eq '-and') {
592       foreach my $subquery (@{$query->{-and}}) {
593 #        warn "HASH: " . Dumper $subquery;
594         $collapsed = $self->_collapse_query($subquery, $collapsed);
595       }
596     }
597     else {
598 #      warn "LEAF: " . Dumper $query;
599       foreach my $col (keys %$query) {
600         my $value = $query->{$col};
601         $collapsed->{$col}{$value}++;
602       }
603     }
604   }
605
606   return $collapsed;
607 }
608
609 =head2 get_column
610
611 =over 4
612
613 =item Arguments: $cond?
614
615 =item Return Value: $resultsetcolumn
616
617 =back
618
619   my $max_length = $rs->get_column('length')->max;
620
621 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
622
623 =cut
624
625 sub get_column {
626   my ($self, $column) = @_;
627   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
628   return $new;
629 }
630
631 =head2 search_like
632
633 =over 4
634
635 =item Arguments: $cond, \%attrs?
636
637 =item Return Value: $resultset (scalar context), @row_objs (list context)
638
639 =back
640
641   # WHERE title LIKE '%blue%'
642   $cd_rs = $rs->search_like({ title => '%blue%'});
643
644 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
645 that this is simply a convenience method. You most likely want to use
646 L</search> with specific operators.
647
648 For more information, see L<DBIx::Class::Manual::Cookbook>.
649
650 =cut
651
652 sub search_like {
653   my $class = shift;
654   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
655   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
656   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
657   return $class->search($query, { %$attrs });
658 }
659
660 =head2 slice
661
662 =over 4
663
664 =item Arguments: $first, $last
665
666 =item Return Value: $resultset (scalar context), @row_objs (list context)
667
668 =back
669
670 Returns a resultset or object list representing a subset of elements from the
671 resultset slice is called on. Indexes are from 0, i.e., to get the first
672 three records, call:
673
674   my ($one, $two, $three) = $rs->slice(0, 2);
675
676 =cut
677
678 sub slice {
679   my ($self, $min, $max) = @_;
680   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
681   $attrs->{offset} = $self->{attrs}{offset} || 0;
682   $attrs->{offset} += $min;
683   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
684   return $self->search(undef(), $attrs);
685   #my $slice = (ref $self)->new($self->result_source, $attrs);
686   #return (wantarray ? $slice->all : $slice);
687 }
688
689 =head2 next
690
691 =over 4
692
693 =item Arguments: none
694
695 =item Return Value: $result?
696
697 =back
698
699 Returns the next element in the resultset (C<undef> is there is none).
700
701 Can be used to efficiently iterate over records in the resultset:
702
703   my $rs = $schema->resultset('CD')->search;
704   while (my $cd = $rs->next) {
705     print $cd->title;
706   }
707
708 Note that you need to store the resultset object, and call C<next> on it.
709 Calling C<< resultset('Table')->next >> repeatedly will always return the
710 first record from the resultset.
711
712 =cut
713
714 sub next {
715   my ($self) = @_;
716   if (my $cache = $self->get_cache) {
717     $self->{all_cache_position} ||= 0;
718     return $cache->[$self->{all_cache_position}++];
719   }
720   if ($self->{attrs}{cache}) {
721     $self->{all_cache_position} = 1;
722     return ($self->all)[0];
723   }
724   if ($self->{stashed_objects}) {
725     my $obj = shift(@{$self->{stashed_objects}});
726     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
727     return $obj;
728   }
729   my @row = (
730     exists $self->{stashed_row}
731       ? @{delete $self->{stashed_row}}
732       : $self->cursor->next
733   );
734   return unless (@row);
735   my ($row, @more) = $self->_construct_object(@row);
736   $self->{stashed_objects} = \@more if @more;
737   return $row;
738 }
739
740 sub _construct_object {
741   my ($self, @row) = @_;
742   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
743   my @new = $self->result_class->inflate_result($self->result_source, @$info);
744   @new = $self->{_attrs}{record_filter}->(@new)
745     if exists $self->{_attrs}{record_filter};
746   return @new;
747 }
748
749 sub _collapse_result {
750   my ($self, $as, $row, $prefix) = @_;
751
752   my %const;
753   my @copy = @$row;
754   
755   foreach my $this_as (@$as) {
756     my $val = shift @copy;
757     if (defined $prefix) {
758       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
759         my $remain = $1;
760         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
761         $const{$1||''}{$2} = $val;
762       }
763     } else {
764       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
765       $const{$1||''}{$2} = $val;
766     }
767   }
768
769   my $alias = $self->{attrs}{alias};
770   my $info = [ {}, {} ];
771   foreach my $key (keys %const) {
772     if (length $key && $key ne $alias) {
773       my $target = $info;
774       my @parts = split(/\./, $key);
775       foreach my $p (@parts) {
776         $target = $target->[1]->{$p} ||= [];
777       }
778       $target->[0] = $const{$key};
779     } else {
780       $info->[0] = $const{$key};
781     }
782   }
783   
784   my @collapse;
785   if (defined $prefix) {
786     @collapse = map {
787         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
788     } keys %{$self->{_attrs}{collapse}}
789   } else {
790     @collapse = keys %{$self->{_attrs}{collapse}};
791   };
792
793   if (@collapse) {
794     my ($c) = sort { length $a <=> length $b } @collapse;
795     my $target = $info;
796     foreach my $p (split(/\./, $c)) {
797       $target = $target->[1]->{$p} ||= [];
798     }
799     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
800     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
801     my $tree = $self->_collapse_result($as, $row, $c_prefix);
802     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
803     my (@final, @raw);
804
805     while (
806       !(
807         grep {
808           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
809         } @co_key
810         )
811     ) {
812       push(@final, $tree);
813       last unless (@raw = $self->cursor->next);
814       $row = $self->{stashed_row} = \@raw;
815       $tree = $self->_collapse_result($as, $row, $c_prefix);
816     }
817     @$target = (@final ? @final : [ {}, {} ]);
818       # single empty result to indicate an empty prefetched has_many
819   }
820
821   #print "final info: " . Dumper($info);
822   return $info;
823 }
824
825 =head2 result_source
826
827 =over 4
828
829 =item Arguments: $result_source?
830
831 =item Return Value: $result_source
832
833 =back
834
835 An accessor for the primary ResultSource object from which this ResultSet
836 is derived.
837
838 =head2 result_class
839
840 =over 4
841
842 =item Arguments: $result_class?
843
844 =item Return Value: $result_class
845
846 =back
847
848 An accessor for the class to use when creating row objects. Defaults to 
849 C<< result_source->result_class >> - which in most cases is the name of the 
850 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
851
852 =cut
853
854
855 =head2 count
856
857 =over 4
858
859 =item Arguments: $cond, \%attrs??
860
861 =item Return Value: $count
862
863 =back
864
865 Performs an SQL C<COUNT> with the same query as the resultset was built
866 with to find the number of elements. If passed arguments, does a search
867 on the resultset and counts the results of that.
868
869 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
870 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
871 not support C<DISTINCT> with multiple columns. If you are using such a
872 database, you should only use columns from the main table in your C<group_by>
873 clause.
874
875 =cut
876
877 sub count {
878   my $self = shift;
879   return $self->search(@_)->count if @_ and defined $_[0];
880   return scalar @{ $self->get_cache } if $self->get_cache;
881   my $count = $self->_count;
882   return 0 unless $count;
883
884   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
885   $count = $self->{attrs}{rows} if
886     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
887   return $count;
888 }
889
890 sub _count { # Separated out so pager can get the full count
891   my $self = shift;
892   my $select = { count => '*' };
893
894   my $attrs = { %{$self->_resolved_attrs} };
895   if (my $group_by = delete $attrs->{group_by}) {
896     delete $attrs->{having};
897     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
898     # todo: try CONCAT for multi-column pk
899     my @pk = $self->result_source->primary_columns;
900     if (@pk == 1) {
901       my $alias = $attrs->{alias};
902       foreach my $column (@distinct) {
903         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
904           @distinct = ($column);
905           last;
906         }
907       }
908     }
909
910     $select = { count => { distinct => \@distinct } };
911   }
912
913   $attrs->{select} = $select;
914   $attrs->{as} = [qw/count/];
915
916   # offset, order by and page are not needed to count. record_filter is cdbi
917   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
918
919   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
920   my ($count) = $tmp_rs->cursor->next;
921   return $count;
922 }
923
924 =head2 count_literal
925
926 =over 4
927
928 =item Arguments: $sql_fragment, @bind_values
929
930 =item Return Value: $count
931
932 =back
933
934 Counts the results in a literal query. Equivalent to calling L</search_literal>
935 with the passed arguments, then L</count>.
936
937 =cut
938
939 sub count_literal { shift->search_literal(@_)->count; }
940
941 =head2 all
942
943 =over 4
944
945 =item Arguments: none
946
947 =item Return Value: @objects
948
949 =back
950
951 Returns all elements in the resultset. Called implicitly if the resultset
952 is returned in list context.
953
954 =cut
955
956 sub all {
957   my ($self) = @_;
958   return @{ $self->get_cache } if $self->get_cache;
959
960   my @obj;
961
962   # TODO: don't call resolve here
963   if (keys %{$self->_resolved_attrs->{collapse}}) {
964 #  if ($self->{attrs}{prefetch}) {
965       # Using $self->cursor->all is really just an optimisation.
966       # If we're collapsing has_many prefetches it probably makes
967       # very little difference, and this is cleaner than hacking
968       # _construct_object to survive the approach
969     my @row = $self->cursor->next;
970     while (@row) {
971       push(@obj, $self->_construct_object(@row));
972       @row = (exists $self->{stashed_row}
973                ? @{delete $self->{stashed_row}}
974                : $self->cursor->next);
975     }
976   } else {
977     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
978   }
979
980   $self->set_cache(\@obj) if $self->{attrs}{cache};
981   return @obj;
982 }
983
984 =head2 reset
985
986 =over 4
987
988 =item Arguments: none
989
990 =item Return Value: $self
991
992 =back
993
994 Resets the resultset's cursor, so you can iterate through the elements again.
995
996 =cut
997
998 sub reset {
999   my ($self) = @_;
1000   delete $self->{_attrs} if exists $self->{_attrs};
1001   $self->{all_cache_position} = 0;
1002   $self->cursor->reset;
1003   return $self;
1004 }
1005
1006 =head2 first
1007
1008 =over 4
1009
1010 =item Arguments: none
1011
1012 =item Return Value: $object?
1013
1014 =back
1015
1016 Resets the resultset and returns an object for the first result (if the
1017 resultset returns anything).
1018
1019 =cut
1020
1021 sub first {
1022   return $_[0]->reset->next;
1023 }
1024
1025 # _cond_for_update_delete
1026 #
1027 # update/delete require the condition to be modified to handle
1028 # the differing SQL syntax available.  This transforms the $self->{cond}
1029 # appropriately, returning the new condition.
1030
1031 sub _cond_for_update_delete {
1032   my ($self, $full_cond) = @_;
1033   my $cond = {};
1034
1035   $full_cond ||= $self->{cond};
1036   # No-op. No condition, we're updating/deleting everything
1037   return $cond unless ref $full_cond;
1038
1039   if (ref $full_cond eq 'ARRAY') {
1040     $cond = [
1041       map {
1042         my %hash;
1043         foreach my $key (keys %{$_}) {
1044           $key =~ /([^.]+)$/;
1045           $hash{$1} = $_->{$key};
1046         }
1047         \%hash;
1048       } @{$full_cond}
1049     ];
1050   }
1051   elsif (ref $full_cond eq 'HASH') {
1052     if ((keys %{$full_cond})[0] eq '-and') {
1053       $cond->{-and} = [];
1054
1055       my @cond = @{$full_cond->{-and}};
1056       for (my $i = 0; $i < @cond; $i++) {
1057         my $entry = $cond[$i];
1058
1059         my $hash;
1060         if (ref $entry eq 'HASH') {
1061           $hash = $self->_cond_for_update_delete($entry);
1062         }
1063         else {
1064           $entry =~ /([^.]+)$/;
1065           $hash->{$1} = $cond[++$i];
1066         }
1067
1068         push @{$cond->{-and}}, $hash;
1069       }
1070     }
1071     else {
1072       foreach my $key (keys %{$full_cond}) {
1073         $key =~ /([^.]+)$/;
1074         $cond->{$1} = $full_cond->{$key};
1075       }
1076     }
1077   }
1078   else {
1079     $self->throw_exception(
1080       "Can't update/delete on resultset with condition unless hash or array"
1081     );
1082   }
1083
1084   return $cond;
1085 }
1086
1087
1088 =head2 update
1089
1090 =over 4
1091
1092 =item Arguments: \%values
1093
1094 =item Return Value: $storage_rv
1095
1096 =back
1097
1098 Sets the specified columns in the resultset to the supplied values in a
1099 single query. Return value will be true if the update succeeded or false
1100 if no records were updated; exact type of success value is storage-dependent.
1101
1102 =cut
1103
1104 sub update {
1105   my ($self, $values) = @_;
1106   $self->throw_exception("Values for update must be a hash")
1107     unless ref $values eq 'HASH';
1108
1109   my $cond = $self->_cond_for_update_delete;
1110
1111   return $self->result_source->storage->update(
1112     $self->result_source->from, $values, $cond
1113   );
1114 }
1115
1116 =head2 update_all
1117
1118 =over 4
1119
1120 =item Arguments: \%values
1121
1122 =item Return Value: 1
1123
1124 =back
1125
1126 Fetches all objects and updates them one at a time. Note that C<update_all>
1127 will run DBIC cascade triggers, while L</update> will not.
1128
1129 =cut
1130
1131 sub update_all {
1132   my ($self, $values) = @_;
1133   $self->throw_exception("Values for update must be a hash")
1134     unless ref $values eq 'HASH';
1135   foreach my $obj ($self->all) {
1136     $obj->set_columns($values)->update;
1137   }
1138   return 1;
1139 }
1140
1141 =head2 delete
1142
1143 =over 4
1144
1145 =item Arguments: none
1146
1147 =item Return Value: 1
1148
1149 =back
1150
1151 Deletes the contents of the resultset from its result source. Note that this
1152 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1153 to run. See also L<DBIx::Class::Row/delete>.
1154
1155 =cut
1156
1157 sub delete {
1158   my ($self) = @_;
1159
1160   my $cond = $self->_cond_for_update_delete;
1161
1162   $self->result_source->storage->delete($self->result_source->from, $cond);
1163   return 1;
1164 }
1165
1166 =head2 delete_all
1167
1168 =over 4
1169
1170 =item Arguments: none
1171
1172 =item Return Value: 1
1173
1174 =back
1175
1176 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1177 will run DBIC cascade triggers, while L</delete> will not.
1178
1179 =cut
1180
1181 sub delete_all {
1182   my ($self) = @_;
1183   $_->delete for $self->all;
1184   return 1;
1185 }
1186
1187 =head2 pager
1188
1189 =over 4
1190
1191 =item Arguments: none
1192
1193 =item Return Value: $pager
1194
1195 =back
1196
1197 Return Value a L<Data::Page> object for the current resultset. Only makes
1198 sense for queries with a C<page> attribute.
1199
1200 =cut
1201
1202 sub pager {
1203   my ($self) = @_;
1204   my $attrs = $self->{attrs};
1205   $self->throw_exception("Can't create pager for non-paged rs")
1206     unless $self->{attrs}{page};
1207   $attrs->{rows} ||= 10;
1208   return $self->{pager} ||= Data::Page->new(
1209     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1210 }
1211
1212 =head2 page
1213
1214 =over 4
1215
1216 =item Arguments: $page_number
1217
1218 =item Return Value: $rs
1219
1220 =back
1221
1222 Returns a resultset for the $page_number page of the resultset on which page
1223 is called, where each page contains a number of rows equal to the 'rows'
1224 attribute set on the resultset (10 by default).
1225
1226 =cut
1227
1228 sub page {
1229   my ($self, $page) = @_;
1230   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1231 }
1232
1233 =head2 new_result
1234
1235 =over 4
1236
1237 =item Arguments: \%vals
1238
1239 =item Return Value: $object
1240
1241 =back
1242
1243 Creates an object in the resultset's result class and returns it.
1244
1245 =cut
1246
1247 sub new_result {
1248   my ($self, $values) = @_;
1249   $self->throw_exception( "new_result needs a hash" )
1250     unless (ref $values eq 'HASH');
1251   $self->throw_exception(
1252     "Can't abstract implicit construct, condition not a hash"
1253   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1254
1255   my $alias = $self->{attrs}{alias};
1256   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1257   my %new = (
1258     %{ $self->_remove_alias($values, $alias) },
1259     %{ $self->_remove_alias($collapsed_cond, $alias) },
1260     -result_source => $self->result_source,
1261   );
1262
1263   my $obj = $self->result_class->new(\%new);
1264   return $obj;
1265 }
1266
1267 # _collapse_cond
1268 #
1269 # Recursively collapse the condition.
1270
1271 sub _collapse_cond {
1272   my ($self, $cond, $collapsed) = @_;
1273
1274   $collapsed ||= {};
1275
1276   if (ref $cond eq 'ARRAY') {
1277     foreach my $subcond (@$cond) {
1278       next unless ref $subcond;  # -or
1279 #      warn "ARRAY: " . Dumper $subcond;
1280       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1281     }
1282   }
1283   elsif (ref $cond eq 'HASH') {
1284     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1285       foreach my $subcond (@{$cond->{-and}}) {
1286 #        warn "HASH: " . Dumper $subcond;
1287         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1288       }
1289     }
1290     else {
1291 #      warn "LEAF: " . Dumper $cond;
1292       foreach my $col (keys %$cond) {
1293         my $value = $cond->{$col};
1294         $collapsed->{$col} = $value;
1295       }
1296     }
1297   }
1298
1299   return $collapsed;
1300 }
1301
1302 # _remove_alias
1303 #
1304 # Remove the specified alias from the specified query hash. A copy is made so
1305 # the original query is not modified.
1306
1307 sub _remove_alias {
1308   my ($self, $query, $alias) = @_;
1309
1310   my %orig = %{ $query || {} };
1311   my %unaliased;
1312
1313   foreach my $key (keys %orig) {
1314     if ($key !~ /\./) {
1315       $unaliased{$key} = $orig{$key};
1316       next;
1317     }
1318     $unaliased{$1} = $orig{$key}
1319       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1320   }
1321
1322   return \%unaliased;
1323 }
1324
1325 =head2 find_or_new
1326
1327 =over 4
1328
1329 =item Arguments: \%vals, \%attrs?
1330
1331 =item Return Value: $object
1332
1333 =back
1334
1335 Find an existing record from this resultset. If none exists, instantiate a new
1336 result object and return it. The object will not be saved into your storage
1337 until you call L<DBIx::Class::Row/insert> on it.
1338
1339 If you want objects to be saved immediately, use L</find_or_create> instead.
1340
1341 =cut
1342
1343 sub find_or_new {
1344   my $self     = shift;
1345   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1346   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1347   my $exists   = $self->find($hash, $attrs);
1348   return defined $exists ? $exists : $self->new_result($hash);
1349 }
1350
1351 =head2 create
1352
1353 =over 4
1354
1355 =item Arguments: \%vals
1356
1357 =item Return Value: $object
1358
1359 =back
1360
1361 Inserts a record into the resultset and returns the object representing it.
1362
1363 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1364
1365 =cut
1366
1367 sub create {
1368   my ($self, $attrs) = @_;
1369   $self->throw_exception( "create needs a hashref" )
1370     unless ref $attrs eq 'HASH';
1371   return $self->new_result($attrs)->insert;
1372 }
1373
1374 =head2 find_or_create
1375
1376 =over 4
1377
1378 =item Arguments: \%vals, \%attrs?
1379
1380 =item Return Value: $object
1381
1382 =back
1383
1384   $class->find_or_create({ key => $val, ... });
1385
1386 Tries to find a record based on its primary key or unique constraint; if none
1387 is found, creates one and returns that instead.
1388
1389   my $cd = $schema->resultset('CD')->find_or_create({
1390     cdid   => 5,
1391     artist => 'Massive Attack',
1392     title  => 'Mezzanine',
1393     year   => 2005,
1394   });
1395
1396 Also takes an optional C<key> attribute, to search by a specific key or unique
1397 constraint. For example:
1398
1399   my $cd = $schema->resultset('CD')->find_or_create(
1400     {
1401       artist => 'Massive Attack',
1402       title  => 'Mezzanine',
1403     },
1404     { key => 'cd_artist_title' }
1405   );
1406
1407 See also L</find> and L</update_or_create>. For information on how to declare
1408 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1409
1410 =cut
1411
1412 sub find_or_create {
1413   my $self     = shift;
1414   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1415   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1416   my $exists   = $self->find($hash, $attrs);
1417   return defined $exists ? $exists : $self->create($hash);
1418 }
1419
1420 =head2 update_or_create
1421
1422 =over 4
1423
1424 =item Arguments: \%col_values, { key => $unique_constraint }?
1425
1426 =item Return Value: $object
1427
1428 =back
1429
1430   $class->update_or_create({ col => $val, ... });
1431
1432 First, searches for an existing row matching one of the unique constraints
1433 (including the primary key) on the source of this resultset. If a row is
1434 found, updates it with the other given column values. Otherwise, creates a new
1435 row.
1436
1437 Takes an optional C<key> attribute to search on a specific unique constraint.
1438 For example:
1439
1440   # In your application
1441   my $cd = $schema->resultset('CD')->update_or_create(
1442     {
1443       artist => 'Massive Attack',
1444       title  => 'Mezzanine',
1445       year   => 1998,
1446     },
1447     { key => 'cd_artist_title' }
1448   );
1449
1450 If no C<key> is specified, it searches on all unique constraints defined on the
1451 source, including the primary key.
1452
1453 If the C<key> is specified as C<primary>, it searches only on the primary key.
1454
1455 See also L</find> and L</find_or_create>. For information on how to declare
1456 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1457
1458 =cut
1459
1460 sub update_or_create {
1461   my $self = shift;
1462   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1463   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1464
1465   my $row = $self->find($cond, $attrs);
1466   if (defined $row) {
1467     $row->update($cond);
1468     return $row;
1469   }
1470
1471   return $self->create($cond);
1472 }
1473
1474 =head2 get_cache
1475
1476 =over 4
1477
1478 =item Arguments: none
1479
1480 =item Return Value: \@cache_objects?
1481
1482 =back
1483
1484 Gets the contents of the cache for the resultset, if the cache is set.
1485
1486 =cut
1487
1488 sub get_cache {
1489   shift->{all_cache};
1490 }
1491
1492 =head2 set_cache
1493
1494 =over 4
1495
1496 =item Arguments: \@cache_objects
1497
1498 =item Return Value: \@cache_objects
1499
1500 =back
1501
1502 Sets the contents of the cache for the resultset. Expects an arrayref
1503 of objects of the same class as those produced by the resultset. Note that
1504 if the cache is set the resultset will return the cached objects rather
1505 than re-querying the database even if the cache attr is not set.
1506
1507 =cut
1508
1509 sub set_cache {
1510   my ( $self, $data ) = @_;
1511   $self->throw_exception("set_cache requires an arrayref")
1512       if defined($data) && (ref $data ne 'ARRAY');
1513   $self->{all_cache} = $data;
1514 }
1515
1516 =head2 clear_cache
1517
1518 =over 4
1519
1520 =item Arguments: none
1521
1522 =item Return Value: []
1523
1524 =back
1525
1526 Clears the cache for the resultset.
1527
1528 =cut
1529
1530 sub clear_cache {
1531   shift->set_cache(undef);
1532 }
1533
1534 =head2 related_resultset
1535
1536 =over 4
1537
1538 =item Arguments: $relationship_name
1539
1540 =item Return Value: $resultset
1541
1542 =back
1543
1544 Returns a related resultset for the supplied relationship name.
1545
1546   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1547
1548 =cut
1549
1550 sub related_resultset {
1551   my ($self, $rel) = @_;
1552
1553   $self->{related_resultsets} ||= {};
1554   return $self->{related_resultsets}{$rel} ||= do {
1555     my $rel_obj = $self->result_source->relationship_info($rel);
1556
1557     $self->throw_exception(
1558       "search_related: result source '" . $self->result_source->name .
1559         "' has no such relationship $rel")
1560       unless $rel_obj;
1561     
1562     my ($from,$seen) = $self->_resolve_from($rel);
1563
1564     my $join_count = $seen->{$rel};
1565     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1566
1567     $self->result_source->schema->resultset($rel_obj->{class})->search_rs(
1568       undef, {
1569         %{$self->{attrs}||{}},
1570         join => undef,
1571         prefetch => undef,
1572         select => undef,
1573         as => undef,
1574         alias => $alias,
1575         where => $self->{cond},
1576         seen_join => $seen,
1577         from => $from,
1578     });
1579   };
1580 }
1581
1582 sub _resolve_from {
1583   my ($self, $extra_join) = @_;
1584   my $source = $self->result_source;
1585   my $attrs = $self->{attrs};
1586   
1587   my $from = $attrs->{from}
1588     || [ { $attrs->{alias} => $source->from } ];
1589     
1590   my $seen = { %{$attrs->{seen_join}||{}} };
1591
1592   my $join = ($attrs->{join}
1593                ? [ $attrs->{join}, $extra_join ]
1594                : $extra_join);
1595   $from = [
1596     @$from,
1597     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1598   ];
1599
1600   return ($from,$seen);
1601 }
1602
1603 sub _resolved_attrs {
1604   my $self = shift;
1605   return $self->{_attrs} if $self->{_attrs};
1606
1607   my $attrs = { %{$self->{attrs}||{}} };
1608   my $source = $self->{result_source};
1609   my $alias = $attrs->{alias};
1610
1611   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1612   if ($attrs->{columns}) {
1613     delete $attrs->{as};
1614   } elsif (!$attrs->{select}) {
1615     $attrs->{columns} = [ $source->columns ];
1616   }
1617  
1618   $attrs->{select} = 
1619     ($attrs->{select}
1620       ? (ref $attrs->{select} eq 'ARRAY'
1621           ? [ @{$attrs->{select}} ]
1622           : [ $attrs->{select} ])
1623       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1624     );
1625   $attrs->{as} =
1626     ($attrs->{as}
1627       ? (ref $attrs->{as} eq 'ARRAY'
1628           ? [ @{$attrs->{as}} ]
1629           : [ $attrs->{as} ])
1630       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1631     );
1632   
1633   my $adds;
1634   if ($adds = delete $attrs->{include_columns}) {
1635     $adds = [$adds] unless ref $adds eq 'ARRAY';
1636     push(@{$attrs->{select}}, @$adds);
1637     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1638   }
1639   if ($adds = delete $attrs->{'+select'}) {
1640     $adds = [$adds] unless ref $adds eq 'ARRAY';
1641     push(@{$attrs->{select}},
1642            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1643   }
1644   if (my $adds = delete $attrs->{'+as'}) {
1645     $adds = [$adds] unless ref $adds eq 'ARRAY';
1646     push(@{$attrs->{as}}, @$adds);
1647   }
1648
1649   $attrs->{from} ||= [ { 'me' => $source->from } ];
1650
1651   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1652     my $join = delete $attrs->{join} || {};
1653
1654     if (defined $attrs->{prefetch}) {
1655       $join = $self->_merge_attr(
1656         $join, $attrs->{prefetch}
1657       );
1658     }
1659
1660     $attrs->{from} =   # have to copy here to avoid corrupting the original
1661       [
1662         @{$attrs->{from}}, 
1663         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1664       ];
1665   }
1666
1667   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1668   if ($attrs->{order_by}) {
1669     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1670                            ? [ @{$attrs->{order_by}} ]
1671                            : [ $attrs->{order_by} ]);
1672   } else {
1673     $attrs->{order_by} = [];    
1674   }
1675
1676   my $collapse = $attrs->{collapse} || {};
1677   if (my $prefetch = delete $attrs->{prefetch}) {
1678     $prefetch = $self->_merge_attr({}, $prefetch);
1679     my @pre_order;
1680     my $seen = $attrs->{seen_join} || {};
1681     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1682       # bring joins back to level of current class
1683       my @prefetch = $source->resolve_prefetch(
1684         $p, $alias, $seen, \@pre_order, $collapse
1685       );
1686       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1687       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1688     }
1689     push(@{$attrs->{order_by}}, @pre_order);
1690   }
1691   $attrs->{collapse} = $collapse;
1692
1693   return $self->{_attrs} = $attrs;
1694 }
1695
1696 sub _merge_attr {
1697   my ($self, $a, $b) = @_;
1698   return $b unless defined($a);
1699   return $a unless defined($b);
1700   
1701   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1702     foreach my $key (keys %{$b}) {
1703       if (exists $a->{$key}) {
1704         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1705       } else {
1706         $a->{$key} = $b->{$key};
1707       }
1708     }
1709     return $a;
1710   } else {
1711     $a = [$a] unless ref $a eq 'ARRAY';
1712     $b = [$b] unless ref $b eq 'ARRAY';
1713
1714     my $hash = {};
1715     my @array;
1716     foreach my $x ($a, $b) {
1717       foreach my $element (@{$x}) {
1718         if (ref $element eq 'HASH') {
1719           $hash = $self->_merge_attr($hash, $element);
1720         } elsif (ref $element eq 'ARRAY') {
1721           push(@array, @{$element});
1722         } else {
1723           push(@array, $element) unless $b == $x
1724             && grep { $_ eq $element } @array;
1725         }
1726       }
1727     }
1728     
1729     @array = grep { !exists $hash->{$_} } @array;
1730
1731     return keys %{$hash}
1732       ? ( scalar(@array)
1733             ? [$hash, @array]
1734             : $hash
1735         )
1736       : \@array;
1737   }
1738 }
1739
1740 =head2 throw_exception
1741
1742 See L<DBIx::Class::Schema/throw_exception> for details.
1743
1744 =cut
1745
1746 sub throw_exception {
1747   my $self=shift;
1748   $self->result_source->schema->throw_exception(@_);
1749 }
1750
1751 # XXX: FIXME: Attributes docs need clearing up
1752
1753 =head1 ATTRIBUTES
1754
1755 The resultset takes various attributes that modify its behavior. Here's an
1756 overview of them:
1757
1758 =head2 order_by
1759
1760 =over 4
1761
1762 =item Value: ($order_by | \@order_by)
1763
1764 =back
1765
1766 Which column(s) to order the results by. This is currently passed
1767 through directly to SQL, so you can give e.g. C<year DESC> for a
1768 descending order on the column `year'.
1769
1770 Please note that if you have C<quote_char> enabled (see
1771 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1772 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1773 so you will need to manually quote things as appropriate.)
1774
1775 =head2 columns
1776
1777 =over 4
1778
1779 =item Value: \@columns
1780
1781 =back
1782
1783 Shortcut to request a particular set of columns to be retrieved.  Adds
1784 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1785 from that, then auto-populates C<as> from C<select> as normal. (You may also
1786 use the C<cols> attribute, as in earlier versions of DBIC.)
1787
1788 =head2 include_columns
1789
1790 =over 4
1791
1792 =item Value: \@columns
1793
1794 =back
1795
1796 Shortcut to include additional columns in the returned results - for example
1797
1798   $schema->resultset('CD')->search(undef, {
1799     include_columns => ['artist.name'],
1800     join => ['artist']
1801   });
1802
1803 would return all CDs and include a 'name' column to the information
1804 passed to object inflation
1805
1806 =head2 select
1807
1808 =over 4
1809
1810 =item Value: \@select_columns
1811
1812 =back
1813
1814 Indicates which columns should be selected from the storage. You can use
1815 column names, or in the case of RDBMS back ends, function or stored procedure
1816 names:
1817
1818   $rs = $schema->resultset('Employee')->search(undef, {
1819     select => [
1820       'name',
1821       { count => 'employeeid' },
1822       { sum => 'salary' }
1823     ]
1824   });
1825
1826 When you use function/stored procedure names and do not supply an C<as>
1827 attribute, the column names returned are storage-dependent. E.g. MySQL would
1828 return a column named C<count(employeeid)> in the above example.
1829
1830 =head2 +select
1831
1832 =over 4
1833
1834 Indicates additional columns to be selected from storage.  Works the same as
1835 L<select> but adds columns to the selection.
1836
1837 =back
1838
1839 =head2 +as
1840
1841 =over 4
1842
1843 Indicates additional column names for those added via L<+select>.
1844
1845 =back
1846
1847 =head2 as
1848
1849 =over 4
1850
1851 =item Value: \@inflation_names
1852
1853 =back
1854
1855 Indicates column names for object inflation. This is used in conjunction with
1856 C<select>, usually when C<select> contains one or more function or stored
1857 procedure names:
1858
1859   $rs = $schema->resultset('Employee')->search(undef, {
1860     select => [
1861       'name',
1862       { count => 'employeeid' }
1863     ],
1864     as => ['name', 'employee_count'],
1865   });
1866
1867   my $employee = $rs->first(); # get the first Employee
1868
1869 If the object against which the search is performed already has an accessor
1870 matching a column name specified in C<as>, the value can be retrieved using
1871 the accessor as normal:
1872
1873   my $name = $employee->name();
1874
1875 If on the other hand an accessor does not exist in the object, you need to
1876 use C<get_column> instead:
1877
1878   my $employee_count = $employee->get_column('employee_count');
1879
1880 You can create your own accessors if required - see
1881 L<DBIx::Class::Manual::Cookbook> for details.
1882
1883 Please note: This will NOT insert an C<AS employee_count> into the SQL
1884 statement produced, it is used for internal access only. Thus
1885 attempting to use the accessor in an C<order_by> clause or similar
1886 will fail miserably.
1887
1888 To get around this limitation, you can supply literal SQL to your
1889 C<select> attibute that contains the C<AS alias> text, eg:
1890
1891   select => [\'myfield AS alias']
1892
1893 =head2 join
1894
1895 =over 4
1896
1897 =item Value: ($rel_name | \@rel_names | \%rel_names)
1898
1899 =back
1900
1901 Contains a list of relationships that should be joined for this query.  For
1902 example:
1903
1904   # Get CDs by Nine Inch Nails
1905   my $rs = $schema->resultset('CD')->search(
1906     { 'artist.name' => 'Nine Inch Nails' },
1907     { join => 'artist' }
1908   );
1909
1910 Can also contain a hash reference to refer to the other relation's relations.
1911 For example:
1912
1913   package MyApp::Schema::Track;
1914   use base qw/DBIx::Class/;
1915   __PACKAGE__->table('track');
1916   __PACKAGE__->add_columns(qw/trackid cd position title/);
1917   __PACKAGE__->set_primary_key('trackid');
1918   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1919   1;
1920
1921   # In your application
1922   my $rs = $schema->resultset('Artist')->search(
1923     { 'track.title' => 'Teardrop' },
1924     {
1925       join     => { cd => 'track' },
1926       order_by => 'artist.name',
1927     }
1928   );
1929
1930 You need to use the relationship (not the table) name in  conditions, 
1931 because they are aliased as such. The current table is aliased as "me", so 
1932 you need to use me.column_name in order to avoid ambiguity. For example:
1933
1934   # Get CDs from 1984 with a 'Foo' track 
1935   my $rs = $schema->resultset('CD')->search(
1936     { 
1937       'me.year' => 1984,
1938       'tracks.name' => 'Foo'
1939     },
1940     { join => 'tracks' }
1941   );
1942   
1943 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1944 similarly for a third time). For e.g.
1945
1946   my $rs = $schema->resultset('Artist')->search({
1947     'cds.title'   => 'Down to Earth',
1948     'cds_2.title' => 'Popular',
1949   }, {
1950     join => [ qw/cds cds/ ],
1951   });
1952
1953 will return a set of all artists that have both a cd with title 'Down
1954 to Earth' and a cd with title 'Popular'.
1955
1956 If you want to fetch related objects from other tables as well, see C<prefetch>
1957 below.
1958
1959 =head2 prefetch
1960
1961 =over 4
1962
1963 =item Value: ($rel_name | \@rel_names | \%rel_names)
1964
1965 =back
1966
1967 Contains one or more relationships that should be fetched along with the main
1968 query (when they are accessed afterwards they will have already been
1969 "prefetched").  This is useful for when you know you will need the related
1970 objects, because it saves at least one query:
1971
1972   my $rs = $schema->resultset('Tag')->search(
1973     undef,
1974     {
1975       prefetch => {
1976         cd => 'artist'
1977       }
1978     }
1979   );
1980
1981 The initial search results in SQL like the following:
1982
1983   SELECT tag.*, cd.*, artist.* FROM tag
1984   JOIN cd ON tag.cd = cd.cdid
1985   JOIN artist ON cd.artist = artist.artistid
1986
1987 L<DBIx::Class> has no need to go back to the database when we access the
1988 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1989 case.
1990
1991 Simple prefetches will be joined automatically, so there is no need
1992 for a C<join> attribute in the above search. If you're prefetching to
1993 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1994 specify the join as well.
1995
1996 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1997 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1998 with an accessor type of 'single' or 'filter').
1999
2000 =head2 page
2001
2002 =over 4
2003
2004 =item Value: $page
2005
2006 =back
2007
2008 Makes the resultset paged and specifies the page to retrieve. Effectively
2009 identical to creating a non-pages resultset and then calling ->page($page)
2010 on it.
2011
2012 If L<rows> attribute is not specified it defualts to 10 rows per page.
2013
2014 =head2 rows
2015
2016 =over 4
2017
2018 =item Value: $rows
2019
2020 =back
2021
2022 Specifes the maximum number of rows for direct retrieval or the number of
2023 rows per page if the page attribute or method is used.
2024
2025 =head2 offset
2026
2027 =over 4
2028
2029 =item Value: $offset
2030
2031 =back
2032
2033 Specifies the (zero-based) row number for the  first row to be returned, or the
2034 of the first row of the first page if paging is used.
2035
2036 =head2 group_by
2037
2038 =over 4
2039
2040 =item Value: \@columns
2041
2042 =back
2043
2044 A arrayref of columns to group by. Can include columns of joined tables.
2045
2046   group_by => [qw/ column1 column2 ... /]
2047
2048 =head2 having
2049
2050 =over 4
2051
2052 =item Value: $condition
2053
2054 =back
2055
2056 HAVING is a select statement attribute that is applied between GROUP BY and
2057 ORDER BY. It is applied to the after the grouping calculations have been
2058 done.
2059
2060   having => { 'count(employee)' => { '>=', 100 } }
2061
2062 =head2 distinct
2063
2064 =over 4
2065
2066 =item Value: (0 | 1)
2067
2068 =back
2069
2070 Set to 1 to group by all columns.
2071
2072 =head2 where
2073
2074 =over 4
2075
2076 Adds to the WHERE clause.
2077
2078   # only return rows WHERE deleted IS NULL for all searches
2079   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2080
2081 Can be overridden by passing C<{ where => undef }> as an attribute
2082 to a resulset.
2083
2084 =back
2085
2086 =head2 cache
2087
2088 Set to 1 to cache search results. This prevents extra SQL queries if you
2089 revisit rows in your ResultSet:
2090
2091   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2092
2093   while( my $artist = $resultset->next ) {
2094     ... do stuff ...
2095   }
2096
2097   $rs->first; # without cache, this would issue a query
2098
2099 By default, searches are not cached.
2100
2101 For more examples of using these attributes, see
2102 L<DBIx::Class::Manual::Cookbook>.
2103
2104 =head2 from
2105
2106 =over 4
2107
2108 =item Value: \@from_clause
2109
2110 =back
2111
2112 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2113 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2114 clauses.
2115
2116 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2117
2118 C<join> will usually do what you need and it is strongly recommended that you
2119 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2120 And we really do mean "cannot", not just tried and failed. Attempting to use
2121 this because you're having problems with C<join> is like trying to use x86
2122 ASM because you've got a syntax error in your C. Trust us on this.
2123
2124 Now, if you're still really, really sure you need to use this (and if you're
2125 not 100% sure, ask the mailing list first), here's an explanation of how this
2126 works.
2127
2128 The syntax is as follows -
2129
2130   [
2131     { <alias1> => <table1> },
2132     [
2133       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2134       [], # nested JOIN (optional)
2135       { <table1.column1> => <table2.column2>, ... (more conditions) },
2136     ],
2137     # More of the above [ ] may follow for additional joins
2138   ]
2139
2140   <table1> <alias1>
2141   JOIN
2142     <table2> <alias2>
2143     [JOIN ...]
2144   ON <table1.column1> = <table2.column2>
2145   <more joins may follow>
2146
2147 An easy way to follow the examples below is to remember the following:
2148
2149     Anything inside "[]" is a JOIN
2150     Anything inside "{}" is a condition for the enclosing JOIN
2151
2152 The following examples utilize a "person" table in a family tree application.
2153 In order to express parent->child relationships, this table is self-joined:
2154
2155     # Person->belongs_to('father' => 'Person');
2156     # Person->belongs_to('mother' => 'Person');
2157
2158 C<from> can be used to nest joins. Here we return all children with a father,
2159 then search against all mothers of those children:
2160
2161   $rs = $schema->resultset('Person')->search(
2162       undef,
2163       {
2164           alias => 'mother', # alias columns in accordance with "from"
2165           from => [
2166               { mother => 'person' },
2167               [
2168                   [
2169                       { child => 'person' },
2170                       [
2171                           { father => 'person' },
2172                           { 'father.person_id' => 'child.father_id' }
2173                       ]
2174                   ],
2175                   { 'mother.person_id' => 'child.mother_id' }
2176               ],
2177           ]
2178       },
2179   );
2180
2181   # Equivalent SQL:
2182   # SELECT mother.* FROM person mother
2183   # JOIN (
2184   #   person child
2185   #   JOIN person father
2186   #   ON ( father.person_id = child.father_id )
2187   # )
2188   # ON ( mother.person_id = child.mother_id )
2189
2190 The type of any join can be controlled manually. To search against only people
2191 with a father in the person table, we could explicitly use C<INNER JOIN>:
2192
2193     $rs = $schema->resultset('Person')->search(
2194         undef,
2195         {
2196             alias => 'child', # alias columns in accordance with "from"
2197             from => [
2198                 { child => 'person' },
2199                 [
2200                     { father => 'person', -join_type => 'inner' },
2201                     { 'father.id' => 'child.father_id' }
2202                 ],
2203             ]
2204         },
2205     );
2206
2207     # Equivalent SQL:
2208     # SELECT child.* FROM person child
2209     # INNER JOIN person father ON child.father_id = father.id
2210
2211 =cut
2212
2213 1;