f4e7f9650fefb253ced099ce61fec3c45525c28f
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Scalar::Util qw/weaken/;
13 use DBIx::Class::ResultSetColumn;
14 use base qw/DBIx::Class/;
15 __PACKAGE__->load_components(qw/AccessorGroup/);
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   weaken $source;
89
90   if ($attrs->{page}) {
91     $attrs->{rows} ||= 10;
92     $attrs->{offset} ||= 0;
93     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
94   }
95
96   $attrs->{alias} ||= 'me';
97   $attrs->{_orig_alias} ||= $attrs->{alias};
98
99   bless {
100     result_source => $source,
101     result_class => $attrs->{result_class} || $source->result_class,
102     cond => $attrs->{where},
103 #    from => $attrs->{from},
104 #    collapse => $collapse,
105     count => undef,
106     pager => undef,
107     attrs => $attrs
108   }, $class;
109 }
110
111 =head2 search
112
113 =over 4
114
115 =item Arguments: $cond, \%attrs?
116
117 =item Return Value: $resultset (scalar context), @row_objs (list context)
118
119 =back
120
121   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
122   my $new_rs = $cd_rs->search({ year => 2005 });
123
124   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
125                  # year = 2005 OR year = 2004
126
127 If you need to pass in additional attributes but no additional condition,
128 call it as C<search(undef, \%attrs)>.
129
130   # "SELECT name, artistid FROM $artist_table"
131   my @all_artists = $schema->resultset('Artist')->search(undef, {
132     columns => [qw/name artistid/],
133   });
134
135 =cut
136
137 sub search {
138   my $self = shift;
139   my $rs = $self->search_rs( @_ );
140   return (wantarray ? $rs->all : $rs);
141 }
142
143 =head2 search_rs
144
145 =over 4
146
147 =item Arguments: $cond, \%attrs?
148
149 =item Return Value: $resultset
150
151 =back
152
153 This method does the same exact thing as search() except it will
154 always return a resultset, even in list context.
155
156 =cut
157
158 sub search_rs {
159   my $self = shift;
160
161   my $attrs = {};
162   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
163   my $our_attrs = exists $attrs->{_parent_attrs}
164     ? { %{delete $attrs->{_parent_attrs}} }
165     : { %{$self->{attrs}} };
166   my $having = delete $our_attrs->{having};
167
168   # XXX should only maintain _live_join_stack and generate _live_join_h from that
169   if ($attrs->{_live_join_stack}) {
170     foreach my $join (reverse @{$attrs->{_live_join_stack}}) {
171       $attrs->{_live_join_h} = defined $attrs->{_live_join_h}
172         ? { $join => $attrs->{_live_join_h} }
173         : $join;
174     }
175   }
176
177   # merge new attrs into inherited
178   foreach my $key (qw/join prefetch/) {
179     next unless exists $attrs->{$key};
180     if (my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
181       foreach my $join (reverse @{$live_join}) {
182         $attrs->{$key} = { $join => $attrs->{$key} };
183       }
184     }
185
186     $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, delete $attrs->{$key});
187   }
188
189   $our_attrs->{join} = $self->_merge_attr(
190     $our_attrs->{join}, $attrs->{_live_join_h}
191   ) if ($attrs->{_live_join_h});
192
193   if (defined $our_attrs->{prefetch}) {
194     $our_attrs->{join} = $self->_merge_attr(
195       $our_attrs->{join}, $our_attrs->{prefetch}
196     );
197   }
198
199   my $new_attrs = { %{$our_attrs}, %{$attrs} };
200   my $where = (@_
201     ? (
202         (@_ == 1 || ref $_[0] eq "HASH")
203           ? shift
204           : (
205               (@_ % 2)
206                 ? $self->throw_exception("Odd number of arguments to search")
207                 : {@_}
208              )
209       )
210     : undef
211   );
212
213   if (defined $where) {
214     $new_attrs->{where} = (
215       defined $new_attrs->{where}
216         ? { '-and' => [
217               map {
218                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
219               } $where, $new_attrs->{where}
220             ]
221           }
222         : $where);
223   }
224
225   if (defined $having) {
226     $new_attrs->{having} = (
227       defined $new_attrs->{having}
228         ? { '-and' => [
229               map {
230                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
231               } $having, $new_attrs->{having}
232             ]
233           }
234         : $having);
235   }
236
237   my $rs = (ref $self)->new($self->result_source, $new_attrs);
238   $rs->{_parent_rs} = $self->{_parent_rs} if $self->{_parent_rs};
239
240   unless (@_) {                 # no search, effectively just a clone
241     my $rows = $self->get_cache;
242     if ($rows) {
243       $rs->set_cache($rows);
244     }
245   }
246   return $rs;
247 }
248
249 =head2 search_literal
250
251 =over 4
252
253 =item Arguments: $sql_fragment, @bind_values
254
255 =item Return Value: $resultset (scalar context), @row_objs (list context)
256
257 =back
258
259   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
260   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
261
262 Pass a literal chunk of SQL to be added to the conditional part of the
263 resultset query.
264
265 =cut
266
267 sub search_literal {
268   my ($self, $cond, @vals) = @_;
269   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
270   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
271   return $self->search(\$cond, $attrs);
272 }
273
274 =head2 find
275
276 =over 4
277
278 =item Arguments: @values | \%cols, \%attrs?
279
280 =item Return Value: $row_object
281
282 =back
283
284 Finds a row based on its primary key or unique constraint. For example, to find
285 a row by its primary key:
286
287   my $cd = $schema->resultset('CD')->find(5);
288
289 You can also find a row by a specific unique constraint using the C<key>
290 attribute. For example:
291
292   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
293     key => 'cd_artist_title'
294   });
295
296 Additionally, you can specify the columns explicitly by name:
297
298   my $cd = $schema->resultset('CD')->find(
299     {
300       artist => 'Massive Attack',
301       title  => 'Mezzanine',
302     },
303     { key => 'cd_artist_title' }
304   );
305
306 If the C<key> is specified as C<primary>, it searches only on the primary key.
307
308 If no C<key> is specified, it searches on all unique constraints defined on the
309 source, including the primary key.
310
311 See also L</find_or_create> and L</update_or_create>. For information on how to
312 declare unique constraints, see
313 L<DBIx::Class::ResultSource/add_unique_constraint>.
314
315 =cut
316
317 sub find {
318   my $self = shift;
319   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
320
321   # Default to the primary key, but allow a specific key
322   my @cols = exists $attrs->{key}
323     ? $self->result_source->unique_constraint_columns($attrs->{key})
324     : $self->result_source->primary_columns;
325   $self->throw_exception(
326     "Can't find unless a primary key or unique constraint is defined"
327   ) unless @cols;
328
329   # Parse out a hashref from input
330   my $input_query;
331   if (ref $_[0] eq 'HASH') {
332     $input_query = { %{$_[0]} };
333   }
334   elsif (@_ == @cols) {
335     $input_query = {};
336     @{$input_query}{@cols} = @_;
337   }
338   else {
339     # Compatibility: Allow e.g. find(id => $value)
340     carp "Find by key => value deprecated; please use a hashref instead";
341     $input_query = {@_};
342   }
343
344   my @unique_queries = $self->_unique_queries($input_query, $attrs);
345
346   # Handle cases where the ResultSet defines the query, or where the user is
347   # abusing find
348   my $query = @unique_queries ? \@unique_queries : $input_query;
349
350   # Run the query
351   if (keys %$attrs) {
352     my $rs = $self->search($query, $attrs);
353     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
354   }
355   else {
356     return keys %{$self->_resolved_attrs->{collapse}}
357       ? $self->search($query)->next
358       : $self->single($query);
359   }
360 }
361
362 # _unique_queries
363 #
364 # Build a list of queries which satisfy unique constraints.
365
366 sub _unique_queries {
367   my ($self, $query, $attrs) = @_;
368
369   my @constraint_names = exists $attrs->{key}
370     ? ($attrs->{key})
371     : $self->result_source->unique_constraint_names;
372
373   my @unique_queries;
374   foreach my $name (@constraint_names) {
375     my @unique_cols = $self->result_source->unique_constraint_columns($name);
376     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
377
378     next unless scalar keys %$unique_query;
379
380     # Add the ResultSet's alias
381     my $alias = $self->{attrs}{alias};
382     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
383       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
384     }
385
386     push @unique_queries, $unique_query;
387   }
388
389   return @unique_queries;
390 }
391
392 # _build_unique_query
393 #
394 # Constrain the specified query hash based on the specified column names.
395
396 sub _build_unique_query {
397   my ($self, $query, $unique_cols) = @_;
398
399   return {
400     map  { $_ => $query->{$_} }
401     grep { exists $query->{$_} }
402       @$unique_cols
403   };
404 }
405
406 =head2 search_related
407
408 =over 4
409
410 =item Arguments: $rel, $cond, \%attrs?
411
412 =item Return Value: $new_resultset
413
414 =back
415
416   $new_rs = $cd_rs->search_related('artist', {
417     name => 'Emo-R-Us',
418   });
419
420 Searches the specified relationship, optionally specifying a condition and
421 attributes for matching records. See L</ATTRIBUTES> for more information.
422
423 =cut
424
425 sub search_related {
426   return shift->related_resultset(shift)->search(@_);
427 }
428
429 =head2 cursor
430
431 =over 4
432
433 =item Arguments: none
434
435 =item Return Value: $cursor
436
437 =back
438
439 Returns a storage-driven cursor to the given resultset. See
440 L<DBIx::Class::Cursor> for more information.
441
442 =cut
443
444 sub cursor {
445   my ($self) = @_;
446
447   my $attrs = { %{$self->_resolved_attrs} };
448   return $self->{cursor}
449     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
450           $attrs->{where},$attrs);
451 }
452
453 =head2 single
454
455 =over 4
456
457 =item Arguments: $cond?
458
459 =item Return Value: $row_object?
460
461 =back
462
463   my $cd = $schema->resultset('CD')->single({ year => 2001 });
464
465 Inflates the first result without creating a cursor if the resultset has
466 any records in it; if not returns nothing. Used by L</find> as an optimisation.
467
468 Can optionally take an additional condition *only* - this is a fast-code-path
469 method; if you need to add extra joins or similar call ->search and then
470 ->single without a condition on the $rs returned from that.
471
472 =cut
473
474 sub single {
475   my ($self, $where) = @_;
476   my $attrs = { %{$self->_resolved_attrs} };
477   if ($where) {
478     if (defined $attrs->{where}) {
479       $attrs->{where} = {
480         '-and' =>
481             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
482                $where, delete $attrs->{where} ]
483       };
484     } else {
485       $attrs->{where} = $where;
486     }
487   }
488
489   unless ($self->_is_unique_query($attrs->{where})) {
490     carp "Query not guaranteed to return a single row"
491       . "; please declare your unique constraints or use search instead";
492   }
493
494   my @data = $self->result_source->storage->select_single(
495     $attrs->{from}, $attrs->{select},
496     $attrs->{where}, $attrs
497   );
498
499   return (@data ? $self->_construct_object(@data) : ());
500 }
501
502 # _is_unique_query
503 #
504 # Try to determine if the specified query is guaranteed to be unique, based on
505 # the declared unique constraints.
506
507 sub _is_unique_query {
508   my ($self, $query) = @_;
509
510   my $collapsed = $self->_collapse_query($query);
511   my $alias = $self->{attrs}{alias};
512
513   foreach my $name ($self->result_source->unique_constraint_names) {
514     my @unique_cols = map {
515       "$alias.$_"
516     } $self->result_source->unique_constraint_columns($name);
517
518     # Count the values for each unique column
519     my %seen = map { $_ => 0 } @unique_cols;
520
521     foreach my $key (keys %$collapsed) {
522       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
523       next unless exists $seen{$aliased};  # Additional constraints are okay
524       $seen{$aliased} = scalar @{ $collapsed->{$key} };
525     }
526
527     # If we get 0 or more than 1 value for a column, it's not necessarily unique
528     return 1 unless grep { $_ != 1 } values %seen;
529   }
530
531   return 0;
532 }
533
534 # _collapse_query
535 #
536 # Recursively collapse the query, accumulating values for each column.
537
538 sub _collapse_query {
539   my ($self, $query, $collapsed) = @_;
540
541   $collapsed ||= {};
542
543   if (ref $query eq 'ARRAY') {
544     foreach my $subquery (@$query) {
545       next unless ref $subquery;  # -or
546 #      warn "ARRAY: " . Dumper $subquery;
547       $collapsed = $self->_collapse_query($subquery, $collapsed);
548     }
549   }
550   elsif (ref $query eq 'HASH') {
551     if (keys %$query and (keys %$query)[0] eq '-and') {
552       foreach my $subquery (@{$query->{-and}}) {
553 #        warn "HASH: " . Dumper $subquery;
554         $collapsed = $self->_collapse_query($subquery, $collapsed);
555       }
556     }
557     else {
558 #      warn "LEAF: " . Dumper $query;
559       foreach my $key (keys %$query) {
560         push @{$collapsed->{$key}}, $query->{$key};
561       }
562     }
563   }
564
565   return $collapsed;
566 }
567
568 =head2 get_column
569
570 =over 4
571
572 =item Arguments: $cond?
573
574 =item Return Value: $resultsetcolumn
575
576 =back
577
578   my $max_length = $rs->get_column('length')->max;
579
580 Returns a ResultSetColumn instance for $column based on $self
581
582 =cut
583
584 sub get_column {
585   my ($self, $column) = @_;
586   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
587   return $new;
588 }
589
590 =head2 search_like
591
592 =over 4
593
594 =item Arguments: $cond, \%attrs?
595
596 =item Return Value: $resultset (scalar context), @row_objs (list context)
597
598 =back
599
600   # WHERE title LIKE '%blue%'
601   $cd_rs = $rs->search_like({ title => '%blue%'});
602
603 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
604 that this is simply a convenience method. You most likely want to use
605 L</search> with specific operators.
606
607 For more information, see L<DBIx::Class::Manual::Cookbook>.
608
609 =cut
610
611 sub search_like {
612   my $class = shift;
613   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
614   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
615   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
616   return $class->search($query, { %$attrs });
617 }
618
619 =head2 slice
620
621 =over 4
622
623 =item Arguments: $first, $last
624
625 =item Return Value: $resultset (scalar context), @row_objs (list context)
626
627 =back
628
629 Returns a resultset or object list representing a subset of elements from the
630 resultset slice is called on. Indexes are from 0, i.e., to get the first
631 three records, call:
632
633   my ($one, $two, $three) = $rs->slice(0, 2);
634
635 =cut
636
637 sub slice {
638   my ($self, $min, $max) = @_;
639   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
640   $attrs->{offset} = $self->{attrs}{offset} || 0;
641   $attrs->{offset} += $min;
642   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
643   return $self->search(undef(), $attrs);
644   #my $slice = (ref $self)->new($self->result_source, $attrs);
645   #return (wantarray ? $slice->all : $slice);
646 }
647
648 =head2 next
649
650 =over 4
651
652 =item Arguments: none
653
654 =item Return Value: $result?
655
656 =back
657
658 Returns the next element in the resultset (C<undef> is there is none).
659
660 Can be used to efficiently iterate over records in the resultset:
661
662   my $rs = $schema->resultset('CD')->search;
663   while (my $cd = $rs->next) {
664     print $cd->title;
665   }
666
667 Note that you need to store the resultset object, and call C<next> on it.
668 Calling C<< resultset('Table')->next >> repeatedly will always return the
669 first record from the resultset.
670
671 =cut
672
673 sub next {
674   my ($self) = @_;
675   if (my $cache = $self->get_cache) {
676     $self->{all_cache_position} ||= 0;
677     return $cache->[$self->{all_cache_position}++];
678   }
679   if ($self->{attrs}{cache}) {
680     $self->{all_cache_position} = 1;
681     return ($self->all)[0];
682   }
683   my @row = (
684     exists $self->{stashed_row}
685       ? @{delete $self->{stashed_row}}
686       : $self->cursor->next
687   );
688   return unless (@row);
689   return $self->_construct_object(@row);
690 }
691
692 sub _resolved_attrs {
693   my $self = shift;
694   return $self->{_attrs} if $self->{_attrs};
695
696   my $attrs = $self->{attrs};
697   my $source = $self->{_parent_rs} || $self->{result_source};
698   my $alias = $attrs->{_orig_alias};
699
700   # XXX - lose storable dclone
701   my $record_filter = delete $attrs->{record_filter};
702   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
703   $attrs->{record_filter} = $record_filter if $record_filter;
704
705   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
706   if ($attrs->{columns}) {
707     delete $attrs->{as};
708   } elsif (!$attrs->{select}) {
709     $attrs->{columns} = [ $self->{result_source}->columns ];
710   }
711   
712   my $select_alias = $self->{attrs}{alias};
713   $attrs->{select} ||= [
714     map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
715   ];
716   $attrs->{as} ||= [
717     map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
718   ];
719   
720   my $adds;
721   if ($adds = delete $attrs->{include_columns}) {
722     $adds = [$adds] unless ref $adds eq 'ARRAY';
723     push(@{$attrs->{select}}, @$adds);
724     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
725   }
726   if ($adds = delete $attrs->{'+select'}) {
727     $adds = [$adds] unless ref $adds eq 'ARRAY';
728     push(@{$attrs->{select}}, map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
729   }
730   if (my $adds = delete $attrs->{'+as'}) {
731     $adds = [$adds] unless ref $adds eq 'ARRAY';
732     push(@{$attrs->{as}}, @$adds);
733   }
734
735   $attrs->{from} ||= [ { $alias => $source->from } ];
736   $attrs->{seen_join} ||= {};
737   my %seen;
738   if (my $join = delete $attrs->{join}) {
739     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
740       if (ref $j eq 'HASH') {
741         $seen{$_} = 1 foreach keys %$j;
742       } else {
743         $seen{$j} = 1;
744       }
745     }
746     push(@{$attrs->{from}},
747       $source->resolve_join($join, $alias, $attrs->{seen_join})
748     );
749   }
750
751   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
752   if ($attrs->{order_by}) {
753     $attrs->{order_by} = [ $attrs->{order_by} ] unless ref $attrs->{order_by};    
754   } else {
755     $attrs->{order_by} ||= [];    
756   }
757
758   my $collapse = $attrs->{collapse} || {};
759   if (my $prefetch = delete $attrs->{prefetch}) {
760     my @pre_order;
761     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
762       if ( ref $p eq 'HASH' ) {
763         foreach my $key (keys %$p) {
764           push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
765             unless $seen{$key};
766         }
767       } else {
768         push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
769           unless $seen{$p};
770       }
771       # bring joins back to level of current class
772       $p = $self->_reduce_joins($p, $attrs) if $attrs->{_live_join_stack};
773       if ($p) {
774         my @prefetch = $self->result_source->resolve_prefetch(
775           $p, $alias, {}, \@pre_order, $collapse
776         );
777         push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
778         push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
779       }
780     }
781     push(@{$attrs->{order_by}}, @pre_order);
782   }
783   $attrs->{collapse} = $collapse;
784
785   return $self->{_attrs} = $attrs;
786 }
787
788 sub _merge_attr {
789   my ($self, $a, $b) = @_;
790   return $b unless $a;
791   
792   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
793     foreach my $key (keys %{$b}) {
794       if (exists $a->{$key}) {
795         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
796       } else {
797         $a->{$key} = $b->{$key};
798       }
799     }
800     return $a;
801   } else {
802     $a = [$a] unless ref $a eq 'ARRAY';
803     $b = [$b] unless ref $b eq 'ARRAY';
804
805     my $hash = {};
806     my @array;
807     foreach my $x ($a, $b) {
808       foreach my $element (@{$x}) {
809         if (ref $element eq 'HASH') {
810           $hash = $self->_merge_attr($hash, $element);
811         } elsif (ref $element eq 'ARRAY') {
812           push(@array, @{$element});
813         } else {
814           push(@array, $element) unless $b == $x
815             && grep { $_ eq $element } @array;
816         }
817       }
818     }
819     
820     @array = grep { !exists $hash->{$_} } @array;
821
822     return keys %{$hash}
823       ? ( scalar(@array)
824             ? [$hash, @array]
825             : $hash
826         )
827       : \@array;
828   }
829 }
830
831 # bring the joins (which are from the original class) to the level
832 # of the current class so that we can resolve them properly
833 sub _reduce_joins {
834   my ($self, $p, $attrs) = @_;
835
836   STACK:
837   foreach my $join (@{$attrs->{_live_join_stack}}) {
838     if (ref $p eq 'HASH') {
839       return undef unless exists $p->{$join};
840       $p = $p->{$join};
841     } elsif (ref $p eq 'ARRAY') {
842       foreach my $pe (@{$p}) {
843         return undef if $pe eq $join;
844         if (ref $pe eq 'HASH' && exists $pe->{$join}) {
845           $p = $pe->{$join};
846           next STACK;
847         }
848       }
849       return undef;
850     } else {
851       return undef;
852     }
853   }
854   return $p;
855 }
856
857 sub _construct_object {
858   my ($self, @row) = @_;
859   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
860   my $new = $self->result_class->inflate_result($self->result_source, @$info);
861   $new = $self->{_attrs}{record_filter}->($new)
862     if exists $self->{_attrs}{record_filter};
863   return $new;
864 }
865
866 sub _collapse_result {
867   my ($self, $as, $row, $prefix) = @_;
868
869   my %const;
870   my @copy = @$row;
871   
872   foreach my $this_as (@$as) {
873     my $val = shift @copy;
874     if (defined $prefix) {
875       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
876         my $remain = $1;
877         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
878         $const{$1||''}{$2} = $val;
879       }
880     } else {
881       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
882       $const{$1||''}{$2} = $val;
883     }
884   }
885
886   my $alias = $self->{attrs}{alias};
887   my $info = [ {}, {} ];
888   foreach my $key (keys %const) {
889     if (length $key && $key ne $alias) {
890       my $target = $info;
891       my @parts = split(/\./, $key);
892       foreach my $p (@parts) {
893         $target = $target->[1]->{$p} ||= [];
894       }
895       $target->[0] = $const{$key};
896     } else {
897       $info->[0] = $const{$key};
898     }
899   }
900   
901   my @collapse;
902   if (defined $prefix) {
903     @collapse = map {
904         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
905     } keys %{$self->{_attrs}{collapse}}
906   } else {
907     @collapse = keys %{$self->{_attrs}{collapse}};
908   };
909
910   if (@collapse) {
911     my ($c) = sort { length $a <=> length $b } @collapse;
912     my $target = $info;
913     foreach my $p (split(/\./, $c)) {
914       $target = $target->[1]->{$p} ||= [];
915     }
916     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
917     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
918     my $tree = $self->_collapse_result($as, $row, $c_prefix);
919     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
920     my (@final, @raw);
921
922     while (
923       !(
924         grep {
925           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
926         } @co_key
927         )
928     ) {
929       push(@final, $tree);
930       last unless (@raw = $self->cursor->next);
931       $row = $self->{stashed_row} = \@raw;
932       $tree = $self->_collapse_result($as, $row, $c_prefix);
933     }
934     @$target = (@final ? @final : [ {}, {} ]);
935       # single empty result to indicate an empty prefetched has_many
936   }
937
938   #print "final info: " . Dumper($info);
939   return $info;
940 }
941
942 =head2 result_source
943
944 =over 4
945
946 =item Arguments: $result_source?
947
948 =item Return Value: $result_source
949
950 =back
951
952 An accessor for the primary ResultSource object from which this ResultSet
953 is derived.
954
955 =cut
956
957
958 =head2 count
959
960 =over 4
961
962 =item Arguments: $cond, \%attrs??
963
964 =item Return Value: $count
965
966 =back
967
968 Performs an SQL C<COUNT> with the same query as the resultset was built
969 with to find the number of elements. If passed arguments, does a search
970 on the resultset and counts the results of that.
971
972 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
973 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
974 not support C<DISTINCT> with multiple columns. If you are using such a
975 database, you should only use columns from the main table in your C<group_by>
976 clause.
977
978 =cut
979
980 sub count {
981   my $self = shift;
982   return $self->search(@_)->count if @_ and defined $_[0];
983   return scalar @{ $self->get_cache } if $self->get_cache;
984   my $count = $self->_count;
985   return 0 unless $count;
986
987   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
988   $count = $self->{attrs}{rows} if
989     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
990   return $count;
991 }
992
993 sub _count { # Separated out so pager can get the full count
994   my $self = shift;
995   my $select = { count => '*' };
996
997   my $attrs = { %{$self->_resolved_attrs} };
998   if (my $group_by = delete $attrs->{group_by}) {
999     delete $attrs->{having};
1000     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1001     # todo: try CONCAT for multi-column pk
1002     my @pk = $self->result_source->primary_columns;
1003     if (@pk == 1) {
1004       my $alias = $attrs->{_orig_alias};
1005       foreach my $column (@distinct) {
1006         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1007           @distinct = ($column);
1008           last;
1009         }
1010       }
1011     }
1012
1013     $select = { count => { distinct => \@distinct } };
1014   }
1015
1016   $attrs->{select} = $select;
1017   $attrs->{as} = [qw/count/];
1018
1019   # offset, order by and page are not needed to count. record_filter is cdbi
1020   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1021   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1022   $tmp_rs->{_parent_rs} = $self->{_parent_rs} if $self->{_parent_rs};
1023        #XXX - hack to pass through parent of related resultsets
1024
1025   my ($count) = $tmp_rs->cursor->next;
1026   return $count;
1027 }
1028
1029 =head2 count_literal
1030
1031 =over 4
1032
1033 =item Arguments: $sql_fragment, @bind_values
1034
1035 =item Return Value: $count
1036
1037 =back
1038
1039 Counts the results in a literal query. Equivalent to calling L</search_literal>
1040 with the passed arguments, then L</count>.
1041
1042 =cut
1043
1044 sub count_literal { shift->search_literal(@_)->count; }
1045
1046 =head2 all
1047
1048 =over 4
1049
1050 =item Arguments: none
1051
1052 =item Return Value: @objects
1053
1054 =back
1055
1056 Returns all elements in the resultset. Called implicitly if the resultset
1057 is returned in list context.
1058
1059 =cut
1060
1061 sub all {
1062   my ($self) = @_;
1063   return @{ $self->get_cache } if $self->get_cache;
1064
1065   my @obj;
1066
1067   # TODO: don't call resolve here
1068   if (keys %{$self->_resolved_attrs->{collapse}}) {
1069 #  if ($self->{attrs}{prefetch}) {
1070       # Using $self->cursor->all is really just an optimisation.
1071       # If we're collapsing has_many prefetches it probably makes
1072       # very little difference, and this is cleaner than hacking
1073       # _construct_object to survive the approach
1074     my @row = $self->cursor->next;
1075     while (@row) {
1076       push(@obj, $self->_construct_object(@row));
1077       @row = (exists $self->{stashed_row}
1078                ? @{delete $self->{stashed_row}}
1079                : $self->cursor->next);
1080     }
1081   } else {
1082     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1083   }
1084
1085   $self->set_cache(\@obj) if $self->{attrs}{cache};
1086   return @obj;
1087 }
1088
1089 =head2 reset
1090
1091 =over 4
1092
1093 =item Arguments: none
1094
1095 =item Return Value: $self
1096
1097 =back
1098
1099 Resets the resultset's cursor, so you can iterate through the elements again.
1100
1101 =cut
1102
1103 sub reset {
1104   my ($self) = @_;
1105   delete $self->{_attrs} if exists $self->{_attrs};
1106   $self->{all_cache_position} = 0;
1107   $self->cursor->reset;
1108   return $self;
1109 }
1110
1111 =head2 first
1112
1113 =over 4
1114
1115 =item Arguments: none
1116
1117 =item Return Value: $object?
1118
1119 =back
1120
1121 Resets the resultset and returns an object for the first result (if the
1122 resultset returns anything).
1123
1124 =cut
1125
1126 sub first {
1127   return $_[0]->reset->next;
1128 }
1129
1130 # _cond_for_update_delete
1131 #
1132 # update/delete require the condition to be modified to handle
1133 # the differing SQL syntax available.  This transforms the $self->{cond}
1134 # appropriately, returning the new condition.
1135
1136 sub _cond_for_update_delete {
1137   my ($self) = @_;
1138   my $cond = {};
1139
1140   # No-op. No condition, we're updating/deleting everything
1141   return $cond unless ref $self->{cond};
1142
1143   if (ref $self->{cond} eq 'ARRAY') {
1144     $cond = [
1145       map {
1146         my %hash;
1147         foreach my $key (keys %{$_}) {
1148           $key =~ /([^.]+)$/;
1149           $hash{$1} = $_->{$key};
1150         }
1151         \%hash;
1152       } @{$self->{cond}}
1153     ];
1154   }
1155   elsif (ref $self->{cond} eq 'HASH') {
1156     if ((keys %{$self->{cond}})[0] eq '-and') {
1157       $cond->{-and} = [];
1158
1159       my @cond = @{$self->{cond}{-and}};
1160       for (my $i = 0; $i < @cond; $i++) {
1161         my $entry = $cond[$i];
1162
1163         my %hash;
1164         if (ref $entry eq 'HASH') {
1165           foreach my $key (keys %{$entry}) {
1166             $key =~ /([^.]+)$/;
1167             $hash{$1} = $entry->{$key};
1168           }
1169         }
1170         else {
1171           $entry =~ /([^.]+)$/;
1172           $hash{$1} = $cond[++$i];
1173         }
1174
1175         push @{$cond->{-and}}, \%hash;
1176       }
1177     }
1178     else {
1179       foreach my $key (keys %{$self->{cond}}) {
1180         $key =~ /([^.]+)$/;
1181         $cond->{$1} = $self->{cond}{$key};
1182       }
1183     }
1184   }
1185   else {
1186     $self->throw_exception(
1187       "Can't update/delete on resultset with condition unless hash or array"
1188     );
1189   }
1190
1191   return $cond;
1192 }
1193
1194
1195 =head2 update
1196
1197 =over 4
1198
1199 =item Arguments: \%values
1200
1201 =item Return Value: $storage_rv
1202
1203 =back
1204
1205 Sets the specified columns in the resultset to the supplied values in a
1206 single query. Return value will be true if the update succeeded or false
1207 if no records were updated; exact type of success value is storage-dependent.
1208
1209 =cut
1210
1211 sub update {
1212   my ($self, $values) = @_;
1213   $self->throw_exception("Values for update must be a hash")
1214     unless ref $values eq 'HASH';
1215
1216   my $cond = $self->_cond_for_update_delete;
1217
1218   return $self->result_source->storage->update(
1219     $self->result_source->from, $values, $cond
1220   );
1221 }
1222
1223 =head2 update_all
1224
1225 =over 4
1226
1227 =item Arguments: \%values
1228
1229 =item Return Value: 1
1230
1231 =back
1232
1233 Fetches all objects and updates them one at a time. Note that C<update_all>
1234 will run DBIC cascade triggers, while L</update> will not.
1235
1236 =cut
1237
1238 sub update_all {
1239   my ($self, $values) = @_;
1240   $self->throw_exception("Values for update must be a hash")
1241     unless ref $values eq 'HASH';
1242   foreach my $obj ($self->all) {
1243     $obj->set_columns($values)->update;
1244   }
1245   return 1;
1246 }
1247
1248 =head2 delete
1249
1250 =over 4
1251
1252 =item Arguments: none
1253
1254 =item Return Value: 1
1255
1256 =back
1257
1258 Deletes the contents of the resultset from its result source. Note that this
1259 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1260 to run.
1261
1262 =cut
1263
1264 sub delete {
1265   my ($self) = @_;
1266
1267   my $cond = $self->_cond_for_update_delete;
1268
1269   $self->result_source->storage->delete($self->result_source->from, $cond);
1270   return 1;
1271 }
1272
1273 =head2 delete_all
1274
1275 =over 4
1276
1277 =item Arguments: none
1278
1279 =item Return Value: 1
1280
1281 =back
1282
1283 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1284 will run DBIC cascade triggers, while L</delete> will not.
1285
1286 =cut
1287
1288 sub delete_all {
1289   my ($self) = @_;
1290   $_->delete for $self->all;
1291   return 1;
1292 }
1293
1294 =head2 pager
1295
1296 =over 4
1297
1298 =item Arguments: none
1299
1300 =item Return Value: $pager
1301
1302 =back
1303
1304 Return Value a L<Data::Page> object for the current resultset. Only makes
1305 sense for queries with a C<page> attribute.
1306
1307 =cut
1308
1309 sub pager {
1310   my ($self) = @_;
1311   my $attrs = $self->{attrs};
1312   $self->throw_exception("Can't create pager for non-paged rs")
1313     unless $self->{attrs}{page};
1314   $attrs->{rows} ||= 10;
1315   return $self->{pager} ||= Data::Page->new(
1316     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1317 }
1318
1319 =head2 page
1320
1321 =over 4
1322
1323 =item Arguments: $page_number
1324
1325 =item Return Value: $rs
1326
1327 =back
1328
1329 Returns a resultset for the $page_number page of the resultset on which page
1330 is called, where each page contains a number of rows equal to the 'rows'
1331 attribute set on the resultset (10 by default).
1332
1333 =cut
1334
1335 sub page {
1336   my ($self, $page) = @_;
1337   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1338 }
1339
1340 =head2 new_result
1341
1342 =over 4
1343
1344 =item Arguments: \%vals
1345
1346 =item Return Value: $object
1347
1348 =back
1349
1350 Creates an object in the resultset's result class and returns it.
1351
1352 =cut
1353
1354 sub new_result {
1355   my ($self, $values) = @_;
1356   $self->throw_exception( "new_result needs a hash" )
1357     unless (ref $values eq 'HASH');
1358   $self->throw_exception(
1359     "Can't abstract implicit construct, condition not a hash"
1360   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1361   my %new = %$values;
1362   my $alias = $self->{attrs}{_orig_alias};
1363   foreach my $key (keys %{$self->{cond}||{}}) {
1364     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1365   }
1366   my $obj = $self->result_class->new(\%new);
1367   $obj->result_source($self->result_source) if $obj->can('result_source');
1368   return $obj;
1369 }
1370
1371 =head2 find_or_new
1372
1373 =over 4
1374
1375 =item Arguments: \%vals, \%attrs?
1376
1377 =item Return Value: $object
1378
1379 =back
1380
1381 Find an existing record from this resultset. If none exists, instantiate a new
1382 result object and return it. The object will not be saved into your storage
1383 until you call L<DBIx::Class::Row/insert> on it.
1384
1385 If you want objects to be saved immediately, use L</find_or_create> instead.
1386
1387 =cut
1388
1389 sub find_or_new {
1390   my $self     = shift;
1391   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1392   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1393   my $exists   = $self->find($hash, $attrs);
1394   return defined $exists ? $exists : $self->new_result($hash);
1395 }
1396
1397 =head2 create
1398
1399 =over 4
1400
1401 =item Arguments: \%vals
1402
1403 =item Return Value: $object
1404
1405 =back
1406
1407 Inserts a record into the resultset and returns the object representing it.
1408
1409 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1410
1411 =cut
1412
1413 sub create {
1414   my ($self, $attrs) = @_;
1415   $self->throw_exception( "create needs a hashref" )
1416     unless ref $attrs eq 'HASH';
1417   return $self->new_result($attrs)->insert;
1418 }
1419
1420 =head2 find_or_create
1421
1422 =over 4
1423
1424 =item Arguments: \%vals, \%attrs?
1425
1426 =item Return Value: $object
1427
1428 =back
1429
1430   $class->find_or_create({ key => $val, ... });
1431
1432 Tries to find a record based on its primary key or unique constraint; if none
1433 is found, creates one and returns that instead.
1434
1435   my $cd = $schema->resultset('CD')->find_or_create({
1436     cdid   => 5,
1437     artist => 'Massive Attack',
1438     title  => 'Mezzanine',
1439     year   => 2005,
1440   });
1441
1442 Also takes an optional C<key> attribute, to search by a specific key or unique
1443 constraint. For example:
1444
1445   my $cd = $schema->resultset('CD')->find_or_create(
1446     {
1447       artist => 'Massive Attack',
1448       title  => 'Mezzanine',
1449     },
1450     { key => 'cd_artist_title' }
1451   );
1452
1453 See also L</find> and L</update_or_create>. For information on how to declare
1454 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1455
1456 =cut
1457
1458 sub find_or_create {
1459   my $self     = shift;
1460   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1461   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1462   my $exists   = $self->find($hash, $attrs);
1463   return defined $exists ? $exists : $self->create($hash);
1464 }
1465
1466 =head2 update_or_create
1467
1468 =over 4
1469
1470 =item Arguments: \%col_values, { key => $unique_constraint }?
1471
1472 =item Return Value: $object
1473
1474 =back
1475
1476   $class->update_or_create({ col => $val, ... });
1477
1478 First, searches for an existing row matching one of the unique constraints
1479 (including the primary key) on the source of this resultset. If a row is
1480 found, updates it with the other given column values. Otherwise, creates a new
1481 row.
1482
1483 Takes an optional C<key> attribute to search on a specific unique constraint.
1484 For example:
1485
1486   # In your application
1487   my $cd = $schema->resultset('CD')->update_or_create(
1488     {
1489       artist => 'Massive Attack',
1490       title  => 'Mezzanine',
1491       year   => 1998,
1492     },
1493     { key => 'cd_artist_title' }
1494   );
1495
1496 If no C<key> is specified, it searches on all unique constraints defined on the
1497 source, including the primary key.
1498
1499 If the C<key> is specified as C<primary>, it searches only on the primary key.
1500
1501 See also L</find> and L</find_or_create>. For information on how to declare
1502 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1503
1504 =cut
1505
1506 sub update_or_create {
1507   my $self = shift;
1508   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1509   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1510
1511   my $row = $self->find($cond);
1512   if (defined $row) {
1513     $row->update($cond);
1514     return $row;
1515   }
1516
1517   return $self->create($cond);
1518 }
1519
1520 =head2 get_cache
1521
1522 =over 4
1523
1524 =item Arguments: none
1525
1526 =item Return Value: \@cache_objects?
1527
1528 =back
1529
1530 Gets the contents of the cache for the resultset, if the cache is set.
1531
1532 =cut
1533
1534 sub get_cache {
1535   shift->{all_cache};
1536 }
1537
1538 =head2 set_cache
1539
1540 =over 4
1541
1542 =item Arguments: \@cache_objects
1543
1544 =item Return Value: \@cache_objects
1545
1546 =back
1547
1548 Sets the contents of the cache for the resultset. Expects an arrayref
1549 of objects of the same class as those produced by the resultset. Note that
1550 if the cache is set the resultset will return the cached objects rather
1551 than re-querying the database even if the cache attr is not set.
1552
1553 =cut
1554
1555 sub set_cache {
1556   my ( $self, $data ) = @_;
1557   $self->throw_exception("set_cache requires an arrayref")
1558       if defined($data) && (ref $data ne 'ARRAY');
1559   $self->{all_cache} = $data;
1560 }
1561
1562 =head2 clear_cache
1563
1564 =over 4
1565
1566 =item Arguments: none
1567
1568 =item Return Value: []
1569
1570 =back
1571
1572 Clears the cache for the resultset.
1573
1574 =cut
1575
1576 sub clear_cache {
1577   shift->set_cache(undef);
1578 }
1579
1580 =head2 related_resultset
1581
1582 =over 4
1583
1584 =item Arguments: $relationship_name
1585
1586 =item Return Value: $resultset
1587
1588 =back
1589
1590 Returns a related resultset for the supplied relationship name.
1591
1592   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1593
1594 =cut
1595
1596 sub related_resultset {
1597   my ($self, $rel) = @_;
1598
1599   $self->{related_resultsets} ||= {};
1600   return $self->{related_resultsets}{$rel} ||= do {
1601     my $rel_obj = $self->result_source->relationship_info($rel);
1602
1603     $self->throw_exception(
1604       "search_related: result source '" . $self->result_source->name .
1605         "' has no such relationship $rel")
1606       unless $rel_obj;
1607     
1608     my $alias = $self->_resolved_attrs->{seen_join}{$rel}
1609                 ? join('_', $rel, $self->_resolved_attrs->{seen_join}{$rel}+1)
1610                 : $rel;
1611
1612     my @live_join_stack = (@{$self->{attrs}{_live_join_stack}||[]}, $rel);
1613
1614     my $rs = $self->result_source->schema->resultset($rel_obj->{class})->search(
1615       undef, {
1616         select => undef,
1617         as => undef,
1618         alias => $alias,
1619         _live_join_stack => \@live_join_stack,
1620         _parent_attrs => $self->{attrs}}
1621     );
1622
1623     # keep reference of the original resultset
1624     $rs->{_parent_rs} = $self->{_parent_rs} || $self->result_source;
1625
1626     return $rs;
1627   };
1628 }
1629
1630 =head2 throw_exception
1631
1632 See L<DBIx::Class::Schema/throw_exception> for details.
1633
1634 =cut
1635
1636 sub throw_exception {
1637   my $self=shift;
1638   $self->result_source->schema->throw_exception(@_);
1639 }
1640
1641 # XXX: FIXME: Attributes docs need clearing up
1642
1643 =head1 ATTRIBUTES
1644
1645 The resultset takes various attributes that modify its behavior. Here's an
1646 overview of them:
1647
1648 =head2 order_by
1649
1650 =over 4
1651
1652 =item Value: ($order_by | \@order_by)
1653
1654 =back
1655
1656 Which column(s) to order the results by. This is currently passed
1657 through directly to SQL, so you can give e.g. C<year DESC> for a
1658 descending order on the column `year'.
1659
1660 Please note that if you have quoting enabled (see
1661 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1662 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1663 so you will need to manually quote things as appropriate.)
1664
1665 =head2 columns
1666
1667 =over 4
1668
1669 =item Value: \@columns
1670
1671 =back
1672
1673 Shortcut to request a particular set of columns to be retrieved.  Adds
1674 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1675 from that, then auto-populates C<as> from C<select> as normal. (You may also
1676 use the C<cols> attribute, as in earlier versions of DBIC.)
1677
1678 =head2 include_columns
1679
1680 =over 4
1681
1682 =item Value: \@columns
1683
1684 =back
1685
1686 Shortcut to include additional columns in the returned results - for example
1687
1688   $schema->resultset('CD')->search(undef, {
1689     include_columns => ['artist.name'],
1690     join => ['artist']
1691   });
1692
1693 would return all CDs and include a 'name' column to the information
1694 passed to object inflation
1695
1696 =head2 select
1697
1698 =over 4
1699
1700 =item Value: \@select_columns
1701
1702 =back
1703
1704 Indicates which columns should be selected from the storage. You can use
1705 column names, or in the case of RDBMS back ends, function or stored procedure
1706 names:
1707
1708   $rs = $schema->resultset('Employee')->search(undef, {
1709     select => [
1710       'name',
1711       { count => 'employeeid' },
1712       { sum => 'salary' }
1713     ]
1714   });
1715
1716 When you use function/stored procedure names and do not supply an C<as>
1717 attribute, the column names returned are storage-dependent. E.g. MySQL would
1718 return a column named C<count(employeeid)> in the above example.
1719
1720 =head2 +select
1721
1722 =over 4
1723
1724 Indicates additional columns to be selected from storage.  Works the same as
1725 L<select> but adds columns to the selection.
1726
1727 =back
1728
1729 =head2 +as
1730
1731 =over 4
1732
1733 Indicates additional column names for those added via L<+select>.
1734
1735 =back
1736
1737 =head2 as
1738
1739 =over 4
1740
1741 =item Value: \@inflation_names
1742
1743 =back
1744
1745 Indicates column names for object inflation. This is used in conjunction with
1746 C<select>, usually when C<select> contains one or more function or stored
1747 procedure names:
1748
1749   $rs = $schema->resultset('Employee')->search(undef, {
1750     select => [
1751       'name',
1752       { count => 'employeeid' }
1753     ],
1754     as => ['name', 'employee_count'],
1755   });
1756
1757   my $employee = $rs->first(); # get the first Employee
1758
1759 If the object against which the search is performed already has an accessor
1760 matching a column name specified in C<as>, the value can be retrieved using
1761 the accessor as normal:
1762
1763   my $name = $employee->name();
1764
1765 If on the other hand an accessor does not exist in the object, you need to
1766 use C<get_column> instead:
1767
1768   my $employee_count = $employee->get_column('employee_count');
1769
1770 You can create your own accessors if required - see
1771 L<DBIx::Class::Manual::Cookbook> for details.
1772
1773 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1774 produced, it is used for internal access only. Thus attempting to use the accessor
1775 in an C<order_by> clause or similar will fail misrably.
1776
1777 =head2 join
1778
1779 =over 4
1780
1781 =item Value: ($rel_name | \@rel_names | \%rel_names)
1782
1783 =back
1784
1785 Contains a list of relationships that should be joined for this query.  For
1786 example:
1787
1788   # Get CDs by Nine Inch Nails
1789   my $rs = $schema->resultset('CD')->search(
1790     { 'artist.name' => 'Nine Inch Nails' },
1791     { join => 'artist' }
1792   );
1793
1794 Can also contain a hash reference to refer to the other relation's relations.
1795 For example:
1796
1797   package MyApp::Schema::Track;
1798   use base qw/DBIx::Class/;
1799   __PACKAGE__->table('track');
1800   __PACKAGE__->add_columns(qw/trackid cd position title/);
1801   __PACKAGE__->set_primary_key('trackid');
1802   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1803   1;
1804
1805   # In your application
1806   my $rs = $schema->resultset('Artist')->search(
1807     { 'track.title' => 'Teardrop' },
1808     {
1809       join     => { cd => 'track' },
1810       order_by => 'artist.name',
1811     }
1812   );
1813
1814 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1815 similarly for a third time). For e.g.
1816
1817   my $rs = $schema->resultset('Artist')->search({
1818     'cds.title'   => 'Down to Earth',
1819     'cds_2.title' => 'Popular',
1820   }, {
1821     join => [ qw/cds cds/ ],
1822   });
1823
1824 will return a set of all artists that have both a cd with title 'Down
1825 to Earth' and a cd with title 'Popular'.
1826
1827 If you want to fetch related objects from other tables as well, see C<prefetch>
1828 below.
1829
1830 =head2 prefetch
1831
1832 =over 4
1833
1834 =item Value: ($rel_name | \@rel_names | \%rel_names)
1835
1836 =back
1837
1838 Contains one or more relationships that should be fetched along with the main
1839 query (when they are accessed afterwards they will have already been
1840 "prefetched").  This is useful for when you know you will need the related
1841 objects, because it saves at least one query:
1842
1843   my $rs = $schema->resultset('Tag')->search(
1844     undef,
1845     {
1846       prefetch => {
1847         cd => 'artist'
1848       }
1849     }
1850   );
1851
1852 The initial search results in SQL like the following:
1853
1854   SELECT tag.*, cd.*, artist.* FROM tag
1855   JOIN cd ON tag.cd = cd.cdid
1856   JOIN artist ON cd.artist = artist.artistid
1857
1858 L<DBIx::Class> has no need to go back to the database when we access the
1859 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1860 case.
1861
1862 Simple prefetches will be joined automatically, so there is no need
1863 for a C<join> attribute in the above search. If you're prefetching to
1864 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1865 specify the join as well.
1866
1867 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1868 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1869 with an accessor type of 'single' or 'filter').
1870
1871 =head2 page
1872
1873 =over 4
1874
1875 =item Value: $page
1876
1877 =back
1878
1879 Makes the resultset paged and specifies the page to retrieve. Effectively
1880 identical to creating a non-pages resultset and then calling ->page($page)
1881 on it.
1882
1883 If L<rows> attribute is not specified it defualts to 10 rows per page.
1884
1885 =head2 rows
1886
1887 =over 4
1888
1889 =item Value: $rows
1890
1891 =back
1892
1893 Specifes the maximum number of rows for direct retrieval or the number of
1894 rows per page if the page attribute or method is used.
1895
1896 =head2 offset
1897
1898 =over 4
1899
1900 =item Value: $offset
1901
1902 =back
1903
1904 Specifies the (zero-based) row number for the  first row to be returned, or the
1905 of the first row of the first page if paging is used.
1906
1907 =head2 group_by
1908
1909 =over 4
1910
1911 =item Value: \@columns
1912
1913 =back
1914
1915 A arrayref of columns to group by. Can include columns of joined tables.
1916
1917   group_by => [qw/ column1 column2 ... /]
1918
1919 =head2 having
1920
1921 =over 4
1922
1923 =item Value: $condition
1924
1925 =back
1926
1927 HAVING is a select statement attribute that is applied between GROUP BY and
1928 ORDER BY. It is applied to the after the grouping calculations have been
1929 done.
1930
1931   having => { 'count(employee)' => { '>=', 100 } }
1932
1933 =head2 distinct
1934
1935 =over 4
1936
1937 =item Value: (0 | 1)
1938
1939 =back
1940
1941 Set to 1 to group by all columns.
1942
1943 =head2 where
1944
1945 =over 4
1946
1947 Adds to the WHERE clause.
1948
1949   # only return rows WHERE deleted IS NULL for all searches
1950   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
1951
1952 Can be overridden by passing C<{ where => undef }> as an attribute
1953 to a resulset.
1954
1955 =back
1956
1957 =head2 cache
1958
1959 Set to 1 to cache search results. This prevents extra SQL queries if you
1960 revisit rows in your ResultSet:
1961
1962   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1963
1964   while( my $artist = $resultset->next ) {
1965     ... do stuff ...
1966   }
1967
1968   $rs->first; # without cache, this would issue a query
1969
1970 By default, searches are not cached.
1971
1972 For more examples of using these attributes, see
1973 L<DBIx::Class::Manual::Cookbook>.
1974
1975 =head2 from
1976
1977 =over 4
1978
1979 =item Value: \@from_clause
1980
1981 =back
1982
1983 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1984 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1985 clauses.
1986
1987 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1988
1989 C<join> will usually do what you need and it is strongly recommended that you
1990 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1991 And we really do mean "cannot", not just tried and failed. Attempting to use
1992 this because you're having problems with C<join> is like trying to use x86
1993 ASM because you've got a syntax error in your C. Trust us on this.
1994
1995 Now, if you're still really, really sure you need to use this (and if you're
1996 not 100% sure, ask the mailing list first), here's an explanation of how this
1997 works.
1998
1999 The syntax is as follows -
2000
2001   [
2002     { <alias1> => <table1> },
2003     [
2004       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2005       [], # nested JOIN (optional)
2006       { <table1.column1> => <table2.column2>, ... (more conditions) },
2007     ],
2008     # More of the above [ ] may follow for additional joins
2009   ]
2010
2011   <table1> <alias1>
2012   JOIN
2013     <table2> <alias2>
2014     [JOIN ...]
2015   ON <table1.column1> = <table2.column2>
2016   <more joins may follow>
2017
2018 An easy way to follow the examples below is to remember the following:
2019
2020     Anything inside "[]" is a JOIN
2021     Anything inside "{}" is a condition for the enclosing JOIN
2022
2023 The following examples utilize a "person" table in a family tree application.
2024 In order to express parent->child relationships, this table is self-joined:
2025
2026     # Person->belongs_to('father' => 'Person');
2027     # Person->belongs_to('mother' => 'Person');
2028
2029 C<from> can be used to nest joins. Here we return all children with a father,
2030 then search against all mothers of those children:
2031
2032   $rs = $schema->resultset('Person')->search(
2033       undef,
2034       {
2035           alias => 'mother', # alias columns in accordance with "from"
2036           from => [
2037               { mother => 'person' },
2038               [
2039                   [
2040                       { child => 'person' },
2041                       [
2042                           { father => 'person' },
2043                           { 'father.person_id' => 'child.father_id' }
2044                       ]
2045                   ],
2046                   { 'mother.person_id' => 'child.mother_id' }
2047               ],
2048           ]
2049       },
2050   );
2051
2052   # Equivalent SQL:
2053   # SELECT mother.* FROM person mother
2054   # JOIN (
2055   #   person child
2056   #   JOIN person father
2057   #   ON ( father.person_id = child.father_id )
2058   # )
2059   # ON ( mother.person_id = child.mother_id )
2060
2061 The type of any join can be controlled manually. To search against only people
2062 with a father in the person table, we could explicitly use C<INNER JOIN>:
2063
2064     $rs = $schema->resultset('Person')->search(
2065         undef,
2066         {
2067             alias => 'child', # alias columns in accordance with "from"
2068             from => [
2069                 { child => 'person' },
2070                 [
2071                     { father => 'person', -join_type => 'inner' },
2072                     { 'father.id' => 'child.father_id' }
2073                 ],
2074             ]
2075         },
2076     );
2077
2078     # Equivalent SQL:
2079     # SELECT child.* FROM person child
2080     # INNER JOIN person father ON child.father_id = father.id
2081
2082 =cut
2083
2084 1;