fixed search with joins from related resultset
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Data::Dumper;
13 use Scalar::Util qw/weaken/;
14
15 use DBIx::Class::ResultSetColumn;
16 use base qw/DBIx::Class/;
17 __PACKAGE__->load_components(qw/AccessorGroup/);
18 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
23
24 =head1 SYNOPSIS
25
26   my $rs   = $schema->resultset('User')->search(registered => 1);
27   my @rows = $schema->resultset('CD')->search(year => 2005);
28
29 =head1 DESCRIPTION
30
31 The resultset is also known as an iterator. It is responsible for handling
32 queries that may return an arbitrary number of rows, e.g. via L</search>
33 or a C<has_many> relationship.
34
35 In the examples below, the following table classes are used:
36
37   package MyApp::Schema::Artist;
38   use base qw/DBIx::Class/;
39   __PACKAGE__->load_components(qw/Core/);
40   __PACKAGE__->table('artist');
41   __PACKAGE__->add_columns(qw/artistid name/);
42   __PACKAGE__->set_primary_key('artistid');
43   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
44   1;
45
46   package MyApp::Schema::CD;
47   use base qw/DBIx::Class/;
48   __PACKAGE__->load_components(qw/Core/);
49   __PACKAGE__->table('cd');
50   __PACKAGE__->add_columns(qw/cdid artist title year/);
51   __PACKAGE__->set_primary_key('cdid');
52   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
53   1;
54
55 =head1 METHODS
56
57 =head2 new
58
59 =over 4
60
61 =item Arguments: $source, \%$attrs
62
63 =item Return Value: $rs
64
65 =back
66
67 The resultset constructor. Takes a source object (usually a
68 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
69 L</ATTRIBUTES> below).  Does not perform any queries -- these are
70 executed as needed by the other methods.
71
72 Generally you won't need to construct a resultset manually.  You'll
73 automatically get one from e.g. a L</search> called in scalar context:
74
75   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
76
77 IMPORTANT: If called on an object, proxies to new_result instead so
78
79   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
80
81 will return a CD object, not a ResultSet.
82
83 =cut
84
85 sub new {
86   my $class = shift;
87   return $class->new_result(@_) if ref $class;
88   
89   my ($source, $attrs) = @_;
90   weaken $source;
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   bless {
101     result_source => $source,
102     result_class => $attrs->{result_class} || $source->result_class,
103     cond => $attrs->{where},
104 #    from => $attrs->{from},
105 #    collapse => $collapse,
106     count => undef,
107     page => delete $attrs->{page},
108     pager => undef,
109     attrs => $attrs
110   }, $class;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 =cut
138
139 sub search {
140   my $self = shift;
141   my $rs = $self->search_rs( @_ );
142   return (wantarray ? $rs->all : $rs);
143 }
144
145 =head2 search_rs
146
147 =over 4
148
149 =item Arguments: $cond, \%attrs?
150
151 =item Return Value: $resultset
152
153 =back
154
155 This method does the same exact thing as search() except it will 
156 always return a resultset, even in list context.
157
158 =cut
159
160 sub search_rs {
161   my $self = shift;
162
163   my $attrs = {};
164   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
165   my $our_attrs = ($attrs->{_parent_attrs}) ? { %{$attrs->{_parent_attrs}} } : { %{$self->{attrs}} };
166   my $having = delete $our_attrs->{having};
167
168   # merge new attrs into old
169   foreach my $key (qw/join prefetch/) {
170     next unless (exists $attrs->{$key});
171     if ($attrs->{_live_join} || $our_attrs->{_live_join}) {
172       $attrs->{$key} = { ($attrs->{_live_join}) ? $attrs->{_live_join} : $our_attrs->{_live_join} => $attrs->{$key} };
173     }
174     if (exists $our_attrs->{$key}) {
175       $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
176     } else {
177       $our_attrs->{$key} = $attrs->{$key};
178     }
179     delete $attrs->{$key};
180   }
181
182   $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $attrs->{_live_join}, 1) if ($attrs->{_live_join});
183   if (exists $our_attrs->{prefetch}) {
184       $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $our_attrs->{prefetch}, 1);
185   }
186
187   my $new_attrs = { %{$our_attrs}, %{$attrs} };
188   my $where = (@_
189                 ? ((@_ == 1 || ref $_[0] eq "HASH")
190                     ? shift
191                     : ((@_ % 2)
192                         ? $self->throw_exception(
193                             "Odd number of arguments to search")
194                         : {@_}))
195                 : undef());
196   if (defined $where) {
197     $new_attrs->{where} = (defined $new_attrs->{where}
198               ? { '-and' =>
199                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
200                       $where, $new_attrs->{where} ] }
201               : $where);
202   }
203
204   if (defined $having) {
205     $new_attrs->{having} = (defined $new_attrs->{having}
206               ? { '-and' =>
207                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
208                       $having, $new_attrs->{having} ] }
209               : $having);
210   }
211
212   my $rs = (ref $self)->new($self->result_source, $new_attrs);
213   $rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
214
215   unless (@_) { # no search, effectively just a clone
216     my $rows = $self->get_cache;
217     if ($rows) {
218       $rs->set_cache($rows);
219     }
220   }
221   
222   return $rs;
223 }
224
225 =head2 search_literal
226
227 =over 4
228
229 =item Arguments: $sql_fragment, @bind_values
230
231 =item Return Value: $resultset (scalar context), @row_objs (list context)
232
233 =back
234
235   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
236   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
237
238 Pass a literal chunk of SQL to be added to the conditional part of the
239 resultset query.
240
241 =cut
242
243 sub search_literal {
244   my ($self, $cond, @vals) = @_;
245   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
246   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
247   return $self->search(\$cond, $attrs);
248 }
249
250 =head2 find
251
252 =over 4
253
254 =item Arguments: @values | \%cols, \%attrs?
255
256 =item Return Value: $row_object
257
258 =back
259
260 Finds a row based on its primary key or unique constraint. For example, to find
261 a row by its primary key:
262
263   my $cd = $schema->resultset('CD')->find(5);
264
265 You can also find a row by a specific unique constraint using the C<key>
266 attribute. For example:
267
268   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', { key => 'cd_artist_title' });
269
270 Additionally, you can specify the columns explicitly by name:
271
272   my $cd = $schema->resultset('CD')->find(
273     {
274       artist => 'Massive Attack',
275       title  => 'Mezzanine',
276     },
277     { key => 'cd_artist_title' }
278   );
279
280 If the C<key> is specified as C<primary>, it searches only on the primary key.
281
282 If no C<key> is specified, it searches on all unique constraints defined on the
283 source, including the primary key.
284
285 See also L</find_or_create> and L</update_or_create>. For information on how to
286 declare unique constraints, see
287 L<DBIx::Class::ResultSource/add_unique_constraint>.
288
289 =cut
290
291 sub find {
292   my $self = shift;
293   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
294
295   # Default to the primary key, but allow a specific key
296   my @cols = exists $attrs->{key}
297     ? $self->result_source->unique_constraint_columns($attrs->{key})
298     : $self->result_source->primary_columns;
299   $self->throw_exception(
300     "Can't find unless a primary key or unique constraint is defined"
301   ) unless @cols;
302
303   # Parse out a hashref from input
304   my $input_query;
305   if (ref $_[0] eq 'HASH') {
306     $input_query = { %{$_[0]} };
307   }
308   elsif (@_ == @cols) {
309     $input_query = {};
310     @{$input_query}{@cols} = @_;
311   }
312   else {
313     # Compatibility: Allow e.g. find(id => $value)
314     carp "Find by key => value deprecated; please use a hashref instead";
315     $input_query = {@_};
316   }
317
318   my @unique_queries = $self->_unique_queries($input_query, $attrs);
319
320   # Handle cases where the ResultSet defines the query, or where the user is
321   # abusing find
322   my $query = @unique_queries ? \@unique_queries : $input_query;
323
324   # Run the query
325   if (keys %$attrs) {
326     my $rs = $self->search($query, $attrs);
327     $rs->_resolve;
328     return keys %{$rs->{_attrs}->{collapse}} ? $rs->next : $rs->single;
329   }
330   else {
331     $self->_resolve;  
332     return (keys %{$self->{_attrs}->{collapse}})
333       ? $self->search($query)->next
334       : $self->single($query);
335   }
336 }
337
338 # _unique_queries
339 #
340 # Build a list of queries which satisfy unique constraints.
341
342 sub _unique_queries {
343   my ($self, $query, $attrs) = @_;
344
345   my @constraint_names = exists $attrs->{key}
346     ? ($attrs->{key})
347     : $self->result_source->unique_constraint_names;
348
349   my @unique_queries;
350   foreach my $name (@constraint_names) {
351     my @unique_cols = $self->result_source->unique_constraint_columns($name);
352     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
353
354     next unless scalar keys %$unique_query;
355
356     # Add the ResultSet's alias
357     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
358                         my $alias = ($self->{attrs}->{_live_join}) ? $self->{attrs}->{_live_join} : $self->{attrs}->{alias};
359       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
360     }
361
362     push @unique_queries, $unique_query;
363   }
364
365   return @unique_queries;
366 }
367
368 # _build_unique_query
369 #
370 # Constrain the specified query hash based on the specified column names.
371
372 sub _build_unique_query {
373   my ($self, $query, $unique_cols) = @_;
374
375   my %unique_query =
376     map  { $_ => $query->{$_} }
377     grep { exists $query->{$_} }
378     @$unique_cols;
379
380   return \%unique_query;
381 }
382
383 =head2 search_related
384
385 =over 4
386
387 =item Arguments: $cond, \%attrs?
388
389 =item Return Value: $new_resultset
390
391 =back
392
393   $new_rs = $cd_rs->search_related('artist', {
394     name => 'Emo-R-Us',
395   });
396
397 Searches the specified relationship, optionally specifying a condition and
398 attributes for matching records. See L</ATTRIBUTES> for more information.
399
400 =cut
401
402 sub search_related {
403   return shift->related_resultset(shift)->search(@_);
404 }
405
406 =head2 cursor
407
408 =over 4
409
410 =item Arguments: none
411
412 =item Return Value: $cursor
413
414 =back
415
416 Returns a storage-driven cursor to the given resultset. See
417 L<DBIx::Class::Cursor> for more information.
418
419 =cut
420
421 sub cursor {
422   my ($self) = @_;
423
424   $self->_resolve;
425   my $attrs = { %{$self->{_attrs}} };
426   return $self->{cursor}
427     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
428           $attrs->{where},$attrs);
429 }
430
431 =head2 single
432
433 =over 4
434
435 =item Arguments: $cond?
436
437 =item Return Value: $row_object?
438
439 =back
440
441   my $cd = $schema->resultset('CD')->single({ year => 2001 });
442
443 Inflates the first result without creating a cursor if the resultset has
444 any records in it; if not returns nothing. Used by L</find> as an optimisation.
445
446 Can optionally take an additional condition *only* - this is a fast-code-path
447 method; if you need to add extra joins or similar call ->search and then
448 ->single without a condition on the $rs returned from that.
449
450 =cut
451
452 sub single {
453   my ($self, $where) = @_;
454   $self->_resolve;
455   my $attrs = { %{$self->{_attrs}} };
456   if ($where) {
457     if (defined $attrs->{where}) {
458       $attrs->{where} = {
459         '-and' =>
460             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
461                $where, delete $attrs->{where} ]
462       };
463     } else {
464       $attrs->{where} = $where;
465     }
466   }
467
468   unless ($self->_is_unique_query($attrs->{where})) {
469     carp "Query not guarnteed to return a single row"
470       . "; please declare your unique constraints or use search instead";
471   }
472
473   my @data = $self->result_source->storage->select_single(
474           $attrs->{from}, $attrs->{select},
475           $attrs->{where},$attrs);
476   return (@data ? $self->_construct_object(@data) : ());
477 }
478
479 # _is_unique_query
480 #
481 # Try to determine if the specified query is guaranteed to be unique, based on
482 # the declared unique constraints.
483
484 sub _is_unique_query {
485   my ($self, $query) = @_;
486
487   my $collapsed = $self->_collapse_query($query);
488
489         my $alias = ($self->{attrs}->{_live_join}) ? $self->{attrs}->{_live_join} : $self->{attrs}->{alias};
490   foreach my $name ($self->result_source->unique_constraint_names) {
491     my @unique_cols = map { "$alias.$_" }
492       $self->result_source->unique_constraint_columns($name);
493
494     # Count the values for each unique column
495     my %seen = map { $_ => 0 } @unique_cols;
496
497     foreach my $key (keys %$collapsed) {
498       my $aliased = $key;
499       $aliased = "$alias.$key" unless $key =~ /\./;
500
501       next unless exists $seen{$aliased};  # Additional constraints are okay
502       $seen{$aliased} = scalar @{ $collapsed->{$key} };
503     }
504
505     # If we get 0 or more than 1 value for a column, it's not necessarily unique
506     return 1 unless grep { $_ != 1 } values %seen;
507   }
508
509   return 0;
510 }
511
512 # _collapse_query
513 #
514 # Recursively collapse the query, accumulating values for each column.
515
516 sub _collapse_query {
517   my ($self, $query, $collapsed) = @_;
518
519   $collapsed ||= {};
520
521   if (ref $query eq 'ARRAY') {
522     foreach my $subquery (@$query) {
523       next unless ref $subquery;  # -or
524 #      warn "ARRAY: " . Dumper $subquery;
525       $collapsed = $self->_collapse_query($subquery, $collapsed);
526     }
527   }
528   elsif (ref $query eq 'HASH') {
529     if (keys %$query and (keys %$query)[0] eq '-and') {
530       foreach my $subquery (@{$query->{-and}}) {
531 #        warn "HASH: " . Dumper $subquery;
532         $collapsed = $self->_collapse_query($subquery, $collapsed);
533       }
534     }
535     else {
536 #      warn "LEAF: " . Dumper $query;
537       foreach my $key (keys %$query) {
538         push @{$collapsed->{$key}}, $query->{$key};
539       }
540     }
541   }
542
543   return $collapsed;
544 }
545
546 =head2 get_column
547
548 =over 4
549
550 =item Arguments: $cond?
551
552 =item Return Value: $resultsetcolumn
553
554 =back
555
556   my $max_length = $rs->get_column('length')->max;
557
558 Returns a ResultSetColumn instance for $column based on $self
559
560 =cut
561
562 sub get_column {
563   my ($self, $column) = @_;
564
565   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
566   return $new;
567 }
568
569 =head2 search_like
570
571 =over 4
572
573 =item Arguments: $cond, \%attrs?
574
575 =item Return Value: $resultset (scalar context), @row_objs (list context)
576
577 =back
578
579   # WHERE title LIKE '%blue%'
580   $cd_rs = $rs->search_like({ title => '%blue%'});
581
582 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
583 that this is simply a convenience method. You most likely want to use
584 L</search> with specific operators.
585
586 For more information, see L<DBIx::Class::Manual::Cookbook>.
587
588 =cut
589
590 sub search_like {
591   my $class = shift;
592   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
593   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
594   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
595   return $class->search($query, { %$attrs });
596 }
597
598 =head2 slice
599
600 =over 4
601
602 =item Arguments: $first, $last
603
604 =item Return Value: $resultset (scalar context), @row_objs (list context)
605
606 =back
607
608 Returns a resultset or object list representing a subset of elements from the
609 resultset slice is called on. Indexes are from 0, i.e., to get the first
610 three records, call:
611
612   my ($one, $two, $three) = $rs->slice(0, 2);
613
614 =cut
615
616 sub slice {
617   my ($self, $min, $max) = @_;
618   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
619   $attrs->{offset} = $self->{attrs}{offset} || 0;
620   $attrs->{offset} += $min;
621   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
622   return $self->search(undef(), $attrs);
623   #my $slice = (ref $self)->new($self->result_source, $attrs);
624   #return (wantarray ? $slice->all : $slice);
625 }
626
627 =head2 next
628
629 =over 4
630
631 =item Arguments: none
632
633 =item Return Value: $result?
634
635 =back
636
637 Returns the next element in the resultset (C<undef> is there is none).
638
639 Can be used to efficiently iterate over records in the resultset:
640
641   my $rs = $schema->resultset('CD')->search;
642   while (my $cd = $rs->next) {
643     print $cd->title;
644   }
645
646 Note that you need to store the resultset object, and call C<next> on it. 
647 Calling C<< resultset('Table')->next >> repeatedly will always return the
648 first record from the resultset.
649
650 =cut
651
652 sub next {
653   my ($self) = @_;
654   if (my $cache = $self->get_cache) {
655     $self->{all_cache_position} ||= 0;
656     return $cache->[$self->{all_cache_position}++];
657   }
658   if ($self->{attrs}{cache}) {
659     $self->{all_cache_position} = 1;
660     return ($self->all)[0];
661   }
662   my @row = (exists $self->{stashed_row} ?
663                @{delete $self->{stashed_row}} :
664                $self->cursor->next
665   );
666   return unless (@row);
667   return $self->_construct_object(@row);
668 }
669
670 sub _resolve {
671   my $self = shift;
672
673   return if(exists $self->{_attrs}); #return if _resolve has already been called
674
675   my $attrs = $self->{attrs};    
676   my $source = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->{result_source};
677
678   # XXX - lose storable dclone
679   my $record_filter = delete $attrs->{record_filter} if (defined $attrs->{record_filter});
680   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
681   $attrs->{record_filter} = $record_filter if ($record_filter);
682   $self->{attrs}->{record_filter} = $record_filter if ($record_filter);
683
684   my $alias = $attrs->{alias};
685  
686   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
687   delete $attrs->{as} if $attrs->{columns};
688   $attrs->{columns} ||= [ $self->{result_source}->columns ] unless $attrs->{select};
689   my $select_alias = ($self->{_parent_rs}) ? $self->{attrs}->{_live_join} : $alias;
690   $attrs->{select} = [
691                       map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
692                       ] if $attrs->{columns};
693   $attrs->{as} ||= [
694                     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
695                     ];
696   if (my $include = delete $attrs->{include_columns}) {
697       push(@{$attrs->{select}}, @$include);
698       push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
699   }
700
701   $attrs->{from} ||= [ { $alias => $source->from } ];
702   $attrs->{seen_join} ||= {};
703   my %seen;
704   if (my $join = delete $attrs->{join}) {
705     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
706       if (ref $j eq 'HASH') {
707         $seen{$_} = 1 foreach keys %$j;
708       } else {
709         $seen{$j} = 1;
710       }
711     }
712     
713     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
714   }
715   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
716   $attrs->{order_by} = [ $attrs->{order_by} ] if
717       $attrs->{order_by} and !ref($attrs->{order_by});
718   $attrs->{order_by} ||= [];
719
720   if(my $seladds = delete($attrs->{'+select'})) {
721     my @seladds = (ref($seladds) eq 'ARRAY' ? @$seladds : ($seladds));
722     $attrs->{select} = [
723                         @{ $attrs->{select} },
724                         map { (m/\./ || ref($_)) ? $_ : "${alias}.$_" } $seladds
725                         ];
726   }
727   if(my $asadds = delete($attrs->{'+as'})) {
728     my @asadds = (ref($asadds) eq 'ARRAY' ? @$asadds : ($asadds));
729     $attrs->{as} = [ @{ $attrs->{as} }, @asadds ];
730   }
731   
732   my $collapse = $attrs->{collapse} || {};
733   if (my $prefetch = delete $attrs->{prefetch}) {
734       my @pre_order;
735       foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
736           if ( ref $p eq 'HASH' ) {
737               foreach my $key (keys %$p) {
738                   push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
739                       unless $seen{$key};
740               }
741           } else {
742               push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
743                   unless $seen{$p};
744           }
745           my @prefetch = $source->resolve_prefetch(
746                                                    $p, $attrs->{alias}, {}, \@pre_order, $collapse);
747           push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
748           push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
749       }
750       push(@{$attrs->{order_by}}, @pre_order);
751   }
752   $attrs->{collapse} = $collapse;
753   $self->{_attrs} = $attrs;
754 }
755
756 sub _merge_attr {
757   my ($self, $a, $b, $is_prefetch) = @_;
758     
759   return $b unless $a;
760   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
761     foreach my $key (keys %{$b}) {
762       if (exists $a->{$key}) {
763         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key}, $is_prefetch);
764       } else {
765         $a->{$key} = delete $b->{$key};
766       }
767     }
768     return $a;
769   } else {
770     $a = [$a] unless (ref $a eq 'ARRAY');
771     $b = [$b] unless (ref $b eq 'ARRAY');
772     
773     my $hash = {};
774     my $array = [];      
775     foreach ($a, $b) {
776       foreach my $element (@{$_}) {
777         if (ref $element eq 'HASH') {
778           $hash = $self->_merge_attr($hash, $element, $is_prefetch);
779         } elsif (ref $element eq 'ARRAY') {
780           $array = [@{$array}, @{$element}];
781         } else {        
782           if (($b == $_) && $is_prefetch) {
783             $self->_merge_array($array, $element, $is_prefetch);
784           } else {
785             push(@{$array}, $element);
786           }
787         }
788       }
789     }
790     if ($is_prefetch) {
791       my $final_array = [];
792       foreach my $element (@{$array}) {
793         push(@{$final_array}, $element) unless (exists $hash->{$element});
794       }
795       $array = $final_array;
796     }
797     if ((keys %{$hash}) && (scalar(@{$array} > 0))) {
798       return [$hash, @{$array}];
799     } else {    
800       return (keys %{$hash}) ? $hash : $array;
801     }
802   }
803 }
804
805 sub _merge_array {
806   my ($self, $a, $b) = @_;
807   
808   $b = [$b] unless (ref $b eq 'ARRAY');
809   # add elements from @{$b} to @{$a} which aren't already in @{$a}
810   foreach my $b_element (@{$b}) {
811     push(@{$a}, $b_element) unless grep {$b_element eq $_} @{$a};
812   }
813 }
814
815 sub _construct_object {
816   my ($self, @row) = @_;
817   my @as = @{ $self->{_attrs}{as} };
818   
819   my $info = $self->_collapse_result(\@as, \@row);
820   my $new = $self->result_class->inflate_result($self->result_source, @$info);
821   $new = $self->{_attrs}{record_filter}->($new)
822     if exists $self->{_attrs}{record_filter};
823   return $new;
824 }
825
826 sub _collapse_result {
827   my ($self, $as, $row, $prefix) = @_;
828
829   my $live_join = $self->{attrs}->{_live_join} ||="";
830   my %const;
831
832   my @copy = @$row;
833   foreach my $this_as (@$as) {
834     my $val = shift @copy;
835     if (defined $prefix) {
836       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
837         my $remain = $1;
838         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
839         $const{$1||''}{$2} = $val;
840       }
841     } else {
842       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
843       $const{$1||''}{$2} = $val;
844     }
845   }
846
847   my $info = [ {}, {} ];
848   foreach my $key (keys %const) {
849     if (length $key && $key ne $live_join) {
850       my $target = $info;
851       my @parts = split(/\./, $key);
852       foreach my $p (@parts) {
853         $target = $target->[1]->{$p} ||= [];
854       }
855       $target->[0] = $const{$key};
856     } else {
857       $info->[0] = $const{$key};
858     }
859   }
860
861   my @collapse;
862   if (defined $prefix) {
863     @collapse = map {
864         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
865     } keys %{$self->{_attrs}->{collapse}}
866   } else {
867     @collapse = keys %{$self->{_attrs}->{collapse}};
868   };
869
870   if (@collapse) {
871     my ($c) = sort { length $a <=> length $b } @collapse;
872     my $target = $info;
873     foreach my $p (split(/\./, $c)) {
874       $target = $target->[1]->{$p} ||= [];
875     }
876     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
877     my @co_key = @{$self->{_attrs}->{collapse}{$c_prefix}};
878     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
879     my $tree = $self->_collapse_result($as, $row, $c_prefix);
880     my (@final, @raw);
881     while ( !(grep {
882                 !defined($tree->[0]->{$_}) ||
883                 $co_check{$_} ne $tree->[0]->{$_}
884               } @co_key) ) {
885       push(@final, $tree);
886       last unless (@raw = $self->cursor->next);
887       $row = $self->{stashed_row} = \@raw;
888       $tree = $self->_collapse_result($as, $row, $c_prefix);
889     }
890     @$target = (@final ? @final : [ {}, {} ]); 
891       # single empty result to indicate an empty prefetched has_many
892   }
893   return $info;
894 }
895
896 =head2 result_source
897
898 =over 4
899
900 =item Arguments: $result_source?
901
902 =item Return Value: $result_source
903
904 =back
905
906 An accessor for the primary ResultSource object from which this ResultSet
907 is derived.
908
909 =cut
910
911
912 =head2 count
913
914 =over 4
915
916 =item Arguments: $cond, \%attrs??
917
918 =item Return Value: $count
919
920 =back
921
922 Performs an SQL C<COUNT> with the same query as the resultset was built
923 with to find the number of elements. If passed arguments, does a search
924 on the resultset and counts the results of that.
925
926 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
927 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
928 not support C<DISTINCT> with multiple columns. If you are using such a
929 database, you should only use columns from the main table in your C<group_by>
930 clause.
931
932 =cut
933
934 sub count {
935   my $self = shift;
936   return $self->search(@_)->count if @_ and defined $_[0];
937   return scalar @{ $self->get_cache } if $self->get_cache;
938   my $count = $self->_count;
939   return 0 unless $count;
940
941   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
942   $count = $self->{attrs}{rows} if
943     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
944   return $count;
945 }
946
947 sub _count { # Separated out so pager can get the full count
948   my $self = shift;
949   my $select = { count => '*' };
950   
951   $self->_resolve;
952   my $attrs = { %{ $self->{_attrs} } };
953   if (my $group_by = delete $attrs->{group_by}) {
954     delete $attrs->{having};
955     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
956     # todo: try CONCAT for multi-column pk
957     my @pk = $self->result_source->primary_columns;
958     if (@pk == 1) {
959       foreach my $column (@distinct) {
960         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
961           @distinct = ($column);
962           last;
963         }
964       }
965     }
966
967     $select = { count => { distinct => \@distinct } };
968   }
969
970   $attrs->{select} = $select;
971   $attrs->{as} = [qw/count/];
972
973   # offset, order by and page are not needed to count. record_filter is cdbi
974   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
975         my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
976         $tmp_rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
977
978   my ($count) = $tmp_rs->cursor->next;
979   return $count;
980 }
981
982 =head2 count_literal
983
984 =over 4
985
986 =item Arguments: $sql_fragment, @bind_values
987
988 =item Return Value: $count
989
990 =back
991
992 Counts the results in a literal query. Equivalent to calling L</search_literal>
993 with the passed arguments, then L</count>.
994
995 =cut
996
997 sub count_literal { shift->search_literal(@_)->count; }
998
999 =head2 all
1000
1001 =over 4
1002
1003 =item Arguments: none
1004
1005 =item Return Value: @objects
1006
1007 =back
1008
1009 Returns all elements in the resultset. Called implicitly if the resultset
1010 is returned in list context.
1011
1012 =cut
1013
1014 sub all {
1015   my ($self) = @_;
1016   return @{ $self->get_cache } if $self->get_cache;
1017
1018   my @obj;
1019
1020   # TODO: don't call resolve here
1021   $self->_resolve;
1022   if (keys %{$self->{_attrs}->{collapse}}) {
1023 #  if ($self->{attrs}->{prefetch}) {
1024       # Using $self->cursor->all is really just an optimisation.
1025       # If we're collapsing has_many prefetches it probably makes
1026       # very little difference, and this is cleaner than hacking
1027       # _construct_object to survive the approach
1028     my @row = $self->cursor->next;
1029     while (@row) {
1030       push(@obj, $self->_construct_object(@row));
1031       @row = (exists $self->{stashed_row}
1032                ? @{delete $self->{stashed_row}}
1033                : $self->cursor->next);
1034     }
1035   } else {
1036     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1037   }
1038
1039   $self->set_cache(\@obj) if $self->{attrs}{cache};
1040   return @obj;
1041 }
1042
1043 =head2 reset
1044
1045 =over 4
1046
1047 =item Arguments: none
1048
1049 =item Return Value: $self
1050
1051 =back
1052
1053 Resets the resultset's cursor, so you can iterate through the elements again.
1054
1055 =cut
1056
1057 sub reset {
1058   my ($self) = @_;
1059   delete $self->{_attrs} if (exists $self->{_attrs});
1060
1061   $self->{all_cache_position} = 0;
1062   $self->cursor->reset;
1063   return $self;
1064 }
1065
1066 =head2 first
1067
1068 =over 4
1069
1070 =item Arguments: none
1071
1072 =item Return Value: $object?
1073
1074 =back
1075
1076 Resets the resultset and returns an object for the first result (if the
1077 resultset returns anything).
1078
1079 =cut
1080
1081 sub first {
1082   return $_[0]->reset->next;
1083 }
1084
1085 # _cond_for_update_delete
1086 #
1087 # update/delete require the condition to be modified to handle
1088 # the differing SQL syntax available.  This transforms the $self->{cond}
1089 # appropriately, returning the new condition.
1090
1091 sub _cond_for_update_delete {
1092   my ($self) = @_;
1093   my $cond = {};
1094
1095   if (!ref($self->{cond})) {
1096     # No-op. No condition, we're updating/deleting everything
1097   }
1098   elsif (ref $self->{cond} eq 'ARRAY') {
1099     $cond = [
1100       map {
1101         my %hash;
1102         foreach my $key (keys %{$_}) {
1103           $key =~ /([^.]+)$/;
1104           $hash{$1} = $_->{$key};
1105         }
1106         \%hash;
1107       } @{$self->{cond}}
1108     ];
1109   }
1110   elsif (ref $self->{cond} eq 'HASH') {
1111     if ((keys %{$self->{cond}})[0] eq '-and') {
1112       $cond->{-and} = [];
1113
1114       my @cond = @{$self->{cond}{-and}};
1115       for (my $i = 0; $i <= @cond - 1; $i++) {
1116         my $entry = $cond[$i];
1117
1118         my %hash;
1119         if (ref $entry eq 'HASH') {
1120           foreach my $key (keys %{$entry}) {
1121             $key =~ /([^.]+)$/;
1122             $hash{$1} = $entry->{$key};
1123           }
1124         }
1125         else {
1126           $entry =~ /([^.]+)$/;
1127           $hash{$1} = $cond[++$i];
1128         }
1129
1130         push @{$cond->{-and}}, \%hash;
1131       }
1132     }
1133     else {
1134       foreach my $key (keys %{$self->{cond}}) {
1135         $key =~ /([^.]+)$/;
1136         $cond->{$1} = $self->{cond}{$key};
1137       }
1138     }
1139   }
1140   else {
1141     $self->throw_exception(
1142       "Can't update/delete on resultset with condition unless hash or array"
1143     );
1144   }
1145
1146   return $cond;
1147 }
1148
1149
1150 =head2 update
1151
1152 =over 4
1153
1154 =item Arguments: \%values
1155
1156 =item Return Value: $storage_rv
1157
1158 =back
1159
1160 Sets the specified columns in the resultset to the supplied values in a
1161 single query. Return value will be true if the update succeeded or false
1162 if no records were updated; exact type of success value is storage-dependent.
1163
1164 =cut
1165
1166 sub update {
1167   my ($self, $values) = @_;
1168   $self->throw_exception("Values for update must be a hash")
1169     unless ref $values eq 'HASH';
1170
1171   my $cond = $self->_cond_for_update_delete;
1172
1173   return $self->result_source->storage->update(
1174     $self->result_source->from, $values, $cond
1175   );
1176 }
1177
1178 =head2 update_all
1179
1180 =over 4
1181
1182 =item Arguments: \%values
1183
1184 =item Return Value: 1
1185
1186 =back
1187
1188 Fetches all objects and updates them one at a time. Note that C<update_all>
1189 will run DBIC cascade triggers, while L</update> will not.
1190
1191 =cut
1192
1193 sub update_all {
1194   my ($self, $values) = @_;
1195   $self->throw_exception("Values for update must be a hash")
1196     unless ref $values eq 'HASH';
1197   foreach my $obj ($self->all) {
1198     $obj->set_columns($values)->update;
1199   }
1200   return 1;
1201 }
1202
1203 =head2 delete
1204
1205 =over 4
1206
1207 =item Arguments: none
1208
1209 =item Return Value: 1
1210
1211 =back
1212
1213 Deletes the contents of the resultset from its result source. Note that this
1214 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1215 to run.
1216
1217 =cut
1218
1219 sub delete {
1220   my ($self) = @_;
1221   my $del = {};
1222
1223   my $cond = $self->_cond_for_update_delete;
1224
1225   $self->result_source->storage->delete($self->result_source->from, $cond);
1226   return 1;
1227 }
1228
1229 =head2 delete_all
1230
1231 =over 4
1232
1233 =item Arguments: none
1234
1235 =item Return Value: 1
1236
1237 =back
1238
1239 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1240 will run DBIC cascade triggers, while L</delete> will not.
1241
1242 =cut
1243
1244 sub delete_all {
1245   my ($self) = @_;
1246   $_->delete for $self->all;
1247   return 1;
1248 }
1249
1250 =head2 pager
1251
1252 =over 4
1253
1254 =item Arguments: none
1255
1256 =item Return Value: $pager
1257
1258 =back
1259
1260 Return Value a L<Data::Page> object for the current resultset. Only makes
1261 sense for queries with a C<page> attribute.
1262
1263 =cut
1264
1265 sub pager {
1266   my ($self) = @_;
1267   my $attrs = $self->{attrs};
1268   $self->throw_exception("Can't create pager for non-paged rs")
1269     unless $self->{page};
1270   $attrs->{rows} ||= 10;
1271   return $self->{pager} ||= Data::Page->new(
1272     $self->_count, $attrs->{rows}, $self->{page});
1273 }
1274
1275 =head2 page
1276
1277 =over 4
1278
1279 =item Arguments: $page_number
1280
1281 =item Return Value: $rs
1282
1283 =back
1284
1285 Returns a resultset for the $page_number page of the resultset on which page
1286 is called, where each page contains a number of rows equal to the 'rows'
1287 attribute set on the resultset (10 by default).
1288
1289 =cut
1290
1291 sub page {
1292   my ($self, $page) = @_;
1293   my $attrs = { %{$self->{attrs}} };
1294   $attrs->{page} = $page;
1295   return (ref $self)->new($self->result_source, $attrs);
1296 }
1297
1298 =head2 new_result
1299
1300 =over 4
1301
1302 =item Arguments: \%vals
1303
1304 =item Return Value: $object
1305
1306 =back
1307
1308 Creates an object in the resultset's result class and returns it.
1309
1310 =cut
1311
1312 sub new_result {
1313   my ($self, $values) = @_;
1314   $self->throw_exception( "new_result needs a hash" )
1315     unless (ref $values eq 'HASH');
1316   $self->throw_exception(
1317     "Can't abstract implicit construct, condition not a hash"
1318   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1319   my %new = %$values;
1320   my $alias = $self->{attrs}{alias};
1321   foreach my $key (keys %{$self->{cond}||{}}) {
1322     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1323   }
1324   my $obj = $self->result_class->new(\%new);
1325   $obj->result_source($self->result_source) if $obj->can('result_source');
1326   return $obj;
1327 }
1328
1329 =head2 find_or_new
1330
1331 =over 4
1332
1333 =item Arguments: \%vals, \%attrs?
1334
1335 =item Return Value: $object
1336
1337 =back
1338
1339 Find an existing record from this resultset. If none exists, instantiate a new
1340 result object and return it. The object will not be saved into your storage
1341 until you call L<DBIx::Class::Row/insert> on it.
1342
1343 If you want objects to be saved immediately, use L</find_or_create> instead.
1344
1345 =cut
1346
1347 sub find_or_new {
1348   my $self     = shift;
1349   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1350   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1351   my $exists   = $self->find($hash, $attrs);
1352   return defined $exists ? $exists : $self->new_result($hash);
1353 }
1354
1355 =head2 create
1356
1357 =over 4
1358
1359 =item Arguments: \%vals
1360
1361 =item Return Value: $object
1362
1363 =back
1364
1365 Inserts a record into the resultset and returns the object representing it.
1366
1367 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1368
1369 =cut
1370
1371 sub create {
1372   my ($self, $attrs) = @_;
1373   $self->throw_exception( "create needs a hashref" )
1374     unless ref $attrs eq 'HASH';
1375   return $self->new_result($attrs)->insert;
1376 }
1377
1378 =head2 find_or_create
1379
1380 =over 4
1381
1382 =item Arguments: \%vals, \%attrs?
1383
1384 =item Return Value: $object
1385
1386 =back
1387
1388   $class->find_or_create({ key => $val, ... });
1389
1390 Tries to find a record based on its primary key or unique constraint; if none
1391 is found, creates one and returns that instead.
1392
1393   my $cd = $schema->resultset('CD')->find_or_create({
1394     cdid   => 5,
1395     artist => 'Massive Attack',
1396     title  => 'Mezzanine',
1397     year   => 2005,
1398   });
1399
1400 Also takes an optional C<key> attribute, to search by a specific key or unique
1401 constraint. For example:
1402
1403   my $cd = $schema->resultset('CD')->find_or_create(
1404     {
1405       artist => 'Massive Attack',
1406       title  => 'Mezzanine',
1407     },
1408     { key => 'cd_artist_title' }
1409   );
1410
1411 See also L</find> and L</update_or_create>. For information on how to declare
1412 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1413
1414 =cut
1415
1416 sub find_or_create {
1417   my $self     = shift;
1418   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1419   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1420   my $exists   = $self->find($hash, $attrs);
1421   return defined $exists ? $exists : $self->create($hash);
1422 }
1423
1424 =head2 update_or_create
1425
1426 =over 4
1427
1428 =item Arguments: \%col_values, { key => $unique_constraint }?
1429
1430 =item Return Value: $object
1431
1432 =back
1433
1434   $class->update_or_create({ col => $val, ... });
1435
1436 First, searches for an existing row matching one of the unique constraints
1437 (including the primary key) on the source of this resultset. If a row is
1438 found, updates it with the other given column values. Otherwise, creates a new
1439 row.
1440
1441 Takes an optional C<key> attribute to search on a specific unique constraint.
1442 For example:
1443
1444   # In your application
1445   my $cd = $schema->resultset('CD')->update_or_create(
1446     {
1447       artist => 'Massive Attack',
1448       title  => 'Mezzanine',
1449       year   => 1998,
1450     },
1451     { key => 'cd_artist_title' }
1452   );
1453
1454 If no C<key> is specified, it searches on all unique constraints defined on the
1455 source, including the primary key.
1456
1457 If the C<key> is specified as C<primary>, it searches only on the primary key.
1458
1459 See also L</find> and L</find_or_create>. For information on how to declare
1460 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1461
1462 =cut
1463
1464 sub update_or_create {
1465   my $self = shift;
1466   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1467   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1468
1469   my $row = $self->find($cond);
1470   if (defined $row) {
1471     $row->update($cond);
1472     return $row;
1473   }
1474
1475   return $self->create($cond);
1476 }
1477
1478 =head2 get_cache
1479
1480 =over 4
1481
1482 =item Arguments: none
1483
1484 =item Return Value: \@cache_objects?
1485
1486 =back
1487
1488 Gets the contents of the cache for the resultset, if the cache is set.
1489
1490 =cut
1491
1492 sub get_cache {
1493   shift->{all_cache};
1494 }
1495
1496 =head2 set_cache
1497
1498 =over 4
1499
1500 =item Arguments: \@cache_objects
1501
1502 =item Return Value: \@cache_objects
1503
1504 =back
1505
1506 Sets the contents of the cache for the resultset. Expects an arrayref
1507 of objects of the same class as those produced by the resultset. Note that
1508 if the cache is set the resultset will return the cached objects rather
1509 than re-querying the database even if the cache attr is not set.
1510
1511 =cut
1512
1513 sub set_cache {
1514   my ( $self, $data ) = @_;
1515   $self->throw_exception("set_cache requires an arrayref")
1516       if defined($data) && (ref $data ne 'ARRAY');
1517   $self->{all_cache} = $data;
1518 }
1519
1520 =head2 clear_cache
1521
1522 =over 4
1523
1524 =item Arguments: none
1525
1526 =item Return Value: []
1527
1528 =back
1529
1530 Clears the cache for the resultset.
1531
1532 =cut
1533
1534 sub clear_cache {
1535   shift->set_cache(undef);
1536 }
1537
1538 =head2 related_resultset
1539
1540 =over 4
1541
1542 =item Arguments: $relationship_name
1543
1544 =item Return Value: $resultset
1545
1546 =back
1547
1548 Returns a related resultset for the supplied relationship name.
1549
1550   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1551
1552 =cut
1553
1554 sub related_resultset {
1555   my ( $self, $rel ) = @_;
1556   
1557   $self->{related_resultsets} ||= {};
1558   return $self->{related_resultsets}{$rel} ||= do {
1559     #warn "fetching related resultset for rel '$rel' " . $self->result_source->{name};
1560     my $rel_obj = $self->result_source->relationship_info($rel);
1561     $self->throw_exception(
1562         "search_related: result source '" . $self->result_source->name .
1563         "' has no such relationship ${rel}")
1564         unless $rel_obj; #die Dumper $self->{attrs};
1565
1566     my $rs = $self->result_source->schema->resultset($rel_obj->{class}
1567            )->search( undef,
1568                       { select => undef,
1569                         as => undef,
1570                         #join => $rel,
1571                         _live_join => $rel,
1572                         _parent_attrs => $self->{attrs}}
1573                       );
1574     
1575     # keep reference of the original resultset
1576     $rs->{_parent_rs} = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->result_source;
1577     return $rs;
1578   };
1579 }
1580
1581 =head2 throw_exception
1582
1583 See L<DBIx::Class::Schema/throw_exception> for details.
1584
1585 =cut
1586
1587 sub throw_exception {
1588   my $self=shift;
1589   $self->result_source->schema->throw_exception(@_);
1590 }
1591
1592 # XXX: FIXME: Attributes docs need clearing up
1593
1594 =head1 ATTRIBUTES
1595
1596 The resultset takes various attributes that modify its behavior. Here's an
1597 overview of them:
1598
1599 =head2 order_by
1600
1601 =over 4
1602
1603 =item Value: ($order_by | \@order_by)
1604
1605 =back
1606
1607 Which column(s) to order the results by. This is currently passed
1608 through directly to SQL, so you can give e.g. C<year DESC> for a
1609 descending order on the column `year'.
1610
1611 Please note that if you have quoting enabled (see 
1612 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1613 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1614 so you will need to manually quote things as appropriate.)
1615
1616 =head2 columns
1617
1618 =over 4
1619
1620 =item Value: \@columns
1621
1622 =back
1623
1624 Shortcut to request a particular set of columns to be retrieved.  Adds
1625 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1626 from that, then auto-populates C<as> from C<select> as normal. (You may also
1627 use the C<cols> attribute, as in earlier versions of DBIC.)
1628
1629 =head2 include_columns
1630
1631 =over 4
1632
1633 =item Value: \@columns
1634
1635 =back
1636
1637 Shortcut to include additional columns in the returned results - for example
1638
1639   $schema->resultset('CD')->search(undef, {
1640     include_columns => ['artist.name'],
1641     join => ['artist']
1642   });
1643
1644 would return all CDs and include a 'name' column to the information
1645 passed to object inflation
1646
1647 =head2 select
1648
1649 =over 4
1650
1651 =item Value: \@select_columns
1652
1653 =back
1654
1655 Indicates which columns should be selected from the storage. You can use
1656 column names, or in the case of RDBMS back ends, function or stored procedure
1657 names:
1658
1659   $rs = $schema->resultset('Employee')->search(undef, {
1660     select => [
1661       'name',
1662       { count => 'employeeid' },
1663       { sum => 'salary' }
1664     ]
1665   });
1666
1667 When you use function/stored procedure names and do not supply an C<as>
1668 attribute, the column names returned are storage-dependent. E.g. MySQL would
1669 return a column named C<count(employeeid)> in the above example.
1670
1671 =head2 +select
1672
1673 =over 4
1674
1675 Indicates additional columns to be selected from storage.  Works the same as
1676 L<select> but adds columns to the selection.
1677
1678 =back
1679
1680 =head2 +as
1681
1682 =over 4
1683
1684 Indicates additional column names for those added via L<+select>.
1685
1686 =back
1687
1688 =head2 as
1689
1690 =over 4
1691
1692 =item Value: \@inflation_names
1693
1694 =back
1695
1696 Indicates column names for object inflation. This is used in conjunction with
1697 C<select>, usually when C<select> contains one or more function or stored
1698 procedure names:
1699
1700   $rs = $schema->resultset('Employee')->search(undef, {
1701     select => [
1702       'name',
1703       { count => 'employeeid' }
1704     ],
1705     as => ['name', 'employee_count'],
1706   });
1707
1708   my $employee = $rs->first(); # get the first Employee
1709
1710 If the object against which the search is performed already has an accessor
1711 matching a column name specified in C<as>, the value can be retrieved using
1712 the accessor as normal:
1713
1714   my $name = $employee->name();
1715
1716 If on the other hand an accessor does not exist in the object, you need to
1717 use C<get_column> instead:
1718
1719   my $employee_count = $employee->get_column('employee_count');
1720
1721 You can create your own accessors if required - see
1722 L<DBIx::Class::Manual::Cookbook> for details.
1723
1724 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1725 produced, it is used for internal access only. Thus attempting to use the accessor
1726 in an C<order_by> clause or similar will fail misrably.
1727
1728 =head2 join
1729
1730 =over 4
1731
1732 =item Value: ($rel_name | \@rel_names | \%rel_names)
1733
1734 =back
1735
1736 Contains a list of relationships that should be joined for this query.  For
1737 example:
1738
1739   # Get CDs by Nine Inch Nails
1740   my $rs = $schema->resultset('CD')->search(
1741     { 'artist.name' => 'Nine Inch Nails' },
1742     { join => 'artist' }
1743   );
1744
1745 Can also contain a hash reference to refer to the other relation's relations.
1746 For example:
1747
1748   package MyApp::Schema::Track;
1749   use base qw/DBIx::Class/;
1750   __PACKAGE__->table('track');
1751   __PACKAGE__->add_columns(qw/trackid cd position title/);
1752   __PACKAGE__->set_primary_key('trackid');
1753   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1754   1;
1755
1756   # In your application
1757   my $rs = $schema->resultset('Artist')->search(
1758     { 'track.title' => 'Teardrop' },
1759     {
1760       join     => { cd => 'track' },
1761       order_by => 'artist.name',
1762     }
1763   );
1764
1765 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1766 similarly for a third time). For e.g.
1767
1768   my $rs = $schema->resultset('Artist')->search({
1769     'cds.title'   => 'Down to Earth',
1770     'cds_2.title' => 'Popular',
1771   }, {
1772     join => [ qw/cds cds/ ],
1773   });
1774
1775 will return a set of all artists that have both a cd with title 'Down
1776 to Earth' and a cd with title 'Popular'.
1777
1778 If you want to fetch related objects from other tables as well, see C<prefetch>
1779 below.
1780
1781 =head2 prefetch
1782
1783 =over 4
1784
1785 =item Value: ($rel_name | \@rel_names | \%rel_names)
1786
1787 =back
1788
1789 Contains one or more relationships that should be fetched along with the main
1790 query (when they are accessed afterwards they will have already been
1791 "prefetched").  This is useful for when you know you will need the related
1792 objects, because it saves at least one query:
1793
1794   my $rs = $schema->resultset('Tag')->search(
1795     undef,
1796     {
1797       prefetch => {
1798         cd => 'artist'
1799       }
1800     }
1801   );
1802
1803 The initial search results in SQL like the following:
1804
1805   SELECT tag.*, cd.*, artist.* FROM tag
1806   JOIN cd ON tag.cd = cd.cdid
1807   JOIN artist ON cd.artist = artist.artistid
1808
1809 L<DBIx::Class> has no need to go back to the database when we access the
1810 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1811 case.
1812
1813 Simple prefetches will be joined automatically, so there is no need
1814 for a C<join> attribute in the above search. If you're prefetching to
1815 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1816 specify the join as well.
1817
1818 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1819 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1820 with an accessor type of 'single' or 'filter').
1821
1822 =head2 page
1823
1824 =over 4
1825
1826 =item Value: $page
1827
1828 =back
1829
1830 Makes the resultset paged and specifies the page to retrieve. Effectively
1831 identical to creating a non-pages resultset and then calling ->page($page)
1832 on it. 
1833
1834 If L<rows> attribute is not specified it defualts to 10 rows per page.
1835
1836 =head2 rows
1837
1838 =over 4
1839
1840 =item Value: $rows
1841
1842 =back
1843
1844 Specifes the maximum number of rows for direct retrieval or the number of
1845 rows per page if the page attribute or method is used.
1846
1847 =head2 offset
1848
1849 =over 4
1850
1851 =item Value: $offset
1852
1853 =back
1854
1855 Specifies the (zero-based) row number for the  first row to be returned, or the
1856 of the first row of the first page if paging is used.
1857
1858 =head2 group_by
1859
1860 =over 4
1861
1862 =item Value: \@columns
1863
1864 =back
1865
1866 A arrayref of columns to group by. Can include columns of joined tables.
1867
1868   group_by => [qw/ column1 column2 ... /]
1869
1870 =head2 having
1871
1872 =over 4
1873
1874 =item Value: $condition
1875
1876 =back
1877
1878 HAVING is a select statement attribute that is applied between GROUP BY and
1879 ORDER BY. It is applied to the after the grouping calculations have been
1880 done. 
1881
1882   having => { 'count(employee)' => { '>=', 100 } }
1883
1884 =head2 distinct
1885
1886 =over 4
1887
1888 =item Value: (0 | 1)
1889
1890 =back
1891
1892 Set to 1 to group by all columns.
1893
1894 =head2 cache
1895
1896 Set to 1 to cache search results. This prevents extra SQL queries if you
1897 revisit rows in your ResultSet:
1898
1899   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1900   
1901   while( my $artist = $resultset->next ) {
1902     ... do stuff ...
1903   }
1904
1905   $rs->first; # without cache, this would issue a query
1906
1907 By default, searches are not cached.
1908
1909 For more examples of using these attributes, see
1910 L<DBIx::Class::Manual::Cookbook>.
1911
1912 =head2 from
1913
1914 =over 4
1915
1916 =item Value: \@from_clause
1917
1918 =back
1919
1920 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1921 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1922 clauses.
1923
1924 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1925
1926 C<join> will usually do what you need and it is strongly recommended that you
1927 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1928 And we really do mean "cannot", not just tried and failed. Attempting to use
1929 this because you're having problems with C<join> is like trying to use x86
1930 ASM because you've got a syntax error in your C. Trust us on this.
1931
1932 Now, if you're still really, really sure you need to use this (and if you're
1933 not 100% sure, ask the mailing list first), here's an explanation of how this
1934 works.
1935
1936 The syntax is as follows -
1937
1938   [
1939     { <alias1> => <table1> },
1940     [
1941       { <alias2> => <table2>, -join_type => 'inner|left|right' },
1942       [], # nested JOIN (optional)
1943       { <table1.column1> => <table2.column2>, ... (more conditions) },
1944     ],
1945     # More of the above [ ] may follow for additional joins
1946   ]
1947
1948   <table1> <alias1>
1949   JOIN
1950     <table2> <alias2>
1951     [JOIN ...]
1952   ON <table1.column1> = <table2.column2>
1953   <more joins may follow>
1954
1955 An easy way to follow the examples below is to remember the following:
1956
1957     Anything inside "[]" is a JOIN
1958     Anything inside "{}" is a condition for the enclosing JOIN
1959
1960 The following examples utilize a "person" table in a family tree application.
1961 In order to express parent->child relationships, this table is self-joined:
1962
1963     # Person->belongs_to('father' => 'Person');
1964     # Person->belongs_to('mother' => 'Person');
1965
1966 C<from> can be used to nest joins. Here we return all children with a father,
1967 then search against all mothers of those children:
1968
1969   $rs = $schema->resultset('Person')->search(
1970       undef,
1971       {
1972           alias => 'mother', # alias columns in accordance with "from"
1973           from => [
1974               { mother => 'person' },
1975               [
1976                   [
1977                       { child => 'person' },
1978                       [
1979                           { father => 'person' },
1980                           { 'father.person_id' => 'child.father_id' }
1981                       ]
1982                   ],
1983                   { 'mother.person_id' => 'child.mother_id' }
1984               ],
1985           ]
1986       },
1987   );
1988
1989   # Equivalent SQL:
1990   # SELECT mother.* FROM person mother
1991   # JOIN (
1992   #   person child
1993   #   JOIN person father
1994   #   ON ( father.person_id = child.father_id )
1995   # )
1996   # ON ( mother.person_id = child.mother_id )
1997
1998 The type of any join can be controlled manually. To search against only people
1999 with a father in the person table, we could explicitly use C<INNER JOIN>:
2000
2001     $rs = $schema->resultset('Person')->search(
2002         undef,
2003         {
2004             alias => 'child', # alias columns in accordance with "from"
2005             from => [
2006                 { child => 'person' },
2007                 [
2008                     { father => 'person', -join_type => 'inner' },
2009                     { 'father.id' => 'child.father_id' }
2010                 ],
2011             ]
2012         },
2013     );
2014
2015     # Equivalent SQL:
2016     # SELECT child.* FROM person child
2017     # INNER JOIN person father ON child.father_id = father.id
2018
2019 =cut
2020
2021 1;