docced where attribute in resultset
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Data::Dumper;
13 use Scalar::Util qw/weaken/;
14 use Data::Dumper;
15 use DBIx::Class::ResultSetColumn;
16 use base qw/DBIx::Class/;
17 __PACKAGE__->load_components(qw/AccessorGroup/);
18 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
23
24 =head1 SYNOPSIS
25
26   my $rs   = $schema->resultset('User')->search(registered => 1);
27   my @rows = $schema->resultset('CD')->search(year => 2005);
28
29 =head1 DESCRIPTION
30
31 The resultset is also known as an iterator. It is responsible for handling
32 queries that may return an arbitrary number of rows, e.g. via L</search>
33 or a C<has_many> relationship.
34
35 In the examples below, the following table classes are used:
36
37   package MyApp::Schema::Artist;
38   use base qw/DBIx::Class/;
39   __PACKAGE__->load_components(qw/Core/);
40   __PACKAGE__->table('artist');
41   __PACKAGE__->add_columns(qw/artistid name/);
42   __PACKAGE__->set_primary_key('artistid');
43   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
44   1;
45
46   package MyApp::Schema::CD;
47   use base qw/DBIx::Class/;
48   __PACKAGE__->load_components(qw/Core/);
49   __PACKAGE__->table('cd');
50   __PACKAGE__->add_columns(qw/cdid artist title year/);
51   __PACKAGE__->set_primary_key('cdid');
52   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
53   1;
54
55 =head1 METHODS
56
57 =head2 new
58
59 =over 4
60
61 =item Arguments: $source, \%$attrs
62
63 =item Return Value: $rs
64
65 =back
66
67 The resultset constructor. Takes a source object (usually a
68 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
69 L</ATTRIBUTES> below).  Does not perform any queries -- these are
70 executed as needed by the other methods.
71
72 Generally you won't need to construct a resultset manually.  You'll
73 automatically get one from e.g. a L</search> called in scalar context:
74
75   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
76
77 IMPORTANT: If called on an object, proxies to new_result instead so
78
79   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
80
81 will return a CD object, not a ResultSet.
82
83 =cut
84
85 sub new {
86   my $class = shift;
87   return $class->new_result(@_) if ref $class;
88   
89   my ($source, $attrs) = @_;
90   weaken $source;
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   bless {
101     result_source => $source,
102     result_class => $attrs->{result_class} || $source->result_class,
103     cond => $attrs->{where},
104 #    from => $attrs->{from},
105 #    collapse => $collapse,
106     count => undef,
107     page => delete $attrs->{page},
108     pager => undef,
109     attrs => $attrs
110   }, $class;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 =cut
138
139 sub search {
140   my $self = shift;
141   my $rs = $self->search_rs( @_ );
142   return (wantarray ? $rs->all : $rs);
143 }
144
145 =head2 search_rs
146
147 =over 4
148
149 =item Arguments: $cond, \%attrs?
150
151 =item Return Value: $resultset
152
153 =back
154
155 This method does the same exact thing as search() except it will 
156 always return a resultset, even in list context.
157
158 =cut
159
160 sub search_rs {
161   my $self = shift;
162
163   my $attrs = {};
164   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
165   my $our_attrs = ($attrs->{_parent_attrs})
166     ? { %{$attrs->{_parent_attrs}} }
167       : { %{$self->{attrs}} };
168   delete($attrs->{_parent_attrs}) if(exists $attrs->{_parent_attrs});  
169   my $having = delete $our_attrs->{having};
170
171   # XXX should only maintain _live_join_stack and generate _live_join_h from that  
172   if ($attrs->{_live_join_stack}) {
173     my $live_join = $attrs->{_live_join_stack};
174     foreach (reverse @{$live_join}) {
175       $attrs->{_live_join_h} = (defined $attrs->{_live_join_h}) ? { $_ => $attrs->{_live_join_h} } : $_;
176     }
177   }
178
179   # merge new attrs into inherited
180   foreach my $key (qw/join prefetch/) {
181     next unless (exists $attrs->{$key});
182     if ($attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
183       my $live_join = $attrs->{_live_join_stack} ||
184                       $our_attrs->{_live_join_stack};
185       foreach (reverse @{$live_join}) {
186         $attrs->{$key} = { $_ => $attrs->{$key} };
187       }
188     }
189
190     if (exists $our_attrs->{$key}) {
191       $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
192     } else {
193       $our_attrs->{$key} = $attrs->{$key};
194     }
195     delete $attrs->{$key};
196   }
197
198   $our_attrs->{join} = $self->_merge_attr(
199     $our_attrs->{join}, $attrs->{_live_join_h}
200   ) if ($attrs->{_live_join_h});
201
202   if (defined $our_attrs->{prefetch}) {
203     $our_attrs->{join} = $self->_merge_attr(
204       $our_attrs->{join}, $our_attrs->{prefetch}
205     );
206   }
207
208   my $new_attrs = { %{$our_attrs}, %{$attrs} };
209   my $where = (@_
210     ? (
211         (@_ == 1 || ref $_[0] eq "HASH")
212           ? shift
213           : (
214               (@_ % 2)
215                 ? $self->throw_exception("Odd number of arguments to search")
216                 : {@_}
217              )
218        )
219     : undef()
220   );
221
222   if (defined $where) {
223     $new_attrs->{where} = (
224       defined $new_attrs->{where}
225         ? { '-and' => [
226               map {
227                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
228               } $where, $new_attrs->{where}
229             ]
230           }
231         : $where);
232   }
233
234   if (defined $having) {
235     $new_attrs->{having} = (
236       defined $new_attrs->{having}
237         ? { '-and' => [
238               map {
239                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
240               } $having, $new_attrs->{having}
241             ]
242           }
243         : $having);
244   }
245
246   my $rs = (ref $self)->new($self->result_source, $new_attrs);
247   $rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs});
248
249   unless (@_) {                 # no search, effectively just a clone
250     my $rows = $self->get_cache;
251     if ($rows) {
252       $rs->set_cache($rows);
253     }
254   }
255   return $rs;
256 }
257
258 =head2 search_literal
259
260 =over 4
261
262 =item Arguments: $sql_fragment, @bind_values
263
264 =item Return Value: $resultset (scalar context), @row_objs (list context)
265
266 =back
267
268   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
269   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
270
271 Pass a literal chunk of SQL to be added to the conditional part of the
272 resultset query.
273
274 =cut
275
276 sub search_literal {
277   my ($self, $cond, @vals) = @_;
278   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
279   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
280   return $self->search(\$cond, $attrs);
281 }
282
283 =head2 find
284
285 =over 4
286
287 =item Arguments: @values | \%cols, \%attrs?
288
289 =item Return Value: $row_object
290
291 =back
292
293 Finds a row based on its primary key or unique constraint. For example, to find
294 a row by its primary key:
295
296   my $cd = $schema->resultset('CD')->find(5);
297
298 You can also find a row by a specific unique constraint using the C<key>
299 attribute. For example:
300
301   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
302     key => 'cd_artist_title'
303   });
304
305 Additionally, you can specify the columns explicitly by name:
306
307   my $cd = $schema->resultset('CD')->find(
308     {
309       artist => 'Massive Attack',
310       title  => 'Mezzanine',
311     },
312     { key => 'cd_artist_title' }
313   );
314
315 If the C<key> is specified as C<primary>, it searches only on the primary key.
316
317 If no C<key> is specified, it searches on all unique constraints defined on the
318 source, including the primary key.
319
320 See also L</find_or_create> and L</update_or_create>. For information on how to
321 declare unique constraints, see
322 L<DBIx::Class::ResultSource/add_unique_constraint>.
323
324 =cut
325
326 sub find {
327   my $self = shift;
328   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
329
330   # Default to the primary key, but allow a specific key
331   my @cols = exists $attrs->{key}
332     ? $self->result_source->unique_constraint_columns($attrs->{key})
333     : $self->result_source->primary_columns;
334   $self->throw_exception(
335     "Can't find unless a primary key or unique constraint is defined"
336   ) unless @cols;
337
338   # Parse out a hashref from input
339   my $input_query;
340   if (ref $_[0] eq 'HASH') {
341     $input_query = { %{$_[0]} };
342   }
343   elsif (@_ == @cols) {
344     $input_query = {};
345     @{$input_query}{@cols} = @_;
346   }
347   else {
348     # Compatibility: Allow e.g. find(id => $value)
349     carp "Find by key => value deprecated; please use a hashref instead";
350     $input_query = {@_};
351   }
352
353   my @unique_queries = $self->_unique_queries($input_query, $attrs);
354
355   # Handle cases where the ResultSet defines the query, or where the user is
356   # abusing find
357   my $query = @unique_queries ? \@unique_queries : $input_query;
358
359   # Run the query
360   if (keys %$attrs) {
361     my $rs = $self->search($query, $attrs);
362     $rs->_resolve;
363     return keys %{$rs->{_attrs}->{collapse}} ? $rs->next : $rs->single;
364   }
365   else {
366     $self->_resolve;  
367     return (keys %{$self->{_attrs}->{collapse}})
368       ? $self->search($query)->next
369       : $self->single($query);
370   }
371 }
372
373 # _unique_queries
374 #
375 # Build a list of queries which satisfy unique constraints.
376
377 sub _unique_queries {
378   my ($self, $query, $attrs) = @_;
379
380   my @constraint_names = exists $attrs->{key}
381     ? ($attrs->{key})
382     : $self->result_source->unique_constraint_names;
383
384   my @unique_queries;
385   foreach my $name (@constraint_names) {
386     my @unique_cols = $self->result_source->unique_constraint_columns($name);
387     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
388
389     next unless scalar keys %$unique_query;
390
391     # Add the ResultSet's alias
392     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
393       my $alias = ($self->{attrs}->{_live_join})
394         ? $self->{attrs}->{_live_join}
395         : $self->{attrs}->{alias};
396       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
397     }
398
399     push @unique_queries, $unique_query;
400   }
401
402   return @unique_queries;
403 }
404
405 # _build_unique_query
406 #
407 # Constrain the specified query hash based on the specified column names.
408
409 sub _build_unique_query {
410   my ($self, $query, $unique_cols) = @_;
411
412   my %unique_query =
413     map  { $_ => $query->{$_} }
414     grep { exists $query->{$_} }
415     @$unique_cols;
416
417   return \%unique_query;
418 }
419
420 =head2 search_related
421
422 =over 4
423
424 =item Arguments: $rel, $cond, \%attrs?
425
426 =item Return Value: $new_resultset
427
428 =back
429
430   $new_rs = $cd_rs->search_related('artist', {
431     name => 'Emo-R-Us',
432   });
433
434 Searches the specified relationship, optionally specifying a condition and
435 attributes for matching records. See L</ATTRIBUTES> for more information.
436
437 =cut
438
439 sub search_related {
440   return shift->related_resultset(shift)->search(@_);
441 }
442
443 =head2 cursor
444
445 =over 4
446
447 =item Arguments: none
448
449 =item Return Value: $cursor
450
451 =back
452
453 Returns a storage-driven cursor to the given resultset. See
454 L<DBIx::Class::Cursor> for more information.
455
456 =cut
457
458 sub cursor {
459   my ($self) = @_;
460
461   $self->_resolve;
462   my $attrs = { %{$self->{_attrs}} };
463   return $self->{cursor}
464     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
465           $attrs->{where},$attrs);
466 }
467
468 =head2 single
469
470 =over 4
471
472 =item Arguments: $cond?
473
474 =item Return Value: $row_object?
475
476 =back
477
478   my $cd = $schema->resultset('CD')->single({ year => 2001 });
479
480 Inflates the first result without creating a cursor if the resultset has
481 any records in it; if not returns nothing. Used by L</find> as an optimisation.
482
483 Can optionally take an additional condition *only* - this is a fast-code-path
484 method; if you need to add extra joins or similar call ->search and then
485 ->single without a condition on the $rs returned from that.
486
487 =cut
488
489 sub single {
490   my ($self, $where) = @_;
491   $self->_resolve;
492   my $attrs = { %{$self->{_attrs}} };
493   if ($where) {
494     if (defined $attrs->{where}) {
495       $attrs->{where} = {
496         '-and' =>
497             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
498                $where, delete $attrs->{where} ]
499       };
500     } else {
501       $attrs->{where} = $where;
502     }
503   }
504
505   unless ($self->_is_unique_query($attrs->{where})) {
506     carp "Query not guaranteed to return a single row"
507       . "; please declare your unique constraints or use search instead";
508   }
509
510   my @data = $self->result_source->storage->select_single(
511     $attrs->{from}, $attrs->{select},
512     $attrs->{where},$attrs
513   );
514
515   return (@data ? $self->_construct_object(@data) : ());
516 }
517
518 # _is_unique_query
519 #
520 # Try to determine if the specified query is guaranteed to be unique, based on
521 # the declared unique constraints.
522
523 sub _is_unique_query {
524   my ($self, $query) = @_;
525
526   my $collapsed = $self->_collapse_query($query);
527   my $alias = ($self->{attrs}->{_live_join})
528     ? $self->{attrs}->{_live_join}
529     : $self->{attrs}->{alias};
530
531   foreach my $name ($self->result_source->unique_constraint_names) {
532     my @unique_cols = map {
533       "$alias.$_"
534     } $self->result_source->unique_constraint_columns($name);
535
536     # Count the values for each unique column
537     my %seen = map { $_ => 0 } @unique_cols;
538
539     foreach my $key (keys %$collapsed) {
540       my $aliased = $key;
541       $aliased = "$alias.$key" unless $key =~ /\./;
542
543       next unless exists $seen{$aliased};  # Additional constraints are okay
544       $seen{$aliased} = scalar @{ $collapsed->{$key} };
545     }
546
547     # If we get 0 or more than 1 value for a column, it's not necessarily unique
548     return 1 unless grep { $_ != 1 } values %seen;
549   }
550
551   return 0;
552 }
553
554 # _collapse_query
555 #
556 # Recursively collapse the query, accumulating values for each column.
557
558 sub _collapse_query {
559   my ($self, $query, $collapsed) = @_;
560
561   $collapsed ||= {};
562
563   if (ref $query eq 'ARRAY') {
564     foreach my $subquery (@$query) {
565       next unless ref $subquery;  # -or
566 #      warn "ARRAY: " . Dumper $subquery;
567       $collapsed = $self->_collapse_query($subquery, $collapsed);
568     }
569   }
570   elsif (ref $query eq 'HASH') {
571     if (keys %$query and (keys %$query)[0] eq '-and') {
572       foreach my $subquery (@{$query->{-and}}) {
573 #        warn "HASH: " . Dumper $subquery;
574         $collapsed = $self->_collapse_query($subquery, $collapsed);
575       }
576     }
577     else {
578 #      warn "LEAF: " . Dumper $query;
579       foreach my $key (keys %$query) {
580         push @{$collapsed->{$key}}, $query->{$key};
581       }
582     }
583   }
584
585   return $collapsed;
586 }
587
588 =head2 get_column
589
590 =over 4
591
592 =item Arguments: $cond?
593
594 =item Return Value: $resultsetcolumn
595
596 =back
597
598   my $max_length = $rs->get_column('length')->max;
599
600 Returns a ResultSetColumn instance for $column based on $self
601
602 =cut
603
604 sub get_column {
605   my ($self, $column) = @_;
606   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
607   return $new;
608 }
609
610 =head2 search_like
611
612 =over 4
613
614 =item Arguments: $cond, \%attrs?
615
616 =item Return Value: $resultset (scalar context), @row_objs (list context)
617
618 =back
619
620   # WHERE title LIKE '%blue%'
621   $cd_rs = $rs->search_like({ title => '%blue%'});
622
623 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
624 that this is simply a convenience method. You most likely want to use
625 L</search> with specific operators.
626
627 For more information, see L<DBIx::Class::Manual::Cookbook>.
628
629 =cut
630
631 sub search_like {
632   my $class = shift;
633   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
634   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
635   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
636   return $class->search($query, { %$attrs });
637 }
638
639 =head2 slice
640
641 =over 4
642
643 =item Arguments: $first, $last
644
645 =item Return Value: $resultset (scalar context), @row_objs (list context)
646
647 =back
648
649 Returns a resultset or object list representing a subset of elements from the
650 resultset slice is called on. Indexes are from 0, i.e., to get the first
651 three records, call:
652
653   my ($one, $two, $three) = $rs->slice(0, 2);
654
655 =cut
656
657 sub slice {
658   my ($self, $min, $max) = @_;
659   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
660   $attrs->{offset} = $self->{attrs}{offset} || 0;
661   $attrs->{offset} += $min;
662   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
663   return $self->search(undef(), $attrs);
664   #my $slice = (ref $self)->new($self->result_source, $attrs);
665   #return (wantarray ? $slice->all : $slice);
666 }
667
668 =head2 next
669
670 =over 4
671
672 =item Arguments: none
673
674 =item Return Value: $result?
675
676 =back
677
678 Returns the next element in the resultset (C<undef> is there is none).
679
680 Can be used to efficiently iterate over records in the resultset:
681
682   my $rs = $schema->resultset('CD')->search;
683   while (my $cd = $rs->next) {
684     print $cd->title;
685   }
686
687 Note that you need to store the resultset object, and call C<next> on it. 
688 Calling C<< resultset('Table')->next >> repeatedly will always return the
689 first record from the resultset.
690
691 =cut
692
693 sub next {
694   my ($self) = @_;
695   if (my $cache = $self->get_cache) {
696     $self->{all_cache_position} ||= 0;
697     return $cache->[$self->{all_cache_position}++];
698   }
699   if ($self->{attrs}{cache}) {
700     $self->{all_cache_position} = 1;
701     return ($self->all)[0];
702   }
703   my @row = (
704     exists $self->{stashed_row}
705       ? @{delete $self->{stashed_row}}
706       : $self->cursor->next
707   );
708   return unless (@row);
709   return $self->_construct_object(@row);
710 }
711
712 sub _resolve {
713   my $self = shift;
714
715   return if(exists $self->{_attrs}); #return if _resolve has already been called
716
717   my $attrs = $self->{attrs};    
718   my $source = ($self->{_parent_rs})
719     ? $self->{_parent_rs}
720     : $self->{result_source};
721
722   # XXX - lose storable dclone
723   my $record_filter = delete $attrs->{record_filter}
724     if (defined $attrs->{record_filter});
725   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
726   $attrs->{record_filter} = $record_filter if ($record_filter);
727   $self->{attrs}->{record_filter} = $record_filter if ($record_filter);
728
729   my $alias = $attrs->{alias};
730  
731   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
732   delete $attrs->{as} if $attrs->{columns};
733   $attrs->{columns} ||= [ $self->{result_source}->columns ]
734     unless $attrs->{select};
735   my $select_alias = ($self->{_parent_rs})
736     ? $self->{attrs}->{_live_join}
737     : $alias;
738   $attrs->{select} = [
739     map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
740   ] if $attrs->{columns};
741   $attrs->{as} ||= [
742     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
743   ];
744   if (my $include = delete $attrs->{include_columns}) {
745     push(@{$attrs->{select}}, @$include);
746     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
747   }
748
749   $attrs->{from} ||= [ { $alias => $source->from } ];
750   $attrs->{seen_join} ||= {};
751   my %seen;
752   if (my $join = delete $attrs->{join}) {
753     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
754       if (ref $j eq 'HASH') {
755         $seen{$_} = 1 foreach keys %$j;
756       } else {
757         $seen{$j} = 1;
758       }
759     }
760     push(@{$attrs->{from}},
761       $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join})
762     );
763   }
764   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
765   $attrs->{order_by} = [ $attrs->{order_by} ] if
766       $attrs->{order_by} and !ref($attrs->{order_by});
767   $attrs->{order_by} ||= [];
768
769   if(my $seladds = delete($attrs->{'+select'})) {
770     my @seladds = (ref($seladds) eq 'ARRAY' ? @$seladds : ($seladds));
771     $attrs->{select} = [
772       @{ $attrs->{select} },
773       map { (m/\./ || ref($_)) ? $_ : "${alias}.$_" } $seladds
774     ];
775   }
776   if(my $asadds = delete($attrs->{'+as'})) {
777     my @asadds = (ref($asadds) eq 'ARRAY' ? @$asadds : ($asadds));
778     $attrs->{as} = [ @{ $attrs->{as} }, @asadds ];
779   }
780   my $collapse = $attrs->{collapse} || {};
781   if (my $prefetch = delete $attrs->{prefetch}) {
782     my @pre_order;
783     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
784       if ( ref $p eq 'HASH' ) {
785         foreach my $key (keys %$p) {
786           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
787             unless $seen{$key};
788         }
789       } else {
790         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
791           unless $seen{$p};
792       }
793       # bring joins back to level of current class
794       $p = $self->_reduce_joins($p, $attrs) if ($attrs->{_live_join_stack});
795       if ($p) {
796         my @prefetch = $self->result_source->resolve_prefetch(
797           $p, $attrs->{alias}, {}, \@pre_order, $collapse
798         );
799         push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
800         push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
801       }
802     }
803     push(@{$attrs->{order_by}}, @pre_order);
804   }
805   $attrs->{collapse} = $collapse;
806   $self->{_attrs} = $attrs;
807 }
808
809 sub _merge_attr {
810   my ($self, $a, $b) = @_;
811     
812   return $b unless $a;
813   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
814     foreach my $key (keys %{$b}) {
815       if (exists $a->{$key}) {
816         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
817       } else {
818         $a->{$key} = $b->{$key};
819       }
820     }
821     return $a;
822   } else {
823     $a = [$a] unless (ref $a eq 'ARRAY');
824     $b = [$b] unless (ref $b eq 'ARRAY');
825     
826     my $hash = {};
827     my $array = [];      
828     foreach ($a, $b) {
829       foreach my $element (@{$_}) {
830         if (ref $element eq 'HASH') {
831           $hash = $self->_merge_attr($hash, $element);
832         } elsif (ref $element eq 'ARRAY') {
833           $array = [@{$array}, @{$element}];
834         } else {        
835           if ($b == $_) {
836             $self->_merge_array($array, $element);
837           } else {
838             push(@{$array}, $element);
839           }
840         }
841       }
842     }
843
844     my $final_array = [];
845     foreach my $element (@{$array}) {
846       push(@{$final_array}, $element) unless (exists $hash->{$element});
847     }
848     $array = $final_array;
849
850     if ((keys %{$hash}) && (scalar(@{$array} > 0))) {
851       return [$hash, @{$array}];
852     } else {    
853       return (keys %{$hash}) ? $hash : $array;
854     }
855   }
856 }
857
858 sub _merge_array {
859   my ($self, $a, $b) = @_;
860   
861   $b = [$b] unless (ref $b eq 'ARRAY');
862   # add elements from @{$b} to @{$a} which aren't already in @{$a}
863   foreach my $b_element (@{$b}) {
864     push(@{$a}, $b_element) unless grep {$b_element eq $_} @{$a};
865   }
866 }
867
868 # bring the joins (which are from the original class) to the level
869 # of the current class so that we can resolve them properly
870 sub _reduce_joins {
871   my ($self, $p, $attrs) = @_;
872
873  STACK:
874   foreach (@{$attrs->{_live_join_stack}}) {
875     if (ref $p eq 'HASH') {
876       if (exists $p->{$_}) {
877         $p = $p->{$_};
878       } else {
879         return undef;
880       }
881     } elsif (ref $p eq 'ARRAY') {
882       foreach my $pe (@{$p}) {
883         if ($pe eq $_) {
884           return undef;
885         }
886         if ((ref $pe eq 'HASH') && (exists $pe->{$_})) {
887           $p = $pe->{$_};
888           next STACK;
889         }
890       }                                           
891       return undef;
892     } else {
893       return undef;
894     }
895   }
896   return $p;
897 }
898
899 sub _construct_object {
900   my ($self, @row) = @_;
901   my @as = @{ $self->{_attrs}{as} };
902   
903   my $info = $self->_collapse_result(\@as, \@row);
904   my $new = $self->result_class->inflate_result($self->result_source, @$info);
905   $new = $self->{_attrs}{record_filter}->($new)
906     if exists $self->{_attrs}{record_filter};
907   return $new;
908 }
909
910 sub _collapse_result {
911   my ($self, $as, $row, $prefix) = @_;
912
913   my $live_join = $self->{attrs}->{_live_join} ||="";
914   my %const;
915
916   my @copy = @$row;
917   foreach my $this_as (@$as) {
918     my $val = shift @copy;
919     if (defined $prefix) {
920       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
921         my $remain = $1;
922         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
923         $const{$1||''}{$2} = $val;
924       }
925     } else {
926       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
927       $const{$1||''}{$2} = $val;
928     }
929   }
930
931   my $info = [ {}, {} ];
932   foreach my $key (keys %const) {
933     if (length $key && $key ne $live_join) {
934       my $target = $info;
935       my @parts = split(/\./, $key);
936       foreach my $p (@parts) {
937         $target = $target->[1]->{$p} ||= [];
938       }
939       $target->[0] = $const{$key};
940     } else {
941       $info->[0] = $const{$key};
942     }
943   }
944   my @collapse;
945
946   if (defined $prefix) {
947     @collapse = map {
948         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
949     } keys %{$self->{_attrs}->{collapse}}
950   } else {
951     @collapse = keys %{$self->{_attrs}->{collapse}};
952   };
953
954   if (@collapse) {
955     my ($c) = sort { length $a <=> length $b } @collapse;
956     my $target = $info;
957     foreach my $p (split(/\./, $c)) {
958       $target = $target->[1]->{$p} ||= [];
959     }
960     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
961     my @co_key = @{$self->{_attrs}->{collapse}{$c_prefix}};
962     my $tree = $self->_collapse_result($as, $row, $c_prefix);
963     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
964     my (@final, @raw);
965
966     while (
967       !(
968         grep {
969           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
970         } @co_key
971         )
972     ) {
973       push(@final, $tree);
974       last unless (@raw = $self->cursor->next);
975       $row = $self->{stashed_row} = \@raw;
976       $tree = $self->_collapse_result($as, $row, $c_prefix);
977     }
978     @$target = (@final ? @final : [ {}, {} ]); 
979       # single empty result to indicate an empty prefetched has_many
980   }
981
982   #print "final info: " . Dumper($info);
983   return $info;
984 }
985
986 =head2 result_source
987
988 =over 4
989
990 =item Arguments: $result_source?
991
992 =item Return Value: $result_source
993
994 =back
995
996 An accessor for the primary ResultSource object from which this ResultSet
997 is derived.
998
999 =cut
1000
1001
1002 =head2 count
1003
1004 =over 4
1005
1006 =item Arguments: $cond, \%attrs??
1007
1008 =item Return Value: $count
1009
1010 =back
1011
1012 Performs an SQL C<COUNT> with the same query as the resultset was built
1013 with to find the number of elements. If passed arguments, does a search
1014 on the resultset and counts the results of that.
1015
1016 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
1017 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
1018 not support C<DISTINCT> with multiple columns. If you are using such a
1019 database, you should only use columns from the main table in your C<group_by>
1020 clause.
1021
1022 =cut
1023
1024 sub count {
1025   my $self = shift;
1026   return $self->search(@_)->count if @_ and defined $_[0];
1027   return scalar @{ $self->get_cache } if $self->get_cache;
1028   my $count = $self->_count;
1029   return 0 unless $count;
1030
1031   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
1032   $count = $self->{attrs}{rows} if
1033     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
1034   return $count;
1035 }
1036
1037 sub _count { # Separated out so pager can get the full count
1038   my $self = shift;
1039   my $select = { count => '*' };
1040   
1041   $self->_resolve;
1042   my $attrs = { %{ $self->{_attrs} } };
1043   if (my $group_by = delete $attrs->{group_by}) {
1044     delete $attrs->{having};
1045     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1046     # todo: try CONCAT for multi-column pk
1047     my @pk = $self->result_source->primary_columns;
1048     if (@pk == 1) {
1049       foreach my $column (@distinct) {
1050         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
1051           @distinct = ($column);
1052           last;
1053         }
1054       }
1055     }
1056
1057     $select = { count => { distinct => \@distinct } };
1058   }
1059
1060   $attrs->{select} = $select;
1061   $attrs->{as} = [qw/count/];
1062
1063   # offset, order by and page are not needed to count. record_filter is cdbi
1064   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1065         my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1066         $tmp_rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs});
1067              #XXX - hack to pass through parent of related resultsets
1068
1069   my ($count) = $tmp_rs->cursor->next;
1070   return $count;
1071 }
1072
1073 =head2 count_literal
1074
1075 =over 4
1076
1077 =item Arguments: $sql_fragment, @bind_values
1078
1079 =item Return Value: $count
1080
1081 =back
1082
1083 Counts the results in a literal query. Equivalent to calling L</search_literal>
1084 with the passed arguments, then L</count>.
1085
1086 =cut
1087
1088 sub count_literal { shift->search_literal(@_)->count; }
1089
1090 =head2 all
1091
1092 =over 4
1093
1094 =item Arguments: none
1095
1096 =item Return Value: @objects
1097
1098 =back
1099
1100 Returns all elements in the resultset. Called implicitly if the resultset
1101 is returned in list context.
1102
1103 =cut
1104
1105 sub all {
1106   my ($self) = @_;
1107   return @{ $self->get_cache } if $self->get_cache;
1108
1109   my @obj;
1110
1111   # TODO: don't call resolve here
1112   $self->_resolve;
1113   if (keys %{$self->{_attrs}->{collapse}}) {
1114 #  if ($self->{attrs}->{prefetch}) {
1115       # Using $self->cursor->all is really just an optimisation.
1116       # If we're collapsing has_many prefetches it probably makes
1117       # very little difference, and this is cleaner than hacking
1118       # _construct_object to survive the approach
1119     my @row = $self->cursor->next;
1120     while (@row) {
1121       push(@obj, $self->_construct_object(@row));
1122       @row = (exists $self->{stashed_row}
1123                ? @{delete $self->{stashed_row}}
1124                : $self->cursor->next);
1125     }
1126   } else {
1127     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1128   }
1129
1130   $self->set_cache(\@obj) if $self->{attrs}{cache};
1131   return @obj;
1132 }
1133
1134 =head2 reset
1135
1136 =over 4
1137
1138 =item Arguments: none
1139
1140 =item Return Value: $self
1141
1142 =back
1143
1144 Resets the resultset's cursor, so you can iterate through the elements again.
1145
1146 =cut
1147
1148 sub reset {
1149   my ($self) = @_;
1150   delete $self->{_attrs} if (exists $self->{_attrs});
1151
1152   $self->{all_cache_position} = 0;
1153   $self->cursor->reset;
1154   return $self;
1155 }
1156
1157 =head2 first
1158
1159 =over 4
1160
1161 =item Arguments: none
1162
1163 =item Return Value: $object?
1164
1165 =back
1166
1167 Resets the resultset and returns an object for the first result (if the
1168 resultset returns anything).
1169
1170 =cut
1171
1172 sub first {
1173   return $_[0]->reset->next;
1174 }
1175
1176 # _cond_for_update_delete
1177 #
1178 # update/delete require the condition to be modified to handle
1179 # the differing SQL syntax available.  This transforms the $self->{cond}
1180 # appropriately, returning the new condition.
1181
1182 sub _cond_for_update_delete {
1183   my ($self) = @_;
1184   my $cond = {};
1185
1186   if (!ref($self->{cond})) {
1187     # No-op. No condition, we're updating/deleting everything
1188   }
1189   elsif (ref $self->{cond} eq 'ARRAY') {
1190     $cond = [
1191       map {
1192         my %hash;
1193         foreach my $key (keys %{$_}) {
1194           $key =~ /([^.]+)$/;
1195           $hash{$1} = $_->{$key};
1196         }
1197         \%hash;
1198       } @{$self->{cond}}
1199     ];
1200   }
1201   elsif (ref $self->{cond} eq 'HASH') {
1202     if ((keys %{$self->{cond}})[0] eq '-and') {
1203       $cond->{-and} = [];
1204
1205       my @cond = @{$self->{cond}{-and}};
1206       for (my $i = 0; $i <= @cond - 1; $i++) {
1207         my $entry = $cond[$i];
1208
1209         my %hash;
1210         if (ref $entry eq 'HASH') {
1211           foreach my $key (keys %{$entry}) {
1212             $key =~ /([^.]+)$/;
1213             $hash{$1} = $entry->{$key};
1214           }
1215         }
1216         else {
1217           $entry =~ /([^.]+)$/;
1218           $hash{$1} = $cond[++$i];
1219         }
1220
1221         push @{$cond->{-and}}, \%hash;
1222       }
1223     }
1224     else {
1225       foreach my $key (keys %{$self->{cond}}) {
1226         $key =~ /([^.]+)$/;
1227         $cond->{$1} = $self->{cond}{$key};
1228       }
1229     }
1230   }
1231   else {
1232     $self->throw_exception(
1233       "Can't update/delete on resultset with condition unless hash or array"
1234     );
1235   }
1236
1237   return $cond;
1238 }
1239
1240
1241 =head2 update
1242
1243 =over 4
1244
1245 =item Arguments: \%values
1246
1247 =item Return Value: $storage_rv
1248
1249 =back
1250
1251 Sets the specified columns in the resultset to the supplied values in a
1252 single query. Return value will be true if the update succeeded or false
1253 if no records were updated; exact type of success value is storage-dependent.
1254
1255 =cut
1256
1257 sub update {
1258   my ($self, $values) = @_;
1259   $self->throw_exception("Values for update must be a hash")
1260     unless ref $values eq 'HASH';
1261
1262   my $cond = $self->_cond_for_update_delete;
1263
1264   return $self->result_source->storage->update(
1265     $self->result_source->from, $values, $cond
1266   );
1267 }
1268
1269 =head2 update_all
1270
1271 =over 4
1272
1273 =item Arguments: \%values
1274
1275 =item Return Value: 1
1276
1277 =back
1278
1279 Fetches all objects and updates them one at a time. Note that C<update_all>
1280 will run DBIC cascade triggers, while L</update> will not.
1281
1282 =cut
1283
1284 sub update_all {
1285   my ($self, $values) = @_;
1286   $self->throw_exception("Values for update must be a hash")
1287     unless ref $values eq 'HASH';
1288   foreach my $obj ($self->all) {
1289     $obj->set_columns($values)->update;
1290   }
1291   return 1;
1292 }
1293
1294 =head2 delete
1295
1296 =over 4
1297
1298 =item Arguments: none
1299
1300 =item Return Value: 1
1301
1302 =back
1303
1304 Deletes the contents of the resultset from its result source. Note that this
1305 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1306 to run.
1307
1308 =cut
1309
1310 sub delete {
1311   my ($self) = @_;
1312   my $del = {};
1313
1314   my $cond = $self->_cond_for_update_delete;
1315
1316   $self->result_source->storage->delete($self->result_source->from, $cond);
1317   return 1;
1318 }
1319
1320 =head2 delete_all
1321
1322 =over 4
1323
1324 =item Arguments: none
1325
1326 =item Return Value: 1
1327
1328 =back
1329
1330 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1331 will run DBIC cascade triggers, while L</delete> will not.
1332
1333 =cut
1334
1335 sub delete_all {
1336   my ($self) = @_;
1337   $_->delete for $self->all;
1338   return 1;
1339 }
1340
1341 =head2 pager
1342
1343 =over 4
1344
1345 =item Arguments: none
1346
1347 =item Return Value: $pager
1348
1349 =back
1350
1351 Return Value a L<Data::Page> object for the current resultset. Only makes
1352 sense for queries with a C<page> attribute.
1353
1354 =cut
1355
1356 sub pager {
1357   my ($self) = @_;
1358   my $attrs = $self->{attrs};
1359   $self->throw_exception("Can't create pager for non-paged rs")
1360     unless $self->{page};
1361   $attrs->{rows} ||= 10;
1362   return $self->{pager} ||= Data::Page->new(
1363     $self->_count, $attrs->{rows}, $self->{page});
1364 }
1365
1366 =head2 page
1367
1368 =over 4
1369
1370 =item Arguments: $page_number
1371
1372 =item Return Value: $rs
1373
1374 =back
1375
1376 Returns a resultset for the $page_number page of the resultset on which page
1377 is called, where each page contains a number of rows equal to the 'rows'
1378 attribute set on the resultset (10 by default).
1379
1380 =cut
1381
1382 sub page {
1383   my ($self, $page) = @_;
1384   my $attrs = { %{$self->{attrs}} };
1385   $attrs->{page} = $page;
1386   return (ref $self)->new($self->result_source, $attrs);
1387 }
1388
1389 =head2 new_result
1390
1391 =over 4
1392
1393 =item Arguments: \%vals
1394
1395 =item Return Value: $object
1396
1397 =back
1398
1399 Creates an object in the resultset's result class and returns it.
1400
1401 =cut
1402
1403 sub new_result {
1404   my ($self, $values) = @_;
1405   $self->throw_exception( "new_result needs a hash" )
1406     unless (ref $values eq 'HASH');
1407   $self->throw_exception(
1408     "Can't abstract implicit construct, condition not a hash"
1409   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1410   my %new = %$values;
1411   my $alias = $self->{attrs}{alias};
1412   foreach my $key (keys %{$self->{cond}||{}}) {
1413     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1414   }
1415   my $obj = $self->result_class->new(\%new);
1416   $obj->result_source($self->result_source) if $obj->can('result_source');
1417   return $obj;
1418 }
1419
1420 =head2 find_or_new
1421
1422 =over 4
1423
1424 =item Arguments: \%vals, \%attrs?
1425
1426 =item Return Value: $object
1427
1428 =back
1429
1430 Find an existing record from this resultset. If none exists, instantiate a new
1431 result object and return it. The object will not be saved into your storage
1432 until you call L<DBIx::Class::Row/insert> on it.
1433
1434 If you want objects to be saved immediately, use L</find_or_create> instead.
1435
1436 =cut
1437
1438 sub find_or_new {
1439   my $self     = shift;
1440   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1441   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1442   my $exists   = $self->find($hash, $attrs);
1443   return defined $exists ? $exists : $self->new_result($hash);
1444 }
1445
1446 =head2 create
1447
1448 =over 4
1449
1450 =item Arguments: \%vals
1451
1452 =item Return Value: $object
1453
1454 =back
1455
1456 Inserts a record into the resultset and returns the object representing it.
1457
1458 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1459
1460 =cut
1461
1462 sub create {
1463   my ($self, $attrs) = @_;
1464   $self->throw_exception( "create needs a hashref" )
1465     unless ref $attrs eq 'HASH';
1466   return $self->new_result($attrs)->insert;
1467 }
1468
1469 =head2 find_or_create
1470
1471 =over 4
1472
1473 =item Arguments: \%vals, \%attrs?
1474
1475 =item Return Value: $object
1476
1477 =back
1478
1479   $class->find_or_create({ key => $val, ... });
1480
1481 Tries to find a record based on its primary key or unique constraint; if none
1482 is found, creates one and returns that instead.
1483
1484   my $cd = $schema->resultset('CD')->find_or_create({
1485     cdid   => 5,
1486     artist => 'Massive Attack',
1487     title  => 'Mezzanine',
1488     year   => 2005,
1489   });
1490
1491 Also takes an optional C<key> attribute, to search by a specific key or unique
1492 constraint. For example:
1493
1494   my $cd = $schema->resultset('CD')->find_or_create(
1495     {
1496       artist => 'Massive Attack',
1497       title  => 'Mezzanine',
1498     },
1499     { key => 'cd_artist_title' }
1500   );
1501
1502 See also L</find> and L</update_or_create>. For information on how to declare
1503 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1504
1505 =cut
1506
1507 sub find_or_create {
1508   my $self     = shift;
1509   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1510   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1511   my $exists   = $self->find($hash, $attrs);
1512   return defined $exists ? $exists : $self->create($hash);
1513 }
1514
1515 =head2 update_or_create
1516
1517 =over 4
1518
1519 =item Arguments: \%col_values, { key => $unique_constraint }?
1520
1521 =item Return Value: $object
1522
1523 =back
1524
1525   $class->update_or_create({ col => $val, ... });
1526
1527 First, searches for an existing row matching one of the unique constraints
1528 (including the primary key) on the source of this resultset. If a row is
1529 found, updates it with the other given column values. Otherwise, creates a new
1530 row.
1531
1532 Takes an optional C<key> attribute to search on a specific unique constraint.
1533 For example:
1534
1535   # In your application
1536   my $cd = $schema->resultset('CD')->update_or_create(
1537     {
1538       artist => 'Massive Attack',
1539       title  => 'Mezzanine',
1540       year   => 1998,
1541     },
1542     { key => 'cd_artist_title' }
1543   );
1544
1545 If no C<key> is specified, it searches on all unique constraints defined on the
1546 source, including the primary key.
1547
1548 If the C<key> is specified as C<primary>, it searches only on the primary key.
1549
1550 See also L</find> and L</find_or_create>. For information on how to declare
1551 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1552
1553 =cut
1554
1555 sub update_or_create {
1556   my $self = shift;
1557   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1558   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1559
1560   my $row = $self->find($cond);
1561   if (defined $row) {
1562     $row->update($cond);
1563     return $row;
1564   }
1565
1566   return $self->create($cond);
1567 }
1568
1569 =head2 get_cache
1570
1571 =over 4
1572
1573 =item Arguments: none
1574
1575 =item Return Value: \@cache_objects?
1576
1577 =back
1578
1579 Gets the contents of the cache for the resultset, if the cache is set.
1580
1581 =cut
1582
1583 sub get_cache {
1584   shift->{all_cache};
1585 }
1586
1587 =head2 set_cache
1588
1589 =over 4
1590
1591 =item Arguments: \@cache_objects
1592
1593 =item Return Value: \@cache_objects
1594
1595 =back
1596
1597 Sets the contents of the cache for the resultset. Expects an arrayref
1598 of objects of the same class as those produced by the resultset. Note that
1599 if the cache is set the resultset will return the cached objects rather
1600 than re-querying the database even if the cache attr is not set.
1601
1602 =cut
1603
1604 sub set_cache {
1605   my ( $self, $data ) = @_;
1606   $self->throw_exception("set_cache requires an arrayref")
1607       if defined($data) && (ref $data ne 'ARRAY');
1608   $self->{all_cache} = $data;
1609 }
1610
1611 =head2 clear_cache
1612
1613 =over 4
1614
1615 =item Arguments: none
1616
1617 =item Return Value: []
1618
1619 =back
1620
1621 Clears the cache for the resultset.
1622
1623 =cut
1624
1625 sub clear_cache {
1626   shift->set_cache(undef);
1627 }
1628
1629 =head2 related_resultset
1630
1631 =over 4
1632
1633 =item Arguments: $relationship_name
1634
1635 =item Return Value: $resultset
1636
1637 =back
1638
1639 Returns a related resultset for the supplied relationship name.
1640
1641   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1642
1643 =cut
1644
1645 sub related_resultset {
1646   my ( $self, $rel ) = @_;
1647   
1648   $self->{related_resultsets} ||= {};
1649   return $self->{related_resultsets}{$rel} ||= do {
1650     #warn "fetching related resultset for rel '$rel' " . $self->result_source->{name};
1651     my $rel_obj = $self->result_source->relationship_info($rel);
1652                 #print Dumper($self->result_source->_relationships);
1653     $self->throw_exception(
1654       "search_related: result source '" . $self->result_source->name .
1655         "' has no such relationship ${rel}")
1656       unless $rel_obj; #die Dumper $self->{attrs};
1657
1658     my @live_join_stack = (
1659       exists $self->{attrs}->{_live_join_stack})
1660       ? @{$self->{attrs}->{_live_join_stack}}
1661       : ();             
1662
1663     push(@live_join_stack, $rel);
1664                 
1665     my $rs = $self->result_source->schema->resultset($rel_obj->{class})->search(
1666       undef, {
1667         select => undef,
1668         as => undef,
1669         _live_join => $rel, #the most recent
1670         _live_join_stack => \@live_join_stack, #the trail of rels
1671         _parent_attrs => $self->{attrs}}
1672     );    
1673
1674     # keep reference of the original resultset
1675     $rs->{_parent_rs} = ($self->{_parent_rs})
1676       ? $self->{_parent_rs}
1677       : $self->result_source;
1678
1679     return $rs;
1680   };
1681 }
1682
1683 =head2 throw_exception
1684
1685 See L<DBIx::Class::Schema/throw_exception> for details.
1686
1687 =cut
1688
1689 sub throw_exception {
1690   my $self=shift;
1691   $self->result_source->schema->throw_exception(@_);
1692 }
1693
1694 # XXX: FIXME: Attributes docs need clearing up
1695
1696 =head1 ATTRIBUTES
1697
1698 The resultset takes various attributes that modify its behavior. Here's an
1699 overview of them:
1700
1701 =head2 order_by
1702
1703 =over 4
1704
1705 =item Value: ($order_by | \@order_by)
1706
1707 =back
1708
1709 Which column(s) to order the results by. This is currently passed
1710 through directly to SQL, so you can give e.g. C<year DESC> for a
1711 descending order on the column `year'.
1712
1713 Please note that if you have quoting enabled (see 
1714 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1715 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1716 so you will need to manually quote things as appropriate.)
1717
1718 =head2 columns
1719
1720 =over 4
1721
1722 =item Value: \@columns
1723
1724 =back
1725
1726 Shortcut to request a particular set of columns to be retrieved.  Adds
1727 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1728 from that, then auto-populates C<as> from C<select> as normal. (You may also
1729 use the C<cols> attribute, as in earlier versions of DBIC.)
1730
1731 =head2 include_columns
1732
1733 =over 4
1734
1735 =item Value: \@columns
1736
1737 =back
1738
1739 Shortcut to include additional columns in the returned results - for example
1740
1741   $schema->resultset('CD')->search(undef, {
1742     include_columns => ['artist.name'],
1743     join => ['artist']
1744   });
1745
1746 would return all CDs and include a 'name' column to the information
1747 passed to object inflation
1748
1749 =head2 select
1750
1751 =over 4
1752
1753 =item Value: \@select_columns
1754
1755 =back
1756
1757 Indicates which columns should be selected from the storage. You can use
1758 column names, or in the case of RDBMS back ends, function or stored procedure
1759 names:
1760
1761   $rs = $schema->resultset('Employee')->search(undef, {
1762     select => [
1763       'name',
1764       { count => 'employeeid' },
1765       { sum => 'salary' }
1766     ]
1767   });
1768
1769 When you use function/stored procedure names and do not supply an C<as>
1770 attribute, the column names returned are storage-dependent. E.g. MySQL would
1771 return a column named C<count(employeeid)> in the above example.
1772
1773 =head2 +select
1774
1775 =over 4
1776
1777 Indicates additional columns to be selected from storage.  Works the same as
1778 L<select> but adds columns to the selection.
1779
1780 =back
1781
1782 =head2 +as
1783
1784 =over 4
1785
1786 Indicates additional column names for those added via L<+select>.
1787
1788 =back
1789
1790 =head2 as
1791
1792 =over 4
1793
1794 =item Value: \@inflation_names
1795
1796 =back
1797
1798 Indicates column names for object inflation. This is used in conjunction with
1799 C<select>, usually when C<select> contains one or more function or stored
1800 procedure names:
1801
1802   $rs = $schema->resultset('Employee')->search(undef, {
1803     select => [
1804       'name',
1805       { count => 'employeeid' }
1806     ],
1807     as => ['name', 'employee_count'],
1808   });
1809
1810   my $employee = $rs->first(); # get the first Employee
1811
1812 If the object against which the search is performed already has an accessor
1813 matching a column name specified in C<as>, the value can be retrieved using
1814 the accessor as normal:
1815
1816   my $name = $employee->name();
1817
1818 If on the other hand an accessor does not exist in the object, you need to
1819 use C<get_column> instead:
1820
1821   my $employee_count = $employee->get_column('employee_count');
1822
1823 You can create your own accessors if required - see
1824 L<DBIx::Class::Manual::Cookbook> for details.
1825
1826 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1827 produced, it is used for internal access only. Thus attempting to use the accessor
1828 in an C<order_by> clause or similar will fail misrably.
1829
1830 =head2 join
1831
1832 =over 4
1833
1834 =item Value: ($rel_name | \@rel_names | \%rel_names)
1835
1836 =back
1837
1838 Contains a list of relationships that should be joined for this query.  For
1839 example:
1840
1841   # Get CDs by Nine Inch Nails
1842   my $rs = $schema->resultset('CD')->search(
1843     { 'artist.name' => 'Nine Inch Nails' },
1844     { join => 'artist' }
1845   );
1846
1847 Can also contain a hash reference to refer to the other relation's relations.
1848 For example:
1849
1850   package MyApp::Schema::Track;
1851   use base qw/DBIx::Class/;
1852   __PACKAGE__->table('track');
1853   __PACKAGE__->add_columns(qw/trackid cd position title/);
1854   __PACKAGE__->set_primary_key('trackid');
1855   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1856   1;
1857
1858   # In your application
1859   my $rs = $schema->resultset('Artist')->search(
1860     { 'track.title' => 'Teardrop' },
1861     {
1862       join     => { cd => 'track' },
1863       order_by => 'artist.name',
1864     }
1865   );
1866
1867 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1868 similarly for a third time). For e.g.
1869
1870   my $rs = $schema->resultset('Artist')->search({
1871     'cds.title'   => 'Down to Earth',
1872     'cds_2.title' => 'Popular',
1873   }, {
1874     join => [ qw/cds cds/ ],
1875   });
1876
1877 will return a set of all artists that have both a cd with title 'Down
1878 to Earth' and a cd with title 'Popular'.
1879
1880 If you want to fetch related objects from other tables as well, see C<prefetch>
1881 below.
1882
1883 =head2 prefetch
1884
1885 =over 4
1886
1887 =item Value: ($rel_name | \@rel_names | \%rel_names)
1888
1889 =back
1890
1891 Contains one or more relationships that should be fetched along with the main
1892 query (when they are accessed afterwards they will have already been
1893 "prefetched").  This is useful for when you know you will need the related
1894 objects, because it saves at least one query:
1895
1896   my $rs = $schema->resultset('Tag')->search(
1897     undef,
1898     {
1899       prefetch => {
1900         cd => 'artist'
1901       }
1902     }
1903   );
1904
1905 The initial search results in SQL like the following:
1906
1907   SELECT tag.*, cd.*, artist.* FROM tag
1908   JOIN cd ON tag.cd = cd.cdid
1909   JOIN artist ON cd.artist = artist.artistid
1910
1911 L<DBIx::Class> has no need to go back to the database when we access the
1912 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1913 case.
1914
1915 Simple prefetches will be joined automatically, so there is no need
1916 for a C<join> attribute in the above search. If you're prefetching to
1917 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1918 specify the join as well.
1919
1920 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1921 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1922 with an accessor type of 'single' or 'filter').
1923
1924 =head2 page
1925
1926 =over 4
1927
1928 =item Value: $page
1929
1930 =back
1931
1932 Makes the resultset paged and specifies the page to retrieve. Effectively
1933 identical to creating a non-pages resultset and then calling ->page($page)
1934 on it. 
1935
1936 If L<rows> attribute is not specified it defualts to 10 rows per page.
1937
1938 =head2 rows
1939
1940 =over 4
1941
1942 =item Value: $rows
1943
1944 =back
1945
1946 Specifes the maximum number of rows for direct retrieval or the number of
1947 rows per page if the page attribute or method is used.
1948
1949 =head2 offset
1950
1951 =over 4
1952
1953 =item Value: $offset
1954
1955 =back
1956
1957 Specifies the (zero-based) row number for the  first row to be returned, or the
1958 of the first row of the first page if paging is used.
1959
1960 =head2 group_by
1961
1962 =over 4
1963
1964 =item Value: \@columns
1965
1966 =back
1967
1968 A arrayref of columns to group by. Can include columns of joined tables.
1969
1970   group_by => [qw/ column1 column2 ... /]
1971
1972 =head2 having
1973
1974 =over 4
1975
1976 =item Value: $condition
1977
1978 =back
1979
1980 HAVING is a select statement attribute that is applied between GROUP BY and
1981 ORDER BY. It is applied to the after the grouping calculations have been
1982 done. 
1983
1984   having => { 'count(employee)' => { '>=', 100 } }
1985
1986 =head2 distinct
1987
1988 =over 4
1989
1990 =item Value: (0 | 1)
1991
1992 =back
1993
1994 Set to 1 to group by all columns.
1995
1996 =head2 where
1997
1998 =over 4
1999
2000 Adds to the WHERE clause.
2001
2002   # only return rows WHERE deleted IS NULL for all searches
2003   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2004
2005 Can be overridden by passing C<{ where => undef }> as an attribute
2006 to a resulset.
2007
2008 =back
2009
2010 =head2 cache
2011
2012 Set to 1 to cache search results. This prevents extra SQL queries if you
2013 revisit rows in your ResultSet:
2014
2015   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2016   
2017   while( my $artist = $resultset->next ) {
2018     ... do stuff ...
2019   }
2020
2021   $rs->first; # without cache, this would issue a query
2022
2023 By default, searches are not cached.
2024
2025 For more examples of using these attributes, see
2026 L<DBIx::Class::Manual::Cookbook>.
2027
2028 =head2 from
2029
2030 =over 4
2031
2032 =item Value: \@from_clause
2033
2034 =back
2035
2036 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2037 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2038 clauses.
2039
2040 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2041
2042 C<join> will usually do what you need and it is strongly recommended that you
2043 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2044 And we really do mean "cannot", not just tried and failed. Attempting to use
2045 this because you're having problems with C<join> is like trying to use x86
2046 ASM because you've got a syntax error in your C. Trust us on this.
2047
2048 Now, if you're still really, really sure you need to use this (and if you're
2049 not 100% sure, ask the mailing list first), here's an explanation of how this
2050 works.
2051
2052 The syntax is as follows -
2053
2054   [
2055     { <alias1> => <table1> },
2056     [
2057       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2058       [], # nested JOIN (optional)
2059       { <table1.column1> => <table2.column2>, ... (more conditions) },
2060     ],
2061     # More of the above [ ] may follow for additional joins
2062   ]
2063
2064   <table1> <alias1>
2065   JOIN
2066     <table2> <alias2>
2067     [JOIN ...]
2068   ON <table1.column1> = <table2.column2>
2069   <more joins may follow>
2070
2071 An easy way to follow the examples below is to remember the following:
2072
2073     Anything inside "[]" is a JOIN
2074     Anything inside "{}" is a condition for the enclosing JOIN
2075
2076 The following examples utilize a "person" table in a family tree application.
2077 In order to express parent->child relationships, this table is self-joined:
2078
2079     # Person->belongs_to('father' => 'Person');
2080     # Person->belongs_to('mother' => 'Person');
2081
2082 C<from> can be used to nest joins. Here we return all children with a father,
2083 then search against all mothers of those children:
2084
2085   $rs = $schema->resultset('Person')->search(
2086       undef,
2087       {
2088           alias => 'mother', # alias columns in accordance with "from"
2089           from => [
2090               { mother => 'person' },
2091               [
2092                   [
2093                       { child => 'person' },
2094                       [
2095                           { father => 'person' },
2096                           { 'father.person_id' => 'child.father_id' }
2097                       ]
2098                   ],
2099                   { 'mother.person_id' => 'child.mother_id' }
2100               ],
2101           ]
2102       },
2103   );
2104
2105   # Equivalent SQL:
2106   # SELECT mother.* FROM person mother
2107   # JOIN (
2108   #   person child
2109   #   JOIN person father
2110   #   ON ( father.person_id = child.father_id )
2111   # )
2112   # ON ( mother.person_id = child.mother_id )
2113
2114 The type of any join can be controlled manually. To search against only people
2115 with a father in the person table, we could explicitly use C<INNER JOIN>:
2116
2117     $rs = $schema->resultset('Person')->search(
2118         undef,
2119         {
2120             alias => 'child', # alias columns in accordance with "from"
2121             from => [
2122                 { child => 'person' },
2123                 [
2124                     { father => 'person', -join_type => 'inner' },
2125                     { 'father.id' => 'child.father_id' }
2126                 ],
2127             ]
2128         },
2129     );
2130
2131     # Equivalent SQL:
2132     # SELECT child.* FROM person child
2133     # INNER JOIN person father ON child.father_id = father.id
2134
2135 =cut
2136
2137 1;