Working HAVING, DBD::SQLite returns duff data so tests don't pass yet
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => 'count',
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => 'result_source');
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->load_components(qw/Core/);
36   __PACKAGE__->table('artist');
37   __PACKAGE__->add_columns(qw/artistid name/);
38   __PACKAGE__->set_primary_key('artistid');
39   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
40   1;
41
42   package MyApp::Schema::CD;
43   use base qw/DBIx::Class/;
44   __PACKAGE__->load_components(qw/Core/);
45   __PACKAGE__->table('cd');
46   __PACKAGE__->add_columns(qw/cdid artist title year/);
47   __PACKAGE__->set_primary_key('cdid');
48   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
49   1;
50
51 =head1 METHODS
52
53 =head2 new
54
55 =head3 Arguments: ($source, \%$attrs)
56
57 The resultset constructor. Takes a source object (usually a
58 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATRRIBUTES>
59 below).  Does not perform any queries -- these are executed as needed by the
60 other methods.
61
62 Generally you won't need to construct a resultset manually.  You'll
63 automatically get one from e.g. a L</search> called in scalar context:
64
65   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
66
67 =cut
68
69 sub new {
70   my $class = shift;
71   return $class->new_result(@_) if ref $class;
72   my ($source, $attrs) = @_;
73   #use Data::Dumper; warn Dumper($attrs);
74   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
75   my %seen;
76   my $alias = ($attrs->{alias} ||= 'me');
77   if ($attrs->{cols} || !$attrs->{select}) {
78     delete $attrs->{as} if $attrs->{cols};
79     my @cols = ($attrs->{cols}
80                  ? @{delete $attrs->{cols}}
81                  : $source->columns);
82     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @cols ];
83   }
84   $attrs->{as} ||= [ map { m/^$alias\.(.*)$/ ? $1 : $_ } @{$attrs->{select}} ];
85   if (my $include = delete $attrs->{include_columns}) {
86     push(@{$attrs->{select}}, @$include);
87     push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
88   }
89   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
90   $attrs->{from} ||= [ { $alias => $source->from } ];
91   $attrs->{seen_join} ||= {};
92   if (my $join = delete $attrs->{join}) {
93     foreach my $j (ref $join eq 'ARRAY'
94               ? (@{$join}) : ($join)) {
95       if (ref $j eq 'HASH') {
96         $seen{$_} = 1 foreach keys %$j;
97       } else {
98         $seen{$j} = 1;
99       }
100     }
101     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
102   }
103   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
104
105   if (my $prefetch = delete $attrs->{prefetch}) {
106     foreach my $p (ref $prefetch eq 'ARRAY'
107               ? (@{$prefetch}) : ($prefetch)) {
108       if( ref $p eq 'HASH' ) {
109         foreach my $key (keys %$p) {
110           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
111             unless $seen{$key};
112         }
113       }
114       else {
115         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
116             unless $seen{$p};
117       }
118       my @prefetch = $source->resolve_prefetch($p, $attrs->{alias});
119       #die Dumper \@cols;
120       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
121       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
122     }
123   }
124
125   if ($attrs->{page}) {
126     $attrs->{rows} ||= 10;
127     $attrs->{offset} ||= 0;
128     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
129   }
130   my $new = {
131     result_source => $source,
132     cond => $attrs->{where},
133     from => $attrs->{from},
134     count => undef,
135     page => delete $attrs->{page},
136     pager => undef,
137     attrs => $attrs };
138   bless ($new, $class);
139   return $new;
140 }
141
142 =head2 search
143
144   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
145   my $new_rs = $rs->search({ foo => 3 });
146
147 If you need to pass in additional attributes but no additional condition,
148 call it as C<search({}, \%attrs);>.
149
150   # "SELECT foo, bar FROM $class_table"
151   my @all = $class->search({}, { cols => [qw/foo bar/] });
152
153 =cut
154
155 sub search {
156   my $self = shift;
157
158   #use Data::Dumper;warn Dumper(@_);
159   my $rs;
160   if( @_ ) {
161     
162     my $attrs = { %{$self->{attrs}} };
163     my $having = delete $attrs->{having};
164     if (@_ > 1 && ref $_[$#_] eq 'HASH') {
165      $attrs = { %$attrs, %{ pop(@_) } };
166     }
167
168     my $where = (@_ ? ((@_ == 1 || ref $_[0] eq "HASH") ? shift : {@_}) : undef());
169     if (defined $where) {
170       $where = (defined $attrs->{where}
171                 ? { '-and' =>
172                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
173                         $where, $attrs->{where} ] }
174                 : $where);
175       $attrs->{where} = $where;
176     }
177
178     if (defined $having) {
179       $having = (defined $attrs->{having}
180                 ? { '-and' =>
181                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
182                         $having, $attrs->{having} ] }
183                 : $having);
184       $attrs->{having} = $having;
185     }
186
187     $rs = (ref $self)->new($self->result_source, $attrs);
188   }
189   else {
190     $rs = $self;
191     $rs->reset();
192   }
193   return (wantarray ? $rs->all : $rs);
194 }
195
196 =head2 search_literal
197
198   my @obj    = $rs->search_literal($literal_where_cond, @bind);
199   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
200
201 Pass a literal chunk of SQL to be added to the conditional part of the
202 resultset.
203
204 =cut
205                                                          
206 sub search_literal {
207   my ($self, $cond, @vals) = @_;
208   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
209   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
210   return $self->search(\$cond, $attrs);
211 }
212
213 =head2 find
214
215 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
216
217 Finds a row based on its primary key or unique constraint. For example:
218
219   my $cd = $schema->resultset('CD')->find(5);
220
221 Also takes an optional C<key> attribute, to search by a specific key or unique
222 constraint. For example:
223
224   my $cd = $schema->resultset('CD')->find_or_create(
225     {
226       artist => 'Massive Attack',
227       title  => 'Mezzanine',
228     },
229     { key => 'artist_title' }
230   );
231
232 See also L</find_or_create> and L</update_or_create>.
233
234 =cut
235
236 sub find {
237   my ($self, @vals) = @_;
238   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
239
240   my @cols = $self->result_source->primary_columns;
241   if (exists $attrs->{key}) {
242     my %uniq = $self->result_source->unique_constraints;
243     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
244       unless exists $uniq{$attrs->{key}};
245     @cols = @{ $uniq{$attrs->{key}} };
246   }
247   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
248   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
249     unless @cols;
250
251   my $query;
252   if (ref $vals[0] eq 'HASH') {
253     $query = { %{$vals[0]} };
254   } elsif (@cols == @vals) {
255     $query = {};
256     @{$query}{@cols} = @vals;
257   } else {
258     $query = {@vals};
259   }
260   foreach (keys %$query) {
261     next if m/\./;
262     $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
263   }
264   #warn Dumper($query);
265   return (keys %$attrs
266            ? $self->search($query,$attrs)->single
267            : $self->single($query));
268 }
269
270 =head2 search_related
271
272   $rs->search_related('relname', $cond?, $attrs?);
273
274 Search the specified relationship. Optionally specify a condition for matching
275 records.
276
277 =cut
278
279 sub search_related {
280   return shift->related_resultset(shift)->search(@_);
281 }
282
283 =head2 cursor
284
285 Returns a storage-driven cursor to the given resultset.
286
287 =cut
288
289 sub cursor {
290   my ($self) = @_;
291   my ($attrs) = $self->{attrs};
292   $attrs = { %$attrs };
293   return $self->{cursor}
294     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
295           $attrs->{where},$attrs);
296 }
297
298 =head2 single
299
300 Inflates the first result without creating a cursor
301
302 =cut
303
304 sub single {
305   my ($self, $extra) = @_;
306   my ($attrs) = $self->{attrs};
307   $attrs = { %$attrs };
308   if ($extra) {
309     if (defined $attrs->{where}) {
310       $attrs->{where} = {
311         '-and'
312           => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
313                delete $attrs->{where}, $extra ]
314       };
315     } else {
316       $attrs->{where} = $extra;
317     }
318   }
319   my @data = $self->result_source->storage->select_single(
320           $self->{from}, $attrs->{select},
321           $attrs->{where},$attrs);
322   return (@data ? $self->_construct_object(@data) : ());
323 }
324
325
326 =head2 search_like
327
328 Perform a search, but use C<LIKE> instead of equality as the condition. Note
329 that this is simply a convenience method; you most likely want to use
330 L</search> with specific operators.
331
332 For more information, see L<DBIx::Class::Manual::Cookbook>.
333
334 =cut
335
336 sub search_like {
337   my $class    = shift;
338   my $attrs = { };
339   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
340     $attrs = pop(@_);
341   }
342   my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
343   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
344   return $class->search($query, { %$attrs });
345 }
346
347 =head2 slice
348
349 =head3 Arguments: ($first, $last)
350
351 Returns a subset of elements from the resultset.
352
353 =cut
354
355 sub slice {
356   my ($self, $min, $max) = @_;
357   my $attrs = { %{ $self->{attrs} || {} } };
358   $attrs->{offset} ||= 0;
359   $attrs->{offset} += $min;
360   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
361   my $slice = (ref $self)->new($self->result_source, $attrs);
362   return (wantarray ? $slice->all : $slice);
363 }
364
365 =head2 next
366
367 Returns the next element in the resultset (C<undef> is there is none).
368
369 Can be used to efficiently iterate over records in the resultset:
370
371   my $rs = $schema->resultset('CD')->search({});
372   while (my $cd = $rs->next) {
373     print $cd->title;
374   }
375
376 =cut
377
378 sub next {
379   my ($self) = @_;
380   my $cache = $self->get_cache;
381   if( @$cache ) {
382     $self->{all_cache_position} ||= 0;
383     my $obj = $cache->[$self->{all_cache_position}];
384     $self->{all_cache_position}++;
385     return $obj;
386   }
387   my @row = $self->cursor->next;
388 #  warn Dumper(\@row); use Data::Dumper;
389   return unless (@row);
390   return $self->_construct_object(@row);
391 }
392
393 sub _construct_object {
394   my ($self, @row) = @_;
395   my @row_orig = @row; # copy @row for key comparison later, because @row will change
396   my @as = @{ $self->{attrs}{as} };
397 #use Data::Dumper; warn Dumper \@as;
398   #warn "@cols -> @row";
399   my $info = [ {}, {} ];
400   foreach my $as (@as) {
401     my $rs = $self;
402     my $target = $info;
403     my @parts = split(/\./, $as);
404     my $col = pop(@parts);
405     foreach my $p (@parts) {
406       $target = $target->[1]->{$p} ||= [];
407       
408       $rs = $rs->related_resultset($p) if $rs->{attrs}->{cache};
409     }
410     
411     $target->[0]->{$col} = shift @row
412       if ref($target->[0]) ne 'ARRAY'; # arrayref is pre-inflated objects, do not overwrite
413   }
414   #use Data::Dumper; warn Dumper(\@as, $info);
415   my $new = $self->result_source->result_class->inflate_result(
416               $self->result_source, @$info);
417   $new = $self->{attrs}{record_filter}->($new)
418     if exists $self->{attrs}{record_filter};
419  
420   if( $self->{attrs}->{cache} ) {
421     while( my( $rel, $rs ) = each( %{$self->{related_resultsets}} ) ) {
422       $rs->all;
423       #warn "$rel:", @{$rs->get_cache};
424     }
425     $self->build_rr( $self, $new );
426   }
427  
428   return $new;
429 }
430   
431 sub build_rr {
432   # build related resultsets for supplied object
433   my ( $self, $context, $obj ) = @_;
434   
435   my $re = qr/^\w+\./;
436   while( my ($rel, $rs) = each( %{$context->{related_resultsets}} ) ) {  
437     #warn "context:", $context->result_source->name, ", rel:$rel, rs:", $rs->result_source->name;
438     my @objs = ();
439     my $map = {};
440     my $cond = $context->result_source->relationship_info($rel)->{cond};
441     keys %$cond;
442     while( my( $rel_key, $pk ) = each(%$cond) ) {
443       $rel_key =~ s/$re//;
444       $pk =~ s/$re//;
445       $map->{$rel_key} = $pk;
446     }
447     
448     $rs->reset();
449     while( my $rel_obj = $rs->next ) {
450       while( my( $rel_key, $pk ) = each(%$map) ) {
451         if( $rel_obj->get_column($rel_key) eq $obj->get_column($pk) ) {
452           push @objs, $rel_obj;
453         }
454       }
455     }
456
457     my $rel_rs = $obj->related_resultset($rel);
458     $rel_rs->{attrs}->{cache} = 1;
459     $rel_rs->set_cache( \@objs );
460     
461     while( my $rel_obj = $rel_rs->next ) {
462       $self->build_rr( $rs, $rel_obj );
463     }
464     
465   }
466   
467 }
468
469 =head2 result_source
470
471 Returns a reference to the result source for this recordset.
472
473 =cut
474
475
476 =head2 count
477
478 Performs an SQL C<COUNT> with the same query as the resultset was built
479 with to find the number of elements. If passed arguments, does a search
480 on the resultset and counts the results of that.
481
482 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
483 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
484 not support C<DISTINCT> with multiple columns. If you are using such a
485 database, you should only use columns from the main table in your C<group_by>
486 clause.
487
488 =cut
489
490 sub count {
491   my $self = shift;
492   return $self->search(@_)->count if @_ && defined $_[0];
493   unless (defined $self->{count}) {
494     return scalar @{ $self->get_cache }
495       if @{ $self->get_cache };
496     my $group_by;
497     my $select = { 'count' => '*' };
498     my $attrs = { %{ $self->{attrs} } };
499     if( $group_by = delete $attrs->{group_by} ) {
500       delete $attrs->{having};
501       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
502       # todo: try CONCAT for multi-column pk
503       my @pk = $self->result_source->primary_columns;
504       if( scalar(@pk) == 1 ) {
505         my $pk = shift(@pk);
506         my $alias = $attrs->{alias};
507         my $re = qr/^($alias\.)?$pk$/;
508         foreach my $column ( @distinct) {
509           if( $column =~ $re ) {
510             @distinct = ( $column );
511             last;
512           }
513         } 
514       }
515
516       $select = { count => { 'distinct' => \@distinct } };
517       #use Data::Dumper; die Dumper $select;
518     }
519
520     $attrs->{select} = $select;
521     $attrs->{as} = [ 'count' ];
522     # offset, order by and page are not needed to count. record_filter is cdbi
523     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
524         
525     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
526   }
527   return 0 unless $self->{count};
528   my $count = $self->{count};
529   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
530   $count = $self->{attrs}{rows} if
531     ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
532   return $count;
533 }
534
535 =head2 count_literal
536
537 Calls L</search_literal> with the passed arguments, then L</count>.
538
539 =cut
540
541 sub count_literal { shift->search_literal(@_)->count; }
542
543 =head2 all
544
545 Returns all elements in the resultset. Called implictly if the resultset
546 is returned in list context.
547
548 =cut
549
550 sub all {
551   my ($self) = @_;
552   return @{ $self->get_cache }
553     if @{ $self->get_cache };
554   if( $self->{attrs}->{cache} ) {
555     my @obj = map { $self->_construct_object(@$_); }
556             $self->cursor->all;
557     $self->set_cache( \@obj );
558     return @{ $self->get_cache };
559   }
560   return map { $self->_construct_object(@$_); }
561            $self->cursor->all;
562 }
563
564 =head2 reset
565
566 Resets the resultset's cursor, so you can iterate through the elements again.
567
568 =cut
569
570 sub reset {
571   my ($self) = @_;
572   $self->{all_cache_position} = 0;
573   $self->cursor->reset;
574   return $self;
575 }
576
577 =head2 first
578
579 Resets the resultset and returns the first element.
580
581 =cut
582
583 sub first {
584   return $_[0]->reset->next;
585 }
586
587 =head2 update
588
589 =head3 Arguments: (\%values)
590
591 Sets the specified columns in the resultset to the supplied values.
592
593 =cut
594
595 sub update {
596   my ($self, $values) = @_;
597   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
598   return $self->result_source->storage->update(
599            $self->result_source->from, $values, $self->{cond});
600 }
601
602 =head2 update_all
603
604 =head3 Arguments: (\%values)
605
606 Fetches all objects and updates them one at a time.  Note that C<update_all>
607 will run cascade triggers while L</update> will not.
608
609 =cut
610
611 sub update_all {
612   my ($self, $values) = @_;
613   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
614   foreach my $obj ($self->all) {
615     $obj->set_columns($values)->update;
616   }
617   return 1;
618 }
619
620 =head2 delete
621
622 Deletes the contents of the resultset from its result source.
623
624 =cut
625
626 sub delete {
627   my ($self) = @_;
628   my $del = {};
629   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
630     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
631   if (ref $self->{cond} eq 'ARRAY') {
632     $del = [ map { my %hash;
633       foreach my $key (keys %{$_}) {
634         $key =~ /([^\.]+)$/;
635         $hash{$1} = $_->{$key};
636       }; \%hash; } @{$self->{cond}} ];
637   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
638     $del->{-and} = [ map { my %hash;
639       foreach my $key (keys %{$_}) {
640         $key =~ /([^\.]+)$/;
641         $hash{$1} = $_->{$key};
642       }; \%hash; } @{$self->{cond}{-and}} ];
643   } else {
644     foreach my $key (keys %{$self->{cond}}) {
645       $key =~ /([^\.]+)$/;
646       $del->{$1} = $self->{cond}{$key};
647     }
648   }
649   $self->result_source->storage->delete($self->result_source->from, $del);
650   return 1;
651 }
652
653 =head2 delete_all
654
655 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
656 will run cascade triggers while L</delete> will not.
657
658 =cut
659
660 sub delete_all {
661   my ($self) = @_;
662   $_->delete for $self->all;
663   return 1;
664 }
665
666 =head2 pager
667
668 Returns a L<Data::Page> object for the current resultset. Only makes
669 sense for queries with a C<page> attribute.
670
671 =cut
672
673 sub pager {
674   my ($self) = @_;
675   my $attrs = $self->{attrs};
676   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
677   $attrs->{rows} ||= 10;
678   $self->count;
679   return $self->{pager} ||= Data::Page->new(
680     $self->{count}, $attrs->{rows}, $self->{page});
681 }
682
683 =head2 page
684
685 =head3 Arguments: ($page_num)
686
687 Returns a new resultset for the specified page.
688
689 =cut
690
691 sub page {
692   my ($self, $page) = @_;
693   my $attrs = { %{$self->{attrs}} };
694   $attrs->{page} = $page;
695   return (ref $self)->new($self->result_source, $attrs);
696 }
697
698 =head2 new_result
699
700 =head3 Arguments: (\%vals)
701
702 Creates a result in the resultset's result class.
703
704 =cut
705
706 sub new_result {
707   my ($self, $values) = @_;
708   $self->throw_exception( "new_result needs a hash" )
709     unless (ref $values eq 'HASH');
710   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
711     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
712   my %new = %$values;
713   my $alias = $self->{attrs}{alias};
714   foreach my $key (keys %{$self->{cond}||{}}) {
715     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
716   }
717   my $obj = $self->result_source->result_class->new(\%new);
718   $obj->result_source($self->result_source) if $obj->can('result_source');
719   $obj;
720 }
721
722 =head2 create
723
724 =head3 Arguments: (\%vals)
725
726 Inserts a record into the resultset and returns the object.
727
728 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
729
730 =cut
731
732 sub create {
733   my ($self, $attrs) = @_;
734   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
735   return $self->new_result($attrs)->insert;
736 }
737
738 =head2 find_or_create
739
740 =head3 Arguments: (\%vals, \%attrs?)
741
742   $class->find_or_create({ key => $val, ... });
743
744 Searches for a record matching the search condition; if it doesn't find one,    
745 creates one and returns that instead.                                       
746
747   my $cd = $schema->resultset('CD')->find_or_create({
748     cdid   => 5,
749     artist => 'Massive Attack',
750     title  => 'Mezzanine',
751     year   => 2005,
752   });
753
754 Also takes an optional C<key> attribute, to search by a specific key or unique
755 constraint. For example:
756
757   my $cd = $schema->resultset('CD')->find_or_create(
758     {
759       artist => 'Massive Attack',
760       title  => 'Mezzanine',
761     },
762     { key => 'artist_title' }
763   );
764
765 See also L</find> and L</update_or_create>.
766
767 =cut
768
769 sub find_or_create {
770   my $self     = shift;
771   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
772   my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
773   my $exists   = $self->find($hash, $attrs);
774   return defined($exists) ? $exists : $self->create($hash);
775 }
776
777 =head2 update_or_create
778
779   $class->update_or_create({ key => $val, ... });
780
781 First, search for an existing row matching one of the unique constraints
782 (including the primary key) on the source of this resultset.  If a row is
783 found, update it with the other given column values.  Otherwise, create a new
784 row.
785
786 Takes an optional C<key> attribute to search on a specific unique constraint.
787 For example:
788
789   # In your application
790   my $cd = $schema->resultset('CD')->update_or_create(
791     {
792       artist => 'Massive Attack',
793       title  => 'Mezzanine',
794       year   => 1998,
795     },
796     { key => 'artist_title' }
797   );
798
799 If no C<key> is specified, it searches on all unique constraints defined on the
800 source, including the primary key.
801
802 If the C<key> is specified as C<primary>, search only on the primary key.
803
804 See also L</find> and L</find_or_create>.
805
806 =cut
807
808 sub update_or_create {
809   my $self = shift;
810
811   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
812   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
813
814   my %unique_constraints = $self->result_source->unique_constraints;
815   my @constraint_names   = (exists $attrs->{key}
816                             ? ($attrs->{key})
817                             : keys %unique_constraints);
818
819   my @unique_hashes;
820   foreach my $name (@constraint_names) {
821     my @unique_cols = @{ $unique_constraints{$name} };
822     my %unique_hash =
823       map  { $_ => $hash->{$_} }
824       grep { exists $hash->{$_} }
825       @unique_cols;
826
827     push @unique_hashes, \%unique_hash
828       if (scalar keys %unique_hash == scalar @unique_cols);
829   }
830
831   my $row;
832   if (@unique_hashes) {
833     $row = $self->search(\@unique_hashes, { rows => 1 })->first;
834     if ($row) {
835       $row->set_columns($hash);
836       $row->update;
837     }
838   }
839
840   unless ($row) {
841     $row = $self->create($hash);
842   }
843
844   return $row;
845 }
846
847 =head2 get_cache
848
849 Gets the contents of the cache for the resultset.
850
851 =cut
852
853 sub get_cache {
854   my $self = shift;
855   return $self->{all_cache} || [];
856 }
857
858 =head2 set_cache
859
860 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
861
862 =cut
863
864 sub set_cache {
865   my ( $self, $data ) = @_;
866   $self->throw_exception("set_cache requires an arrayref")
867     if ref $data ne 'ARRAY';
868   my $result_class = $self->result_source->result_class;
869   foreach( @$data ) {
870     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
871       if ref $_ ne $result_class;
872   }
873   $self->{all_cache} = $data;
874 }
875
876 =head2 clear_cache
877
878 Clears the cache for the resultset.
879
880 =cut
881
882 sub clear_cache {
883   my $self = shift;
884   $self->set_cache([]);
885 }
886
887 =head2 related_resultset
888
889 Returns a related resultset for the supplied relationship name.
890
891   $rs = $rs->related_resultset('foo');
892
893 =cut
894
895 sub related_resultset {
896   my ( $self, $rel, @rest ) = @_;
897   $self->{related_resultsets} ||= {};
898   my $resultsets = $self->{related_resultsets};
899   if( !exists $resultsets->{$rel} ) {
900     #warn "fetching related resultset for rel '$rel'";
901     my $rel_obj = $self->result_source->relationship_info($rel);
902     $self->throw_exception(
903       "search_related: result source '" . $self->result_source->name .
904       "' has no such relationship ${rel}")
905       unless $rel_obj; #die Dumper $self->{attrs};
906     my $rs;
907     if( $self->{attrs}->{cache} ) {
908       $rs = $self->search(undef);
909     }
910     else {
911       $rs = $self->search(undef, { join => $rel });
912     }
913     #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
914     #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
915     my $alias = (defined $rs->{attrs}{seen_join}{$rel}
916                   && $rs->{attrs}{seen_join}{$rel} > 1
917                 ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
918                 : $rel);
919     $resultsets->{$rel} =
920       $self->result_source->schema->resultset($rel_obj->{class}
921            )->search( undef,
922              { %{$rs->{attrs}},
923                alias => $alias,
924                select => undef(),
925                as => undef() }
926            )->search(@rest);
927   }
928   return $resultsets->{$rel};
929 }
930
931 =head2 throw_exception
932
933 See Schema's throw_exception
934
935 =cut
936
937 sub throw_exception {
938   my $self=shift;
939   $self->result_source->schema->throw_exception(@_);
940 }
941
942 =head1 ATTRIBUTES
943
944 The resultset takes various attributes that modify its behavior. Here's an
945 overview of them:
946
947 =head2 order_by
948
949 Which column(s) to order the results by. This is currently passed through
950 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
951
952 =head2 cols
953
954 =head3 Arguments: (arrayref)
955
956 Shortcut to request a particular set of columns to be retrieved.  Adds
957 C<me.> onto the start of any column without a C<.> in it and sets C<select>
958 from that, then auto-populates C<as> from C<select> as normal.
959
960 =head2 include_columns
961
962 =head3 Arguments: (arrayref)
963
964 Shortcut to include additional columns in the returned results - for example
965
966   { include_columns => ['foo.name'], join => ['foo'] }
967
968 would add a 'name' column to the information passed to object inflation
969
970 =head2 select
971
972 =head3 Arguments: (arrayref)
973
974 Indicates which columns should be selected from the storage. You can use
975 column names, or in the case of RDBMS back ends, function or stored procedure
976 names:
977
978   $rs = $schema->resultset('Foo')->search(
979     {},
980     {
981       select => [
982         'column_name',
983         { count => 'column_to_count' },
984         { sum => 'column_to_sum' }
985       ]
986     }
987   );
988
989 When you use function/stored procedure names and do not supply an C<as>
990 attribute, the column names returned are storage-dependent. E.g. MySQL would
991 return a column named C<count(column_to_count)> in the above example.
992
993 =head2 as
994
995 =head3 Arguments: (arrayref)
996
997 Indicates column names for object inflation. This is used in conjunction with
998 C<select>, usually when C<select> contains one or more function or stored
999 procedure names:
1000
1001   $rs = $schema->resultset('Foo')->search(
1002     {},
1003     {
1004       select => [
1005         'column1',
1006         { count => 'column2' }
1007       ],
1008       as => [qw/ column1 column2_count /]
1009     }
1010   );
1011
1012   my $foo = $rs->first(); # get the first Foo
1013
1014 If the object against which the search is performed already has an accessor
1015 matching a column name specified in C<as>, the value can be retrieved using
1016 the accessor as normal:
1017
1018   my $column1 = $foo->column1();
1019
1020 If on the other hand an accessor does not exist in the object, you need to
1021 use C<get_column> instead:
1022
1023   my $column2_count = $foo->get_column('column2_count');
1024
1025 You can create your own accessors if required - see
1026 L<DBIx::Class::Manual::Cookbook> for details.
1027
1028 =head2 join
1029
1030 Contains a list of relationships that should be joined for this query.  For
1031 example:
1032
1033   # Get CDs by Nine Inch Nails
1034   my $rs = $schema->resultset('CD')->search(
1035     { 'artist.name' => 'Nine Inch Nails' },
1036     { join => 'artist' }
1037   );
1038
1039 Can also contain a hash reference to refer to the other relation's relations.
1040 For example:
1041
1042   package MyApp::Schema::Track;
1043   use base qw/DBIx::Class/;
1044   __PACKAGE__->table('track');
1045   __PACKAGE__->add_columns(qw/trackid cd position title/);
1046   __PACKAGE__->set_primary_key('trackid');
1047   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1048   1;
1049
1050   # In your application
1051   my $rs = $schema->resultset('Artist')->search(
1052     { 'track.title' => 'Teardrop' },
1053     {
1054       join     => { cd => 'track' },
1055       order_by => 'artist.name',
1056     }
1057   );
1058
1059 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1060 similarly for a third time). For e.g.
1061
1062   my $rs = $schema->resultset('Artist')->search(
1063     { 'cds.title'   => 'Foo',
1064       'cds_2.title' => 'Bar' },
1065     { join => [ qw/cds cds/ ] });
1066
1067 will return a set of all artists that have both a cd with title Foo and a cd
1068 with title Bar.
1069
1070 If you want to fetch related objects from other tables as well, see C<prefetch>
1071 below.
1072
1073 =head2 prefetch
1074
1075 =head3 Arguments: arrayref/hashref
1076
1077 Contains one or more relationships that should be fetched along with the main 
1078 query (when they are accessed afterwards they will have already been
1079 "prefetched").  This is useful for when you know you will need the related
1080 objects, because it saves at least one query:
1081
1082   my $rs = $schema->resultset('Tag')->search(
1083     {},
1084     {
1085       prefetch => {
1086         cd => 'artist'
1087       }
1088     }
1089   );
1090
1091 The initial search results in SQL like the following:
1092
1093   SELECT tag.*, cd.*, artist.* FROM tag
1094   JOIN cd ON tag.cd = cd.cdid
1095   JOIN artist ON cd.artist = artist.artistid
1096
1097 L<DBIx::Class> has no need to go back to the database when we access the
1098 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1099 case.
1100
1101 Simple prefetches will be joined automatically, so there is no need
1102 for a C<join> attribute in the above search. If you're prefetching to
1103 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1104 specify the join as well.
1105
1106 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1107 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1108 with an accessor type of 'single' or 'filter').
1109
1110 =head2 from
1111
1112 =head3 Arguments: (arrayref)
1113
1114 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1115 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1116 clauses.
1117
1118 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1119 C<join> will usually do what you need and it is strongly recommended that you
1120 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1121
1122 In simple terms, C<from> works as follows:
1123
1124     [
1125         { <alias> => <table>, -join-type => 'inner|left|right' }
1126         [] # nested JOIN (optional)
1127         { <table.column> = <foreign_table.foreign_key> }
1128     ]
1129
1130     JOIN
1131         <alias> <table>
1132         [JOIN ...]
1133     ON <table.column> = <foreign_table.foreign_key>
1134
1135 An easy way to follow the examples below is to remember the following:
1136
1137     Anything inside "[]" is a JOIN
1138     Anything inside "{}" is a condition for the enclosing JOIN
1139
1140 The following examples utilize a "person" table in a family tree application.
1141 In order to express parent->child relationships, this table is self-joined:
1142
1143     # Person->belongs_to('father' => 'Person');
1144     # Person->belongs_to('mother' => 'Person');
1145
1146 C<from> can be used to nest joins. Here we return all children with a father,
1147 then search against all mothers of those children:
1148
1149   $rs = $schema->resultset('Person')->search(
1150       {},
1151       {
1152           alias => 'mother', # alias columns in accordance with "from"
1153           from => [
1154               { mother => 'person' },
1155               [
1156                   [
1157                       { child => 'person' },
1158                       [
1159                           { father => 'person' },
1160                           { 'father.person_id' => 'child.father_id' }
1161                       ]
1162                   ],
1163                   { 'mother.person_id' => 'child.mother_id' }
1164               ],                
1165           ]
1166       },
1167   );
1168
1169   # Equivalent SQL:
1170   # SELECT mother.* FROM person mother
1171   # JOIN (
1172   #   person child
1173   #   JOIN person father
1174   #   ON ( father.person_id = child.father_id )
1175   # )
1176   # ON ( mother.person_id = child.mother_id )
1177
1178 The type of any join can be controlled manually. To search against only people
1179 with a father in the person table, we could explicitly use C<INNER JOIN>:
1180
1181     $rs = $schema->resultset('Person')->search(
1182         {},
1183         {
1184             alias => 'child', # alias columns in accordance with "from"
1185             from => [
1186                 { child => 'person' },
1187                 [
1188                     { father => 'person', -join-type => 'inner' },
1189                     { 'father.id' => 'child.father_id' }
1190                 ],
1191             ]
1192         },
1193     );
1194
1195     # Equivalent SQL:
1196     # SELECT child.* FROM person child
1197     # INNER JOIN person father ON child.father_id = father.id
1198
1199 =head2 page
1200
1201 For a paged resultset, specifies which page to retrieve.  Leave unset
1202 for an unpaged resultset.
1203
1204 =head2 rows
1205
1206 For a paged resultset, how many rows per page:
1207
1208   rows => 10
1209
1210 Can also be used to simulate an SQL C<LIMIT>.
1211
1212 =head2 group_by
1213
1214 =head3 Arguments: (arrayref)
1215
1216 A arrayref of columns to group by. Can include columns of joined tables.
1217
1218   group_by => [qw/ column1 column2 ... /]
1219
1220 =head2 distinct
1221
1222 Set to 1 to group by all columns.
1223
1224 For more examples of using these attributes, see
1225 L<DBIx::Class::Manual::Cookbook>.
1226
1227 =cut
1228
1229 1;