collapse has to be an attr too, bah
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->load_components(qw/Core/);
36   __PACKAGE__->table('artist');
37   __PACKAGE__->add_columns(qw/artistid name/);
38   __PACKAGE__->set_primary_key('artistid');
39   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
40   1;
41
42   package MyApp::Schema::CD;
43   use base qw/DBIx::Class/;
44   __PACKAGE__->load_components(qw/Core/);
45   __PACKAGE__->table('cd');
46   __PACKAGE__->add_columns(qw/cdid artist title year/);
47   __PACKAGE__->set_primary_key('cdid');
48   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
49   1;
50
51 =head1 METHODS
52
53 =head2 new
54
55 =head3 Arguments: ($source, \%$attrs)
56
57 The resultset constructor. Takes a source object (usually a
58 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
59 below).  Does not perform any queries -- these are executed as needed by the
60 other methods.
61
62 Generally you won't need to construct a resultset manually.  You'll
63 automatically get one from e.g. a L</search> called in scalar context:
64
65   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
66
67 =cut
68
69 sub new {
70   my $class = shift;
71   return $class->new_result(@_) if ref $class;
72   my ($source, $attrs) = @_;
73   #use Data::Dumper; warn Dumper($attrs);
74   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
75   my %seen;
76   my $alias = ($attrs->{alias} ||= 'me');
77   if ($attrs->{cols} || !$attrs->{select}) {
78     delete $attrs->{as} if $attrs->{cols};
79     my @cols = ($attrs->{cols}
80                  ? @{delete $attrs->{cols}}
81                  : $source->columns);
82     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @cols ];
83   }
84   $attrs->{as} ||= [ map { m/^$alias\.(.*)$/ ? $1 : $_ } @{$attrs->{select}} ];
85   if (my $include = delete $attrs->{include_columns}) {
86     push(@{$attrs->{select}}, @$include);
87     push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
88   }
89   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
90   $attrs->{from} ||= [ { $alias => $source->from } ];
91   $attrs->{seen_join} ||= {};
92   if (my $join = delete $attrs->{join}) {
93     foreach my $j (ref $join eq 'ARRAY'
94               ? (@{$join}) : ($join)) {
95       if (ref $j eq 'HASH') {
96         $seen{$_} = 1 foreach keys %$j;
97       } else {
98         $seen{$j} = 1;
99       }
100     }
101     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
102   }
103   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
104
105   $attrs->{order_by} = [ $attrs->{order_by} ]
106     if $attrs->{order_by} && !ref($attrs->{order_by});
107   $attrs->{order_by} ||= [];
108
109   my $collapse = $attrs->{collapse} || {};
110
111   if (my $prefetch = delete $attrs->{prefetch}) {
112     my @pre_order;
113     foreach my $p (ref $prefetch eq 'ARRAY'
114               ? (@{$prefetch}) : ($prefetch)) {
115       if( ref $p eq 'HASH' ) {
116         foreach my $key (keys %$p) {
117           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
118             unless $seen{$key};
119         }
120       }
121       else {
122         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
123             unless $seen{$p};
124       }
125       my @prefetch = $source->resolve_prefetch(
126            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
127       #die Dumper \@cols;
128       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
129       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
130     }
131     push(@{$attrs->{order_by}}, @pre_order);
132   }
133
134   $attrs->{collapse} = $collapse;
135
136   if ($attrs->{page}) {
137     $attrs->{rows} ||= 10;
138     $attrs->{offset} ||= 0;
139     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
140   }
141
142 #if (keys %{$collapse}) {
143 #  use Data::Dumper; warn Dumper($collapse);
144 #}
145
146   my $new = {
147     result_source => $source,
148     result_class => $attrs->{result_class} || $source->result_class,
149     cond => $attrs->{where},
150     from => $attrs->{from},
151     collapse => $collapse,
152     count => undef,
153     page => delete $attrs->{page},
154     pager => undef,
155     attrs => $attrs };
156   bless ($new, $class);
157   return $new;
158 }
159
160 =head2 search
161
162   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
163   my $new_rs = $rs->search({ foo => 3 });
164
165 If you need to pass in additional attributes but no additional condition,
166 call it as C<search({}, \%attrs);>.
167
168   # "SELECT foo, bar FROM $class_table"
169   my @all = $class->search({}, { cols => [qw/foo bar/] });
170
171 =cut
172
173 sub search {
174   my $self = shift;
175
176   my $rs;
177   if( @_ ) {
178     
179     my $attrs = { %{$self->{attrs}} };
180     my $having = delete $attrs->{having};
181     if (@_ > 1 && ref $_[$#_] eq 'HASH') {
182      $attrs = { %$attrs, %{ pop(@_) } };
183     }
184
185     my $where = (@_
186                   ? ((@_ == 1 || ref $_[0] eq "HASH")
187                       ? shift
188                       : ((@_ % 2)
189                           ? $self->throw_exception(
190                               "Odd number of arguments to search")
191                           : {@_}))
192                   : undef());
193     if (defined $where) {
194       $where = (defined $attrs->{where}
195                 ? { '-and' =>
196                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
197                         $where, $attrs->{where} ] }
198                 : $where);
199       $attrs->{where} = $where;
200     }
201
202     if (defined $having) {
203       $having = (defined $attrs->{having}
204                 ? { '-and' =>
205                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
206                         $having, $attrs->{having} ] }
207                 : $having);
208       $attrs->{having} = $having;
209     }
210
211     $rs = (ref $self)->new($self->result_source, $attrs);
212   }
213   else {
214     $rs = $self;
215     $rs->reset();
216   }
217   return (wantarray ? $rs->all : $rs);
218 }
219
220 =head2 search_literal
221
222   my @obj    = $rs->search_literal($literal_where_cond, @bind);
223   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
224
225 Pass a literal chunk of SQL to be added to the conditional part of the
226 resultset.
227
228 =cut
229
230 sub search_literal {
231   my ($self, $cond, @vals) = @_;
232   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
233   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
234   return $self->search(\$cond, $attrs);
235 }
236
237 =head2 find
238
239 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
240
241 Finds a row based on its primary key or unique constraint. For example:
242
243   my $cd = $schema->resultset('CD')->find(5);
244
245 Also takes an optional C<key> attribute, to search by a specific key or unique
246 constraint. For example:
247
248   my $cd = $schema->resultset('CD')->find(
249     {
250       artist => 'Massive Attack',
251       title  => 'Mezzanine',
252     },
253     { key => 'artist_title' }
254   );
255
256 See also L</find_or_create> and L</update_or_create>.
257
258 =cut
259
260 sub find {
261   my ($self, @vals) = @_;
262   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
263
264   my @cols = $self->result_source->primary_columns;
265   if (exists $attrs->{key}) {
266     my %uniq = $self->result_source->unique_constraints;
267     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
268       unless exists $uniq{$attrs->{key}};
269     @cols = @{ $uniq{$attrs->{key}} };
270   }
271   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
272   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
273     unless @cols;
274
275   my $query;
276   if (ref $vals[0] eq 'HASH') {
277     $query = { %{$vals[0]} };
278   } elsif (@cols == @vals) {
279     $query = {};
280     @{$query}{@cols} = @vals;
281   } else {
282     $query = {@vals};
283   }
284   foreach (keys %$query) {
285     next if m/\./;
286     $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
287   }
288   #warn Dumper($query);
289   
290   if (keys %$attrs) {
291       my $rs = $self->search($query,$attrs);
292       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
293   } else {
294       return keys %{$self->{collapse}} ? $self->search($query)->next : $self->single($query);
295   }
296 }
297
298 =head2 search_related
299
300   $rs->search_related('relname', $cond?, $attrs?);
301
302 Search the specified relationship. Optionally specify a condition for matching
303 records.
304
305 =cut
306
307 sub search_related {
308   return shift->related_resultset(shift)->search(@_);
309 }
310
311 =head2 cursor
312
313 Returns a storage-driven cursor to the given resultset.
314
315 =cut
316
317 sub cursor {
318   my ($self) = @_;
319   my ($attrs) = $self->{attrs};
320   $attrs = { %$attrs };
321   return $self->{cursor}
322     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
323           $attrs->{where},$attrs);
324 }
325
326 =head2 single
327
328 Inflates the first result without creating a cursor
329
330 =cut
331
332 sub single {
333   my ($self, $extra) = @_;
334   my ($attrs) = $self->{attrs};
335   $attrs = { %$attrs };
336   if ($extra) {
337     if (defined $attrs->{where}) {
338       $attrs->{where} = {
339         '-and'
340           => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
341                delete $attrs->{where}, $extra ]
342       };
343     } else {
344       $attrs->{where} = $extra;
345     }
346   }
347   my @data = $self->result_source->storage->select_single(
348           $self->{from}, $attrs->{select},
349           $attrs->{where},$attrs);
350   return (@data ? $self->_construct_object(@data) : ());
351 }
352
353
354 =head2 search_like
355
356 Perform a search, but use C<LIKE> instead of equality as the condition. Note
357 that this is simply a convenience method; you most likely want to use
358 L</search> with specific operators.
359
360 For more information, see L<DBIx::Class::Manual::Cookbook>.
361
362 =cut
363
364 sub search_like {
365   my $class    = shift;
366   my $attrs = { };
367   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
368     $attrs = pop(@_);
369   }
370   my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
371   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
372   return $class->search($query, { %$attrs });
373 }
374
375 =head2 slice
376
377 =head3 Arguments: ($first, $last)
378
379 Returns a subset of elements from the resultset.
380
381 =cut
382
383 sub slice {
384   my ($self, $min, $max) = @_;
385   my $attrs = { %{ $self->{attrs} || {} } };
386   $attrs->{offset} ||= 0;
387   $attrs->{offset} += $min;
388   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
389   my $slice = (ref $self)->new($self->result_source, $attrs);
390   return (wantarray ? $slice->all : $slice);
391 }
392
393 =head2 next
394
395 Returns the next element in the resultset (C<undef> is there is none).
396
397 Can be used to efficiently iterate over records in the resultset:
398
399   my $rs = $schema->resultset('CD')->search({});
400   while (my $cd = $rs->next) {
401     print $cd->title;
402   }
403
404 =cut
405
406 sub next {
407   my ($self) = @_;
408   my $cache;
409   if( @{$cache = $self->{all_cache} || []}) {
410     $self->{all_cache_position} ||= 0;
411     my $obj = $cache->[$self->{all_cache_position}];
412     $self->{all_cache_position}++;
413     return $obj;
414   }
415   if ($self->{attrs}{cache}) {
416     $self->{all_cache_position} = 1;
417     return ($self->all)[0];
418   }
419   my @row = (exists $self->{stashed_row}
420                ? @{delete $self->{stashed_row}}
421                : $self->cursor->next);
422 #  warn Dumper(\@row); use Data::Dumper;
423   return unless (@row);
424   return $self->_construct_object(@row);
425 }
426
427 sub _construct_object {
428   my ($self, @row) = @_;
429   my @as = @{ $self->{attrs}{as} };
430
431   my $info = $self->_collapse_result(\@as, \@row);
432
433   my $new = $self->result_class->inflate_result($self->result_source, @$info);
434
435   $new = $self->{attrs}{record_filter}->($new)
436     if exists $self->{attrs}{record_filter};
437  
438   return $new;
439 }
440
441 sub _collapse_result {
442   my ($self, $as, $row, $prefix) = @_;
443
444   my %const;
445
446   my @copy = @$row;
447   foreach my $this_as (@$as) {
448     my $val = shift @copy;
449     if (defined $prefix) {
450       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
451         my $remain = $1;
452         $remain =~ /^(?:(.*)\.)?([^\.]+)$/;
453         $const{$1||''}{$2} = $val;
454       }
455     } else {
456       $this_as =~ /^(?:(.*)\.)?([^\.]+)$/;
457       $const{$1||''}{$2} = $val;
458     }
459   }
460
461   my $info = [ {}, {} ];
462   foreach my $key (keys %const) {
463     if (length $key) {
464       my $target = $info;
465       my @parts = split(/\./, $key);
466       foreach my $p (@parts) {
467         $target = $target->[1]->{$p} ||= [];
468       }
469       $target->[0] = $const{$key};
470     } else {
471       $info->[0] = $const{$key};
472     }
473   }
474
475   my @collapse = (defined($prefix)
476                    ? (map { (m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()); }
477                        keys %{$self->{collapse}})
478                    : keys %{$self->{collapse}});
479   if (@collapse) {
480     my ($c) = sort { length $a <=> length $b } @collapse;
481     my $target = $info;
482     foreach my $p (split(/\./, $c)) {
483       $target = $target->[1]->{$p} ||= [];
484     }
485     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
486     my @co_key = @{$self->{collapse}{$c_prefix}};
487     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
488     my $tree = $self->_collapse_result($as, $row, $c_prefix);
489     my (@final, @raw);
490     while ( !(grep {
491                 !defined($tree->[0]->{$_})
492                 || $co_check{$_} ne $tree->[0]->{$_}
493               } @co_key) ) {
494       push(@final, $tree);
495       last unless (@raw = $self->cursor->next);
496       $row = $self->{stashed_row} = \@raw;
497       $tree = $self->_collapse_result($as, $row, $c_prefix);
498       #warn Data::Dumper::Dumper($tree, $row);
499     }
500     @{$target} = @final;
501   }
502
503   return $info;
504 }
505
506 =head2 result_source
507
508 Returns a reference to the result source for this recordset.
509
510 =cut
511
512
513 =head2 count
514
515 Performs an SQL C<COUNT> with the same query as the resultset was built
516 with to find the number of elements. If passed arguments, does a search
517 on the resultset and counts the results of that.
518
519 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
520 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
521 not support C<DISTINCT> with multiple columns. If you are using such a
522 database, you should only use columns from the main table in your C<group_by>
523 clause.
524
525 =cut
526
527 sub count {
528   my $self = shift;
529   return $self->search(@_)->count if @_ && defined $_[0];
530   unless (defined $self->{count}) {
531     return scalar @{ $self->get_cache }
532       if @{ $self->get_cache };
533     my $group_by;
534     my $select = { 'count' => '*' };
535     my $attrs = { %{ $self->{attrs} } };
536     if( $group_by = delete $attrs->{group_by} ) {
537       delete $attrs->{having};
538       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
539       # todo: try CONCAT for multi-column pk
540       my @pk = $self->result_source->primary_columns;
541       if( scalar(@pk) == 1 ) {
542         my $pk = shift(@pk);
543         my $alias = $attrs->{alias};
544         my $re = qr/^($alias\.)?$pk$/;
545         foreach my $column ( @distinct) {
546           if( $column =~ $re ) {
547             @distinct = ( $column );
548             last;
549           }
550         } 
551       }
552
553       $select = { count => { 'distinct' => \@distinct } };
554       #use Data::Dumper; die Dumper $select;
555     }
556
557     $attrs->{select} = $select;
558     $attrs->{as} = [ 'count' ];
559     # offset, order by and page are not needed to count. record_filter is cdbi
560     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
561         
562     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
563   }
564   return 0 unless $self->{count};
565   my $count = $self->{count};
566   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
567   $count = $self->{attrs}{rows} if
568     ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
569   return $count;
570 }
571
572 =head2 count_literal
573
574 Calls L</search_literal> with the passed arguments, then L</count>.
575
576 =cut
577
578 sub count_literal { shift->search_literal(@_)->count; }
579
580 =head2 all
581
582 Returns all elements in the resultset. Called implictly if the resultset
583 is returned in list context.
584
585 =cut
586
587 sub all {
588   my ($self) = @_;
589   return @{ $self->get_cache }
590     if @{ $self->get_cache };
591
592   my @obj;
593
594   if (keys %{$self->{collapse}}) {
595       # Using $self->cursor->all is really just an optimisation.
596       # If we're collapsing has_many prefetches it probably makes
597       # very little difference, and this is cleaner than hacking
598       # _construct_object to survive the approach
599     my @row;
600     $self->cursor->reset;
601     while (@row = $self->cursor->next) {
602       push(@obj, $self->_construct_object(@row));
603     }
604   } else {
605     @obj = map { $self->_construct_object(@$_); }
606              $self->cursor->all;
607   }
608
609   if( $self->{attrs}->{cache} ) {
610     $self->set_cache( \@obj );
611   }
612
613   return @obj;
614 }
615
616 =head2 reset
617
618 Resets the resultset's cursor, so you can iterate through the elements again.
619
620 =cut
621
622 sub reset {
623   my ($self) = @_;
624   $self->{all_cache_position} = 0;
625   $self->cursor->reset;
626   return $self;
627 }
628
629 =head2 first
630
631 Resets the resultset and returns the first element.
632
633 =cut
634
635 sub first {
636   return $_[0]->reset->next;
637 }
638
639 =head2 update
640
641 =head3 Arguments: (\%values)
642
643 Sets the specified columns in the resultset to the supplied values.
644
645 =cut
646
647 sub update {
648   my ($self, $values) = @_;
649   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
650   return $self->result_source->storage->update(
651            $self->result_source->from, $values, $self->{cond});
652 }
653
654 =head2 update_all
655
656 =head3 Arguments: (\%values)
657
658 Fetches all objects and updates them one at a time.  Note that C<update_all>
659 will run cascade triggers while L</update> will not.
660
661 =cut
662
663 sub update_all {
664   my ($self, $values) = @_;
665   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
666   foreach my $obj ($self->all) {
667     $obj->set_columns($values)->update;
668   }
669   return 1;
670 }
671
672 =head2 delete
673
674 Deletes the contents of the resultset from its result source.
675
676 =cut
677
678 sub delete {
679   my ($self) = @_;
680   my $del = {};
681   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
682     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
683   if (ref $self->{cond} eq 'ARRAY') {
684     $del = [ map { my %hash;
685       foreach my $key (keys %{$_}) {
686         $key =~ /([^\.]+)$/;
687         $hash{$1} = $_->{$key};
688       }; \%hash; } @{$self->{cond}} ];
689   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
690     $del->{-and} = [ map { my %hash;
691       foreach my $key (keys %{$_}) {
692         $key =~ /([^\.]+)$/;
693         $hash{$1} = $_->{$key};
694       }; \%hash; } @{$self->{cond}{-and}} ];
695   } else {
696     foreach my $key (keys %{$self->{cond}}) {
697       $key =~ /([^\.]+)$/;
698       $del->{$1} = $self->{cond}{$key};
699     }
700   }
701   $self->result_source->storage->delete($self->result_source->from, $del);
702   return 1;
703 }
704
705 =head2 delete_all
706
707 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
708 will run cascade triggers while L</delete> will not.
709
710 =cut
711
712 sub delete_all {
713   my ($self) = @_;
714   $_->delete for $self->all;
715   return 1;
716 }
717
718 =head2 pager
719
720 Returns a L<Data::Page> object for the current resultset. Only makes
721 sense for queries with a C<page> attribute.
722
723 =cut
724
725 sub pager {
726   my ($self) = @_;
727   my $attrs = $self->{attrs};
728   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
729   $attrs->{rows} ||= 10;
730   $self->count;
731   return $self->{pager} ||= Data::Page->new(
732     $self->{count}, $attrs->{rows}, $self->{page});
733 }
734
735 =head2 page
736
737 =head3 Arguments: ($page_num)
738
739 Returns a new resultset for the specified page.
740
741 =cut
742
743 sub page {
744   my ($self, $page) = @_;
745   my $attrs = { %{$self->{attrs}} };
746   $attrs->{page} = $page;
747   return (ref $self)->new($self->result_source, $attrs);
748 }
749
750 =head2 new_result
751
752 =head3 Arguments: (\%vals)
753
754 Creates a result in the resultset's result class.
755
756 =cut
757
758 sub new_result {
759   my ($self, $values) = @_;
760   $self->throw_exception( "new_result needs a hash" )
761     unless (ref $values eq 'HASH');
762   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
763     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
764   my %new = %$values;
765   my $alias = $self->{attrs}{alias};
766   foreach my $key (keys %{$self->{cond}||{}}) {
767     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
768   }
769   my $obj = $self->result_class->new(\%new);
770   $obj->result_source($self->result_source) if $obj->can('result_source');
771   $obj;
772 }
773
774 =head2 create
775
776 =head3 Arguments: (\%vals)
777
778 Inserts a record into the resultset and returns the object.
779
780 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
781
782 =cut
783
784 sub create {
785   my ($self, $attrs) = @_;
786   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
787   return $self->new_result($attrs)->insert;
788 }
789
790 =head2 find_or_create
791
792 =head3 Arguments: (\%vals, \%attrs?)
793
794   $class->find_or_create({ key => $val, ... });
795
796 Searches for a record matching the search condition; if it doesn't find one,
797 creates one and returns that instead.
798
799   my $cd = $schema->resultset('CD')->find_or_create({
800     cdid   => 5,
801     artist => 'Massive Attack',
802     title  => 'Mezzanine',
803     year   => 2005,
804   });
805
806 Also takes an optional C<key> attribute, to search by a specific key or unique
807 constraint. For example:
808
809   my $cd = $schema->resultset('CD')->find_or_create(
810     {
811       artist => 'Massive Attack',
812       title  => 'Mezzanine',
813     },
814     { key => 'artist_title' }
815   );
816
817 See also L</find> and L</update_or_create>.
818
819 =cut
820
821 sub find_or_create {
822   my $self     = shift;
823   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
824   my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
825   my $exists   = $self->find($hash, $attrs);
826   return defined($exists) ? $exists : $self->create($hash);
827 }
828
829 =head2 update_or_create
830
831   $class->update_or_create({ key => $val, ... });
832
833 First, search for an existing row matching one of the unique constraints
834 (including the primary key) on the source of this resultset.  If a row is
835 found, update it with the other given column values.  Otherwise, create a new
836 row.
837
838 Takes an optional C<key> attribute to search on a specific unique constraint.
839 For example:
840
841   # In your application
842   my $cd = $schema->resultset('CD')->update_or_create(
843     {
844       artist => 'Massive Attack',
845       title  => 'Mezzanine',
846       year   => 1998,
847     },
848     { key => 'artist_title' }
849   );
850
851 If no C<key> is specified, it searches on all unique constraints defined on the
852 source, including the primary key.
853
854 If the C<key> is specified as C<primary>, search only on the primary key.
855
856 See also L</find> and L</find_or_create>.
857
858 =cut
859
860 sub update_or_create {
861   my $self = shift;
862
863   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
864   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
865
866   my %unique_constraints = $self->result_source->unique_constraints;
867   my @constraint_names   = (exists $attrs->{key}
868                             ? ($attrs->{key})
869                             : keys %unique_constraints);
870
871   my @unique_hashes;
872   foreach my $name (@constraint_names) {
873     my @unique_cols = @{ $unique_constraints{$name} };
874     my %unique_hash =
875       map  { $_ => $hash->{$_} }
876       grep { exists $hash->{$_} }
877       @unique_cols;
878
879     push @unique_hashes, \%unique_hash
880       if (scalar keys %unique_hash == scalar @unique_cols);
881   }
882
883   my $row;
884   if (@unique_hashes) {
885     $row = $self->search(\@unique_hashes, { rows => 1 })->first;
886     if ($row) {
887       $row->set_columns($hash);
888       $row->update;
889     }
890   }
891
892   unless ($row) {
893     $row = $self->create($hash);
894   }
895
896   return $row;
897 }
898
899 =head2 get_cache
900
901 Gets the contents of the cache for the resultset.
902
903 =cut
904
905 sub get_cache {
906   my $self = shift;
907   return $self->{all_cache} || [];
908 }
909
910 =head2 set_cache
911
912 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
913
914 =cut
915
916 sub set_cache {
917   my ( $self, $data ) = @_;
918   $self->throw_exception("set_cache requires an arrayref")
919     if ref $data ne 'ARRAY';
920   my $result_class = $self->result_class;
921   foreach( @$data ) {
922     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
923       if ref $_ ne $result_class;
924   }
925   $self->{all_cache} = $data;
926 }
927
928 =head2 clear_cache
929
930 Clears the cache for the resultset.
931
932 =cut
933
934 sub clear_cache {
935   my $self = shift;
936   $self->set_cache([]);
937 }
938
939 =head2 related_resultset
940
941 Returns a related resultset for the supplied relationship name.
942
943   $rs = $rs->related_resultset('foo');
944
945 =cut
946
947 sub related_resultset {
948   my ( $self, $rel, @rest ) = @_;
949   $self->{related_resultsets} ||= {};
950   my $resultsets = $self->{related_resultsets};
951   if( !exists $resultsets->{$rel} ) {
952     #warn "fetching related resultset for rel '$rel'";
953     my $rel_obj = $self->result_source->relationship_info($rel);
954     $self->throw_exception(
955       "search_related: result source '" . $self->result_source->name .
956       "' has no such relationship ${rel}")
957       unless $rel_obj; #die Dumper $self->{attrs};
958     my $rs = $self->search(undef, { join => $rel });
959     #if( $self->{attrs}->{cache} ) {
960     #  $rs = $self->search(undef);
961     #}
962     #else {
963     #}
964     #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
965     #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
966     my $alias = (defined $rs->{attrs}{seen_join}{$rel}
967                   && $rs->{attrs}{seen_join}{$rel} > 1
968                 ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
969                 : $rel);
970     $resultsets->{$rel} =
971       $self->result_source->schema->resultset($rel_obj->{class}
972            )->search( undef,
973              { %{$rs->{attrs}},
974                alias => $alias,
975                select => undef(),
976                as => undef() }
977            )->search(@rest);
978   }
979   return $resultsets->{$rel};
980 }
981
982 =head2 throw_exception
983
984 See Schema's throw_exception
985
986 =cut
987
988 sub throw_exception {
989   my $self=shift;
990   $self->result_source->schema->throw_exception(@_);
991 }
992
993 =head1 ATTRIBUTES
994
995 The resultset takes various attributes that modify its behavior. Here's an
996 overview of them:
997
998 =head2 order_by
999
1000 Which column(s) to order the results by. This is currently passed through
1001 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
1002
1003 =head2 cols
1004
1005 =head3 Arguments: (arrayref)
1006
1007 Shortcut to request a particular set of columns to be retrieved.  Adds
1008 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1009 from that, then auto-populates C<as> from C<select> as normal.
1010
1011 =head2 include_columns
1012
1013 =head3 Arguments: (arrayref)
1014
1015 Shortcut to include additional columns in the returned results - for example
1016
1017   { include_columns => ['foo.name'], join => ['foo'] }
1018
1019 would add a 'name' column to the information passed to object inflation
1020
1021 =head2 select
1022
1023 =head3 Arguments: (arrayref)
1024
1025 Indicates which columns should be selected from the storage. You can use
1026 column names, or in the case of RDBMS back ends, function or stored procedure
1027 names:
1028
1029   $rs = $schema->resultset('Foo')->search(
1030     {},
1031     {
1032       select => [
1033         'column_name',
1034         { count => 'column_to_count' },
1035         { sum => 'column_to_sum' }
1036       ]
1037     }
1038   );
1039
1040 When you use function/stored procedure names and do not supply an C<as>
1041 attribute, the column names returned are storage-dependent. E.g. MySQL would
1042 return a column named C<count(column_to_count)> in the above example.
1043
1044 =head2 as
1045
1046 =head3 Arguments: (arrayref)
1047
1048 Indicates column names for object inflation. This is used in conjunction with
1049 C<select>, usually when C<select> contains one or more function or stored
1050 procedure names:
1051
1052   $rs = $schema->resultset('Foo')->search(
1053     {},
1054     {
1055       select => [
1056         'column1',
1057         { count => 'column2' }
1058       ],
1059       as => [qw/ column1 column2_count /]
1060     }
1061   );
1062
1063   my $foo = $rs->first(); # get the first Foo
1064
1065 If the object against which the search is performed already has an accessor
1066 matching a column name specified in C<as>, the value can be retrieved using
1067 the accessor as normal:
1068
1069   my $column1 = $foo->column1();
1070
1071 If on the other hand an accessor does not exist in the object, you need to
1072 use C<get_column> instead:
1073
1074   my $column2_count = $foo->get_column('column2_count');
1075
1076 You can create your own accessors if required - see
1077 L<DBIx::Class::Manual::Cookbook> for details.
1078
1079 =head2 join
1080
1081 Contains a list of relationships that should be joined for this query.  For
1082 example:
1083
1084   # Get CDs by Nine Inch Nails
1085   my $rs = $schema->resultset('CD')->search(
1086     { 'artist.name' => 'Nine Inch Nails' },
1087     { join => 'artist' }
1088   );
1089
1090 Can also contain a hash reference to refer to the other relation's relations.
1091 For example:
1092
1093   package MyApp::Schema::Track;
1094   use base qw/DBIx::Class/;
1095   __PACKAGE__->table('track');
1096   __PACKAGE__->add_columns(qw/trackid cd position title/);
1097   __PACKAGE__->set_primary_key('trackid');
1098   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1099   1;
1100
1101   # In your application
1102   my $rs = $schema->resultset('Artist')->search(
1103     { 'track.title' => 'Teardrop' },
1104     {
1105       join     => { cd => 'track' },
1106       order_by => 'artist.name',
1107     }
1108   );
1109
1110 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1111 similarly for a third time). For e.g.
1112
1113   my $rs = $schema->resultset('Artist')->search(
1114     { 'cds.title'   => 'Foo',
1115       'cds_2.title' => 'Bar' },
1116     { join => [ qw/cds cds/ ] });
1117
1118 will return a set of all artists that have both a cd with title Foo and a cd
1119 with title Bar.
1120
1121 If you want to fetch related objects from other tables as well, see C<prefetch>
1122 below.
1123
1124 =head2 prefetch
1125
1126 =head3 Arguments: arrayref/hashref
1127
1128 Contains one or more relationships that should be fetched along with the main 
1129 query (when they are accessed afterwards they will have already been
1130 "prefetched").  This is useful for when you know you will need the related
1131 objects, because it saves at least one query:
1132
1133   my $rs = $schema->resultset('Tag')->search(
1134     {},
1135     {
1136       prefetch => {
1137         cd => 'artist'
1138       }
1139     }
1140   );
1141
1142 The initial search results in SQL like the following:
1143
1144   SELECT tag.*, cd.*, artist.* FROM tag
1145   JOIN cd ON tag.cd = cd.cdid
1146   JOIN artist ON cd.artist = artist.artistid
1147
1148 L<DBIx::Class> has no need to go back to the database when we access the
1149 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1150 case.
1151
1152 Simple prefetches will be joined automatically, so there is no need
1153 for a C<join> attribute in the above search. If you're prefetching to
1154 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1155 specify the join as well.
1156
1157 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1158 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1159 with an accessor type of 'single' or 'filter').
1160
1161 =head2 from
1162
1163 =head3 Arguments: (arrayref)
1164
1165 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1166 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1167 clauses.
1168
1169 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1170 C<join> will usually do what you need and it is strongly recommended that you
1171 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1172
1173 In simple terms, C<from> works as follows:
1174
1175     [
1176         { <alias> => <table>, -join-type => 'inner|left|right' }
1177         [] # nested JOIN (optional)
1178         { <table.column> = <foreign_table.foreign_key> }
1179     ]
1180
1181     JOIN
1182         <alias> <table>
1183         [JOIN ...]
1184     ON <table.column> = <foreign_table.foreign_key>
1185
1186 An easy way to follow the examples below is to remember the following:
1187
1188     Anything inside "[]" is a JOIN
1189     Anything inside "{}" is a condition for the enclosing JOIN
1190
1191 The following examples utilize a "person" table in a family tree application.
1192 In order to express parent->child relationships, this table is self-joined:
1193
1194     # Person->belongs_to('father' => 'Person');
1195     # Person->belongs_to('mother' => 'Person');
1196
1197 C<from> can be used to nest joins. Here we return all children with a father,
1198 then search against all mothers of those children:
1199
1200   $rs = $schema->resultset('Person')->search(
1201       {},
1202       {
1203           alias => 'mother', # alias columns in accordance with "from"
1204           from => [
1205               { mother => 'person' },
1206               [
1207                   [
1208                       { child => 'person' },
1209                       [
1210                           { father => 'person' },
1211                           { 'father.person_id' => 'child.father_id' }
1212                       ]
1213                   ],
1214                   { 'mother.person_id' => 'child.mother_id' }
1215               ],
1216           ]
1217       },
1218   );
1219
1220   # Equivalent SQL:
1221   # SELECT mother.* FROM person mother
1222   # JOIN (
1223   #   person child
1224   #   JOIN person father
1225   #   ON ( father.person_id = child.father_id )
1226   # )
1227   # ON ( mother.person_id = child.mother_id )
1228
1229 The type of any join can be controlled manually. To search against only people
1230 with a father in the person table, we could explicitly use C<INNER JOIN>:
1231
1232     $rs = $schema->resultset('Person')->search(
1233         {},
1234         {
1235             alias => 'child', # alias columns in accordance with "from"
1236             from => [
1237                 { child => 'person' },
1238                 [
1239                     { father => 'person', -join-type => 'inner' },
1240                     { 'father.id' => 'child.father_id' }
1241                 ],
1242             ]
1243         },
1244     );
1245
1246     # Equivalent SQL:
1247     # SELECT child.* FROM person child
1248     # INNER JOIN person father ON child.father_id = father.id
1249
1250 =head2 page
1251
1252 For a paged resultset, specifies which page to retrieve.  Leave unset
1253 for an unpaged resultset.
1254
1255 =head2 rows
1256
1257 For a paged resultset, how many rows per page:
1258
1259   rows => 10
1260
1261 Can also be used to simulate an SQL C<LIMIT>.
1262
1263 =head2 group_by
1264
1265 =head3 Arguments: (arrayref)
1266
1267 A arrayref of columns to group by. Can include columns of joined tables.
1268
1269   group_by => [qw/ column1 column2 ... /]
1270
1271 =head2 distinct
1272
1273 Set to 1 to group by all columns.
1274
1275 For more examples of using these attributes, see
1276 L<DBIx::Class::Manual::Cookbook>.
1277
1278 =cut
1279
1280 1;