5212799842b398f182fc6a08d74c3e2194ef0538
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use base qw/DBIx::Class/;
14
15 __PACKAGE__->load_components(qw/AccessorGroup/);
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   #weaken $source;
89
90   if ($attrs->{page}) {
91     $attrs->{rows} ||= 10;
92     $attrs->{offset} ||= 0;
93     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
94   }
95
96   $attrs->{alias} ||= 'me';
97
98   my $self = {
99     result_source => $source,
100     result_class => $attrs->{result_class} || $source->result_class,
101     cond => $attrs->{where},
102     count => undef,
103     pager => undef,
104     attrs => $attrs
105   };
106
107   bless $self, $class;
108
109   return $self;
110 }
111
112 =head2 search
113
114 =over 4
115
116 =item Arguments: $cond, \%attrs?
117
118 =item Return Value: $resultset (scalar context), @row_objs (list context)
119
120 =back
121
122   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
123   my $new_rs = $cd_rs->search({ year => 2005 });
124
125   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
126                  # year = 2005 OR year = 2004
127
128 If you need to pass in additional attributes but no additional condition,
129 call it as C<search(undef, \%attrs)>.
130
131   # "SELECT name, artistid FROM $artist_table"
132   my @all_artists = $schema->resultset('Artist')->search(undef, {
133     columns => [qw/name artistid/],
134   });
135
136 For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
137
138 =cut
139
140 sub search {
141   my $self = shift;
142   my $rs = $self->search_rs( @_ );
143   return (wantarray ? $rs->all : $rs);
144 }
145
146 =head2 search_rs
147
148 =over 4
149
150 =item Arguments: $cond, \%attrs?
151
152 =item Return Value: $resultset
153
154 =back
155
156 This method does the same exact thing as search() except it will
157 always return a resultset, even in list context.
158
159 =cut
160
161 sub search_rs {
162   my $self = shift;
163
164   my $rows;
165
166   unless (@_) {                 # no search, effectively just a clone
167     $rows = $self->get_cache;
168   }
169
170   my $attrs = {};
171   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
172   my $our_attrs = { %{$self->{attrs}} };
173   my $having = delete $our_attrs->{having};
174   my $where = delete $our_attrs->{where};
175
176   my $new_attrs = { %{$our_attrs}, %{$attrs} };
177
178   # merge new attrs into inherited
179   foreach my $key (qw/join prefetch/) {
180     next unless exists $attrs->{$key};
181     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
182   }
183
184   my $cond = (@_
185     ? (
186         (@_ == 1 || ref $_[0] eq "HASH")
187           ? (
188               (ref $_[0] eq 'HASH')
189                 ? (
190                     (keys %{ $_[0] }  > 0)
191                       ? shift
192                       : undef
193                    )
194                 :  shift
195              )
196           : (
197               (@_ % 2)
198                 ? $self->throw_exception("Odd number of arguments to search")
199                 : {@_}
200              )
201       )
202     : undef
203   );
204
205   if (defined $where) {
206     $new_attrs->{where} = (
207       defined $new_attrs->{where}
208         ? { '-and' => [
209               map {
210                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
211               } $where, $new_attrs->{where}
212             ]
213           }
214         : $where);
215   }
216
217   if (defined $cond) {
218     $new_attrs->{where} = (
219       defined $new_attrs->{where}
220         ? { '-and' => [
221               map {
222                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
223               } $cond, $new_attrs->{where}
224             ]
225           }
226         : $cond);
227   }
228
229   if (defined $having) {
230     $new_attrs->{having} = (
231       defined $new_attrs->{having}
232         ? { '-and' => [
233               map {
234                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
235               } $having, $new_attrs->{having}
236             ]
237           }
238         : $having);
239   }
240
241   my $rs = (ref $self)->new($self->result_source, $new_attrs);
242   if ($rows) {
243     $rs->set_cache($rows);
244   }
245   return $rs;
246 }
247
248 =head2 search_literal
249
250 =over 4
251
252 =item Arguments: $sql_fragment, @bind_values
253
254 =item Return Value: $resultset (scalar context), @row_objs (list context)
255
256 =back
257
258   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
259   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
260
261 Pass a literal chunk of SQL to be added to the conditional part of the
262 resultset query.
263
264 =cut
265
266 sub search_literal {
267   my ($self, $cond, @vals) = @_;
268   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
269   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
270   return $self->search(\$cond, $attrs);
271 }
272
273 =head2 find
274
275 =over 4
276
277 =item Arguments: @values | \%cols, \%attrs?
278
279 =item Return Value: $row_object
280
281 =back
282
283 Finds a row based on its primary key or unique constraint. For example, to find
284 a row by its primary key:
285
286   my $cd = $schema->resultset('CD')->find(5);
287
288 You can also find a row by a specific unique constraint using the C<key>
289 attribute. For example:
290
291   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
292     key => 'cd_artist_title'
293   });
294
295 Additionally, you can specify the columns explicitly by name:
296
297   my $cd = $schema->resultset('CD')->find(
298     {
299       artist => 'Massive Attack',
300       title  => 'Mezzanine',
301     },
302     { key => 'cd_artist_title' }
303   );
304
305 If the C<key> is specified as C<primary>, it searches only on the primary key.
306
307 If no C<key> is specified, it searches on all unique constraints defined on the
308 source, including the primary key.
309
310 If your table does not have a primary key, you B<must> provide a value for the
311 C<key> attribute matching one of the unique constraints on the source.
312
313 See also L</find_or_create> and L</update_or_create>. For information on how to
314 declare unique constraints, see
315 L<DBIx::Class::ResultSource/add_unique_constraint>.
316
317 =cut
318
319 sub find {
320   my $self = shift;
321   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
322
323   # Default to the primary key, but allow a specific key
324   my @cols = exists $attrs->{key}
325     ? $self->result_source->unique_constraint_columns($attrs->{key})
326     : $self->result_source->primary_columns;
327   $self->throw_exception(
328     "Can't find unless a primary key is defined or unique constraint is specified"
329   ) unless @cols;
330
331   # Parse out a hashref from input
332   my $input_query;
333   if (ref $_[0] eq 'HASH') {
334     $input_query = { %{$_[0]} };
335   }
336   elsif (@_ == @cols) {
337     $input_query = {};
338     @{$input_query}{@cols} = @_;
339   }
340   else {
341     # Compatibility: Allow e.g. find(id => $value)
342     carp "Find by key => value deprecated; please use a hashref instead";
343     $input_query = {@_};
344   }
345
346   my (%related, $info);
347
348   foreach my $key (keys %$input_query) {
349     if (ref($input_query->{$key})
350         && ($info = $self->result_source->relationship_info($key))) {
351       my $rel_q = $self->result_source->resolve_condition(
352                     $info->{cond}, delete $input_query->{$key}, $key
353                   );
354       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
355       @related{keys %$rel_q} = values %$rel_q;
356     }
357   }
358   if (my @keys = keys %related) {
359     @{$input_query}{@keys} = values %related;
360   }
361
362   my @unique_queries = $self->_unique_queries($input_query, $attrs);
363
364   # Build the final query: Default to the disjunction of the unique queries,
365   # but allow the input query in case the ResultSet defines the query or the
366   # user is abusing find
367   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
368   my $query = @unique_queries
369     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
370     : $self->_add_alias($input_query, $alias);
371
372   # Run the query
373   if (keys %$attrs) {
374     my $rs = $self->search($query, $attrs);
375     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
376   }
377   else {
378     return keys %{$self->_resolved_attrs->{collapse}}
379       ? $self->search($query)->next
380       : $self->single($query);
381   }
382 }
383
384 # _add_alias
385 #
386 # Add the specified alias to the specified query hash. A copy is made so the
387 # original query is not modified.
388
389 sub _add_alias {
390   my ($self, $query, $alias) = @_;
391
392   my %aliased = %$query;
393   foreach my $col (grep { ! m/\./ } keys %aliased) {
394     $aliased{"$alias.$col"} = delete $aliased{$col};
395   }
396
397   return \%aliased;
398 }
399
400 # _unique_queries
401 #
402 # Build a list of queries which satisfy unique constraints.
403
404 sub _unique_queries {
405   my ($self, $query, $attrs) = @_;
406
407   my @constraint_names = exists $attrs->{key}
408     ? ($attrs->{key})
409     : $self->result_source->unique_constraint_names;
410
411   my @unique_queries;
412   foreach my $name (@constraint_names) {
413     my @unique_cols = $self->result_source->unique_constraint_columns($name);
414     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
415
416     my $num_query = scalar keys %$unique_query;
417     next unless $num_query;
418
419     # XXX: Assuming quite a bit about $self->{attrs}{where}
420     my $num_cols = scalar @unique_cols;
421     my $num_where = exists $self->{attrs}{where}
422       ? scalar keys %{ $self->{attrs}{where} }
423       : 0;
424     push @unique_queries, $unique_query
425       if $num_query + $num_where == $num_cols;
426   }
427
428   return @unique_queries;
429 }
430
431 # _build_unique_query
432 #
433 # Constrain the specified query hash based on the specified column names.
434
435 sub _build_unique_query {
436   my ($self, $query, $unique_cols) = @_;
437
438   return {
439     map  { $_ => $query->{$_} }
440     grep { exists $query->{$_} }
441       @$unique_cols
442   };
443 }
444
445 =head2 search_related
446
447 =over 4
448
449 =item Arguments: $rel, $cond, \%attrs?
450
451 =item Return Value: $new_resultset
452
453 =back
454
455   $new_rs = $cd_rs->search_related('artist', {
456     name => 'Emo-R-Us',
457   });
458
459 Searches the specified relationship, optionally specifying a condition and
460 attributes for matching records. See L</ATTRIBUTES> for more information.
461
462 =cut
463
464 sub search_related {
465   return shift->related_resultset(shift)->search(@_);
466 }
467
468 =head2 cursor
469
470 =over 4
471
472 =item Arguments: none
473
474 =item Return Value: $cursor
475
476 =back
477
478 Returns a storage-driven cursor to the given resultset. See
479 L<DBIx::Class::Cursor> for more information.
480
481 =cut
482
483 sub cursor {
484   my ($self) = @_;
485
486   my $attrs = { %{$self->_resolved_attrs} };
487   return $self->{cursor}
488     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
489           $attrs->{where},$attrs);
490 }
491
492 =head2 single
493
494 =over 4
495
496 =item Arguments: $cond?
497
498 =item Return Value: $row_object?
499
500 =back
501
502   my $cd = $schema->resultset('CD')->single({ year => 2001 });
503
504 Inflates the first result without creating a cursor if the resultset has
505 any records in it; if not returns nothing. Used by L</find> as an optimisation.
506
507 Can optionally take an additional condition *only* - this is a fast-code-path
508 method; if you need to add extra joins or similar call ->search and then
509 ->single without a condition on the $rs returned from that.
510
511 =cut
512
513 sub single {
514   my ($self, $where) = @_;
515   my $attrs = { %{$self->_resolved_attrs} };
516   if ($where) {
517     if (defined $attrs->{where}) {
518       $attrs->{where} = {
519         '-and' =>
520             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
521                $where, delete $attrs->{where} ]
522       };
523     } else {
524       $attrs->{where} = $where;
525     }
526   }
527
528 #  XXX: Disabled since it doesn't infer uniqueness in all cases
529 #  unless ($self->_is_unique_query($attrs->{where})) {
530 #    carp "Query not guaranteed to return a single row"
531 #      . "; please declare your unique constraints or use search instead";
532 #  }
533
534   my @data = $self->result_source->storage->select_single(
535     $attrs->{from}, $attrs->{select},
536     $attrs->{where}, $attrs
537   );
538
539   return (@data ? ($self->_construct_object(@data))[0] : ());
540 }
541
542 # _is_unique_query
543 #
544 # Try to determine if the specified query is guaranteed to be unique, based on
545 # the declared unique constraints.
546
547 sub _is_unique_query {
548   my ($self, $query) = @_;
549
550   my $collapsed = $self->_collapse_query($query);
551   my $alias = $self->{attrs}{alias};
552
553   foreach my $name ($self->result_source->unique_constraint_names) {
554     my @unique_cols = map {
555       "$alias.$_"
556     } $self->result_source->unique_constraint_columns($name);
557
558     # Count the values for each unique column
559     my %seen = map { $_ => 0 } @unique_cols;
560
561     foreach my $key (keys %$collapsed) {
562       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
563       next unless exists $seen{$aliased};  # Additional constraints are okay
564       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
565     }
566
567     # If we get 0 or more than 1 value for a column, it's not necessarily unique
568     return 1 unless grep { $_ != 1 } values %seen;
569   }
570
571   return 0;
572 }
573
574 # _collapse_query
575 #
576 # Recursively collapse the query, accumulating values for each column.
577
578 sub _collapse_query {
579   my ($self, $query, $collapsed) = @_;
580
581   $collapsed ||= {};
582
583   if (ref $query eq 'ARRAY') {
584     foreach my $subquery (@$query) {
585       next unless ref $subquery;  # -or
586 #      warn "ARRAY: " . Dumper $subquery;
587       $collapsed = $self->_collapse_query($subquery, $collapsed);
588     }
589   }
590   elsif (ref $query eq 'HASH') {
591     if (keys %$query and (keys %$query)[0] eq '-and') {
592       foreach my $subquery (@{$query->{-and}}) {
593 #        warn "HASH: " . Dumper $subquery;
594         $collapsed = $self->_collapse_query($subquery, $collapsed);
595       }
596     }
597     else {
598 #      warn "LEAF: " . Dumper $query;
599       foreach my $col (keys %$query) {
600         my $value = $query->{$col};
601         $collapsed->{$col}{$value}++;
602       }
603     }
604   }
605
606   return $collapsed;
607 }
608
609 =head2 get_column
610
611 =over 4
612
613 =item Arguments: $cond?
614
615 =item Return Value: $resultsetcolumn
616
617 =back
618
619   my $max_length = $rs->get_column('length')->max;
620
621 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
622
623 =cut
624
625 sub get_column {
626   my ($self, $column) = @_;
627   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
628   return $new;
629 }
630
631 =head2 search_like
632
633 =over 4
634
635 =item Arguments: $cond, \%attrs?
636
637 =item Return Value: $resultset (scalar context), @row_objs (list context)
638
639 =back
640
641   # WHERE title LIKE '%blue%'
642   $cd_rs = $rs->search_like({ title => '%blue%'});
643
644 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
645 that this is simply a convenience method. You most likely want to use
646 L</search> with specific operators.
647
648 For more information, see L<DBIx::Class::Manual::Cookbook>.
649
650 =cut
651
652 sub search_like {
653   my $class = shift;
654   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
655   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
656   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
657   return $class->search($query, { %$attrs });
658 }
659
660 =head2 slice
661
662 =over 4
663
664 =item Arguments: $first, $last
665
666 =item Return Value: $resultset (scalar context), @row_objs (list context)
667
668 =back
669
670 Returns a resultset or object list representing a subset of elements from the
671 resultset slice is called on. Indexes are from 0, i.e., to get the first
672 three records, call:
673
674   my ($one, $two, $three) = $rs->slice(0, 2);
675
676 =cut
677
678 sub slice {
679   my ($self, $min, $max) = @_;
680   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
681   $attrs->{offset} = $self->{attrs}{offset} || 0;
682   $attrs->{offset} += $min;
683   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
684   return $self->search(undef(), $attrs);
685   #my $slice = (ref $self)->new($self->result_source, $attrs);
686   #return (wantarray ? $slice->all : $slice);
687 }
688
689 =head2 next
690
691 =over 4
692
693 =item Arguments: none
694
695 =item Return Value: $result?
696
697 =back
698
699 Returns the next element in the resultset (C<undef> is there is none).
700
701 Can be used to efficiently iterate over records in the resultset:
702
703   my $rs = $schema->resultset('CD')->search;
704   while (my $cd = $rs->next) {
705     print $cd->title;
706   }
707
708 Note that you need to store the resultset object, and call C<next> on it.
709 Calling C<< resultset('Table')->next >> repeatedly will always return the
710 first record from the resultset.
711
712 =cut
713
714 sub next {
715   my ($self) = @_;
716   if (my $cache = $self->get_cache) {
717     $self->{all_cache_position} ||= 0;
718     return $cache->[$self->{all_cache_position}++];
719   }
720   if ($self->{attrs}{cache}) {
721     $self->{all_cache_position} = 1;
722     return ($self->all)[0];
723   }
724   if ($self->{stashed_objects}) {
725     my $obj = shift(@{$self->{stashed_objects}});
726     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
727     return $obj;
728   }
729   my @row = (
730     exists $self->{stashed_row}
731       ? @{delete $self->{stashed_row}}
732       : $self->cursor->next
733   );
734   return unless (@row);
735   my ($row, @more) = $self->_construct_object(@row);
736   $self->{stashed_objects} = \@more if @more;
737   return $row;
738 }
739
740 sub _construct_object {
741   my ($self, @row) = @_;
742   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
743   my @new = $self->result_class->inflate_result($self->result_source, @$info);
744   @new = $self->{_attrs}{record_filter}->(@new)
745     if exists $self->{_attrs}{record_filter};
746   return @new;
747 }
748
749 sub _collapse_result {
750   my ($self, $as, $row, $prefix) = @_;
751
752   my %const;
753   my @copy = @$row;
754   
755   foreach my $this_as (@$as) {
756     my $val = shift @copy;
757     if (defined $prefix) {
758       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
759         my $remain = $1;
760         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
761         $const{$1||''}{$2} = $val;
762       }
763     } else {
764       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
765       $const{$1||''}{$2} = $val;
766     }
767   }
768
769   my $alias = $self->{attrs}{alias};
770   my $info = [ {}, {} ];
771   foreach my $key (keys %const) {
772     if (length $key && $key ne $alias) {
773       my $target = $info;
774       my @parts = split(/\./, $key);
775       foreach my $p (@parts) {
776         $target = $target->[1]->{$p} ||= [];
777       }
778       $target->[0] = $const{$key};
779     } else {
780       $info->[0] = $const{$key};
781     }
782   }
783   
784   my @collapse;
785   if (defined $prefix) {
786     @collapse = map {
787         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
788     } keys %{$self->{_attrs}{collapse}}
789   } else {
790     @collapse = keys %{$self->{_attrs}{collapse}};
791   };
792
793   if (@collapse) {
794     my ($c) = sort { length $a <=> length $b } @collapse;
795     my $target = $info;
796     foreach my $p (split(/\./, $c)) {
797       $target = $target->[1]->{$p} ||= [];
798     }
799     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
800     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
801     my $tree = $self->_collapse_result($as, $row, $c_prefix);
802     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
803     my (@final, @raw);
804
805     while (
806       !(
807         grep {
808           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
809         } @co_key
810         )
811     ) {
812       push(@final, $tree);
813       last unless (@raw = $self->cursor->next);
814       $row = $self->{stashed_row} = \@raw;
815       $tree = $self->_collapse_result($as, $row, $c_prefix);
816     }
817     @$target = (@final ? @final : [ {}, {} ]);
818       # single empty result to indicate an empty prefetched has_many
819   }
820
821   #print "final info: " . Dumper($info);
822   return $info;
823 }
824
825 =head2 result_source
826
827 =over 4
828
829 =item Arguments: $result_source?
830
831 =item Return Value: $result_source
832
833 =back
834
835 An accessor for the primary ResultSource object from which this ResultSet
836 is derived.
837
838 =head2 result_class
839
840 =over 4
841
842 =item Arguments: $result_class?
843
844 =item Return Value: $result_class
845
846 =back
847
848 An accessor for the class to use when creating row objects. Defaults to 
849 C<< result_source->result_class >> - which in most cases is the name of the 
850 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
851
852 =cut
853
854
855 =head2 count
856
857 =over 4
858
859 =item Arguments: $cond, \%attrs??
860
861 =item Return Value: $count
862
863 =back
864
865 Performs an SQL C<COUNT> with the same query as the resultset was built
866 with to find the number of elements. If passed arguments, does a search
867 on the resultset and counts the results of that.
868
869 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
870 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
871 not support C<DISTINCT> with multiple columns. If you are using such a
872 database, you should only use columns from the main table in your C<group_by>
873 clause.
874
875 =cut
876
877 sub count {
878   my $self = shift;
879   return $self->search(@_)->count if @_ and defined $_[0];
880   return scalar @{ $self->get_cache } if $self->get_cache;
881   my $count = $self->_count;
882   return 0 unless $count;
883
884   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
885   $count = $self->{attrs}{rows} if
886     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
887   return $count;
888 }
889
890 sub _count { # Separated out so pager can get the full count
891   my $self = shift;
892   my $select = { count => '*' };
893
894   my $attrs = { %{$self->_resolved_attrs} };
895   if (my $group_by = delete $attrs->{group_by}) {
896     delete $attrs->{having};
897     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
898     # todo: try CONCAT for multi-column pk
899     my @pk = $self->result_source->primary_columns;
900     if (@pk == 1) {
901       my $alias = $attrs->{alias};
902       foreach my $column (@distinct) {
903         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
904           @distinct = ($column);
905           last;
906         }
907       }
908     }
909
910     $select = { count => { distinct => \@distinct } };
911   }
912
913   $attrs->{select} = $select;
914   $attrs->{as} = [qw/count/];
915
916   # offset, order by and page are not needed to count. record_filter is cdbi
917   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
918
919   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
920   my ($count) = $tmp_rs->cursor->next;
921   return $count;
922 }
923
924 =head2 count_literal
925
926 =over 4
927
928 =item Arguments: $sql_fragment, @bind_values
929
930 =item Return Value: $count
931
932 =back
933
934 Counts the results in a literal query. Equivalent to calling L</search_literal>
935 with the passed arguments, then L</count>.
936
937 =cut
938
939 sub count_literal { shift->search_literal(@_)->count; }
940
941 =head2 all
942
943 =over 4
944
945 =item Arguments: none
946
947 =item Return Value: @objects
948
949 =back
950
951 Returns all elements in the resultset. Called implicitly if the resultset
952 is returned in list context.
953
954 =cut
955
956 sub all {
957   my ($self) = @_;
958   return @{ $self->get_cache } if $self->get_cache;
959
960   my @obj;
961
962   # TODO: don't call resolve here
963   if (keys %{$self->_resolved_attrs->{collapse}}) {
964 #  if ($self->{attrs}{prefetch}) {
965       # Using $self->cursor->all is really just an optimisation.
966       # If we're collapsing has_many prefetches it probably makes
967       # very little difference, and this is cleaner than hacking
968       # _construct_object to survive the approach
969     my @row = $self->cursor->next;
970     while (@row) {
971       push(@obj, $self->_construct_object(@row));
972       @row = (exists $self->{stashed_row}
973                ? @{delete $self->{stashed_row}}
974                : $self->cursor->next);
975     }
976   } else {
977     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
978   }
979
980   $self->set_cache(\@obj) if $self->{attrs}{cache};
981   return @obj;
982 }
983
984 =head2 reset
985
986 =over 4
987
988 =item Arguments: none
989
990 =item Return Value: $self
991
992 =back
993
994 Resets the resultset's cursor, so you can iterate through the elements again.
995
996 =cut
997
998 sub reset {
999   my ($self) = @_;
1000   delete $self->{_attrs} if exists $self->{_attrs};
1001   $self->{all_cache_position} = 0;
1002   $self->cursor->reset;
1003   return $self;
1004 }
1005
1006 =head2 first
1007
1008 =over 4
1009
1010 =item Arguments: none
1011
1012 =item Return Value: $object?
1013
1014 =back
1015
1016 Resets the resultset and returns an object for the first result (if the
1017 resultset returns anything).
1018
1019 =cut
1020
1021 sub first {
1022   return $_[0]->reset->next;
1023 }
1024
1025 # _cond_for_update_delete
1026 #
1027 # update/delete require the condition to be modified to handle
1028 # the differing SQL syntax available.  This transforms the $self->{cond}
1029 # appropriately, returning the new condition.
1030
1031 sub _cond_for_update_delete {
1032   my ($self, $full_cond) = @_;
1033   my $cond = {};
1034
1035   $full_cond ||= $self->{cond};
1036   # No-op. No condition, we're updating/deleting everything
1037   return $cond unless ref $full_cond;
1038
1039   if (ref $full_cond eq 'ARRAY') {
1040     $cond = [
1041       map {
1042         my %hash;
1043         foreach my $key (keys %{$_}) {
1044           $key =~ /([^.]+)$/;
1045           $hash{$1} = $_->{$key};
1046         }
1047         \%hash;
1048       } @{$full_cond}
1049     ];
1050   }
1051   elsif (ref $full_cond eq 'HASH') {
1052     if ((keys %{$full_cond})[0] eq '-and') {
1053       $cond->{-and} = [];
1054
1055       my @cond = @{$full_cond->{-and}};
1056       for (my $i = 0; $i < @cond; $i++) {
1057         my $entry = $cond[$i];
1058
1059         my $hash;
1060         if (ref $entry eq 'HASH') {
1061           $hash = $self->_cond_for_update_delete($entry);
1062         }
1063         else {
1064           $entry =~ /([^.]+)$/;
1065           $hash->{$1} = $cond[++$i];
1066         }
1067
1068         push @{$cond->{-and}}, $hash;
1069       }
1070     }
1071     else {
1072       foreach my $key (keys %{$full_cond}) {
1073         $key =~ /([^.]+)$/;
1074         $cond->{$1} = $full_cond->{$key};
1075       }
1076     }
1077   }
1078   else {
1079     $self->throw_exception(
1080       "Can't update/delete on resultset with condition unless hash or array"
1081     );
1082   }
1083
1084   return $cond;
1085 }
1086
1087
1088 =head2 update
1089
1090 =over 4
1091
1092 =item Arguments: \%values
1093
1094 =item Return Value: $storage_rv
1095
1096 =back
1097
1098 Sets the specified columns in the resultset to the supplied values in a
1099 single query. Return value will be true if the update succeeded or false
1100 if no records were updated; exact type of success value is storage-dependent.
1101
1102 =cut
1103
1104 sub update {
1105   my ($self, $values) = @_;
1106   $self->throw_exception("Values for update must be a hash")
1107     unless ref $values eq 'HASH';
1108
1109   my $cond = $self->_cond_for_update_delete;
1110   
1111   my $bind_attributes;
1112   foreach my $column ($self->result_source->columns) {
1113   
1114     $bind_attributes->{$column} = $self->result_source->column_info($column)->{bind_attributes}
1115      if defined $self->result_source->column_info($column)->{bind_attributes};
1116   }
1117   $self->result_source->storage->bind_attributes($bind_attributes);
1118   
1119   return $self->result_source->storage->update(
1120     $self->result_source->from, $values, $cond
1121   );
1122 }
1123
1124 =head2 update_all
1125
1126 =over 4
1127
1128 =item Arguments: \%values
1129
1130 =item Return Value: 1
1131
1132 =back
1133
1134 Fetches all objects and updates them one at a time. Note that C<update_all>
1135 will run DBIC cascade triggers, while L</update> will not.
1136
1137 =cut
1138
1139 sub update_all {
1140   my ($self, $values) = @_;
1141   $self->throw_exception("Values for update must be a hash")
1142     unless ref $values eq 'HASH';
1143   foreach my $obj ($self->all) {
1144     $obj->set_columns($values)->update;
1145   }
1146   return 1;
1147 }
1148
1149 =head2 delete
1150
1151 =over 4
1152
1153 =item Arguments: none
1154
1155 =item Return Value: 1
1156
1157 =back
1158
1159 Deletes the contents of the resultset from its result source. Note that this
1160 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1161 to run. See also L<DBIx::Class::Row/delete>.
1162
1163 =cut
1164
1165 sub delete {
1166   my ($self) = @_;
1167
1168   my $cond = $self->_cond_for_update_delete;
1169
1170   $self->result_source->storage->delete($self->result_source->from, $cond);
1171   return 1;
1172 }
1173
1174 =head2 delete_all
1175
1176 =over 4
1177
1178 =item Arguments: none
1179
1180 =item Return Value: 1
1181
1182 =back
1183
1184 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1185 will run DBIC cascade triggers, while L</delete> will not.
1186
1187 =cut
1188
1189 sub delete_all {
1190   my ($self) = @_;
1191   $_->delete for $self->all;
1192   return 1;
1193 }
1194
1195 =head2 pager
1196
1197 =over 4
1198
1199 =item Arguments: none
1200
1201 =item Return Value: $pager
1202
1203 =back
1204
1205 Return Value a L<Data::Page> object for the current resultset. Only makes
1206 sense for queries with a C<page> attribute.
1207
1208 =cut
1209
1210 sub pager {
1211   my ($self) = @_;
1212   my $attrs = $self->{attrs};
1213   $self->throw_exception("Can't create pager for non-paged rs")
1214     unless $self->{attrs}{page};
1215   $attrs->{rows} ||= 10;
1216   return $self->{pager} ||= Data::Page->new(
1217     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1218 }
1219
1220 =head2 page
1221
1222 =over 4
1223
1224 =item Arguments: $page_number
1225
1226 =item Return Value: $rs
1227
1228 =back
1229
1230 Returns a resultset for the $page_number page of the resultset on which page
1231 is called, where each page contains a number of rows equal to the 'rows'
1232 attribute set on the resultset (10 by default).
1233
1234 =cut
1235
1236 sub page {
1237   my ($self, $page) = @_;
1238   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1239 }
1240
1241 =head2 new_result
1242
1243 =over 4
1244
1245 =item Arguments: \%vals
1246
1247 =item Return Value: $object
1248
1249 =back
1250
1251 Creates an object in the resultset's result class and returns it.
1252
1253 =cut
1254
1255 sub new_result {
1256   my ($self, $values) = @_;
1257   $self->throw_exception( "new_result needs a hash" )
1258     unless (ref $values eq 'HASH');
1259   $self->throw_exception(
1260     "Can't abstract implicit construct, condition not a hash"
1261   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1262
1263   my $alias = $self->{attrs}{alias};
1264   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1265   my %new = (
1266     %{ $self->_remove_alias($values, $alias) },
1267     %{ $self->_remove_alias($collapsed_cond, $alias) },
1268     -result_source => $self->result_source,
1269   );
1270
1271   my $obj = $self->result_class->new(\%new);
1272   return $obj;
1273 }
1274
1275 # _collapse_cond
1276 #
1277 # Recursively collapse the condition.
1278
1279 sub _collapse_cond {
1280   my ($self, $cond, $collapsed) = @_;
1281
1282   $collapsed ||= {};
1283
1284   if (ref $cond eq 'ARRAY') {
1285     foreach my $subcond (@$cond) {
1286       next unless ref $subcond;  # -or
1287 #      warn "ARRAY: " . Dumper $subcond;
1288       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1289     }
1290   }
1291   elsif (ref $cond eq 'HASH') {
1292     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1293       foreach my $subcond (@{$cond->{-and}}) {
1294 #        warn "HASH: " . Dumper $subcond;
1295         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1296       }
1297     }
1298     else {
1299 #      warn "LEAF: " . Dumper $cond;
1300       foreach my $col (keys %$cond) {
1301         my $value = $cond->{$col};
1302         $collapsed->{$col} = $value;
1303       }
1304     }
1305   }
1306
1307   return $collapsed;
1308 }
1309
1310 # _remove_alias
1311 #
1312 # Remove the specified alias from the specified query hash. A copy is made so
1313 # the original query is not modified.
1314
1315 sub _remove_alias {
1316   my ($self, $query, $alias) = @_;
1317
1318   my %orig = %{ $query || {} };
1319   my %unaliased;
1320
1321   foreach my $key (keys %orig) {
1322     if ($key !~ /\./) {
1323       $unaliased{$key} = $orig{$key};
1324       next;
1325     }
1326     $unaliased{$1} = $orig{$key}
1327       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1328   }
1329
1330   return \%unaliased;
1331 }
1332
1333 =head2 find_or_new
1334
1335 =over 4
1336
1337 =item Arguments: \%vals, \%attrs?
1338
1339 =item Return Value: $object
1340
1341 =back
1342
1343 Find an existing record from this resultset. If none exists, instantiate a new
1344 result object and return it. The object will not be saved into your storage
1345 until you call L<DBIx::Class::Row/insert> on it.
1346
1347 If you want objects to be saved immediately, use L</find_or_create> instead.
1348
1349 =cut
1350
1351 sub find_or_new {
1352   my $self     = shift;
1353   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1354   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1355   my $exists   = $self->find($hash, $attrs);
1356   return defined $exists ? $exists : $self->new_result($hash);
1357 }
1358
1359 =head2 create
1360
1361 =over 4
1362
1363 =item Arguments: \%vals
1364
1365 =item Return Value: $object
1366
1367 =back
1368
1369 Inserts a record into the resultset and returns the object representing it.
1370
1371 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1372
1373 =cut
1374
1375 sub create {
1376   my ($self, $attrs) = @_;
1377   $self->throw_exception( "create needs a hashref" )
1378     unless ref $attrs eq 'HASH';
1379   return $self->new_result($attrs)->insert;
1380 }
1381
1382 =head2 find_or_create
1383
1384 =over 4
1385
1386 =item Arguments: \%vals, \%attrs?
1387
1388 =item Return Value: $object
1389
1390 =back
1391
1392   $class->find_or_create({ key => $val, ... });
1393
1394 Tries to find a record based on its primary key or unique constraint; if none
1395 is found, creates one and returns that instead.
1396
1397   my $cd = $schema->resultset('CD')->find_or_create({
1398     cdid   => 5,
1399     artist => 'Massive Attack',
1400     title  => 'Mezzanine',
1401     year   => 2005,
1402   });
1403
1404 Also takes an optional C<key> attribute, to search by a specific key or unique
1405 constraint. For example:
1406
1407   my $cd = $schema->resultset('CD')->find_or_create(
1408     {
1409       artist => 'Massive Attack',
1410       title  => 'Mezzanine',
1411     },
1412     { key => 'cd_artist_title' }
1413   );
1414
1415 See also L</find> and L</update_or_create>. For information on how to declare
1416 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1417
1418 =cut
1419
1420 sub find_or_create {
1421   my $self     = shift;
1422   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1423   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1424   my $exists   = $self->find($hash, $attrs);
1425   return defined $exists ? $exists : $self->create($hash);
1426 }
1427
1428 =head2 update_or_create
1429
1430 =over 4
1431
1432 =item Arguments: \%col_values, { key => $unique_constraint }?
1433
1434 =item Return Value: $object
1435
1436 =back
1437
1438   $class->update_or_create({ col => $val, ... });
1439
1440 First, searches for an existing row matching one of the unique constraints
1441 (including the primary key) on the source of this resultset. If a row is
1442 found, updates it with the other given column values. Otherwise, creates a new
1443 row.
1444
1445 Takes an optional C<key> attribute to search on a specific unique constraint.
1446 For example:
1447
1448   # In your application
1449   my $cd = $schema->resultset('CD')->update_or_create(
1450     {
1451       artist => 'Massive Attack',
1452       title  => 'Mezzanine',
1453       year   => 1998,
1454     },
1455     { key => 'cd_artist_title' }
1456   );
1457
1458 If no C<key> is specified, it searches on all unique constraints defined on the
1459 source, including the primary key.
1460
1461 If the C<key> is specified as C<primary>, it searches only on the primary key.
1462
1463 See also L</find> and L</find_or_create>. For information on how to declare
1464 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1465
1466 =cut
1467
1468 sub update_or_create {
1469   my $self = shift;
1470   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1471   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1472
1473   my $row = $self->find($cond, $attrs);
1474   if (defined $row) {
1475     $row->update($cond);
1476     return $row;
1477   }
1478
1479   return $self->create($cond);
1480 }
1481
1482 =head2 get_cache
1483
1484 =over 4
1485
1486 =item Arguments: none
1487
1488 =item Return Value: \@cache_objects?
1489
1490 =back
1491
1492 Gets the contents of the cache for the resultset, if the cache is set.
1493
1494 =cut
1495
1496 sub get_cache {
1497   shift->{all_cache};
1498 }
1499
1500 =head2 set_cache
1501
1502 =over 4
1503
1504 =item Arguments: \@cache_objects
1505
1506 =item Return Value: \@cache_objects
1507
1508 =back
1509
1510 Sets the contents of the cache for the resultset. Expects an arrayref
1511 of objects of the same class as those produced by the resultset. Note that
1512 if the cache is set the resultset will return the cached objects rather
1513 than re-querying the database even if the cache attr is not set.
1514
1515 =cut
1516
1517 sub set_cache {
1518   my ( $self, $data ) = @_;
1519   $self->throw_exception("set_cache requires an arrayref")
1520       if defined($data) && (ref $data ne 'ARRAY');
1521   $self->{all_cache} = $data;
1522 }
1523
1524 =head2 clear_cache
1525
1526 =over 4
1527
1528 =item Arguments: none
1529
1530 =item Return Value: []
1531
1532 =back
1533
1534 Clears the cache for the resultset.
1535
1536 =cut
1537
1538 sub clear_cache {
1539   shift->set_cache(undef);
1540 }
1541
1542 =head2 related_resultset
1543
1544 =over 4
1545
1546 =item Arguments: $relationship_name
1547
1548 =item Return Value: $resultset
1549
1550 =back
1551
1552 Returns a related resultset for the supplied relationship name.
1553
1554   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1555
1556 =cut
1557
1558 sub related_resultset {
1559   my ($self, $rel) = @_;
1560
1561   $self->{related_resultsets} ||= {};
1562   return $self->{related_resultsets}{$rel} ||= do {
1563     my $rel_obj = $self->result_source->relationship_info($rel);
1564
1565     $self->throw_exception(
1566       "search_related: result source '" . $self->result_source->name .
1567         "' has no such relationship $rel")
1568       unless $rel_obj;
1569     
1570     my ($from,$seen) = $self->_resolve_from($rel);
1571
1572     my $join_count = $seen->{$rel};
1573     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1574
1575     $self->result_source->schema->resultset($rel_obj->{class})->search_rs(
1576       undef, {
1577         %{$self->{attrs}||{}},
1578         join => undef,
1579         prefetch => undef,
1580         select => undef,
1581         as => undef,
1582         alias => $alias,
1583         where => $self->{cond},
1584         seen_join => $seen,
1585         from => $from,
1586     });
1587   };
1588 }
1589
1590 sub _resolve_from {
1591   my ($self, $extra_join) = @_;
1592   my $source = $self->result_source;
1593   my $attrs = $self->{attrs};
1594   
1595   my $from = $attrs->{from}
1596     || [ { $attrs->{alias} => $source->from } ];
1597     
1598   my $seen = { %{$attrs->{seen_join}||{}} };
1599
1600   my $join = ($attrs->{join}
1601                ? [ $attrs->{join}, $extra_join ]
1602                : $extra_join);
1603   $from = [
1604     @$from,
1605     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1606   ];
1607
1608   return ($from,$seen);
1609 }
1610
1611 sub _resolved_attrs {
1612   my $self = shift;
1613   return $self->{_attrs} if $self->{_attrs};
1614
1615   my $attrs = { %{$self->{attrs}||{}} };
1616   my $source = $self->{result_source};
1617   my $alias = $attrs->{alias};
1618
1619   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1620   if ($attrs->{columns}) {
1621     delete $attrs->{as};
1622   } elsif (!$attrs->{select}) {
1623     $attrs->{columns} = [ $source->columns ];
1624   }
1625  
1626   $attrs->{select} = 
1627     ($attrs->{select}
1628       ? (ref $attrs->{select} eq 'ARRAY'
1629           ? [ @{$attrs->{select}} ]
1630           : [ $attrs->{select} ])
1631       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1632     );
1633   $attrs->{as} =
1634     ($attrs->{as}
1635       ? (ref $attrs->{as} eq 'ARRAY'
1636           ? [ @{$attrs->{as}} ]
1637           : [ $attrs->{as} ])
1638       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1639     );
1640   
1641   my $adds;
1642   if ($adds = delete $attrs->{include_columns}) {
1643     $adds = [$adds] unless ref $adds eq 'ARRAY';
1644     push(@{$attrs->{select}}, @$adds);
1645     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1646   }
1647   if ($adds = delete $attrs->{'+select'}) {
1648     $adds = [$adds] unless ref $adds eq 'ARRAY';
1649     push(@{$attrs->{select}},
1650            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1651   }
1652   if (my $adds = delete $attrs->{'+as'}) {
1653     $adds = [$adds] unless ref $adds eq 'ARRAY';
1654     push(@{$attrs->{as}}, @$adds);
1655   }
1656
1657   $attrs->{from} ||= [ { 'me' => $source->from } ];
1658
1659   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1660     my $join = delete $attrs->{join} || {};
1661
1662     if (defined $attrs->{prefetch}) {
1663       $join = $self->_merge_attr(
1664         $join, $attrs->{prefetch}
1665       );
1666     }
1667
1668     $attrs->{from} =   # have to copy here to avoid corrupting the original
1669       [
1670         @{$attrs->{from}}, 
1671         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1672       ];
1673   }
1674
1675   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1676   if ($attrs->{order_by}) {
1677     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1678                            ? [ @{$attrs->{order_by}} ]
1679                            : [ $attrs->{order_by} ]);
1680   } else {
1681     $attrs->{order_by} = [];    
1682   }
1683
1684   my $collapse = $attrs->{collapse} || {};
1685   if (my $prefetch = delete $attrs->{prefetch}) {
1686     $prefetch = $self->_merge_attr({}, $prefetch);
1687     my @pre_order;
1688     my $seen = $attrs->{seen_join} || {};
1689     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1690       # bring joins back to level of current class
1691       my @prefetch = $source->resolve_prefetch(
1692         $p, $alias, $seen, \@pre_order, $collapse
1693       );
1694       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1695       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1696     }
1697     push(@{$attrs->{order_by}}, @pre_order);
1698   }
1699   $attrs->{collapse} = $collapse;
1700
1701   return $self->{_attrs} = $attrs;
1702 }
1703
1704 sub _merge_attr {
1705   my ($self, $a, $b) = @_;
1706   return $b unless defined($a);
1707   return $a unless defined($b);
1708   
1709   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1710     foreach my $key (keys %{$b}) {
1711       if (exists $a->{$key}) {
1712         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1713       } else {
1714         $a->{$key} = $b->{$key};
1715       }
1716     }
1717     return $a;
1718   } else {
1719     $a = [$a] unless ref $a eq 'ARRAY';
1720     $b = [$b] unless ref $b eq 'ARRAY';
1721
1722     my $hash = {};
1723     my @array;
1724     foreach my $x ($a, $b) {
1725       foreach my $element (@{$x}) {
1726         if (ref $element eq 'HASH') {
1727           $hash = $self->_merge_attr($hash, $element);
1728         } elsif (ref $element eq 'ARRAY') {
1729           push(@array, @{$element});
1730         } else {
1731           push(@array, $element) unless $b == $x
1732             && grep { $_ eq $element } @array;
1733         }
1734       }
1735     }
1736     
1737     @array = grep { !exists $hash->{$_} } @array;
1738
1739     return keys %{$hash}
1740       ? ( scalar(@array)
1741             ? [$hash, @array]
1742             : $hash
1743         )
1744       : \@array;
1745   }
1746 }
1747
1748 =head2 throw_exception
1749
1750 See L<DBIx::Class::Schema/throw_exception> for details.
1751
1752 =cut
1753
1754 sub throw_exception {
1755   my $self=shift;
1756   $self->result_source->schema->throw_exception(@_);
1757 }
1758
1759 # XXX: FIXME: Attributes docs need clearing up
1760
1761 =head1 ATTRIBUTES
1762
1763 The resultset takes various attributes that modify its behavior. Here's an
1764 overview of them:
1765
1766 =head2 order_by
1767
1768 =over 4
1769
1770 =item Value: ($order_by | \@order_by)
1771
1772 =back
1773
1774 Which column(s) to order the results by. This is currently passed
1775 through directly to SQL, so you can give e.g. C<year DESC> for a
1776 descending order on the column `year'.
1777
1778 Please note that if you have C<quote_char> enabled (see
1779 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1780 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1781 so you will need to manually quote things as appropriate.)
1782
1783 =head2 columns
1784
1785 =over 4
1786
1787 =item Value: \@columns
1788
1789 =back
1790
1791 Shortcut to request a particular set of columns to be retrieved.  Adds
1792 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1793 from that, then auto-populates C<as> from C<select> as normal. (You may also
1794 use the C<cols> attribute, as in earlier versions of DBIC.)
1795
1796 =head2 include_columns
1797
1798 =over 4
1799
1800 =item Value: \@columns
1801
1802 =back
1803
1804 Shortcut to include additional columns in the returned results - for example
1805
1806   $schema->resultset('CD')->search(undef, {
1807     include_columns => ['artist.name'],
1808     join => ['artist']
1809   });
1810
1811 would return all CDs and include a 'name' column to the information
1812 passed to object inflation
1813
1814 =head2 select
1815
1816 =over 4
1817
1818 =item Value: \@select_columns
1819
1820 =back
1821
1822 Indicates which columns should be selected from the storage. You can use
1823 column names, or in the case of RDBMS back ends, function or stored procedure
1824 names:
1825
1826   $rs = $schema->resultset('Employee')->search(undef, {
1827     select => [
1828       'name',
1829       { count => 'employeeid' },
1830       { sum => 'salary' }
1831     ]
1832   });
1833
1834 When you use function/stored procedure names and do not supply an C<as>
1835 attribute, the column names returned are storage-dependent. E.g. MySQL would
1836 return a column named C<count(employeeid)> in the above example.
1837
1838 =head2 +select
1839
1840 =over 4
1841
1842 Indicates additional columns to be selected from storage.  Works the same as
1843 L<select> but adds columns to the selection.
1844
1845 =back
1846
1847 =head2 +as
1848
1849 =over 4
1850
1851 Indicates additional column names for those added via L<+select>.
1852
1853 =back
1854
1855 =head2 as
1856
1857 =over 4
1858
1859 =item Value: \@inflation_names
1860
1861 =back
1862
1863 Indicates column names for object inflation. This is used in conjunction with
1864 C<select>, usually when C<select> contains one or more function or stored
1865 procedure names:
1866
1867   $rs = $schema->resultset('Employee')->search(undef, {
1868     select => [
1869       'name',
1870       { count => 'employeeid' }
1871     ],
1872     as => ['name', 'employee_count'],
1873   });
1874
1875   my $employee = $rs->first(); # get the first Employee
1876
1877 If the object against which the search is performed already has an accessor
1878 matching a column name specified in C<as>, the value can be retrieved using
1879 the accessor as normal:
1880
1881   my $name = $employee->name();
1882
1883 If on the other hand an accessor does not exist in the object, you need to
1884 use C<get_column> instead:
1885
1886   my $employee_count = $employee->get_column('employee_count');
1887
1888 You can create your own accessors if required - see
1889 L<DBIx::Class::Manual::Cookbook> for details.
1890
1891 Please note: This will NOT insert an C<AS employee_count> into the SQL
1892 statement produced, it is used for internal access only. Thus
1893 attempting to use the accessor in an C<order_by> clause or similar
1894 will fail miserably.
1895
1896 To get around this limitation, you can supply literal SQL to your
1897 C<select> attibute that contains the C<AS alias> text, eg:
1898
1899   select => [\'myfield AS alias']
1900
1901 =head2 join
1902
1903 =over 4
1904
1905 =item Value: ($rel_name | \@rel_names | \%rel_names)
1906
1907 =back
1908
1909 Contains a list of relationships that should be joined for this query.  For
1910 example:
1911
1912   # Get CDs by Nine Inch Nails
1913   my $rs = $schema->resultset('CD')->search(
1914     { 'artist.name' => 'Nine Inch Nails' },
1915     { join => 'artist' }
1916   );
1917
1918 Can also contain a hash reference to refer to the other relation's relations.
1919 For example:
1920
1921   package MyApp::Schema::Track;
1922   use base qw/DBIx::Class/;
1923   __PACKAGE__->table('track');
1924   __PACKAGE__->add_columns(qw/trackid cd position title/);
1925   __PACKAGE__->set_primary_key('trackid');
1926   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1927   1;
1928
1929   # In your application
1930   my $rs = $schema->resultset('Artist')->search(
1931     { 'track.title' => 'Teardrop' },
1932     {
1933       join     => { cd => 'track' },
1934       order_by => 'artist.name',
1935     }
1936   );
1937
1938 You need to use the relationship (not the table) name in  conditions, 
1939 because they are aliased as such. The current table is aliased as "me", so 
1940 you need to use me.column_name in order to avoid ambiguity. For example:
1941
1942   # Get CDs from 1984 with a 'Foo' track 
1943   my $rs = $schema->resultset('CD')->search(
1944     { 
1945       'me.year' => 1984,
1946       'tracks.name' => 'Foo'
1947     },
1948     { join => 'tracks' }
1949   );
1950   
1951 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1952 similarly for a third time). For e.g.
1953
1954   my $rs = $schema->resultset('Artist')->search({
1955     'cds.title'   => 'Down to Earth',
1956     'cds_2.title' => 'Popular',
1957   }, {
1958     join => [ qw/cds cds/ ],
1959   });
1960
1961 will return a set of all artists that have both a cd with title 'Down
1962 to Earth' and a cd with title 'Popular'.
1963
1964 If you want to fetch related objects from other tables as well, see C<prefetch>
1965 below.
1966
1967 =head2 prefetch
1968
1969 =over 4
1970
1971 =item Value: ($rel_name | \@rel_names | \%rel_names)
1972
1973 =back
1974
1975 Contains one or more relationships that should be fetched along with the main
1976 query (when they are accessed afterwards they will have already been
1977 "prefetched").  This is useful for when you know you will need the related
1978 objects, because it saves at least one query:
1979
1980   my $rs = $schema->resultset('Tag')->search(
1981     undef,
1982     {
1983       prefetch => {
1984         cd => 'artist'
1985       }
1986     }
1987   );
1988
1989 The initial search results in SQL like the following:
1990
1991   SELECT tag.*, cd.*, artist.* FROM tag
1992   JOIN cd ON tag.cd = cd.cdid
1993   JOIN artist ON cd.artist = artist.artistid
1994
1995 L<DBIx::Class> has no need to go back to the database when we access the
1996 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1997 case.
1998
1999 Simple prefetches will be joined automatically, so there is no need
2000 for a C<join> attribute in the above search. If you're prefetching to
2001 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2002 specify the join as well.
2003
2004 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2005 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2006 with an accessor type of 'single' or 'filter').
2007
2008 =head2 page
2009
2010 =over 4
2011
2012 =item Value: $page
2013
2014 =back
2015
2016 Makes the resultset paged and specifies the page to retrieve. Effectively
2017 identical to creating a non-pages resultset and then calling ->page($page)
2018 on it.
2019
2020 If L<rows> attribute is not specified it defualts to 10 rows per page.
2021
2022 =head2 rows
2023
2024 =over 4
2025
2026 =item Value: $rows
2027
2028 =back
2029
2030 Specifes the maximum number of rows for direct retrieval or the number of
2031 rows per page if the page attribute or method is used.
2032
2033 =head2 offset
2034
2035 =over 4
2036
2037 =item Value: $offset
2038
2039 =back
2040
2041 Specifies the (zero-based) row number for the  first row to be returned, or the
2042 of the first row of the first page if paging is used.
2043
2044 =head2 group_by
2045
2046 =over 4
2047
2048 =item Value: \@columns
2049
2050 =back
2051
2052 A arrayref of columns to group by. Can include columns of joined tables.
2053
2054   group_by => [qw/ column1 column2 ... /]
2055
2056 =head2 having
2057
2058 =over 4
2059
2060 =item Value: $condition
2061
2062 =back
2063
2064 HAVING is a select statement attribute that is applied between GROUP BY and
2065 ORDER BY. It is applied to the after the grouping calculations have been
2066 done.
2067
2068   having => { 'count(employee)' => { '>=', 100 } }
2069
2070 =head2 distinct
2071
2072 =over 4
2073
2074 =item Value: (0 | 1)
2075
2076 =back
2077
2078 Set to 1 to group by all columns.
2079
2080 =head2 where
2081
2082 =over 4
2083
2084 Adds to the WHERE clause.
2085
2086   # only return rows WHERE deleted IS NULL for all searches
2087   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2088
2089 Can be overridden by passing C<{ where => undef }> as an attribute
2090 to a resulset.
2091
2092 =back
2093
2094 =head2 cache
2095
2096 Set to 1 to cache search results. This prevents extra SQL queries if you
2097 revisit rows in your ResultSet:
2098
2099   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2100
2101   while( my $artist = $resultset->next ) {
2102     ... do stuff ...
2103   }
2104
2105   $rs->first; # without cache, this would issue a query
2106
2107 By default, searches are not cached.
2108
2109 For more examples of using these attributes, see
2110 L<DBIx::Class::Manual::Cookbook>.
2111
2112 =head2 from
2113
2114 =over 4
2115
2116 =item Value: \@from_clause
2117
2118 =back
2119
2120 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2121 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2122 clauses.
2123
2124 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2125
2126 C<join> will usually do what you need and it is strongly recommended that you
2127 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2128 And we really do mean "cannot", not just tried and failed. Attempting to use
2129 this because you're having problems with C<join> is like trying to use x86
2130 ASM because you've got a syntax error in your C. Trust us on this.
2131
2132 Now, if you're still really, really sure you need to use this (and if you're
2133 not 100% sure, ask the mailing list first), here's an explanation of how this
2134 works.
2135
2136 The syntax is as follows -
2137
2138   [
2139     { <alias1> => <table1> },
2140     [
2141       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2142       [], # nested JOIN (optional)
2143       { <table1.column1> => <table2.column2>, ... (more conditions) },
2144     ],
2145     # More of the above [ ] may follow for additional joins
2146   ]
2147
2148   <table1> <alias1>
2149   JOIN
2150     <table2> <alias2>
2151     [JOIN ...]
2152   ON <table1.column1> = <table2.column2>
2153   <more joins may follow>
2154
2155 An easy way to follow the examples below is to remember the following:
2156
2157     Anything inside "[]" is a JOIN
2158     Anything inside "{}" is a condition for the enclosing JOIN
2159
2160 The following examples utilize a "person" table in a family tree application.
2161 In order to express parent->child relationships, this table is self-joined:
2162
2163     # Person->belongs_to('father' => 'Person');
2164     # Person->belongs_to('mother' => 'Person');
2165
2166 C<from> can be used to nest joins. Here we return all children with a father,
2167 then search against all mothers of those children:
2168
2169   $rs = $schema->resultset('Person')->search(
2170       undef,
2171       {
2172           alias => 'mother', # alias columns in accordance with "from"
2173           from => [
2174               { mother => 'person' },
2175               [
2176                   [
2177                       { child => 'person' },
2178                       [
2179                           { father => 'person' },
2180                           { 'father.person_id' => 'child.father_id' }
2181                       ]
2182                   ],
2183                   { 'mother.person_id' => 'child.mother_id' }
2184               ],
2185           ]
2186       },
2187   );
2188
2189   # Equivalent SQL:
2190   # SELECT mother.* FROM person mother
2191   # JOIN (
2192   #   person child
2193   #   JOIN person father
2194   #   ON ( father.person_id = child.father_id )
2195   # )
2196   # ON ( mother.person_id = child.mother_id )
2197
2198 The type of any join can be controlled manually. To search against only people
2199 with a father in the person table, we could explicitly use C<INNER JOIN>:
2200
2201     $rs = $schema->resultset('Person')->search(
2202         undef,
2203         {
2204             alias => 'child', # alias columns in accordance with "from"
2205             from => [
2206                 { child => 'person' },
2207                 [
2208                     { father => 'person', -join_type => 'inner' },
2209                     { 'father.id' => 'child.father_id' }
2210                 ],
2211             ]
2212         },
2213     );
2214
2215     # Equivalent SQL:
2216     # SELECT child.* FROM person child
2217     # INNER JOIN person father ON child.father_id = father.id
2218
2219 =cut
2220
2221 1;