Merge 'DBIx-Class-current' into 'trunk'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => 'result_source');
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->load_components(qw/Core/);
36   __PACKAGE__->table('artist');
37   __PACKAGE__->add_columns(qw/artistid name/);
38   __PACKAGE__->set_primary_key('artistid');
39   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
40   1;
41
42   package MyApp::Schema::CD;
43   use base qw/DBIx::Class/;
44   __PACKAGE__->load_components(qw/Core/);
45   __PACKAGE__->table('cd');
46   __PACKAGE__->add_columns(qw/cdid artist title year/);
47   __PACKAGE__->set_primary_key('cdid');
48   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
49   1;
50
51 =head1 METHODS
52
53 =head2 new
54
55 =head3 Arguments: ($source, \%$attrs)
56
57 The resultset constructor. Takes a source object (usually a
58 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
59 below).  Does not perform any queries -- these are executed as needed by the
60 other methods.
61
62 Generally you won't need to construct a resultset manually.  You'll
63 automatically get one from e.g. a L</search> called in scalar context:
64
65   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
66
67 =cut
68
69 sub new {
70   my $class = shift;
71   return $class->new_result(@_) if ref $class;
72   my ($source, $attrs) = @_;
73   #use Data::Dumper; warn Dumper($attrs);
74   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
75   my %seen;
76   my $alias = ($attrs->{alias} ||= 'me');
77   if ($attrs->{cols} || !$attrs->{select}) {
78     delete $attrs->{as} if $attrs->{cols};
79     my @cols = ($attrs->{cols}
80                  ? @{delete $attrs->{cols}}
81                  : $source->columns);
82     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @cols ];
83   }
84   $attrs->{as} ||= [ map { m/^$alias\.(.*)$/ ? $1 : $_ } @{$attrs->{select}} ];
85   if (my $include = delete $attrs->{include_columns}) {
86     push(@{$attrs->{select}}, @$include);
87     push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
88   }
89   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
90   $attrs->{from} ||= [ { $alias => $source->from } ];
91   $attrs->{seen_join} ||= {};
92   if (my $join = delete $attrs->{join}) {
93     foreach my $j (ref $join eq 'ARRAY'
94               ? (@{$join}) : ($join)) {
95       if (ref $j eq 'HASH') {
96         $seen{$_} = 1 foreach keys %$j;
97       } else {
98         $seen{$j} = 1;
99       }
100     }
101     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
102   }
103   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
104
105   if (my $prefetch = delete $attrs->{prefetch}) {
106     foreach my $p (ref $prefetch eq 'ARRAY'
107               ? (@{$prefetch}) : ($prefetch)) {
108       if( ref $p eq 'HASH' ) {
109         foreach my $key (keys %$p) {
110           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
111             unless $seen{$key};
112         }
113       }
114       else {
115         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
116             unless $seen{$p};
117       }
118       my @prefetch = $source->resolve_prefetch($p, $attrs->{alias});
119       #die Dumper \@cols;
120       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
121       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
122     }
123   }
124
125   if ($attrs->{page}) {
126     $attrs->{rows} ||= 10;
127     $attrs->{offset} ||= 0;
128     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
129   }
130   my $new = {
131     result_source => $source,
132     cond => $attrs->{where},
133     from => $attrs->{from},
134     count => undef,
135     page => delete $attrs->{page},
136     pager => undef,
137     attrs => $attrs };
138   bless ($new, $class);
139   return $new;
140 }
141
142 =head2 search
143
144   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
145   my $new_rs = $rs->search({ foo => 3 });
146
147 If you need to pass in additional attributes but no additional condition,
148 call it as C<search({}, \%attrs);>.
149
150   # "SELECT foo, bar FROM $class_table"
151   my @all = $class->search({}, { cols => [qw/foo bar/] });
152
153 =cut
154
155 sub search {
156   my $self = shift;
157
158   my $rs;
159   if( @_ ) {
160     
161     my $attrs = { %{$self->{attrs}} };
162     my $having = delete $attrs->{having};
163     if (@_ > 1 && ref $_[$#_] eq 'HASH') {
164      $attrs = { %$attrs, %{ pop(@_) } };
165     }
166
167     my $where = (@_
168                   ? ((@_ == 1 || ref $_[0] eq "HASH")
169                       ? shift
170                       : ((@_ % 2)
171                           ? $self->throw_exception(
172                               "Odd number of arguments to search")
173                           : {@_}))
174                   : undef());
175     if (defined $where) {
176       $where = (defined $attrs->{where}
177                 ? { '-and' =>
178                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
179                         $where, $attrs->{where} ] }
180                 : $where);
181       $attrs->{where} = $where;
182     }
183
184     if (defined $having) {
185       $having = (defined $attrs->{having}
186                 ? { '-and' =>
187                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
188                         $having, $attrs->{having} ] }
189                 : $having);
190       $attrs->{having} = $having;
191     }
192
193     $rs = (ref $self)->new($self->result_source, $attrs);
194   }
195   else {
196     $rs = $self;
197     $rs->reset();
198   }
199   return (wantarray ? $rs->all : $rs);
200 }
201
202 =head2 search_literal
203
204   my @obj    = $rs->search_literal($literal_where_cond, @bind);
205   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
206
207 Pass a literal chunk of SQL to be added to the conditional part of the
208 resultset.
209
210 =cut
211
212 sub search_literal {
213   my ($self, $cond, @vals) = @_;
214   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
215   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
216   return $self->search(\$cond, $attrs);
217 }
218
219 =head2 find
220
221 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
222
223 Finds a row based on its primary key or unique constraint. For example:
224
225   my $cd = $schema->resultset('CD')->find(5);
226
227 Also takes an optional C<key> attribute, to search by a specific key or unique
228 constraint. For example:
229
230   my $cd = $schema->resultset('CD')->find(
231     {
232       artist => 'Massive Attack',
233       title  => 'Mezzanine',
234     },
235     { key => 'artist_title' }
236   );
237
238 See also L</find_or_create> and L</update_or_create>.
239
240 =cut
241
242 sub find {
243   my ($self, @vals) = @_;
244   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
245
246   my @cols = $self->result_source->primary_columns;
247   if (exists $attrs->{key}) {
248     my %uniq = $self->result_source->unique_constraints;
249     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
250       unless exists $uniq{$attrs->{key}};
251     @cols = @{ $uniq{$attrs->{key}} };
252   }
253   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
254   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
255     unless @cols;
256
257   my $query;
258   if (ref $vals[0] eq 'HASH') {
259     $query = { %{$vals[0]} };
260   } elsif (@cols == @vals) {
261     $query = {};
262     @{$query}{@cols} = @vals;
263   } else {
264     $query = {@vals};
265   }
266   foreach (keys %$query) {
267     next if m/\./;
268     $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
269   }
270   #warn Dumper($query);
271   return (keys %$attrs
272            ? $self->search($query,$attrs)->single
273            : $self->single($query));
274 }
275
276 =head2 search_related
277
278   $rs->search_related('relname', $cond?, $attrs?);
279
280 Search the specified relationship. Optionally specify a condition for matching
281 records.
282
283 =cut
284
285 sub search_related {
286   return shift->related_resultset(shift)->search(@_);
287 }
288
289 =head2 cursor
290
291 Returns a storage-driven cursor to the given resultset.
292
293 =cut
294
295 sub cursor {
296   my ($self) = @_;
297   my ($attrs) = $self->{attrs};
298   $attrs = { %$attrs };
299   return $self->{cursor}
300     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
301           $attrs->{where},$attrs);
302 }
303
304 =head2 single
305
306 Inflates the first result without creating a cursor
307
308 =cut
309
310 sub single {
311   my ($self, $extra) = @_;
312   my ($attrs) = $self->{attrs};
313   $attrs = { %$attrs };
314   if ($extra) {
315     if (defined $attrs->{where}) {
316       $attrs->{where} = {
317         '-and'
318           => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
319                delete $attrs->{where}, $extra ]
320       };
321     } else {
322       $attrs->{where} = $extra;
323     }
324   }
325   my @data = $self->result_source->storage->select_single(
326           $self->{from}, $attrs->{select},
327           $attrs->{where},$attrs);
328   return (@data ? $self->_construct_object(@data) : ());
329 }
330
331
332 =head2 search_like
333
334 Perform a search, but use C<LIKE> instead of equality as the condition. Note
335 that this is simply a convenience method; you most likely want to use
336 L</search> with specific operators.
337
338 For more information, see L<DBIx::Class::Manual::Cookbook>.
339
340 =cut
341
342 sub search_like {
343   my $class    = shift;
344   my $attrs = { };
345   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
346     $attrs = pop(@_);
347   }
348   my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
349   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
350   return $class->search($query, { %$attrs });
351 }
352
353 =head2 slice
354
355 =head3 Arguments: ($first, $last)
356
357 Returns a subset of elements from the resultset.
358
359 =cut
360
361 sub slice {
362   my ($self, $min, $max) = @_;
363   my $attrs = { %{ $self->{attrs} || {} } };
364   $attrs->{offset} ||= 0;
365   $attrs->{offset} += $min;
366   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
367   my $slice = (ref $self)->new($self->result_source, $attrs);
368   return (wantarray ? $slice->all : $slice);
369 }
370
371 =head2 next
372
373 Returns the next element in the resultset (C<undef> is there is none).
374
375 Can be used to efficiently iterate over records in the resultset:
376
377   my $rs = $schema->resultset('CD')->search({});
378   while (my $cd = $rs->next) {
379     print $cd->title;
380   }
381
382 =cut
383
384 sub next {
385   my ($self) = @_;
386   my $cache;
387   if( @{$cache = $self->{all_cache} || []}) {
388     $self->{all_cache_position} ||= 0;
389     my $obj = $cache->[$self->{all_cache_position}];
390     $self->{all_cache_position}++;
391     return $obj;
392   }
393   if ($self->{attrs}{cache}) {
394     $self->{all_cache_position} = 0;
395     return ($self->all)[0];
396   }
397   my @row = $self->cursor->next;
398 #  warn Dumper(\@row); use Data::Dumper;
399   return unless (@row);
400   return $self->_construct_object(@row);
401 }
402
403 sub _construct_object {
404   my ($self, @row) = @_;
405   my @row_orig = @row; # copy @row for key comparison later, because @row will change
406   my @as = @{ $self->{attrs}{as} };
407 #use Data::Dumper; warn Dumper \@as;
408   #warn "@cols -> @row";
409   my $info = [ {}, {} ];
410   foreach my $as (@as) {
411     my $rs = $self;
412     my $target = $info;
413     my @parts = split(/\./, $as);
414     my $col = pop(@parts);
415     foreach my $p (@parts) {
416       $target = $target->[1]->{$p} ||= [];
417       
418       $rs = $rs->related_resultset($p) if $rs->{attrs}->{cache};
419     }
420     
421     $target->[0]->{$col} = shift @row
422       if ref($target->[0]) ne 'ARRAY'; # arrayref is pre-inflated objects, do not overwrite
423   }
424   #use Data::Dumper; warn Dumper(\@as, $info);
425   my $new = $self->result_source->result_class->inflate_result(
426               $self->result_source, @$info);
427   $new = $self->{attrs}{record_filter}->($new)
428     if exists $self->{attrs}{record_filter};
429  
430   if( $self->{attrs}->{cache} ) {
431     while( my( $rel, $rs ) = each( %{$self->{related_resultsets}} ) ) {
432       $rs->all;
433       #warn "$rel:", @{$rs->get_cache};
434     }
435     $self->build_rr( $self, $new );
436   }
437  
438   return $new;
439 }
440   
441 sub build_rr {
442   # build related resultsets for supplied object
443   my ( $self, $context, $obj ) = @_;
444   
445   my $re = qr/^\w+\./;
446   while( my ($rel, $rs) = each( %{$context->{related_resultsets}} ) ) {  
447     #warn "context:", $context->result_source->name, ", rel:$rel, rs:", $rs->result_source->name;
448     my @objs = ();
449     my $map = {};
450     my $cond = $context->result_source->relationship_info($rel)->{cond};
451     keys %$cond;
452     while( my( $rel_key, $pk ) = each(%$cond) ) {
453       $rel_key =~ s/$re//;
454       $pk =~ s/$re//;
455       $map->{$rel_key} = $pk;
456     }
457     
458     $rs->reset();
459     while( my $rel_obj = $rs->next ) {
460       while( my( $rel_key, $pk ) = each(%$map) ) {
461         if( $rel_obj->get_column($rel_key) eq $obj->get_column($pk) ) {
462           push @objs, $rel_obj;
463         }
464       }
465     }
466
467     my $rel_rs = $obj->related_resultset($rel);
468     $rel_rs->{attrs}->{cache} = 1;
469     $rel_rs->set_cache( \@objs );
470     
471     while( my $rel_obj = $rel_rs->next ) {
472       $self->build_rr( $rs, $rel_obj );
473     }
474     
475   }
476   
477 }
478
479 =head2 result_source
480
481 Returns a reference to the result source for this recordset.
482
483 =cut
484
485
486 =head2 count
487
488 Performs an SQL C<COUNT> with the same query as the resultset was built
489 with to find the number of elements. If passed arguments, does a search
490 on the resultset and counts the results of that.
491
492 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
493 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
494 not support C<DISTINCT> with multiple columns. If you are using such a
495 database, you should only use columns from the main table in your C<group_by>
496 clause.
497
498 =cut
499
500 sub count {
501   my $self = shift;
502   return $self->search(@_)->count if @_ && defined $_[0];
503   unless (defined $self->{count}) {
504     return scalar @{ $self->get_cache }
505       if @{ $self->get_cache };
506     my $group_by;
507     my $select = { 'count' => '*' };
508     my $attrs = { %{ $self->{attrs} } };
509     if( $group_by = delete $attrs->{group_by} ) {
510       delete $attrs->{having};
511       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
512       # todo: try CONCAT for multi-column pk
513       my @pk = $self->result_source->primary_columns;
514       if( scalar(@pk) == 1 ) {
515         my $pk = shift(@pk);
516         my $alias = $attrs->{alias};
517         my $re = qr/^($alias\.)?$pk$/;
518         foreach my $column ( @distinct) {
519           if( $column =~ $re ) {
520             @distinct = ( $column );
521             last;
522           }
523         } 
524       }
525
526       $select = { count => { 'distinct' => \@distinct } };
527       #use Data::Dumper; die Dumper $select;
528     }
529
530     $attrs->{select} = $select;
531     $attrs->{as} = [ 'count' ];
532     # offset, order by and page are not needed to count. record_filter is cdbi
533     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
534         
535     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
536   }
537   return 0 unless $self->{count};
538   my $count = $self->{count};
539   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
540   $count = $self->{attrs}{rows} if
541     ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
542   return $count;
543 }
544
545 =head2 count_literal
546
547 Calls L</search_literal> with the passed arguments, then L</count>.
548
549 =cut
550
551 sub count_literal { shift->search_literal(@_)->count; }
552
553 =head2 all
554
555 Returns all elements in the resultset. Called implictly if the resultset
556 is returned in list context.
557
558 =cut
559
560 sub all {
561   my ($self) = @_;
562   return @{ $self->get_cache }
563     if @{ $self->get_cache };
564   if( $self->{attrs}->{cache} ) {
565     my @obj = map { $self->_construct_object(@$_); }
566             $self->cursor->all;
567     $self->set_cache( \@obj );
568     return @obj;
569   }
570   return map { $self->_construct_object(@$_); }
571            $self->cursor->all;
572 }
573
574 =head2 reset
575
576 Resets the resultset's cursor, so you can iterate through the elements again.
577
578 =cut
579
580 sub reset {
581   my ($self) = @_;
582   $self->{all_cache_position} = 0;
583   $self->cursor->reset;
584   return $self;
585 }
586
587 =head2 first
588
589 Resets the resultset and returns the first element.
590
591 =cut
592
593 sub first {
594   return $_[0]->reset->next;
595 }
596
597 =head2 update
598
599 =head3 Arguments: (\%values)
600
601 Sets the specified columns in the resultset to the supplied values.
602
603 =cut
604
605 sub update {
606   my ($self, $values) = @_;
607   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
608   return $self->result_source->storage->update(
609            $self->result_source->from, $values, $self->{cond});
610 }
611
612 =head2 update_all
613
614 =head3 Arguments: (\%values)
615
616 Fetches all objects and updates them one at a time.  Note that C<update_all>
617 will run cascade triggers while L</update> will not.
618
619 =cut
620
621 sub update_all {
622   my ($self, $values) = @_;
623   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
624   foreach my $obj ($self->all) {
625     $obj->set_columns($values)->update;
626   }
627   return 1;
628 }
629
630 =head2 delete
631
632 Deletes the contents of the resultset from its result source.
633
634 =cut
635
636 sub delete {
637   my ($self) = @_;
638   my $del = {};
639   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
640     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
641   if (ref $self->{cond} eq 'ARRAY') {
642     $del = [ map { my %hash;
643       foreach my $key (keys %{$_}) {
644         $key =~ /([^\.]+)$/;
645         $hash{$1} = $_->{$key};
646       }; \%hash; } @{$self->{cond}} ];
647   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
648     $del->{-and} = [ map { my %hash;
649       foreach my $key (keys %{$_}) {
650         $key =~ /([^\.]+)$/;
651         $hash{$1} = $_->{$key};
652       }; \%hash; } @{$self->{cond}{-and}} ];
653   } else {
654     foreach my $key (keys %{$self->{cond}}) {
655       $key =~ /([^\.]+)$/;
656       $del->{$1} = $self->{cond}{$key};
657     }
658   }
659   $self->result_source->storage->delete($self->result_source->from, $del);
660   return 1;
661 }
662
663 =head2 delete_all
664
665 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
666 will run cascade triggers while L</delete> will not.
667
668 =cut
669
670 sub delete_all {
671   my ($self) = @_;
672   $_->delete for $self->all;
673   return 1;
674 }
675
676 =head2 pager
677
678 Returns a L<Data::Page> object for the current resultset. Only makes
679 sense for queries with a C<page> attribute.
680
681 =cut
682
683 sub pager {
684   my ($self) = @_;
685   my $attrs = $self->{attrs};
686   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
687   $attrs->{rows} ||= 10;
688   $self->count;
689   return $self->{pager} ||= Data::Page->new(
690     $self->{count}, $attrs->{rows}, $self->{page});
691 }
692
693 =head2 page
694
695 =head3 Arguments: ($page_num)
696
697 Returns a new resultset for the specified page.
698
699 =cut
700
701 sub page {
702   my ($self, $page) = @_;
703   my $attrs = { %{$self->{attrs}} };
704   $attrs->{page} = $page;
705   return (ref $self)->new($self->result_source, $attrs);
706 }
707
708 =head2 new_result
709
710 =head3 Arguments: (\%vals)
711
712 Creates a result in the resultset's result class.
713
714 =cut
715
716 sub new_result {
717   my ($self, $values) = @_;
718   $self->throw_exception( "new_result needs a hash" )
719     unless (ref $values eq 'HASH');
720   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
721     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
722   my %new = %$values;
723   my $alias = $self->{attrs}{alias};
724   foreach my $key (keys %{$self->{cond}||{}}) {
725     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
726   }
727   my $obj = $self->result_source->result_class->new(\%new);
728   $obj->result_source($self->result_source) if $obj->can('result_source');
729   $obj;
730 }
731
732 =head2 create
733
734 =head3 Arguments: (\%vals)
735
736 Inserts a record into the resultset and returns the object.
737
738 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
739
740 =cut
741
742 sub create {
743   my ($self, $attrs) = @_;
744   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
745   return $self->new_result($attrs)->insert;
746 }
747
748 =head2 find_or_create
749
750 =head3 Arguments: (\%vals, \%attrs?)
751
752   $class->find_or_create({ key => $val, ... });
753
754 Searches for a record matching the search condition; if it doesn't find one,
755 creates one and returns that instead.
756
757   my $cd = $schema->resultset('CD')->find_or_create({
758     cdid   => 5,
759     artist => 'Massive Attack',
760     title  => 'Mezzanine',
761     year   => 2005,
762   });
763
764 Also takes an optional C<key> attribute, to search by a specific key or unique
765 constraint. For example:
766
767   my $cd = $schema->resultset('CD')->find_or_create(
768     {
769       artist => 'Massive Attack',
770       title  => 'Mezzanine',
771     },
772     { key => 'artist_title' }
773   );
774
775 See also L</find> and L</update_or_create>.
776
777 =cut
778
779 sub find_or_create {
780   my $self     = shift;
781   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
782   my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
783   my $exists   = $self->find($hash, $attrs);
784   return defined($exists) ? $exists : $self->create($hash);
785 }
786
787 =head2 update_or_create
788
789   $class->update_or_create({ key => $val, ... });
790
791 First, search for an existing row matching one of the unique constraints
792 (including the primary key) on the source of this resultset.  If a row is
793 found, update it with the other given column values.  Otherwise, create a new
794 row.
795
796 Takes an optional C<key> attribute to search on a specific unique constraint.
797 For example:
798
799   # In your application
800   my $cd = $schema->resultset('CD')->update_or_create(
801     {
802       artist => 'Massive Attack',
803       title  => 'Mezzanine',
804       year   => 1998,
805     },
806     { key => 'artist_title' }
807   );
808
809 If no C<key> is specified, it searches on all unique constraints defined on the
810 source, including the primary key.
811
812 If the C<key> is specified as C<primary>, search only on the primary key.
813
814 See also L</find> and L</find_or_create>.
815
816 =cut
817
818 sub update_or_create {
819   my $self = shift;
820
821   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
822   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
823
824   my %unique_constraints = $self->result_source->unique_constraints;
825   my @constraint_names   = (exists $attrs->{key}
826                             ? ($attrs->{key})
827                             : keys %unique_constraints);
828
829   my @unique_hashes;
830   foreach my $name (@constraint_names) {
831     my @unique_cols = @{ $unique_constraints{$name} };
832     my %unique_hash =
833       map  { $_ => $hash->{$_} }
834       grep { exists $hash->{$_} }
835       @unique_cols;
836
837     push @unique_hashes, \%unique_hash
838       if (scalar keys %unique_hash == scalar @unique_cols);
839   }
840
841   my $row;
842   if (@unique_hashes) {
843     $row = $self->search(\@unique_hashes, { rows => 1 })->first;
844     if ($row) {
845       $row->set_columns($hash);
846       $row->update;
847     }
848   }
849
850   unless ($row) {
851     $row = $self->create($hash);
852   }
853
854   return $row;
855 }
856
857 =head2 get_cache
858
859 Gets the contents of the cache for the resultset.
860
861 =cut
862
863 sub get_cache {
864   my $self = shift;
865   return $self->{all_cache} || [];
866 }
867
868 =head2 set_cache
869
870 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
871
872 =cut
873
874 sub set_cache {
875   my ( $self, $data ) = @_;
876   $self->throw_exception("set_cache requires an arrayref")
877     if ref $data ne 'ARRAY';
878   my $result_class = $self->result_source->result_class;
879   foreach( @$data ) {
880     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
881       if ref $_ ne $result_class;
882   }
883   $self->{all_cache} = $data;
884 }
885
886 =head2 clear_cache
887
888 Clears the cache for the resultset.
889
890 =cut
891
892 sub clear_cache {
893   my $self = shift;
894   $self->set_cache([]);
895 }
896
897 =head2 related_resultset
898
899 Returns a related resultset for the supplied relationship name.
900
901   $rs = $rs->related_resultset('foo');
902
903 =cut
904
905 sub related_resultset {
906   my ( $self, $rel, @rest ) = @_;
907   $self->{related_resultsets} ||= {};
908   my $resultsets = $self->{related_resultsets};
909   if( !exists $resultsets->{$rel} ) {
910     #warn "fetching related resultset for rel '$rel'";
911     my $rel_obj = $self->result_source->relationship_info($rel);
912     $self->throw_exception(
913       "search_related: result source '" . $self->result_source->name .
914       "' has no such relationship ${rel}")
915       unless $rel_obj; #die Dumper $self->{attrs};
916     my $rs;
917     if( $self->{attrs}->{cache} ) {
918       $rs = $self->search(undef);
919     }
920     else {
921       $rs = $self->search(undef, { join => $rel });
922     }
923     #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
924     #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
925     my $alias = (defined $rs->{attrs}{seen_join}{$rel}
926                   && $rs->{attrs}{seen_join}{$rel} > 1
927                 ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
928                 : $rel);
929     $resultsets->{$rel} =
930       $self->result_source->schema->resultset($rel_obj->{class}
931            )->search( undef,
932              { %{$rs->{attrs}},
933                alias => $alias,
934                select => undef(),
935                as => undef() }
936            )->search(@rest);
937   }
938   return $resultsets->{$rel};
939 }
940
941 =head2 throw_exception
942
943 See Schema's throw_exception
944
945 =cut
946
947 sub throw_exception {
948   my $self=shift;
949   $self->result_source->schema->throw_exception(@_);
950 }
951
952 =head1 ATTRIBUTES
953
954 The resultset takes various attributes that modify its behavior. Here's an
955 overview of them:
956
957 =head2 order_by
958
959 Which column(s) to order the results by. This is currently passed through
960 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
961
962 =head2 cols
963
964 =head3 Arguments: (arrayref)
965
966 Shortcut to request a particular set of columns to be retrieved.  Adds
967 C<me.> onto the start of any column without a C<.> in it and sets C<select>
968 from that, then auto-populates C<as> from C<select> as normal.
969
970 =head2 include_columns
971
972 =head3 Arguments: (arrayref)
973
974 Shortcut to include additional columns in the returned results - for example
975
976   { include_columns => ['foo.name'], join => ['foo'] }
977
978 would add a 'name' column to the information passed to object inflation
979
980 =head2 select
981
982 =head3 Arguments: (arrayref)
983
984 Indicates which columns should be selected from the storage. You can use
985 column names, or in the case of RDBMS back ends, function or stored procedure
986 names:
987
988   $rs = $schema->resultset('Foo')->search(
989     {},
990     {
991       select => [
992         'column_name',
993         { count => 'column_to_count' },
994         { sum => 'column_to_sum' }
995       ]
996     }
997   );
998
999 When you use function/stored procedure names and do not supply an C<as>
1000 attribute, the column names returned are storage-dependent. E.g. MySQL would
1001 return a column named C<count(column_to_count)> in the above example.
1002
1003 =head2 as
1004
1005 =head3 Arguments: (arrayref)
1006
1007 Indicates column names for object inflation. This is used in conjunction with
1008 C<select>, usually when C<select> contains one or more function or stored
1009 procedure names:
1010
1011   $rs = $schema->resultset('Foo')->search(
1012     {},
1013     {
1014       select => [
1015         'column1',
1016         { count => 'column2' }
1017       ],
1018       as => [qw/ column1 column2_count /]
1019     }
1020   );
1021
1022   my $foo = $rs->first(); # get the first Foo
1023
1024 If the object against which the search is performed already has an accessor
1025 matching a column name specified in C<as>, the value can be retrieved using
1026 the accessor as normal:
1027
1028   my $column1 = $foo->column1();
1029
1030 If on the other hand an accessor does not exist in the object, you need to
1031 use C<get_column> instead:
1032
1033   my $column2_count = $foo->get_column('column2_count');
1034
1035 You can create your own accessors if required - see
1036 L<DBIx::Class::Manual::Cookbook> for details.
1037
1038 =head2 join
1039
1040 Contains a list of relationships that should be joined for this query.  For
1041 example:
1042
1043   # Get CDs by Nine Inch Nails
1044   my $rs = $schema->resultset('CD')->search(
1045     { 'artist.name' => 'Nine Inch Nails' },
1046     { join => 'artist' }
1047   );
1048
1049 Can also contain a hash reference to refer to the other relation's relations.
1050 For example:
1051
1052   package MyApp::Schema::Track;
1053   use base qw/DBIx::Class/;
1054   __PACKAGE__->table('track');
1055   __PACKAGE__->add_columns(qw/trackid cd position title/);
1056   __PACKAGE__->set_primary_key('trackid');
1057   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1058   1;
1059
1060   # In your application
1061   my $rs = $schema->resultset('Artist')->search(
1062     { 'track.title' => 'Teardrop' },
1063     {
1064       join     => { cd => 'track' },
1065       order_by => 'artist.name',
1066     }
1067   );
1068
1069 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1070 similarly for a third time). For e.g.
1071
1072   my $rs = $schema->resultset('Artist')->search(
1073     { 'cds.title'   => 'Foo',
1074       'cds_2.title' => 'Bar' },
1075     { join => [ qw/cds cds/ ] });
1076
1077 will return a set of all artists that have both a cd with title Foo and a cd
1078 with title Bar.
1079
1080 If you want to fetch related objects from other tables as well, see C<prefetch>
1081 below.
1082
1083 =head2 prefetch
1084
1085 =head3 Arguments: arrayref/hashref
1086
1087 Contains one or more relationships that should be fetched along with the main 
1088 query (when they are accessed afterwards they will have already been
1089 "prefetched").  This is useful for when you know you will need the related
1090 objects, because it saves at least one query:
1091
1092   my $rs = $schema->resultset('Tag')->search(
1093     {},
1094     {
1095       prefetch => {
1096         cd => 'artist'
1097       }
1098     }
1099   );
1100
1101 The initial search results in SQL like the following:
1102
1103   SELECT tag.*, cd.*, artist.* FROM tag
1104   JOIN cd ON tag.cd = cd.cdid
1105   JOIN artist ON cd.artist = artist.artistid
1106
1107 L<DBIx::Class> has no need to go back to the database when we access the
1108 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1109 case.
1110
1111 Simple prefetches will be joined automatically, so there is no need
1112 for a C<join> attribute in the above search. If you're prefetching to
1113 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1114 specify the join as well.
1115
1116 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1117 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1118 with an accessor type of 'single' or 'filter').
1119
1120 =head2 from
1121
1122 =head3 Arguments: (arrayref)
1123
1124 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1125 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1126 clauses.
1127
1128 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1129 C<join> will usually do what you need and it is strongly recommended that you
1130 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1131
1132 In simple terms, C<from> works as follows:
1133
1134     [
1135         { <alias> => <table>, -join-type => 'inner|left|right' }
1136         [] # nested JOIN (optional)
1137         { <table.column> = <foreign_table.foreign_key> }
1138     ]
1139
1140     JOIN
1141         <alias> <table>
1142         [JOIN ...]
1143     ON <table.column> = <foreign_table.foreign_key>
1144
1145 An easy way to follow the examples below is to remember the following:
1146
1147     Anything inside "[]" is a JOIN
1148     Anything inside "{}" is a condition for the enclosing JOIN
1149
1150 The following examples utilize a "person" table in a family tree application.
1151 In order to express parent->child relationships, this table is self-joined:
1152
1153     # Person->belongs_to('father' => 'Person');
1154     # Person->belongs_to('mother' => 'Person');
1155
1156 C<from> can be used to nest joins. Here we return all children with a father,
1157 then search against all mothers of those children:
1158
1159   $rs = $schema->resultset('Person')->search(
1160       {},
1161       {
1162           alias => 'mother', # alias columns in accordance with "from"
1163           from => [
1164               { mother => 'person' },
1165               [
1166                   [
1167                       { child => 'person' },
1168                       [
1169                           { father => 'person' },
1170                           { 'father.person_id' => 'child.father_id' }
1171                       ]
1172                   ],
1173                   { 'mother.person_id' => 'child.mother_id' }
1174               ],
1175           ]
1176       },
1177   );
1178
1179   # Equivalent SQL:
1180   # SELECT mother.* FROM person mother
1181   # JOIN (
1182   #   person child
1183   #   JOIN person father
1184   #   ON ( father.person_id = child.father_id )
1185   # )
1186   # ON ( mother.person_id = child.mother_id )
1187
1188 The type of any join can be controlled manually. To search against only people
1189 with a father in the person table, we could explicitly use C<INNER JOIN>:
1190
1191     $rs = $schema->resultset('Person')->search(
1192         {},
1193         {
1194             alias => 'child', # alias columns in accordance with "from"
1195             from => [
1196                 { child => 'person' },
1197                 [
1198                     { father => 'person', -join-type => 'inner' },
1199                     { 'father.id' => 'child.father_id' }
1200                 ],
1201             ]
1202         },
1203     );
1204
1205     # Equivalent SQL:
1206     # SELECT child.* FROM person child
1207     # INNER JOIN person father ON child.father_id = father.id
1208
1209 =head2 page
1210
1211 For a paged resultset, specifies which page to retrieve.  Leave unset
1212 for an unpaged resultset.
1213
1214 =head2 rows
1215
1216 For a paged resultset, how many rows per page:
1217
1218   rows => 10
1219
1220 Can also be used to simulate an SQL C<LIMIT>.
1221
1222 =head2 group_by
1223
1224 =head3 Arguments: (arrayref)
1225
1226 A arrayref of columns to group by. Can include columns of joined tables.
1227
1228   group_by => [qw/ column1 column2 ... /]
1229
1230 =head2 distinct
1231
1232 Set to 1 to group by all columns.
1233
1234 For more examples of using these attributes, see
1235 L<DBIx::Class::Manual::Cookbook>.
1236
1237 =cut
1238
1239 1;