code reformatting for readibility
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11 use Scalar::Util qw/weaken/;
12
13 use base qw/DBIx::Class/;
14 __PACKAGE__->load_components(qw/AccessorGroup/);
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('CD')->search(year => 2005);
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =head3 Arguments: ($source, \%$attrs)
57
58 The resultset constructor. Takes a source object (usually a
59 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
60 L</ATTRIBUTES> below).  Does not perform any queries -- these are
61 executed as needed by the other methods.
62
63 Generally you won't need to construct a resultset manually.  You'll
64 automatically get one from e.g. a L</search> called in scalar context:
65
66   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
67
68 =cut
69
70 sub new {
71   my $class = shift;
72   return $class->new_result(@_) if ref $class;
73   
74   my ($source, $attrs) = @_;
75   weaken $source;
76   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
77   #use Data::Dumper; warn Dumper($attrs);
78   my $alias = ($attrs->{alias} ||= 'me');
79   
80   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
81   delete $attrs->{as} if $attrs->{columns};
82   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
83   $attrs->{select} = [
84     map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}}
85   ] if $attrs->{columns};
86   $attrs->{as} ||= [
87     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
88   ];
89   if (my $include = delete $attrs->{include_columns}) {
90     push(@{$attrs->{select}}, @$include);
91     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
92   }
93   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
94
95   $attrs->{from} ||= [ { $alias => $source->from } ];
96   $attrs->{seen_join} ||= {};
97   my %seen;
98   if (my $join = delete $attrs->{join}) {
99     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
100       if (ref $j eq 'HASH') {
101         $seen{$_} = 1 foreach keys %$j;
102       } else {
103         $seen{$j} = 1;
104       }
105     }
106     push(@{$attrs->{from}}, $source->resolve_join(
107       $join, $attrs->{alias}, $attrs->{seen_join})
108     );
109   }
110   
111   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
112   $attrs->{order_by} = [ $attrs->{order_by} ] if
113     $attrs->{order_by} and !ref($attrs->{order_by});
114   $attrs->{order_by} ||= [];
115
116   my $collapse = $attrs->{collapse} || {};
117   if (my $prefetch = delete $attrs->{prefetch}) {
118     my @pre_order;
119     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
120       if ( ref $p eq 'HASH' ) {
121         foreach my $key (keys %$p) {
122           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
123             unless $seen{$key};
124         }
125       } else {
126         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
127             unless $seen{$p};
128       }
129       my @prefetch = $source->resolve_prefetch(
130            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
131       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
132       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
133     }
134     push(@{$attrs->{order_by}}, @pre_order);
135   }
136   $attrs->{collapse} = $collapse;
137 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
138
139   if ($attrs->{page}) {
140     $attrs->{rows} ||= 10;
141     $attrs->{offset} ||= 0;
142     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
143   }
144
145   bless {
146     result_source => $source,
147     result_class => $attrs->{result_class} || $source->result_class,
148     cond => $attrs->{where},
149     from => $attrs->{from},
150     collapse => $collapse,
151     count => undef,
152     page => delete $attrs->{page},
153     pager => undef,
154     attrs => $attrs
155   }, $class;
156 }
157
158 =head2 search
159
160   my @cds    = $rs->search({ year => 2001 }); # "... WHERE year = 2001"
161   my $new_rs = $rs->search({ year => 2005 });
162
163 If you need to pass in additional attributes but no additional condition,
164 call it as C<search(undef, \%attrs);>.
165
166   # "SELECT name, artistid FROM $artist_table"
167   my @all_artists = $schema->resultset('Artist')->search(undef, {
168     columns => [qw/name artistid/],
169   });
170
171 =cut
172
173 sub search {
174   my $self = shift;
175
176   my $rs;
177   if( @_ ) {
178     
179     my $attrs = { %{$self->{attrs}} };
180     my $having = delete $attrs->{having};
181     $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
182
183     my $where = (@_
184                   ? ((@_ == 1 || ref $_[0] eq "HASH")
185                       ? shift
186                       : ((@_ % 2)
187                           ? $self->throw_exception(
188                               "Odd number of arguments to search")
189                           : {@_}))
190                   : undef());
191     if (defined $where) {
192       $attrs->{where} = (defined $attrs->{where}
193                 ? { '-and' =>
194                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
195                         $where, $attrs->{where} ] }
196                 : $where);
197     }
198
199     if (defined $having) {
200       $attrs->{having} = (defined $attrs->{having}
201                 ? { '-and' =>
202                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
203                         $having, $attrs->{having} ] }
204                 : $having);
205     }
206
207     $rs = (ref $self)->new($self->result_source, $attrs);
208   }
209   else {
210     $rs = $self;
211     $rs->reset;
212   }
213   return (wantarray ? $rs->all : $rs);
214 }
215
216 =head2 search_literal
217
218   my @obj    = $rs->search_literal($literal_where_cond, @bind);
219   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
220
221 Pass a literal chunk of SQL to be added to the conditional part of the
222 resultset.
223
224 =cut
225
226 sub search_literal {
227   my ($self, $cond, @vals) = @_;
228   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
229   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
230   return $self->search(\$cond, $attrs);
231 }
232
233 =head2 find
234
235 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
236
237 Finds a row based on its primary key or unique constraint. For example:
238
239   my $cd = $schema->resultset('CD')->find(5);
240
241 Also takes an optional C<key> attribute, to search by a specific key or unique
242 constraint. For example:
243
244   my $cd = $schema->resultset('CD')->find(
245     {
246       artist => 'Massive Attack',
247       title  => 'Mezzanine',
248     },
249     { key => 'artist_title' }
250   );
251
252 See also L</find_or_create> and L</update_or_create>.
253
254 =cut
255
256 sub find {
257   my ($self, @vals) = @_;
258   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
259
260   my @cols = $self->result_source->primary_columns;
261   if (exists $attrs->{key}) {
262     my %uniq = $self->result_source->unique_constraints;
263     $self->throw_exception(
264       "Unknown key $attrs->{key} on '" . $self->result_source->name . "'"
265     ) unless exists $uniq{$attrs->{key}};
266     @cols = @{ $uniq{$attrs->{key}} };
267   }
268   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
269   $self->throw_exception(
270     "Can't find unless a primary key or unique constraint is defined"
271   ) unless @cols;
272
273   my $query;
274   if (ref $vals[0] eq 'HASH') {
275     $query = { %{$vals[0]} };
276   } elsif (@cols == @vals) {
277     $query = {};
278     @{$query}{@cols} = @vals;
279   } else {
280     $query = {@vals};
281   }
282   foreach my $key (grep { ! m/\./ } keys %$query) {
283     $query->{"$self->{attrs}{alias}.$key"} = delete $query->{$key};
284   }
285   #warn Dumper($query);
286   
287   if (keys %$attrs) {
288       my $rs = $self->search($query,$attrs);
289       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
290   } else {
291       return keys %{$self->{collapse}} ?
292         $self->search($query)->next :
293         $self->single($query);
294   }
295 }
296
297 =head2 search_related
298
299   $rs->search_related('relname', $cond?, $attrs?);
300
301 Search the specified relationship. Optionally specify a condition for matching
302 records.
303
304 =cut
305
306 sub search_related {
307   return shift->related_resultset(shift)->search(@_);
308 }
309
310 =head2 cursor
311
312 Returns a storage-driven cursor to the given resultset.
313
314 =cut
315
316 sub cursor {
317   my ($self) = @_;
318   my $attrs = { %{$self->{attrs}} };
319   return $self->{cursor}
320     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
321           $attrs->{where},$attrs);
322 }
323
324 =head2 single
325
326 Inflates the first result without creating a cursor
327
328 =cut
329
330 sub single {
331   my ($self, $where) = @_;
332   my $attrs = { %{$self->{attrs}} };
333   if ($where) {
334     if (defined $attrs->{where}) {
335       $attrs->{where} = {
336         '-and' => 
337             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
338                $where, delete $attrs->{where} ]
339       };
340     } else {
341       $attrs->{where} = $where;
342     }
343   }
344   my @data = $self->result_source->storage->select_single(
345           $self->{from}, $attrs->{select},
346           $attrs->{where},$attrs);
347   return (@data ? $self->_construct_object(@data) : ());
348 }
349
350
351 =head2 search_like
352
353 Perform a search, but use C<LIKE> instead of equality as the condition. Note
354 that this is simply a convenience method; you most likely want to use
355 L</search> with specific operators.
356
357 For more information, see L<DBIx::Class::Manual::Cookbook>.
358
359 =cut
360
361 sub search_like {
362   my $class = shift;
363   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
364   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
365   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
366   return $class->search($query, { %$attrs });
367 }
368
369 =head2 slice
370
371 =head3 Arguments: ($first, $last)
372
373 Returns a subset of elements from the resultset.
374
375 =cut
376
377 sub slice {
378   my ($self, $min, $max) = @_;
379   my $attrs = { %{ $self->{attrs} || {} } };
380   $attrs->{offset} ||= 0;
381   $attrs->{offset} += $min;
382   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
383   my $slice = (ref $self)->new($self->result_source, $attrs);
384   return (wantarray ? $slice->all : $slice);
385 }
386
387 =head2 next
388
389 Returns the next element in the resultset (C<undef> is there is none).
390
391 Can be used to efficiently iterate over records in the resultset:
392
393   my $rs = $schema->resultset('CD')->search;
394   while (my $cd = $rs->next) {
395     print $cd->title;
396   }
397
398 =cut
399
400 sub next {
401   my ($self) = @_;
402   if (@{$self->{all_cache} || []}) {
403     $self->{all_cache_position} ||= 0;
404     return $self->{all_cache}->[$self->{all_cache_position}++];
405   }
406   if ($self->{attrs}{cache}) {
407     $self->{all_cache_position} = 1;
408     return ($self->all)[0];
409   }
410   my @row = (exists $self->{stashed_row} ?
411                @{delete $self->{stashed_row}} :
412                $self->cursor->next
413   );
414 #  warn Dumper(\@row); use Data::Dumper;
415   return unless (@row);
416   return $self->_construct_object(@row);
417 }
418
419 sub _construct_object {
420   my ($self, @row) = @_;
421   my @as = @{ $self->{attrs}{as} };
422   
423   my $info = $self->_collapse_result(\@as, \@row);
424   
425   my $new = $self->result_class->inflate_result($self->result_source, @$info);
426   
427   $new = $self->{attrs}{record_filter}->($new)
428     if exists $self->{attrs}{record_filter};
429   return $new;
430 }
431
432 sub _collapse_result {
433   my ($self, $as, $row, $prefix) = @_;
434
435   my %const;
436
437   my @copy = @$row;
438   foreach my $this_as (@$as) {
439     my $val = shift @copy;
440     if (defined $prefix) {
441       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
442         my $remain = $1;
443         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
444         $const{$1||''}{$2} = $val;
445       }
446     } else {
447       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
448       $const{$1||''}{$2} = $val;
449     }
450   }
451
452   my $info = [ {}, {} ];
453   foreach my $key (keys %const) {
454     if (length $key) {
455       my $target = $info;
456       my @parts = split(/\./, $key);
457       foreach my $p (@parts) {
458         $target = $target->[1]->{$p} ||= [];
459       }
460       $target->[0] = $const{$key};
461     } else {
462       $info->[0] = $const{$key};
463     }
464   }
465
466   my @collapse;
467   if (defined $prefix) {
468     @collapse = map {
469         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
470     } keys %{$self->{collapse}})
471   } else {
472     @collapse = keys %{$self->{collapse}};
473   );
474
475   if (@collapse) {
476     my ($c) = sort { length $a <=> length $b } @collapse;
477     my $target = $info;
478     foreach my $p (split(/\./, $c)) {
479       $target = $target->[1]->{$p} ||= [];
480     }
481     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
482     my @co_key = @{$self->{collapse}{$c_prefix}};
483     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
484     my $tree = $self->_collapse_result($as, $row, $c_prefix);
485     my (@final, @raw);
486     while ( !(grep {
487                 !defined($tree->[0]->{$_}) ||
488                 $co_check{$_} ne $tree->[0]->{$_}
489               } @co_key) ) {
490       push(@final, $tree);
491       last unless (@raw = $self->cursor->next);
492       $row = $self->{stashed_row} = \@raw;
493       $tree = $self->_collapse_result($as, $row, $c_prefix);
494       #warn Data::Dumper::Dumper($tree, $row);
495     }
496     @$target = @final;
497   }
498
499   return $info;
500 }
501
502 =head2 result_source
503
504 Returns a reference to the result source for this recordset.
505
506 =cut
507
508
509 =head2 count
510
511 Performs an SQL C<COUNT> with the same query as the resultset was built
512 with to find the number of elements. If passed arguments, does a search
513 on the resultset and counts the results of that.
514
515 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
516 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
517 not support C<DISTINCT> with multiple columns. If you are using such a
518 database, you should only use columns from the main table in your C<group_by>
519 clause.
520
521 =cut
522
523 sub count {
524   my $self = shift;
525   return $self->search(@_)->count if @_ and defined $_[0];
526   return scalar @{ $self->get_cache } if @{ $self->get_cache };
527
528   my $count = $self->_count;
529   return 0 unless $count;
530
531   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
532   $count = $self->{attrs}{rows} if
533     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
534   return $count;
535 }
536
537 sub _count { # Separated out so pager can get the full count
538   my $self = shift;
539   my $select = { count => '*' };
540   my $attrs = { %{ $self->{attrs} } };
541   if (my $group_by = delete $attrs->{group_by}) {
542     delete $attrs->{having};
543     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
544     # todo: try CONCAT for multi-column pk
545     my @pk = $self->result_source->primary_columns;
546     if (@pk == 1) {
547       foreach my $column (@distinct) {
548         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
549           @distinct = ($column);
550           last;
551         }
552       } 
553     }
554
555     $select = { count => { distinct => \@distinct } };
556     #use Data::Dumper; die Dumper $select;
557   }
558
559   $attrs->{select} = $select;
560   $attrs->{as} = [qw/count/];
561
562   # offset, order by and page are not needed to count. record_filter is cdbi
563   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
564         
565   my ($count) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
566   return $count;
567 }
568
569 =head2 count_literal
570
571 Calls L</search_literal> with the passed arguments, then L</count>.
572
573 =cut
574
575 sub count_literal { shift->search_literal(@_)->count; }
576
577 =head2 all
578
579 Returns all elements in the resultset. Called implictly if the resultset
580 is returned in list context.
581
582 =cut
583
584 sub all {
585   my ($self) = @_;
586   return @{ $self->get_cache } if @{ $self->get_cache };
587
588   my @obj;
589
590   if (keys %{$self->{collapse}}) {
591       # Using $self->cursor->all is really just an optimisation.
592       # If we're collapsing has_many prefetches it probably makes
593       # very little difference, and this is cleaner than hacking
594       # _construct_object to survive the approach
595     $self->cursor->reset;
596     my @row = $self->cursor->next;
597     while (@row) {
598       push(@obj, $self->_construct_object(@row));
599       @row = (exists $self->{stashed_row}
600                ? @{delete $self->{stashed_row}}
601                : $self->cursor->next);
602     }
603   } else {
604     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
605   }
606
607   $self->set_cache(\@obj) if $self->{attrs}{cache};
608   return @obj;
609 }
610
611 =head2 reset
612
613 Resets the resultset's cursor, so you can iterate through the elements again.
614
615 =cut
616
617 sub reset {
618   my ($self) = @_;
619   $self->{all_cache_position} = 0;
620   $self->cursor->reset;
621   return $self;
622 }
623
624 =head2 first
625
626 Resets the resultset and returns the first element.
627
628 =cut
629
630 sub first {
631   return $_[0]->reset->next;
632 }
633
634 =head2 update
635
636 =head3 Arguments: (\%values)
637
638 Sets the specified columns in the resultset to the supplied values.
639
640 =cut
641
642 sub update {
643   my ($self, $values) = @_;
644   $self->throw_exception("Values for update must be a hash")
645     unless ref $values eq 'HASH';
646   return $self->result_source->storage->update(
647            $self->result_source->from, $values, $self->{cond});
648 }
649
650 =head2 update_all
651
652 =head3 Arguments: (\%values)
653
654 Fetches all objects and updates them one at a time.  Note that C<update_all>
655 will run cascade triggers while L</update> will not.
656
657 =cut
658
659 sub update_all {
660   my ($self, $values) = @_;
661   $self->throw_exception("Values for update must be a hash")
662     unless ref $values eq 'HASH';
663   foreach my $obj ($self->all) {
664     $obj->set_columns($values)->update;
665   }
666   return 1;
667 }
668
669 =head2 delete
670
671 Deletes the contents of the resultset from its result source.
672
673 =cut
674
675 sub delete {
676   my ($self) = @_;
677   my $del = {};
678
679   if (!ref($self->{cond})) {
680
681     # No-op. No condition, we're deleting everything
682
683   } elsif (ref $self->{cond} eq 'ARRAY') {
684
685     $del = [ map { my %hash;
686       foreach my $key (keys %{$_}) {
687         $key =~ /([^.]+)$/;
688         $hash{$1} = $_->{$key};
689       }; \%hash; } @{$self->{cond}} ];
690
691   } elsif (ref $self->{cond} eq 'HASH') {
692
693     if ((keys %{$self->{cond}})[0] eq '-and') {
694
695       $del->{-and} = [ map { my %hash;
696         foreach my $key (keys %{$_}) {
697           $key =~ /([^.]+)$/;
698           $hash{$1} = $_->{$key};
699         }; \%hash; } @{$self->{cond}{-and}} ];
700
701     } else {
702
703       foreach my $key (keys %{$self->{cond}}) {
704         $key =~ /([^.]+)$/;
705         $del->{$1} = $self->{cond}{$key};
706       }
707     }
708   } else {
709     $self->throw_exception(
710       "Can't delete on resultset with condition unless hash or array");
711   }
712
713   $self->result_source->storage->delete($self->result_source->from, $del);
714   return 1;
715 }
716
717 =head2 delete_all
718
719 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
720 will run cascade triggers while L</delete> will not.
721
722 =cut
723
724 sub delete_all {
725   my ($self) = @_;
726   $_->delete for $self->all;
727   return 1;
728 }
729
730 =head2 pager
731
732 Returns a L<Data::Page> object for the current resultset. Only makes
733 sense for queries with a C<page> attribute.
734
735 =cut
736
737 sub pager {
738   my ($self) = @_;
739   my $attrs = $self->{attrs};
740   $self->throw_exception("Can't create pager for non-paged rs")
741     unless $self->{page};
742   $attrs->{rows} ||= 10;
743   return $self->{pager} ||= Data::Page->new(
744     $self->_count, $attrs->{rows}, $self->{page});
745 }
746
747 =head2 page
748
749 =head3 Arguments: ($page_num)
750
751 Returns a new resultset for the specified page.
752
753 =cut
754
755 sub page {
756   my ($self, $page) = @_;
757   my $attrs = { %{$self->{attrs}} };
758   $attrs->{page} = $page;
759   return (ref $self)->new($self->result_source, $attrs);
760 }
761
762 =head2 new_result
763
764 =head3 Arguments: (\%vals)
765
766 Creates a result in the resultset's result class.
767
768 =cut
769
770 sub new_result {
771   my ($self, $values) = @_;
772   $self->throw_exception( "new_result needs a hash" )
773     unless (ref $values eq 'HASH');
774   $self->throw_exception(
775     "Can't abstract implicit construct, condition not a hash"
776   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
777   my %new = %$values;
778   my $alias = $self->{attrs}{alias};
779   foreach my $key (keys %{$self->{cond}||{}}) {
780     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
781   }
782   my $obj = $self->result_class->new(\%new);
783   $obj->result_source($self->result_source) if $obj->can('result_source');
784   return $obj;
785 }
786
787 =head2 create
788
789 =head3 Arguments: (\%vals)
790
791 Inserts a record into the resultset and returns the object.
792
793 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
794
795 =cut
796
797 sub create {
798   my ($self, $attrs) = @_;
799   $self->throw_exception( "create needs a hashref" )
800     unless ref $attrs eq 'HASH';
801   return $self->new_result($attrs)->insert;
802 }
803
804 =head2 find_or_create
805
806 =head3 Arguments: (\%vals, \%attrs?)
807
808   $class->find_or_create({ key => $val, ... });
809
810 Searches for a record matching the search condition; if it doesn't find one,
811 creates one and returns that instead.
812
813   my $cd = $schema->resultset('CD')->find_or_create({
814     cdid   => 5,
815     artist => 'Massive Attack',
816     title  => 'Mezzanine',
817     year   => 2005,
818   });
819
820 Also takes an optional C<key> attribute, to search by a specific key or unique
821 constraint. For example:
822
823   my $cd = $schema->resultset('CD')->find_or_create(
824     {
825       artist => 'Massive Attack',
826       title  => 'Mezzanine',
827     },
828     { key => 'artist_title' }
829   );
830
831 See also L</find> and L</update_or_create>.
832
833 =cut
834
835 sub find_or_create {
836   my $self     = shift;
837   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
838   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
839   my $exists   = $self->find($hash, $attrs);
840   return defined $exists ? $exists : $self->create($hash);
841 }
842
843 =head2 update_or_create
844
845   $class->update_or_create({ key => $val, ... });
846
847 First, search for an existing row matching one of the unique constraints
848 (including the primary key) on the source of this resultset.  If a row is
849 found, update it with the other given column values.  Otherwise, create a new
850 row.
851
852 Takes an optional C<key> attribute to search on a specific unique constraint.
853 For example:
854
855   # In your application
856   my $cd = $schema->resultset('CD')->update_or_create(
857     {
858       artist => 'Massive Attack',
859       title  => 'Mezzanine',
860       year   => 1998,
861     },
862     { key => 'artist_title' }
863   );
864
865 If no C<key> is specified, it searches on all unique constraints defined on the
866 source, including the primary key.
867
868 If the C<key> is specified as C<primary>, search only on the primary key.
869
870 See also L</find> and L</find_or_create>.
871
872 =cut
873
874 sub update_or_create {
875   my $self = shift;
876   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
877   my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
878
879   my %unique_constraints = $self->result_source->unique_constraints;
880   my @constraint_names   = (exists $attrs->{key}
881                             ? ($attrs->{key})
882                             : keys %unique_constraints);
883
884   my @unique_hashes;
885   foreach my $name (@constraint_names) {
886     my @unique_cols = @{ $unique_constraints{$name} };
887     my %unique_hash =
888       map  { $_ => $hash->{$_} }
889       grep { exists $hash->{$_} }
890       @unique_cols;
891
892     push @unique_hashes, \%unique_hash
893       if (scalar keys %unique_hash == scalar @unique_cols);
894   }
895
896   if (@unique_hashes) {
897     my $row = $self->single(\@unique_hashes);
898     if (defined $row) {
899       $row->set_columns($hash);
900       $row->update;
901       return $row;
902     }
903   }
904
905   return $self->create($hash);
906 }
907
908 =head2 get_cache
909
910 Gets the contents of the cache for the resultset.
911
912 =cut
913
914 sub get_cache {
915   shift->{all_cache} || [];
916 }
917
918 =head2 set_cache
919
920 Sets the contents of the cache for the resultset. Expects an arrayref
921 of objects of the same class as those produced by the resultset.
922
923 =cut
924
925 sub set_cache {
926   my ( $self, $data ) = @_;
927   $self->throw_exception("set_cache requires an arrayref")
928     if ref $data ne 'ARRAY';
929   my $result_class = $self->result_class;
930   foreach( @$data ) {
931     $self->throw_exception(
932       "cannot cache object of type '$_', expected '$result_class'"
933     ) if ref $_ ne $result_class;
934   }
935   $self->{all_cache} = $data;
936 }
937
938 =head2 clear_cache
939
940 Clears the cache for the resultset.
941
942 =cut
943
944 sub clear_cache {
945   shift->set_cache([]);
946 }
947
948 =head2 related_resultset
949
950 Returns a related resultset for the supplied relationship name.
951
952   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
953
954 =cut
955
956 sub related_resultset {
957   my ( $self, $rel, @rest ) = @_;
958   $self->{related_resultsets} ||= {};
959   return $self->{related_resultsets}{$rel} ||= do {
960       #warn "fetching related resultset for rel '$rel'";
961       my $rel_obj = $self->result_source->relationship_info($rel);
962       $self->throw_exception(
963         "search_related: result source '" . $self->result_source->name .
964         "' has no such relationship ${rel}")
965         unless $rel_obj; #die Dumper $self->{attrs};
966
967       my $rs = $self->search(undef, { join => $rel });
968       my $alias = defined $rs->{attrs}{seen_join}{$rel}
969                     && $rs->{attrs}{seen_join}{$rel} > 1
970                   ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
971                   : $rel;
972
973       $self->result_source->schema->resultset($rel_obj->{class}
974            )->search( undef,
975              { %{$rs->{attrs}},
976                alias => $alias,
977                select => undef,
978                as => undef }
979            )->search(@rest);      
980   };
981 }
982
983 =head2 throw_exception
984
985 See Schema's throw_exception
986
987 =cut
988
989 sub throw_exception {
990   my $self=shift;
991   $self->result_source->schema->throw_exception(@_);
992 }
993
994 =head1 ATTRIBUTES
995
996 The resultset takes various attributes that modify its behavior. Here's an
997 overview of them:
998
999 =head2 order_by
1000
1001 Which column(s) to order the results by. This is currently passed
1002 through directly to SQL, so you can give e.g. C<year DESC> for a
1003 descending order on the column `year'.
1004
1005 =head2 columns
1006
1007 =head3 Arguments: (arrayref)
1008
1009 Shortcut to request a particular set of columns to be retrieved.  Adds
1010 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1011 from that, then auto-populates C<as> from C<select> as normal. (You may also
1012 use the C<cols> attribute, as in earlier versions of DBIC.)
1013
1014 =head2 include_columns
1015
1016 =head3 Arguments: (arrayref)
1017
1018 Shortcut to include additional columns in the returned results - for example
1019
1020   $schema->resultset('CD')->search(undef, {
1021     include_columns => ['artist.name'],
1022     join => ['artist']
1023   });
1024
1025 would return all CDs and include a 'name' column to the information
1026 passed to object inflation
1027
1028 =head2 select
1029
1030 =head3 Arguments: (arrayref)
1031
1032 Indicates which columns should be selected from the storage. You can use
1033 column names, or in the case of RDBMS back ends, function or stored procedure
1034 names:
1035
1036   $rs = $schema->resultset('Employee')->search(undef, {
1037     select => [
1038       'name',
1039       { count => 'employeeid' },
1040       { sum => 'salary' }
1041     ]
1042   });
1043
1044 When you use function/stored procedure names and do not supply an C<as>
1045 attribute, the column names returned are storage-dependent. E.g. MySQL would
1046 return a column named C<count(employeeid)> in the above example.
1047
1048 =head2 as
1049
1050 =head3 Arguments: (arrayref)
1051
1052 Indicates column names for object inflation. This is used in conjunction with
1053 C<select>, usually when C<select> contains one or more function or stored
1054 procedure names:
1055
1056   $rs = $schema->resultset('Employee')->search(undef, {
1057     select => [
1058       'name',
1059       { count => 'employeeid' }
1060     ],
1061     as => ['name', 'employee_count'],
1062   });
1063
1064   my $employee = $rs->first(); # get the first Employee
1065
1066 If the object against which the search is performed already has an accessor
1067 matching a column name specified in C<as>, the value can be retrieved using
1068 the accessor as normal:
1069
1070   my $name = $employee->name();
1071
1072 If on the other hand an accessor does not exist in the object, you need to
1073 use C<get_column> instead:
1074
1075   my $employee_count = $employee->get_column('employee_count');
1076
1077 You can create your own accessors if required - see
1078 L<DBIx::Class::Manual::Cookbook> for details.
1079
1080 =head2 join
1081
1082 Contains a list of relationships that should be joined for this query.  For
1083 example:
1084
1085   # Get CDs by Nine Inch Nails
1086   my $rs = $schema->resultset('CD')->search(
1087     { 'artist.name' => 'Nine Inch Nails' },
1088     { join => 'artist' }
1089   );
1090
1091 Can also contain a hash reference to refer to the other relation's relations.
1092 For example:
1093
1094   package MyApp::Schema::Track;
1095   use base qw/DBIx::Class/;
1096   __PACKAGE__->table('track');
1097   __PACKAGE__->add_columns(qw/trackid cd position title/);
1098   __PACKAGE__->set_primary_key('trackid');
1099   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1100   1;
1101
1102   # In your application
1103   my $rs = $schema->resultset('Artist')->search(
1104     { 'track.title' => 'Teardrop' },
1105     {
1106       join     => { cd => 'track' },
1107       order_by => 'artist.name',
1108     }
1109   );
1110
1111 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1112 similarly for a third time). For e.g.
1113
1114   my $rs = $schema->resultset('Artist')->search({
1115     'cds.title'   => 'Down to Earth',
1116     'cds_2.title' => 'Popular',
1117   }, {
1118     join => [ qw/cds cds/ ],
1119   });
1120
1121 will return a set of all artists that have both a cd with title 'Down
1122 to Earth' and a cd with title 'Popular'.
1123
1124 If you want to fetch related objects from other tables as well, see C<prefetch>
1125 below.
1126
1127 =head2 prefetch
1128
1129 =head3 Arguments: arrayref/hashref
1130
1131 Contains one or more relationships that should be fetched along with the main 
1132 query (when they are accessed afterwards they will have already been
1133 "prefetched").  This is useful for when you know you will need the related
1134 objects, because it saves at least one query:
1135
1136   my $rs = $schema->resultset('Tag')->search(
1137     undef,
1138     {
1139       prefetch => {
1140         cd => 'artist'
1141       }
1142     }
1143   );
1144
1145 The initial search results in SQL like the following:
1146
1147   SELECT tag.*, cd.*, artist.* FROM tag
1148   JOIN cd ON tag.cd = cd.cdid
1149   JOIN artist ON cd.artist = artist.artistid
1150
1151 L<DBIx::Class> has no need to go back to the database when we access the
1152 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1153 case.
1154
1155 Simple prefetches will be joined automatically, so there is no need
1156 for a C<join> attribute in the above search. If you're prefetching to
1157 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1158 specify the join as well.
1159
1160 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1161 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1162 with an accessor type of 'single' or 'filter').
1163
1164 =head2 from
1165
1166 =head3 Arguments: (arrayref)
1167
1168 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1169 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1170 clauses.
1171
1172 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1173 C<join> will usually do what you need and it is strongly recommended that you
1174 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1175
1176 In simple terms, C<from> works as follows:
1177
1178     [
1179         { <alias> => <table>, -join-type => 'inner|left|right' }
1180         [] # nested JOIN (optional)
1181         { <table.column> = <foreign_table.foreign_key> }
1182     ]
1183
1184     JOIN
1185         <alias> <table>
1186         [JOIN ...]
1187     ON <table.column> = <foreign_table.foreign_key>
1188
1189 An easy way to follow the examples below is to remember the following:
1190
1191     Anything inside "[]" is a JOIN
1192     Anything inside "{}" is a condition for the enclosing JOIN
1193
1194 The following examples utilize a "person" table in a family tree application.
1195 In order to express parent->child relationships, this table is self-joined:
1196
1197     # Person->belongs_to('father' => 'Person');
1198     # Person->belongs_to('mother' => 'Person');
1199
1200 C<from> can be used to nest joins. Here we return all children with a father,
1201 then search against all mothers of those children:
1202
1203   $rs = $schema->resultset('Person')->search(
1204       undef,
1205       {
1206           alias => 'mother', # alias columns in accordance with "from"
1207           from => [
1208               { mother => 'person' },
1209               [
1210                   [
1211                       { child => 'person' },
1212                       [
1213                           { father => 'person' },
1214                           { 'father.person_id' => 'child.father_id' }
1215                       ]
1216                   ],
1217                   { 'mother.person_id' => 'child.mother_id' }
1218               ],
1219           ]
1220       },
1221   );
1222
1223   # Equivalent SQL:
1224   # SELECT mother.* FROM person mother
1225   # JOIN (
1226   #   person child
1227   #   JOIN person father
1228   #   ON ( father.person_id = child.father_id )
1229   # )
1230   # ON ( mother.person_id = child.mother_id )
1231
1232 The type of any join can be controlled manually. To search against only people
1233 with a father in the person table, we could explicitly use C<INNER JOIN>:
1234
1235     $rs = $schema->resultset('Person')->search(
1236         undef,
1237         {
1238             alias => 'child', # alias columns in accordance with "from"
1239             from => [
1240                 { child => 'person' },
1241                 [
1242                     { father => 'person', -join-type => 'inner' },
1243                     { 'father.id' => 'child.father_id' }
1244                 ],
1245             ]
1246         },
1247     );
1248
1249     # Equivalent SQL:
1250     # SELECT child.* FROM person child
1251     # INNER JOIN person father ON child.father_id = father.id
1252
1253 =head2 page
1254
1255 For a paged resultset, specifies which page to retrieve.  Leave unset
1256 for an unpaged resultset.
1257
1258 =head2 rows
1259
1260 For a paged resultset, how many rows per page:
1261
1262   rows => 10
1263
1264 Can also be used to simulate an SQL C<LIMIT>.
1265
1266 =head2 group_by
1267
1268 =head3 Arguments: (arrayref)
1269
1270 A arrayref of columns to group by. Can include columns of joined tables.
1271
1272   group_by => [qw/ column1 column2 ... /]
1273
1274 =head2 distinct
1275
1276 Set to 1 to group by all columns.
1277
1278 =head2 cache
1279
1280 Set to 1 to cache search results. This prevents extra SQL queries if you
1281 revisit rows in your ResultSet:
1282
1283   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1284   
1285   while( my $artist = $resultset->next ) {
1286     ... do stuff ...
1287   }
1288
1289   $rs->first; # without cache, this would issue a query 
1290
1291 By default, searches are not cached.
1292
1293 For more examples of using these attributes, see
1294 L<DBIx::Class::Manual::Cookbook>.
1295
1296 =cut
1297
1298 1;