0056abd26a4803fd88449a9785f8ca44644eac9e
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => 'count',
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => 'result_source');
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->load_components(qw/Core/);
36   __PACKAGE__->table('artist');
37   __PACKAGE__->add_columns(qw/artistid name/);
38   __PACKAGE__->set_primary_key('artistid');
39   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
40   1;
41
42   package MyApp::Schema::CD;
43   use base qw/DBIx::Class/;
44   __PACKAGE__->load_components(qw/Core/);
45   __PACKAGE__->table('cd');
46   __PACKAGE__->add_columns(qw/cdid artist title year/);
47   __PACKAGE__->set_primary_key('cdid');
48   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
49   1;
50
51 =head1 METHODS
52
53 =head2 new
54
55 =head3 Arguments: ($source, \%$attrs)
56
57 The resultset constructor. Takes a source object (usually a
58 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATRRIBUTES>
59 below).  Does not perform any queries -- these are executed as needed by the
60 other methods.
61
62 Generally you won't need to construct a resultset manually.  You'll
63 automatically get one from e.g. a L</search> called in scalar context:
64
65   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
66
67 =cut
68
69 sub new {
70   my $class = shift;
71   return $class->new_result(@_) if ref $class;
72   my ($source, $attrs) = @_;
73   #use Data::Dumper; warn Dumper($attrs);
74   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
75   my %seen;
76   my $alias = ($attrs->{alias} ||= 'me');
77   if ($attrs->{cols} || !$attrs->{select}) {
78     delete $attrs->{as} if $attrs->{cols};
79     my @cols = ($attrs->{cols}
80                  ? @{delete $attrs->{cols}}
81                  : $source->columns);
82     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @cols ];
83   }
84   $attrs->{as} ||= [ map { m/^$alias\.(.*)$/ ? $1 : $_ } @{$attrs->{select}} ];
85   if (my $include = delete $attrs->{include_columns}) {
86     push(@{$attrs->{select}}, @$include);
87     push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
88   }
89   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
90   $attrs->{from} ||= [ { $alias => $source->from } ];
91   $attrs->{seen_join} ||= {};
92   if (my $join = delete $attrs->{join}) {
93     foreach my $j (ref $join eq 'ARRAY'
94               ? (@{$join}) : ($join)) {
95       if (ref $j eq 'HASH') {
96         $seen{$_} = 1 foreach keys %$j;
97       } else {
98         $seen{$j} = 1;
99       }
100     }
101     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
102   }
103   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
104
105   if (my $prefetch = delete $attrs->{prefetch}) {
106     foreach my $p (ref $prefetch eq 'ARRAY'
107               ? (@{$prefetch}) : ($prefetch)) {
108       if( ref $p eq 'HASH' ) {
109         foreach my $key (keys %$p) {
110           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
111             unless $seen{$key};
112         }
113       }
114       else {
115         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
116             unless $seen{$p};
117       }
118       my @prefetch = $source->resolve_prefetch($p, $attrs->{alias});
119       #die Dumper \@cols;
120       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
121       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
122     }
123   }
124
125   if ($attrs->{page}) {
126     $attrs->{rows} ||= 10;
127     $attrs->{offset} ||= 0;
128     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
129   }
130   my $new = {
131     result_source => $source,
132     cond => $attrs->{where},
133     from => $attrs->{from},
134     count => undef,
135     page => delete $attrs->{page},
136     pager => undef,
137     attrs => $attrs };
138   bless ($new, $class);
139   return $new;
140 }
141
142 =head2 search
143
144   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
145   my $new_rs = $rs->search({ foo => 3 });
146
147 If you need to pass in additional attributes but no additional condition,
148 call it as C<search({}, \%attrs);>.
149
150   # "SELECT foo, bar FROM $class_table"
151   my @all = $class->search({}, { cols => [qw/foo bar/] });
152
153 =cut
154
155 sub search {
156   my $self = shift;
157
158   #use Data::Dumper;warn Dumper(@_);
159   my $rs;
160   if( @_ ) {
161     
162     my $attrs = { %{$self->{attrs}} };
163     if (@_ > 1 && ref $_[$#_] eq 'HASH') {
164      $attrs = { %$attrs, %{ pop(@_) } };
165     }
166
167     my $where = (@_ ? ((@_ == 1 || ref $_[0] eq "HASH") ? shift : {@_}) : undef());
168     if (defined $where) {
169       $where = (defined $attrs->{where}
170                 ? { '-and' =>
171                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
172                         $where, $attrs->{where} ] }
173                 : $where);
174       $attrs->{where} = $where;
175     }
176
177     $rs = (ref $self)->new($self->result_source, $attrs);
178   }
179   else {
180     $rs = $self;
181     $rs->reset();
182   }
183   return (wantarray ? $rs->all : $rs);
184 }
185
186 =head2 search_literal
187
188   my @obj    = $rs->search_literal($literal_where_cond, @bind);
189   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
190
191 Pass a literal chunk of SQL to be added to the conditional part of the
192 resultset.
193
194 =cut
195                                                          
196 sub search_literal {
197   my ($self, $cond, @vals) = @_;
198   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
199   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
200   return $self->search(\$cond, $attrs);
201 }
202
203 =head2 find
204
205 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
206
207 Finds a row based on its primary key or unique constraint. For example:
208
209   my $cd = $schema->resultset('CD')->find(5);
210
211 Also takes an optional C<key> attribute, to search by a specific key or unique
212 constraint. For example:
213
214   my $cd = $schema->resultset('CD')->find_or_create(
215     {
216       artist => 'Massive Attack',
217       title  => 'Mezzanine',
218     },
219     { key => 'artist_title' }
220   );
221
222 See also L</find_or_create> and L</update_or_create>.
223
224 =cut
225
226 sub find {
227   my ($self, @vals) = @_;
228   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
229
230   my @cols = $self->result_source->primary_columns;
231   if (exists $attrs->{key}) {
232     my %uniq = $self->result_source->unique_constraints;
233     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
234       unless exists $uniq{$attrs->{key}};
235     @cols = @{ $uniq{$attrs->{key}} };
236   }
237   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
238   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
239     unless @cols;
240
241   my $query;
242   if (ref $vals[0] eq 'HASH') {
243     $query = { %{$vals[0]} };
244   } elsif (@cols == @vals) {
245     $query = {};
246     @{$query}{@cols} = @vals;
247   } else {
248     $query = {@vals};
249   }
250   foreach (keys %$query) {
251     next if m/\./;
252     $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
253   }
254   #warn Dumper($query);
255   return (keys %$attrs
256            ? $self->search($query,$attrs)->single
257            : $self->single($query));
258 }
259
260 =head2 search_related
261
262   $rs->search_related('relname', $cond?, $attrs?);
263
264 Search the specified relationship. Optionally specify a condition for matching
265 records.
266
267 =cut
268
269 sub search_related {
270   return shift->related_resultset(shift)->search(@_);
271 }
272
273 =head2 cursor
274
275 Returns a storage-driven cursor to the given resultset.
276
277 =cut
278
279 sub cursor {
280   my ($self) = @_;
281   my ($attrs) = $self->{attrs};
282   $attrs = { %$attrs };
283   return $self->{cursor}
284     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
285           $attrs->{where},$attrs);
286 }
287
288 =head2 single
289
290 Inflates the first result without creating a cursor
291
292 =cut
293
294 sub single {
295   my ($self, $extra) = @_;
296   my ($attrs) = $self->{attrs};
297   $attrs = { %$attrs };
298   if ($extra) {
299     if (defined $attrs->{where}) {
300       $attrs->{where} = {
301         '-and'
302           => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
303                delete $attrs->{where}, $extra ]
304       };
305     } else {
306       $attrs->{where} = $extra;
307     }
308   }
309   my @data = $self->result_source->storage->select_single(
310           $self->{from}, $attrs->{select},
311           $attrs->{where},$attrs);
312   return (@data ? $self->_construct_object(@data) : ());
313 }
314
315
316 =head2 search_like
317
318 Perform a search, but use C<LIKE> instead of equality as the condition. Note
319 that this is simply a convenience method; you most likely want to use
320 L</search> with specific operators.
321
322 For more information, see L<DBIx::Class::Manual::Cookbook>.
323
324 =cut
325
326 sub search_like {
327   my $class    = shift;
328   my $attrs = { };
329   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
330     $attrs = pop(@_);
331   }
332   my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
333   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
334   return $class->search($query, { %$attrs });
335 }
336
337 =head2 slice
338
339 =head3 Arguments: ($first, $last)
340
341 Returns a subset of elements from the resultset.
342
343 =cut
344
345 sub slice {
346   my ($self, $min, $max) = @_;
347   my $attrs = { %{ $self->{attrs} || {} } };
348   $attrs->{offset} ||= 0;
349   $attrs->{offset} += $min;
350   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
351   my $slice = (ref $self)->new($self->result_source, $attrs);
352   return (wantarray ? $slice->all : $slice);
353 }
354
355 =head2 next
356
357 Returns the next element in the resultset (C<undef> is there is none).
358
359 Can be used to efficiently iterate over records in the resultset:
360
361   my $rs = $schema->resultset('CD')->search({});
362   while (my $cd = $rs->next) {
363     print $cd->title;
364   }
365
366 =cut
367
368 sub next {
369   my ($self) = @_;
370   my $cache = $self->get_cache;
371   if( @$cache ) {
372     $self->{all_cache_position} ||= 0;
373     my $obj = $cache->[$self->{all_cache_position}];
374     $self->{all_cache_position}++;
375     return $obj;
376   }
377   my @row = $self->cursor->next;
378 #  warn Dumper(\@row); use Data::Dumper;
379   return unless (@row);
380   return $self->_construct_object(@row);
381 }
382
383 sub _construct_object {
384   my ($self, @row) = @_;
385   my @row_orig = @row; # copy @row for key comparison later, because @row will change
386   my @as = @{ $self->{attrs}{as} };
387 #use Data::Dumper; warn Dumper \@as;
388   #warn "@cols -> @row";
389   my $info = [ {}, {} ];
390   foreach my $as (@as) {
391     my $rs = $self;
392     my $target = $info;
393     my @parts = split(/\./, $as);
394     my $col = pop(@parts);
395     foreach my $p (@parts) {
396       $target = $target->[1]->{$p} ||= [];
397       
398       $rs = $rs->related_resultset($p) if $rs->{attrs}->{cache};
399     }
400     
401     $target->[0]->{$col} = shift @row
402       if ref($target->[0]) ne 'ARRAY'; # arrayref is pre-inflated objects, do not overwrite
403   }
404   #use Data::Dumper; warn Dumper(\@as, $info);
405   my $new = $self->result_source->result_class->inflate_result(
406               $self->result_source, @$info);
407   $new = $self->{attrs}{record_filter}->($new)
408     if exists $self->{attrs}{record_filter};
409  
410   if( $self->{attrs}->{cache} ) {
411     while( my( $rel, $rs ) = each( %{$self->{related_resultsets}} ) ) {
412       $rs->all;
413       #warn "$rel:", @{$rs->get_cache};
414     }
415     $self->build_rr( $self, $new );
416   }
417  
418   return $new;
419 }
420   
421 sub build_rr {
422   # build related resultsets for supplied object
423   my ( $self, $context, $obj ) = @_;
424   
425   my $re = qr/^\w+\./;
426   while( my ($rel, $rs) = each( %{$context->{related_resultsets}} ) ) {  
427     #warn "context:", $context->result_source->name, ", rel:$rel, rs:", $rs->result_source->name;
428     my @objs = ();
429     my $map = {};
430     my $cond = $context->result_source->relationship_info($rel)->{cond};
431     keys %$cond;
432     while( my( $rel_key, $pk ) = each(%$cond) ) {
433       $rel_key =~ s/$re//;
434       $pk =~ s/$re//;
435       $map->{$rel_key} = $pk;
436     }
437     
438     $rs->reset();
439     while( my $rel_obj = $rs->next ) {
440       while( my( $rel_key, $pk ) = each(%$map) ) {
441         if( $rel_obj->get_column($rel_key) eq $obj->get_column($pk) ) {
442           push @objs, $rel_obj;
443         }
444       }
445     }
446
447     my $rel_rs = $obj->related_resultset($rel);
448     $rel_rs->{attrs}->{cache} = 1;
449     $rel_rs->set_cache( \@objs );
450     
451     while( my $rel_obj = $rel_rs->next ) {
452       $self->build_rr( $rs, $rel_obj );
453     }
454     
455   }
456   
457 }
458
459 =head2 result_source
460
461 Returns a reference to the result source for this recordset.
462
463 =cut
464
465
466 =head2 count
467
468 Performs an SQL C<COUNT> with the same query as the resultset was built
469 with to find the number of elements. If passed arguments, does a search
470 on the resultset and counts the results of that.
471
472 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
473 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
474 not support C<DISTINCT> with multiple columns. If you are using such a
475 database, you should only use columns from the main table in your C<group_by>
476 clause.
477
478 =cut
479
480 sub count {
481   my $self = shift;
482   return $self->search(@_)->count if @_ && defined $_[0];
483   unless (defined $self->{count}) {
484     return scalar @{ $self->get_cache }
485       if @{ $self->get_cache };
486     my $group_by;
487     my $select = { 'count' => '*' };
488     if( $group_by = delete $self->{attrs}{group_by} ) {
489       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
490       # todo: try CONCAT for multi-column pk
491       my @pk = $self->result_source->primary_columns;
492       if( scalar(@pk) == 1 ) {
493         my $pk = shift(@pk);
494         my $alias = $self->{attrs}{alias};
495         my $re = qr/^($alias\.)?$pk$/;
496         foreach my $column ( @distinct) {
497           if( $column =~ $re ) {
498             @distinct = ( $column );
499             last;
500           }
501         } 
502       }
503
504       $select = { count => { 'distinct' => \@distinct } };
505       #use Data::Dumper; die Dumper $select;
506     }
507
508     my $attrs = { %{ $self->{attrs} },
509                   select => $select,
510                   as => [ 'count' ] };
511     # offset, order by and page are not needed to count. record_filter is cdbi
512     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
513         
514     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
515     $self->{attrs}{group_by} = $group_by;
516   }
517   return 0 unless $self->{count};
518   my $count = $self->{count};
519   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
520   $count = $self->{attrs}{rows} if
521     ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
522   return $count;
523 }
524
525 =head2 count_literal
526
527 Calls L</search_literal> with the passed arguments, then L</count>.
528
529 =cut
530
531 sub count_literal { shift->search_literal(@_)->count; }
532
533 =head2 all
534
535 Returns all elements in the resultset. Called implictly if the resultset
536 is returned in list context.
537
538 =cut
539
540 sub all {
541   my ($self) = @_;
542   return @{ $self->get_cache }
543     if @{ $self->get_cache };
544   if( $self->{attrs}->{cache} ) {
545     my @obj = map { $self->_construct_object(@$_); }
546             $self->cursor->all;
547     $self->set_cache( \@obj );
548     return @{ $self->get_cache };
549   }
550   return map { $self->_construct_object(@$_); }
551            $self->cursor->all;
552 }
553
554 =head2 reset
555
556 Resets the resultset's cursor, so you can iterate through the elements again.
557
558 =cut
559
560 sub reset {
561   my ($self) = @_;
562   $self->{all_cache_position} = 0;
563   $self->cursor->reset;
564   return $self;
565 }
566
567 =head2 first
568
569 Resets the resultset and returns the first element.
570
571 =cut
572
573 sub first {
574   return $_[0]->reset->next;
575 }
576
577 =head2 update
578
579 =head3 Arguments: (\%values)
580
581 Sets the specified columns in the resultset to the supplied values.
582
583 =cut
584
585 sub update {
586   my ($self, $values) = @_;
587   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
588   return $self->result_source->storage->update(
589            $self->result_source->from, $values, $self->{cond});
590 }
591
592 =head2 update_all
593
594 =head3 Arguments: (\%values)
595
596 Fetches all objects and updates them one at a time.  Note that C<update_all>
597 will run cascade triggers while L</update> will not.
598
599 =cut
600
601 sub update_all {
602   my ($self, $values) = @_;
603   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
604   foreach my $obj ($self->all) {
605     $obj->set_columns($values)->update;
606   }
607   return 1;
608 }
609
610 =head2 delete
611
612 Deletes the contents of the resultset from its result source.
613
614 =cut
615
616 sub delete {
617   my ($self) = @_;
618   my $del = {};
619   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
620     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
621   if (ref $self->{cond} eq 'ARRAY') {
622     $del = [ map { my %hash;
623       foreach my $key (keys %{$_}) {
624         $key =~ /([^\.]+)$/;
625         $hash{$1} = $_->{$key};
626       }; \%hash; } @{$self->{cond}} ];
627   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
628     $del->{-and} = [ map { my %hash;
629       foreach my $key (keys %{$_}) {
630         $key =~ /([^\.]+)$/;
631         $hash{$1} = $_->{$key};
632       }; \%hash; } @{$self->{cond}{-and}} ];
633   } else {
634     foreach my $key (keys %{$self->{cond}}) {
635       $key =~ /([^\.]+)$/;
636       $del->{$1} = $self->{cond}{$key};
637     }
638   }
639   $self->result_source->storage->delete($self->result_source->from, $del);
640   return 1;
641 }
642
643 =head2 delete_all
644
645 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
646 will run cascade triggers while L</delete> will not.
647
648 =cut
649
650 sub delete_all {
651   my ($self) = @_;
652   $_->delete for $self->all;
653   return 1;
654 }
655
656 =head2 pager
657
658 Returns a L<Data::Page> object for the current resultset. Only makes
659 sense for queries with a C<page> attribute.
660
661 =cut
662
663 sub pager {
664   my ($self) = @_;
665   my $attrs = $self->{attrs};
666   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
667   $attrs->{rows} ||= 10;
668   $self->count;
669   return $self->{pager} ||= Data::Page->new(
670     $self->{count}, $attrs->{rows}, $self->{page});
671 }
672
673 =head2 page
674
675 =head3 Arguments: ($page_num)
676
677 Returns a new resultset for the specified page.
678
679 =cut
680
681 sub page {
682   my ($self, $page) = @_;
683   my $attrs = { %{$self->{attrs}} };
684   $attrs->{page} = $page;
685   return (ref $self)->new($self->result_source, $attrs);
686 }
687
688 =head2 new_result
689
690 =head3 Arguments: (\%vals)
691
692 Creates a result in the resultset's result class.
693
694 =cut
695
696 sub new_result {
697   my ($self, $values) = @_;
698   $self->throw_exception( "new_result needs a hash" )
699     unless (ref $values eq 'HASH');
700   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
701     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
702   my %new = %$values;
703   my $alias = $self->{attrs}{alias};
704   foreach my $key (keys %{$self->{cond}||{}}) {
705     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
706   }
707   my $obj = $self->result_source->result_class->new(\%new);
708   $obj->result_source($self->result_source) if $obj->can('result_source');
709   $obj;
710 }
711
712 =head2 create
713
714 =head3 Arguments: (\%vals)
715
716 Inserts a record into the resultset and returns the object.
717
718 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
719
720 =cut
721
722 sub create {
723   my ($self, $attrs) = @_;
724   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
725   return $self->new_result($attrs)->insert;
726 }
727
728 =head2 find_or_create
729
730 =head3 Arguments: (\%vals, \%attrs?)
731
732   $class->find_or_create({ key => $val, ... });
733
734 Searches for a record matching the search condition; if it doesn't find one,    
735 creates one and returns that instead.                                       
736
737   my $cd = $schema->resultset('CD')->find_or_create({
738     cdid   => 5,
739     artist => 'Massive Attack',
740     title  => 'Mezzanine',
741     year   => 2005,
742   });
743
744 Also takes an optional C<key> attribute, to search by a specific key or unique
745 constraint. For example:
746
747   my $cd = $schema->resultset('CD')->find_or_create(
748     {
749       artist => 'Massive Attack',
750       title  => 'Mezzanine',
751     },
752     { key => 'artist_title' }
753   );
754
755 See also L</find> and L</update_or_create>.
756
757 =cut
758
759 sub find_or_create {
760   my $self     = shift;
761   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
762   my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
763   my $exists   = $self->find($hash, $attrs);
764   return defined($exists) ? $exists : $self->create($hash);
765 }
766
767 =head2 update_or_create
768
769   $class->update_or_create({ key => $val, ... });
770
771 First, search for an existing row matching one of the unique constraints
772 (including the primary key) on the source of this resultset.  If a row is
773 found, update it with the other given column values.  Otherwise, create a new
774 row.
775
776 Takes an optional C<key> attribute to search on a specific unique constraint.
777 For example:
778
779   # In your application
780   my $cd = $schema->resultset('CD')->update_or_create(
781     {
782       artist => 'Massive Attack',
783       title  => 'Mezzanine',
784       year   => 1998,
785     },
786     { key => 'artist_title' }
787   );
788
789 If no C<key> is specified, it searches on all unique constraints defined on the
790 source, including the primary key.
791
792 If the C<key> is specified as C<primary>, search only on the primary key.
793
794 See also L</find> and L</find_or_create>.
795
796 =cut
797
798 sub update_or_create {
799   my $self = shift;
800
801   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
802   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
803
804   my %unique_constraints = $self->result_source->unique_constraints;
805   my @constraint_names   = (exists $attrs->{key}
806                             ? ($attrs->{key})
807                             : keys %unique_constraints);
808
809   my @unique_hashes;
810   foreach my $name (@constraint_names) {
811     my @unique_cols = @{ $unique_constraints{$name} };
812     my %unique_hash =
813       map  { $_ => $hash->{$_} }
814       grep { exists $hash->{$_} }
815       @unique_cols;
816
817     push @unique_hashes, \%unique_hash
818       if (scalar keys %unique_hash == scalar @unique_cols);
819   }
820
821   my $row;
822   if (@unique_hashes) {
823     $row = $self->search(\@unique_hashes, { rows => 1 })->first;
824     if ($row) {
825       $row->set_columns($hash);
826       $row->update;
827     }
828   }
829
830   unless ($row) {
831     $row = $self->create($hash);
832   }
833
834   return $row;
835 }
836
837 =head2 get_cache
838
839 Gets the contents of the cache for the resultset.
840
841 =cut
842
843 sub get_cache {
844   my $self = shift;
845   return $self->{all_cache} || [];
846 }
847
848 =head2 set_cache
849
850 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
851
852 =cut
853
854 sub set_cache {
855   my ( $self, $data ) = @_;
856   $self->throw_exception("set_cache requires an arrayref")
857     if ref $data ne 'ARRAY';
858   my $result_class = $self->result_source->result_class;
859   foreach( @$data ) {
860     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
861       if ref $_ ne $result_class;
862   }
863   $self->{all_cache} = $data;
864 }
865
866 =head2 clear_cache
867
868 Clears the cache for the resultset.
869
870 =cut
871
872 sub clear_cache {
873   my $self = shift;
874   $self->set_cache([]);
875 }
876
877 =head2 related_resultset
878
879 Returns a related resultset for the supplied relationship name.
880
881   $rs = $rs->related_resultset('foo');
882
883 =cut
884
885 sub related_resultset {
886   my ( $self, $rel, @rest ) = @_;
887   $self->{related_resultsets} ||= {};
888   my $resultsets = $self->{related_resultsets};
889   if( !exists $resultsets->{$rel} ) {
890     #warn "fetching related resultset for rel '$rel'";
891     my $rel_obj = $self->result_source->relationship_info($rel);
892     $self->throw_exception(
893       "search_related: result source '" . $self->result_source->name .
894       "' has no such relationship ${rel}")
895       unless $rel_obj; #die Dumper $self->{attrs};
896     my $rs;
897     if( $self->{attrs}->{cache} ) {
898       $rs = $self->search(undef);
899     }
900     else {
901       $rs = $self->search(undef, { join => $rel });
902     }
903     #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
904     #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
905     my $alias = (defined $rs->{attrs}{seen_join}{$rel}
906                   && $rs->{attrs}{seen_join}{$rel} > 1
907                 ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
908                 : $rel);
909     $resultsets->{$rel} =
910       $self->result_source->schema->resultset($rel_obj->{class}
911            )->search( undef,
912              { %{$rs->{attrs}},
913                alias => $alias,
914                select => undef(),
915                as => undef() }
916            )->search(@rest);
917   }
918   return $resultsets->{$rel};
919 }
920
921 =head2 throw_exception
922
923 See Schema's throw_exception
924
925 =cut
926
927 sub throw_exception {
928   my $self=shift;
929   $self->result_source->schema->throw_exception(@_);
930 }
931
932 =head1 ATTRIBUTES
933
934 The resultset takes various attributes that modify its behavior. Here's an
935 overview of them:
936
937 =head2 order_by
938
939 Which column(s) to order the results by. This is currently passed through
940 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
941
942 =head2 cols
943
944 =head3 Arguments: (arrayref)
945
946 Shortcut to request a particular set of columns to be retrieved.  Adds
947 C<me.> onto the start of any column without a C<.> in it and sets C<select>
948 from that, then auto-populates C<as> from C<select> as normal.
949
950 =head2 include_columns
951
952 =head3 Arguments: (arrayref)
953
954 Shortcut to include additional columns in the returned results - for example
955
956   { include_columns => ['foo.name'], join => ['foo'] }
957
958 would add a 'name' column to the information passed to object inflation
959
960 =head2 select
961
962 =head3 Arguments: (arrayref)
963
964 Indicates which columns should be selected from the storage. You can use
965 column names, or in the case of RDBMS back ends, function or stored procedure
966 names:
967
968   $rs = $schema->resultset('Foo')->search(
969     {},
970     {
971       select => [
972         'column_name',
973         { count => 'column_to_count' },
974         { sum => 'column_to_sum' }
975       ]
976     }
977   );
978
979 When you use function/stored procedure names and do not supply an C<as>
980 attribute, the column names returned are storage-dependent. E.g. MySQL would
981 return a column named C<count(column_to_count)> in the above example.
982
983 =head2 as
984
985 =head3 Arguments: (arrayref)
986
987 Indicates column names for object inflation. This is used in conjunction with
988 C<select>, usually when C<select> contains one or more function or stored
989 procedure names:
990
991   $rs = $schema->resultset('Foo')->search(
992     {},
993     {
994       select => [
995         'column1',
996         { count => 'column2' }
997       ],
998       as => [qw/ column1 column2_count /]
999     }
1000   );
1001
1002   my $foo = $rs->first(); # get the first Foo
1003
1004 If the object against which the search is performed already has an accessor
1005 matching a column name specified in C<as>, the value can be retrieved using
1006 the accessor as normal:
1007
1008   my $column1 = $foo->column1();
1009
1010 If on the other hand an accessor does not exist in the object, you need to
1011 use C<get_column> instead:
1012
1013   my $column2_count = $foo->get_column('column2_count');
1014
1015 You can create your own accessors if required - see
1016 L<DBIx::Class::Manual::Cookbook> for details.
1017
1018 =head2 join
1019
1020 Contains a list of relationships that should be joined for this query.  For
1021 example:
1022
1023   # Get CDs by Nine Inch Nails
1024   my $rs = $schema->resultset('CD')->search(
1025     { 'artist.name' => 'Nine Inch Nails' },
1026     { join => 'artist' }
1027   );
1028
1029 Can also contain a hash reference to refer to the other relation's relations.
1030 For example:
1031
1032   package MyApp::Schema::Track;
1033   use base qw/DBIx::Class/;
1034   __PACKAGE__->table('track');
1035   __PACKAGE__->add_columns(qw/trackid cd position title/);
1036   __PACKAGE__->set_primary_key('trackid');
1037   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1038   1;
1039
1040   # In your application
1041   my $rs = $schema->resultset('Artist')->search(
1042     { 'track.title' => 'Teardrop' },
1043     {
1044       join     => { cd => 'track' },
1045       order_by => 'artist.name',
1046     }
1047   );
1048
1049 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1050 similarly for a third time). For e.g.
1051
1052   my $rs = $schema->resultset('Artist')->search(
1053     { 'cds.title'   => 'Foo',
1054       'cds_2.title' => 'Bar' },
1055     { join => [ qw/cds cds/ ] });
1056
1057 will return a set of all artists that have both a cd with title Foo and a cd
1058 with title Bar.
1059
1060 If you want to fetch related objects from other tables as well, see C<prefetch>
1061 below.
1062
1063 =head2 prefetch
1064
1065 =head3 Arguments: arrayref/hashref
1066
1067 Contains one or more relationships that should be fetched along with the main 
1068 query (when they are accessed afterwards they will have already been
1069 "prefetched").  This is useful for when you know you will need the related
1070 objects, because it saves at least one query:
1071
1072   my $rs = $schema->resultset('Tag')->search(
1073     {},
1074     {
1075       prefetch => {
1076         cd => 'artist'
1077       }
1078     }
1079   );
1080
1081 The initial search results in SQL like the following:
1082
1083   SELECT tag.*, cd.*, artist.* FROM tag
1084   JOIN cd ON tag.cd = cd.cdid
1085   JOIN artist ON cd.artist = artist.artistid
1086
1087 L<DBIx::Class> has no need to go back to the database when we access the
1088 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1089 case.
1090
1091 Simple prefetches will be joined automatically, so there is no need
1092 for a C<join> attribute in the above search. If you're prefetching to
1093 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1094 specify the join as well.
1095
1096 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1097 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1098 with an accessor type of 'single' or 'filter').
1099
1100 =head2 from
1101
1102 =head3 Arguments: (arrayref)
1103
1104 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1105 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1106 clauses.
1107
1108 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1109 C<join> will usually do what you need and it is strongly recommended that you
1110 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1111
1112 In simple terms, C<from> works as follows:
1113
1114     [
1115         { <alias> => <table>, -join-type => 'inner|left|right' }
1116         [] # nested JOIN (optional)
1117         { <table.column> = <foreign_table.foreign_key> }
1118     ]
1119
1120     JOIN
1121         <alias> <table>
1122         [JOIN ...]
1123     ON <table.column> = <foreign_table.foreign_key>
1124
1125 An easy way to follow the examples below is to remember the following:
1126
1127     Anything inside "[]" is a JOIN
1128     Anything inside "{}" is a condition for the enclosing JOIN
1129
1130 The following examples utilize a "person" table in a family tree application.
1131 In order to express parent->child relationships, this table is self-joined:
1132
1133     # Person->belongs_to('father' => 'Person');
1134     # Person->belongs_to('mother' => 'Person');
1135
1136 C<from> can be used to nest joins. Here we return all children with a father,
1137 then search against all mothers of those children:
1138
1139   $rs = $schema->resultset('Person')->search(
1140       {},
1141       {
1142           alias => 'mother', # alias columns in accordance with "from"
1143           from => [
1144               { mother => 'person' },
1145               [
1146                   [
1147                       { child => 'person' },
1148                       [
1149                           { father => 'person' },
1150                           { 'father.person_id' => 'child.father_id' }
1151                       ]
1152                   ],
1153                   { 'mother.person_id' => 'child.mother_id' }
1154               ],                
1155           ]
1156       },
1157   );
1158
1159   # Equivalent SQL:
1160   # SELECT mother.* FROM person mother
1161   # JOIN (
1162   #   person child
1163   #   JOIN person father
1164   #   ON ( father.person_id = child.father_id )
1165   # )
1166   # ON ( mother.person_id = child.mother_id )
1167
1168 The type of any join can be controlled manually. To search against only people
1169 with a father in the person table, we could explicitly use C<INNER JOIN>:
1170
1171     $rs = $schema->resultset('Person')->search(
1172         {},
1173         {
1174             alias => 'child', # alias columns in accordance with "from"
1175             from => [
1176                 { child => 'person' },
1177                 [
1178                     { father => 'person', -join-type => 'inner' },
1179                     { 'father.id' => 'child.father_id' }
1180                 ],
1181             ]
1182         },
1183     );
1184
1185     # Equivalent SQL:
1186     # SELECT child.* FROM person child
1187     # INNER JOIN person father ON child.father_id = father.id
1188
1189 =head2 page
1190
1191 For a paged resultset, specifies which page to retrieve.  Leave unset
1192 for an unpaged resultset.
1193
1194 =head2 rows
1195
1196 For a paged resultset, how many rows per page:
1197
1198   rows => 10
1199
1200 Can also be used to simulate an SQL C<LIMIT>.
1201
1202 =head2 group_by
1203
1204 =head3 Arguments: (arrayref)
1205
1206 A arrayref of columns to group by. Can include columns of joined tables.
1207
1208   group_by => [qw/ column1 column2 ... /]
1209
1210 =head2 distinct
1211
1212 Set to 1 to group by all columns.
1213
1214 For more examples of using these attributes, see
1215 L<DBIx::Class::Manual::Cookbook>.
1216
1217 =cut
1218
1219 1;