formatting fixes for ResultSet.pm
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Data::Dumper;
13 use Scalar::Util qw/weaken/;
14 use Data::Dumper;
15 use DBIx::Class::ResultSetColumn;
16 use base qw/DBIx::Class/;
17 __PACKAGE__->load_components(qw/AccessorGroup/);
18 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
23
24 =head1 SYNOPSIS
25
26   my $rs   = $schema->resultset('User')->search(registered => 1);
27   my @rows = $schema->resultset('CD')->search(year => 2005);
28
29 =head1 DESCRIPTION
30
31 The resultset is also known as an iterator. It is responsible for handling
32 queries that may return an arbitrary number of rows, e.g. via L</search>
33 or a C<has_many> relationship.
34
35 In the examples below, the following table classes are used:
36
37   package MyApp::Schema::Artist;
38   use base qw/DBIx::Class/;
39   __PACKAGE__->load_components(qw/Core/);
40   __PACKAGE__->table('artist');
41   __PACKAGE__->add_columns(qw/artistid name/);
42   __PACKAGE__->set_primary_key('artistid');
43   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
44   1;
45
46   package MyApp::Schema::CD;
47   use base qw/DBIx::Class/;
48   __PACKAGE__->load_components(qw/Core/);
49   __PACKAGE__->table('cd');
50   __PACKAGE__->add_columns(qw/cdid artist title year/);
51   __PACKAGE__->set_primary_key('cdid');
52   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
53   1;
54
55 =head1 METHODS
56
57 =head2 new
58
59 =over 4
60
61 =item Arguments: $source, \%$attrs
62
63 =item Return Value: $rs
64
65 =back
66
67 The resultset constructor. Takes a source object (usually a
68 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
69 L</ATTRIBUTES> below).  Does not perform any queries -- these are
70 executed as needed by the other methods.
71
72 Generally you won't need to construct a resultset manually.  You'll
73 automatically get one from e.g. a L</search> called in scalar context:
74
75   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
76
77 IMPORTANT: If called on an object, proxies to new_result instead so
78
79   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
80
81 will return a CD object, not a ResultSet.
82
83 =cut
84
85 sub new {
86   my $class = shift;
87   return $class->new_result(@_) if ref $class;
88   
89   my ($source, $attrs) = @_;
90   weaken $source;
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   bless {
101     result_source => $source,
102     result_class => $attrs->{result_class} || $source->result_class,
103     cond => $attrs->{where},
104 #    from => $attrs->{from},
105 #    collapse => $collapse,
106     count => undef,
107     page => delete $attrs->{page},
108     pager => undef,
109     attrs => $attrs
110   }, $class;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 =cut
138
139 sub search {
140   my $self = shift;
141   my $rs = $self->search_rs( @_ );
142   return (wantarray ? $rs->all : $rs);
143 }
144
145 =head2 search_rs
146
147 =over 4
148
149 =item Arguments: $cond, \%attrs?
150
151 =item Return Value: $resultset
152
153 =back
154
155 This method does the same exact thing as search() except it will 
156 always return a resultset, even in list context.
157
158 =cut
159
160 sub search_rs {
161   my $self = shift;
162
163   my $attrs = {};
164   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
165   my $our_attrs = ($attrs->{_parent_attrs})
166     ? { %{$attrs->{_parent_attrs}} }
167       : { %{$self->{attrs}} };
168   my $having = delete $our_attrs->{having};
169
170   # XXX this is getting messy
171   if ($attrs->{_live_join_stack}) {
172     my $live_join = $attrs->{_live_join_stack};
173     foreach (reverse @{$live_join}) {
174       $attrs->{_live_join_h} = (defined $attrs->{_live_join_h}) ? { $_ => $attrs->{_live_join_h} } : $_;
175     }
176   }
177
178   # merge new attrs into old
179   foreach my $key (qw/join prefetch/) {
180     next unless (exists $attrs->{$key});
181     if ($attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
182       my $live_join = $attrs->{_live_join_stack} ||
183                       $our_attrs->{_live_join_stack};
184       foreach (reverse @{$live_join}) {
185         $attrs->{$key} = { $_ => $attrs->{$key} };
186       }
187     }
188
189     if (exists $our_attrs->{$key}) {
190       $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
191     } else {
192       $our_attrs->{$key} = $attrs->{$key};
193     }
194     delete $attrs->{$key};
195   }
196
197   $our_attrs->{join} = $self->_merge_attr(
198     $our_attrs->{join}, $attrs->{_live_join_h}, 1
199   ) if ($attrs->{_live_join_h});
200
201   if (defined $our_attrs->{prefetch}) {
202     $our_attrs->{join} = $self->_merge_attr(
203       $our_attrs->{join}, $our_attrs->{prefetch}, 1
204     );
205   }
206
207   my $new_attrs = { %{$our_attrs}, %{$attrs} };
208   my $where = (@_
209     ? (
210         (@_ == 1 || ref $_[0] eq "HASH")
211           ? shift
212           : (
213               (@_ % 2)
214                 ? $self->throw_exception("Odd number of arguments to search")
215                 : {@_}
216              )
217        )
218     : undef()
219   );
220
221   if (defined $where) {
222     $new_attrs->{where} = (
223       defined $new_attrs->{where}
224         ? { '-and' => [
225               map {
226                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
227               } $where, $new_attrs->{where}
228             ]
229           }
230         : $where);
231   }
232
233   if (defined $having) {
234     $new_attrs->{having} = (
235       defined $new_attrs->{having}
236         ? { '-and' => [
237               map {
238                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
239               } $having, $new_attrs->{having}
240             ]
241           }
242         : $having);
243   }
244
245   my $rs = (ref $self)->new($self->result_source, $new_attrs);
246   $rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs});
247        #XXX - hack to pass through parent of related resultsets
248
249   unless (@_) {                 # no search, effectively just a clone
250     my $rows = $self->get_cache;
251     if ($rows) {
252       $rs->set_cache($rows);
253     }
254   }
255   
256   return $rs;
257 }
258
259 =head2 search_literal
260
261 =over 4
262
263 =item Arguments: $sql_fragment, @bind_values
264
265 =item Return Value: $resultset (scalar context), @row_objs (list context)
266
267 =back
268
269   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
270   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
271
272 Pass a literal chunk of SQL to be added to the conditional part of the
273 resultset query.
274
275 =cut
276
277 sub search_literal {
278   my ($self, $cond, @vals) = @_;
279   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
280   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
281   return $self->search(\$cond, $attrs);
282 }
283
284 =head2 find
285
286 =over 4
287
288 =item Arguments: @values | \%cols, \%attrs?
289
290 =item Return Value: $row_object
291
292 =back
293
294 Finds a row based on its primary key or unique constraint. For example, to find
295 a row by its primary key:
296
297   my $cd = $schema->resultset('CD')->find(5);
298
299 You can also find a row by a specific unique constraint using the C<key>
300 attribute. For example:
301
302   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
303     key => 'cd_artist_title'
304   });
305
306 Additionally, you can specify the columns explicitly by name:
307
308   my $cd = $schema->resultset('CD')->find(
309     {
310       artist => 'Massive Attack',
311       title  => 'Mezzanine',
312     },
313     { key => 'cd_artist_title' }
314   );
315
316 If the C<key> is specified as C<primary>, it searches only on the primary key.
317
318 If no C<key> is specified, it searches on all unique constraints defined on the
319 source, including the primary key.
320
321 See also L</find_or_create> and L</update_or_create>. For information on how to
322 declare unique constraints, see
323 L<DBIx::Class::ResultSource/add_unique_constraint>.
324
325 =cut
326
327 sub find {
328   my $self = shift;
329   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
330
331   # Default to the primary key, but allow a specific key
332   my @cols = exists $attrs->{key}
333     ? $self->result_source->unique_constraint_columns($attrs->{key})
334     : $self->result_source->primary_columns;
335   $self->throw_exception(
336     "Can't find unless a primary key or unique constraint is defined"
337   ) unless @cols;
338
339   # Parse out a hashref from input
340   my $input_query;
341   if (ref $_[0] eq 'HASH') {
342     $input_query = { %{$_[0]} };
343   }
344   elsif (@_ == @cols) {
345     $input_query = {};
346     @{$input_query}{@cols} = @_;
347   }
348   else {
349     # Compatibility: Allow e.g. find(id => $value)
350     carp "Find by key => value deprecated; please use a hashref instead";
351     $input_query = {@_};
352   }
353
354   my @unique_queries = $self->_unique_queries($input_query, $attrs);
355
356   # Handle cases where the ResultSet defines the query, or where the user is
357   # abusing find
358   my $query = @unique_queries ? \@unique_queries : $input_query;
359
360   # Run the query
361   if (keys %$attrs) {
362     my $rs = $self->search($query, $attrs);
363     $rs->_resolve;
364     return keys %{$rs->{_attrs}->{collapse}} ? $rs->next : $rs->single;
365   }
366   else {
367     $self->_resolve;  
368     return (keys %{$self->{_attrs}->{collapse}})
369       ? $self->search($query)->next
370       : $self->single($query);
371   }
372 }
373
374 # _unique_queries
375 #
376 # Build a list of queries which satisfy unique constraints.
377
378 sub _unique_queries {
379   my ($self, $query, $attrs) = @_;
380
381   my @constraint_names = exists $attrs->{key}
382     ? ($attrs->{key})
383     : $self->result_source->unique_constraint_names;
384
385   my @unique_queries;
386   foreach my $name (@constraint_names) {
387     my @unique_cols = $self->result_source->unique_constraint_columns($name);
388     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
389
390     next unless scalar keys %$unique_query;
391
392     # Add the ResultSet's alias
393     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
394       my $alias = ($self->{attrs}->{_live_join})
395         ? $self->{attrs}->{_live_join}
396         : $self->{attrs}->{alias};
397       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
398     }
399
400     push @unique_queries, $unique_query;
401   }
402
403   return @unique_queries;
404 }
405
406 # _build_unique_query
407 #
408 # Constrain the specified query hash based on the specified column names.
409
410 sub _build_unique_query {
411   my ($self, $query, $unique_cols) = @_;
412
413   my %unique_query =
414     map  { $_ => $query->{$_} }
415     grep { exists $query->{$_} }
416     @$unique_cols;
417
418   return \%unique_query;
419 }
420
421 =head2 search_related
422
423 =over 4
424
425 =item Arguments: $cond, \%attrs?
426
427 =item Return Value: $new_resultset
428
429 =back
430
431   $new_rs = $cd_rs->search_related('artist', {
432     name => 'Emo-R-Us',
433   });
434
435 Searches the specified relationship, optionally specifying a condition and
436 attributes for matching records. See L</ATTRIBUTES> for more information.
437
438 =cut
439
440 sub search_related {
441   return shift->related_resultset(shift)->search(@_);
442 }
443
444 =head2 cursor
445
446 =over 4
447
448 =item Arguments: none
449
450 =item Return Value: $cursor
451
452 =back
453
454 Returns a storage-driven cursor to the given resultset. See
455 L<DBIx::Class::Cursor> for more information.
456
457 =cut
458
459 sub cursor {
460   my ($self) = @_;
461
462   $self->_resolve;
463   my $attrs = { %{$self->{_attrs}} };
464   return $self->{cursor}
465     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
466           $attrs->{where},$attrs);
467 }
468
469 =head2 single
470
471 =over 4
472
473 =item Arguments: $cond?
474
475 =item Return Value: $row_object?
476
477 =back
478
479   my $cd = $schema->resultset('CD')->single({ year => 2001 });
480
481 Inflates the first result without creating a cursor if the resultset has
482 any records in it; if not returns nothing. Used by L</find> as an optimisation.
483
484 Can optionally take an additional condition *only* - this is a fast-code-path
485 method; if you need to add extra joins or similar call ->search and then
486 ->single without a condition on the $rs returned from that.
487
488 =cut
489
490 sub single {
491   my ($self, $where) = @_;
492   $self->_resolve;
493   my $attrs = { %{$self->{_attrs}} };
494   if ($where) {
495     if (defined $attrs->{where}) {
496       $attrs->{where} = {
497         '-and' =>
498             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
499                $where, delete $attrs->{where} ]
500       };
501     } else {
502       $attrs->{where} = $where;
503     }
504   }
505
506   unless ($self->_is_unique_query($attrs->{where})) {
507     carp "Query not guaranteed to return a single row"
508       . "; please declare your unique constraints or use search instead";
509   }
510
511   my @data = $self->result_source->storage->select_single(
512     $attrs->{from}, $attrs->{select},
513     $attrs->{where},$attrs
514   );
515
516   return (@data ? $self->_construct_object(@data) : ());
517 }
518
519 # _is_unique_query
520 #
521 # Try to determine if the specified query is guaranteed to be unique, based on
522 # the declared unique constraints.
523
524 sub _is_unique_query {
525   my ($self, $query) = @_;
526
527   my $collapsed = $self->_collapse_query($query);
528   my $alias = ($self->{attrs}->{_live_join})
529     ? $self->{attrs}->{_live_join}
530     : $self->{attrs}->{alias};
531
532   foreach my $name ($self->result_source->unique_constraint_names) {
533     my @unique_cols = map {
534       "$alias.$_"
535     } $self->result_source->unique_constraint_columns($name);
536
537     # Count the values for each unique column
538     my %seen = map { $_ => 0 } @unique_cols;
539
540     foreach my $key (keys %$collapsed) {
541       my $aliased = $key;
542       $aliased = "$alias.$key" unless $key =~ /\./;
543
544       next unless exists $seen{$aliased};  # Additional constraints are okay
545       $seen{$aliased} = scalar @{ $collapsed->{$key} };
546     }
547
548     # If we get 0 or more than 1 value for a column, it's not necessarily unique
549     return 1 unless grep { $_ != 1 } values %seen;
550   }
551
552   return 0;
553 }
554
555 # _collapse_query
556 #
557 # Recursively collapse the query, accumulating values for each column.
558
559 sub _collapse_query {
560   my ($self, $query, $collapsed) = @_;
561
562   $collapsed ||= {};
563
564   if (ref $query eq 'ARRAY') {
565     foreach my $subquery (@$query) {
566       next unless ref $subquery;  # -or
567 #      warn "ARRAY: " . Dumper $subquery;
568       $collapsed = $self->_collapse_query($subquery, $collapsed);
569     }
570   }
571   elsif (ref $query eq 'HASH') {
572     if (keys %$query and (keys %$query)[0] eq '-and') {
573       foreach my $subquery (@{$query->{-and}}) {
574 #        warn "HASH: " . Dumper $subquery;
575         $collapsed = $self->_collapse_query($subquery, $collapsed);
576       }
577     }
578     else {
579 #      warn "LEAF: " . Dumper $query;
580       foreach my $key (keys %$query) {
581         push @{$collapsed->{$key}}, $query->{$key};
582       }
583     }
584   }
585
586   return $collapsed;
587 }
588
589 =head2 get_column
590
591 =over 4
592
593 =item Arguments: $cond?
594
595 =item Return Value: $resultsetcolumn
596
597 =back
598
599   my $max_length = $rs->get_column('length')->max;
600
601 Returns a ResultSetColumn instance for $column based on $self
602
603 =cut
604
605 sub get_column {
606   my ($self, $column) = @_;
607   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
608   return $new;
609 }
610
611 =head2 search_like
612
613 =over 4
614
615 =item Arguments: $cond, \%attrs?
616
617 =item Return Value: $resultset (scalar context), @row_objs (list context)
618
619 =back
620
621   # WHERE title LIKE '%blue%'
622   $cd_rs = $rs->search_like({ title => '%blue%'});
623
624 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
625 that this is simply a convenience method. You most likely want to use
626 L</search> with specific operators.
627
628 For more information, see L<DBIx::Class::Manual::Cookbook>.
629
630 =cut
631
632 sub search_like {
633   my $class = shift;
634   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
635   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
636   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
637   return $class->search($query, { %$attrs });
638 }
639
640 =head2 slice
641
642 =over 4
643
644 =item Arguments: $first, $last
645
646 =item Return Value: $resultset (scalar context), @row_objs (list context)
647
648 =back
649
650 Returns a resultset or object list representing a subset of elements from the
651 resultset slice is called on. Indexes are from 0, i.e., to get the first
652 three records, call:
653
654   my ($one, $two, $three) = $rs->slice(0, 2);
655
656 =cut
657
658 sub slice {
659   my ($self, $min, $max) = @_;
660   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
661   $attrs->{offset} = $self->{attrs}{offset} || 0;
662   $attrs->{offset} += $min;
663   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
664   return $self->search(undef(), $attrs);
665   #my $slice = (ref $self)->new($self->result_source, $attrs);
666   #return (wantarray ? $slice->all : $slice);
667 }
668
669 =head2 next
670
671 =over 4
672
673 =item Arguments: none
674
675 =item Return Value: $result?
676
677 =back
678
679 Returns the next element in the resultset (C<undef> is there is none).
680
681 Can be used to efficiently iterate over records in the resultset:
682
683   my $rs = $schema->resultset('CD')->search;
684   while (my $cd = $rs->next) {
685     print $cd->title;
686   }
687
688 Note that you need to store the resultset object, and call C<next> on it. 
689 Calling C<< resultset('Table')->next >> repeatedly will always return the
690 first record from the resultset.
691
692 =cut
693
694 sub next {
695   my ($self) = @_;
696   if (my $cache = $self->get_cache) {
697     $self->{all_cache_position} ||= 0;
698     return $cache->[$self->{all_cache_position}++];
699   }
700   if ($self->{attrs}{cache}) {
701     $self->{all_cache_position} = 1;
702     return ($self->all)[0];
703   }
704   my @row = (
705     exists $self->{stashed_row}
706       ? @{delete $self->{stashed_row}}
707       : $self->cursor->next
708   );
709   return unless (@row);
710   return $self->_construct_object(@row);
711 }
712
713 sub _resolve {
714   my $self = shift;
715
716   return if(exists $self->{_attrs}); #return if _resolve has already been called
717
718   my $attrs = $self->{attrs};    
719   my $source = ($self->{_parent_rs})
720     ? $self->{_parent_rs}
721     : $self->{result_source};
722
723   # XXX - lose storable dclone
724   my $record_filter = delete $attrs->{record_filter}
725     if (defined $attrs->{record_filter});
726   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
727   $attrs->{record_filter} = $record_filter if ($record_filter);
728   $self->{attrs}->{record_filter} = $record_filter if ($record_filter);
729
730   my $alias = $attrs->{alias};
731  
732   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
733   delete $attrs->{as} if $attrs->{columns};
734   $attrs->{columns} ||= [ $self->{result_source}->columns ]
735     unless $attrs->{select};
736   my $select_alias = ($self->{_parent_rs})
737     ? $self->{attrs}->{_live_join}
738     : $alias;
739   $attrs->{select} = [
740     map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
741   ] if $attrs->{columns};
742   $attrs->{as} ||= [
743     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
744   ];
745   if (my $include = delete $attrs->{include_columns}) {
746     push(@{$attrs->{select}}, @$include);
747     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
748   }
749
750   $attrs->{from} ||= [ { $alias => $source->from } ];
751   $attrs->{seen_join} ||= {};
752   my %seen;
753   if (my $join = delete $attrs->{join}) {
754     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
755       if (ref $j eq 'HASH') {
756         $seen{$_} = 1 foreach keys %$j;
757       } else {
758         $seen{$j} = 1;
759       }
760     }
761     push(@{$attrs->{from}},
762       $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join})
763     );
764   }
765   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
766   $attrs->{order_by} = [ $attrs->{order_by} ] if
767       $attrs->{order_by} and !ref($attrs->{order_by});
768   $attrs->{order_by} ||= [];
769
770   if(my $seladds = delete($attrs->{'+select'})) {
771     my @seladds = (ref($seladds) eq 'ARRAY' ? @$seladds : ($seladds));
772     $attrs->{select} = [
773       @{ $attrs->{select} },
774       map { (m/\./ || ref($_)) ? $_ : "${alias}.$_" } $seladds
775     ];
776   }
777   if(my $asadds = delete($attrs->{'+as'})) {
778     my @asadds = (ref($asadds) eq 'ARRAY' ? @$asadds : ($asadds));
779     $attrs->{as} = [ @{ $attrs->{as} }, @asadds ];
780   }
781   my $collapse = $attrs->{collapse} || {};
782   if (my $prefetch = delete $attrs->{prefetch}) {
783     my @pre_order;
784     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
785       if ( ref $p eq 'HASH' ) {
786         foreach my $key (keys %$p) {
787           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
788             unless $seen{$key};
789         }
790       } else {
791         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
792           unless $seen{$p};
793       }
794
795       # we're about to resolve_join on the current class, so we need to bring
796       # the joins (which are from the original class) to the right level
797       # XXX the below alg is ridiculous
798       if ($attrs->{_live_join_stack}) {
799       STACK:
800         foreach (@{$attrs->{_live_join_stack}}) {
801           if (ref $p eq 'HASH') {
802             if (exists $p->{$_}) {
803               $p = $p->{$_};
804             } else {
805               $p = undef;
806               last STACK;
807             }
808           } elsif (ref $p eq 'ARRAY') {
809             foreach my $pe (@{$p}) {
810               if ($pe eq $_) {
811                 $p = undef;
812                 last STACK;
813               }
814               next unless(ref $pe eq 'HASH');
815               next unless(exists $pe->{$_});
816               $p = $pe->{$_};
817               next STACK;
818             }                                           
819             $p = undef;
820             last STACK;
821           } else {
822             $p = undef;
823             last STACK;
824           }
825         }
826       }
827                 
828       if ($p) {
829         my @prefetch = $self->result_source->resolve_prefetch(
830           $p, $attrs->{alias}, {}, \@pre_order, $collapse
831         );
832                 
833         push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
834         push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
835       }
836     }
837     push(@{$attrs->{order_by}}, @pre_order);
838   }
839   $attrs->{collapse} = $collapse;
840   $self->{_attrs} = $attrs;
841 }
842
843 sub _merge_attr {
844   my ($self, $a, $b, $is_prefetch) = @_;
845     
846   return $b unless $a;
847   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
848     foreach my $key (keys %{$b}) {
849       if (exists $a->{$key}) {
850         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key}, $is_prefetch);
851       } else {
852         $a->{$key} = $b->{$key};
853       }
854     }
855     return $a;
856   } else {
857     $a = [$a] unless (ref $a eq 'ARRAY');
858     $b = [$b] unless (ref $b eq 'ARRAY');
859     
860     my $hash = {};
861     my $array = [];      
862     foreach ($a, $b) {
863       foreach my $element (@{$_}) {
864         if (ref $element eq 'HASH') {
865           $hash = $self->_merge_attr($hash, $element, $is_prefetch);
866         } elsif (ref $element eq 'ARRAY') {
867           $array = [@{$array}, @{$element}];
868         } else {        
869           if (($b == $_) && $is_prefetch) {
870             $self->_merge_array($array, $element, $is_prefetch);
871           } else {
872             push(@{$array}, $element);
873           }
874         }
875       }
876     }
877
878     my $final_array = [];
879     foreach my $element (@{$array}) {
880       push(@{$final_array}, $element) unless (exists $hash->{$element});
881     }
882     $array = $final_array;
883
884     if ((keys %{$hash}) && (scalar(@{$array} > 0))) {
885       return [$hash, @{$array}];
886     } else {    
887       return (keys %{$hash}) ? $hash : $array;
888     }
889   }
890 }
891
892 sub _merge_array {
893   my ($self, $a, $b) = @_;
894   
895   $b = [$b] unless (ref $b eq 'ARRAY');
896   # add elements from @{$b} to @{$a} which aren't already in @{$a}
897   foreach my $b_element (@{$b}) {
898     push(@{$a}, $b_element) unless grep {$b_element eq $_} @{$a};
899   }
900 }
901
902 sub _construct_object {
903   my ($self, @row) = @_;
904   my @as = @{ $self->{_attrs}{as} };
905   
906   my $info = $self->_collapse_result(\@as, \@row);
907   my $new = $self->result_class->inflate_result($self->result_source, @$info);
908   $new = $self->{_attrs}{record_filter}->($new)
909     if exists $self->{_attrs}{record_filter};
910   return $new;
911 }
912
913 sub _collapse_result {
914   my ($self, $as, $row, $prefix) = @_;
915
916   my $live_join = $self->{attrs}->{_live_join} ||="";
917   my %const;
918
919   my @copy = @$row;
920   foreach my $this_as (@$as) {
921     my $val = shift @copy;
922     if (defined $prefix) {
923       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
924         my $remain = $1;
925         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
926         $const{$1||''}{$2} = $val;
927       }
928     } else {
929       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
930       $const{$1||''}{$2} = $val;
931     }
932   }
933
934   my $info = [ {}, {} ];
935   foreach my $key (keys %const) {
936     if (length $key && $key ne $live_join) {
937       my $target = $info;
938       my @parts = split(/\./, $key);
939       foreach my $p (@parts) {
940         $target = $target->[1]->{$p} ||= [];
941       }
942       $target->[0] = $const{$key};
943     } else {
944       $info->[0] = $const{$key};
945     }
946   }
947   my @collapse;
948
949   if (defined $prefix) {
950     @collapse = map {
951         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
952     } keys %{$self->{_attrs}->{collapse}}
953   } else {
954     @collapse = keys %{$self->{_attrs}->{collapse}};
955   };
956
957   if (@collapse) {
958     my ($c) = sort { length $a <=> length $b } @collapse;
959     my $target = $info;
960     foreach my $p (split(/\./, $c)) {
961       $target = $target->[1]->{$p} ||= [];
962     }
963     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
964     my @co_key = @{$self->{_attrs}->{collapse}{$c_prefix}};
965     my $tree = $self->_collapse_result($as, $row, $c_prefix);
966     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
967     my (@final, @raw);
968
969     while (
970       !(
971         grep {
972           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
973         } @co_key
974         )
975     ) {
976       push(@final, $tree);
977       last unless (@raw = $self->cursor->next);
978       $row = $self->{stashed_row} = \@raw;
979       $tree = $self->_collapse_result($as, $row, $c_prefix);
980     }
981     @$target = (@final ? @final : [ {}, {} ]); 
982       # single empty result to indicate an empty prefetched has_many
983   }
984
985   #print "final info: " . Dumper($info);
986   return $info;
987 }
988
989 =head2 result_source
990
991 =over 4
992
993 =item Arguments: $result_source?
994
995 =item Return Value: $result_source
996
997 =back
998
999 An accessor for the primary ResultSource object from which this ResultSet
1000 is derived.
1001
1002 =cut
1003
1004
1005 =head2 count
1006
1007 =over 4
1008
1009 =item Arguments: $cond, \%attrs??
1010
1011 =item Return Value: $count
1012
1013 =back
1014
1015 Performs an SQL C<COUNT> with the same query as the resultset was built
1016 with to find the number of elements. If passed arguments, does a search
1017 on the resultset and counts the results of that.
1018
1019 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
1020 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
1021 not support C<DISTINCT> with multiple columns. If you are using such a
1022 database, you should only use columns from the main table in your C<group_by>
1023 clause.
1024
1025 =cut
1026
1027 sub count {
1028   my $self = shift;
1029   return $self->search(@_)->count if @_ and defined $_[0];
1030   return scalar @{ $self->get_cache } if $self->get_cache;
1031   my $count = $self->_count;
1032   return 0 unless $count;
1033
1034   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
1035   $count = $self->{attrs}{rows} if
1036     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
1037   return $count;
1038 }
1039
1040 sub _count { # Separated out so pager can get the full count
1041   my $self = shift;
1042   my $select = { count => '*' };
1043   
1044   $self->_resolve;
1045   my $attrs = { %{ $self->{_attrs} } };
1046   if (my $group_by = delete $attrs->{group_by}) {
1047     delete $attrs->{having};
1048     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1049     # todo: try CONCAT for multi-column pk
1050     my @pk = $self->result_source->primary_columns;
1051     if (@pk == 1) {
1052       foreach my $column (@distinct) {
1053         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
1054           @distinct = ($column);
1055           last;
1056         }
1057       }
1058     }
1059
1060     $select = { count => { distinct => \@distinct } };
1061   }
1062
1063   $attrs->{select} = $select;
1064   $attrs->{as} = [qw/count/];
1065
1066   # offset, order by and page are not needed to count. record_filter is cdbi
1067   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1068         my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1069         $tmp_rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs});
1070              #XXX - hack to pass through parent of related resultsets
1071
1072   my ($count) = $tmp_rs->cursor->next;
1073   return $count;
1074 }
1075
1076 =head2 count_literal
1077
1078 =over 4
1079
1080 =item Arguments: $sql_fragment, @bind_values
1081
1082 =item Return Value: $count
1083
1084 =back
1085
1086 Counts the results in a literal query. Equivalent to calling L</search_literal>
1087 with the passed arguments, then L</count>.
1088
1089 =cut
1090
1091 sub count_literal { shift->search_literal(@_)->count; }
1092
1093 =head2 all
1094
1095 =over 4
1096
1097 =item Arguments: none
1098
1099 =item Return Value: @objects
1100
1101 =back
1102
1103 Returns all elements in the resultset. Called implicitly if the resultset
1104 is returned in list context.
1105
1106 =cut
1107
1108 sub all {
1109   my ($self) = @_;
1110   return @{ $self->get_cache } if $self->get_cache;
1111
1112   my @obj;
1113
1114   # TODO: don't call resolve here
1115   $self->_resolve;
1116   if (keys %{$self->{_attrs}->{collapse}}) {
1117 #  if ($self->{attrs}->{prefetch}) {
1118       # Using $self->cursor->all is really just an optimisation.
1119       # If we're collapsing has_many prefetches it probably makes
1120       # very little difference, and this is cleaner than hacking
1121       # _construct_object to survive the approach
1122     my @row = $self->cursor->next;
1123     while (@row) {
1124       push(@obj, $self->_construct_object(@row));
1125       @row = (exists $self->{stashed_row}
1126                ? @{delete $self->{stashed_row}}
1127                : $self->cursor->next);
1128     }
1129   } else {
1130     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1131   }
1132
1133   $self->set_cache(\@obj) if $self->{attrs}{cache};
1134   return @obj;
1135 }
1136
1137 =head2 reset
1138
1139 =over 4
1140
1141 =item Arguments: none
1142
1143 =item Return Value: $self
1144
1145 =back
1146
1147 Resets the resultset's cursor, so you can iterate through the elements again.
1148
1149 =cut
1150
1151 sub reset {
1152   my ($self) = @_;
1153   delete $self->{_attrs} if (exists $self->{_attrs});
1154
1155   $self->{all_cache_position} = 0;
1156   $self->cursor->reset;
1157   return $self;
1158 }
1159
1160 =head2 first
1161
1162 =over 4
1163
1164 =item Arguments: none
1165
1166 =item Return Value: $object?
1167
1168 =back
1169
1170 Resets the resultset and returns an object for the first result (if the
1171 resultset returns anything).
1172
1173 =cut
1174
1175 sub first {
1176   return $_[0]->reset->next;
1177 }
1178
1179 # _cond_for_update_delete
1180 #
1181 # update/delete require the condition to be modified to handle
1182 # the differing SQL syntax available.  This transforms the $self->{cond}
1183 # appropriately, returning the new condition.
1184
1185 sub _cond_for_update_delete {
1186   my ($self) = @_;
1187   my $cond = {};
1188
1189   if (!ref($self->{cond})) {
1190     # No-op. No condition, we're updating/deleting everything
1191   }
1192   elsif (ref $self->{cond} eq 'ARRAY') {
1193     $cond = [
1194       map {
1195         my %hash;
1196         foreach my $key (keys %{$_}) {
1197           $key =~ /([^.]+)$/;
1198           $hash{$1} = $_->{$key};
1199         }
1200         \%hash;
1201       } @{$self->{cond}}
1202     ];
1203   }
1204   elsif (ref $self->{cond} eq 'HASH') {
1205     if ((keys %{$self->{cond}})[0] eq '-and') {
1206       $cond->{-and} = [];
1207
1208       my @cond = @{$self->{cond}{-and}};
1209       for (my $i = 0; $i <= @cond - 1; $i++) {
1210         my $entry = $cond[$i];
1211
1212         my %hash;
1213         if (ref $entry eq 'HASH') {
1214           foreach my $key (keys %{$entry}) {
1215             $key =~ /([^.]+)$/;
1216             $hash{$1} = $entry->{$key};
1217           }
1218         }
1219         else {
1220           $entry =~ /([^.]+)$/;
1221           $hash{$1} = $cond[++$i];
1222         }
1223
1224         push @{$cond->{-and}}, \%hash;
1225       }
1226     }
1227     else {
1228       foreach my $key (keys %{$self->{cond}}) {
1229         $key =~ /([^.]+)$/;
1230         $cond->{$1} = $self->{cond}{$key};
1231       }
1232     }
1233   }
1234   else {
1235     $self->throw_exception(
1236       "Can't update/delete on resultset with condition unless hash or array"
1237     );
1238   }
1239
1240   return $cond;
1241 }
1242
1243
1244 =head2 update
1245
1246 =over 4
1247
1248 =item Arguments: \%values
1249
1250 =item Return Value: $storage_rv
1251
1252 =back
1253
1254 Sets the specified columns in the resultset to the supplied values in a
1255 single query. Return value will be true if the update succeeded or false
1256 if no records were updated; exact type of success value is storage-dependent.
1257
1258 =cut
1259
1260 sub update {
1261   my ($self, $values) = @_;
1262   $self->throw_exception("Values for update must be a hash")
1263     unless ref $values eq 'HASH';
1264
1265   my $cond = $self->_cond_for_update_delete;
1266
1267   return $self->result_source->storage->update(
1268     $self->result_source->from, $values, $cond
1269   );
1270 }
1271
1272 =head2 update_all
1273
1274 =over 4
1275
1276 =item Arguments: \%values
1277
1278 =item Return Value: 1
1279
1280 =back
1281
1282 Fetches all objects and updates them one at a time. Note that C<update_all>
1283 will run DBIC cascade triggers, while L</update> will not.
1284
1285 =cut
1286
1287 sub update_all {
1288   my ($self, $values) = @_;
1289   $self->throw_exception("Values for update must be a hash")
1290     unless ref $values eq 'HASH';
1291   foreach my $obj ($self->all) {
1292     $obj->set_columns($values)->update;
1293   }
1294   return 1;
1295 }
1296
1297 =head2 delete
1298
1299 =over 4
1300
1301 =item Arguments: none
1302
1303 =item Return Value: 1
1304
1305 =back
1306
1307 Deletes the contents of the resultset from its result source. Note that this
1308 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1309 to run.
1310
1311 =cut
1312
1313 sub delete {
1314   my ($self) = @_;
1315   my $del = {};
1316
1317   my $cond = $self->_cond_for_update_delete;
1318
1319   $self->result_source->storage->delete($self->result_source->from, $cond);
1320   return 1;
1321 }
1322
1323 =head2 delete_all
1324
1325 =over 4
1326
1327 =item Arguments: none
1328
1329 =item Return Value: 1
1330
1331 =back
1332
1333 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1334 will run DBIC cascade triggers, while L</delete> will not.
1335
1336 =cut
1337
1338 sub delete_all {
1339   my ($self) = @_;
1340   $_->delete for $self->all;
1341   return 1;
1342 }
1343
1344 =head2 pager
1345
1346 =over 4
1347
1348 =item Arguments: none
1349
1350 =item Return Value: $pager
1351
1352 =back
1353
1354 Return Value a L<Data::Page> object for the current resultset. Only makes
1355 sense for queries with a C<page> attribute.
1356
1357 =cut
1358
1359 sub pager {
1360   my ($self) = @_;
1361   my $attrs = $self->{attrs};
1362   $self->throw_exception("Can't create pager for non-paged rs")
1363     unless $self->{page};
1364   $attrs->{rows} ||= 10;
1365   return $self->{pager} ||= Data::Page->new(
1366     $self->_count, $attrs->{rows}, $self->{page});
1367 }
1368
1369 =head2 page
1370
1371 =over 4
1372
1373 =item Arguments: $page_number
1374
1375 =item Return Value: $rs
1376
1377 =back
1378
1379 Returns a resultset for the $page_number page of the resultset on which page
1380 is called, where each page contains a number of rows equal to the 'rows'
1381 attribute set on the resultset (10 by default).
1382
1383 =cut
1384
1385 sub page {
1386   my ($self, $page) = @_;
1387   my $attrs = { %{$self->{attrs}} };
1388   $attrs->{page} = $page;
1389   return (ref $self)->new($self->result_source, $attrs);
1390 }
1391
1392 =head2 new_result
1393
1394 =over 4
1395
1396 =item Arguments: \%vals
1397
1398 =item Return Value: $object
1399
1400 =back
1401
1402 Creates an object in the resultset's result class and returns it.
1403
1404 =cut
1405
1406 sub new_result {
1407   my ($self, $values) = @_;
1408   $self->throw_exception( "new_result needs a hash" )
1409     unless (ref $values eq 'HASH');
1410   $self->throw_exception(
1411     "Can't abstract implicit construct, condition not a hash"
1412   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1413   my %new = %$values;
1414   my $alias = $self->{attrs}{alias};
1415   foreach my $key (keys %{$self->{cond}||{}}) {
1416     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1417   }
1418   my $obj = $self->result_class->new(\%new);
1419   $obj->result_source($self->result_source) if $obj->can('result_source');
1420   return $obj;
1421 }
1422
1423 =head2 find_or_new
1424
1425 =over 4
1426
1427 =item Arguments: \%vals, \%attrs?
1428
1429 =item Return Value: $object
1430
1431 =back
1432
1433 Find an existing record from this resultset. If none exists, instantiate a new
1434 result object and return it. The object will not be saved into your storage
1435 until you call L<DBIx::Class::Row/insert> on it.
1436
1437 If you want objects to be saved immediately, use L</find_or_create> instead.
1438
1439 =cut
1440
1441 sub find_or_new {
1442   my $self     = shift;
1443   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1444   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1445   my $exists   = $self->find($hash, $attrs);
1446   return defined $exists ? $exists : $self->new_result($hash);
1447 }
1448
1449 =head2 create
1450
1451 =over 4
1452
1453 =item Arguments: \%vals
1454
1455 =item Return Value: $object
1456
1457 =back
1458
1459 Inserts a record into the resultset and returns the object representing it.
1460
1461 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1462
1463 =cut
1464
1465 sub create {
1466   my ($self, $attrs) = @_;
1467   $self->throw_exception( "create needs a hashref" )
1468     unless ref $attrs eq 'HASH';
1469   return $self->new_result($attrs)->insert;
1470 }
1471
1472 =head2 find_or_create
1473
1474 =over 4
1475
1476 =item Arguments: \%vals, \%attrs?
1477
1478 =item Return Value: $object
1479
1480 =back
1481
1482   $class->find_or_create({ key => $val, ... });
1483
1484 Tries to find a record based on its primary key or unique constraint; if none
1485 is found, creates one and returns that instead.
1486
1487   my $cd = $schema->resultset('CD')->find_or_create({
1488     cdid   => 5,
1489     artist => 'Massive Attack',
1490     title  => 'Mezzanine',
1491     year   => 2005,
1492   });
1493
1494 Also takes an optional C<key> attribute, to search by a specific key or unique
1495 constraint. For example:
1496
1497   my $cd = $schema->resultset('CD')->find_or_create(
1498     {
1499       artist => 'Massive Attack',
1500       title  => 'Mezzanine',
1501     },
1502     { key => 'cd_artist_title' }
1503   );
1504
1505 See also L</find> and L</update_or_create>. For information on how to declare
1506 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1507
1508 =cut
1509
1510 sub find_or_create {
1511   my $self     = shift;
1512   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1513   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1514   my $exists   = $self->find($hash, $attrs);
1515   return defined $exists ? $exists : $self->create($hash);
1516 }
1517
1518 =head2 update_or_create
1519
1520 =over 4
1521
1522 =item Arguments: \%col_values, { key => $unique_constraint }?
1523
1524 =item Return Value: $object
1525
1526 =back
1527
1528   $class->update_or_create({ col => $val, ... });
1529
1530 First, searches for an existing row matching one of the unique constraints
1531 (including the primary key) on the source of this resultset. If a row is
1532 found, updates it with the other given column values. Otherwise, creates a new
1533 row.
1534
1535 Takes an optional C<key> attribute to search on a specific unique constraint.
1536 For example:
1537
1538   # In your application
1539   my $cd = $schema->resultset('CD')->update_or_create(
1540     {
1541       artist => 'Massive Attack',
1542       title  => 'Mezzanine',
1543       year   => 1998,
1544     },
1545     { key => 'cd_artist_title' }
1546   );
1547
1548 If no C<key> is specified, it searches on all unique constraints defined on the
1549 source, including the primary key.
1550
1551 If the C<key> is specified as C<primary>, it searches only on the primary key.
1552
1553 See also L</find> and L</find_or_create>. For information on how to declare
1554 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1555
1556 =cut
1557
1558 sub update_or_create {
1559   my $self = shift;
1560   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1561   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1562
1563   my $row = $self->find($cond);
1564   if (defined $row) {
1565     $row->update($cond);
1566     return $row;
1567   }
1568
1569   return $self->create($cond);
1570 }
1571
1572 =head2 get_cache
1573
1574 =over 4
1575
1576 =item Arguments: none
1577
1578 =item Return Value: \@cache_objects?
1579
1580 =back
1581
1582 Gets the contents of the cache for the resultset, if the cache is set.
1583
1584 =cut
1585
1586 sub get_cache {
1587   shift->{all_cache};
1588 }
1589
1590 =head2 set_cache
1591
1592 =over 4
1593
1594 =item Arguments: \@cache_objects
1595
1596 =item Return Value: \@cache_objects
1597
1598 =back
1599
1600 Sets the contents of the cache for the resultset. Expects an arrayref
1601 of objects of the same class as those produced by the resultset. Note that
1602 if the cache is set the resultset will return the cached objects rather
1603 than re-querying the database even if the cache attr is not set.
1604
1605 =cut
1606
1607 sub set_cache {
1608   my ( $self, $data ) = @_;
1609   $self->throw_exception("set_cache requires an arrayref")
1610       if defined($data) && (ref $data ne 'ARRAY');
1611   $self->{all_cache} = $data;
1612 }
1613
1614 =head2 clear_cache
1615
1616 =over 4
1617
1618 =item Arguments: none
1619
1620 =item Return Value: []
1621
1622 =back
1623
1624 Clears the cache for the resultset.
1625
1626 =cut
1627
1628 sub clear_cache {
1629   shift->set_cache(undef);
1630 }
1631
1632 =head2 related_resultset
1633
1634 =over 4
1635
1636 =item Arguments: $relationship_name
1637
1638 =item Return Value: $resultset
1639
1640 =back
1641
1642 Returns a related resultset for the supplied relationship name.
1643
1644   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1645
1646 =cut
1647
1648 sub related_resultset {
1649   my ( $self, $rel ) = @_;
1650   
1651   $self->{related_resultsets} ||= {};
1652   return $self->{related_resultsets}{$rel} ||= do {
1653     #warn "fetching related resultset for rel '$rel' " . $self->result_source->{name};
1654     my $rel_obj = $self->result_source->relationship_info($rel);
1655                 #print Dumper($self->result_source->_relationships);
1656     $self->throw_exception(
1657       "search_related: result source '" . $self->result_source->name .
1658         "' has no such relationship ${rel}")
1659       unless $rel_obj; #die Dumper $self->{attrs};
1660
1661     my @live_join_stack = (
1662       exists $self->{attrs}->{_live_join_stack})
1663       ? @{$self->{attrs}->{_live_join_stack}}
1664       : ();             
1665
1666     push(@live_join_stack, $rel);
1667                 
1668     my $rs = $self->result_source->schema->resultset($rel_obj->{class})->search(
1669       undef, {
1670         select => undef,
1671         as => undef,
1672         _live_join => $rel, #the most recent
1673         _live_join_stack => \@live_join_stack, #the trail of rels
1674         _parent_attrs => $self->{attrs}}
1675     );    
1676
1677     # keep reference of the original resultset
1678     $rs->{_parent_rs} = ($self->{_parent_rs})
1679       ? $self->{_parent_rs}
1680       : $self->result_source;
1681
1682     return $rs;
1683   };
1684 }
1685
1686 =head2 throw_exception
1687
1688 See L<DBIx::Class::Schema/throw_exception> for details.
1689
1690 =cut
1691
1692 sub throw_exception {
1693   my $self=shift;
1694   $self->result_source->schema->throw_exception(@_);
1695 }
1696
1697 # XXX: FIXME: Attributes docs need clearing up
1698
1699 =head1 ATTRIBUTES
1700
1701 The resultset takes various attributes that modify its behavior. Here's an
1702 overview of them:
1703
1704 =head2 order_by
1705
1706 =over 4
1707
1708 =item Value: ($order_by | \@order_by)
1709
1710 =back
1711
1712 Which column(s) to order the results by. This is currently passed
1713 through directly to SQL, so you can give e.g. C<year DESC> for a
1714 descending order on the column `year'.
1715
1716 Please note that if you have quoting enabled (see 
1717 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1718 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1719 so you will need to manually quote things as appropriate.)
1720
1721 =head2 columns
1722
1723 =over 4
1724
1725 =item Value: \@columns
1726
1727 =back
1728
1729 Shortcut to request a particular set of columns to be retrieved.  Adds
1730 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1731 from that, then auto-populates C<as> from C<select> as normal. (You may also
1732 use the C<cols> attribute, as in earlier versions of DBIC.)
1733
1734 =head2 include_columns
1735
1736 =over 4
1737
1738 =item Value: \@columns
1739
1740 =back
1741
1742 Shortcut to include additional columns in the returned results - for example
1743
1744   $schema->resultset('CD')->search(undef, {
1745     include_columns => ['artist.name'],
1746     join => ['artist']
1747   });
1748
1749 would return all CDs and include a 'name' column to the information
1750 passed to object inflation
1751
1752 =head2 select
1753
1754 =over 4
1755
1756 =item Value: \@select_columns
1757
1758 =back
1759
1760 Indicates which columns should be selected from the storage. You can use
1761 column names, or in the case of RDBMS back ends, function or stored procedure
1762 names:
1763
1764   $rs = $schema->resultset('Employee')->search(undef, {
1765     select => [
1766       'name',
1767       { count => 'employeeid' },
1768       { sum => 'salary' }
1769     ]
1770   });
1771
1772 When you use function/stored procedure names and do not supply an C<as>
1773 attribute, the column names returned are storage-dependent. E.g. MySQL would
1774 return a column named C<count(employeeid)> in the above example.
1775
1776 =head2 +select
1777
1778 =over 4
1779
1780 Indicates additional columns to be selected from storage.  Works the same as
1781 L<select> but adds columns to the selection.
1782
1783 =back
1784
1785 =head2 +as
1786
1787 =over 4
1788
1789 Indicates additional column names for those added via L<+select>.
1790
1791 =back
1792
1793 =head2 as
1794
1795 =over 4
1796
1797 =item Value: \@inflation_names
1798
1799 =back
1800
1801 Indicates column names for object inflation. This is used in conjunction with
1802 C<select>, usually when C<select> contains one or more function or stored
1803 procedure names:
1804
1805   $rs = $schema->resultset('Employee')->search(undef, {
1806     select => [
1807       'name',
1808       { count => 'employeeid' }
1809     ],
1810     as => ['name', 'employee_count'],
1811   });
1812
1813   my $employee = $rs->first(); # get the first Employee
1814
1815 If the object against which the search is performed already has an accessor
1816 matching a column name specified in C<as>, the value can be retrieved using
1817 the accessor as normal:
1818
1819   my $name = $employee->name();
1820
1821 If on the other hand an accessor does not exist in the object, you need to
1822 use C<get_column> instead:
1823
1824   my $employee_count = $employee->get_column('employee_count');
1825
1826 You can create your own accessors if required - see
1827 L<DBIx::Class::Manual::Cookbook> for details.
1828
1829 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1830 produced, it is used for internal access only. Thus attempting to use the accessor
1831 in an C<order_by> clause or similar will fail misrably.
1832
1833 =head2 join
1834
1835 =over 4
1836
1837 =item Value: ($rel_name | \@rel_names | \%rel_names)
1838
1839 =back
1840
1841 Contains a list of relationships that should be joined for this query.  For
1842 example:
1843
1844   # Get CDs by Nine Inch Nails
1845   my $rs = $schema->resultset('CD')->search(
1846     { 'artist.name' => 'Nine Inch Nails' },
1847     { join => 'artist' }
1848   );
1849
1850 Can also contain a hash reference to refer to the other relation's relations.
1851 For example:
1852
1853   package MyApp::Schema::Track;
1854   use base qw/DBIx::Class/;
1855   __PACKAGE__->table('track');
1856   __PACKAGE__->add_columns(qw/trackid cd position title/);
1857   __PACKAGE__->set_primary_key('trackid');
1858   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1859   1;
1860
1861   # In your application
1862   my $rs = $schema->resultset('Artist')->search(
1863     { 'track.title' => 'Teardrop' },
1864     {
1865       join     => { cd => 'track' },
1866       order_by => 'artist.name',
1867     }
1868   );
1869
1870 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1871 similarly for a third time). For e.g.
1872
1873   my $rs = $schema->resultset('Artist')->search({
1874     'cds.title'   => 'Down to Earth',
1875     'cds_2.title' => 'Popular',
1876   }, {
1877     join => [ qw/cds cds/ ],
1878   });
1879
1880 will return a set of all artists that have both a cd with title 'Down
1881 to Earth' and a cd with title 'Popular'.
1882
1883 If you want to fetch related objects from other tables as well, see C<prefetch>
1884 below.
1885
1886 =head2 prefetch
1887
1888 =over 4
1889
1890 =item Value: ($rel_name | \@rel_names | \%rel_names)
1891
1892 =back
1893
1894 Contains one or more relationships that should be fetched along with the main
1895 query (when they are accessed afterwards they will have already been
1896 "prefetched").  This is useful for when you know you will need the related
1897 objects, because it saves at least one query:
1898
1899   my $rs = $schema->resultset('Tag')->search(
1900     undef,
1901     {
1902       prefetch => {
1903         cd => 'artist'
1904       }
1905     }
1906   );
1907
1908 The initial search results in SQL like the following:
1909
1910   SELECT tag.*, cd.*, artist.* FROM tag
1911   JOIN cd ON tag.cd = cd.cdid
1912   JOIN artist ON cd.artist = artist.artistid
1913
1914 L<DBIx::Class> has no need to go back to the database when we access the
1915 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1916 case.
1917
1918 Simple prefetches will be joined automatically, so there is no need
1919 for a C<join> attribute in the above search. If you're prefetching to
1920 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1921 specify the join as well.
1922
1923 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1924 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1925 with an accessor type of 'single' or 'filter').
1926
1927 =head2 page
1928
1929 =over 4
1930
1931 =item Value: $page
1932
1933 =back
1934
1935 Makes the resultset paged and specifies the page to retrieve. Effectively
1936 identical to creating a non-pages resultset and then calling ->page($page)
1937 on it. 
1938
1939 If L<rows> attribute is not specified it defualts to 10 rows per page.
1940
1941 =head2 rows
1942
1943 =over 4
1944
1945 =item Value: $rows
1946
1947 =back
1948
1949 Specifes the maximum number of rows for direct retrieval or the number of
1950 rows per page if the page attribute or method is used.
1951
1952 =head2 offset
1953
1954 =over 4
1955
1956 =item Value: $offset
1957
1958 =back
1959
1960 Specifies the (zero-based) row number for the  first row to be returned, or the
1961 of the first row of the first page if paging is used.
1962
1963 =head2 group_by
1964
1965 =over 4
1966
1967 =item Value: \@columns
1968
1969 =back
1970
1971 A arrayref of columns to group by. Can include columns of joined tables.
1972
1973   group_by => [qw/ column1 column2 ... /]
1974
1975 =head2 having
1976
1977 =over 4
1978
1979 =item Value: $condition
1980
1981 =back
1982
1983 HAVING is a select statement attribute that is applied between GROUP BY and
1984 ORDER BY. It is applied to the after the grouping calculations have been
1985 done. 
1986
1987   having => { 'count(employee)' => { '>=', 100 } }
1988
1989 =head2 distinct
1990
1991 =over 4
1992
1993 =item Value: (0 | 1)
1994
1995 =back
1996
1997 Set to 1 to group by all columns.
1998
1999 =head2 cache
2000
2001 Set to 1 to cache search results. This prevents extra SQL queries if you
2002 revisit rows in your ResultSet:
2003
2004   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2005   
2006   while( my $artist = $resultset->next ) {
2007     ... do stuff ...
2008   }
2009
2010   $rs->first; # without cache, this would issue a query
2011
2012 By default, searches are not cached.
2013
2014 For more examples of using these attributes, see
2015 L<DBIx::Class::Manual::Cookbook>.
2016
2017 =head2 from
2018
2019 =over 4
2020
2021 =item Value: \@from_clause
2022
2023 =back
2024
2025 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2026 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2027 clauses.
2028
2029 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2030
2031 C<join> will usually do what you need and it is strongly recommended that you
2032 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2033 And we really do mean "cannot", not just tried and failed. Attempting to use
2034 this because you're having problems with C<join> is like trying to use x86
2035 ASM because you've got a syntax error in your C. Trust us on this.
2036
2037 Now, if you're still really, really sure you need to use this (and if you're
2038 not 100% sure, ask the mailing list first), here's an explanation of how this
2039 works.
2040
2041 The syntax is as follows -
2042
2043   [
2044     { <alias1> => <table1> },
2045     [
2046       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2047       [], # nested JOIN (optional)
2048       { <table1.column1> => <table2.column2>, ... (more conditions) },
2049     ],
2050     # More of the above [ ] may follow for additional joins
2051   ]
2052
2053   <table1> <alias1>
2054   JOIN
2055     <table2> <alias2>
2056     [JOIN ...]
2057   ON <table1.column1> = <table2.column2>
2058   <more joins may follow>
2059
2060 An easy way to follow the examples below is to remember the following:
2061
2062     Anything inside "[]" is a JOIN
2063     Anything inside "{}" is a condition for the enclosing JOIN
2064
2065 The following examples utilize a "person" table in a family tree application.
2066 In order to express parent->child relationships, this table is self-joined:
2067
2068     # Person->belongs_to('father' => 'Person');
2069     # Person->belongs_to('mother' => 'Person');
2070
2071 C<from> can be used to nest joins. Here we return all children with a father,
2072 then search against all mothers of those children:
2073
2074   $rs = $schema->resultset('Person')->search(
2075       undef,
2076       {
2077           alias => 'mother', # alias columns in accordance with "from"
2078           from => [
2079               { mother => 'person' },
2080               [
2081                   [
2082                       { child => 'person' },
2083                       [
2084                           { father => 'person' },
2085                           { 'father.person_id' => 'child.father_id' }
2086                       ]
2087                   ],
2088                   { 'mother.person_id' => 'child.mother_id' }
2089               ],
2090           ]
2091       },
2092   );
2093
2094   # Equivalent SQL:
2095   # SELECT mother.* FROM person mother
2096   # JOIN (
2097   #   person child
2098   #   JOIN person father
2099   #   ON ( father.person_id = child.father_id )
2100   # )
2101   # ON ( mother.person_id = child.mother_id )
2102
2103 The type of any join can be controlled manually. To search against only people
2104 with a father in the person table, we could explicitly use C<INNER JOIN>:
2105
2106     $rs = $schema->resultset('Person')->search(
2107         undef,
2108         {
2109             alias => 'child', # alias columns in accordance with "from"
2110             from => [
2111                 { child => 'person' },
2112                 [
2113                     { father => 'person', -join_type => 'inner' },
2114                     { 'father.id' => 'child.father_id' }
2115                 ],
2116             ]
2117         },
2118     );
2119
2120     # Equivalent SQL:
2121     # SELECT child.* FROM person child
2122     # INNER JOIN person father ON child.father_id = father.id
2123
2124 =cut
2125
2126 1;