_is_unique_query: Don't warn on multiple occurrences of the same value for a constrai...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use base qw/DBIx::Class/;
14 __PACKAGE__->load_components(qw/AccessorGroup/);
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('CD')->search(year => 2005);
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =over 4
57
58 =item Arguments: $source, \%$attrs
59
60 =item Return Value: $rs
61
62 =back
63
64 The resultset constructor. Takes a source object (usually a
65 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
66 L</ATTRIBUTES> below).  Does not perform any queries -- these are
67 executed as needed by the other methods.
68
69 Generally you won't need to construct a resultset manually.  You'll
70 automatically get one from e.g. a L</search> called in scalar context:
71
72   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
73
74 IMPORTANT: If called on an object, proxies to new_result instead so
75
76   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
77
78 will return a CD object, not a ResultSet.
79
80 =cut
81
82 sub new {
83   my $class = shift;
84   return $class->new_result(@_) if ref $class;
85
86   my ($source, $attrs) = @_;
87   #weaken $source;
88
89   if ($attrs->{page}) {
90     $attrs->{rows} ||= 10;
91     $attrs->{offset} ||= 0;
92     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
93   }
94
95   $attrs->{alias} ||= 'me';
96   $attrs->{_orig_alias} ||= $attrs->{alias};
97
98   bless {
99     result_source => $source,
100     result_class => $attrs->{result_class} || $source->result_class,
101     cond => $attrs->{where},
102 #    from => $attrs->{from},
103 #    collapse => $collapse,
104     count => undef,
105     pager => undef,
106     attrs => $attrs
107   }, $class;
108 }
109
110 =head2 search
111
112 =over 4
113
114 =item Arguments: $cond, \%attrs?
115
116 =item Return Value: $resultset (scalar context), @row_objs (list context)
117
118 =back
119
120   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
121   my $new_rs = $cd_rs->search({ year => 2005 });
122
123   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
124                  # year = 2005 OR year = 2004
125
126 If you need to pass in additional attributes but no additional condition,
127 call it as C<search(undef, \%attrs)>.
128
129   # "SELECT name, artistid FROM $artist_table"
130   my @all_artists = $schema->resultset('Artist')->search(undef, {
131     columns => [qw/name artistid/],
132   });
133
134 =cut
135
136 sub search {
137   my $self = shift;
138   my $rs = $self->search_rs( @_ );
139   return (wantarray ? $rs->all : $rs);
140 }
141
142 =head2 search_rs
143
144 =over 4
145
146 =item Arguments: $cond, \%attrs?
147
148 =item Return Value: $resultset
149
150 =back
151
152 This method does the same exact thing as search() except it will
153 always return a resultset, even in list context.
154
155 =cut
156
157 sub search_rs {
158   my $self = shift;
159
160   my $attrs = {};
161   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
162   my $our_attrs = exists $attrs->{_parent_attrs}
163     ? { %{delete $attrs->{_parent_attrs}} }
164     : { %{$self->{attrs}} };
165   my $having = delete $our_attrs->{having};
166
167   # XXX should only maintain _live_join_stack and generate _live_join_h from that
168   if ($attrs->{_live_join_stack}) {
169     foreach my $join (reverse @{$attrs->{_live_join_stack}}) {
170       $attrs->{_live_join_h} = defined $attrs->{_live_join_h}
171         ? { $join => $attrs->{_live_join_h} }
172         : $join;
173     }
174   }
175
176   # merge new attrs into inherited
177   foreach my $key (qw/join prefetch/) {
178     next unless exists $attrs->{$key};
179     if (my $live_join = $attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
180       foreach my $join (reverse @{$live_join}) {
181         $attrs->{$key} = { $join => $attrs->{$key} };
182       }
183     }
184
185     $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, delete $attrs->{$key});
186   }
187
188   $our_attrs->{join} = $self->_merge_attr(
189     $our_attrs->{join}, $attrs->{_live_join_h}
190   ) if ($attrs->{_live_join_h});
191
192   if (defined $our_attrs->{prefetch}) {
193     $our_attrs->{join} = $self->_merge_attr(
194       $our_attrs->{join}, $our_attrs->{prefetch}
195     );
196   }
197
198   my $new_attrs = { %{$our_attrs}, %{$attrs} };
199   my $where = (@_
200     ? (
201         (@_ == 1 || ref $_[0] eq "HASH")
202           ? shift
203           : (
204               (@_ % 2)
205                 ? $self->throw_exception("Odd number of arguments to search")
206                 : {@_}
207              )
208       )
209     : undef
210   );
211
212   if (defined $where) {
213     $new_attrs->{where} = (
214       defined $new_attrs->{where}
215         ? { '-and' => [
216               map {
217                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
218               } $where, $new_attrs->{where}
219             ]
220           }
221         : $where);
222   }
223
224   if (defined $having) {
225     $new_attrs->{having} = (
226       defined $new_attrs->{having}
227         ? { '-and' => [
228               map {
229                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
230               } $having, $new_attrs->{having}
231             ]
232           }
233         : $having);
234   }
235
236   my $rs = (ref $self)->new($self->result_source, $new_attrs);
237   $rs->{_parent_source} = $self->{_parent_source} if $self->{_parent_source};
238
239   unless (@_) {                 # no search, effectively just a clone
240     my $rows = $self->get_cache;
241     if ($rows) {
242       $rs->set_cache($rows);
243     }
244   }
245   return $rs;
246 }
247
248 =head2 search_literal
249
250 =over 4
251
252 =item Arguments: $sql_fragment, @bind_values
253
254 =item Return Value: $resultset (scalar context), @row_objs (list context)
255
256 =back
257
258   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
259   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
260
261 Pass a literal chunk of SQL to be added to the conditional part of the
262 resultset query.
263
264 =cut
265
266 sub search_literal {
267   my ($self, $cond, @vals) = @_;
268   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
269   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
270   return $self->search(\$cond, $attrs);
271 }
272
273 =head2 find
274
275 =over 4
276
277 =item Arguments: @values | \%cols, \%attrs?
278
279 =item Return Value: $row_object
280
281 =back
282
283 Finds a row based on its primary key or unique constraint. For example, to find
284 a row by its primary key:
285
286   my $cd = $schema->resultset('CD')->find(5);
287
288 You can also find a row by a specific unique constraint using the C<key>
289 attribute. For example:
290
291   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
292     key => 'cd_artist_title'
293   });
294
295 Additionally, you can specify the columns explicitly by name:
296
297   my $cd = $schema->resultset('CD')->find(
298     {
299       artist => 'Massive Attack',
300       title  => 'Mezzanine',
301     },
302     { key => 'cd_artist_title' }
303   );
304
305 If the C<key> is specified as C<primary>, it searches only on the primary key.
306
307 If no C<key> is specified, it searches on all unique constraints defined on the
308 source, including the primary key.
309
310 See also L</find_or_create> and L</update_or_create>. For information on how to
311 declare unique constraints, see
312 L<DBIx::Class::ResultSource/add_unique_constraint>.
313
314 =cut
315
316 sub find {
317   my $self = shift;
318   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
319
320   # Default to the primary key, but allow a specific key
321   my @cols = exists $attrs->{key}
322     ? $self->result_source->unique_constraint_columns($attrs->{key})
323     : $self->result_source->primary_columns;
324   $self->throw_exception(
325     "Can't find unless a primary key or unique constraint is defined"
326   ) unless @cols;
327
328   # Parse out a hashref from input
329   my $input_query;
330   if (ref $_[0] eq 'HASH') {
331     $input_query = { %{$_[0]} };
332   }
333   elsif (@_ == @cols) {
334     $input_query = {};
335     @{$input_query}{@cols} = @_;
336   }
337   else {
338     # Compatibility: Allow e.g. find(id => $value)
339     carp "Find by key => value deprecated; please use a hashref instead";
340     $input_query = {@_};
341   }
342
343   my @unique_queries = $self->_unique_queries($input_query, $attrs);
344
345   # Handle cases where the ResultSet defines the query, or where the user is
346   # abusing find
347   my $query = @unique_queries ? \@unique_queries : $input_query;
348
349   # Run the query
350   if (keys %$attrs) {
351     my $rs = $self->search($query, $attrs);
352     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
353   }
354   else {
355     return keys %{$self->_resolved_attrs->{collapse}}
356       ? $self->search($query)->next
357       : $self->single($query);
358   }
359 }
360
361 # _unique_queries
362 #
363 # Build a list of queries which satisfy unique constraints.
364
365 sub _unique_queries {
366   my ($self, $query, $attrs) = @_;
367
368   my @constraint_names = exists $attrs->{key}
369     ? ($attrs->{key})
370     : $self->result_source->unique_constraint_names;
371
372   my @unique_queries;
373   foreach my $name (@constraint_names) {
374     my @unique_cols = $self->result_source->unique_constraint_columns($name);
375     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
376
377     next unless scalar keys %$unique_query;
378
379     # Add the ResultSet's alias
380     my $alias = $self->{attrs}{alias};
381     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
382       $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
383     }
384
385     push @unique_queries, $unique_query;
386   }
387
388   return @unique_queries;
389 }
390
391 # _build_unique_query
392 #
393 # Constrain the specified query hash based on the specified column names.
394
395 sub _build_unique_query {
396   my ($self, $query, $unique_cols) = @_;
397
398   return {
399     map  { $_ => $query->{$_} }
400     grep { exists $query->{$_} }
401       @$unique_cols
402   };
403 }
404
405 =head2 search_related
406
407 =over 4
408
409 =item Arguments: $rel, $cond, \%attrs?
410
411 =item Return Value: $new_resultset
412
413 =back
414
415   $new_rs = $cd_rs->search_related('artist', {
416     name => 'Emo-R-Us',
417   });
418
419 Searches the specified relationship, optionally specifying a condition and
420 attributes for matching records. See L</ATTRIBUTES> for more information.
421
422 =cut
423
424 sub search_related {
425   return shift->related_resultset(shift)->search(@_);
426 }
427
428 =head2 cursor
429
430 =over 4
431
432 =item Arguments: none
433
434 =item Return Value: $cursor
435
436 =back
437
438 Returns a storage-driven cursor to the given resultset. See
439 L<DBIx::Class::Cursor> for more information.
440
441 =cut
442
443 sub cursor {
444   my ($self) = @_;
445
446   my $attrs = { %{$self->_resolved_attrs} };
447   return $self->{cursor}
448     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
449           $attrs->{where},$attrs);
450 }
451
452 =head2 single
453
454 =over 4
455
456 =item Arguments: $cond?
457
458 =item Return Value: $row_object?
459
460 =back
461
462   my $cd = $schema->resultset('CD')->single({ year => 2001 });
463
464 Inflates the first result without creating a cursor if the resultset has
465 any records in it; if not returns nothing. Used by L</find> as an optimisation.
466
467 Can optionally take an additional condition *only* - this is a fast-code-path
468 method; if you need to add extra joins or similar call ->search and then
469 ->single without a condition on the $rs returned from that.
470
471 =cut
472
473 sub single {
474   my ($self, $where) = @_;
475   my $attrs = { %{$self->_resolved_attrs} };
476   if ($where) {
477     if (defined $attrs->{where}) {
478       $attrs->{where} = {
479         '-and' =>
480             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
481                $where, delete $attrs->{where} ]
482       };
483     } else {
484       $attrs->{where} = $where;
485     }
486   }
487
488   unless ($self->_is_unique_query($attrs->{where})) {
489     carp "Query not guaranteed to return a single row"
490       . "; please declare your unique constraints or use search instead";
491   }
492
493   my @data = $self->result_source->storage->select_single(
494     $attrs->{from}, $attrs->{select},
495     $attrs->{where}, $attrs
496   );
497
498   return (@data ? $self->_construct_object(@data) : ());
499 }
500
501 # _is_unique_query
502 #
503 # Try to determine if the specified query is guaranteed to be unique, based on
504 # the declared unique constraints.
505
506 sub _is_unique_query {
507   my ($self, $query) = @_;
508
509   my $collapsed = $self->_collapse_query($query);
510   my $alias = $self->{attrs}{alias};
511
512   foreach my $name ($self->result_source->unique_constraint_names) {
513     my @unique_cols = map {
514       "$alias.$_"
515     } $self->result_source->unique_constraint_columns($name);
516
517     # Count the values for each unique column
518     my %seen = map { $_ => 0 } @unique_cols;
519
520     foreach my $key (keys %$collapsed) {
521       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
522       next unless exists $seen{$aliased};  # Additional constraints are okay
523       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
524     }
525
526     # If we get 0 or more than 1 value for a column, it's not necessarily unique
527     return 1 unless grep { $_ != 1 } values %seen;
528   }
529
530   return 0;
531 }
532
533 # _collapse_query
534 #
535 # Recursively collapse the query, accumulating values for each column.
536
537 sub _collapse_query {
538   my ($self, $query, $collapsed) = @_;
539
540   $collapsed ||= {};
541
542   if (ref $query eq 'ARRAY') {
543     foreach my $subquery (@$query) {
544       next unless ref $subquery;  # -or
545 #      warn "ARRAY: " . Dumper $subquery;
546       $collapsed = $self->_collapse_query($subquery, $collapsed);
547     }
548   }
549   elsif (ref $query eq 'HASH') {
550     if (keys %$query and (keys %$query)[0] eq '-and') {
551       foreach my $subquery (@{$query->{-and}}) {
552 #        warn "HASH: " . Dumper $subquery;
553         $collapsed = $self->_collapse_query($subquery, $collapsed);
554       }
555     }
556     else {
557 #      warn "LEAF: " . Dumper $query;
558       foreach my $key (keys %$query) {
559         my $value = $query->{$key};
560         $collapsed->{$key}{$value}++;
561       }
562     }
563   }
564
565   return $collapsed;
566 }
567
568 =head2 get_column
569
570 =over 4
571
572 =item Arguments: $cond?
573
574 =item Return Value: $resultsetcolumn
575
576 =back
577
578   my $max_length = $rs->get_column('length')->max;
579
580 Returns a ResultSetColumn instance for $column based on $self
581
582 =cut
583
584 sub get_column {
585   my ($self, $column) = @_;
586   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
587   return $new;
588 }
589
590 =head2 search_like
591
592 =over 4
593
594 =item Arguments: $cond, \%attrs?
595
596 =item Return Value: $resultset (scalar context), @row_objs (list context)
597
598 =back
599
600   # WHERE title LIKE '%blue%'
601   $cd_rs = $rs->search_like({ title => '%blue%'});
602
603 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
604 that this is simply a convenience method. You most likely want to use
605 L</search> with specific operators.
606
607 For more information, see L<DBIx::Class::Manual::Cookbook>.
608
609 =cut
610
611 sub search_like {
612   my $class = shift;
613   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
614   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
615   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
616   return $class->search($query, { %$attrs });
617 }
618
619 =head2 slice
620
621 =over 4
622
623 =item Arguments: $first, $last
624
625 =item Return Value: $resultset (scalar context), @row_objs (list context)
626
627 =back
628
629 Returns a resultset or object list representing a subset of elements from the
630 resultset slice is called on. Indexes are from 0, i.e., to get the first
631 three records, call:
632
633   my ($one, $two, $three) = $rs->slice(0, 2);
634
635 =cut
636
637 sub slice {
638   my ($self, $min, $max) = @_;
639   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
640   $attrs->{offset} = $self->{attrs}{offset} || 0;
641   $attrs->{offset} += $min;
642   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
643   return $self->search(undef(), $attrs);
644   #my $slice = (ref $self)->new($self->result_source, $attrs);
645   #return (wantarray ? $slice->all : $slice);
646 }
647
648 =head2 next
649
650 =over 4
651
652 =item Arguments: none
653
654 =item Return Value: $result?
655
656 =back
657
658 Returns the next element in the resultset (C<undef> is there is none).
659
660 Can be used to efficiently iterate over records in the resultset:
661
662   my $rs = $schema->resultset('CD')->search;
663   while (my $cd = $rs->next) {
664     print $cd->title;
665   }
666
667 Note that you need to store the resultset object, and call C<next> on it.
668 Calling C<< resultset('Table')->next >> repeatedly will always return the
669 first record from the resultset.
670
671 =cut
672
673 sub next {
674   my ($self) = @_;
675   if (my $cache = $self->get_cache) {
676     $self->{all_cache_position} ||= 0;
677     return $cache->[$self->{all_cache_position}++];
678   }
679   if ($self->{attrs}{cache}) {
680     $self->{all_cache_position} = 1;
681     return ($self->all)[0];
682   }
683   my @row = (
684     exists $self->{stashed_row}
685       ? @{delete $self->{stashed_row}}
686       : $self->cursor->next
687   );
688   return unless (@row);
689   return $self->_construct_object(@row);
690 }
691
692 sub _resolved_attrs {
693   my $self = shift;
694   return $self->{_attrs} if $self->{_attrs};
695
696   my $attrs = $self->{attrs};
697   my $source = $self->{_parent_source} || $self->{result_source};
698   my $alias = $attrs->{_orig_alias};
699
700   # XXX - lose storable dclone
701   my $record_filter = delete $attrs->{record_filter};
702   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
703   $attrs->{record_filter} = $record_filter if $record_filter;
704
705   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
706   if ($attrs->{columns}) {
707     delete $attrs->{as};
708   } elsif (!$attrs->{select}) {
709     $attrs->{columns} = [ $self->{result_source}->columns ];
710   }
711   
712   my $select_alias = $self->{attrs}{alias};
713   $attrs->{select} ||= [
714     map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
715   ];
716   $attrs->{as} ||= [
717     map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
718   ];
719   
720   my $adds;
721   if ($adds = delete $attrs->{include_columns}) {
722     $adds = [$adds] unless ref $adds eq 'ARRAY';
723     push(@{$attrs->{select}}, @$adds);
724     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
725   }
726   if ($adds = delete $attrs->{'+select'}) {
727     $adds = [$adds] unless ref $adds eq 'ARRAY';
728     push(@{$attrs->{select}}, map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
729   }
730   if (my $adds = delete $attrs->{'+as'}) {
731     $adds = [$adds] unless ref $adds eq 'ARRAY';
732     push(@{$attrs->{as}}, @$adds);
733   }
734
735   $attrs->{from} ||= [ { $alias => $source->from } ];
736   $attrs->{seen_join} ||= {};
737   my %seen;
738   if (my $join = delete $attrs->{join}) {
739     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
740       if (ref $j eq 'HASH') {
741         $seen{$_} = 1 foreach keys %$j;
742       } else {
743         $seen{$j} = 1;
744       }
745     }
746     push(@{$attrs->{from}},
747       $source->resolve_join($join, $alias, $attrs->{seen_join})
748     );
749   }
750
751   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
752   if ($attrs->{order_by}) {
753     $attrs->{order_by} = [ $attrs->{order_by} ] unless ref $attrs->{order_by};    
754   } else {
755     $attrs->{order_by} ||= [];    
756   }
757
758   my $collapse = $attrs->{collapse} || {};
759   if (my $prefetch = delete $attrs->{prefetch}) {
760     my @pre_order;
761     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
762       if ( ref $p eq 'HASH' ) {
763         foreach my $key (keys %$p) {
764           push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
765             unless $seen{$key};
766         }
767       } else {
768         push(@{$attrs->{from}}, $source->resolve_join($p, $alias))
769           unless $seen{$p};
770       }
771       # bring joins back to level of current class
772       $p = $self->_reduce_joins($p, $attrs) if $attrs->{_live_join_stack};
773       if ($p) {
774         my @prefetch = $self->result_source->resolve_prefetch(
775           $p, $alias, {}, \@pre_order, $collapse
776         );
777         push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
778         push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
779       }
780     }
781     push(@{$attrs->{order_by}}, @pre_order);
782   }
783   $attrs->{collapse} = $collapse;
784
785   return $self->{_attrs} = $attrs;
786 }
787
788 sub _merge_attr {
789   my ($self, $a, $b) = @_;
790   return $b unless $a;
791   
792   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
793     foreach my $key (keys %{$b}) {
794       if (exists $a->{$key}) {
795         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
796       } else {
797         $a->{$key} = $b->{$key};
798       }
799     }
800     return $a;
801   } else {
802     $a = [$a] unless ref $a eq 'ARRAY';
803     $b = [$b] unless ref $b eq 'ARRAY';
804
805     my $hash = {};
806     my @array;
807     foreach my $x ($a, $b) {
808       foreach my $element (@{$x}) {
809         if (ref $element eq 'HASH') {
810           $hash = $self->_merge_attr($hash, $element);
811         } elsif (ref $element eq 'ARRAY') {
812           push(@array, @{$element});
813         } else {
814           push(@array, $element) unless $b == $x
815             && grep { $_ eq $element } @array;
816         }
817       }
818     }
819     
820     @array = grep { !exists $hash->{$_} } @array;
821
822     return keys %{$hash}
823       ? ( scalar(@array)
824             ? [$hash, @array]
825             : $hash
826         )
827       : \@array;
828   }
829 }
830
831 # bring the joins (which are from the original class) to the level
832 # of the current class so that we can resolve them properly
833 sub _reduce_joins {
834   my ($self, $p, $attrs) = @_;
835
836   STACK:
837   foreach my $join (@{$attrs->{_live_join_stack}}) {
838     if (ref $p eq 'HASH') {
839       return undef unless exists $p->{$join};
840       $p = $p->{$join};
841     } elsif (ref $p eq 'ARRAY') {
842       foreach my $pe (@{$p}) {
843         return undef if $pe eq $join;
844         if (ref $pe eq 'HASH' && exists $pe->{$join}) {
845           $p = $pe->{$join};
846           next STACK;
847         }
848       }
849       return undef;
850     } else {
851       return undef;
852     }
853   }
854   return $p;
855 }
856
857 sub _construct_object {
858   my ($self, @row) = @_;
859   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
860   my $new = $self->result_class->inflate_result($self->result_source, @$info);
861   $new = $self->{_attrs}{record_filter}->($new)
862     if exists $self->{_attrs}{record_filter};
863   return $new;
864 }
865
866 sub _collapse_result {
867   my ($self, $as, $row, $prefix) = @_;
868
869   my %const;
870   my @copy = @$row;
871   
872   foreach my $this_as (@$as) {
873     my $val = shift @copy;
874     if (defined $prefix) {
875       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
876         my $remain = $1;
877         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
878         $const{$1||''}{$2} = $val;
879       }
880     } else {
881       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
882       $const{$1||''}{$2} = $val;
883     }
884   }
885
886   my $alias = $self->{attrs}{alias};
887   my $info = [ {}, {} ];
888   foreach my $key (keys %const) {
889     if (length $key && $key ne $alias) {
890       my $target = $info;
891       my @parts = split(/\./, $key);
892       foreach my $p (@parts) {
893         $target = $target->[1]->{$p} ||= [];
894       }
895       $target->[0] = $const{$key};
896     } else {
897       $info->[0] = $const{$key};
898     }
899   }
900   
901   my @collapse;
902   if (defined $prefix) {
903     @collapse = map {
904         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
905     } keys %{$self->{_attrs}{collapse}}
906   } else {
907     @collapse = keys %{$self->{_attrs}{collapse}};
908   };
909
910   if (@collapse) {
911     my ($c) = sort { length $a <=> length $b } @collapse;
912     my $target = $info;
913     foreach my $p (split(/\./, $c)) {
914       $target = $target->[1]->{$p} ||= [];
915     }
916     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
917     my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
918     my $tree = $self->_collapse_result($as, $row, $c_prefix);
919     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
920     my (@final, @raw);
921
922     while (
923       !(
924         grep {
925           !defined($tree->[0]->{$_}) || $co_check{$_} ne $tree->[0]->{$_}
926         } @co_key
927         )
928     ) {
929       push(@final, $tree);
930       last unless (@raw = $self->cursor->next);
931       $row = $self->{stashed_row} = \@raw;
932       $tree = $self->_collapse_result($as, $row, $c_prefix);
933     }
934     @$target = (@final ? @final : [ {}, {} ]);
935       # single empty result to indicate an empty prefetched has_many
936   }
937
938   #print "final info: " . Dumper($info);
939   return $info;
940 }
941
942 =head2 result_source
943
944 =over 4
945
946 =item Arguments: $result_source?
947
948 =item Return Value: $result_source
949
950 =back
951
952 An accessor for the primary ResultSource object from which this ResultSet
953 is derived.
954
955 =cut
956
957
958 =head2 count
959
960 =over 4
961
962 =item Arguments: $cond, \%attrs??
963
964 =item Return Value: $count
965
966 =back
967
968 Performs an SQL C<COUNT> with the same query as the resultset was built
969 with to find the number of elements. If passed arguments, does a search
970 on the resultset and counts the results of that.
971
972 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
973 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
974 not support C<DISTINCT> with multiple columns. If you are using such a
975 database, you should only use columns from the main table in your C<group_by>
976 clause.
977
978 =cut
979
980 sub count {
981   my $self = shift;
982   return $self->search(@_)->count if @_ and defined $_[0];
983   return scalar @{ $self->get_cache } if $self->get_cache;
984   my $count = $self->_count;
985   return 0 unless $count;
986
987   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
988   $count = $self->{attrs}{rows} if
989     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
990   return $count;
991 }
992
993 sub _count { # Separated out so pager can get the full count
994   my $self = shift;
995   my $select = { count => '*' };
996
997   my $attrs = { %{$self->_resolved_attrs} };
998   if (my $group_by = delete $attrs->{group_by}) {
999     delete $attrs->{having};
1000     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1001     # todo: try CONCAT for multi-column pk
1002     my @pk = $self->result_source->primary_columns;
1003     if (@pk == 1) {
1004       my $alias = $attrs->{_orig_alias};
1005       foreach my $column (@distinct) {
1006         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1007           @distinct = ($column);
1008           last;
1009         }
1010       }
1011     }
1012
1013     $select = { count => { distinct => \@distinct } };
1014   }
1015
1016   $attrs->{select} = $select;
1017   $attrs->{as} = [qw/count/];
1018
1019   # offset, order by and page are not needed to count. record_filter is cdbi
1020   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1021   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1022   $tmp_rs->{_parent_source} = $self->{_parent_source} if $self->{_parent_source};
1023        #XXX - hack to pass through parent of related resultsets
1024
1025   my ($count) = $tmp_rs->cursor->next;
1026   return $count;
1027 }
1028
1029 =head2 count_literal
1030
1031 =over 4
1032
1033 =item Arguments: $sql_fragment, @bind_values
1034
1035 =item Return Value: $count
1036
1037 =back
1038
1039 Counts the results in a literal query. Equivalent to calling L</search_literal>
1040 with the passed arguments, then L</count>.
1041
1042 =cut
1043
1044 sub count_literal { shift->search_literal(@_)->count; }
1045
1046 =head2 all
1047
1048 =over 4
1049
1050 =item Arguments: none
1051
1052 =item Return Value: @objects
1053
1054 =back
1055
1056 Returns all elements in the resultset. Called implicitly if the resultset
1057 is returned in list context.
1058
1059 =cut
1060
1061 sub all {
1062   my ($self) = @_;
1063   return @{ $self->get_cache } if $self->get_cache;
1064
1065   my @obj;
1066
1067   # TODO: don't call resolve here
1068   if (keys %{$self->_resolved_attrs->{collapse}}) {
1069 #  if ($self->{attrs}{prefetch}) {
1070       # Using $self->cursor->all is really just an optimisation.
1071       # If we're collapsing has_many prefetches it probably makes
1072       # very little difference, and this is cleaner than hacking
1073       # _construct_object to survive the approach
1074     my @row = $self->cursor->next;
1075     while (@row) {
1076       push(@obj, $self->_construct_object(@row));
1077       @row = (exists $self->{stashed_row}
1078                ? @{delete $self->{stashed_row}}
1079                : $self->cursor->next);
1080     }
1081   } else {
1082     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1083   }
1084
1085   $self->set_cache(\@obj) if $self->{attrs}{cache};
1086   return @obj;
1087 }
1088
1089 =head2 reset
1090
1091 =over 4
1092
1093 =item Arguments: none
1094
1095 =item Return Value: $self
1096
1097 =back
1098
1099 Resets the resultset's cursor, so you can iterate through the elements again.
1100
1101 =cut
1102
1103 sub reset {
1104   my ($self) = @_;
1105   delete $self->{_attrs} if exists $self->{_attrs};
1106   $self->{all_cache_position} = 0;
1107   $self->cursor->reset;
1108   return $self;
1109 }
1110
1111 =head2 first
1112
1113 =over 4
1114
1115 =item Arguments: none
1116
1117 =item Return Value: $object?
1118
1119 =back
1120
1121 Resets the resultset and returns an object for the first result (if the
1122 resultset returns anything).
1123
1124 =cut
1125
1126 sub first {
1127   return $_[0]->reset->next;
1128 }
1129
1130 # _cond_for_update_delete
1131 #
1132 # update/delete require the condition to be modified to handle
1133 # the differing SQL syntax available.  This transforms the $self->{cond}
1134 # appropriately, returning the new condition.
1135
1136 sub _cond_for_update_delete {
1137   my ($self) = @_;
1138   my $cond = {};
1139
1140   # No-op. No condition, we're updating/deleting everything
1141   return $cond unless ref $self->{cond};
1142
1143   if (ref $self->{cond} eq 'ARRAY') {
1144     $cond = [
1145       map {
1146         my %hash;
1147         foreach my $key (keys %{$_}) {
1148           $key =~ /([^.]+)$/;
1149           $hash{$1} = $_->{$key};
1150         }
1151         \%hash;
1152       } @{$self->{cond}}
1153     ];
1154   }
1155   elsif (ref $self->{cond} eq 'HASH') {
1156     if ((keys %{$self->{cond}})[0] eq '-and') {
1157       $cond->{-and} = [];
1158
1159       my @cond = @{$self->{cond}{-and}};
1160       for (my $i = 0; $i < @cond; $i++) {
1161         my $entry = $cond[$i];
1162
1163         my %hash;
1164         if (ref $entry eq 'HASH') {
1165           foreach my $key (keys %{$entry}) {
1166             $key =~ /([^.]+)$/;
1167             $hash{$1} = $entry->{$key};
1168           }
1169         }
1170         else {
1171           $entry =~ /([^.]+)$/;
1172           $hash{$1} = $cond[++$i];
1173         }
1174
1175         push @{$cond->{-and}}, \%hash;
1176       }
1177     }
1178     else {
1179       foreach my $key (keys %{$self->{cond}}) {
1180         $key =~ /([^.]+)$/;
1181         $cond->{$1} = $self->{cond}{$key};
1182       }
1183     }
1184   }
1185   else {
1186     $self->throw_exception(
1187       "Can't update/delete on resultset with condition unless hash or array"
1188     );
1189   }
1190
1191   return $cond;
1192 }
1193
1194
1195 =head2 update
1196
1197 =over 4
1198
1199 =item Arguments: \%values
1200
1201 =item Return Value: $storage_rv
1202
1203 =back
1204
1205 Sets the specified columns in the resultset to the supplied values in a
1206 single query. Return value will be true if the update succeeded or false
1207 if no records were updated; exact type of success value is storage-dependent.
1208
1209 =cut
1210
1211 sub update {
1212   my ($self, $values) = @_;
1213   $self->throw_exception("Values for update must be a hash")
1214     unless ref $values eq 'HASH';
1215
1216   my $cond = $self->_cond_for_update_delete;
1217
1218   return $self->result_source->storage->update(
1219     $self->result_source->from, $values, $cond
1220   );
1221 }
1222
1223 =head2 update_all
1224
1225 =over 4
1226
1227 =item Arguments: \%values
1228
1229 =item Return Value: 1
1230
1231 =back
1232
1233 Fetches all objects and updates them one at a time. Note that C<update_all>
1234 will run DBIC cascade triggers, while L</update> will not.
1235
1236 =cut
1237
1238 sub update_all {
1239   my ($self, $values) = @_;
1240   $self->throw_exception("Values for update must be a hash")
1241     unless ref $values eq 'HASH';
1242   foreach my $obj ($self->all) {
1243     $obj->set_columns($values)->update;
1244   }
1245   return 1;
1246 }
1247
1248 =head2 delete
1249
1250 =over 4
1251
1252 =item Arguments: none
1253
1254 =item Return Value: 1
1255
1256 =back
1257
1258 Deletes the contents of the resultset from its result source. Note that this
1259 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1260 to run.
1261
1262 =cut
1263
1264 sub delete {
1265   my ($self) = @_;
1266
1267   my $cond = $self->_cond_for_update_delete;
1268
1269   $self->result_source->storage->delete($self->result_source->from, $cond);
1270   return 1;
1271 }
1272
1273 =head2 delete_all
1274
1275 =over 4
1276
1277 =item Arguments: none
1278
1279 =item Return Value: 1
1280
1281 =back
1282
1283 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1284 will run DBIC cascade triggers, while L</delete> will not.
1285
1286 =cut
1287
1288 sub delete_all {
1289   my ($self) = @_;
1290   $_->delete for $self->all;
1291   return 1;
1292 }
1293
1294 =head2 pager
1295
1296 =over 4
1297
1298 =item Arguments: none
1299
1300 =item Return Value: $pager
1301
1302 =back
1303
1304 Return Value a L<Data::Page> object for the current resultset. Only makes
1305 sense for queries with a C<page> attribute.
1306
1307 =cut
1308
1309 sub pager {
1310   my ($self) = @_;
1311   my $attrs = $self->{attrs};
1312   $self->throw_exception("Can't create pager for non-paged rs")
1313     unless $self->{attrs}{page};
1314   $attrs->{rows} ||= 10;
1315   return $self->{pager} ||= Data::Page->new(
1316     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1317 }
1318
1319 =head2 page
1320
1321 =over 4
1322
1323 =item Arguments: $page_number
1324
1325 =item Return Value: $rs
1326
1327 =back
1328
1329 Returns a resultset for the $page_number page of the resultset on which page
1330 is called, where each page contains a number of rows equal to the 'rows'
1331 attribute set on the resultset (10 by default).
1332
1333 =cut
1334
1335 sub page {
1336   my ($self, $page) = @_;
1337   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1338 }
1339
1340 =head2 new_result
1341
1342 =over 4
1343
1344 =item Arguments: \%vals
1345
1346 =item Return Value: $object
1347
1348 =back
1349
1350 Creates an object in the resultset's result class and returns it.
1351
1352 =cut
1353
1354 sub new_result {
1355   my ($self, $values) = @_;
1356   $self->throw_exception( "new_result needs a hash" )
1357     unless (ref $values eq 'HASH');
1358   $self->throw_exception(
1359     "Can't abstract implicit construct, condition not a hash"
1360   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1361   my %new = %$values;
1362   my $alias = $self->{attrs}{_orig_alias};
1363   foreach my $key (keys %{$self->{cond}||{}}) {
1364     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1365   }
1366   my $obj = $self->result_class->new(\%new);
1367   $obj->result_source($self->result_source) if $obj->can('result_source');
1368   return $obj;
1369 }
1370
1371 =head2 find_or_new
1372
1373 =over 4
1374
1375 =item Arguments: \%vals, \%attrs?
1376
1377 =item Return Value: $object
1378
1379 =back
1380
1381 Find an existing record from this resultset. If none exists, instantiate a new
1382 result object and return it. The object will not be saved into your storage
1383 until you call L<DBIx::Class::Row/insert> on it.
1384
1385 If you want objects to be saved immediately, use L</find_or_create> instead.
1386
1387 =cut
1388
1389 sub find_or_new {
1390   my $self     = shift;
1391   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1392   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1393   my $exists   = $self->find($hash, $attrs);
1394   return defined $exists ? $exists : $self->new_result($hash);
1395 }
1396
1397 =head2 create
1398
1399 =over 4
1400
1401 =item Arguments: \%vals
1402
1403 =item Return Value: $object
1404
1405 =back
1406
1407 Inserts a record into the resultset and returns the object representing it.
1408
1409 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1410
1411 =cut
1412
1413 sub create {
1414   my ($self, $attrs) = @_;
1415   $self->throw_exception( "create needs a hashref" )
1416     unless ref $attrs eq 'HASH';
1417   return $self->new_result($attrs)->insert;
1418 }
1419
1420 =head2 find_or_create
1421
1422 =over 4
1423
1424 =item Arguments: \%vals, \%attrs?
1425
1426 =item Return Value: $object
1427
1428 =back
1429
1430   $class->find_or_create({ key => $val, ... });
1431
1432 Tries to find a record based on its primary key or unique constraint; if none
1433 is found, creates one and returns that instead.
1434
1435   my $cd = $schema->resultset('CD')->find_or_create({
1436     cdid   => 5,
1437     artist => 'Massive Attack',
1438     title  => 'Mezzanine',
1439     year   => 2005,
1440   });
1441
1442 Also takes an optional C<key> attribute, to search by a specific key or unique
1443 constraint. For example:
1444
1445   my $cd = $schema->resultset('CD')->find_or_create(
1446     {
1447       artist => 'Massive Attack',
1448       title  => 'Mezzanine',
1449     },
1450     { key => 'cd_artist_title' }
1451   );
1452
1453 See also L</find> and L</update_or_create>. For information on how to declare
1454 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1455
1456 =cut
1457
1458 sub find_or_create {
1459   my $self     = shift;
1460   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1461   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1462   my $exists   = $self->find($hash, $attrs);
1463   return defined $exists ? $exists : $self->create($hash);
1464 }
1465
1466 =head2 update_or_create
1467
1468 =over 4
1469
1470 =item Arguments: \%col_values, { key => $unique_constraint }?
1471
1472 =item Return Value: $object
1473
1474 =back
1475
1476   $class->update_or_create({ col => $val, ... });
1477
1478 First, searches for an existing row matching one of the unique constraints
1479 (including the primary key) on the source of this resultset. If a row is
1480 found, updates it with the other given column values. Otherwise, creates a new
1481 row.
1482
1483 Takes an optional C<key> attribute to search on a specific unique constraint.
1484 For example:
1485
1486   # In your application
1487   my $cd = $schema->resultset('CD')->update_or_create(
1488     {
1489       artist => 'Massive Attack',
1490       title  => 'Mezzanine',
1491       year   => 1998,
1492     },
1493     { key => 'cd_artist_title' }
1494   );
1495
1496 If no C<key> is specified, it searches on all unique constraints defined on the
1497 source, including the primary key.
1498
1499 If the C<key> is specified as C<primary>, it searches only on the primary key.
1500
1501 See also L</find> and L</find_or_create>. For information on how to declare
1502 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1503
1504 =cut
1505
1506 sub update_or_create {
1507   my $self = shift;
1508   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1509   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1510
1511   my $row = $self->find($cond);
1512   if (defined $row) {
1513     $row->update($cond);
1514     return $row;
1515   }
1516
1517   return $self->create($cond);
1518 }
1519
1520 =head2 get_cache
1521
1522 =over 4
1523
1524 =item Arguments: none
1525
1526 =item Return Value: \@cache_objects?
1527
1528 =back
1529
1530 Gets the contents of the cache for the resultset, if the cache is set.
1531
1532 =cut
1533
1534 sub get_cache {
1535   shift->{all_cache};
1536 }
1537
1538 =head2 set_cache
1539
1540 =over 4
1541
1542 =item Arguments: \@cache_objects
1543
1544 =item Return Value: \@cache_objects
1545
1546 =back
1547
1548 Sets the contents of the cache for the resultset. Expects an arrayref
1549 of objects of the same class as those produced by the resultset. Note that
1550 if the cache is set the resultset will return the cached objects rather
1551 than re-querying the database even if the cache attr is not set.
1552
1553 =cut
1554
1555 sub set_cache {
1556   my ( $self, $data ) = @_;
1557   $self->throw_exception("set_cache requires an arrayref")
1558       if defined($data) && (ref $data ne 'ARRAY');
1559   $self->{all_cache} = $data;
1560 }
1561
1562 =head2 clear_cache
1563
1564 =over 4
1565
1566 =item Arguments: none
1567
1568 =item Return Value: []
1569
1570 =back
1571
1572 Clears the cache for the resultset.
1573
1574 =cut
1575
1576 sub clear_cache {
1577   shift->set_cache(undef);
1578 }
1579
1580 =head2 related_resultset
1581
1582 =over 4
1583
1584 =item Arguments: $relationship_name
1585
1586 =item Return Value: $resultset
1587
1588 =back
1589
1590 Returns a related resultset for the supplied relationship name.
1591
1592   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1593
1594 =cut
1595
1596 sub related_resultset {
1597   my ($self, $rel) = @_;
1598
1599   $self->{related_resultsets} ||= {};
1600   return $self->{related_resultsets}{$rel} ||= do {
1601     my $rel_obj = $self->result_source->relationship_info($rel);
1602
1603     $self->throw_exception(
1604       "search_related: result source '" . $self->result_source->name .
1605         "' has no such relationship $rel")
1606       unless $rel_obj;
1607
1608     my @live_join_stack = @{$self->{attrs}{_live_join_stack}||[]};
1609
1610     # XXX mst: I'm sure this is wrong, somehow
1611     #          something with complex joins early on could die on search_rel
1612     #          followed by a prefetch. I think. need a test case.
1613
1614     my $join_count = scalar(grep { $_ eq $rel } @live_join_stack);
1615     my $alias = $join_count ? join('_', $rel, $join_count+1) : $rel;
1616
1617     push(@live_join_stack, $rel);
1618
1619     my $rs = $self->result_source->schema->resultset($rel_obj->{class})->search(
1620       undef, {
1621         select => undef,
1622         as => undef,
1623         alias => $alias,
1624         _live_join_stack => \@live_join_stack,
1625         _parent_attrs => $self->{attrs}}
1626     );
1627
1628     # keep reference of the original resultset
1629     $rs->{_parent_source} = $self->{_parent_source} || $self->result_source;
1630
1631     return $rs;
1632   };
1633 }
1634
1635 =head2 throw_exception
1636
1637 See L<DBIx::Class::Schema/throw_exception> for details.
1638
1639 =cut
1640
1641 sub throw_exception {
1642   my $self=shift;
1643   $self->result_source->schema->throw_exception(@_);
1644 }
1645
1646 # XXX: FIXME: Attributes docs need clearing up
1647
1648 =head1 ATTRIBUTES
1649
1650 The resultset takes various attributes that modify its behavior. Here's an
1651 overview of them:
1652
1653 =head2 order_by
1654
1655 =over 4
1656
1657 =item Value: ($order_by | \@order_by)
1658
1659 =back
1660
1661 Which column(s) to order the results by. This is currently passed
1662 through directly to SQL, so you can give e.g. C<year DESC> for a
1663 descending order on the column `year'.
1664
1665 Please note that if you have quoting enabled (see
1666 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1667 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1668 so you will need to manually quote things as appropriate.)
1669
1670 =head2 columns
1671
1672 =over 4
1673
1674 =item Value: \@columns
1675
1676 =back
1677
1678 Shortcut to request a particular set of columns to be retrieved.  Adds
1679 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1680 from that, then auto-populates C<as> from C<select> as normal. (You may also
1681 use the C<cols> attribute, as in earlier versions of DBIC.)
1682
1683 =head2 include_columns
1684
1685 =over 4
1686
1687 =item Value: \@columns
1688
1689 =back
1690
1691 Shortcut to include additional columns in the returned results - for example
1692
1693   $schema->resultset('CD')->search(undef, {
1694     include_columns => ['artist.name'],
1695     join => ['artist']
1696   });
1697
1698 would return all CDs and include a 'name' column to the information
1699 passed to object inflation
1700
1701 =head2 select
1702
1703 =over 4
1704
1705 =item Value: \@select_columns
1706
1707 =back
1708
1709 Indicates which columns should be selected from the storage. You can use
1710 column names, or in the case of RDBMS back ends, function or stored procedure
1711 names:
1712
1713   $rs = $schema->resultset('Employee')->search(undef, {
1714     select => [
1715       'name',
1716       { count => 'employeeid' },
1717       { sum => 'salary' }
1718     ]
1719   });
1720
1721 When you use function/stored procedure names and do not supply an C<as>
1722 attribute, the column names returned are storage-dependent. E.g. MySQL would
1723 return a column named C<count(employeeid)> in the above example.
1724
1725 =head2 +select
1726
1727 =over 4
1728
1729 Indicates additional columns to be selected from storage.  Works the same as
1730 L<select> but adds columns to the selection.
1731
1732 =back
1733
1734 =head2 +as
1735
1736 =over 4
1737
1738 Indicates additional column names for those added via L<+select>.
1739
1740 =back
1741
1742 =head2 as
1743
1744 =over 4
1745
1746 =item Value: \@inflation_names
1747
1748 =back
1749
1750 Indicates column names for object inflation. This is used in conjunction with
1751 C<select>, usually when C<select> contains one or more function or stored
1752 procedure names:
1753
1754   $rs = $schema->resultset('Employee')->search(undef, {
1755     select => [
1756       'name',
1757       { count => 'employeeid' }
1758     ],
1759     as => ['name', 'employee_count'],
1760   });
1761
1762   my $employee = $rs->first(); # get the first Employee
1763
1764 If the object against which the search is performed already has an accessor
1765 matching a column name specified in C<as>, the value can be retrieved using
1766 the accessor as normal:
1767
1768   my $name = $employee->name();
1769
1770 If on the other hand an accessor does not exist in the object, you need to
1771 use C<get_column> instead:
1772
1773   my $employee_count = $employee->get_column('employee_count');
1774
1775 You can create your own accessors if required - see
1776 L<DBIx::Class::Manual::Cookbook> for details.
1777
1778 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1779 produced, it is used for internal access only. Thus attempting to use the accessor
1780 in an C<order_by> clause or similar will fail misrably.
1781
1782 =head2 join
1783
1784 =over 4
1785
1786 =item Value: ($rel_name | \@rel_names | \%rel_names)
1787
1788 =back
1789
1790 Contains a list of relationships that should be joined for this query.  For
1791 example:
1792
1793   # Get CDs by Nine Inch Nails
1794   my $rs = $schema->resultset('CD')->search(
1795     { 'artist.name' => 'Nine Inch Nails' },
1796     { join => 'artist' }
1797   );
1798
1799 Can also contain a hash reference to refer to the other relation's relations.
1800 For example:
1801
1802   package MyApp::Schema::Track;
1803   use base qw/DBIx::Class/;
1804   __PACKAGE__->table('track');
1805   __PACKAGE__->add_columns(qw/trackid cd position title/);
1806   __PACKAGE__->set_primary_key('trackid');
1807   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1808   1;
1809
1810   # In your application
1811   my $rs = $schema->resultset('Artist')->search(
1812     { 'track.title' => 'Teardrop' },
1813     {
1814       join     => { cd => 'track' },
1815       order_by => 'artist.name',
1816     }
1817   );
1818
1819 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1820 similarly for a third time). For e.g.
1821
1822   my $rs = $schema->resultset('Artist')->search({
1823     'cds.title'   => 'Down to Earth',
1824     'cds_2.title' => 'Popular',
1825   }, {
1826     join => [ qw/cds cds/ ],
1827   });
1828
1829 will return a set of all artists that have both a cd with title 'Down
1830 to Earth' and a cd with title 'Popular'.
1831
1832 If you want to fetch related objects from other tables as well, see C<prefetch>
1833 below.
1834
1835 =head2 prefetch
1836
1837 =over 4
1838
1839 =item Value: ($rel_name | \@rel_names | \%rel_names)
1840
1841 =back
1842
1843 Contains one or more relationships that should be fetched along with the main
1844 query (when they are accessed afterwards they will have already been
1845 "prefetched").  This is useful for when you know you will need the related
1846 objects, because it saves at least one query:
1847
1848   my $rs = $schema->resultset('Tag')->search(
1849     undef,
1850     {
1851       prefetch => {
1852         cd => 'artist'
1853       }
1854     }
1855   );
1856
1857 The initial search results in SQL like the following:
1858
1859   SELECT tag.*, cd.*, artist.* FROM tag
1860   JOIN cd ON tag.cd = cd.cdid
1861   JOIN artist ON cd.artist = artist.artistid
1862
1863 L<DBIx::Class> has no need to go back to the database when we access the
1864 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1865 case.
1866
1867 Simple prefetches will be joined automatically, so there is no need
1868 for a C<join> attribute in the above search. If you're prefetching to
1869 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1870 specify the join as well.
1871
1872 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1873 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1874 with an accessor type of 'single' or 'filter').
1875
1876 =head2 page
1877
1878 =over 4
1879
1880 =item Value: $page
1881
1882 =back
1883
1884 Makes the resultset paged and specifies the page to retrieve. Effectively
1885 identical to creating a non-pages resultset and then calling ->page($page)
1886 on it.
1887
1888 If L<rows> attribute is not specified it defualts to 10 rows per page.
1889
1890 =head2 rows
1891
1892 =over 4
1893
1894 =item Value: $rows
1895
1896 =back
1897
1898 Specifes the maximum number of rows for direct retrieval or the number of
1899 rows per page if the page attribute or method is used.
1900
1901 =head2 offset
1902
1903 =over 4
1904
1905 =item Value: $offset
1906
1907 =back
1908
1909 Specifies the (zero-based) row number for the  first row to be returned, or the
1910 of the first row of the first page if paging is used.
1911
1912 =head2 group_by
1913
1914 =over 4
1915
1916 =item Value: \@columns
1917
1918 =back
1919
1920 A arrayref of columns to group by. Can include columns of joined tables.
1921
1922   group_by => [qw/ column1 column2 ... /]
1923
1924 =head2 having
1925
1926 =over 4
1927
1928 =item Value: $condition
1929
1930 =back
1931
1932 HAVING is a select statement attribute that is applied between GROUP BY and
1933 ORDER BY. It is applied to the after the grouping calculations have been
1934 done.
1935
1936   having => { 'count(employee)' => { '>=', 100 } }
1937
1938 =head2 distinct
1939
1940 =over 4
1941
1942 =item Value: (0 | 1)
1943
1944 =back
1945
1946 Set to 1 to group by all columns.
1947
1948 =head2 where
1949
1950 =over 4
1951
1952 Adds to the WHERE clause.
1953
1954   # only return rows WHERE deleted IS NULL for all searches
1955   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
1956
1957 Can be overridden by passing C<{ where => undef }> as an attribute
1958 to a resulset.
1959
1960 =back
1961
1962 =head2 cache
1963
1964 Set to 1 to cache search results. This prevents extra SQL queries if you
1965 revisit rows in your ResultSet:
1966
1967   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1968
1969   while( my $artist = $resultset->next ) {
1970     ... do stuff ...
1971   }
1972
1973   $rs->first; # without cache, this would issue a query
1974
1975 By default, searches are not cached.
1976
1977 For more examples of using these attributes, see
1978 L<DBIx::Class::Manual::Cookbook>.
1979
1980 =head2 from
1981
1982 =over 4
1983
1984 =item Value: \@from_clause
1985
1986 =back
1987
1988 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1989 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1990 clauses.
1991
1992 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1993
1994 C<join> will usually do what you need and it is strongly recommended that you
1995 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1996 And we really do mean "cannot", not just tried and failed. Attempting to use
1997 this because you're having problems with C<join> is like trying to use x86
1998 ASM because you've got a syntax error in your C. Trust us on this.
1999
2000 Now, if you're still really, really sure you need to use this (and if you're
2001 not 100% sure, ask the mailing list first), here's an explanation of how this
2002 works.
2003
2004 The syntax is as follows -
2005
2006   [
2007     { <alias1> => <table1> },
2008     [
2009       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2010       [], # nested JOIN (optional)
2011       { <table1.column1> => <table2.column2>, ... (more conditions) },
2012     ],
2013     # More of the above [ ] may follow for additional joins
2014   ]
2015
2016   <table1> <alias1>
2017   JOIN
2018     <table2> <alias2>
2019     [JOIN ...]
2020   ON <table1.column1> = <table2.column2>
2021   <more joins may follow>
2022
2023 An easy way to follow the examples below is to remember the following:
2024
2025     Anything inside "[]" is a JOIN
2026     Anything inside "{}" is a condition for the enclosing JOIN
2027
2028 The following examples utilize a "person" table in a family tree application.
2029 In order to express parent->child relationships, this table is self-joined:
2030
2031     # Person->belongs_to('father' => 'Person');
2032     # Person->belongs_to('mother' => 'Person');
2033
2034 C<from> can be used to nest joins. Here we return all children with a father,
2035 then search against all mothers of those children:
2036
2037   $rs = $schema->resultset('Person')->search(
2038       undef,
2039       {
2040           alias => 'mother', # alias columns in accordance with "from"
2041           from => [
2042               { mother => 'person' },
2043               [
2044                   [
2045                       { child => 'person' },
2046                       [
2047                           { father => 'person' },
2048                           { 'father.person_id' => 'child.father_id' }
2049                       ]
2050                   ],
2051                   { 'mother.person_id' => 'child.mother_id' }
2052               ],
2053           ]
2054       },
2055   );
2056
2057   # Equivalent SQL:
2058   # SELECT mother.* FROM person mother
2059   # JOIN (
2060   #   person child
2061   #   JOIN person father
2062   #   ON ( father.person_id = child.father_id )
2063   # )
2064   # ON ( mother.person_id = child.mother_id )
2065
2066 The type of any join can be controlled manually. To search against only people
2067 with a father in the person table, we could explicitly use C<INNER JOIN>:
2068
2069     $rs = $schema->resultset('Person')->search(
2070         undef,
2071         {
2072             alias => 'child', # alias columns in accordance with "from"
2073             from => [
2074                 { child => 'person' },
2075                 [
2076                     { father => 'person', -join_type => 'inner' },
2077                     { 'father.id' => 'child.father_id' }
2078                 ],
2079             ]
2080         },
2081     );
2082
2083     # Equivalent SQL:
2084     # SELECT child.* FROM person child
2085     # INNER JOIN person father ON child.father_id = father.id
2086
2087 =cut
2088
2089 1;