920ee00dbb08802a2d202b6cbc6bfee64039e174
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => 'count',
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => 'result_source');
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->table('artist');
36   __PACKAGE__->add_columns(qw/artistid name/);
37   __PACKAGE__->set_primary_key('artistid');
38   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
39   1;
40
41   package MyApp::Schema::CD;
42   use base qw/DBIx::Class/;
43   __PACKAGE__->table('artist');
44   __PACKAGE__->add_columns(qw/cdid artist title year/);
45   __PACKAGE__->set_primary_key('cdid');
46   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
47   1;
48
49 =head1 METHODS
50
51 =head2 new
52
53 =head3 Arguments: ($source, \%$attrs)
54
55 The resultset constructor. Takes a source object (usually a
56 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATRRIBUTES>
57 below).  Does not perform any queries -- these are executed as needed by the
58 other methods.
59
60 Generally you won't need to construct a resultset manually.  You'll
61 automatically get one from e.g. a L</search> called in scalar context:
62
63   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
64
65 =cut
66
67 sub new {
68   my $class = shift;
69   return $class->new_result(@_) if ref $class;
70   my ($source, $attrs) = @_;
71   #use Data::Dumper; warn Dumper($attrs);
72   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
73   my %seen;
74   my $alias = ($attrs->{alias} ||= 'me');
75   if ($attrs->{cols} || !$attrs->{select}) {
76     delete $attrs->{as} if $attrs->{cols};
77     my @cols = ($attrs->{cols}
78                  ? @{delete $attrs->{cols}}
79                  : $source->columns);
80     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @cols ];
81   }
82   $attrs->{as} ||= [ map { m/^$alias\.(.*)$/ ? $1 : $_ } @{$attrs->{select}} ];
83   if (my $include = delete $attrs->{include_columns}) {
84     push(@{$attrs->{select}}, @$include);
85     push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
86   }
87   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
88   $attrs->{from} ||= [ { $alias => $source->from } ];
89   $attrs->{seen_join} ||= {};
90   if (my $join = delete $attrs->{join}) {
91     foreach my $j (ref $join eq 'ARRAY'
92               ? (@{$join}) : ($join)) {
93       if (ref $j eq 'HASH') {
94         $seen{$_} = 1 foreach keys %$j;
95       } else {
96         $seen{$j} = 1;
97       }
98     }
99     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
100   }
101   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
102
103   if (my $prefetch = delete $attrs->{prefetch}) {
104     foreach my $p (ref $prefetch eq 'ARRAY'
105               ? (@{$prefetch}) : ($prefetch)) {
106       if( ref $p eq 'HASH' ) {
107         foreach my $key (keys %$p) {
108           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
109             unless $seen{$key};
110         }
111       }
112       else {
113         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
114             unless $seen{$p};
115       }
116       my @prefetch = $source->resolve_prefetch($p, $attrs->{alias});
117       #die Dumper \@cols;
118       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
119       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
120     }
121   }
122
123   if ($attrs->{page}) {
124     $attrs->{rows} ||= 10;
125     $attrs->{offset} ||= 0;
126     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
127   }
128   my $new = {
129     result_source => $source,
130     cond => $attrs->{where},
131     from => $attrs->{from},
132     count => undef,
133     page => delete $attrs->{page},
134     pager => undef,
135     attrs => $attrs };
136   bless ($new, $class);
137   return $new;
138 }
139
140 =head2 search
141
142   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
143   my $new_rs = $rs->search({ foo => 3 });
144
145 If you need to pass in additional attributes but no additional condition,
146 call it as C<search({}, \%attrs);>.
147
148   # "SELECT foo, bar FROM $class_table"
149   my @all = $class->search({}, { cols => [qw/foo bar/] });
150
151 =cut
152
153 sub search {
154   my $self = shift;
155
156   #use Data::Dumper;warn Dumper(@_);
157   my $rs;
158   if( @_ ) {
159     
160     my $attrs = { %{$self->{attrs}} };
161     if (@_ > 1 && ref $_[$#_] eq 'HASH') {
162      $attrs = { %$attrs, %{ pop(@_) } };
163     }
164
165     my $where = (@_ ? ((@_ == 1 || ref $_[0] eq "HASH") ? shift : {@_}) : undef());
166     if (defined $where) {
167       $where = (defined $attrs->{where}
168                 ? { '-and' =>
169                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
170                         $where, $attrs->{where} ] }
171                 : $where);
172       $attrs->{where} = $where;
173     }
174
175     $rs = (ref $self)->new($self->result_source, $attrs);
176   }
177   else {
178     $rs = $self;
179     $rs->reset();
180   }
181   return (wantarray ? $rs->all : $rs);
182 }
183
184 =head2 search_literal
185
186   my @obj    = $rs->search_literal($literal_where_cond, @bind);
187   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
188
189 Pass a literal chunk of SQL to be added to the conditional part of the
190 resultset.
191
192 =cut
193                                                          
194 sub search_literal {
195   my ($self, $cond, @vals) = @_;
196   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
197   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
198   return $self->search(\$cond, $attrs);
199 }
200
201 =head2 find
202
203 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
204
205 Finds a row based on its primary key or unique constraint. For example:
206
207   my $cd = $schema->resultset('CD')->find(5);
208
209 Also takes an optional C<key> attribute, to search by a specific key or unique
210 constraint. For example:
211
212   my $cd = $schema->resultset('CD')->find_or_create(
213     {
214       artist => 'Massive Attack',
215       title  => 'Mezzanine',
216     },
217     { key => 'artist_title' }
218   );
219
220 See also L</find_or_create> and L</update_or_create>.
221
222 =cut
223
224 sub find {
225   my ($self, @vals) = @_;
226   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
227
228   my @cols = $self->result_source->primary_columns;
229   if (exists $attrs->{key}) {
230     my %uniq = $self->result_source->unique_constraints;
231     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
232       unless exists $uniq{$attrs->{key}};
233     @cols = @{ $uniq{$attrs->{key}} };
234   }
235   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
236   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
237     unless @cols;
238
239   my $query;
240   if (ref $vals[0] eq 'HASH') {
241     $query = { %{$vals[0]} };
242   } elsif (@cols == @vals) {
243     $query = {};
244     @{$query}{@cols} = @vals;
245   } else {
246     $query = {@vals};
247   }
248   foreach (keys %$query) {
249     next if m/\./;
250     $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
251   }
252   #warn Dumper($query);
253   return (keys %$attrs
254            ? $self->search($query,$attrs)->single
255            : $self->single($query));
256 }
257
258 =head2 search_related
259
260   $rs->search_related('relname', $cond?, $attrs?);
261
262 Search the specified relationship. Optionally specify a condition for matching
263 records.
264
265 =cut
266
267 sub search_related {
268   return shift->related_resultset(shift)->search(@_);
269 }
270
271 =head2 cursor
272
273 Returns a storage-driven cursor to the given resultset.
274
275 =cut
276
277 sub cursor {
278   my ($self) = @_;
279   my ($attrs) = $self->{attrs};
280   $attrs = { %$attrs };
281   return $self->{cursor}
282     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
283           $attrs->{where},$attrs);
284 }
285
286 =head2 single
287
288 Inflates the first result without creating a cursor
289
290 =cut
291
292 sub single {
293   my ($self, $extra) = @_;
294   my ($attrs) = $self->{attrs};
295   $attrs = { %$attrs };
296   if ($extra) {
297     if (defined $attrs->{where}) {
298       $attrs->{where} = {
299         '-and'
300           => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
301                delete $attrs->{where}, $extra ]
302       };
303     } else {
304       $attrs->{where} = $extra;
305     }
306   }
307   my @data = $self->result_source->storage->select_single(
308           $self->{from}, $attrs->{select},
309           $attrs->{where},$attrs);
310   return (@data ? $self->_construct_object(@data) : ());
311 }
312
313
314 =head2 search_like
315
316 Perform a search, but use C<LIKE> instead of equality as the condition. Note
317 that this is simply a convenience method; you most likely want to use
318 L</search> with specific operators.
319
320 For more information, see L<DBIx::Class::Manual::Cookbook>.
321
322 =cut
323
324 sub search_like {
325   my $class    = shift;
326   my $attrs = { };
327   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
328     $attrs = pop(@_);
329   }
330   my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
331   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
332   return $class->search($query, { %$attrs });
333 }
334
335 =head2 slice
336
337 =head3 Arguments: ($first, $last)
338
339 Returns a subset of elements from the resultset.
340
341 =cut
342
343 sub slice {
344   my ($self, $min, $max) = @_;
345   my $attrs = { %{ $self->{attrs} || {} } };
346   $attrs->{offset} ||= 0;
347   $attrs->{offset} += $min;
348   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
349   my $slice = (ref $self)->new($self->result_source, $attrs);
350   return (wantarray ? $slice->all : $slice);
351 }
352
353 =head2 next
354
355 Returns the next element in the resultset (C<undef> is there is none).
356
357 Can be used to efficiently iterate over records in the resultset:
358
359   my $rs = $schema->resultset('CD')->search({});
360   while (my $cd = $rs->next) {
361     print $cd->title;
362   }
363
364 =cut
365
366 sub next {
367   my ($self) = @_;
368   my $cache = $self->get_cache;
369   if( @$cache ) {
370     $self->{all_cache_position} ||= 0;
371     my $obj = $cache->[$self->{all_cache_position}];
372     $self->{all_cache_position}++;
373     return $obj;
374   }
375   my @row = $self->cursor->next;
376 #  warn Dumper(\@row); use Data::Dumper;
377   return unless (@row);
378   return $self->_construct_object(@row);
379 }
380
381 sub _construct_object {
382   my ($self, @row) = @_;
383   my @row_orig = @row; # copy @row for key comparison later, because @row will change
384   my @as = @{ $self->{attrs}{as} };
385   #warn "@cols -> @row";
386   my $info = [ {}, {} ];
387   foreach my $as (@as) {
388     my $rs = $self;
389     my $target = $info;
390     my @parts = split(/\./, $as);
391     my $col = pop(@parts);
392     foreach my $p (@parts) {
393       $target = $target->[1]->{$p} ||= [];
394       
395       # if cache is enabled, fetch inflated objs for prefetch
396       if( $rs->{attrs}->{cache} ) {
397         my $rel_info = $rs->result_source->relationship_info($p);
398         my $cond = $rel_info->{cond};
399         my $parent_rs = $rs;
400         $rs = $rs->related_resultset($p);
401         $rs->{attrs}->{cache} = 1;
402         my @objs = ();
403           
404         # populate related resultset's cache if empty
405         if( !@{ $rs->get_cache } ) {
406           $rs->all;
407         }
408
409         # get ordinals for pk columns in $row, so values can be compared
410         my $map = {};
411         keys %$cond;
412         my $re = qr/^\w+\./;
413         while( my( $rel_key, $pk ) = ( each %$cond ) ) {
414           $rel_key =~ s/$re//;
415           $pk =~ s/$re//;
416           $map->{$rel_key} = $pk;
417         } #die Dumper $map;
418           
419         keys %$map;
420         while( my( $rel_key, $pk ) = each( %$map ) ) {
421           my $i = 0;
422           foreach my $col ( $parent_rs->result_source->columns ) {
423             if( $col eq $pk ) {
424               $map->{$rel_key} = $i;
425             }
426             $i++;
427           }
428         } #die Dumper $map;
429
430         $rs->reset(); # reset cursor/cache position 
431           
432         # get matching objects for inflation
433         OBJ: while( my $rel_obj = $rs->next ) {
434           keys %$map;
435           KEYS: while( my( $rel_key, $ordinal ) = each %$map ) {
436             # use get_column to avoid auto inflation (want scalar value)
437             if( $rel_obj->get_column($rel_key) ne $row_orig[$ordinal] ) {
438               next OBJ;
439             }
440             push @objs, $rel_obj;
441           }
442         }
443         $target->[0] = \@objs;
444       }
445     }
446     $target->[0]->{$col} = shift @row
447       if ref($target->[0]) ne 'ARRAY'; # arrayref is pre-inflated objects, do not overwrite
448   }
449   #use Data::Dumper; warn Dumper(\@as, $info);
450   my $new = $self->result_source->result_class->inflate_result(
451               $self->result_source, @$info);
452   $new = $self->{attrs}{record_filter}->($new)
453     if exists $self->{attrs}{record_filter};
454   return $new;
455 }
456
457 =head2 result_source
458
459 Returns a reference to the result source for this recordset.
460
461 =cut
462
463
464 =head2 count
465
466 Performs an SQL C<COUNT> with the same query as the resultset was built
467 with to find the number of elements. If passed arguments, does a search
468 on the resultset and counts the results of that.
469
470 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
471 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
472 not support C<DISTINCT> with multiple columns. If you are using such a
473 database, you should only use columns from the main table in your C<group_by>
474 clause.
475
476 =cut
477
478 sub count {
479   my $self = shift;
480   return $self->search(@_)->count if @_ && defined $_[0];
481   unless (defined $self->{count}) {
482     return scalar @{ $self->get_cache }
483       if @{ $self->get_cache };
484     my $group_by;
485     my $select = { 'count' => '*' };
486     if( $group_by = delete $self->{attrs}{group_by} ) {
487       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
488       # todo: try CONCAT for multi-column pk
489       my @pk = $self->result_source->primary_columns;
490       if( scalar(@pk) == 1 ) {
491         my $pk = shift(@pk);
492         my $alias = $self->{attrs}{alias};
493         my $re = qr/^($alias\.)?$pk$/;
494         foreach my $column ( @distinct) {
495           if( $column =~ $re ) {
496             @distinct = ( $column );
497             last;
498           }
499         } 
500       }
501
502       $select = { count => { 'distinct' => \@distinct } };
503       #use Data::Dumper; die Dumper $select;
504     }
505
506     my $attrs = { %{ $self->{attrs} },
507                   select => $select,
508                   as => [ 'count' ] };
509     # offset, order by and page are not needed to count. record_filter is cdbi
510     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
511         
512     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
513     $self->{attrs}{group_by} = $group_by;
514   }
515   return 0 unless $self->{count};
516   my $count = $self->{count};
517   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
518   $count = $self->{attrs}{rows} if
519     ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
520   return $count;
521 }
522
523 =head2 count_literal
524
525 Calls L</search_literal> with the passed arguments, then L</count>.
526
527 =cut
528
529 sub count_literal { shift->search_literal(@_)->count; }
530
531 =head2 all
532
533 Returns all elements in the resultset. Called implictly if the resultset
534 is returned in list context.
535
536 =cut
537
538 sub all {
539   my ($self) = @_;
540   return @{ $self->get_cache }
541     if @{ $self->get_cache };
542   if( $self->{attrs}->{cache} ) {
543     my @obj = map { $self->_construct_object(@$_); }
544             $self->cursor->all;
545     $self->set_cache( \@obj );
546     return @{ $self->get_cache };
547   }
548   return map { $self->_construct_object(@$_); }
549            $self->cursor->all;
550 }
551
552 =head2 reset
553
554 Resets the resultset's cursor, so you can iterate through the elements again.
555
556 =cut
557
558 sub reset {
559   my ($self) = @_;
560   $self->{all_cache_position} = 0;
561   $self->cursor->reset;
562   return $self;
563 }
564
565 =head2 first
566
567 Resets the resultset and returns the first element.
568
569 =cut
570
571 sub first {
572   return $_[0]->reset->next;
573 }
574
575 =head2 update
576
577 =head3 Arguments: (\%values)
578
579 Sets the specified columns in the resultset to the supplied values.
580
581 =cut
582
583 sub update {
584   my ($self, $values) = @_;
585   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
586   return $self->result_source->storage->update(
587            $self->result_source->from, $values, $self->{cond});
588 }
589
590 =head2 update_all
591
592 =head3 Arguments: (\%values)
593
594 Fetches all objects and updates them one at a time.  Note that C<update_all>
595 will run cascade triggers while L</update> will not.
596
597 =cut
598
599 sub update_all {
600   my ($self, $values) = @_;
601   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
602   foreach my $obj ($self->all) {
603     $obj->set_columns($values)->update;
604   }
605   return 1;
606 }
607
608 =head2 delete
609
610 Deletes the contents of the resultset from its result source.
611
612 =cut
613
614 sub delete {
615   my ($self) = @_;
616   my $del = {};
617   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
618     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
619   if (ref $self->{cond} eq 'ARRAY') {
620     $del = [ map { my %hash;
621       foreach my $key (keys %{$_}) {
622         $key =~ /([^\.]+)$/;
623         $hash{$1} = $_->{$key};
624       }; \%hash; } @{$self->{cond}} ];
625   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
626     $del->{-and} = [ map { my %hash;
627       foreach my $key (keys %{$_}) {
628         $key =~ /([^\.]+)$/;
629         $hash{$1} = $_->{$key};
630       }; \%hash; } @{$self->{cond}{-and}} ];
631   } else {
632     foreach my $key (keys %{$self->{cond}}) {
633       $key =~ /([^\.]+)$/;
634       $del->{$1} = $self->{cond}{$key};
635     }
636   }
637   $self->result_source->storage->delete($self->result_source->from, $del);
638   return 1;
639 }
640
641 =head2 delete_all
642
643 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
644 will run cascade triggers while L</delete> will not.
645
646 =cut
647
648 sub delete_all {
649   my ($self) = @_;
650   $_->delete for $self->all;
651   return 1;
652 }
653
654 =head2 pager
655
656 Returns a L<Data::Page> object for the current resultset. Only makes
657 sense for queries with a C<page> attribute.
658
659 =cut
660
661 sub pager {
662   my ($self) = @_;
663   my $attrs = $self->{attrs};
664   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
665   $attrs->{rows} ||= 10;
666   $self->count;
667   return $self->{pager} ||= Data::Page->new(
668     $self->{count}, $attrs->{rows}, $self->{page});
669 }
670
671 =head2 page
672
673 =head3 Arguments: ($page_num)
674
675 Returns a new resultset for the specified page.
676
677 =cut
678
679 sub page {
680   my ($self, $page) = @_;
681   my $attrs = { %{$self->{attrs}} };
682   $attrs->{page} = $page;
683   return (ref $self)->new($self->result_source, $attrs);
684 }
685
686 =head2 new_result
687
688 =head3 Arguments: (\%vals)
689
690 Creates a result in the resultset's result class.
691
692 =cut
693
694 sub new_result {
695   my ($self, $values) = @_;
696   $self->throw_exception( "new_result needs a hash" )
697     unless (ref $values eq 'HASH');
698   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
699     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
700   my %new = %$values;
701   my $alias = $self->{attrs}{alias};
702   foreach my $key (keys %{$self->{cond}||{}}) {
703     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
704   }
705   my $obj = $self->result_source->result_class->new(\%new);
706   $obj->result_source($self->result_source) if $obj->can('result_source');
707   $obj;
708 }
709
710 =head2 create
711
712 =head3 Arguments: (\%vals)
713
714 Inserts a record into the resultset and returns the object.
715
716 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
717
718 =cut
719
720 sub create {
721   my ($self, $attrs) = @_;
722   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
723   return $self->new_result($attrs)->insert;
724 }
725
726 =head2 find_or_create
727
728 =head3 Arguments: (\%vals, \%attrs?)
729
730   $class->find_or_create({ key => $val, ... });
731
732 Searches for a record matching the search condition; if it doesn't find one,    
733 creates one and returns that instead.                                       
734
735   my $cd = $schema->resultset('CD')->find_or_create({
736     cdid   => 5,
737     artist => 'Massive Attack',
738     title  => 'Mezzanine',
739     year   => 2005,
740   });
741
742 Also takes an optional C<key> attribute, to search by a specific key or unique
743 constraint. For example:
744
745   my $cd = $schema->resultset('CD')->find_or_create(
746     {
747       artist => 'Massive Attack',
748       title  => 'Mezzanine',
749     },
750     { key => 'artist_title' }
751   );
752
753 See also L</find> and L</update_or_create>.
754
755 =cut
756
757 sub find_or_create {
758   my $self     = shift;
759   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
760   my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
761   my $exists   = $self->find($hash, $attrs);
762   return defined($exists) ? $exists : $self->create($hash);
763 }
764
765 =head2 update_or_create
766
767   $class->update_or_create({ key => $val, ... });
768
769 First, search for an existing row matching one of the unique constraints
770 (including the primary key) on the source of this resultset.  If a row is
771 found, update it with the other given column values.  Otherwise, create a new
772 row.
773
774 Takes an optional C<key> attribute to search on a specific unique constraint.
775 For example:
776
777   # In your application
778   my $cd = $schema->resultset('CD')->update_or_create(
779     {
780       artist => 'Massive Attack',
781       title  => 'Mezzanine',
782       year   => 1998,
783     },
784     { key => 'artist_title' }
785   );
786
787 If no C<key> is specified, it searches on all unique constraints defined on the
788 source, including the primary key.
789
790 If the C<key> is specified as C<primary>, search only on the primary key.
791
792 See also L</find> and L</find_or_create>.
793
794 =cut
795
796 sub update_or_create {
797   my $self = shift;
798
799   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
800   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
801
802   my %unique_constraints = $self->result_source->unique_constraints;
803   my @constraint_names   = (exists $attrs->{key}
804                             ? ($attrs->{key})
805                             : keys %unique_constraints);
806
807   my @unique_hashes;
808   foreach my $name (@constraint_names) {
809     my @unique_cols = @{ $unique_constraints{$name} };
810     my %unique_hash =
811       map  { $_ => $hash->{$_} }
812       grep { exists $hash->{$_} }
813       @unique_cols;
814
815     push @unique_hashes, \%unique_hash
816       if (scalar keys %unique_hash == scalar @unique_cols);
817   }
818
819   my $row;
820   if (@unique_hashes) {
821     $row = $self->search(\@unique_hashes, { rows => 1 })->first;
822     if ($row) {
823       $row->set_columns($hash);
824       $row->update;
825     }
826   }
827
828   unless ($row) {
829     $row = $self->create($hash);
830   }
831
832   return $row;
833 }
834
835 =head2 get_cache
836
837 Gets the contents of the cache for the resultset.
838
839 =cut
840
841 sub get_cache {
842   my $self = shift;
843   return $self->{all_cache} || [];
844 }
845
846 =head2 set_cache
847
848 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
849
850 =cut
851
852 sub set_cache {
853   my ( $self, $data ) = @_;
854   $self->throw_exception("set_cache requires an arrayref")
855     if ref $data ne 'ARRAY';
856   my $result_class = $self->result_source->result_class;
857   foreach( @$data ) {
858     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
859       if ref $_ ne $result_class;
860   }
861   $self->{all_cache} = $data;
862 }
863
864 =head2 clear_cache
865
866 Clears the cache for the resultset.
867
868 =cut
869
870 sub clear_cache {
871   my $self = shift;
872   $self->set_cache([]);
873 }
874
875 =head2 related_resultset
876
877 Returns a related resultset for the supplied relationship name.
878
879   $rs = $rs->related_resultset('foo');
880
881 =cut
882
883 sub related_resultset {
884   my ( $self, $rel, @rest ) = @_;
885   $self->{related_resultsets} ||= {};
886   my $resultsets = $self->{related_resultsets};
887   if( !exists $resultsets->{$rel} ) {
888     #warn "fetching related resultset for rel '$rel'";
889     my $rel_obj = $self->result_source->relationship_info($rel);
890     $self->throw_exception(
891       "search_related: result source '" . $self->result_source->name .
892       "' has no such relationship ${rel}")
893       unless $rel_obj; #die Dumper $self->{attrs};
894     my $rs;
895     if( $self->{attrs}->{cache} ) {
896       $rs = $self->search(undef);
897     }
898     else {
899       $rs = $self->search(undef, { join => $rel });
900     }
901     #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
902     #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
903     my $alias = (defined $rs->{attrs}{seen_join}{$rel}
904                   && $rs->{attrs}{seen_join}{$rel} > 1
905                 ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
906                 : $rel);
907     $resultsets->{$rel} =
908       $self->result_source->schema->resultset($rel_obj->{class}
909            )->search( undef,
910              { %{$rs->{attrs}},
911                alias => $alias,
912                select => undef(),
913                as => undef() }
914            )->search(@rest);
915   }
916   return $resultsets->{$rel};
917 }
918
919 =head2 throw_exception
920
921 See Schema's throw_exception
922
923 =cut
924
925 sub throw_exception {
926   my $self=shift;
927   $self->result_source->schema->throw_exception(@_);
928 }
929
930 =head1 ATTRIBUTES
931
932 The resultset takes various attributes that modify its behavior. Here's an
933 overview of them:
934
935 =head2 order_by
936
937 Which column(s) to order the results by. This is currently passed through
938 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
939
940 =head2 cols
941
942 =head3 Arguments: (arrayref)
943
944 Shortcut to request a particular set of columns to be retrieved.  Adds
945 C<me.> onto the start of any column without a C<.> in it and sets C<select>
946 from that, then auto-populates C<as> from C<select> as normal.
947
948 =head2 include_columns
949
950 =head3 Arguments: (arrayref)
951
952 Shortcut to include additional columns in the returned results - for example
953
954   { include_columns => ['foo.name'], join => ['foo'] }
955
956 would add a 'name' column to the information passed to object inflation
957
958 =head2 select
959
960 =head3 Arguments: (arrayref)
961
962 Indicates which columns should be selected from the storage. You can use
963 column names, or in the case of RDBMS back ends, function or stored procedure
964 names:
965
966   $rs = $schema->resultset('Foo')->search(
967     {},
968     {
969       select => [
970         'column_name',
971         { count => 'column_to_count' },
972         { sum => 'column_to_sum' }
973       ]
974     }
975   );
976
977 When you use function/stored procedure names and do not supply an C<as>
978 attribute, the column names returned are storage-dependent. E.g. MySQL would
979 return a column named C<count(column_to_count)> in the above example.
980
981 =head2 as
982
983 =head3 Arguments: (arrayref)
984
985 Indicates column names for object inflation. This is used in conjunction with
986 C<select>, usually when C<select> contains one or more function or stored
987 procedure names:
988
989   $rs = $schema->resultset('Foo')->search(
990     {},
991     {
992       select => [
993         'column1',
994         { count => 'column2' }
995       ],
996       as => [qw/ column1 column2_count /]
997     }
998   );
999
1000   my $foo = $rs->first(); # get the first Foo
1001
1002 If the object against which the search is performed already has an accessor
1003 matching a column name specified in C<as>, the value can be retrieved using
1004 the accessor as normal:
1005
1006   my $column1 = $foo->column1();
1007
1008 If on the other hand an accessor does not exist in the object, you need to
1009 use C<get_column> instead:
1010
1011   my $column2_count = $foo->get_column('column2_count');
1012
1013 You can create your own accessors if required - see
1014 L<DBIx::Class::Manual::Cookbook> for details.
1015
1016 =head2 join
1017
1018 Contains a list of relationships that should be joined for this query.  For
1019 example:
1020
1021   # Get CDs by Nine Inch Nails
1022   my $rs = $schema->resultset('CD')->search(
1023     { 'artist.name' => 'Nine Inch Nails' },
1024     { join => 'artist' }
1025   );
1026
1027 Can also contain a hash reference to refer to the other relation's relations.
1028 For example:
1029
1030   package MyApp::Schema::Track;
1031   use base qw/DBIx::Class/;
1032   __PACKAGE__->table('track');
1033   __PACKAGE__->add_columns(qw/trackid cd position title/);
1034   __PACKAGE__->set_primary_key('trackid');
1035   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1036   1;
1037
1038   # In your application
1039   my $rs = $schema->resultset('Artist')->search(
1040     { 'track.title' => 'Teardrop' },
1041     {
1042       join     => { cd => 'track' },
1043       order_by => 'artist.name',
1044     }
1045   );
1046
1047 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1048 similarly for a third time). For e.g.
1049
1050   my $rs = $schema->resultset('Artist')->search(
1051     { 'cds.title'   => 'Foo',
1052       'cds_2.title' => 'Bar' },
1053     { join => [ qw/cds cds/ ] });
1054
1055 will return a set of all artists that have both a cd with title Foo and a cd
1056 with title Bar.
1057
1058 If you want to fetch related objects from other tables as well, see C<prefetch>
1059 below.
1060
1061 =head2 prefetch
1062
1063 =head3 Arguments: arrayref/hashref
1064
1065 Contains one or more relationships that should be fetched along with the main 
1066 query (when they are accessed afterwards they will have already been
1067 "prefetched").  This is useful for when you know you will need the related
1068 objects, because it saves at least one query:
1069
1070   my $rs = $schema->resultset('Tag')->search(
1071     {},
1072     {
1073       prefetch => {
1074         cd => 'artist'
1075       }
1076     }
1077   );
1078
1079 The initial search results in SQL like the following:
1080
1081   SELECT tag.*, cd.*, artist.* FROM tag
1082   JOIN cd ON tag.cd = cd.cdid
1083   JOIN artist ON cd.artist = artist.artistid
1084
1085 L<DBIx::Class> has no need to go back to the database when we access the
1086 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1087 case.
1088
1089 Simple prefetches will be joined automatically, so there is no need
1090 for a C<join> attribute in the above search. If you're prefetching to
1091 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1092 specify the join as well.
1093
1094 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1095 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1096 with an accessor type of 'single' or 'filter').
1097
1098 =head2 from
1099
1100 =head3 Arguments: (arrayref)
1101
1102 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1103 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1104 clauses.
1105
1106 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1107 C<join> will usually do what you need and it is strongly recommended that you
1108 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1109
1110 In simple terms, C<from> works as follows:
1111
1112     [
1113         { <alias> => <table>, -join-type => 'inner|left|right' }
1114         [] # nested JOIN (optional)
1115         { <table.column> = <foreign_table.foreign_key> }
1116     ]
1117
1118     JOIN
1119         <alias> <table>
1120         [JOIN ...]
1121     ON <table.column> = <foreign_table.foreign_key>
1122
1123 An easy way to follow the examples below is to remember the following:
1124
1125     Anything inside "[]" is a JOIN
1126     Anything inside "{}" is a condition for the enclosing JOIN
1127
1128 The following examples utilize a "person" table in a family tree application.
1129 In order to express parent->child relationships, this table is self-joined:
1130
1131     # Person->belongs_to('father' => 'Person');
1132     # Person->belongs_to('mother' => 'Person');
1133
1134 C<from> can be used to nest joins. Here we return all children with a father,
1135 then search against all mothers of those children:
1136
1137   $rs = $schema->resultset('Person')->search(
1138       {},
1139       {
1140           alias => 'mother', # alias columns in accordance with "from"
1141           from => [
1142               { mother => 'person' },
1143               [
1144                   [
1145                       { child => 'person' },
1146                       [
1147                           { father => 'person' },
1148                           { 'father.person_id' => 'child.father_id' }
1149                       ]
1150                   ],
1151                   { 'mother.person_id' => 'child.mother_id' }
1152               ],                
1153           ]
1154       },
1155   );
1156
1157   # Equivalent SQL:
1158   # SELECT mother.* FROM person mother
1159   # JOIN (
1160   #   person child
1161   #   JOIN person father
1162   #   ON ( father.person_id = child.father_id )
1163   # )
1164   # ON ( mother.person_id = child.mother_id )
1165
1166 The type of any join can be controlled manually. To search against only people
1167 with a father in the person table, we could explicitly use C<INNER JOIN>:
1168
1169     $rs = $schema->resultset('Person')->search(
1170         {},
1171         {
1172             alias => 'child', # alias columns in accordance with "from"
1173             from => [
1174                 { child => 'person' },
1175                 [
1176                     { father => 'person', -join-type => 'inner' },
1177                     { 'father.id' => 'child.father_id' }
1178                 ],
1179             ]
1180         },
1181     );
1182
1183     # Equivalent SQL:
1184     # SELECT child.* FROM person child
1185     # INNER JOIN person father ON child.father_id = father.id
1186
1187 =head2 page
1188
1189 For a paged resultset, specifies which page to retrieve.  Leave unset
1190 for an unpaged resultset.
1191
1192 =head2 rows
1193
1194 For a paged resultset, how many rows per page:
1195
1196   rows => 10
1197
1198 Can also be used to simulate an SQL C<LIMIT>.
1199
1200 =head2 group_by
1201
1202 =head3 Arguments: (arrayref)
1203
1204 A arrayref of columns to group by. Can include columns of joined tables.
1205
1206   group_by => [qw/ column1 column2 ... /]
1207
1208 =head2 distinct
1209
1210 Set to 1 to group by all columns.
1211
1212 For more examples of using these attributes, see
1213 L<DBIx::Class::Manual::Cookbook>.
1214
1215 =cut
1216
1217 1;