69b0358a4e5b4595ceac59fc69bd9d95ea1189b5
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Scalar::Util qw/weaken/;
13 use DBIx::Class::ResultSetColumn;
14 use base qw/DBIx::Class/;
15 __PACKAGE__->load_components(qw/AccessorGroup/);
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   weaken $source;
89
90   if ($attrs->{page}) {
91     $attrs->{rows} ||= 10;
92     $attrs->{offset} ||= 0;
93     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
94   }
95
96   $attrs->{alias} ||= 'me';
97   $attrs->{_orig_alias} ||= $attrs->{alias};
98
99   bless {
100     result_source => $source,
101     result_class => $attrs->{result_class} || $source->result_class,
102     cond => $attrs->{where},
103 #    from => $attrs->{from},
104 #    collapse => $collapse,
105     count => undef,
106     pager => undef,
107     attrs => $attrs
108   }, $class;
109 }
110
111 =head2 search
112
113 =over 4
114
115 =item Arguments: $cond, \%attrs?
116
117 =item Return Value: $resultset (scalar context), @row_objs (list context)
118
119 =back
120
121   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
122   my $new_rs = $cd_rs->search({ year => 2005 });
123
124   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
125                  # year = 2005 OR year = 2004
126
127 If you need to pass in additional attributes but no additional condition,
128 call it as C<search(undef, \%attrs)>.
129
130   # "SELECT name, artistid FROM $artist_table"
131   my @all_artists = $schema->resultset('Artist')->search(undef, {
132     columns => [qw/name artistid/],
133   });
134
135 =cut
136
137 sub search {
138   my $self = shift;
139   my $rs = $self->search_rs( @_ );
140   return (wantarray ? $rs->all : $rs);
141 }
142
143 =head2 search_rs
144
145 =over 4
146
147 =item Arguments: $cond, \%attrs?
148
149 =item Return Value: $resultset
150
151 =back
152
153 This method does the same exact thing as search() except it will
154 always return a resultset, even in list context.
155
156 =cut
157
158 sub search_rs {
159   my $self = shift;
160
161   my $attrs = {};
162   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
163   my $our_attrs = exists $attrs->{_parent_attrs}
164     ? { %{delete $attrs->{_parent_attrs}} }
165     : { %{$self->{attrs}} };
166   my $having = delete $our_attrs->{having};
167
168   # XXX should only maintain _live_join_stack and generate _live_join_h from that
169   if ($attrs->{_live_join_stack}) {
170     foreach my $join (reverse @{$attrs->{_live_join_stack}}) {
171       $attrs->{_live_join_h} = defined $attrs->{_live_join_h}
172         ? { $join => $attrs->{_live_join_h} }
173         : $join;
174     }
175   }
176
177   # merge new attrs into inherited
178   foreach my $key (qw/join prefetch/) {
179     next unless exists $attrs->{$key};
180     if (my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
181       foreach my $join (reverse @{$live_join}) {
182         $attrs->{$key} = { $join => $attrs->{$key} };
183       }
184     }
185
186     $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, delete $attrs->{$key});
187   }
188
189   $our_attrs->{join} = $self->_merge_attr(
190     $our_attrs->{join}, $attrs->{_live_join_h}
191   ) if ($attrs->{_live_join_h});
192
193   if (defined $our_attrs->{prefetch}) {
194     $our_attrs->{join} = $self->_merge_attr(
195       $our_attrs->{join}, $our_attrs->{prefetch}
196     );
197   }
198
199   my $new_attrs = { %{$our_attrs}, %{$attrs} };
200   my $where = (@_
201     ? (
202         (@_ == 1 || ref $_[0] eq "HASH")
203           ? shift
204           : (
205               (@_ % 2)
206                 ? $self->throw_exception("Odd number of arguments to search")
207                 : {@_}
208              )
209       )
210     : undef
211   );
212
213   if (defined $where) {
214     $new_attrs->{where} = (
215       defined $new_attrs->{where}
216         ? { '-and' => [
217               map {
218                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
219               } $where, $new_attrs->{where}
220             ]
221           }
222         : $where);
223   }
224
225   if (defined $having) {
226     $new_attrs->{having} = (
227       defined $new_attrs->{having}
228         ? { '-and' => [
229               map {
230                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
231               } $having, $new_attrs->{having}
232             ]
233           }
234         : $having);
235   }
236
237   my $rs = (ref $self)->new($self->result_source, $new_attrs);
238   $rs->{_parent_source} = $self->{_parent_source} if $self->{_parent_source};
239
240   unless (@_) {                 # no search, effectively just a clone
241     my $rows = $self->get_cache;
242     if ($rows) {
243       $rs->set_cache($rows);
244     }
245   }
246   return $rs;
247 }
248
249 =head2 search_literal
250
251 =over 4
252
253 =item Arguments: $sql_fragment, @bind_values
254
255 =item Return Value: $resultset (scalar context), @row_objs (list context)
256
257 =back
258
259   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
260   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
261
262 Pass a literal chunk of SQL to be added to the conditional part of the
263 resultset query.
264
265 =cut
266
267 sub search_literal {
268   my ($self, $cond, @vals) = @_;
269   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
270   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
271   return $self->search(\$cond, $attrs);
272 }
273
274 =head2 find
275
276 =over 4
277
278 =item Arguments: @values | \%cols, \%attrs?
279
280 =item Return Value: $row_object
281
282 =back
283
284 Finds a row based on its primary key or unique constraint. For example, to find
285 a row by its primary key:
286
287   my $cd = $schema->resultset('CD')->find(5);
288
289 You can also find a row by a specific unique constraint using the C<key>
290 attribute. For example:
291
292   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
293     key => 'cd_artist_title'
294   });
295
296 Additionally, you can specify the columns explicitly by name:
297
298   my $cd = $schema->resultset('CD')->find(
299     {
300       artist => 'Massive Attack',
301       title  => 'Mezzanine',
302     },
303     { key => 'cd_artist_title' }
304   );
305
306 If the C<key> is specified as C<primary>, it searches only on the primary key.
307
308 If no C<key> is specified, it searches on all unique constraints defined on the
309 source, including the primary key.
310
311 See also L</find_or_create> and L</update_or_create>. For information on how to
312 declare unique constraints, see
313 L<DBIx::Class::ResultSource/add_unique_constraint>.
314
315 =cut
316
317 sub find {
318   my $self = shift;
319   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
320
321   # Default to the primary key, but allow a specific key
322   my @cols = exists $attrs->{key}
323     ? $self->result_source->unique_constraint_columns($attrs->{key})
324     : $self->result_source->primary_columns;
325   $self->throw_exception(
326     "Can't find unless a primary key or unique constraint is defined"
327   ) unless @cols;
328
329   # Parse out a hashref from input
330   my $input_query;
331   if (ref $_[0] eq 'HASH') {
332     $input_query = { %{$_[0]} };
333   }
334   elsif (@_ == @cols) {
335     $input_query = {};
336     @{$input_query}{@cols} = @_;
337   }
338   else {
339     # Compatibility: Allow e.g. find(id => $value)
340     carp "Find by key => value deprecated; please use a hashref instead";
341     $input_query = {@_};
342   }
343
344   my @unique_queries = $self->_unique_queries($input_query, $attrs);
345
346   # Handle cases where the ResultSet defines the query, or where the user is
347   # abusing find
348   my $query = @unique_queries ? \@unique_queries : $input_query;
349
350   # Run the query
351   if (keys %$attrs) {
352     my $rs = $self->search($query, $attrs);
353     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
354   }
355   else {
356     return keys %{$self->_resolved_attrs->{collapse}}
357       ? $self->search($query)->next
358       : $self->single($query);
359   }
360 }
361
362 # _unique_queries
363 #
364 # Build a list of queries which satisfy unique constraints.
365
366 sub _unique_queries {
367   my ($self, $query, $attrs) = @_;
368
369   my @constraint_names = exists $attrs->{key}
370     ? ($attrs->{key})
371     : $self->result_source->unique_constraint_names;
372
373   my @unique_queries;
374   foreach my $name (@constraint_names) {
375     my @unique_cols = $self->result_source->unique_constraint_columns($name);
376     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
377
378     next unless scalar keys %$unique_query;
379
380     # Add the ResultSet's alias
381     my $alias = $self->{attrs}{alias};
382     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
383       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
384     }
385
386     push @unique_queries, $unique_query;
387   }
388
389   return @unique_queries;
390 }
391
392 # _build_unique_query
393 #
394 # Constrain the specified query hash based on the specified column names.
395
396 sub _build_unique_query {
397   my ($self, $query, $unique_cols) = @_;
398
399   return {
400     map  { $_ => $query->{$_} }
401     grep { exists $query->{$_} }
402       @$unique_cols
403   };
404 }
405
406 =head2 search_related
407
408 =over 4
409
410 =item Arguments: $rel, $cond, \%attrs?
411
412 =item Return Value: $new_resultset
413
414 =back
415
416   $new_rs = $cd_rs->search_related('artist', {
417     name => 'Emo-R-Us',
418   });
419
420 Searches the specified relationship, optionally specifying a condition and
421 attributes for matching records. See L</ATTRIBUTES> for more information.
422
423 =cut
424
425 sub search_related {
426   return shift->related_resultset(shift)->search(@_);
427 }
428
429 =head2 cursor
430
431 =over 4
432
433 =item Arguments: none
434
435 =item Return Value: $cursor
436
437 =back
438
439 Returns a storage-driven cursor to the given resultset. See
440 L<DBIx::Class::Cursor> for more information.
441
442 =cut
443
444 sub cursor {
445   my ($self) = @_;
446
447   my $attrs = { %{$self->_resolved_attrs} };
448   return $self->{cursor}
449     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
450           $attrs->{where},$attrs);
451 }
452
453 =head2 single
454
455 =over 4
456
457 =item Arguments: $cond?
458
459 =item Return Value: $row_object?
460
461 =back
462
463   my $cd = $schema->resultset('CD')->single({ year => 2001 });
464
465 Inflates the first result without creating a cursor if the resultset has
466 any records in it; if not returns nothing. Used by L</find> as an optimisation.
467
468 Can optionally take an additional condition *only* - this is a fast-code-path
469 method; if you need to add extra joins or similar call ->search and then
470 ->single without a condition on the $rs returned from that.
471
472 =cut
473
474 sub single {
475   my ($self, $where) = @_;
476   my $attrs = { %{$self->_resolved_attrs} };
477   if ($where) {
478     if (defined $attrs->{where}) {
479       $attrs->{where} = {
480         '-and' =>
481             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
482                $where, delete $attrs->{where} ]
483       };
484     } else {
485       $attrs->{where} = $where;
486     }
487   }
488
489   unless ($self->_is_unique_query($attrs->{where})) {
490     carp "Query not guaranteed to return a single row"
491       . "; please declare your unique constraints or use search instead";
492   }
493
494   my @data = $self->result_source->storage->select_single(
495     $attrs->{from}, $attrs->{select},
496     $attrs->{where}, $attrs
497   );
498
499   return (@data ? $self->_construct_object(@data) : ());
500 }
501
502 # _is_unique_query
503 #
504 # Try to determine if the specified query is guaranteed to be unique, based on
505 # the declared unique constraints.
506
507 sub _is_unique_query {
508   my ($self, $query) = @_;
509
510   my $collapsed = $self->_collapse_query($query);
511   my $alias = $self->{attrs}{alias};
512
513   foreach my $name ($self->result_source->unique_constraint_names) {
514     my @unique_cols = map {
515       "$alias.$_"
516     } $self->result_source->unique_constraint_columns($name);
517
518     # Count the values for each unique column
519     my %seen = map { $_ => 0 } @unique_cols;
520
521     foreach my $key (keys %$collapsed) {
522       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
523       next unless exists $seen{$aliased};  # Additional constraints are okay
524       $seen{$aliased} = scalar @{ $collapsed->{$key} };
525     }
526
527     # If we get 0 or more than 1 value for a column, it's not necessarily unique
528     return 1 unless grep { $_ != 1 } values %seen;
529   }
530
531   return 0;
532 }
533
534 # _collapse_query
535 #
536 # Recursively collapse the query, accumulating values for each column.
537
538 sub _collapse_query {
539   my ($self, $query, $collapsed) = @_;
540
541   $collapsed ||= {};
542
543   if (ref $query eq 'ARRAY') {
544     foreach my $subquery (@$query) {
545       next unless ref $subquery;  # -or
546 #      warn "ARRAY: " . Dumper $subquery;
547       $collapsed = $self->_collapse_query($subquery, $collapsed);
548     }
549   }
550   elsif (ref $query eq 'HASH') {
551     if (keys %$query and (keys %$query)[0] eq '-and') {
552       foreach my $subquery (@{$query->{-and}}) {
553 #        warn "HASH: " . Dumper $subquery;
554         $collapsed = $self->_collapse_query($subquery, $collapsed);
555       }
556     }
557     else {
558 #      warn "LEAF: " . Dumper $query;
559       foreach my $key (keys %$query) {
560         push @{$collapsed->{$key}}, $query->{$key};
561       }
562     }
563   }
564
565   return $collapsed;
566 }
567
568 =head2 get_column
569
570 =over 4
571
572 =item Arguments: $cond?
573
574 =item Return Value: $resultsetcolumn
575
576 =back
577
578   my $max_length = $rs->get_column('length')->max;
579
580 Returns a ResultSetColumn instance for $column based on $self
581
582 =cut
583
584 sub get_column {
585   my ($self, $column) = @_;
586   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
587   return $new;
588 }
589
590 =head2 search_like
591
592 =over 4
593
594 =item Arguments: $cond, \%attrs?
595
596 =item Return Value: $resultset (scalar context), @row_objs (list context)
597
598 =back
599
600   # WHERE title LIKE '%blue%'
601   $cd_rs = $rs->search_like({ title => '%blue%'});
602
603 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
604 that this is simply a convenience method. You most likely want to use
605 L</search> with specific operators.
606
607 For more information, see L<DBIx::Class::Manual::Cookbook>.
608
609 =cut
610
611 sub search_like {
612   my $class = shift;
613   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
614   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
615   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
616   return $class->search($query, { %$attrs });
617 }
618
619 =head2 slice
620
621 =over 4
622
623 =item Arguments: $first, $last
624
625 =item Return Value: $resultset (scalar context), @row_objs (list context)
626
627 =back
628
629 Returns a resultset or object list representing a subset of elements from the
630 resultset slice is called on. Indexes are from 0, i.e., to get the first
631 three records, call:
632
633   my ($one, $two, $three) = $rs->slice(0, 2);
634
635 =cut
636
637 sub slice {
638   my ($self, $min, $max) = @_;
639   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
640   $attrs->{offset} = $self->{attrs}{offset} || 0;
641   $attrs->{offset} += $min;
642   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
643   return $self->search(undef(), $attrs);
644   #my $slice = (ref $self)->new($self->result_source, $attrs);
645   #return (wantarray ? $slice->all : $slice);
646 }
647
648 =head2 next
649
650 =over 4
651
652 =item Arguments: none
653
654 =item Return Value: $result?
655
656 =back
657
658 Returns the next element in the resultset (C<undef> is there is none).
659
660 Can be used to efficiently iterate over records in the resultset:
661
662   my $rs = $schema->resultset('CD')->search;
663   while (my $cd = $rs->next) {
664     print $cd->title;
665   }
666
667 Note that you need to store the resultset object, and call C<next> on it.
668 Calling C<< resultset('Table')->next >> repeatedly will always return the
669 first record from the resultset.
670
671 =cut
672
673 sub next {
674   my ($self) = @_;
675   if (my $cache = $self->get_cache) {
676     $self->{all_cache_position} ||= 0;
677     return $cache->[$self->{all_cache_position}++];
678   }
679   if ($self->{attrs}{cache}) {
680     $self->{all_cache_position} = 1;
681     return ($self->all)[0];
682   }
683   my @row = (
684     exists $self->{stashed_row}
685       ? @{delete $self->{stashed_row}}
686       : $self->cursor->next
687   );
688   return unless (@row);
689   return $self->_construct_object(@row);
690 }
691
692 sub _resolved_attrs {
693   my $self = shift;
694   return $self->{_attrs} if $self->{_attrs};
695
696   my $attrs = $self->{attrs};
697   my $source = $self->{_parent_source} || $self->{result_source};
698   my $alias = $attrs->{_orig_alias};
699
700   # XXX - lose storable dclone
701   my $record_filter = delete $attrs->{record_filter};
702   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
703   $attrs->{record_filter} = $record_filter if $record_filter;
704
705   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
706   if ($attrs->{columns}) {
707     delete $attrs->{as};
708   } elsif (!$attrs->{select}) {
709     $attrs->{columns} = [ $self->{result_source}->columns ];
710   }
711   
712   my $select_alias = $self->{attrs}{alias};
713   $attrs->{select} ||= [
714     map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
715   ];
716   $attrs->{as} ||= [
717     map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
718   ];
719   
720   my $adds;
721   if ($adds = delete $attrs->{include_columns}) {
722     $adds = [$adds] unless ref $adds eq 'ARRAY';
723     push(@{$attrs->{select}}, @$adds);
724     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
725   }
726   if ($adds = delete $attrs->{'+select'}) {
727     $adds = [$adds] unless ref $adds eq 'ARRAY';
728     push(@{$attrs->{select}}, map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
729   }
730   if (my $adds = delete $attrs->{'+as'}) {
731     $adds = [$adds] unless ref $adds eq 'ARRAY';
732     push(@{$attrs->{as}}, @$adds);
733   }
734
735   $attrs->{from} ||= [ { $alias => $source->from } ];
736   $attrs->{seen_join} ||= {};
737   my %seen;
738   if (my $join = delete $attrs->{join}) {
739     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
740       if (ref $j eq 'HASH') {
741         $seen{$_} = 1 foreach keys %$j;
742       } else {
743         $seen{$j} = 1;
744       }
745     }
746     push(@{$attrs->{from}},
747       $source->resolve_join($join, $alias, $attrs->{seen_join})
748     );
749   }
750
751   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
752   if ($attrs->{order_by}) {
753     $attrs->{order_by} = [ $attrs->{order_by} ] unless ref $attrs->{order_by};    
754   } else {
755     $attrs->{order_by} ||= [];    
756   }
757
758   my $collapse = $attrs->{collapse} || {};
759   if (my $prefetch = delete $attrs->{prefetch}) {
760     my @pre_order;
761     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
762       if ( ref $p eq 'HASH' ) {
763         foreach my $key (keys %$p) {
764           push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
765             unless $seen{$key};
766         }
767       } else {
768         push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
769           unless $seen{$p};
770       }
771       # bring joins back to level of current class
772       $p = $self->_reduce_joins($p, $attrs) if $attrs->{_live_join_stack};
773       if ($p) {
774         my @prefetch = $self->result_source->resolve_prefetch(
775           $p, $alias, {}, \@pre_order, $collapse
776         );
777         push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
778         push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
779       }
780     }
781     push(@{$attrs->{order_by}}, @pre_order);
782   }
783   $attrs->{collapse} = $collapse;
784
785   return $self->{_attrs} = $attrs;
786 }
787
788 sub _merge_attr {
789   my ($self, $a, $b) = @_;
790   return $b unless $a;
791   
792   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
793     foreach my $key (keys %{$b}) {
794       if (exists $a->{$key}) {
795         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
796       } else {
797         $a->{$key} = $b->{$key};
798       }
799     }
800     return $a;
801   } else {
802     $a = [$a] unless ref $a eq 'ARRAY';
803     $b = [$b] unless ref $b eq 'ARRAY';
804
805     my $hash = {};
806     my @array;
807     foreach my $x ($a, $b) {
808       foreach my $element (@{$x}) {
809         if (ref $element eq 'HASH') {
810           $hash = $self->_merge_attr($hash, $element);
811         } elsif (ref $element eq 'ARRAY') {
812           push(@array, @{$element});
813         } else {
814           push(@array, $element) unless $b == $x
815             && grep { $_ eq $element } @array;
816         }
817       }
818     }
819     
820     @array = grep { !exists $hash->{$_} } @array;
821
822     return keys %{$hash}
823       ? ( scalar(@array)
824             ? [$hash, @array]
825             : $hash
826         )
827       : \@array;
828   }
829 }
830
831 # bring the joins (which are from the original class) to the level
832 # of the current class so that we can resolve them properly
833 sub _reduce_joins {
834   my ($self, $p, $attrs) = @_;
835
836   STACK:
837   foreach my $join (@{$attrs->{_live_join_stack}}) {
838     if (ref $p eq 'HASH') {
839       return undef unless exists $p->{$join};
840       $p = $p->{$join};
841     } elsif (ref $p eq 'ARRAY') {
842       foreach my $pe (@{$p}) {
843         return undef if $pe eq $join;
844         if (ref $pe eq 'HASH' && exists $pe->{$join}) {
845           $p = $pe->{$join};
846           next STACK;
847         }
848       }
849       return undef;
850     } else {
851       return undef;
852     }
853   }
854   return $p;
855 }
856
857 sub _construct_object {
858   my ($self, @row) = @_;
859   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
860   my $new = $self->result_class->inflate_result($self->result_source, @$info);
861   $new = $self->{_attrs}{record_filter}->($new)
862     if exists $self->{_attrs}{record_filter};
863   return $new;
864 }
865
866 sub _collapse_result {
867   my ($self, $as, $row, $prefix) = @_;
868
869   my %const;
870   my @copy = @$row;
871   
872   foreach my $this_as (@$as) {
873     my $val = shift @copy;
874     if (defined $prefix) {
875       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
876         my $remain = $1;
877         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
878         $const{$1||''}{$2} = $val;
879       }
880     } else {
881       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
882       $const{$1||''}{$2} = $val;
883     }
884   }
885
886   my $alias = $self->{attrs}{alias};
887   my $info = [ {}, {} ];
888   foreach my $key (keys %const) {
889     if (length $key && $key ne $alias) {
890       my $target = $info;
891       my @parts = split(/\./, $key);
892       foreach my $p (@parts) {
893         $target = $target->[1]->{$p} ||= [];
894       }
895       $target->[0] = $const{$key};
896     } else {
897       $info->[0] = $const{$key};
898     }
899   }
900   
901   my @collapse;
902   if (defined $prefix) {
903     @collapse = map {
904         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
905     } keys %{$self->{_attrs}{collapse}}
906   } else {
907     @collapse = keys %{$self->{_attrs}{collapse}};
908   };
909
910   if (@collapse) {
911     my ($c) = sort { length $a <=> length $b } @collapse;
912     my $target = $info;
913     foreach my $p (split(/\./, $c)) {
914       $target = $target->[1]->{$p} ||= [];
915     }
916     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
917     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
918     my $tree = $self->_collapse_result($as, $row, $c_prefix);
919     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
920     my (@final, @raw);
921
922     while (
923       !(
924         grep {
925           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
926         } @co_key
927         )
928     ) {
929       push(@final, $tree);
930       last unless (@raw = $self->cursor->next);
931       $row = $self->{stashed_row} = \@raw;
932       $tree = $self->_collapse_result($as, $row, $c_prefix);
933     }
934     @$target = (@final ? @final : [ {}, {} ]);
935       # single empty result to indicate an empty prefetched has_many
936   }
937
938   #print "final info: " . Dumper($info);
939   return $info;
940 }
941
942 =head2 result_source
943
944 =over 4
945
946 =item Arguments: $result_source?
947
948 =item Return Value: $result_source
949
950 =back
951
952 An accessor for the primary ResultSource object from which this ResultSet
953 is derived.
954
955 =cut
956
957
958 =head2 count
959
960 =over 4
961
962 =item Arguments: $cond, \%attrs??
963
964 =item Return Value: $count
965
966 =back
967
968 Performs an SQL C<COUNT> with the same query as the resultset was built
969 with to find the number of elements. If passed arguments, does a search
970 on the resultset and counts the results of that.
971
972 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
973 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
974 not support C<DISTINCT> with multiple columns. If you are using such a
975 database, you should only use columns from the main table in your C<group_by>
976 clause.
977
978 =cut
979
980 sub count {
981   my $self = shift;
982   return $self->search(@_)->count if @_ and defined $_[0];
983   return scalar @{ $self->get_cache } if $self->get_cache;
984   my $count = $self->_count;
985   return 0 unless $count;
986
987   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
988   $count = $self->{attrs}{rows} if
989     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
990   return $count;
991 }
992
993 sub _count { # Separated out so pager can get the full count
994   my $self = shift;
995   my $select = { count => '*' };
996
997   my $attrs = { %{$self->_resolved_attrs} };
998   if (my $group_by = delete $attrs->{group_by}) {
999     delete $attrs->{having};
1000     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1001     # todo: try CONCAT for multi-column pk
1002     my @pk = $self->result_source->primary_columns;
1003     if (@pk == 1) {
1004       my $alias = $attrs->{_orig_alias};
1005       foreach my $column (@distinct) {
1006         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1007           @distinct = ($column);
1008           last;
1009         }
1010       }
1011     }
1012
1013     $select = { count => { distinct => \@distinct } };
1014   }
1015
1016   $attrs->{select} = $select;
1017   $attrs->{as} = [qw/count/];
1018
1019   # offset, order by and page are not needed to count. record_filter is cdbi
1020   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1021   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1022   $tmp_rs->{_parent_source} = $self->{_parent_source} if $self->{_parent_source};
1023        #XXX - hack to pass through parent of related resultsets
1024
1025   my ($count) = $tmp_rs->cursor->next;
1026   return $count;
1027 }
1028
1029 =head2 count_literal
1030
1031 =over 4
1032
1033 =item Arguments: $sql_fragment, @bind_values
1034
1035 =item Return Value: $count
1036
1037 =back
1038
1039 Counts the results in a literal query. Equivalent to calling L</search_literal>
1040 with the passed arguments, then L</count>.
1041
1042 =cut
1043
1044 sub count_literal { shift->search_literal(@_)->count; }
1045
1046 =head2 all
1047
1048 =over 4
1049
1050 =item Arguments: none
1051
1052 =item Return Value: @objects
1053
1054 =back
1055
1056 Returns all elements in the resultset. Called implicitly if the resultset
1057 is returned in list context.
1058
1059 =cut
1060
1061 sub all {
1062   my ($self) = @_;
1063   return @{ $self->get_cache } if $self->get_cache;
1064
1065   my @obj;
1066
1067   # TODO: don't call resolve here
1068   if (keys %{$self->_resolved_attrs->{collapse}}) {
1069 #  if ($self->{attrs}{prefetch}) {
1070       # Using $self->cursor->all is really just an optimisation.
1071       # If we're collapsing has_many prefetches it probably makes
1072       # very little difference, and this is cleaner than hacking
1073       # _construct_object to survive the approach
1074     my @row = $self->cursor->next;
1075     while (@row) {
1076       push(@obj, $self->_construct_object(@row));
1077       @row = (exists $self->{stashed_row}
1078                ? @{delete $self->{stashed_row}}
1079                : $self->cursor->next);
1080     }
1081   } else {
1082     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1083   }
1084
1085   $self->set_cache(\@obj) if $self->{attrs}{cache};
1086   return @obj;
1087 }
1088
1089 =head2 reset
1090
1091 =over 4
1092
1093 =item Arguments: none
1094
1095 =item Return Value: $self
1096
1097 =back
1098
1099 Resets the resultset's cursor, so you can iterate through the elements again.
1100
1101 =cut
1102
1103 sub reset {
1104   my ($self) = @_;
1105   delete $self->{_attrs} if exists $self->{_attrs};
1106   $self->{all_cache_position} = 0;
1107   $self->cursor->reset;
1108   return $self;
1109 }
1110
1111 =head2 first
1112
1113 =over 4
1114
1115 =item Arguments: none
1116
1117 =item Return Value: $object?
1118
1119 =back
1120
1121 Resets the resultset and returns an object for the first result (if the
1122 resultset returns anything).
1123
1124 =cut
1125
1126 sub first {
1127   return $_[0]->reset->next;
1128 }
1129
1130 # _cond_for_update_delete
1131 #
1132 # update/delete require the condition to be modified to handle
1133 # the differing SQL syntax available.  This transforms the $self->{cond}
1134 # appropriately, returning the new condition.
1135
1136 sub _cond_for_update_delete {
1137   my ($self) = @_;
1138   my $cond = {};
1139
1140   # No-op. No condition, we're updating/deleting everything
1141   return $cond unless ref $self->{cond};
1142
1143   if (ref $self->{cond} eq 'ARRAY') {
1144     $cond = [
1145       map {
1146         my %hash;
1147         foreach my $key (keys %{$_}) {
1148           $key =~ /([^.]+)$/;
1149           $hash{$1} = $_->{$key};
1150         }
1151         \%hash;
1152       } @{$self->{cond}}
1153     ];
1154   }
1155   elsif (ref $self->{cond} eq 'HASH') {
1156     if ((keys %{$self->{cond}})[0] eq '-and') {
1157       $cond->{-and} = [];
1158
1159       my @cond = @{$self->{cond}{-and}};
1160       for (my $i = 0; $i < @cond; $i++) {
1161         my $entry = $cond[$i];
1162
1163         my %hash;
1164         if (ref $entry eq 'HASH') {
1165           foreach my $key (keys %{$entry}) {
1166             $key =~ /([^.]+)$/;
1167             $hash{$1} = $entry->{$key};
1168           }
1169         }
1170         else {
1171           $entry =~ /([^.]+)$/;
1172           $hash{$1} = $cond[++$i];
1173         }
1174
1175         push @{$cond->{-and}}, \%hash;
1176       }
1177     }
1178     else {
1179       foreach my $key (keys %{$self->{cond}}) {
1180         $key =~ /([^.]+)$/;
1181         $cond->{$1} = $self->{cond}{$key};
1182       }
1183     }
1184   }
1185   else {
1186     $self->throw_exception(
1187       "Can't update/delete on resultset with condition unless hash or array"
1188     );
1189   }
1190
1191   return $cond;
1192 }
1193
1194
1195 =head2 update
1196
1197 =over 4
1198
1199 =item Arguments: \%values
1200
1201 =item Return Value: $storage_rv
1202
1203 =back
1204
1205 Sets the specified columns in the resultset to the supplied values in a
1206 single query. Return value will be true if the update succeeded or false
1207 if no records were updated; exact type of success value is storage-dependent.
1208
1209 =cut
1210
1211 sub update {
1212   my ($self, $values) = @_;
1213   $self->throw_exception("Values for update must be a hash")
1214     unless ref $values eq 'HASH';
1215
1216   my $cond = $self->_cond_for_update_delete;
1217
1218   return $self->result_source->storage->update(
1219     $self->result_source->from, $values, $cond
1220   );
1221 }
1222
1223 =head2 update_all
1224
1225 =over 4
1226
1227 =item Arguments: \%values
1228
1229 =item Return Value: 1
1230
1231 =back
1232
1233 Fetches all objects and updates them one at a time. Note that C<update_all>
1234 will run DBIC cascade triggers, while L</update> will not.
1235
1236 =cut
1237
1238 sub update_all {
1239   my ($self, $values) = @_;
1240   $self->throw_exception("Values for update must be a hash")
1241     unless ref $values eq 'HASH';
1242   foreach my $obj ($self->all) {
1243     $obj->set_columns($values)->update;
1244   }
1245   return 1;
1246 }
1247
1248 =head2 delete
1249
1250 =over 4
1251
1252 =item Arguments: none
1253
1254 =item Return Value: 1
1255
1256 =back
1257
1258 Deletes the contents of the resultset from its result source. Note that this
1259 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1260 to run.
1261
1262 =cut
1263
1264 sub delete {
1265   my ($self) = @_;
1266
1267   my $cond = $self->_cond_for_update_delete;
1268
1269   $self->result_source->storage->delete($self->result_source->from, $cond);
1270   return 1;
1271 }
1272
1273 =head2 delete_all
1274
1275 =over 4
1276
1277 =item Arguments: none
1278
1279 =item Return Value: 1
1280
1281 =back
1282
1283 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1284 will run DBIC cascade triggers, while L</delete> will not.
1285
1286 =cut
1287
1288 sub delete_all {
1289   my ($self) = @_;
1290   $_->delete for $self->all;
1291   return 1;
1292 }
1293
1294 =head2 pager
1295
1296 =over 4
1297
1298 =item Arguments: none
1299
1300 =item Return Value: $pager
1301
1302 =back
1303
1304 Return Value a L<Data::Page> object for the current resultset. Only makes
1305 sense for queries with a C<page> attribute.
1306
1307 =cut
1308
1309 sub pager {
1310   my ($self) = @_;
1311   my $attrs = $self->{attrs};
1312   $self->throw_exception("Can't create pager for non-paged rs")
1313     unless $self->{attrs}{page};
1314   $attrs->{rows} ||= 10;
1315   return $self->{pager} ||= Data::Page->new(
1316     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1317 }
1318
1319 =head2 page
1320
1321 =over 4
1322
1323 =item Arguments: $page_number
1324
1325 =item Return Value: $rs
1326
1327 =back
1328
1329 Returns a resultset for the $page_number page of the resultset on which page
1330 is called, where each page contains a number of rows equal to the 'rows'
1331 attribute set on the resultset (10 by default).
1332
1333 =cut
1334
1335 sub page {
1336   my ($self, $page) = @_;
1337   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1338 }
1339
1340 =head2 new_result
1341
1342 =over 4
1343
1344 =item Arguments: \%vals
1345
1346 =item Return Value: $object
1347
1348 =back
1349
1350 Creates an object in the resultset's result class and returns it.
1351
1352 =cut
1353
1354 sub new_result {
1355   my ($self, $values) = @_;
1356   $self->throw_exception( "new_result needs a hash" )
1357     unless (ref $values eq 'HASH');
1358   $self->throw_exception(
1359     "Can't abstract implicit construct, condition not a hash"
1360   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1361   my %new = %$values;
1362   my $alias = $self->{attrs}{_orig_alias};
1363   foreach my $key (keys %{$self->{cond}||{}}) {
1364     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1365   }
1366   my $obj = $self->result_class->new(\%new);
1367   $obj->result_source($self->result_source) if $obj->can('result_source');
1368   return $obj;
1369 }
1370
1371 =head2 find_or_new
1372
1373 =over 4
1374
1375 =item Arguments: \%vals, \%attrs?
1376
1377 =item Return Value: $object
1378
1379 =back
1380
1381 Find an existing record from this resultset. If none exists, instantiate a new
1382 result object and return it. The object will not be saved into your storage
1383 until you call L<DBIx::Class::Row/insert> on it.
1384
1385 If you want objects to be saved immediately, use L</find_or_create> instead.
1386
1387 =cut
1388
1389 sub find_or_new {
1390   my $self     = shift;
1391   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1392   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1393   my $exists   = $self->find($hash, $attrs);
1394   return defined $exists ? $exists : $self->new_result($hash);
1395 }
1396
1397 =head2 create
1398
1399 =over 4
1400
1401 =item Arguments: \%vals
1402
1403 =item Return Value: $object
1404
1405 =back
1406
1407 Inserts a record into the resultset and returns the object representing it.
1408
1409 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1410
1411 =cut
1412
1413 sub create {
1414   my ($self, $attrs) = @_;
1415   $self->throw_exception( "create needs a hashref" )
1416     unless ref $attrs eq 'HASH';
1417   return $self->new_result($attrs)->insert;
1418 }
1419
1420 =head2 find_or_create
1421
1422 =over 4
1423
1424 =item Arguments: \%vals, \%attrs?
1425
1426 =item Return Value: $object
1427
1428 =back
1429
1430   $class->find_or_create({ key => $val, ... });
1431
1432 Tries to find a record based on its primary key or unique constraint; if none
1433 is found, creates one and returns that instead.
1434
1435   my $cd = $schema->resultset('CD')->find_or_create({
1436     cdid   => 5,
1437     artist => 'Massive Attack',
1438     title  => 'Mezzanine',
1439     year   => 2005,
1440   });
1441
1442 Also takes an optional C<key> attribute, to search by a specific key or unique
1443 constraint. For example:
1444
1445   my $cd = $schema->resultset('CD')->find_or_create(
1446     {
1447       artist => 'Massive Attack',
1448       title  => 'Mezzanine',
1449     },
1450     { key => 'cd_artist_title' }
1451   );
1452
1453 See also L</find> and L</update_or_create>. For information on how to declare
1454 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1455
1456 =cut
1457
1458 sub find_or_create {
1459   my $self     = shift;
1460   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1461   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1462   my $exists   = $self->find($hash, $attrs);
1463   return defined $exists ? $exists : $self->create($hash);
1464 }
1465
1466 =head2 update_or_create
1467
1468 =over 4
1469
1470 =item Arguments: \%col_values, { key => $unique_constraint }?
1471
1472 =item Return Value: $object
1473
1474 =back
1475
1476   $class->update_or_create({ col => $val, ... });
1477
1478 First, searches for an existing row matching one of the unique constraints
1479 (including the primary key) on the source of this resultset. If a row is
1480 found, updates it with the other given column values. Otherwise, creates a new
1481 row.
1482
1483 Takes an optional C<key> attribute to search on a specific unique constraint.
1484 For example:
1485
1486   # In your application
1487   my $cd = $schema->resultset('CD')->update_or_create(
1488     {
1489       artist => 'Massive Attack',
1490       title  => 'Mezzanine',
1491       year   => 1998,
1492     },
1493     { key => 'cd_artist_title' }
1494   );
1495
1496 If no C<key> is specified, it searches on all unique constraints defined on the
1497 source, including the primary key.
1498
1499 If the C<key> is specified as C<primary>, it searches only on the primary key.
1500
1501 See also L</find> and L</find_or_create>. For information on how to declare
1502 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1503
1504 =cut
1505
1506 sub update_or_create {
1507   my $self = shift;
1508   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1509   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1510
1511   my $row = $self->find($cond);
1512   if (defined $row) {
1513     $row->update($cond);
1514     return $row;
1515   }
1516
1517   return $self->create($cond);
1518 }
1519
1520 =head2 get_cache
1521
1522 =over 4
1523
1524 =item Arguments: none
1525
1526 =item Return Value: \@cache_objects?
1527
1528 =back
1529
1530 Gets the contents of the cache for the resultset, if the cache is set.
1531
1532 =cut
1533
1534 sub get_cache {
1535   shift->{all_cache};
1536 }
1537
1538 =head2 set_cache
1539
1540 =over 4
1541
1542 =item Arguments: \@cache_objects
1543
1544 =item Return Value: \@cache_objects
1545
1546 =back
1547
1548 Sets the contents of the cache for the resultset. Expects an arrayref
1549 of objects of the same class as those produced by the resultset. Note that
1550 if the cache is set the resultset will return the cached objects rather
1551 than re-querying the database even if the cache attr is not set.
1552
1553 =cut
1554
1555 sub set_cache {
1556   my ( $self, $data ) = @_;
1557   $self->throw_exception("set_cache requires an arrayref")
1558       if defined($data) && (ref $data ne 'ARRAY');
1559   $self->{all_cache} = $data;
1560 }
1561
1562 =head2 clear_cache
1563
1564 =over 4
1565
1566 =item Arguments: none
1567
1568 =item Return Value: []
1569
1570 =back
1571
1572 Clears the cache for the resultset.
1573
1574 =cut
1575
1576 sub clear_cache {
1577   shift->set_cache(undef);
1578 }
1579
1580 =head2 related_resultset
1581
1582 =over 4
1583
1584 =item Arguments: $relationship_name
1585
1586 =item Return Value: $resultset
1587
1588 =back
1589
1590 Returns a related resultset for the supplied relationship name.
1591
1592   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1593
1594 =cut
1595
1596 sub related_resultset {
1597   my ($self, $rel) = @_;
1598
1599   $self->{related_resultsets} ||= {};
1600   return $self->{related_resultsets}{$rel} ||= do {
1601     my $rel_obj = $self->result_source->relationship_info($rel);
1602
1603     $self->throw_exception(
1604       "search_related: result source '" . $self->result_source->name .
1605         "' has no such relationship $rel")
1606       unless $rel_obj;
1607     
1608     my $join_count = $self->_resolved_attrs->{seen_join}{$rel};
1609     my $alias = $join_count ? join('_', $rel, $join_count+1) : $rel;
1610
1611     my @live_join_stack = (@{$self->{attrs}{_live_join_stack}||[]}, $rel);
1612
1613     my $rs = $self->result_source->schema->resultset($rel_obj->{class})->search(
1614       undef, {
1615         select => undef,
1616         as => undef,
1617         alias => $alias,
1618         _live_join_stack => \@live_join_stack,
1619         _parent_attrs => $self->{attrs}}
1620     );
1621
1622     # keep reference of the original resultset
1623     $rs->{_parent_source} = $self->{_parent_source} || $self->result_source;
1624
1625     return $rs;
1626   };
1627 }
1628
1629 =head2 throw_exception
1630
1631 See L<DBIx::Class::Schema/throw_exception> for details.
1632
1633 =cut
1634
1635 sub throw_exception {
1636   my $self=shift;
1637   $self->result_source->schema->throw_exception(@_);
1638 }
1639
1640 # XXX: FIXME: Attributes docs need clearing up
1641
1642 =head1 ATTRIBUTES
1643
1644 The resultset takes various attributes that modify its behavior. Here's an
1645 overview of them:
1646
1647 =head2 order_by
1648
1649 =over 4
1650
1651 =item Value: ($order_by | \@order_by)
1652
1653 =back
1654
1655 Which column(s) to order the results by. This is currently passed
1656 through directly to SQL, so you can give e.g. C<year DESC> for a
1657 descending order on the column `year'.
1658
1659 Please note that if you have quoting enabled (see
1660 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1661 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1662 so you will need to manually quote things as appropriate.)
1663
1664 =head2 columns
1665
1666 =over 4
1667
1668 =item Value: \@columns
1669
1670 =back
1671
1672 Shortcut to request a particular set of columns to be retrieved.  Adds
1673 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1674 from that, then auto-populates C<as> from C<select> as normal. (You may also
1675 use the C<cols> attribute, as in earlier versions of DBIC.)
1676
1677 =head2 include_columns
1678
1679 =over 4
1680
1681 =item Value: \@columns
1682
1683 =back
1684
1685 Shortcut to include additional columns in the returned results - for example
1686
1687   $schema->resultset('CD')->search(undef, {
1688     include_columns => ['artist.name'],
1689     join => ['artist']
1690   });
1691
1692 would return all CDs and include a 'name' column to the information
1693 passed to object inflation
1694
1695 =head2 select
1696
1697 =over 4
1698
1699 =item Value: \@select_columns
1700
1701 =back
1702
1703 Indicates which columns should be selected from the storage. You can use
1704 column names, or in the case of RDBMS back ends, function or stored procedure
1705 names:
1706
1707   $rs = $schema->resultset('Employee')->search(undef, {
1708     select => [
1709       'name',
1710       { count => 'employeeid' },
1711       { sum => 'salary' }
1712     ]
1713   });
1714
1715 When you use function/stored procedure names and do not supply an C<as>
1716 attribute, the column names returned are storage-dependent. E.g. MySQL would
1717 return a column named C<count(employeeid)> in the above example.
1718
1719 =head2 +select
1720
1721 =over 4
1722
1723 Indicates additional columns to be selected from storage.  Works the same as
1724 L<select> but adds columns to the selection.
1725
1726 =back
1727
1728 =head2 +as
1729
1730 =over 4
1731
1732 Indicates additional column names for those added via L<+select>.
1733
1734 =back
1735
1736 =head2 as
1737
1738 =over 4
1739
1740 =item Value: \@inflation_names
1741
1742 =back
1743
1744 Indicates column names for object inflation. This is used in conjunction with
1745 C<select>, usually when C<select> contains one or more function or stored
1746 procedure names:
1747
1748   $rs = $schema->resultset('Employee')->search(undef, {
1749     select => [
1750       'name',
1751       { count => 'employeeid' }
1752     ],
1753     as => ['name', 'employee_count'],
1754   });
1755
1756   my $employee = $rs->first(); # get the first Employee
1757
1758 If the object against which the search is performed already has an accessor
1759 matching a column name specified in C<as>, the value can be retrieved using
1760 the accessor as normal:
1761
1762   my $name = $employee->name();
1763
1764 If on the other hand an accessor does not exist in the object, you need to
1765 use C<get_column> instead:
1766
1767   my $employee_count = $employee->get_column('employee_count');
1768
1769 You can create your own accessors if required - see
1770 L<DBIx::Class::Manual::Cookbook> for details.
1771
1772 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1773 produced, it is used for internal access only. Thus attempting to use the accessor
1774 in an C<order_by> clause or similar will fail misrably.
1775
1776 =head2 join
1777
1778 =over 4
1779
1780 =item Value: ($rel_name | \@rel_names | \%rel_names)
1781
1782 =back
1783
1784 Contains a list of relationships that should be joined for this query.  For
1785 example:
1786
1787   # Get CDs by Nine Inch Nails
1788   my $rs = $schema->resultset('CD')->search(
1789     { 'artist.name' => 'Nine Inch Nails' },
1790     { join => 'artist' }
1791   );
1792
1793 Can also contain a hash reference to refer to the other relation's relations.
1794 For example:
1795
1796   package MyApp::Schema::Track;
1797   use base qw/DBIx::Class/;
1798   __PACKAGE__->table('track');
1799   __PACKAGE__->add_columns(qw/trackid cd position title/);
1800   __PACKAGE__->set_primary_key('trackid');
1801   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1802   1;
1803
1804   # In your application
1805   my $rs = $schema->resultset('Artist')->search(
1806     { 'track.title' => 'Teardrop' },
1807     {
1808       join     => { cd => 'track' },
1809       order_by => 'artist.name',
1810     }
1811   );
1812
1813 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1814 similarly for a third time). For e.g.
1815
1816   my $rs = $schema->resultset('Artist')->search({
1817     'cds.title'   => 'Down to Earth',
1818     'cds_2.title' => 'Popular',
1819   }, {
1820     join => [ qw/cds cds/ ],
1821   });
1822
1823 will return a set of all artists that have both a cd with title 'Down
1824 to Earth' and a cd with title 'Popular'.
1825
1826 If you want to fetch related objects from other tables as well, see C<prefetch>
1827 below.
1828
1829 =head2 prefetch
1830
1831 =over 4
1832
1833 =item Value: ($rel_name | \@rel_names | \%rel_names)
1834
1835 =back
1836
1837 Contains one or more relationships that should be fetched along with the main
1838 query (when they are accessed afterwards they will have already been
1839 "prefetched").  This is useful for when you know you will need the related
1840 objects, because it saves at least one query:
1841
1842   my $rs = $schema->resultset('Tag')->search(
1843     undef,
1844     {
1845       prefetch => {
1846         cd => 'artist'
1847       }
1848     }
1849   );
1850
1851 The initial search results in SQL like the following:
1852
1853   SELECT tag.*, cd.*, artist.* FROM tag
1854   JOIN cd ON tag.cd = cd.cdid
1855   JOIN artist ON cd.artist = artist.artistid
1856
1857 L<DBIx::Class> has no need to go back to the database when we access the
1858 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1859 case.
1860
1861 Simple prefetches will be joined automatically, so there is no need
1862 for a C<join> attribute in the above search. If you're prefetching to
1863 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1864 specify the join as well.
1865
1866 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1867 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1868 with an accessor type of 'single' or 'filter').
1869
1870 =head2 page
1871
1872 =over 4
1873
1874 =item Value: $page
1875
1876 =back
1877
1878 Makes the resultset paged and specifies the page to retrieve. Effectively
1879 identical to creating a non-pages resultset and then calling ->page($page)
1880 on it.
1881
1882 If L<rows> attribute is not specified it defualts to 10 rows per page.
1883
1884 =head2 rows
1885
1886 =over 4
1887
1888 =item Value: $rows
1889
1890 =back
1891
1892 Specifes the maximum number of rows for direct retrieval or the number of
1893 rows per page if the page attribute or method is used.
1894
1895 =head2 offset
1896
1897 =over 4
1898
1899 =item Value: $offset
1900
1901 =back
1902
1903 Specifies the (zero-based) row number for the  first row to be returned, or the
1904 of the first row of the first page if paging is used.
1905
1906 =head2 group_by
1907
1908 =over 4
1909
1910 =item Value: \@columns
1911
1912 =back
1913
1914 A arrayref of columns to group by. Can include columns of joined tables.
1915
1916   group_by => [qw/ column1 column2 ... /]
1917
1918 =head2 having
1919
1920 =over 4
1921
1922 =item Value: $condition
1923
1924 =back
1925
1926 HAVING is a select statement attribute that is applied between GROUP BY and
1927 ORDER BY. It is applied to the after the grouping calculations have been
1928 done.
1929
1930   having => { 'count(employee)' => { '>=', 100 } }
1931
1932 =head2 distinct
1933
1934 =over 4
1935
1936 =item Value: (0 | 1)
1937
1938 =back
1939
1940 Set to 1 to group by all columns.
1941
1942 =head2 where
1943
1944 =over 4
1945
1946 Adds to the WHERE clause.
1947
1948   # only return rows WHERE deleted IS NULL for all searches
1949   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
1950
1951 Can be overridden by passing C<{ where => undef }> as an attribute
1952 to a resulset.
1953
1954 =back
1955
1956 =head2 cache
1957
1958 Set to 1 to cache search results. This prevents extra SQL queries if you
1959 revisit rows in your ResultSet:
1960
1961   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1962
1963   while( my $artist = $resultset->next ) {
1964     ... do stuff ...
1965   }
1966
1967   $rs->first; # without cache, this would issue a query
1968
1969 By default, searches are not cached.
1970
1971 For more examples of using these attributes, see
1972 L<DBIx::Class::Manual::Cookbook>.
1973
1974 =head2 from
1975
1976 =over 4
1977
1978 =item Value: \@from_clause
1979
1980 =back
1981
1982 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1983 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1984 clauses.
1985
1986 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1987
1988 C<join> will usually do what you need and it is strongly recommended that you
1989 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1990 And we really do mean "cannot", not just tried and failed. Attempting to use
1991 this because you're having problems with C<join> is like trying to use x86
1992 ASM because you've got a syntax error in your C. Trust us on this.
1993
1994 Now, if you're still really, really sure you need to use this (and if you're
1995 not 100% sure, ask the mailing list first), here's an explanation of how this
1996 works.
1997
1998 The syntax is as follows -
1999
2000   [
2001     { <alias1> => <table1> },
2002     [
2003       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2004       [], # nested JOIN (optional)
2005       { <table1.column1> => <table2.column2>, ... (more conditions) },
2006     ],
2007     # More of the above [ ] may follow for additional joins
2008   ]
2009
2010   <table1> <alias1>
2011   JOIN
2012     <table2> <alias2>
2013     [JOIN ...]
2014   ON <table1.column1> = <table2.column2>
2015   <more joins may follow>
2016
2017 An easy way to follow the examples below is to remember the following:
2018
2019     Anything inside "[]" is a JOIN
2020     Anything inside "{}" is a condition for the enclosing JOIN
2021
2022 The following examples utilize a "person" table in a family tree application.
2023 In order to express parent->child relationships, this table is self-joined:
2024
2025     # Person->belongs_to('father' => 'Person');
2026     # Person->belongs_to('mother' => 'Person');
2027
2028 C<from> can be used to nest joins. Here we return all children with a father,
2029 then search against all mothers of those children:
2030
2031   $rs = $schema->resultset('Person')->search(
2032       undef,
2033       {
2034           alias => 'mother', # alias columns in accordance with "from"
2035           from => [
2036               { mother => 'person' },
2037               [
2038                   [
2039                       { child => 'person' },
2040                       [
2041                           { father => 'person' },
2042                           { 'father.person_id' => 'child.father_id' }
2043                       ]
2044                   ],
2045                   { 'mother.person_id' => 'child.mother_id' }
2046               ],
2047           ]
2048       },
2049   );
2050
2051   # Equivalent SQL:
2052   # SELECT mother.* FROM person mother
2053   # JOIN (
2054   #   person child
2055   #   JOIN person father
2056   #   ON ( father.person_id = child.father_id )
2057   # )
2058   # ON ( mother.person_id = child.mother_id )
2059
2060 The type of any join can be controlled manually. To search against only people
2061 with a father in the person table, we could explicitly use C<INNER JOIN>:
2062
2063     $rs = $schema->resultset('Person')->search(
2064         undef,
2065         {
2066             alias => 'child', # alias columns in accordance with "from"
2067             from => [
2068                 { child => 'person' },
2069                 [
2070                     { father => 'person', -join_type => 'inner' },
2071                     { 'father.id' => 'child.father_id' }
2072                 ],
2073             ]
2074         },
2075     );
2076
2077     # Equivalent SQL:
2078     # SELECT child.* FROM person child
2079     # INNER JOIN person father ON child.father_id = father.id
2080
2081 =cut
2082
2083 1;