Merge 're_refactor_bugfix' into 'DBIx-Class-current'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Data::Dumper;
13 use Scalar::Util qw/weaken/;
14 use Data::Dumper;
15 use DBIx::Class::ResultSetColumn;
16 use base qw/DBIx::Class/;
17 __PACKAGE__->load_components(qw/AccessorGroup/);
18 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
23
24 =head1 SYNOPSIS
25
26   my $rs   = $schema->resultset('User')->search(registered => 1);
27   my @rows = $schema->resultset('CD')->search(year => 2005);
28
29 =head1 DESCRIPTION
30
31 The resultset is also known as an iterator. It is responsible for handling
32 queries that may return an arbitrary number of rows, e.g. via L</search>
33 or a C<has_many> relationship.
34
35 In the examples below, the following table classes are used:
36
37   package MyApp::Schema::Artist;
38   use base qw/DBIx::Class/;
39   __PACKAGE__->load_components(qw/Core/);
40   __PACKAGE__->table('artist');
41   __PACKAGE__->add_columns(qw/artistid name/);
42   __PACKAGE__->set_primary_key('artistid');
43   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
44   1;
45
46   package MyApp::Schema::CD;
47   use base qw/DBIx::Class/;
48   __PACKAGE__->load_components(qw/Core/);
49   __PACKAGE__->table('cd');
50   __PACKAGE__->add_columns(qw/cdid artist title year/);
51   __PACKAGE__->set_primary_key('cdid');
52   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
53   1;
54
55 =head1 METHODS
56
57 =head2 new
58
59 =over 4
60
61 =item Arguments: $source, \%$attrs
62
63 =item Return Value: $rs
64
65 =back
66
67 The resultset constructor. Takes a source object (usually a
68 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
69 L</ATTRIBUTES> below).  Does not perform any queries -- these are
70 executed as needed by the other methods.
71
72 Generally you won't need to construct a resultset manually.  You'll
73 automatically get one from e.g. a L</search> called in scalar context:
74
75   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
76
77 IMPORTANT: If called on an object, proxies to new_result instead so
78
79   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
80
81 will return a CD object, not a ResultSet.
82
83 =cut
84
85 sub new {
86   my $class = shift;
87   return $class->new_result(@_) if ref $class;
88   
89   my ($source, $attrs) = @_;
90   weaken $source;
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   bless {
101     result_source => $source,
102     result_class => $attrs->{result_class} || $source->result_class,
103     cond => $attrs->{where},
104 #    from => $attrs->{from},
105 #    collapse => $collapse,
106     count => undef,
107     page => delete $attrs->{page},
108     pager => undef,
109     attrs => $attrs
110   }, $class;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 =cut
138
139 sub search {
140   my $self = shift;
141   my $rs = $self->search_rs( @_ );
142   return (wantarray ? $rs->all : $rs);
143 }
144
145 =head2 search_rs
146
147 =over 4
148
149 =item Arguments: $cond, \%attrs?
150
151 =item Return Value: $resultset
152
153 =back
154
155 This method does the same exact thing as search() except it will 
156 always return a resultset, even in list context.
157
158 =cut
159
160 sub search_rs {
161   my $self = shift;
162
163   my $attrs = {};
164   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
165   my $our_attrs = ($attrs->{_parent_attrs}) ? { %{$attrs->{_parent_attrs}} } : { %{$self->{attrs}} };
166   my $having = delete $our_attrs->{having};
167
168         # XXX this is getting messy
169         if ($attrs->{_live_join_stack}) {
170                 my $live_join = $attrs->{_live_join_stack};
171                 foreach (reverse @{$live_join}) {
172                         $attrs->{_live_join_h} = (defined $attrs->{_live_join_h}) ? { $_ => $attrs->{_live_join_h} } : $_;
173                 }
174         }
175
176   # merge new attrs into old
177   foreach my $key (qw/join prefetch/) {
178     next unless (exists $attrs->{$key});
179     if ($attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
180                         my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack};
181                         foreach (reverse @{$live_join}) {
182                                 $attrs->{$key} = { $_ => $attrs->{$key} };
183                         }
184     }
185
186     if (exists $our_attrs->{$key}) {
187       $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
188     } else {
189       $our_attrs->{$key} = $attrs->{$key};
190     }
191     delete $attrs->{$key};
192   }
193
194         $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $attrs->{_live_join_h}, 1) if ($attrs->{_live_join_h});
195
196   if (exists $our_attrs->{prefetch}) {
197       $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $our_attrs->{prefetch}, 1);
198   }
199
200   my $new_attrs = { %{$our_attrs}, %{$attrs} };
201   my $where = (@_
202                 ? ((@_ == 1 || ref $_[0] eq "HASH")
203                     ? shift
204                     : ((@_ % 2)
205                         ? $self->throw_exception(
206                             "Odd number of arguments to search")
207                         : {@_}))
208                 : undef());
209   if (defined $where) {
210     $new_attrs->{where} = (defined $new_attrs->{where}
211               ? { '-and' =>
212                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
213                       $where, $new_attrs->{where} ] }
214               : $where);
215   }
216
217   if (defined $having) {
218     $new_attrs->{having} = (defined $new_attrs->{having}
219               ? { '-and' =>
220                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
221                       $having, $new_attrs->{having} ] }
222               : $having);
223   }
224
225   my $rs = (ref $self)->new($self->result_source, $new_attrs);
226   $rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
227
228   unless (@_) { # no search, effectively just a clone
229     my $rows = $self->get_cache;
230     if ($rows) {
231       $rs->set_cache($rows);
232     }
233   }
234   
235   return $rs;
236 }
237
238 =head2 search_literal
239
240 =over 4
241
242 =item Arguments: $sql_fragment, @bind_values
243
244 =item Return Value: $resultset (scalar context), @row_objs (list context)
245
246 =back
247
248   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
249   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
250
251 Pass a literal chunk of SQL to be added to the conditional part of the
252 resultset query.
253
254 =cut
255
256 sub search_literal {
257   my ($self, $cond, @vals) = @_;
258   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
259   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
260   return $self->search(\$cond, $attrs);
261 }
262
263 =head2 find
264
265 =over 4
266
267 =item Arguments: @values | \%cols, \%attrs?
268
269 =item Return Value: $row_object
270
271 =back
272
273 Finds a row based on its primary key or unique constraint. For example, to find
274 a row by its primary key:
275
276   my $cd = $schema->resultset('CD')->find(5);
277
278 You can also find a row by a specific unique constraint using the C<key>
279 attribute. For example:
280
281   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', { key => 'cd_artist_title' });
282
283 Additionally, you can specify the columns explicitly by name:
284
285   my $cd = $schema->resultset('CD')->find(
286     {
287       artist => 'Massive Attack',
288       title  => 'Mezzanine',
289     },
290     { key => 'cd_artist_title' }
291   );
292
293 If the C<key> is specified as C<primary>, it searches only on the primary key.
294
295 If no C<key> is specified, it searches on all unique constraints defined on the
296 source, including the primary key.
297
298 See also L</find_or_create> and L</update_or_create>. For information on how to
299 declare unique constraints, see
300 L<DBIx::Class::ResultSource/add_unique_constraint>.
301
302 =cut
303
304 sub find {
305   my $self = shift;
306   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
307
308   # Default to the primary key, but allow a specific key
309   my @cols = exists $attrs->{key}
310     ? $self->result_source->unique_constraint_columns($attrs->{key})
311     : $self->result_source->primary_columns;
312   $self->throw_exception(
313     "Can't find unless a primary key or unique constraint is defined"
314   ) unless @cols;
315
316   # Parse out a hashref from input
317   my $input_query;
318   if (ref $_[0] eq 'HASH') {
319     $input_query = { %{$_[0]} };
320   }
321   elsif (@_ == @cols) {
322     $input_query = {};
323     @{$input_query}{@cols} = @_;
324   }
325   else {
326     # Compatibility: Allow e.g. find(id => $value)
327     carp "Find by key => value deprecated; please use a hashref instead";
328     $input_query = {@_};
329   }
330
331   my @unique_queries = $self->_unique_queries($input_query, $attrs);
332
333   # Handle cases where the ResultSet defines the query, or where the user is
334   # abusing find
335   my $query = @unique_queries ? \@unique_queries : $input_query;
336
337   # Run the query
338   if (keys %$attrs) {
339     my $rs = $self->search($query, $attrs);
340     $rs->_resolve;
341     return keys %{$rs->{_attrs}->{collapse}} ? $rs->next : $rs->single;
342   }
343   else {
344     $self->_resolve;  
345     return (keys %{$self->{_attrs}->{collapse}})
346       ? $self->search($query)->next
347       : $self->single($query);
348   }
349 }
350
351 # _unique_queries
352 #
353 # Build a list of queries which satisfy unique constraints.
354
355 sub _unique_queries {
356   my ($self, $query, $attrs) = @_;
357
358   my @constraint_names = exists $attrs->{key}
359     ? ($attrs->{key})
360     : $self->result_source->unique_constraint_names;
361
362   my @unique_queries;
363   foreach my $name (@constraint_names) {
364     my @unique_cols = $self->result_source->unique_constraint_columns($name);
365     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
366
367     next unless scalar keys %$unique_query;
368
369     # Add the ResultSet's alias
370     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
371                         my $alias = ($self->{attrs}->{_live_join}) ? $self->{attrs}->{_live_join} : $self->{attrs}->{alias};
372       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
373     }
374
375     push @unique_queries, $unique_query;
376   }
377
378   return @unique_queries;
379 }
380
381 # _build_unique_query
382 #
383 # Constrain the specified query hash based on the specified column names.
384
385 sub _build_unique_query {
386   my ($self, $query, $unique_cols) = @_;
387
388   my %unique_query =
389     map  { $_ => $query->{$_} }
390     grep { exists $query->{$_} }
391     @$unique_cols;
392
393   return \%unique_query;
394 }
395
396 =head2 search_related
397
398 =over 4
399
400 =item Arguments: $cond, \%attrs?
401
402 =item Return Value: $new_resultset
403
404 =back
405
406   $new_rs = $cd_rs->search_related('artist', {
407     name => 'Emo-R-Us',
408   });
409
410 Searches the specified relationship, optionally specifying a condition and
411 attributes for matching records. See L</ATTRIBUTES> for more information.
412
413 =cut
414
415 sub search_related {
416   return shift->related_resultset(shift)->search(@_);
417 }
418
419 =head2 cursor
420
421 =over 4
422
423 =item Arguments: none
424
425 =item Return Value: $cursor
426
427 =back
428
429 Returns a storage-driven cursor to the given resultset. See
430 L<DBIx::Class::Cursor> for more information.
431
432 =cut
433
434 sub cursor {
435   my ($self) = @_;
436
437   $self->_resolve;
438   my $attrs = { %{$self->{_attrs}} };
439   return $self->{cursor}
440     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
441           $attrs->{where},$attrs);
442 }
443
444 =head2 single
445
446 =over 4
447
448 =item Arguments: $cond?
449
450 =item Return Value: $row_object?
451
452 =back
453
454   my $cd = $schema->resultset('CD')->single({ year => 2001 });
455
456 Inflates the first result without creating a cursor if the resultset has
457 any records in it; if not returns nothing. Used by L</find> as an optimisation.
458
459 Can optionally take an additional condition *only* - this is a fast-code-path
460 method; if you need to add extra joins or similar call ->search and then
461 ->single without a condition on the $rs returned from that.
462
463 =cut
464
465 sub single {
466   my ($self, $where) = @_;
467   $self->_resolve;
468   my $attrs = { %{$self->{_attrs}} };
469   if ($where) {
470     if (defined $attrs->{where}) {
471       $attrs->{where} = {
472         '-and' =>
473             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
474                $where, delete $attrs->{where} ]
475       };
476     } else {
477       $attrs->{where} = $where;
478     }
479   }
480
481   unless ($self->_is_unique_query($attrs->{where})) {
482     carp "Query not guaranteed to return a single row"
483       . "; please declare your unique constraints or use search instead";
484   }
485
486   my @data = $self->result_source->storage->select_single(
487           $attrs->{from}, $attrs->{select},
488           $attrs->{where},$attrs);
489   return (@data ? $self->_construct_object(@data) : ());
490 }
491
492 # _is_unique_query
493 #
494 # Try to determine if the specified query is guaranteed to be unique, based on
495 # the declared unique constraints.
496
497 sub _is_unique_query {
498   my ($self, $query) = @_;
499
500   my $collapsed = $self->_collapse_query($query);
501
502         my $alias = ($self->{attrs}->{_live_join}) ? $self->{attrs}->{_live_join} : $self->{attrs}->{alias};
503   foreach my $name ($self->result_source->unique_constraint_names) {
504     my @unique_cols = map { "$alias.$_" }
505       $self->result_source->unique_constraint_columns($name);
506
507     # Count the values for each unique column
508     my %seen = map { $_ => 0 } @unique_cols;
509
510     foreach my $key (keys %$collapsed) {
511       my $aliased = $key;
512       $aliased = "$alias.$key" unless $key =~ /\./;
513
514       next unless exists $seen{$aliased};  # Additional constraints are okay
515       $seen{$aliased} = scalar @{ $collapsed->{$key} };
516     }
517
518     # If we get 0 or more than 1 value for a column, it's not necessarily unique
519     return 1 unless grep { $_ != 1 } values %seen;
520   }
521
522   return 0;
523 }
524
525 # _collapse_query
526 #
527 # Recursively collapse the query, accumulating values for each column.
528
529 sub _collapse_query {
530   my ($self, $query, $collapsed) = @_;
531
532   $collapsed ||= {};
533
534   if (ref $query eq 'ARRAY') {
535     foreach my $subquery (@$query) {
536       next unless ref $subquery;  # -or
537 #      warn "ARRAY: " . Dumper $subquery;
538       $collapsed = $self->_collapse_query($subquery, $collapsed);
539     }
540   }
541   elsif (ref $query eq 'HASH') {
542     if (keys %$query and (keys %$query)[0] eq '-and') {
543       foreach my $subquery (@{$query->{-and}}) {
544 #        warn "HASH: " . Dumper $subquery;
545         $collapsed = $self->_collapse_query($subquery, $collapsed);
546       }
547     }
548     else {
549 #      warn "LEAF: " . Dumper $query;
550       foreach my $key (keys %$query) {
551         push @{$collapsed->{$key}}, $query->{$key};
552       }
553     }
554   }
555
556   return $collapsed;
557 }
558
559 =head2 get_column
560
561 =over 4
562
563 =item Arguments: $cond?
564
565 =item Return Value: $resultsetcolumn
566
567 =back
568
569   my $max_length = $rs->get_column('length')->max;
570
571 Returns a ResultSetColumn instance for $column based on $self
572
573 =cut
574
575 sub get_column {
576   my ($self, $column) = @_;
577
578   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
579   return $new;
580 }
581
582 =head2 search_like
583
584 =over 4
585
586 =item Arguments: $cond, \%attrs?
587
588 =item Return Value: $resultset (scalar context), @row_objs (list context)
589
590 =back
591
592   # WHERE title LIKE '%blue%'
593   $cd_rs = $rs->search_like({ title => '%blue%'});
594
595 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
596 that this is simply a convenience method. You most likely want to use
597 L</search> with specific operators.
598
599 For more information, see L<DBIx::Class::Manual::Cookbook>.
600
601 =cut
602
603 sub search_like {
604   my $class = shift;
605   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
606   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
607   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
608   return $class->search($query, { %$attrs });
609 }
610
611 =head2 slice
612
613 =over 4
614
615 =item Arguments: $first, $last
616
617 =item Return Value: $resultset (scalar context), @row_objs (list context)
618
619 =back
620
621 Returns a resultset or object list representing a subset of elements from the
622 resultset slice is called on. Indexes are from 0, i.e., to get the first
623 three records, call:
624
625   my ($one, $two, $three) = $rs->slice(0, 2);
626
627 =cut
628
629 sub slice {
630   my ($self, $min, $max) = @_;
631   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
632   $attrs->{offset} = $self->{attrs}{offset} || 0;
633   $attrs->{offset} += $min;
634   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
635   return $self->search(undef(), $attrs);
636   #my $slice = (ref $self)->new($self->result_source, $attrs);
637   #return (wantarray ? $slice->all : $slice);
638 }
639
640 =head2 next
641
642 =over 4
643
644 =item Arguments: none
645
646 =item Return Value: $result?
647
648 =back
649
650 Returns the next element in the resultset (C<undef> is there is none).
651
652 Can be used to efficiently iterate over records in the resultset:
653
654   my $rs = $schema->resultset('CD')->search;
655   while (my $cd = $rs->next) {
656     print $cd->title;
657   }
658
659 Note that you need to store the resultset object, and call C<next> on it. 
660 Calling C<< resultset('Table')->next >> repeatedly will always return the
661 first record from the resultset.
662
663 =cut
664
665 sub next {
666   my ($self) = @_;
667   if (my $cache = $self->get_cache) {
668     $self->{all_cache_position} ||= 0;
669     return $cache->[$self->{all_cache_position}++];
670   }
671   if ($self->{attrs}{cache}) {
672     $self->{all_cache_position} = 1;
673     return ($self->all)[0];
674   }
675   my @row = (exists $self->{stashed_row} ?
676                @{delete $self->{stashed_row}} :
677                $self->cursor->next
678   );
679   return unless (@row);
680   return $self->_construct_object(@row);
681 }
682
683 sub _resolve {
684   my $self = shift;
685
686   return if(exists $self->{_attrs}); #return if _resolve has already been called
687
688   my $attrs = $self->{attrs};    
689   my $source = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->{result_source};
690
691   # XXX - lose storable dclone
692   my $record_filter = delete $attrs->{record_filter} if (defined $attrs->{record_filter});
693   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
694   $attrs->{record_filter} = $record_filter if ($record_filter);
695   $self->{attrs}->{record_filter} = $record_filter if ($record_filter);
696
697   my $alias = $attrs->{alias};
698  
699   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
700   delete $attrs->{as} if $attrs->{columns};
701   $attrs->{columns} ||= [ $self->{result_source}->columns ] unless $attrs->{select};
702   my $select_alias = ($self->{_parent_rs}) ? $self->{attrs}->{_live_join} : $alias;
703   $attrs->{select} = [
704                       map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
705                       ] if $attrs->{columns};
706   $attrs->{as} ||= [
707                     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
708                     ];
709   if (my $include = delete $attrs->{include_columns}) {
710       push(@{$attrs->{select}}, @$include);
711       push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
712   }
713
714   $attrs->{from} ||= [ { $alias => $source->from } ];
715   $attrs->{seen_join} ||= {};
716   my %seen;
717   if (my $join = delete $attrs->{join}) {
718     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
719       if (ref $j eq 'HASH') {
720         $seen{$_} = 1 foreach keys %$j;
721       } else {
722         $seen{$j} = 1;
723       }
724     }
725     
726     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
727   }
728   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
729   $attrs->{order_by} = [ $attrs->{order_by} ] if
730       $attrs->{order_by} and !ref($attrs->{order_by});
731   $attrs->{order_by} ||= [];
732
733   if(my $seladds = delete($attrs->{'+select'})) {
734     my @seladds = (ref($seladds) eq 'ARRAY' ? @$seladds : ($seladds));
735     $attrs->{select} = [
736                         @{ $attrs->{select} },
737                         map { (m/\./ || ref($_)) ? $_ : "${alias}.$_" } $seladds
738                         ];
739   }
740   if(my $asadds = delete($attrs->{'+as'})) {
741     my @asadds = (ref($asadds) eq 'ARRAY' ? @$asadds : ($asadds));
742     $attrs->{as} = [ @{ $attrs->{as} }, @asadds ];
743   }
744   my $collapse = $attrs->{collapse} || {};
745   if (my $prefetch = delete $attrs->{prefetch}) {
746       my @pre_order;
747       foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
748           if ( ref $p eq 'HASH' ) {
749               foreach my $key (keys %$p) {
750                   push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
751                       unless $seen{$key};
752               }
753           } else {
754               push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
755                   unless $seen{$p};
756           }
757
758                 # we're about to resolve_join on the current class, so we need to bring
759                 # the joins (which are from the original class) to the right level
760                 # XXX the below alg is ridiculous
761                 if ($attrs->{_live_join_stack}) {
762                         STACK: foreach (@{$attrs->{_live_join_stack}}) {
763                                 if (ref $p eq 'HASH') {
764                                         if (exists $p->{$_}) {
765                                                 $p = $p->{$_};
766                                         } else {
767                                                 $p = undef;
768                                                 last STACK;
769                                         }
770                                 } elsif (ref $p eq 'ARRAY') {
771                                         foreach my $pe (@{$p}) {
772                                                 if ($pe eq $_) {
773                                                         $p = undef;
774                                                         last STACK;
775                                                 }
776                                                 next unless(ref $pe eq 'HASH');
777                                                 next unless(exists $pe->{$_});
778                                                 $p = $pe->{$_};
779                                                 next STACK;
780                                         }                                               
781                                         $p = undef;
782                                         last STACK;
783                                 } else {
784                                         $p = undef;
785                                         last STACK;
786                                 }
787                         }
788                 }
789                 
790                 if ($p) {
791                         my @prefetch = $self->result_source->resolve_prefetch(
792                                                    $p, $attrs->{alias}, {}, \@pre_order, $collapse);
793                 
794                         push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
795                         push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
796                 }
797       }
798       push(@{$attrs->{order_by}}, @pre_order);
799   }
800   $attrs->{collapse} = $collapse;
801   $self->{_attrs} = $attrs;
802 }
803
804 sub _merge_attr {
805   my ($self, $a, $b, $is_prefetch) = @_;
806     
807   return $b unless $a;
808   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
809     foreach my $key (keys %{$b}) {
810       if (exists $a->{$key}) {
811         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key}, $is_prefetch);
812       } else {
813         $a->{$key} = $b->{$key};
814       }
815     }
816     return $a;
817   } else {
818     $a = [$a] unless (ref $a eq 'ARRAY');
819     $b = [$b] unless (ref $b eq 'ARRAY');
820     
821     my $hash = {};
822     my $array = [];      
823     foreach ($a, $b) {
824       foreach my $element (@{$_}) {
825         if (ref $element eq 'HASH') {
826           $hash = $self->_merge_attr($hash, $element, $is_prefetch);
827         } elsif (ref $element eq 'ARRAY') {
828           $array = [@{$array}, @{$element}];
829         } else {        
830           if (($b == $_) && $is_prefetch) {
831             $self->_merge_array($array, $element, $is_prefetch);
832           } else {
833             push(@{$array}, $element);
834           }
835         }
836       }
837     }
838
839                 my $final_array = [];
840                 foreach my $element (@{$array}) {
841                         push(@{$final_array}, $element) unless (exists $hash->{$element});
842                 }
843                 $array = $final_array;
844
845     if ((keys %{$hash}) && (scalar(@{$array} > 0))) {
846       return [$hash, @{$array}];
847     } else {    
848       return (keys %{$hash}) ? $hash : $array;
849     }
850   }
851 }
852
853 sub _merge_array {
854   my ($self, $a, $b) = @_;
855   
856   $b = [$b] unless (ref $b eq 'ARRAY');
857   # add elements from @{$b} to @{$a} which aren't already in @{$a}
858   foreach my $b_element (@{$b}) {
859     push(@{$a}, $b_element) unless grep {$b_element eq $_} @{$a};
860   }
861 }
862
863 sub _construct_object {
864   my ($self, @row) = @_;
865   my @as = @{ $self->{_attrs}{as} };
866   
867   my $info = $self->_collapse_result(\@as, \@row);
868   my $new = $self->result_class->inflate_result($self->result_source, @$info);
869   $new = $self->{_attrs}{record_filter}->($new)
870     if exists $self->{_attrs}{record_filter};
871   return $new;
872 }
873
874 sub _collapse_result {
875   my ($self, $as, $row, $prefix) = @_;
876
877   my $live_join = $self->{attrs}->{_live_join} ||="";
878   my %const;
879
880   my @copy = @$row;
881   foreach my $this_as (@$as) {
882     my $val = shift @copy;
883     if (defined $prefix) {
884       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
885         my $remain = $1;
886         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
887         $const{$1||''}{$2} = $val;
888       }
889     } else {
890       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
891       $const{$1||''}{$2} = $val;
892     }
893   }
894
895   my $info = [ {}, {} ];
896   foreach my $key (keys %const) {
897     if (length $key && $key ne $live_join) {
898       my $target = $info;
899       my @parts = split(/\./, $key);
900       foreach my $p (@parts) {
901         $target = $target->[1]->{$p} ||= [];
902       }
903       $target->[0] = $const{$key};
904     } else {
905       $info->[0] = $const{$key};
906     }
907   }
908   my @collapse;
909
910   if (defined $prefix) {
911     @collapse = map {
912         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
913     } keys %{$self->{_attrs}->{collapse}}
914   } else {
915     @collapse = keys %{$self->{_attrs}->{collapse}};
916   };
917
918   if (@collapse) {
919     my ($c) = sort { length $a <=> length $b } @collapse;
920     my $target = $info;
921     foreach my $p (split(/\./, $c)) {
922       $target = $target->[1]->{$p} ||= [];
923     }
924     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
925     my @co_key = @{$self->{_attrs}->{collapse}{$c_prefix}};
926     my $tree = $self->_collapse_result($as, $row, $c_prefix);
927     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
928     my (@final, @raw);
929
930     while ( !(grep {
931                 !defined($tree->[0]->{$_}) ||
932                 $co_check{$_} ne $tree->[0]->{$_}
933               } @co_key) ) {
934       push(@final, $tree);
935       last unless (@raw = $self->cursor->next);
936       $row = $self->{stashed_row} = \@raw;
937       $tree = $self->_collapse_result($as, $row, $c_prefix);
938     }
939     @$target = (@final ? @final : [ {}, {} ]); 
940       # single empty result to indicate an empty prefetched has_many
941   }
942
943   #print "final info: " . Dumper($info);
944   return $info;
945 }
946
947 =head2 result_source
948
949 =over 4
950
951 =item Arguments: $result_source?
952
953 =item Return Value: $result_source
954
955 =back
956
957 An accessor for the primary ResultSource object from which this ResultSet
958 is derived.
959
960 =cut
961
962
963 =head2 count
964
965 =over 4
966
967 =item Arguments: $cond, \%attrs??
968
969 =item Return Value: $count
970
971 =back
972
973 Performs an SQL C<COUNT> with the same query as the resultset was built
974 with to find the number of elements. If passed arguments, does a search
975 on the resultset and counts the results of that.
976
977 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
978 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
979 not support C<DISTINCT> with multiple columns. If you are using such a
980 database, you should only use columns from the main table in your C<group_by>
981 clause.
982
983 =cut
984
985 sub count {
986   my $self = shift;
987   return $self->search(@_)->count if @_ and defined $_[0];
988   return scalar @{ $self->get_cache } if $self->get_cache;
989   my $count = $self->_count;
990   return 0 unless $count;
991
992   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
993   $count = $self->{attrs}{rows} if
994     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
995   return $count;
996 }
997
998 sub _count { # Separated out so pager can get the full count
999   my $self = shift;
1000   my $select = { count => '*' };
1001   
1002   $self->_resolve;
1003   my $attrs = { %{ $self->{_attrs} } };
1004   if (my $group_by = delete $attrs->{group_by}) {
1005     delete $attrs->{having};
1006     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1007     # todo: try CONCAT for multi-column pk
1008     my @pk = $self->result_source->primary_columns;
1009     if (@pk == 1) {
1010       foreach my $column (@distinct) {
1011         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
1012           @distinct = ($column);
1013           last;
1014         }
1015       }
1016     }
1017
1018     $select = { count => { distinct => \@distinct } };
1019   }
1020
1021   $attrs->{select} = $select;
1022   $attrs->{as} = [qw/count/];
1023
1024   # offset, order by and page are not needed to count. record_filter is cdbi
1025   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1026         my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1027         $tmp_rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
1028
1029   my ($count) = $tmp_rs->cursor->next;
1030   return $count;
1031 }
1032
1033 =head2 count_literal
1034
1035 =over 4
1036
1037 =item Arguments: $sql_fragment, @bind_values
1038
1039 =item Return Value: $count
1040
1041 =back
1042
1043 Counts the results in a literal query. Equivalent to calling L</search_literal>
1044 with the passed arguments, then L</count>.
1045
1046 =cut
1047
1048 sub count_literal { shift->search_literal(@_)->count; }
1049
1050 =head2 all
1051
1052 =over 4
1053
1054 =item Arguments: none
1055
1056 =item Return Value: @objects
1057
1058 =back
1059
1060 Returns all elements in the resultset. Called implicitly if the resultset
1061 is returned in list context.
1062
1063 =cut
1064
1065 sub all {
1066   my ($self) = @_;
1067   return @{ $self->get_cache } if $self->get_cache;
1068
1069   my @obj;
1070
1071   # TODO: don't call resolve here
1072   $self->_resolve;
1073   if (keys %{$self->{_attrs}->{collapse}}) {
1074 #  if ($self->{attrs}->{prefetch}) {
1075       # Using $self->cursor->all is really just an optimisation.
1076       # If we're collapsing has_many prefetches it probably makes
1077       # very little difference, and this is cleaner than hacking
1078       # _construct_object to survive the approach
1079     my @row = $self->cursor->next;
1080     while (@row) {
1081       push(@obj, $self->_construct_object(@row));
1082       @row = (exists $self->{stashed_row}
1083                ? @{delete $self->{stashed_row}}
1084                : $self->cursor->next);
1085     }
1086   } else {
1087     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1088   }
1089
1090   $self->set_cache(\@obj) if $self->{attrs}{cache};
1091   return @obj;
1092 }
1093
1094 =head2 reset
1095
1096 =over 4
1097
1098 =item Arguments: none
1099
1100 =item Return Value: $self
1101
1102 =back
1103
1104 Resets the resultset's cursor, so you can iterate through the elements again.
1105
1106 =cut
1107
1108 sub reset {
1109   my ($self) = @_;
1110   delete $self->{_attrs} if (exists $self->{_attrs});
1111
1112   $self->{all_cache_position} = 0;
1113   $self->cursor->reset;
1114   return $self;
1115 }
1116
1117 =head2 first
1118
1119 =over 4
1120
1121 =item Arguments: none
1122
1123 =item Return Value: $object?
1124
1125 =back
1126
1127 Resets the resultset and returns an object for the first result (if the
1128 resultset returns anything).
1129
1130 =cut
1131
1132 sub first {
1133   return $_[0]->reset->next;
1134 }
1135
1136 # _cond_for_update_delete
1137 #
1138 # update/delete require the condition to be modified to handle
1139 # the differing SQL syntax available.  This transforms the $self->{cond}
1140 # appropriately, returning the new condition.
1141
1142 sub _cond_for_update_delete {
1143   my ($self) = @_;
1144   my $cond = {};
1145
1146   if (!ref($self->{cond})) {
1147     # No-op. No condition, we're updating/deleting everything
1148   }
1149   elsif (ref $self->{cond} eq 'ARRAY') {
1150     $cond = [
1151       map {
1152         my %hash;
1153         foreach my $key (keys %{$_}) {
1154           $key =~ /([^.]+)$/;
1155           $hash{$1} = $_->{$key};
1156         }
1157         \%hash;
1158       } @{$self->{cond}}
1159     ];
1160   }
1161   elsif (ref $self->{cond} eq 'HASH') {
1162     if ((keys %{$self->{cond}})[0] eq '-and') {
1163       $cond->{-and} = [];
1164
1165       my @cond = @{$self->{cond}{-and}};
1166       for (my $i = 0; $i <= @cond - 1; $i++) {
1167         my $entry = $cond[$i];
1168
1169         my %hash;
1170         if (ref $entry eq 'HASH') {
1171           foreach my $key (keys %{$entry}) {
1172             $key =~ /([^.]+)$/;
1173             $hash{$1} = $entry->{$key};
1174           }
1175         }
1176         else {
1177           $entry =~ /([^.]+)$/;
1178           $hash{$1} = $cond[++$i];
1179         }
1180
1181         push @{$cond->{-and}}, \%hash;
1182       }
1183     }
1184     else {
1185       foreach my $key (keys %{$self->{cond}}) {
1186         $key =~ /([^.]+)$/;
1187         $cond->{$1} = $self->{cond}{$key};
1188       }
1189     }
1190   }
1191   else {
1192     $self->throw_exception(
1193       "Can't update/delete on resultset with condition unless hash or array"
1194     );
1195   }
1196
1197   return $cond;
1198 }
1199
1200
1201 =head2 update
1202
1203 =over 4
1204
1205 =item Arguments: \%values
1206
1207 =item Return Value: $storage_rv
1208
1209 =back
1210
1211 Sets the specified columns in the resultset to the supplied values in a
1212 single query. Return value will be true if the update succeeded or false
1213 if no records were updated; exact type of success value is storage-dependent.
1214
1215 =cut
1216
1217 sub update {
1218   my ($self, $values) = @_;
1219   $self->throw_exception("Values for update must be a hash")
1220     unless ref $values eq 'HASH';
1221
1222   my $cond = $self->_cond_for_update_delete;
1223
1224   return $self->result_source->storage->update(
1225     $self->result_source->from, $values, $cond
1226   );
1227 }
1228
1229 =head2 update_all
1230
1231 =over 4
1232
1233 =item Arguments: \%values
1234
1235 =item Return Value: 1
1236
1237 =back
1238
1239 Fetches all objects and updates them one at a time. Note that C<update_all>
1240 will run DBIC cascade triggers, while L</update> will not.
1241
1242 =cut
1243
1244 sub update_all {
1245   my ($self, $values) = @_;
1246   $self->throw_exception("Values for update must be a hash")
1247     unless ref $values eq 'HASH';
1248   foreach my $obj ($self->all) {
1249     $obj->set_columns($values)->update;
1250   }
1251   return 1;
1252 }
1253
1254 =head2 delete
1255
1256 =over 4
1257
1258 =item Arguments: none
1259
1260 =item Return Value: 1
1261
1262 =back
1263
1264 Deletes the contents of the resultset from its result source. Note that this
1265 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1266 to run.
1267
1268 =cut
1269
1270 sub delete {
1271   my ($self) = @_;
1272   my $del = {};
1273
1274   my $cond = $self->_cond_for_update_delete;
1275
1276   $self->result_source->storage->delete($self->result_source->from, $cond);
1277   return 1;
1278 }
1279
1280 =head2 delete_all
1281
1282 =over 4
1283
1284 =item Arguments: none
1285
1286 =item Return Value: 1
1287
1288 =back
1289
1290 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1291 will run DBIC cascade triggers, while L</delete> will not.
1292
1293 =cut
1294
1295 sub delete_all {
1296   my ($self) = @_;
1297   $_->delete for $self->all;
1298   return 1;
1299 }
1300
1301 =head2 pager
1302
1303 =over 4
1304
1305 =item Arguments: none
1306
1307 =item Return Value: $pager
1308
1309 =back
1310
1311 Return Value a L<Data::Page> object for the current resultset. Only makes
1312 sense for queries with a C<page> attribute.
1313
1314 =cut
1315
1316 sub pager {
1317   my ($self) = @_;
1318   my $attrs = $self->{attrs};
1319   $self->throw_exception("Can't create pager for non-paged rs")
1320     unless $self->{page};
1321   $attrs->{rows} ||= 10;
1322   return $self->{pager} ||= Data::Page->new(
1323     $self->_count, $attrs->{rows}, $self->{page});
1324 }
1325
1326 =head2 page
1327
1328 =over 4
1329
1330 =item Arguments: $page_number
1331
1332 =item Return Value: $rs
1333
1334 =back
1335
1336 Returns a resultset for the $page_number page of the resultset on which page
1337 is called, where each page contains a number of rows equal to the 'rows'
1338 attribute set on the resultset (10 by default).
1339
1340 =cut
1341
1342 sub page {
1343   my ($self, $page) = @_;
1344   my $attrs = { %{$self->{attrs}} };
1345   $attrs->{page} = $page;
1346   return (ref $self)->new($self->result_source, $attrs);
1347 }
1348
1349 =head2 new_result
1350
1351 =over 4
1352
1353 =item Arguments: \%vals
1354
1355 =item Return Value: $object
1356
1357 =back
1358
1359 Creates an object in the resultset's result class and returns it.
1360
1361 =cut
1362
1363 sub new_result {
1364   my ($self, $values) = @_;
1365   $self->throw_exception( "new_result needs a hash" )
1366     unless (ref $values eq 'HASH');
1367   $self->throw_exception(
1368     "Can't abstract implicit construct, condition not a hash"
1369   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1370   my %new = %$values;
1371   my $alias = $self->{attrs}{alias};
1372   foreach my $key (keys %{$self->{cond}||{}}) {
1373     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1374   }
1375   my $obj = $self->result_class->new(\%new);
1376   $obj->result_source($self->result_source) if $obj->can('result_source');
1377   return $obj;
1378 }
1379
1380 =head2 find_or_new
1381
1382 =over 4
1383
1384 =item Arguments: \%vals, \%attrs?
1385
1386 =item Return Value: $object
1387
1388 =back
1389
1390 Find an existing record from this resultset. If none exists, instantiate a new
1391 result object and return it. The object will not be saved into your storage
1392 until you call L<DBIx::Class::Row/insert> on it.
1393
1394 If you want objects to be saved immediately, use L</find_or_create> instead.
1395
1396 =cut
1397
1398 sub find_or_new {
1399   my $self     = shift;
1400   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1401   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1402   my $exists   = $self->find($hash, $attrs);
1403   return defined $exists ? $exists : $self->new_result($hash);
1404 }
1405
1406 =head2 create
1407
1408 =over 4
1409
1410 =item Arguments: \%vals
1411
1412 =item Return Value: $object
1413
1414 =back
1415
1416 Inserts a record into the resultset and returns the object representing it.
1417
1418 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1419
1420 =cut
1421
1422 sub create {
1423   my ($self, $attrs) = @_;
1424   $self->throw_exception( "create needs a hashref" )
1425     unless ref $attrs eq 'HASH';
1426   return $self->new_result($attrs)->insert;
1427 }
1428
1429 =head2 find_or_create
1430
1431 =over 4
1432
1433 =item Arguments: \%vals, \%attrs?
1434
1435 =item Return Value: $object
1436
1437 =back
1438
1439   $class->find_or_create({ key => $val, ... });
1440
1441 Tries to find a record based on its primary key or unique constraint; if none
1442 is found, creates one and returns that instead.
1443
1444   my $cd = $schema->resultset('CD')->find_or_create({
1445     cdid   => 5,
1446     artist => 'Massive Attack',
1447     title  => 'Mezzanine',
1448     year   => 2005,
1449   });
1450
1451 Also takes an optional C<key> attribute, to search by a specific key or unique
1452 constraint. For example:
1453
1454   my $cd = $schema->resultset('CD')->find_or_create(
1455     {
1456       artist => 'Massive Attack',
1457       title  => 'Mezzanine',
1458     },
1459     { key => 'cd_artist_title' }
1460   );
1461
1462 See also L</find> and L</update_or_create>. For information on how to declare
1463 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1464
1465 =cut
1466
1467 sub find_or_create {
1468   my $self     = shift;
1469   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1470   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1471   my $exists   = $self->find($hash, $attrs);
1472   return defined $exists ? $exists : $self->create($hash);
1473 }
1474
1475 =head2 update_or_create
1476
1477 =over 4
1478
1479 =item Arguments: \%col_values, { key => $unique_constraint }?
1480
1481 =item Return Value: $object
1482
1483 =back
1484
1485   $class->update_or_create({ col => $val, ... });
1486
1487 First, searches for an existing row matching one of the unique constraints
1488 (including the primary key) on the source of this resultset. If a row is
1489 found, updates it with the other given column values. Otherwise, creates a new
1490 row.
1491
1492 Takes an optional C<key> attribute to search on a specific unique constraint.
1493 For example:
1494
1495   # In your application
1496   my $cd = $schema->resultset('CD')->update_or_create(
1497     {
1498       artist => 'Massive Attack',
1499       title  => 'Mezzanine',
1500       year   => 1998,
1501     },
1502     { key => 'cd_artist_title' }
1503   );
1504
1505 If no C<key> is specified, it searches on all unique constraints defined on the
1506 source, including the primary key.
1507
1508 If the C<key> is specified as C<primary>, it searches only on the primary key.
1509
1510 See also L</find> and L</find_or_create>. For information on how to declare
1511 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1512
1513 =cut
1514
1515 sub update_or_create {
1516   my $self = shift;
1517   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1518   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1519
1520   my $row = $self->find($cond);
1521   if (defined $row) {
1522     $row->update($cond);
1523     return $row;
1524   }
1525
1526   return $self->create($cond);
1527 }
1528
1529 =head2 get_cache
1530
1531 =over 4
1532
1533 =item Arguments: none
1534
1535 =item Return Value: \@cache_objects?
1536
1537 =back
1538
1539 Gets the contents of the cache for the resultset, if the cache is set.
1540
1541 =cut
1542
1543 sub get_cache {
1544   shift->{all_cache};
1545 }
1546
1547 =head2 set_cache
1548
1549 =over 4
1550
1551 =item Arguments: \@cache_objects
1552
1553 =item Return Value: \@cache_objects
1554
1555 =back
1556
1557 Sets the contents of the cache for the resultset. Expects an arrayref
1558 of objects of the same class as those produced by the resultset. Note that
1559 if the cache is set the resultset will return the cached objects rather
1560 than re-querying the database even if the cache attr is not set.
1561
1562 =cut
1563
1564 sub set_cache {
1565   my ( $self, $data ) = @_;
1566   $self->throw_exception("set_cache requires an arrayref")
1567       if defined($data) && (ref $data ne 'ARRAY');
1568   $self->{all_cache} = $data;
1569 }
1570
1571 =head2 clear_cache
1572
1573 =over 4
1574
1575 =item Arguments: none
1576
1577 =item Return Value: []
1578
1579 =back
1580
1581 Clears the cache for the resultset.
1582
1583 =cut
1584
1585 sub clear_cache {
1586   shift->set_cache(undef);
1587 }
1588
1589 =head2 related_resultset
1590
1591 =over 4
1592
1593 =item Arguments: $relationship_name
1594
1595 =item Return Value: $resultset
1596
1597 =back
1598
1599 Returns a related resultset for the supplied relationship name.
1600
1601   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1602
1603 =cut
1604
1605 sub related_resultset {
1606   my ( $self, $rel ) = @_;
1607   
1608   $self->{related_resultsets} ||= {};
1609   return $self->{related_resultsets}{$rel} ||= do {
1610     #warn "fetching related resultset for rel '$rel' " . $self->result_source->{name};
1611     my $rel_obj = $self->result_source->relationship_info($rel);
1612                 #print Dumper($self->result_source->_relationships);
1613     $self->throw_exception(
1614         "search_related: result source '" . $self->result_source->name .
1615         "' has no such relationship ${rel}")
1616         unless $rel_obj; #die Dumper $self->{attrs};
1617
1618                 my @live_join_stack = (exists $self->{attrs}->{_live_join_stack}) ?
1619                         @{$self->{attrs}->{_live_join_stack}}:
1620                         ();             
1621                 push(@live_join_stack, $rel);
1622                 
1623     my $rs = $self->result_source->schema->resultset($rel_obj->{class}
1624            )->search( undef,
1625                       { select => undef,
1626                         as => undef,
1627                         _live_join => $rel, #the most recent
1628                         _live_join_stack => \@live_join_stack, #the trail of rels
1629                         _parent_attrs => $self->{attrs}}
1630                        );    
1631
1632     # keep reference of the original resultset
1633     $rs->{_parent_rs} = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->result_source;
1634     return $rs;
1635   };
1636 }
1637
1638 =head2 throw_exception
1639
1640 See L<DBIx::Class::Schema/throw_exception> for details.
1641
1642 =cut
1643
1644 sub throw_exception {
1645   my $self=shift;
1646   $self->result_source->schema->throw_exception(@_);
1647 }
1648
1649 # XXX: FIXME: Attributes docs need clearing up
1650
1651 =head1 ATTRIBUTES
1652
1653 The resultset takes various attributes that modify its behavior. Here's an
1654 overview of them:
1655
1656 =head2 order_by
1657
1658 =over 4
1659
1660 =item Value: ($order_by | \@order_by)
1661
1662 =back
1663
1664 Which column(s) to order the results by. This is currently passed
1665 through directly to SQL, so you can give e.g. C<year DESC> for a
1666 descending order on the column `year'.
1667
1668 Please note that if you have quoting enabled (see 
1669 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1670 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1671 so you will need to manually quote things as appropriate.)
1672
1673 =head2 columns
1674
1675 =over 4
1676
1677 =item Value: \@columns
1678
1679 =back
1680
1681 Shortcut to request a particular set of columns to be retrieved.  Adds
1682 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1683 from that, then auto-populates C<as> from C<select> as normal. (You may also
1684 use the C<cols> attribute, as in earlier versions of DBIC.)
1685
1686 =head2 include_columns
1687
1688 =over 4
1689
1690 =item Value: \@columns
1691
1692 =back
1693
1694 Shortcut to include additional columns in the returned results - for example
1695
1696   $schema->resultset('CD')->search(undef, {
1697     include_columns => ['artist.name'],
1698     join => ['artist']
1699   });
1700
1701 would return all CDs and include a 'name' column to the information
1702 passed to object inflation
1703
1704 =head2 select
1705
1706 =over 4
1707
1708 =item Value: \@select_columns
1709
1710 =back
1711
1712 Indicates which columns should be selected from the storage. You can use
1713 column names, or in the case of RDBMS back ends, function or stored procedure
1714 names:
1715
1716   $rs = $schema->resultset('Employee')->search(undef, {
1717     select => [
1718       'name',
1719       { count => 'employeeid' },
1720       { sum => 'salary' }
1721     ]
1722   });
1723
1724 When you use function/stored procedure names and do not supply an C<as>
1725 attribute, the column names returned are storage-dependent. E.g. MySQL would
1726 return a column named C<count(employeeid)> in the above example.
1727
1728 =head2 +select
1729
1730 =over 4
1731
1732 Indicates additional columns to be selected from storage.  Works the same as
1733 L<select> but adds columns to the selection.
1734
1735 =back
1736
1737 =head2 +as
1738
1739 =over 4
1740
1741 Indicates additional column names for those added via L<+select>.
1742
1743 =back
1744
1745 =head2 as
1746
1747 =over 4
1748
1749 =item Value: \@inflation_names
1750
1751 =back
1752
1753 Indicates column names for object inflation. This is used in conjunction with
1754 C<select>, usually when C<select> contains one or more function or stored
1755 procedure names:
1756
1757   $rs = $schema->resultset('Employee')->search(undef, {
1758     select => [
1759       'name',
1760       { count => 'employeeid' }
1761     ],
1762     as => ['name', 'employee_count'],
1763   });
1764
1765   my $employee = $rs->first(); # get the first Employee
1766
1767 If the object against which the search is performed already has an accessor
1768 matching a column name specified in C<as>, the value can be retrieved using
1769 the accessor as normal:
1770
1771   my $name = $employee->name();
1772
1773 If on the other hand an accessor does not exist in the object, you need to
1774 use C<get_column> instead:
1775
1776   my $employee_count = $employee->get_column('employee_count');
1777
1778 You can create your own accessors if required - see
1779 L<DBIx::Class::Manual::Cookbook> for details.
1780
1781 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1782 produced, it is used for internal access only. Thus attempting to use the accessor
1783 in an C<order_by> clause or similar will fail misrably.
1784
1785 =head2 join
1786
1787 =over 4
1788
1789 =item Value: ($rel_name | \@rel_names | \%rel_names)
1790
1791 =back
1792
1793 Contains a list of relationships that should be joined for this query.  For
1794 example:
1795
1796   # Get CDs by Nine Inch Nails
1797   my $rs = $schema->resultset('CD')->search(
1798     { 'artist.name' => 'Nine Inch Nails' },
1799     { join => 'artist' }
1800   );
1801
1802 Can also contain a hash reference to refer to the other relation's relations.
1803 For example:
1804
1805   package MyApp::Schema::Track;
1806   use base qw/DBIx::Class/;
1807   __PACKAGE__->table('track');
1808   __PACKAGE__->add_columns(qw/trackid cd position title/);
1809   __PACKAGE__->set_primary_key('trackid');
1810   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1811   1;
1812
1813   # In your application
1814   my $rs = $schema->resultset('Artist')->search(
1815     { 'track.title' => 'Teardrop' },
1816     {
1817       join     => { cd => 'track' },
1818       order_by => 'artist.name',
1819     }
1820   );
1821
1822 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1823 similarly for a third time). For e.g.
1824
1825   my $rs = $schema->resultset('Artist')->search({
1826     'cds.title'   => 'Down to Earth',
1827     'cds_2.title' => 'Popular',
1828   }, {
1829     join => [ qw/cds cds/ ],
1830   });
1831
1832 will return a set of all artists that have both a cd with title 'Down
1833 to Earth' and a cd with title 'Popular'.
1834
1835 If you want to fetch related objects from other tables as well, see C<prefetch>
1836 below.
1837
1838 =head2 prefetch
1839
1840 =over 4
1841
1842 =item Value: ($rel_name | \@rel_names | \%rel_names)
1843
1844 =back
1845
1846 Contains one or more relationships that should be fetched along with the main
1847 query (when they are accessed afterwards they will have already been
1848 "prefetched").  This is useful for when you know you will need the related
1849 objects, because it saves at least one query:
1850
1851   my $rs = $schema->resultset('Tag')->search(
1852     undef,
1853     {
1854       prefetch => {
1855         cd => 'artist'
1856       }
1857     }
1858   );
1859
1860 The initial search results in SQL like the following:
1861
1862   SELECT tag.*, cd.*, artist.* FROM tag
1863   JOIN cd ON tag.cd = cd.cdid
1864   JOIN artist ON cd.artist = artist.artistid
1865
1866 L<DBIx::Class> has no need to go back to the database when we access the
1867 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1868 case.
1869
1870 Simple prefetches will be joined automatically, so there is no need
1871 for a C<join> attribute in the above search. If you're prefetching to
1872 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1873 specify the join as well.
1874
1875 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1876 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1877 with an accessor type of 'single' or 'filter').
1878
1879 =head2 page
1880
1881 =over 4
1882
1883 =item Value: $page
1884
1885 =back
1886
1887 Makes the resultset paged and specifies the page to retrieve. Effectively
1888 identical to creating a non-pages resultset and then calling ->page($page)
1889 on it. 
1890
1891 If L<rows> attribute is not specified it defualts to 10 rows per page.
1892
1893 =head2 rows
1894
1895 =over 4
1896
1897 =item Value: $rows
1898
1899 =back
1900
1901 Specifes the maximum number of rows for direct retrieval or the number of
1902 rows per page if the page attribute or method is used.
1903
1904 =head2 offset
1905
1906 =over 4
1907
1908 =item Value: $offset
1909
1910 =back
1911
1912 Specifies the (zero-based) row number for the  first row to be returned, or the
1913 of the first row of the first page if paging is used.
1914
1915 =head2 group_by
1916
1917 =over 4
1918
1919 =item Value: \@columns
1920
1921 =back
1922
1923 A arrayref of columns to group by. Can include columns of joined tables.
1924
1925   group_by => [qw/ column1 column2 ... /]
1926
1927 =head2 having
1928
1929 =over 4
1930
1931 =item Value: $condition
1932
1933 =back
1934
1935 HAVING is a select statement attribute that is applied between GROUP BY and
1936 ORDER BY. It is applied to the after the grouping calculations have been
1937 done. 
1938
1939   having => { 'count(employee)' => { '>=', 100 } }
1940
1941 =head2 distinct
1942
1943 =over 4
1944
1945 =item Value: (0 | 1)
1946
1947 =back
1948
1949 Set to 1 to group by all columns.
1950
1951 =head2 cache
1952
1953 Set to 1 to cache search results. This prevents extra SQL queries if you
1954 revisit rows in your ResultSet:
1955
1956   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1957   
1958   while( my $artist = $resultset->next ) {
1959     ... do stuff ...
1960   }
1961
1962   $rs->first; # without cache, this would issue a query
1963
1964 By default, searches are not cached.
1965
1966 For more examples of using these attributes, see
1967 L<DBIx::Class::Manual::Cookbook>.
1968
1969 =head2 from
1970
1971 =over 4
1972
1973 =item Value: \@from_clause
1974
1975 =back
1976
1977 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1978 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1979 clauses.
1980
1981 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1982
1983 C<join> will usually do what you need and it is strongly recommended that you
1984 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1985 And we really do mean "cannot", not just tried and failed. Attempting to use
1986 this because you're having problems with C<join> is like trying to use x86
1987 ASM because you've got a syntax error in your C. Trust us on this.
1988
1989 Now, if you're still really, really sure you need to use this (and if you're
1990 not 100% sure, ask the mailing list first), here's an explanation of how this
1991 works.
1992
1993 The syntax is as follows -
1994
1995   [
1996     { <alias1> => <table1> },
1997     [
1998       { <alias2> => <table2>, -join_type => 'inner|left|right' },
1999       [], # nested JOIN (optional)
2000       { <table1.column1> => <table2.column2>, ... (more conditions) },
2001     ],
2002     # More of the above [ ] may follow for additional joins
2003   ]
2004
2005   <table1> <alias1>
2006   JOIN
2007     <table2> <alias2>
2008     [JOIN ...]
2009   ON <table1.column1> = <table2.column2>
2010   <more joins may follow>
2011
2012 An easy way to follow the examples below is to remember the following:
2013
2014     Anything inside "[]" is a JOIN
2015     Anything inside "{}" is a condition for the enclosing JOIN
2016
2017 The following examples utilize a "person" table in a family tree application.
2018 In order to express parent->child relationships, this table is self-joined:
2019
2020     # Person->belongs_to('father' => 'Person');
2021     # Person->belongs_to('mother' => 'Person');
2022
2023 C<from> can be used to nest joins. Here we return all children with a father,
2024 then search against all mothers of those children:
2025
2026   $rs = $schema->resultset('Person')->search(
2027       undef,
2028       {
2029           alias => 'mother', # alias columns in accordance with "from"
2030           from => [
2031               { mother => 'person' },
2032               [
2033                   [
2034                       { child => 'person' },
2035                       [
2036                           { father => 'person' },
2037                           { 'father.person_id' => 'child.father_id' }
2038                       ]
2039                   ],
2040                   { 'mother.person_id' => 'child.mother_id' }
2041               ],
2042           ]
2043       },
2044   );
2045
2046   # Equivalent SQL:
2047   # SELECT mother.* FROM person mother
2048   # JOIN (
2049   #   person child
2050   #   JOIN person father
2051   #   ON ( father.person_id = child.father_id )
2052   # )
2053   # ON ( mother.person_id = child.mother_id )
2054
2055 The type of any join can be controlled manually. To search against only people
2056 with a father in the person table, we could explicitly use C<INNER JOIN>:
2057
2058     $rs = $schema->resultset('Person')->search(
2059         undef,
2060         {
2061             alias => 'child', # alias columns in accordance with "from"
2062             from => [
2063                 { child => 'person' },
2064                 [
2065                     { father => 'person', -join_type => 'inner' },
2066                     { 'father.id' => 'child.father_id' }
2067                 ],
2068             ]
2069         },
2070     );
2071
2072     # Equivalent SQL:
2073     # SELECT child.* FROM person child
2074     # INNER JOIN person father ON child.father_id = father.id
2075
2076 =cut
2077
2078 1;