maintain cache + reset cursor for search() without arguments
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => 'count',
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => 'result_source');
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->table('artist');
36   __PACKAGE__->add_columns(qw/artistid name/);
37   __PACKAGE__->set_primary_key('artistid');
38   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
39   1;
40
41   package MyApp::Schema::CD;
42   use base qw/DBIx::Class/;
43   __PACKAGE__->table('artist');
44   __PACKAGE__->add_columns(qw/cdid artist title year/);
45   __PACKAGE__->set_primary_key('cdid');
46   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
47   1;
48
49 =head1 METHODS
50
51 =head2 new($source, \%$attrs)
52
53 The resultset constructor. Takes a source object (usually a
54 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATRRIBUTES>
55 below).  Does not perform any queries -- these are executed as needed by the
56 other methods.
57
58 Generally you won't need to construct a resultset manually.  You'll
59 automatically get one from e.g. a L</search> called in scalar context:
60
61   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
62
63 =cut
64
65 sub new {
66   my $class = shift;
67   return $class->new_result(@_) if ref $class;
68   my ($source, $attrs) = @_;
69   #use Data::Dumper; warn Dumper($attrs);
70   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
71   my %seen;
72   my $alias = ($attrs->{alias} ||= 'me');
73   if ($attrs->{cols} || !$attrs->{select}) {
74     delete $attrs->{as} if $attrs->{cols};
75     my @cols = ($attrs->{cols}
76                  ? @{delete $attrs->{cols}}
77                  : $source->columns);
78     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @cols ];
79   }
80   $attrs->{as} ||= [ map { m/^$alias\.(.*)$/ ? $1 : $_ } @{$attrs->{select}} ];
81   if (my $include = delete $attrs->{include_columns}) {
82     push(@{$attrs->{select}}, @$include);
83     push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
84   }
85   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
86   $attrs->{from} ||= [ { $alias => $source->from } ];
87   $attrs->{seen_join} ||= {};
88   if (my $join = delete $attrs->{join}) {
89     foreach my $j (ref $join eq 'ARRAY'
90               ? (@{$join}) : ($join)) {
91       if (ref $j eq 'HASH') {
92         $seen{$_} = 1 foreach keys %$j;
93       } else {
94         $seen{$j} = 1;
95       }
96     }
97     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
98   }
99   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
100
101   if (my $prefetch = delete $attrs->{prefetch}) {
102     foreach my $p (ref $prefetch eq 'ARRAY'
103               ? (@{$prefetch}) : ($prefetch)) {
104       if( ref $p eq 'HASH' ) {
105         foreach my $key (keys %$p) {
106           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
107             unless $seen{$key};
108         }
109       }
110       else {
111         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
112             unless $seen{$p};
113       }
114       my @prefetch = $source->resolve_prefetch($p, $attrs->{alias});
115       #die Dumper \@cols;
116       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
117       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
118     }
119   }
120
121   if ($attrs->{page}) {
122     $attrs->{rows} ||= 10;
123     $attrs->{offset} ||= 0;
124     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
125   }
126   my $new = {
127     result_source => $source,
128     cond => $attrs->{where},
129     from => $attrs->{from},
130     count => undef,
131     page => delete $attrs->{page},
132     pager => undef,
133     attrs => $attrs };
134   bless ($new, $class);
135   return $new;
136 }
137
138 =head2 search
139
140   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
141   my $new_rs = $rs->search({ foo => 3 });
142
143 If you need to pass in additional attributes but no additional condition,
144 call it as C<search({}, \%attrs);>.
145
146   # "SELECT foo, bar FROM $class_table"
147   my @all = $class->search({}, { cols => [qw/foo bar/] });
148
149 =cut
150
151 sub search {
152   my $self = shift;
153
154   #use Data::Dumper;warn Dumper(@_);
155   my $rs;
156   if( @_ ) {
157     
158     my $attrs = { %{$self->{attrs}} };
159     if (@_ > 1 && ref $_[$#_] eq 'HASH') {
160      $attrs = { %$attrs, %{ pop(@_) } };
161     }
162
163     my $where = (@_ ? ((@_ == 1 || ref $_[0] eq "HASH") ? shift : {@_}) : undef());
164     if (defined $where) {
165       $where = (defined $attrs->{where}
166                 ? { '-and' =>
167                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
168                         $where, $attrs->{where} ] }
169                 : $where);
170       $attrs->{where} = $where;
171     }
172
173     $rs = (ref $self)->new($self->result_source, $attrs);
174   }
175   else {
176     $rs = $self;
177     $rs->reset();
178   }
179   return (wantarray ? $rs->all : $rs);
180 }
181
182 =head2 search_literal
183
184   my @obj    = $rs->search_literal($literal_where_cond, @bind);
185   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
186
187 Pass a literal chunk of SQL to be added to the conditional part of the
188 resultset.
189
190 =cut
191                                                          
192 sub search_literal {
193   my ($self, $cond, @vals) = @_;
194   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
195   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
196   return $self->search(\$cond, $attrs);
197 }
198
199 =head2 find(@colvalues), find(\%cols, \%attrs?)
200
201 Finds a row based on its primary key or unique constraint. For example:
202
203   my $cd = $schema->resultset('CD')->find(5);
204
205 Also takes an optional C<key> attribute, to search by a specific key or unique
206 constraint. For example:
207
208   my $cd = $schema->resultset('CD')->find_or_create(
209     {
210       artist => 'Massive Attack',
211       title  => 'Mezzanine',
212     },
213     { key => 'artist_title' }
214   );
215
216 See also L</find_or_create> and L</update_or_create>.
217
218 =cut
219
220 sub find {
221   my ($self, @vals) = @_;
222   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
223
224   my @cols = $self->result_source->primary_columns;
225   if (exists $attrs->{key}) {
226     my %uniq = $self->result_source->unique_constraints;
227     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
228       unless exists $uniq{$attrs->{key}};
229     @cols = @{ $uniq{$attrs->{key}} };
230   }
231   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
232   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
233     unless @cols;
234
235   my $query;
236   if (ref $vals[0] eq 'HASH') {
237     $query = { %{$vals[0]} };
238   } elsif (@cols == @vals) {
239     $query = {};
240     @{$query}{@cols} = @vals;
241   } else {
242     $query = {@vals};
243   }
244   foreach (keys %$query) {
245     next if m/\./;
246     $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
247   }
248   #warn Dumper($query);
249   return (keys %$attrs
250            ? $self->search($query,$attrs)->single
251            : $self->single($query));
252 }
253
254 =head2 search_related
255
256   $rs->search_related('relname', $cond?, $attrs?);
257
258 Search the specified relationship. Optionally specify a condition for matching
259 records.
260
261 =cut
262
263 sub search_related {
264   return shift->related_resultset(shift)->search(@_);
265 }
266
267 =head2 cursor
268
269 Returns a storage-driven cursor to the given resultset.
270
271 =cut
272
273 sub cursor {
274   my ($self) = @_;
275   my ($attrs) = $self->{attrs};
276   $attrs = { %$attrs };
277   return $self->{cursor}
278     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
279           $attrs->{where},$attrs);
280 }
281
282 =head2 single
283
284 Inflates the first result without creating a cursor
285
286 =cut
287
288 sub single {
289   my ($self, $extra) = @_;
290   my ($attrs) = $self->{attrs};
291   $attrs = { %$attrs };
292   if ($extra) {
293     if (defined $attrs->{where}) {
294       $attrs->{where} = {
295         '-and'
296           => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
297                delete $attrs->{where}, $extra ]
298       };
299     } else {
300       $attrs->{where} = $extra;
301     }
302   }
303   my @data = $self->result_source->storage->select_single(
304           $self->{from}, $attrs->{select},
305           $attrs->{where},$attrs);
306   return (@data ? $self->_construct_object(@data) : ());
307 }
308
309
310 =head2 search_like
311
312 Perform a search, but use C<LIKE> instead of equality as the condition. Note
313 that this is simply a convenience method; you most likely want to use
314 L</search> with specific operators.
315
316 For more information, see L<DBIx::Class::Manual::Cookbook>.
317
318 =cut
319
320 sub search_like {
321   my $class    = shift;
322   my $attrs = { };
323   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
324     $attrs = pop(@_);
325   }
326   my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
327   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
328   return $class->search($query, { %$attrs });
329 }
330
331 =head2 slice($first, $last)
332
333 Returns a subset of elements from the resultset.
334
335 =cut
336
337 sub slice {
338   my ($self, $min, $max) = @_;
339   my $attrs = { %{ $self->{attrs} || {} } };
340   $attrs->{offset} ||= 0;
341   $attrs->{offset} += $min;
342   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
343   my $slice = (ref $self)->new($self->result_source, $attrs);
344   return (wantarray ? $slice->all : $slice);
345 }
346
347 =head2 next
348
349 Returns the next element in the resultset (C<undef> is there is none).
350
351 Can be used to efficiently iterate over records in the resultset:
352
353   my $rs = $schema->resultset('CD')->search({});
354   while (my $cd = $rs->next) {
355     print $cd->title;
356   }
357
358 =cut
359
360 sub next {
361   my ($self) = @_;
362   my $cache = $self->get_cache;
363   if( @$cache ) {
364     $self->{all_cache_position} ||= 0;
365     my $obj = $cache->[$self->{all_cache_position}];
366     $self->{all_cache_position}++;
367     return $obj;
368   }
369   my @row = $self->cursor->next;
370 #  warn Dumper(\@row); use Data::Dumper;
371   return unless (@row);
372   return $self->_construct_object(@row);
373 }
374
375 sub _construct_object {
376   my ($self, @row) = @_;
377   my @row_orig = @row; # copy @row for key comparison later, because @row will change
378   my @as = @{ $self->{attrs}{as} };
379   #warn "@cols -> @row";
380   my $info = [ {}, {} ];
381   foreach my $as (@as) {
382     my $rs = $self;
383     my $target = $info;
384     my @parts = split(/\./, $as);
385     my $col = pop(@parts);
386     foreach my $p (@parts) {
387       $target = $target->[1]->{$p} ||= [];
388       
389       # if cache is enabled, fetch inflated objs for prefetch
390       if( $rs->{attrs}->{cache} ) {
391         my $rel_info = $rs->result_source->relationship_info($p);
392         my $cond = $rel_info->{cond};
393         my $parent_rs = $rs;
394         $rs = $rs->related_resultset($p);
395         $rs->{attrs}->{cache} = 1;
396         my @objs = ();
397           
398         # populate related resultset's cache if empty
399         if( !@{ $rs->get_cache } ) {
400           $rs->all;
401         }
402
403         # get ordinals for pk columns in $row, so values can be compared
404         my $map = {};
405         keys %$cond;
406         my $re = qr/^\w+\./;
407         while( my( $rel_key, $pk ) = ( each %$cond ) ) {
408           $rel_key =~ s/$re//;
409           $pk =~ s/$re//;
410           $map->{$rel_key} = $pk;
411         } #die Dumper $map;
412           
413         keys %$map;
414         while( my( $rel_key, $pk ) = each( %$map ) ) {
415           my $i = 0;
416           foreach my $col ( $parent_rs->result_source->columns ) {
417             if( $col eq $pk ) {
418               $map->{$rel_key} = $i;
419             }
420             $i++;
421           }
422         } #die Dumper $map;
423
424         $rs->reset(); # reset cursor/cache position 
425           
426         # get matching objects for inflation
427         OBJ: while( my $rel_obj = $rs->next ) {
428           keys %$map;
429           KEYS: while( my( $rel_key, $ordinal ) = each %$map ) {
430             # use get_column to avoid auto inflation (want scalar value)
431             if( $rel_obj->get_column($rel_key) ne $row_orig[$ordinal] ) {
432               next OBJ;
433             }
434             push @objs, $rel_obj;
435           }
436         }
437         $target->[0] = \@objs;
438       }
439     }
440     $target->[0]->{$col} = shift @row
441       if ref($target->[0]) ne 'ARRAY'; # arrayref is pre-inflated objects, do not overwrite
442   }
443   #use Data::Dumper; warn Dumper(\@as, $info);
444   my $new = $self->result_source->result_class->inflate_result(
445               $self->result_source, @$info);
446   $new = $self->{attrs}{record_filter}->($new)
447     if exists $self->{attrs}{record_filter};
448   return $new;
449 }
450
451 =head2 result_source 
452
453 Returns a reference to the result source for this recordset.
454
455 =cut
456
457
458 =head2 count
459
460 Performs an SQL C<COUNT> with the same query as the resultset was built
461 with to find the number of elements. If passed arguments, does a search
462 on the resultset and counts the results of that.
463
464 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
465 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
466 not support C<DISTINCT> with multiple columns. If you are using such a
467 database, you should only use columns from the main table in your C<group_by>
468 clause.
469
470 =cut
471
472 sub count {
473   my $self = shift;
474   return $self->search(@_)->count if @_ && defined $_[0];
475   unless (defined $self->{count}) {
476     return scalar @{ $self->get_cache }
477       if @{ $self->get_cache };
478     my $group_by;
479     my $select = { 'count' => '*' };
480     if( $group_by = delete $self->{attrs}{group_by} ) {
481       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
482       # todo: try CONCAT for multi-column pk
483       my @pk = $self->result_source->primary_columns;
484       if( scalar(@pk) == 1 ) {
485         my $pk = shift(@pk);
486         my $alias = $self->{attrs}{alias};
487         my $re = qr/^($alias\.)?$pk$/;
488         foreach my $column ( @distinct) {
489           if( $column =~ $re ) {
490             @distinct = ( $column );
491             last;
492           }
493         } 
494       }
495
496       $select = { count => { 'distinct' => \@distinct } };
497       #use Data::Dumper; die Dumper $select;
498     }
499
500     my $attrs = { %{ $self->{attrs} },
501                   select => $select,
502                   as => [ 'count' ] };
503     # offset, order by and page are not needed to count. record_filter is cdbi
504     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
505         
506     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
507     $self->{attrs}{group_by} = $group_by;
508   }
509   return 0 unless $self->{count};
510   my $count = $self->{count};
511   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
512   $count = $self->{attrs}{rows} if
513     ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
514   return $count;
515 }
516
517 =head2 count_literal
518
519 Calls L</search_literal> with the passed arguments, then L</count>.
520
521 =cut
522
523 sub count_literal { shift->search_literal(@_)->count; }
524
525 =head2 all
526
527 Returns all elements in the resultset. Called implictly if the resultset
528 is returned in list context.
529
530 =cut
531
532 sub all {
533   my ($self) = @_;
534   return @{ $self->get_cache }
535     if @{ $self->get_cache };
536   if( $self->{attrs}->{cache} ) {
537     my @obj = map { $self->_construct_object(@$_); }
538             $self->cursor->all;
539     $self->set_cache( \@obj );
540     return @{ $self->get_cache };
541   }
542   return map { $self->_construct_object(@$_); }
543            $self->cursor->all;
544 }
545
546 =head2 reset
547
548 Resets the resultset's cursor, so you can iterate through the elements again.
549
550 =cut
551
552 sub reset {
553   my ($self) = @_;
554   $self->{all_cache_position} = 0;
555   $self->cursor->reset;
556   return $self;
557 }
558
559 =head2 first
560
561 Resets the resultset and returns the first element.
562
563 =cut
564
565 sub first {
566   return $_[0]->reset->next;
567 }
568
569 =head2 update(\%values)
570
571 Sets the specified columns in the resultset to the supplied values.
572
573 =cut
574
575 sub update {
576   my ($self, $values) = @_;
577   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
578   return $self->result_source->storage->update(
579            $self->result_source->from, $values, $self->{cond});
580 }
581
582 =head2 update_all(\%values)
583
584 Fetches all objects and updates them one at a time.  Note that C<update_all>
585 will run cascade triggers while L</update> will not.
586
587 =cut
588
589 sub update_all {
590   my ($self, $values) = @_;
591   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
592   foreach my $obj ($self->all) {
593     $obj->set_columns($values)->update;
594   }
595   return 1;
596 }
597
598 =head2 delete
599
600 Deletes the contents of the resultset from its result source.
601
602 =cut
603
604 sub delete {
605   my ($self) = @_;
606   my $del = {};
607   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
608     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
609   if (ref $self->{cond} eq 'ARRAY') {
610     $del = [ map { my %hash;
611       foreach my $key (keys %{$_}) {
612         $key =~ /([^\.]+)$/;
613         $hash{$1} = $_->{$key};
614       }; \%hash; } @{$self->{cond}} ];
615   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
616     $del->{-and} = [ map { my %hash;
617       foreach my $key (keys %{$_}) {
618         $key =~ /([^\.]+)$/;
619         $hash{$1} = $_->{$key};
620       }; \%hash; } @{$self->{cond}{-and}} ];
621   } else {
622     foreach my $key (keys %{$self->{cond}}) {
623       $key =~ /([^\.]+)$/;
624       $del->{$1} = $self->{cond}{$key};
625     }
626   }
627   $self->result_source->storage->delete($self->result_source->from, $del);
628   return 1;
629 }
630
631 =head2 delete_all
632
633 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
634 will run cascade triggers while L</delete> will not.
635
636 =cut
637
638 sub delete_all {
639   my ($self) = @_;
640   $_->delete for $self->all;
641   return 1;
642 }
643
644 =head2 pager
645
646 Returns a L<Data::Page> object for the current resultset. Only makes
647 sense for queries with a C<page> attribute.
648
649 =cut
650
651 sub pager {
652   my ($self) = @_;
653   my $attrs = $self->{attrs};
654   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
655   $attrs->{rows} ||= 10;
656   $self->count;
657   return $self->{pager} ||= Data::Page->new(
658     $self->{count}, $attrs->{rows}, $self->{page});
659 }
660
661 =head2 page($page_num)
662
663 Returns a new resultset for the specified page.
664
665 =cut
666
667 sub page {
668   my ($self, $page) = @_;
669   my $attrs = { %{$self->{attrs}} };
670   $attrs->{page} = $page;
671   return (ref $self)->new($self->result_source, $attrs);
672 }
673
674 =head2 new_result(\%vals)
675
676 Creates a result in the resultset's result class.
677
678 =cut
679
680 sub new_result {
681   my ($self, $values) = @_;
682   $self->throw_exception( "new_result needs a hash" )
683     unless (ref $values eq 'HASH');
684   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
685     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
686   my %new = %$values;
687   my $alias = $self->{attrs}{alias};
688   foreach my $key (keys %{$self->{cond}||{}}) {
689     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
690   }
691   my $obj = $self->result_source->result_class->new(\%new);
692   $obj->result_source($self->result_source) if $obj->can('result_source');
693   $obj;
694 }
695
696 =head2 create(\%vals)
697
698 Inserts a record into the resultset and returns the object.
699
700 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
701
702 =cut
703
704 sub create {
705   my ($self, $attrs) = @_;
706   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
707   return $self->new_result($attrs)->insert;
708 }
709
710 =head2 find_or_create(\%vals, \%attrs?)
711
712   $class->find_or_create({ key => $val, ... });
713
714 Searches for a record matching the search condition; if it doesn't find one,    
715 creates one and returns that instead.                                       
716
717   my $cd = $schema->resultset('CD')->find_or_create({
718     cdid   => 5,
719     artist => 'Massive Attack',
720     title  => 'Mezzanine',
721     year   => 2005,
722   });
723
724 Also takes an optional C<key> attribute, to search by a specific key or unique
725 constraint. For example:
726
727   my $cd = $schema->resultset('CD')->find_or_create(
728     {
729       artist => 'Massive Attack',
730       title  => 'Mezzanine',
731     },
732     { key => 'artist_title' }
733   );
734
735 See also L</find> and L</update_or_create>.
736
737 =cut
738
739 sub find_or_create {
740   my $self     = shift;
741   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
742   my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
743   my $exists   = $self->find($hash, $attrs);
744   return defined($exists) ? $exists : $self->create($hash);
745 }
746
747 =head2 update_or_create
748
749   $class->update_or_create({ key => $val, ... });
750
751 First, search for an existing row matching one of the unique constraints
752 (including the primary key) on the source of this resultset.  If a row is
753 found, update it with the other given column values.  Otherwise, create a new
754 row.
755
756 Takes an optional C<key> attribute to search on a specific unique constraint.
757 For example:
758
759   # In your application
760   my $cd = $schema->resultset('CD')->update_or_create(
761     {
762       artist => 'Massive Attack',
763       title  => 'Mezzanine',
764       year   => 1998,
765     },
766     { key => 'artist_title' }
767   );
768
769 If no C<key> is specified, it searches on all unique constraints defined on the
770 source, including the primary key.
771
772 If the C<key> is specified as C<primary>, search only on the primary key.
773
774 See also L</find> and L</find_or_create>.
775
776 =cut
777
778 sub update_or_create {
779   my $self = shift;
780
781   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
782   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
783
784   my %unique_constraints = $self->result_source->unique_constraints;
785   my @constraint_names   = (exists $attrs->{key}
786                             ? ($attrs->{key})
787                             : keys %unique_constraints);
788
789   my @unique_hashes;
790   foreach my $name (@constraint_names) {
791     my @unique_cols = @{ $unique_constraints{$name} };
792     my %unique_hash =
793       map  { $_ => $hash->{$_} }
794       grep { exists $hash->{$_} }
795       @unique_cols;
796
797     push @unique_hashes, \%unique_hash
798       if (scalar keys %unique_hash == scalar @unique_cols);
799   }
800
801   my $row;
802   if (@unique_hashes) {
803     $row = $self->search(\@unique_hashes, { rows => 1 })->first;
804     if ($row) {
805       $row->set_columns($hash);
806       $row->update;
807     }
808   }
809
810   unless ($row) {
811     $row = $self->create($hash);
812   }
813
814   return $row;
815 }
816
817 =head2 get_cache
818
819 Gets the contents of the cache for the resultset.
820
821 =cut
822
823 sub get_cache {
824   my $self = shift;
825   return $self->{all_cache} || [];
826 }
827
828 =head2 set_cache
829
830 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
831
832 =cut
833
834 sub set_cache {
835   my ( $self, $data ) = @_;
836   $self->throw_exception("set_cache requires an arrayref")
837     if ref $data ne 'ARRAY';
838   my $result_class = $self->result_source->result_class;
839   foreach( @$data ) {
840     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
841       if ref $_ ne $result_class;
842   }
843   $self->{all_cache} = $data;
844 }
845
846 =head2 clear_cache
847
848 Clears the cache for the resultset.
849
850 =cut
851
852 sub clear_cache {
853   my $self = shift;
854   $self->set_cache([]);
855 }
856
857 =head2 related_resultset
858
859 Returns a related resultset for the supplied relationship name.
860
861   $rs = $rs->related_resultset('foo');
862
863 =cut
864
865 sub related_resultset {
866   my ( $self, $rel, @rest ) = @_;
867   $self->{related_resultsets} ||= {};
868   my $resultsets = $self->{related_resultsets};
869   if( !exists $resultsets->{$rel} ) {
870     #warn "fetching related resultset for rel '$rel'";
871     my $rel_obj = $self->result_source->relationship_info($rel);
872     $self->throw_exception(
873       "search_related: result source '" . $self->result_source->name .
874       "' has no such relationship ${rel}")
875       unless $rel_obj; #die Dumper $self->{attrs};
876     my $rs;
877     if( $self->{attrs}->{cache} ) {
878       $rs = $self->search(undef);
879     }
880     else {
881       $rs = $self->search(undef, { join => $rel });
882     }
883     #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
884     #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
885     my $alias = (defined $rs->{attrs}{seen_join}{$rel}
886                   && $rs->{attrs}{seen_join}{$rel} > 1
887                 ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
888                 : $rel);
889     $resultsets->{$rel} =
890       $self->result_source->schema->resultset($rel_obj->{class}
891            )->search( undef,
892              { %{$rs->{attrs}},
893                alias => $alias,
894                select => undef(),
895                as => undef() }
896            )->search(@rest);
897   }
898   return $resultsets->{$rel};
899 }
900
901 =head2 throw_exception
902
903 See Schema's throw_exception
904
905 =cut
906
907 sub throw_exception {
908   my $self=shift;
909   $self->result_source->schema->throw_exception(@_);
910 }
911
912 =head1 ATTRIBUTES
913
914 The resultset takes various attributes that modify its behavior. Here's an
915 overview of them:
916
917 =head2 order_by
918
919 Which column(s) to order the results by. This is currently passed through
920 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
921
922 =head2 cols (arrayref)
923
924 Shortcut to request a particular set of columns to be retrieved.  Adds
925 C<me.> onto the start of any column without a C<.> in it and sets C<select>
926 from that, then auto-populates C<as> from C<select> as normal.
927
928 =head2 include_columns (arrayref)
929
930 Shortcut to include additional columns in the returned results - for example
931
932   { include_columns => ['foo.name'], join => ['foo'] }
933
934 would add a 'name' column to the information passed to object inflation
935
936 =head2 select (arrayref)
937
938 Indicates which columns should be selected from the storage. You can use
939 column names, or in the case of RDBMS back ends, function or stored procedure
940 names:
941
942   $rs = $schema->resultset('Foo')->search(
943     {},
944     {
945       select => [
946         'column_name',
947         { count => 'column_to_count' },
948         { sum => 'column_to_sum' }
949       ]
950     }
951   );
952
953 When you use function/stored procedure names and do not supply an C<as>
954 attribute, the column names returned are storage-dependent. E.g. MySQL would
955 return a column named C<count(column_to_count)> in the above example.
956
957 =head2 as (arrayref)
958
959 Indicates column names for object inflation. This is used in conjunction with
960 C<select>, usually when C<select> contains one or more function or stored
961 procedure names:
962
963   $rs = $schema->resultset('Foo')->search(
964     {},
965     {
966       select => [
967         'column1',
968         { count => 'column2' }
969       ],
970       as => [qw/ column1 column2_count /]
971     }
972   );
973
974   my $foo = $rs->first(); # get the first Foo
975
976 If the object against which the search is performed already has an accessor
977 matching a column name specified in C<as>, the value can be retrieved using
978 the accessor as normal:
979
980   my $column1 = $foo->column1();
981
982 If on the other hand an accessor does not exist in the object, you need to
983 use C<get_column> instead:
984
985   my $column2_count = $foo->get_column('column2_count');
986
987 You can create your own accessors if required - see
988 L<DBIx::Class::Manual::Cookbook> for details.
989
990 =head2 join
991
992 Contains a list of relationships that should be joined for this query.  For
993 example:
994
995   # Get CDs by Nine Inch Nails
996   my $rs = $schema->resultset('CD')->search(
997     { 'artist.name' => 'Nine Inch Nails' },
998     { join => 'artist' }
999   );
1000
1001 Can also contain a hash reference to refer to the other relation's relations.
1002 For example:
1003
1004   package MyApp::Schema::Track;
1005   use base qw/DBIx::Class/;
1006   __PACKAGE__->table('track');
1007   __PACKAGE__->add_columns(qw/trackid cd position title/);
1008   __PACKAGE__->set_primary_key('trackid');
1009   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1010   1;
1011
1012   # In your application
1013   my $rs = $schema->resultset('Artist')->search(
1014     { 'track.title' => 'Teardrop' },
1015     {
1016       join     => { cd => 'track' },
1017       order_by => 'artist.name',
1018     }
1019   );
1020
1021 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1022 similarly for a third time). For e.g.
1023
1024   my $rs = $schema->resultset('Artist')->search(
1025     { 'cds.title'   => 'Foo',
1026       'cds_2.title' => 'Bar' },
1027     { join => [ qw/cds cds/ ] });
1028
1029 will return a set of all artists that have both a cd with title Foo and a cd
1030 with title Bar.
1031
1032 If you want to fetch related objects from other tables as well, see C<prefetch>
1033 below.
1034
1035 =head2 prefetch arrayref/hashref
1036
1037 Contains one or more relationships that should be fetched along with the main 
1038 query (when they are accessed afterwards they will have already been
1039 "prefetched").  This is useful for when you know you will need the related
1040 objects, because it saves at least one query:
1041
1042   my $rs = $schema->resultset('Tag')->search(
1043     {},
1044     {
1045       prefetch => {
1046         cd => 'artist'
1047       }
1048     }
1049   );
1050
1051 The initial search results in SQL like the following:
1052
1053   SELECT tag.*, cd.*, artist.* FROM tag
1054   JOIN cd ON tag.cd = cd.cdid
1055   JOIN artist ON cd.artist = artist.artistid
1056
1057 L<DBIx::Class> has no need to go back to the database when we access the
1058 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1059 case.
1060
1061 Simple prefetches will be joined automatically, so there is no need
1062 for a C<join> attribute in the above search. If you're prefetching to
1063 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1064 specify the join as well.
1065
1066 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1067 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1068 with an accessor type of 'single' or 'filter').
1069
1070 =head2 from (arrayref)
1071
1072 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1073 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1074 clauses.
1075
1076 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1077 C<join> will usually do what you need and it is strongly recommended that you
1078 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1079
1080 In simple terms, C<from> works as follows:
1081
1082     [
1083         { <alias> => <table>, -join-type => 'inner|left|right' }
1084         [] # nested JOIN (optional)
1085         { <table.column> = <foreign_table.foreign_key> }
1086     ]
1087
1088     JOIN
1089         <alias> <table>
1090         [JOIN ...]
1091     ON <table.column> = <foreign_table.foreign_key>
1092
1093 An easy way to follow the examples below is to remember the following:
1094
1095     Anything inside "[]" is a JOIN
1096     Anything inside "{}" is a condition for the enclosing JOIN
1097
1098 The following examples utilize a "person" table in a family tree application.
1099 In order to express parent->child relationships, this table is self-joined:
1100
1101     # Person->belongs_to('father' => 'Person');
1102     # Person->belongs_to('mother' => 'Person');
1103
1104 C<from> can be used to nest joins. Here we return all children with a father,
1105 then search against all mothers of those children:
1106
1107   $rs = $schema->resultset('Person')->search(
1108       {},
1109       {
1110           alias => 'mother', # alias columns in accordance with "from"
1111           from => [
1112               { mother => 'person' },
1113               [
1114                   [
1115                       { child => 'person' },
1116                       [
1117                           { father => 'person' },
1118                           { 'father.person_id' => 'child.father_id' }
1119                       ]
1120                   ],
1121                   { 'mother.person_id' => 'child.mother_id' }
1122               ],                
1123           ]
1124       },
1125   );
1126
1127   # Equivalent SQL:
1128   # SELECT mother.* FROM person mother
1129   # JOIN (
1130   #   person child
1131   #   JOIN person father
1132   #   ON ( father.person_id = child.father_id )
1133   # )
1134   # ON ( mother.person_id = child.mother_id )
1135
1136 The type of any join can be controlled manually. To search against only people
1137 with a father in the person table, we could explicitly use C<INNER JOIN>:
1138
1139     $rs = $schema->resultset('Person')->search(
1140         {},
1141         {
1142             alias => 'child', # alias columns in accordance with "from"
1143             from => [
1144                 { child => 'person' },
1145                 [
1146                     { father => 'person', -join-type => 'inner' },
1147                     { 'father.id' => 'child.father_id' }
1148                 ],
1149             ]
1150         },
1151     );
1152
1153     # Equivalent SQL:
1154     # SELECT child.* FROM person child
1155     # INNER JOIN person father ON child.father_id = father.id
1156
1157 =head2 page
1158
1159 For a paged resultset, specifies which page to retrieve.  Leave unset
1160 for an unpaged resultset.
1161
1162 =head2 rows
1163
1164 For a paged resultset, how many rows per page:
1165
1166   rows => 10
1167
1168 Can also be used to simulate an SQL C<LIMIT>.
1169
1170 =head2 group_by (arrayref)
1171
1172 A arrayref of columns to group by. Can include columns of joined tables.
1173
1174   group_by => [qw/ column1 column2 ... /]
1175
1176 =head2 distinct
1177
1178 Set to 1 to group by all columns.
1179
1180 For more examples of using these attributes, see
1181 L<DBIx::Class::Manual::Cookbook>.
1182
1183 =cut
1184
1185 1;