093a45657328c8ded6d68a830934660dcd013f12
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11 use Scalar::Util qw/weaken/;
12
13 use base qw/DBIx::Class/;
14 __PACKAGE__->load_components(qw/AccessorGroup/);
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('CD')->search(year => 2005);
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =over 4
57
58 =item Arguments: ($source, \%$attrs)
59
60 =back
61
62 The resultset constructor. Takes a source object (usually a
63 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
64 L</ATTRIBUTES> below).  Does not perform any queries -- these are
65 executed as needed by the other methods.
66
67 Generally you won't need to construct a resultset manually.  You'll
68 automatically get one from e.g. a L</search> called in scalar context:
69
70   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
71
72 =cut
73
74 sub new {
75   my $class = shift;
76   return $class->new_result(@_) if ref $class;
77   
78   my ($source, $attrs) = @_;
79   weaken $source;
80   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
81   #use Data::Dumper; warn Dumper($attrs);
82   my $alias = ($attrs->{alias} ||= 'me');
83   
84   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
85   delete $attrs->{as} if $attrs->{columns};
86   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
87   $attrs->{select} = [
88     map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}}
89   ] if $attrs->{columns};
90   $attrs->{as} ||= [
91     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
92   ];
93   if (my $include = delete $attrs->{include_columns}) {
94     push(@{$attrs->{select}}, @$include);
95     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
96   }
97   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
98
99   $attrs->{from} ||= [ { $alias => $source->from } ];
100   $attrs->{seen_join} ||= {};
101   my %seen;
102   if (my $join = delete $attrs->{join}) {
103     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
104       if (ref $j eq 'HASH') {
105         $seen{$_} = 1 foreach keys %$j;
106       } else {
107         $seen{$j} = 1;
108       }
109     }
110     push(@{$attrs->{from}}, $source->resolve_join(
111       $join, $attrs->{alias}, $attrs->{seen_join})
112     );
113   }
114   
115   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
116   $attrs->{order_by} = [ $attrs->{order_by} ] if
117     $attrs->{order_by} and !ref($attrs->{order_by});
118   $attrs->{order_by} ||= [];
119
120   my $collapse = $attrs->{collapse} || {};
121   if (my $prefetch = delete $attrs->{prefetch}) {
122     my @pre_order;
123     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
124       if ( ref $p eq 'HASH' ) {
125         foreach my $key (keys %$p) {
126           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
127             unless $seen{$key};
128         }
129       } else {
130         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
131             unless $seen{$p};
132       }
133       my @prefetch = $source->resolve_prefetch(
134            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
135       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
136       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
137     }
138     push(@{$attrs->{order_by}}, @pre_order);
139   }
140   $attrs->{collapse} = $collapse;
141 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
142
143   if ($attrs->{page}) {
144     $attrs->{rows} ||= 10;
145     $attrs->{offset} ||= 0;
146     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
147   }
148
149   bless {
150     result_source => $source,
151     result_class => $attrs->{result_class} || $source->result_class,
152     cond => $attrs->{where},
153     from => $attrs->{from},
154     collapse => $collapse,
155     count => undef,
156     page => delete $attrs->{page},
157     pager => undef,
158     attrs => $attrs
159   }, $class;
160 }
161
162 =head2 search
163
164   my @cds    = $rs->search({ year => 2001 }); # "... WHERE year = 2001"
165   my $new_rs = $rs->search({ year => 2005 });
166
167 If you need to pass in additional attributes but no additional condition,
168 call it as C<search(undef, \%attrs);>.
169
170   # "SELECT name, artistid FROM $artist_table"
171   my @all_artists = $schema->resultset('Artist')->search(undef, {
172     columns => [qw/name artistid/],
173   });
174
175 =cut
176
177 sub search {
178   my $self = shift;
179
180   my $rs;
181   if( @_ ) {
182     
183     my $attrs = { %{$self->{attrs}} };
184     my $having = delete $attrs->{having};
185     $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
186
187     my $where = (@_
188                   ? ((@_ == 1 || ref $_[0] eq "HASH")
189                       ? shift
190                       : ((@_ % 2)
191                           ? $self->throw_exception(
192                               "Odd number of arguments to search")
193                           : {@_}))
194                   : undef());
195     if (defined $where) {
196       $attrs->{where} = (defined $attrs->{where}
197                 ? { '-and' =>
198                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
199                         $where, $attrs->{where} ] }
200                 : $where);
201     }
202
203     if (defined $having) {
204       $attrs->{having} = (defined $attrs->{having}
205                 ? { '-and' =>
206                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
207                         $having, $attrs->{having} ] }
208                 : $having);
209     }
210
211     $rs = (ref $self)->new($self->result_source, $attrs);
212   }
213   else {
214     $rs = $self;
215     $rs->reset;
216   }
217   return (wantarray ? $rs->all : $rs);
218 }
219
220 =head2 search_literal
221
222   my @obj    = $rs->search_literal($literal_where_cond, @bind);
223   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
224
225 Pass a literal chunk of SQL to be added to the conditional part of the
226 resultset.
227
228 =cut
229
230 sub search_literal {
231   my ($self, $cond, @vals) = @_;
232   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
233   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
234   return $self->search(\$cond, $attrs);
235 }
236
237 =head2 find
238
239 =over 4
240
241 =item Arguments: (@colvalues) | (\%cols, \%attrs?)
242
243 =back
244
245 Finds a row based on its primary key or unique constraint. For example:
246
247   my $cd = $schema->resultset('CD')->find(5);
248
249 Also takes an optional C<key> attribute, to search by a specific key or unique
250 constraint. For example:
251
252   my $cd = $schema->resultset('CD')->find(
253     {
254       artist => 'Massive Attack',
255       title  => 'Mezzanine',
256     },
257     { key => 'artist_title' }
258   );
259
260 See also L</find_or_create> and L</update_or_create>.
261
262 =cut
263
264 sub find {
265   my ($self, @vals) = @_;
266   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
267
268   my @cols = $self->result_source->primary_columns;
269   if (exists $attrs->{key}) {
270     my %uniq = $self->result_source->unique_constraints;
271     $self->throw_exception(
272       "Unknown key $attrs->{key} on '" . $self->result_source->name . "'"
273     ) unless exists $uniq{$attrs->{key}};
274     @cols = @{ $uniq{$attrs->{key}} };
275   }
276   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
277   $self->throw_exception(
278     "Can't find unless a primary key or unique constraint is defined"
279   ) unless @cols;
280
281   my $query;
282   if (ref $vals[0] eq 'HASH') {
283     $query = { %{$vals[0]} };
284   } elsif (@cols == @vals) {
285     $query = {};
286     @{$query}{@cols} = @vals;
287   } else {
288     $query = {@vals};
289   }
290   foreach my $key (grep { ! m/\./ } keys %$query) {
291     $query->{"$self->{attrs}{alias}.$key"} = delete $query->{$key};
292   }
293   #warn Dumper($query);
294   
295   if (keys %$attrs) {
296       my $rs = $self->search($query,$attrs);
297       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
298   } else {
299       return keys %{$self->{collapse}} ?
300         $self->search($query)->next :
301         $self->single($query);
302   }
303 }
304
305 =head2 search_related
306
307   $rs->search_related('relname', $cond?, $attrs?);
308
309 Search the specified relationship. Optionally specify a condition for matching
310 records.
311
312 =cut
313
314 sub search_related {
315   return shift->related_resultset(shift)->search(@_);
316 }
317
318 =head2 cursor
319
320 Returns a storage-driven cursor to the given resultset.
321
322 =cut
323
324 sub cursor {
325   my ($self) = @_;
326   my $attrs = { %{$self->{attrs}} };
327   return $self->{cursor}
328     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
329           $attrs->{where},$attrs);
330 }
331
332 =head2 single
333
334 Inflates the first result without creating a cursor
335
336 =cut
337
338 sub single {
339   my ($self, $where) = @_;
340   my $attrs = { %{$self->{attrs}} };
341   if ($where) {
342     if (defined $attrs->{where}) {
343       $attrs->{where} = {
344         '-and' => 
345             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
346                $where, delete $attrs->{where} ]
347       };
348     } else {
349       $attrs->{where} = $where;
350     }
351   }
352   my @data = $self->result_source->storage->select_single(
353           $self->{from}, $attrs->{select},
354           $attrs->{where},$attrs);
355   return (@data ? $self->_construct_object(@data) : ());
356 }
357
358
359 =head2 search_like
360
361 Perform a search, but use C<LIKE> instead of equality as the condition. Note
362 that this is simply a convenience method; you most likely want to use
363 L</search> with specific operators.
364
365 For more information, see L<DBIx::Class::Manual::Cookbook>.
366
367 =cut
368
369 sub search_like {
370   my $class = shift;
371   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
372   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
373   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
374   return $class->search($query, { %$attrs });
375 }
376
377 =head2 slice
378
379 =over 4
380
381 =item Arguments: ($first, $last)
382
383 =back
384
385 Returns a subset of elements from the resultset.
386
387 =cut
388
389 sub slice {
390   my ($self, $min, $max) = @_;
391   my $attrs = { %{ $self->{attrs} || {} } };
392   $attrs->{offset} ||= 0;
393   $attrs->{offset} += $min;
394   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
395   my $slice = (ref $self)->new($self->result_source, $attrs);
396   return (wantarray ? $slice->all : $slice);
397 }
398
399 =head2 next
400
401 Returns the next element in the resultset (C<undef> is there is none).
402
403 Can be used to efficiently iterate over records in the resultset:
404
405   my $rs = $schema->resultset('CD')->search;
406   while (my $cd = $rs->next) {
407     print $cd->title;
408   }
409
410 =cut
411
412 sub next {
413   my ($self) = @_;
414   if (@{$self->{all_cache} || []}) {
415     $self->{all_cache_position} ||= 0;
416     return $self->{all_cache}->[$self->{all_cache_position}++];
417   }
418   if ($self->{attrs}{cache}) {
419     $self->{all_cache_position} = 1;
420     return ($self->all)[0];
421   }
422   my @row = (exists $self->{stashed_row} ?
423                @{delete $self->{stashed_row}} :
424                $self->cursor->next
425   );
426 #  warn Dumper(\@row); use Data::Dumper;
427   return unless (@row);
428   return $self->_construct_object(@row);
429 }
430
431 sub _construct_object {
432   my ($self, @row) = @_;
433   my @as = @{ $self->{attrs}{as} };
434   
435   my $info = $self->_collapse_result(\@as, \@row);
436   
437   my $new = $self->result_class->inflate_result($self->result_source, @$info);
438   
439   $new = $self->{attrs}{record_filter}->($new)
440     if exists $self->{attrs}{record_filter};
441   return $new;
442 }
443
444 sub _collapse_result {
445   my ($self, $as, $row, $prefix) = @_;
446
447   my %const;
448
449   my @copy = @$row;
450   foreach my $this_as (@$as) {
451     my $val = shift @copy;
452     if (defined $prefix) {
453       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
454         my $remain = $1;
455         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
456         $const{$1||''}{$2} = $val;
457       }
458     } else {
459       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
460       $const{$1||''}{$2} = $val;
461     }
462   }
463
464   my $info = [ {}, {} ];
465   foreach my $key (keys %const) {
466     if (length $key) {
467       my $target = $info;
468       my @parts = split(/\./, $key);
469       foreach my $p (@parts) {
470         $target = $target->[1]->{$p} ||= [];
471       }
472       $target->[0] = $const{$key};
473     } else {
474       $info->[0] = $const{$key};
475     }
476   }
477
478   my @collapse;
479   if (defined $prefix) {
480     @collapse = map {
481         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
482     } keys %{$self->{collapse}}
483   } else {
484     @collapse = keys %{$self->{collapse}};
485   };
486
487   if (@collapse) {
488     my ($c) = sort { length $a <=> length $b } @collapse;
489     my $target = $info;
490     foreach my $p (split(/\./, $c)) {
491       $target = $target->[1]->{$p} ||= [];
492     }
493     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
494     my @co_key = @{$self->{collapse}{$c_prefix}};
495     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
496     my $tree = $self->_collapse_result($as, $row, $c_prefix);
497     my (@final, @raw);
498     while ( !(grep {
499                 !defined($tree->[0]->{$_}) ||
500                 $co_check{$_} ne $tree->[0]->{$_}
501               } @co_key) ) {
502       push(@final, $tree);
503       last unless (@raw = $self->cursor->next);
504       $row = $self->{stashed_row} = \@raw;
505       $tree = $self->_collapse_result($as, $row, $c_prefix);
506       #warn Data::Dumper::Dumper($tree, $row);
507     }
508     @$target = @final;
509   }
510
511   return $info;
512 }
513
514 =head2 result_source
515
516 Returns a reference to the result source for this recordset.
517
518 =cut
519
520
521 =head2 count
522
523 Performs an SQL C<COUNT> with the same query as the resultset was built
524 with to find the number of elements. If passed arguments, does a search
525 on the resultset and counts the results of that.
526
527 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
528 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
529 not support C<DISTINCT> with multiple columns. If you are using such a
530 database, you should only use columns from the main table in your C<group_by>
531 clause.
532
533 =cut
534
535 sub count {
536   my $self = shift;
537   return $self->search(@_)->count if @_ and defined $_[0];
538   return scalar @{ $self->get_cache } if @{ $self->get_cache };
539
540   my $count = $self->_count;
541   return 0 unless $count;
542
543   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
544   $count = $self->{attrs}{rows} if
545     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
546   return $count;
547 }
548
549 sub _count { # Separated out so pager can get the full count
550   my $self = shift;
551   my $select = { count => '*' };
552   my $attrs = { %{ $self->{attrs} } };
553   if (my $group_by = delete $attrs->{group_by}) {
554     delete $attrs->{having};
555     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
556     # todo: try CONCAT for multi-column pk
557     my @pk = $self->result_source->primary_columns;
558     if (@pk == 1) {
559       foreach my $column (@distinct) {
560         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
561           @distinct = ($column);
562           last;
563         }
564       } 
565     }
566
567     $select = { count => { distinct => \@distinct } };
568     #use Data::Dumper; die Dumper $select;
569   }
570
571   $attrs->{select} = $select;
572   $attrs->{as} = [qw/count/];
573
574   # offset, order by and page are not needed to count. record_filter is cdbi
575   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
576         
577   my ($count) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
578   return $count;
579 }
580
581 =head2 count_literal
582
583 Calls L</search_literal> with the passed arguments, then L</count>.
584
585 =cut
586
587 sub count_literal { shift->search_literal(@_)->count; }
588
589 =head2 all
590
591 Returns all elements in the resultset. Called implictly if the resultset
592 is returned in list context.
593
594 =cut
595
596 sub all {
597   my ($self) = @_;
598   return @{ $self->get_cache } if @{ $self->get_cache };
599
600   my @obj;
601
602   if (keys %{$self->{collapse}}) {
603       # Using $self->cursor->all is really just an optimisation.
604       # If we're collapsing has_many prefetches it probably makes
605       # very little difference, and this is cleaner than hacking
606       # _construct_object to survive the approach
607     $self->cursor->reset;
608     my @row = $self->cursor->next;
609     while (@row) {
610       push(@obj, $self->_construct_object(@row));
611       @row = (exists $self->{stashed_row}
612                ? @{delete $self->{stashed_row}}
613                : $self->cursor->next);
614     }
615   } else {
616     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
617   }
618
619   $self->set_cache(\@obj) if $self->{attrs}{cache};
620   return @obj;
621 }
622
623 =head2 reset
624
625 Resets the resultset's cursor, so you can iterate through the elements again.
626
627 =cut
628
629 sub reset {
630   my ($self) = @_;
631   $self->{all_cache_position} = 0;
632   $self->cursor->reset;
633   return $self;
634 }
635
636 =head2 first
637
638 Resets the resultset and returns the first element.
639
640 =cut
641
642 sub first {
643   return $_[0]->reset->next;
644 }
645
646 =head2 update
647
648 =over 4
649
650 =item Arguments: (\%values)
651
652 =back
653
654 Sets the specified columns in the resultset to the supplied values.
655
656 =cut
657
658 sub update {
659   my ($self, $values) = @_;
660   $self->throw_exception("Values for update must be a hash")
661     unless ref $values eq 'HASH';
662   return $self->result_source->storage->update(
663     $self->result_source->from, $values, $self->{cond}
664   );
665 }
666
667 =head2 update_all
668
669 =over 4
670
671 =item Arguments: (\%values)
672
673 =back
674
675 Fetches all objects and updates them one at a time.  Note that C<update_all>
676 will run cascade triggers while L</update> will not.
677
678 =cut
679
680 sub update_all {
681   my ($self, $values) = @_;
682   $self->throw_exception("Values for update must be a hash")
683     unless ref $values eq 'HASH';
684   foreach my $obj ($self->all) {
685     $obj->set_columns($values)->update;
686   }
687   return 1;
688 }
689
690 =head2 delete
691
692 Deletes the contents of the resultset from its result source.
693
694 =cut
695
696 sub delete {
697   my ($self) = @_;
698   my $del = {};
699
700   if (!ref($self->{cond})) {
701
702     # No-op. No condition, we're deleting everything
703
704   } elsif (ref $self->{cond} eq 'ARRAY') {
705
706     $del = [ map { my %hash;
707       foreach my $key (keys %{$_}) {
708         $key =~ /([^.]+)$/;
709         $hash{$1} = $_->{$key};
710       }; \%hash; } @{$self->{cond}} ];
711
712   } elsif (ref $self->{cond} eq 'HASH') {
713
714     if ((keys %{$self->{cond}})[0] eq '-and') {
715
716       $del->{-and} = [ map { my %hash;
717         foreach my $key (keys %{$_}) {
718           $key =~ /([^.]+)$/;
719           $hash{$1} = $_->{$key};
720         }; \%hash; } @{$self->{cond}{-and}} ];
721
722     } else {
723
724       foreach my $key (keys %{$self->{cond}}) {
725         $key =~ /([^.]+)$/;
726         $del->{$1} = $self->{cond}{$key};
727       }
728     }
729
730   } else {
731     $self->throw_exception(
732       "Can't delete on resultset with condition unless hash or array"
733     );
734   }
735
736   $self->result_source->storage->delete($self->result_source->from, $del);
737   return 1;
738 }
739
740 =head2 delete_all
741
742 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
743 will run cascade triggers while L</delete> will not.
744
745 =cut
746
747 sub delete_all {
748   my ($self) = @_;
749   $_->delete for $self->all;
750   return 1;
751 }
752
753 =head2 pager
754
755 Returns a L<Data::Page> object for the current resultset. Only makes
756 sense for queries with a C<page> attribute.
757
758 =cut
759
760 sub pager {
761   my ($self) = @_;
762   my $attrs = $self->{attrs};
763   $self->throw_exception("Can't create pager for non-paged rs")
764     unless $self->{page};
765   $attrs->{rows} ||= 10;
766   return $self->{pager} ||= Data::Page->new(
767     $self->_count, $attrs->{rows}, $self->{page});
768 }
769
770 =head2 page
771
772 =over 4
773
774 =item Arguments: ($page_num)
775
776 =back
777
778 Returns a new resultset for the specified page.
779
780 =cut
781
782 sub page {
783   my ($self, $page) = @_;
784   my $attrs = { %{$self->{attrs}} };
785   $attrs->{page} = $page;
786   return (ref $self)->new($self->result_source, $attrs);
787 }
788
789 =head2 new_result
790
791 =over 4
792
793 =item Arguments: (\%vals)
794
795 =back
796
797 Creates a result in the resultset's result class.
798
799 =cut
800
801 sub new_result {
802   my ($self, $values) = @_;
803   $self->throw_exception( "new_result needs a hash" )
804     unless (ref $values eq 'HASH');
805   $self->throw_exception(
806     "Can't abstract implicit construct, condition not a hash"
807   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
808   my %new = %$values;
809   my $alias = $self->{attrs}{alias};
810   foreach my $key (keys %{$self->{cond}||{}}) {
811     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
812   }
813   my $obj = $self->result_class->new(\%new);
814   $obj->result_source($self->result_source) if $obj->can('result_source');
815   return $obj;
816 }
817
818 =head2 create
819
820 =over 4
821
822 =item Arguments: (\%vals)
823
824 =back
825
826 Inserts a record into the resultset and returns the object.
827
828 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
829
830 =cut
831
832 sub create {
833   my ($self, $attrs) = @_;
834   $self->throw_exception( "create needs a hashref" )
835     unless ref $attrs eq 'HASH';
836   return $self->new_result($attrs)->insert;
837 }
838
839 =head2 find_or_create
840
841 =over 4
842
843 =item Arguments: (\%vals, \%attrs?)
844
845 =back
846
847   $class->find_or_create({ key => $val, ... });
848
849 Searches for a record matching the search condition; if it doesn't find one,
850 creates one and returns that instead.
851
852   my $cd = $schema->resultset('CD')->find_or_create({
853     cdid   => 5,
854     artist => 'Massive Attack',
855     title  => 'Mezzanine',
856     year   => 2005,
857   });
858
859 Also takes an optional C<key> attribute, to search by a specific key or unique
860 constraint. For example:
861
862   my $cd = $schema->resultset('CD')->find_or_create(
863     {
864       artist => 'Massive Attack',
865       title  => 'Mezzanine',
866     },
867     { key => 'artist_title' }
868   );
869
870 See also L</find> and L</update_or_create>.
871
872 =cut
873
874 sub find_or_create {
875   my $self     = shift;
876   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
877   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
878   my $exists   = $self->find($hash, $attrs);
879   return defined $exists ? $exists : $self->create($hash);
880 }
881
882 =head2 update_or_create
883
884   $class->update_or_create({ key => $val, ... });
885
886 First, search for an existing row matching one of the unique constraints
887 (including the primary key) on the source of this resultset.  If a row is
888 found, update it with the other given column values.  Otherwise, create a new
889 row.
890
891 Takes an optional C<key> attribute to search on a specific unique constraint.
892 For example:
893
894   # In your application
895   my $cd = $schema->resultset('CD')->update_or_create(
896     {
897       artist => 'Massive Attack',
898       title  => 'Mezzanine',
899       year   => 1998,
900     },
901     { key => 'artist_title' }
902   );
903
904 If no C<key> is specified, it searches on all unique constraints defined on the
905 source, including the primary key.
906
907 If the C<key> is specified as C<primary>, search only on the primary key.
908
909 See also L</find> and L</find_or_create>.
910
911 =cut
912
913 sub update_or_create {
914   my $self = shift;
915   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
916   my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
917
918   my %unique_constraints = $self->result_source->unique_constraints;
919   my @constraint_names   = (exists $attrs->{key}
920                             ? ($attrs->{key})
921                             : keys %unique_constraints);
922
923   my @unique_hashes;
924   foreach my $name (@constraint_names) {
925     my @unique_cols = @{ $unique_constraints{$name} };
926     my %unique_hash =
927       map  { $_ => $hash->{$_} }
928       grep { exists $hash->{$_} }
929       @unique_cols;
930
931     push @unique_hashes, \%unique_hash
932       if (scalar keys %unique_hash == scalar @unique_cols);
933   }
934
935   if (@unique_hashes) {
936     my $row = $self->single(\@unique_hashes);
937     if (defined $row) {
938       $row->set_columns($hash);
939       $row->update;
940       return $row;
941     }
942   }
943
944   return $self->create($hash);
945 }
946
947 =head2 get_cache
948
949 Gets the contents of the cache for the resultset.
950
951 =cut
952
953 sub get_cache {
954   shift->{all_cache} || [];
955 }
956
957 =head2 set_cache
958
959 Sets the contents of the cache for the resultset. Expects an arrayref
960 of objects of the same class as those produced by the resultset.
961
962 =cut
963
964 sub set_cache {
965   my ( $self, $data ) = @_;
966   $self->throw_exception("set_cache requires an arrayref")
967     if ref $data ne 'ARRAY';
968   my $result_class = $self->result_class;
969   foreach( @$data ) {
970     $self->throw_exception(
971       "cannot cache object of type '$_', expected '$result_class'"
972     ) if ref $_ ne $result_class;
973   }
974   $self->{all_cache} = $data;
975 }
976
977 =head2 clear_cache
978
979 Clears the cache for the resultset.
980
981 =cut
982
983 sub clear_cache {
984   shift->set_cache([]);
985 }
986
987 =head2 related_resultset
988
989 Returns a related resultset for the supplied relationship name.
990
991   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
992
993 =cut
994
995 sub related_resultset {
996   my ( $self, $rel, @rest ) = @_;
997   $self->{related_resultsets} ||= {};
998   return $self->{related_resultsets}{$rel} ||= do {
999       #warn "fetching related resultset for rel '$rel'";
1000       my $rel_obj = $self->result_source->relationship_info($rel);
1001       $self->throw_exception(
1002         "search_related: result source '" . $self->result_source->name .
1003         "' has no such relationship ${rel}")
1004         unless $rel_obj; #die Dumper $self->{attrs};
1005
1006       my $rs = $self->search(undef, { join => $rel });
1007       my $alias = defined $rs->{attrs}{seen_join}{$rel}
1008                     && $rs->{attrs}{seen_join}{$rel} > 1
1009                   ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
1010                   : $rel;
1011
1012       $self->result_source->schema->resultset($rel_obj->{class}
1013            )->search( undef,
1014              { %{$rs->{attrs}},
1015                alias => $alias,
1016                select => undef,
1017                as => undef }
1018            )->search(@rest);      
1019   };
1020 }
1021
1022 =head2 throw_exception
1023
1024 See Schema's throw_exception
1025
1026 =cut
1027
1028 sub throw_exception {
1029   my $self=shift;
1030   $self->result_source->schema->throw_exception(@_);
1031 }
1032
1033 =head1 ATTRIBUTES
1034
1035 XXX: FIXME: Attributes docs need clearing up
1036
1037 The resultset takes various attributes that modify its behavior. Here's an
1038 overview of them:
1039
1040 =head2 order_by
1041
1042 Which column(s) to order the results by. This is currently passed
1043 through directly to SQL, so you can give e.g. C<year DESC> for a
1044 descending order on the column `year'.
1045
1046 =head2 columns
1047
1048 =over 4
1049
1050 =item Arguments: (\@columns)
1051
1052 =back
1053
1054 Shortcut to request a particular set of columns to be retrieved.  Adds
1055 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1056 from that, then auto-populates C<as> from C<select> as normal. (You may also
1057 use the C<cols> attribute, as in earlier versions of DBIC.)
1058
1059 =head2 include_columns
1060
1061 =over 4
1062
1063 =item Arguments: (\@columns)
1064
1065 =back
1066
1067 Shortcut to include additional columns in the returned results - for example
1068
1069   $schema->resultset('CD')->search(undef, {
1070     include_columns => ['artist.name'],
1071     join => ['artist']
1072   });
1073
1074 would return all CDs and include a 'name' column to the information
1075 passed to object inflation
1076
1077 =head2 select
1078
1079 =over 4
1080
1081 =item Arguments: (\@columns)
1082
1083 =back
1084
1085 Indicates which columns should be selected from the storage. You can use
1086 column names, or in the case of RDBMS back ends, function or stored procedure
1087 names:
1088
1089   $rs = $schema->resultset('Employee')->search(undef, {
1090     select => [
1091       'name',
1092       { count => 'employeeid' },
1093       { sum => 'salary' }
1094     ]
1095   });
1096
1097 When you use function/stored procedure names and do not supply an C<as>
1098 attribute, the column names returned are storage-dependent. E.g. MySQL would
1099 return a column named C<count(employeeid)> in the above example.
1100
1101 =head2 as
1102
1103 =over 4
1104
1105 =item Arguments: (\@names)
1106
1107 =back
1108
1109 Indicates column names for object inflation. This is used in conjunction with
1110 C<select>, usually when C<select> contains one or more function or stored
1111 procedure names:
1112
1113   $rs = $schema->resultset('Employee')->search(undef, {
1114     select => [
1115       'name',
1116       { count => 'employeeid' }
1117     ],
1118     as => ['name', 'employee_count'],
1119   });
1120
1121   my $employee = $rs->first(); # get the first Employee
1122
1123 If the object against which the search is performed already has an accessor
1124 matching a column name specified in C<as>, the value can be retrieved using
1125 the accessor as normal:
1126
1127   my $name = $employee->name();
1128
1129 If on the other hand an accessor does not exist in the object, you need to
1130 use C<get_column> instead:
1131
1132   my $employee_count = $employee->get_column('employee_count');
1133
1134 You can create your own accessors if required - see
1135 L<DBIx::Class::Manual::Cookbook> for details.
1136
1137 =head2 join
1138
1139 Contains a list of relationships that should be joined for this query.  For
1140 example:
1141
1142   # Get CDs by Nine Inch Nails
1143   my $rs = $schema->resultset('CD')->search(
1144     { 'artist.name' => 'Nine Inch Nails' },
1145     { join => 'artist' }
1146   );
1147
1148 Can also contain a hash reference to refer to the other relation's relations.
1149 For example:
1150
1151   package MyApp::Schema::Track;
1152   use base qw/DBIx::Class/;
1153   __PACKAGE__->table('track');
1154   __PACKAGE__->add_columns(qw/trackid cd position title/);
1155   __PACKAGE__->set_primary_key('trackid');
1156   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1157   1;
1158
1159   # In your application
1160   my $rs = $schema->resultset('Artist')->search(
1161     { 'track.title' => 'Teardrop' },
1162     {
1163       join     => { cd => 'track' },
1164       order_by => 'artist.name',
1165     }
1166   );
1167
1168 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1169 similarly for a third time). For e.g.
1170
1171   my $rs = $schema->resultset('Artist')->search({
1172     'cds.title'   => 'Down to Earth',
1173     'cds_2.title' => 'Popular',
1174   }, {
1175     join => [ qw/cds cds/ ],
1176   });
1177
1178 will return a set of all artists that have both a cd with title 'Down
1179 to Earth' and a cd with title 'Popular'.
1180
1181 If you want to fetch related objects from other tables as well, see C<prefetch>
1182 below.
1183
1184 =head2 prefetch
1185
1186 =over 4
1187
1188 =item Arguments: (\@relationships)
1189
1190 =back
1191
1192 Contains one or more relationships that should be fetched along with the main 
1193 query (when they are accessed afterwards they will have already been
1194 "prefetched").  This is useful for when you know you will need the related
1195 objects, because it saves at least one query:
1196
1197   my $rs = $schema->resultset('Tag')->search(
1198     undef,
1199     {
1200       prefetch => {
1201         cd => 'artist'
1202       }
1203     }
1204   );
1205
1206 The initial search results in SQL like the following:
1207
1208   SELECT tag.*, cd.*, artist.* FROM tag
1209   JOIN cd ON tag.cd = cd.cdid
1210   JOIN artist ON cd.artist = artist.artistid
1211
1212 L<DBIx::Class> has no need to go back to the database when we access the
1213 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1214 case.
1215
1216 Simple prefetches will be joined automatically, so there is no need
1217 for a C<join> attribute in the above search. If you're prefetching to
1218 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1219 specify the join as well.
1220
1221 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1222 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1223 with an accessor type of 'single' or 'filter').
1224
1225 =head2 from
1226
1227 =over 4
1228
1229 =item Arguments: (\@array)
1230
1231 =back
1232
1233 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1234 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1235 clauses.
1236
1237 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1238 C<join> will usually do what you need and it is strongly recommended that you
1239 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1240
1241 In simple terms, C<from> works as follows:
1242
1243     [
1244         { <alias> => <table>, -join-type => 'inner|left|right' }
1245         [] # nested JOIN (optional)
1246         { <table.column> = <foreign_table.foreign_key> }
1247     ]
1248
1249     JOIN
1250         <alias> <table>
1251         [JOIN ...]
1252     ON <table.column> = <foreign_table.foreign_key>
1253
1254 An easy way to follow the examples below is to remember the following:
1255
1256     Anything inside "[]" is a JOIN
1257     Anything inside "{}" is a condition for the enclosing JOIN
1258
1259 The following examples utilize a "person" table in a family tree application.
1260 In order to express parent->child relationships, this table is self-joined:
1261
1262     # Person->belongs_to('father' => 'Person');
1263     # Person->belongs_to('mother' => 'Person');
1264
1265 C<from> can be used to nest joins. Here we return all children with a father,
1266 then search against all mothers of those children:
1267
1268   $rs = $schema->resultset('Person')->search(
1269       undef,
1270       {
1271           alias => 'mother', # alias columns in accordance with "from"
1272           from => [
1273               { mother => 'person' },
1274               [
1275                   [
1276                       { child => 'person' },
1277                       [
1278                           { father => 'person' },
1279                           { 'father.person_id' => 'child.father_id' }
1280                       ]
1281                   ],
1282                   { 'mother.person_id' => 'child.mother_id' }
1283               ],
1284           ]
1285       },
1286   );
1287
1288   # Equivalent SQL:
1289   # SELECT mother.* FROM person mother
1290   # JOIN (
1291   #   person child
1292   #   JOIN person father
1293   #   ON ( father.person_id = child.father_id )
1294   # )
1295   # ON ( mother.person_id = child.mother_id )
1296
1297 The type of any join can be controlled manually. To search against only people
1298 with a father in the person table, we could explicitly use C<INNER JOIN>:
1299
1300     $rs = $schema->resultset('Person')->search(
1301         undef,
1302         {
1303             alias => 'child', # alias columns in accordance with "from"
1304             from => [
1305                 { child => 'person' },
1306                 [
1307                     { father => 'person', -join-type => 'inner' },
1308                     { 'father.id' => 'child.father_id' }
1309                 ],
1310             ]
1311         },
1312     );
1313
1314     # Equivalent SQL:
1315     # SELECT child.* FROM person child
1316     # INNER JOIN person father ON child.father_id = father.id
1317
1318 =head2 page
1319
1320 =over 4
1321
1322 =item Arguments: ($page)
1323
1324 =back
1325
1326 For a paged resultset, specifies which page to retrieve.  Leave unset
1327 for an unpaged resultset.
1328
1329 =head2 rows
1330
1331 =over 4
1332
1333 =item Arguments: ($rows)
1334
1335 =back
1336
1337 For a paged resultset, specifies how many rows are in each page:
1338
1339   rows => 10
1340
1341 Can also be used to simulate an SQL C<LIMIT>.
1342
1343 =head2 group_by
1344
1345 =over 4
1346
1347 =item Arguments: (\@columns)
1348
1349 =back
1350
1351 A arrayref of columns to group by. Can include columns of joined tables.
1352
1353   group_by => [qw/ column1 column2 ... /]
1354
1355 =head2 distinct
1356
1357 Set to 1 to group by all columns.
1358
1359 =head2 cache
1360
1361 Set to 1 to cache search results. This prevents extra SQL queries if you
1362 revisit rows in your ResultSet:
1363
1364   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1365   
1366   while( my $artist = $resultset->next ) {
1367     ... do stuff ...
1368   }
1369
1370   $rs->first; # without cache, this would issue a query 
1371
1372 By default, searches are not cached.
1373
1374 For more examples of using these attributes, see
1375 L<DBIx::Class::Manual::Cookbook>.
1376
1377 =cut
1378
1379 1;