partially working has_many prefetch
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->load_components(qw/Core/);
36   __PACKAGE__->table('artist');
37   __PACKAGE__->add_columns(qw/artistid name/);
38   __PACKAGE__->set_primary_key('artistid');
39   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
40   1;
41
42   package MyApp::Schema::CD;
43   use base qw/DBIx::Class/;
44   __PACKAGE__->load_components(qw/Core/);
45   __PACKAGE__->table('cd');
46   __PACKAGE__->add_columns(qw/cdid artist title year/);
47   __PACKAGE__->set_primary_key('cdid');
48   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
49   1;
50
51 =head1 METHODS
52
53 =head2 new
54
55 =head3 Arguments: ($source, \%$attrs)
56
57 The resultset constructor. Takes a source object (usually a
58 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
59 below).  Does not perform any queries -- these are executed as needed by the
60 other methods.
61
62 Generally you won't need to construct a resultset manually.  You'll
63 automatically get one from e.g. a L</search> called in scalar context:
64
65   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
66
67 =cut
68
69 sub new {
70   my $class = shift;
71   return $class->new_result(@_) if ref $class;
72   my ($source, $attrs) = @_;
73   #use Data::Dumper; warn Dumper($attrs);
74   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
75   my %seen;
76   my $alias = ($attrs->{alias} ||= 'me');
77   if ($attrs->{cols} || !$attrs->{select}) {
78     delete $attrs->{as} if $attrs->{cols};
79     my @cols = ($attrs->{cols}
80                  ? @{delete $attrs->{cols}}
81                  : $source->columns);
82     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @cols ];
83   }
84   $attrs->{as} ||= [ map { m/^$alias\.(.*)$/ ? $1 : $_ } @{$attrs->{select}} ];
85   if (my $include = delete $attrs->{include_columns}) {
86     push(@{$attrs->{select}}, @$include);
87     push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
88   }
89   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
90   $attrs->{from} ||= [ { $alias => $source->from } ];
91   $attrs->{seen_join} ||= {};
92   if (my $join = delete $attrs->{join}) {
93     foreach my $j (ref $join eq 'ARRAY'
94               ? (@{$join}) : ($join)) {
95       if (ref $j eq 'HASH') {
96         $seen{$_} = 1 foreach keys %$j;
97       } else {
98         $seen{$j} = 1;
99       }
100     }
101     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
102   }
103   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
104
105   $attrs->{order_by} = [ $attrs->{order_by} ]
106     if $attrs->{order_by} && !ref($attrs->{order_by});
107   $attrs->{order_by} ||= [];
108
109   my $collapse = {};
110
111   if (my $prefetch = delete $attrs->{prefetch}) {
112     my @pre_order;
113     foreach my $p (ref $prefetch eq 'ARRAY'
114               ? (@{$prefetch}) : ($prefetch)) {
115       if( ref $p eq 'HASH' ) {
116         foreach my $key (keys %$p) {
117           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
118             unless $seen{$key};
119         }
120       }
121       else {
122         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
123             unless $seen{$p};
124       }
125       my @prefetch = $source->resolve_prefetch(
126            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
127       #die Dumper \@cols;
128       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
129       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
130     }
131     push(@{$attrs->{order_by}}, @pre_order);
132   }
133
134   if ($attrs->{page}) {
135     $attrs->{rows} ||= 10;
136     $attrs->{offset} ||= 0;
137     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
138   }
139
140 #if (keys %{$collapse}) {
141 #  use Data::Dumper; warn Dumper($collapse);
142 #}
143
144   my $new = {
145     result_source => $source,
146     result_class => $attrs->{result_class} || $source->result_class,
147     cond => $attrs->{where},
148     from => $attrs->{from},
149     collapse => $collapse,
150     count => undef,
151     page => delete $attrs->{page},
152     pager => undef,
153     attrs => $attrs };
154   bless ($new, $class);
155   return $new;
156 }
157
158 =head2 search
159
160   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
161   my $new_rs = $rs->search({ foo => 3 });
162
163 If you need to pass in additional attributes but no additional condition,
164 call it as C<search({}, \%attrs);>.
165
166   # "SELECT foo, bar FROM $class_table"
167   my @all = $class->search({}, { cols => [qw/foo bar/] });
168
169 =cut
170
171 sub search {
172   my $self = shift;
173
174   my $rs;
175   if( @_ ) {
176     
177     my $attrs = { %{$self->{attrs}} };
178     my $having = delete $attrs->{having};
179     if (@_ > 1 && ref $_[$#_] eq 'HASH') {
180      $attrs = { %$attrs, %{ pop(@_) } };
181     }
182
183     my $where = (@_
184                   ? ((@_ == 1 || ref $_[0] eq "HASH")
185                       ? shift
186                       : ((@_ % 2)
187                           ? $self->throw_exception(
188                               "Odd number of arguments to search")
189                           : {@_}))
190                   : undef());
191     if (defined $where) {
192       $where = (defined $attrs->{where}
193                 ? { '-and' =>
194                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
195                         $where, $attrs->{where} ] }
196                 : $where);
197       $attrs->{where} = $where;
198     }
199
200     if (defined $having) {
201       $having = (defined $attrs->{having}
202                 ? { '-and' =>
203                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
204                         $having, $attrs->{having} ] }
205                 : $having);
206       $attrs->{having} = $having;
207     }
208
209     $rs = (ref $self)->new($self->result_source, $attrs);
210   }
211   else {
212     $rs = $self;
213     $rs->reset();
214   }
215   return (wantarray ? $rs->all : $rs);
216 }
217
218 =head2 search_literal
219
220   my @obj    = $rs->search_literal($literal_where_cond, @bind);
221   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
222
223 Pass a literal chunk of SQL to be added to the conditional part of the
224 resultset.
225
226 =cut
227
228 sub search_literal {
229   my ($self, $cond, @vals) = @_;
230   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
231   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
232   return $self->search(\$cond, $attrs);
233 }
234
235 =head2 find
236
237 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
238
239 Finds a row based on its primary key or unique constraint. For example:
240
241   my $cd = $schema->resultset('CD')->find(5);
242
243 Also takes an optional C<key> attribute, to search by a specific key or unique
244 constraint. For example:
245
246   my $cd = $schema->resultset('CD')->find(
247     {
248       artist => 'Massive Attack',
249       title  => 'Mezzanine',
250     },
251     { key => 'artist_title' }
252   );
253
254 See also L</find_or_create> and L</update_or_create>.
255
256 =cut
257
258 sub find {
259   my ($self, @vals) = @_;
260   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
261
262   my @cols = $self->result_source->primary_columns;
263   if (exists $attrs->{key}) {
264     my %uniq = $self->result_source->unique_constraints;
265     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
266       unless exists $uniq{$attrs->{key}};
267     @cols = @{ $uniq{$attrs->{key}} };
268   }
269   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
270   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
271     unless @cols;
272
273   my $query;
274   if (ref $vals[0] eq 'HASH') {
275     $query = { %{$vals[0]} };
276   } elsif (@cols == @vals) {
277     $query = {};
278     @{$query}{@cols} = @vals;
279   } else {
280     $query = {@vals};
281   }
282   foreach (keys %$query) {
283     next if m/\./;
284     $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
285   }
286   #warn Dumper($query);
287   return (keys %$attrs
288            ? $self->search($query,$attrs)->single
289            : $self->single($query));
290 }
291
292 =head2 search_related
293
294   $rs->search_related('relname', $cond?, $attrs?);
295
296 Search the specified relationship. Optionally specify a condition for matching
297 records.
298
299 =cut
300
301 sub search_related {
302   return shift->related_resultset(shift)->search(@_);
303 }
304
305 =head2 cursor
306
307 Returns a storage-driven cursor to the given resultset.
308
309 =cut
310
311 sub cursor {
312   my ($self) = @_;
313   my ($attrs) = $self->{attrs};
314   $attrs = { %$attrs };
315   return $self->{cursor}
316     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
317           $attrs->{where},$attrs);
318 }
319
320 =head2 single
321
322 Inflates the first result without creating a cursor
323
324 =cut
325
326 sub single {
327   my ($self, $extra) = @_;
328   my ($attrs) = $self->{attrs};
329   $attrs = { %$attrs };
330   if ($extra) {
331     if (defined $attrs->{where}) {
332       $attrs->{where} = {
333         '-and'
334           => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
335                delete $attrs->{where}, $extra ]
336       };
337     } else {
338       $attrs->{where} = $extra;
339     }
340   }
341   my @data = $self->result_source->storage->select_single(
342           $self->{from}, $attrs->{select},
343           $attrs->{where},$attrs);
344   return (@data ? $self->_construct_object(@data) : ());
345 }
346
347
348 =head2 search_like
349
350 Perform a search, but use C<LIKE> instead of equality as the condition. Note
351 that this is simply a convenience method; you most likely want to use
352 L</search> with specific operators.
353
354 For more information, see L<DBIx::Class::Manual::Cookbook>.
355
356 =cut
357
358 sub search_like {
359   my $class    = shift;
360   my $attrs = { };
361   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
362     $attrs = pop(@_);
363   }
364   my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
365   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
366   return $class->search($query, { %$attrs });
367 }
368
369 =head2 slice
370
371 =head3 Arguments: ($first, $last)
372
373 Returns a subset of elements from the resultset.
374
375 =cut
376
377 sub slice {
378   my ($self, $min, $max) = @_;
379   my $attrs = { %{ $self->{attrs} || {} } };
380   $attrs->{offset} ||= 0;
381   $attrs->{offset} += $min;
382   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
383   my $slice = (ref $self)->new($self->result_source, $attrs);
384   return (wantarray ? $slice->all : $slice);
385 }
386
387 =head2 next
388
389 Returns the next element in the resultset (C<undef> is there is none).
390
391 Can be used to efficiently iterate over records in the resultset:
392
393   my $rs = $schema->resultset('CD')->search({});
394   while (my $cd = $rs->next) {
395     print $cd->title;
396   }
397
398 =cut
399
400 sub next {
401   my ($self) = @_;
402   my $cache;
403   if( @{$cache = $self->{all_cache} || []}) {
404     $self->{all_cache_position} ||= 0;
405     my $obj = $cache->[$self->{all_cache_position}];
406     $self->{all_cache_position}++;
407     return $obj;
408   }
409   if ($self->{attrs}{cache}) {
410     $self->{all_cache_position} = 1;
411     return ($self->all)[0];
412   }
413   my @row = (exists $self->{stashed_row}
414                ? @{delete $self->{stashed_row}}
415                : $self->cursor->next);
416 #  warn Dumper(\@row); use Data::Dumper;
417   return unless (@row);
418   return $self->_construct_object(@row);
419 }
420
421 sub _construct_object {
422   my ($self, @row) = @_;
423   my @as = @{ $self->{attrs}{as} };
424
425   my $info = $self->_collapse_result(\@as, \@row);
426
427   #use Data::Dumper; warn Dumper(\@as, $info);
428   my $new = $self->result_class->inflate_result($self->result_source, @$info);
429
430   $new = $self->{attrs}{record_filter}->($new)
431     if exists $self->{attrs}{record_filter};
432  
433   return $new;
434 }
435
436 sub _collapse_result {
437   my ($self, $as, $row, $prefix) = @_;
438
439   my %const;
440
441   my @copy = @$row;
442   foreach my $as (@$as) {
443     if (defined $prefix && !($as =~ s/\Q${prefix}\E\.//)) {
444       shift @copy;
445       next;
446     }
447     $as =~ /^(?:(.*)\.)?([^\.]+)$/;
448     $const{$1||''}{$2} = shift @copy;
449   }
450
451   #warn "@cols -> @row";
452   my $info = [ {}, {} ];
453   foreach my $key (keys %const) {
454     if (length $key) {
455       my $target = $info;
456       my @parts = split(/\./, $key);
457       foreach my $p (@parts) {
458         $target = $target->[1]->{$p} ||= [];
459       }
460       $target->[0] = $const{$key};
461     } else {
462       $info->[0] = $const{$key};
463     }
464   }
465
466   if (!defined($prefix) && keys %{$self->{collapse}}) {
467     my ($c) = sort { length $a <=> length $b } keys %{$self->{collapse}};
468     #warn "Collapsing ${c}";
469     my $target = $info;
470     #warn Data::Dumper::Dumper($target);
471     foreach my $p (split(/\./, $c)) {
472       $target = $target->[1]->{$p};
473     }
474     my @co_key = @{$self->{collapse}{$c}};
475     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
476     my $tree = $self->_collapse_result($as, $row, $c);
477     #warn Data::Dumper::Dumper($target);
478     my (@final, @raw);
479     while ( !(grep { $co_check{$_} ne $tree->[0]->{$_} } @co_key) ) {
480       push(@final, $tree);
481       last unless (@raw = $self->cursor->next);
482       $row = $self->{stashed_row} = \@raw;
483       $tree = $self->_collapse_result($as, $row, $c);
484     }
485     @{$target} = @final;
486     #warn Data::Dumper::Dumper($target);
487   }
488
489   #warn Dumper($info);
490
491   return $info;
492 }
493
494 =head2 result_source
495
496 Returns a reference to the result source for this recordset.
497
498 =cut
499
500
501 =head2 count
502
503 Performs an SQL C<COUNT> with the same query as the resultset was built
504 with to find the number of elements. If passed arguments, does a search
505 on the resultset and counts the results of that.
506
507 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
508 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
509 not support C<DISTINCT> with multiple columns. If you are using such a
510 database, you should only use columns from the main table in your C<group_by>
511 clause.
512
513 =cut
514
515 sub count {
516   my $self = shift;
517   return $self->search(@_)->count if @_ && defined $_[0];
518   unless (defined $self->{count}) {
519     return scalar @{ $self->get_cache }
520       if @{ $self->get_cache };
521     my $group_by;
522     my $select = { 'count' => '*' };
523     my $attrs = { %{ $self->{attrs} } };
524     if( $group_by = delete $attrs->{group_by} ) {
525       delete $attrs->{having};
526       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
527       # todo: try CONCAT for multi-column pk
528       my @pk = $self->result_source->primary_columns;
529       if( scalar(@pk) == 1 ) {
530         my $pk = shift(@pk);
531         my $alias = $attrs->{alias};
532         my $re = qr/^($alias\.)?$pk$/;
533         foreach my $column ( @distinct) {
534           if( $column =~ $re ) {
535             @distinct = ( $column );
536             last;
537           }
538         } 
539       }
540
541       $select = { count => { 'distinct' => \@distinct } };
542       #use Data::Dumper; die Dumper $select;
543     }
544
545     $attrs->{select} = $select;
546     $attrs->{as} = [ 'count' ];
547     # offset, order by and page are not needed to count. record_filter is cdbi
548     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
549         
550     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
551   }
552   return 0 unless $self->{count};
553   my $count = $self->{count};
554   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
555   $count = $self->{attrs}{rows} if
556     ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
557   return $count;
558 }
559
560 =head2 count_literal
561
562 Calls L</search_literal> with the passed arguments, then L</count>.
563
564 =cut
565
566 sub count_literal { shift->search_literal(@_)->count; }
567
568 =head2 all
569
570 Returns all elements in the resultset. Called implictly if the resultset
571 is returned in list context.
572
573 =cut
574
575 sub all {
576   my ($self) = @_;
577   return @{ $self->get_cache }
578     if @{ $self->get_cache };
579   if( $self->{attrs}->{cache} ) {
580     my @obj = map { $self->_construct_object(@$_); }
581             $self->cursor->all;
582     $self->set_cache( \@obj );
583     return @obj;
584   }
585   return map { $self->_construct_object(@$_); }
586            $self->cursor->all;
587 }
588
589 =head2 reset
590
591 Resets the resultset's cursor, so you can iterate through the elements again.
592
593 =cut
594
595 sub reset {
596   my ($self) = @_;
597   $self->{all_cache_position} = 0;
598   $self->cursor->reset;
599   return $self;
600 }
601
602 =head2 first
603
604 Resets the resultset and returns the first element.
605
606 =cut
607
608 sub first {
609   return $_[0]->reset->next;
610 }
611
612 =head2 update
613
614 =head3 Arguments: (\%values)
615
616 Sets the specified columns in the resultset to the supplied values.
617
618 =cut
619
620 sub update {
621   my ($self, $values) = @_;
622   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
623   return $self->result_source->storage->update(
624            $self->result_source->from, $values, $self->{cond});
625 }
626
627 =head2 update_all
628
629 =head3 Arguments: (\%values)
630
631 Fetches all objects and updates them one at a time.  Note that C<update_all>
632 will run cascade triggers while L</update> will not.
633
634 =cut
635
636 sub update_all {
637   my ($self, $values) = @_;
638   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
639   foreach my $obj ($self->all) {
640     $obj->set_columns($values)->update;
641   }
642   return 1;
643 }
644
645 =head2 delete
646
647 Deletes the contents of the resultset from its result source.
648
649 =cut
650
651 sub delete {
652   my ($self) = @_;
653   my $del = {};
654   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
655     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
656   if (ref $self->{cond} eq 'ARRAY') {
657     $del = [ map { my %hash;
658       foreach my $key (keys %{$_}) {
659         $key =~ /([^\.]+)$/;
660         $hash{$1} = $_->{$key};
661       }; \%hash; } @{$self->{cond}} ];
662   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
663     $del->{-and} = [ map { my %hash;
664       foreach my $key (keys %{$_}) {
665         $key =~ /([^\.]+)$/;
666         $hash{$1} = $_->{$key};
667       }; \%hash; } @{$self->{cond}{-and}} ];
668   } else {
669     foreach my $key (keys %{$self->{cond}}) {
670       $key =~ /([^\.]+)$/;
671       $del->{$1} = $self->{cond}{$key};
672     }
673   }
674   $self->result_source->storage->delete($self->result_source->from, $del);
675   return 1;
676 }
677
678 =head2 delete_all
679
680 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
681 will run cascade triggers while L</delete> will not.
682
683 =cut
684
685 sub delete_all {
686   my ($self) = @_;
687   $_->delete for $self->all;
688   return 1;
689 }
690
691 =head2 pager
692
693 Returns a L<Data::Page> object for the current resultset. Only makes
694 sense for queries with a C<page> attribute.
695
696 =cut
697
698 sub pager {
699   my ($self) = @_;
700   my $attrs = $self->{attrs};
701   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
702   $attrs->{rows} ||= 10;
703   $self->count;
704   return $self->{pager} ||= Data::Page->new(
705     $self->{count}, $attrs->{rows}, $self->{page});
706 }
707
708 =head2 page
709
710 =head3 Arguments: ($page_num)
711
712 Returns a new resultset for the specified page.
713
714 =cut
715
716 sub page {
717   my ($self, $page) = @_;
718   my $attrs = { %{$self->{attrs}} };
719   $attrs->{page} = $page;
720   return (ref $self)->new($self->result_source, $attrs);
721 }
722
723 =head2 new_result
724
725 =head3 Arguments: (\%vals)
726
727 Creates a result in the resultset's result class.
728
729 =cut
730
731 sub new_result {
732   my ($self, $values) = @_;
733   $self->throw_exception( "new_result needs a hash" )
734     unless (ref $values eq 'HASH');
735   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
736     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
737   my %new = %$values;
738   my $alias = $self->{attrs}{alias};
739   foreach my $key (keys %{$self->{cond}||{}}) {
740     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
741   }
742   my $obj = $self->result_class->new(\%new);
743   $obj->result_source($self->result_source) if $obj->can('result_source');
744   $obj;
745 }
746
747 =head2 create
748
749 =head3 Arguments: (\%vals)
750
751 Inserts a record into the resultset and returns the object.
752
753 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
754
755 =cut
756
757 sub create {
758   my ($self, $attrs) = @_;
759   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
760   return $self->new_result($attrs)->insert;
761 }
762
763 =head2 find_or_create
764
765 =head3 Arguments: (\%vals, \%attrs?)
766
767   $class->find_or_create({ key => $val, ... });
768
769 Searches for a record matching the search condition; if it doesn't find one,
770 creates one and returns that instead.
771
772   my $cd = $schema->resultset('CD')->find_or_create({
773     cdid   => 5,
774     artist => 'Massive Attack',
775     title  => 'Mezzanine',
776     year   => 2005,
777   });
778
779 Also takes an optional C<key> attribute, to search by a specific key or unique
780 constraint. For example:
781
782   my $cd = $schema->resultset('CD')->find_or_create(
783     {
784       artist => 'Massive Attack',
785       title  => 'Mezzanine',
786     },
787     { key => 'artist_title' }
788   );
789
790 See also L</find> and L</update_or_create>.
791
792 =cut
793
794 sub find_or_create {
795   my $self     = shift;
796   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
797   my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
798   my $exists   = $self->find($hash, $attrs);
799   return defined($exists) ? $exists : $self->create($hash);
800 }
801
802 =head2 update_or_create
803
804   $class->update_or_create({ key => $val, ... });
805
806 First, search for an existing row matching one of the unique constraints
807 (including the primary key) on the source of this resultset.  If a row is
808 found, update it with the other given column values.  Otherwise, create a new
809 row.
810
811 Takes an optional C<key> attribute to search on a specific unique constraint.
812 For example:
813
814   # In your application
815   my $cd = $schema->resultset('CD')->update_or_create(
816     {
817       artist => 'Massive Attack',
818       title  => 'Mezzanine',
819       year   => 1998,
820     },
821     { key => 'artist_title' }
822   );
823
824 If no C<key> is specified, it searches on all unique constraints defined on the
825 source, including the primary key.
826
827 If the C<key> is specified as C<primary>, search only on the primary key.
828
829 See also L</find> and L</find_or_create>.
830
831 =cut
832
833 sub update_or_create {
834   my $self = shift;
835
836   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
837   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
838
839   my %unique_constraints = $self->result_source->unique_constraints;
840   my @constraint_names   = (exists $attrs->{key}
841                             ? ($attrs->{key})
842                             : keys %unique_constraints);
843
844   my @unique_hashes;
845   foreach my $name (@constraint_names) {
846     my @unique_cols = @{ $unique_constraints{$name} };
847     my %unique_hash =
848       map  { $_ => $hash->{$_} }
849       grep { exists $hash->{$_} }
850       @unique_cols;
851
852     push @unique_hashes, \%unique_hash
853       if (scalar keys %unique_hash == scalar @unique_cols);
854   }
855
856   my $row;
857   if (@unique_hashes) {
858     $row = $self->search(\@unique_hashes, { rows => 1 })->first;
859     if ($row) {
860       $row->set_columns($hash);
861       $row->update;
862     }
863   }
864
865   unless ($row) {
866     $row = $self->create($hash);
867   }
868
869   return $row;
870 }
871
872 =head2 get_cache
873
874 Gets the contents of the cache for the resultset.
875
876 =cut
877
878 sub get_cache {
879   my $self = shift;
880   return $self->{all_cache} || [];
881 }
882
883 =head2 set_cache
884
885 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
886
887 =cut
888
889 sub set_cache {
890   my ( $self, $data ) = @_;
891   $self->throw_exception("set_cache requires an arrayref")
892     if ref $data ne 'ARRAY';
893   my $result_class = $self->result_class;
894   foreach( @$data ) {
895     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
896       if ref $_ ne $result_class;
897   }
898   $self->{all_cache} = $data;
899 }
900
901 =head2 clear_cache
902
903 Clears the cache for the resultset.
904
905 =cut
906
907 sub clear_cache {
908   my $self = shift;
909   $self->set_cache([]);
910 }
911
912 =head2 related_resultset
913
914 Returns a related resultset for the supplied relationship name.
915
916   $rs = $rs->related_resultset('foo');
917
918 =cut
919
920 sub related_resultset {
921   my ( $self, $rel, @rest ) = @_;
922   $self->{related_resultsets} ||= {};
923   my $resultsets = $self->{related_resultsets};
924   if( !exists $resultsets->{$rel} ) {
925     #warn "fetching related resultset for rel '$rel'";
926     my $rel_obj = $self->result_source->relationship_info($rel);
927     $self->throw_exception(
928       "search_related: result source '" . $self->result_source->name .
929       "' has no such relationship ${rel}")
930       unless $rel_obj; #die Dumper $self->{attrs};
931     my $rs = $self->search(undef, { join => $rel });
932     #if( $self->{attrs}->{cache} ) {
933     #  $rs = $self->search(undef);
934     #}
935     #else {
936     #}
937     #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
938     #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
939     my $alias = (defined $rs->{attrs}{seen_join}{$rel}
940                   && $rs->{attrs}{seen_join}{$rel} > 1
941                 ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
942                 : $rel);
943     $resultsets->{$rel} =
944       $self->result_source->schema->resultset($rel_obj->{class}
945            )->search( undef,
946              { %{$rs->{attrs}},
947                alias => $alias,
948                select => undef(),
949                as => undef() }
950            )->search(@rest);
951   }
952   return $resultsets->{$rel};
953 }
954
955 =head2 throw_exception
956
957 See Schema's throw_exception
958
959 =cut
960
961 sub throw_exception {
962   my $self=shift;
963   $self->result_source->schema->throw_exception(@_);
964 }
965
966 =head1 ATTRIBUTES
967
968 The resultset takes various attributes that modify its behavior. Here's an
969 overview of them:
970
971 =head2 order_by
972
973 Which column(s) to order the results by. This is currently passed through
974 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
975
976 =head2 cols
977
978 =head3 Arguments: (arrayref)
979
980 Shortcut to request a particular set of columns to be retrieved.  Adds
981 C<me.> onto the start of any column without a C<.> in it and sets C<select>
982 from that, then auto-populates C<as> from C<select> as normal.
983
984 =head2 include_columns
985
986 =head3 Arguments: (arrayref)
987
988 Shortcut to include additional columns in the returned results - for example
989
990   { include_columns => ['foo.name'], join => ['foo'] }
991
992 would add a 'name' column to the information passed to object inflation
993
994 =head2 select
995
996 =head3 Arguments: (arrayref)
997
998 Indicates which columns should be selected from the storage. You can use
999 column names, or in the case of RDBMS back ends, function or stored procedure
1000 names:
1001
1002   $rs = $schema->resultset('Foo')->search(
1003     {},
1004     {
1005       select => [
1006         'column_name',
1007         { count => 'column_to_count' },
1008         { sum => 'column_to_sum' }
1009       ]
1010     }
1011   );
1012
1013 When you use function/stored procedure names and do not supply an C<as>
1014 attribute, the column names returned are storage-dependent. E.g. MySQL would
1015 return a column named C<count(column_to_count)> in the above example.
1016
1017 =head2 as
1018
1019 =head3 Arguments: (arrayref)
1020
1021 Indicates column names for object inflation. This is used in conjunction with
1022 C<select>, usually when C<select> contains one or more function or stored
1023 procedure names:
1024
1025   $rs = $schema->resultset('Foo')->search(
1026     {},
1027     {
1028       select => [
1029         'column1',
1030         { count => 'column2' }
1031       ],
1032       as => [qw/ column1 column2_count /]
1033     }
1034   );
1035
1036   my $foo = $rs->first(); # get the first Foo
1037
1038 If the object against which the search is performed already has an accessor
1039 matching a column name specified in C<as>, the value can be retrieved using
1040 the accessor as normal:
1041
1042   my $column1 = $foo->column1();
1043
1044 If on the other hand an accessor does not exist in the object, you need to
1045 use C<get_column> instead:
1046
1047   my $column2_count = $foo->get_column('column2_count');
1048
1049 You can create your own accessors if required - see
1050 L<DBIx::Class::Manual::Cookbook> for details.
1051
1052 =head2 join
1053
1054 Contains a list of relationships that should be joined for this query.  For
1055 example:
1056
1057   # Get CDs by Nine Inch Nails
1058   my $rs = $schema->resultset('CD')->search(
1059     { 'artist.name' => 'Nine Inch Nails' },
1060     { join => 'artist' }
1061   );
1062
1063 Can also contain a hash reference to refer to the other relation's relations.
1064 For example:
1065
1066   package MyApp::Schema::Track;
1067   use base qw/DBIx::Class/;
1068   __PACKAGE__->table('track');
1069   __PACKAGE__->add_columns(qw/trackid cd position title/);
1070   __PACKAGE__->set_primary_key('trackid');
1071   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1072   1;
1073
1074   # In your application
1075   my $rs = $schema->resultset('Artist')->search(
1076     { 'track.title' => 'Teardrop' },
1077     {
1078       join     => { cd => 'track' },
1079       order_by => 'artist.name',
1080     }
1081   );
1082
1083 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1084 similarly for a third time). For e.g.
1085
1086   my $rs = $schema->resultset('Artist')->search(
1087     { 'cds.title'   => 'Foo',
1088       'cds_2.title' => 'Bar' },
1089     { join => [ qw/cds cds/ ] });
1090
1091 will return a set of all artists that have both a cd with title Foo and a cd
1092 with title Bar.
1093
1094 If you want to fetch related objects from other tables as well, see C<prefetch>
1095 below.
1096
1097 =head2 prefetch
1098
1099 =head3 Arguments: arrayref/hashref
1100
1101 Contains one or more relationships that should be fetched along with the main 
1102 query (when they are accessed afterwards they will have already been
1103 "prefetched").  This is useful for when you know you will need the related
1104 objects, because it saves at least one query:
1105
1106   my $rs = $schema->resultset('Tag')->search(
1107     {},
1108     {
1109       prefetch => {
1110         cd => 'artist'
1111       }
1112     }
1113   );
1114
1115 The initial search results in SQL like the following:
1116
1117   SELECT tag.*, cd.*, artist.* FROM tag
1118   JOIN cd ON tag.cd = cd.cdid
1119   JOIN artist ON cd.artist = artist.artistid
1120
1121 L<DBIx::Class> has no need to go back to the database when we access the
1122 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1123 case.
1124
1125 Simple prefetches will be joined automatically, so there is no need
1126 for a C<join> attribute in the above search. If you're prefetching to
1127 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1128 specify the join as well.
1129
1130 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1131 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1132 with an accessor type of 'single' or 'filter').
1133
1134 =head2 from
1135
1136 =head3 Arguments: (arrayref)
1137
1138 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1139 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1140 clauses.
1141
1142 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1143 C<join> will usually do what you need and it is strongly recommended that you
1144 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1145
1146 In simple terms, C<from> works as follows:
1147
1148     [
1149         { <alias> => <table>, -join-type => 'inner|left|right' }
1150         [] # nested JOIN (optional)
1151         { <table.column> = <foreign_table.foreign_key> }
1152     ]
1153
1154     JOIN
1155         <alias> <table>
1156         [JOIN ...]
1157     ON <table.column> = <foreign_table.foreign_key>
1158
1159 An easy way to follow the examples below is to remember the following:
1160
1161     Anything inside "[]" is a JOIN
1162     Anything inside "{}" is a condition for the enclosing JOIN
1163
1164 The following examples utilize a "person" table in a family tree application.
1165 In order to express parent->child relationships, this table is self-joined:
1166
1167     # Person->belongs_to('father' => 'Person');
1168     # Person->belongs_to('mother' => 'Person');
1169
1170 C<from> can be used to nest joins. Here we return all children with a father,
1171 then search against all mothers of those children:
1172
1173   $rs = $schema->resultset('Person')->search(
1174       {},
1175       {
1176           alias => 'mother', # alias columns in accordance with "from"
1177           from => [
1178               { mother => 'person' },
1179               [
1180                   [
1181                       { child => 'person' },
1182                       [
1183                           { father => 'person' },
1184                           { 'father.person_id' => 'child.father_id' }
1185                       ]
1186                   ],
1187                   { 'mother.person_id' => 'child.mother_id' }
1188               ],
1189           ]
1190       },
1191   );
1192
1193   # Equivalent SQL:
1194   # SELECT mother.* FROM person mother
1195   # JOIN (
1196   #   person child
1197   #   JOIN person father
1198   #   ON ( father.person_id = child.father_id )
1199   # )
1200   # ON ( mother.person_id = child.mother_id )
1201
1202 The type of any join can be controlled manually. To search against only people
1203 with a father in the person table, we could explicitly use C<INNER JOIN>:
1204
1205     $rs = $schema->resultset('Person')->search(
1206         {},
1207         {
1208             alias => 'child', # alias columns in accordance with "from"
1209             from => [
1210                 { child => 'person' },
1211                 [
1212                     { father => 'person', -join-type => 'inner' },
1213                     { 'father.id' => 'child.father_id' }
1214                 ],
1215             ]
1216         },
1217     );
1218
1219     # Equivalent SQL:
1220     # SELECT child.* FROM person child
1221     # INNER JOIN person father ON child.father_id = father.id
1222
1223 =head2 page
1224
1225 For a paged resultset, specifies which page to retrieve.  Leave unset
1226 for an unpaged resultset.
1227
1228 =head2 rows
1229
1230 For a paged resultset, how many rows per page:
1231
1232   rows => 10
1233
1234 Can also be used to simulate an SQL C<LIMIT>.
1235
1236 =head2 group_by
1237
1238 =head3 Arguments: (arrayref)
1239
1240 A arrayref of columns to group by. Can include columns of joined tables.
1241
1242   group_by => [qw/ column1 column2 ... /]
1243
1244 =head2 distinct
1245
1246 Set to 1 to group by all columns.
1247
1248 For more examples of using these attributes, see
1249 L<DBIx::Class::Manual::Cookbook>.
1250
1251 =cut
1252
1253 1;