2e9fb33b7cdc572ec55a1412d3c12c7364eef280
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11 use Scalar::Util qw/weaken/;
12
13 use base qw/DBIx::Class/;
14 __PACKAGE__->load_components(qw/AccessorGroup/);
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('CD')->search(year => 2005);
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =over 4
57
58 =item Arguments: ($source, \%$attrs)
59
60 =back
61
62 The resultset constructor. Takes a source object (usually a
63 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
64 L</ATTRIBUTES> below).  Does not perform any queries -- these are
65 executed as needed by the other methods.
66
67 Generally you won't need to construct a resultset manually.  You'll
68 automatically get one from e.g. a L</search> called in scalar context:
69
70   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
71
72 =cut
73
74 sub new {
75   my $class = shift;
76   return $class->new_result(@_) if ref $class;
77   
78   my ($source, $attrs) = @_;
79   weaken $source;
80   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
81   #use Data::Dumper; warn Dumper($attrs);
82   my $alias = ($attrs->{alias} ||= 'me');
83   
84   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
85   delete $attrs->{as} if $attrs->{columns};
86   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
87   $attrs->{select} = [
88     map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}}
89   ] if $attrs->{columns};
90   $attrs->{as} ||= [
91     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
92   ];
93   if (my $include = delete $attrs->{include_columns}) {
94     push(@{$attrs->{select}}, @$include);
95     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
96   }
97   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
98
99   $attrs->{from} ||= [ { $alias => $source->from } ];
100   $attrs->{seen_join} ||= {};
101   my %seen;
102   if (my $join = delete $attrs->{join}) {
103     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
104       if (ref $j eq 'HASH') {
105         $seen{$_} = 1 foreach keys %$j;
106       } else {
107         $seen{$j} = 1;
108       }
109     }
110     push(@{$attrs->{from}}, $source->resolve_join(
111       $join, $attrs->{alias}, $attrs->{seen_join})
112     );
113   }
114   
115   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
116   $attrs->{order_by} = [ $attrs->{order_by} ] if
117     $attrs->{order_by} and !ref($attrs->{order_by});
118   $attrs->{order_by} ||= [];
119
120   my $collapse = $attrs->{collapse} || {};
121   if (my $prefetch = delete $attrs->{prefetch}) {
122     my @pre_order;
123     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
124       if ( ref $p eq 'HASH' ) {
125         foreach my $key (keys %$p) {
126           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
127             unless $seen{$key};
128         }
129       } else {
130         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
131             unless $seen{$p};
132       }
133       my @prefetch = $source->resolve_prefetch(
134            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
135       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
136       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
137     }
138     push(@{$attrs->{order_by}}, @pre_order);
139   }
140   $attrs->{collapse} = $collapse;
141 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
142
143   if ($attrs->{page}) {
144     $attrs->{rows} ||= 10;
145     $attrs->{offset} ||= 0;
146     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
147   }
148
149   bless {
150     result_source => $source,
151     result_class => $attrs->{result_class} || $source->result_class,
152     cond => $attrs->{where},
153     from => $attrs->{from},
154     collapse => $collapse,
155     count => undef,
156     page => delete $attrs->{page},
157     pager => undef,
158     attrs => $attrs
159   }, $class;
160 }
161
162 =head2 search
163
164 =over 4
165
166 =item Arguments: (\%cond?, \%attrs?)
167
168 =item Returns: $resultset (scalar context), @row_objs (list context)
169
170 =back
171
172   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
173   my $new_rs = $cd_rs->search({ year => 2005 });
174
175 If you need to pass in additional attributes but no additional condition,
176 call it as C<search(undef, \%attrs);>.
177
178   # "SELECT name, artistid FROM $artist_table"
179   my @all_artists = $schema->resultset('Artist')->search(undef, {
180     columns => [qw/name artistid/],
181   });
182
183 =cut
184
185 sub search {
186   my $self = shift;
187
188   my $rs;
189   if( @_ ) {
190     
191     my $attrs = { %{$self->{attrs}} };
192     my $having = delete $attrs->{having};
193     $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
194
195     my $where = (@_
196                   ? ((@_ == 1 || ref $_[0] eq "HASH")
197                       ? shift
198                       : ((@_ % 2)
199                           ? $self->throw_exception(
200                               "Odd number of arguments to search")
201                           : {@_}))
202                   : undef());
203     if (defined $where) {
204       $attrs->{where} = (defined $attrs->{where}
205                 ? { '-and' =>
206                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
207                         $where, $attrs->{where} ] }
208                 : $where);
209     }
210
211     if (defined $having) {
212       $attrs->{having} = (defined $attrs->{having}
213                 ? { '-and' =>
214                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
215                         $having, $attrs->{having} ] }
216                 : $having);
217     }
218
219     $rs = (ref $self)->new($self->result_source, $attrs);
220   }
221   else {
222     $rs = $self;
223     $rs->reset;
224   }
225   return (wantarray ? $rs->all : $rs);
226 }
227
228 =head2 search_literal
229
230 =over 4
231
232 =item Arguments: ($literal_cond, @bind?)
233
234 =item Returns: $resultset (scalar context), @row_objs (list context)
235
236 =back
237
238   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
239   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
240
241 Pass a literal chunk of SQL to be added to the conditional part of the
242 resultset query.
243
244 =cut
245
246 sub search_literal {
247   my ($self, $cond, @vals) = @_;
248   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
249   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
250   return $self->search(\$cond, $attrs);
251 }
252
253 =head2 find
254
255 =over 4
256
257 =item Arguments: (@colvalues) | (\%cols, \%attrs?)
258
259 =item Returns: $row_object
260
261 =back
262
263 Finds a row based on its primary key or unique constraint. For example:
264
265   my $cd = $schema->resultset('CD')->find(5);
266
267 Also takes an optional C<key> attribute, to search by a specific key or unique
268 constraint. For example:
269
270   my $cd = $schema->resultset('CD')->find(
271     {
272       artist => 'Massive Attack',
273       title  => 'Mezzanine',
274     },
275     { key => 'artist_title' }
276   );
277
278 See also L</find_or_create> and L</update_or_create>.
279
280 =cut
281
282 sub find {
283   my ($self, @vals) = @_;
284   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
285
286   my @cols = $self->result_source->primary_columns;
287   if (exists $attrs->{key}) {
288     my %uniq = $self->result_source->unique_constraints;
289     $self->throw_exception(
290       "Unknown key $attrs->{key} on '" . $self->result_source->name . "'"
291     ) unless exists $uniq{$attrs->{key}};
292     @cols = @{ $uniq{$attrs->{key}} };
293   }
294   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
295   $self->throw_exception(
296     "Can't find unless a primary key or unique constraint is defined"
297   ) unless @cols;
298
299   my $query;
300   if (ref $vals[0] eq 'HASH') {
301     $query = { %{$vals[0]} };
302   } elsif (@cols == @vals) {
303     $query = {};
304     @{$query}{@cols} = @vals;
305   } else {
306     $query = {@vals};
307   }
308   foreach my $key (grep { ! m/\./ } keys %$query) {
309     $query->{"$self->{attrs}{alias}.$key"} = delete $query->{$key};
310   }
311   #warn Dumper($query);
312   
313   if (keys %$attrs) {
314       my $rs = $self->search($query,$attrs);
315       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
316   } else {
317       return keys %{$self->{collapse}} ?
318         $self->search($query)->next :
319         $self->single($query);
320   }
321 }
322
323 =head2 search_related
324
325 =over 4
326
327 =item Arguments: (\%cond?, \%attrs?)
328
329 =item Returns: $new_resultset
330
331 =back
332
333   $new_rs = $cd_rs->search_related('artist', {
334     name => 'Emo-R-Us',
335   });
336
337 Search the specified relationship, optionally specify a condition and
338 attributes for matching records. See L</ATTRIBUTES> for more information.
339
340 =cut
341
342 sub search_related {
343   return shift->related_resultset(shift)->search(@_);
344 }
345
346 =head2 cursor
347
348 =over 4
349
350 =item Arguments: (none)
351
352 =item Returns: $cursor
353
354 =back
355
356 Returns a storage-driven cursor to the given resultset. See
357 L<DBIx::Class::Cursor> for more information.
358
359 =cut
360
361 sub cursor {
362   my ($self) = @_;
363   my $attrs = { %{$self->{attrs}} };
364   return $self->{cursor}
365     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
366           $attrs->{where},$attrs);
367 }
368
369 =head2 single
370
371 =over 4
372
373 =item Arguments: (\%cond)
374
375 =item Returns: $row_object
376
377 =back
378
379   my $cd = $schema->resultset('CD')->single({ year => 2001 });
380
381 Inflates the first result without creating a cursor.
382
383 =cut
384
385 sub single {
386   my ($self, $where) = @_;
387   my $attrs = { %{$self->{attrs}} };
388   if ($where) {
389     if (defined $attrs->{where}) {
390       $attrs->{where} = {
391         '-and' => 
392             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
393                $where, delete $attrs->{where} ]
394       };
395     } else {
396       $attrs->{where} = $where;
397     }
398   }
399   my @data = $self->result_source->storage->select_single(
400           $self->{from}, $attrs->{select},
401           $attrs->{where},$attrs);
402   return (@data ? $self->_construct_object(@data) : ());
403 }
404
405
406 =head2 search_like
407
408 =over 4
409
410 =item Arguments: (\%cond?, \%attrs?)
411
412 =item Returns: $resultset (scalar context), @row_objs (list context)
413
414 =back
415
416   # WHERE title LIKE '%blue%'
417   $cd_rs = $rs->search_like({ title => '%blue%'});
418
419 Perform a search, but use C<LIKE> instead of C<=> as the condition. Note
420 that this is simply a convenience method. You most likely want to use
421 L</search> with specific operators.
422
423 For more information, see L<DBIx::Class::Manual::Cookbook>.
424
425 =cut
426
427 sub search_like {
428   my $class = shift;
429   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
430   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
431   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
432   return $class->search($query, { %$attrs });
433 }
434
435 =head2 slice
436
437 =over 4
438
439 =item Arguments: ($first, $last)
440
441 =item Returns: $resultset (scalar context), @row_objs (list context)
442
443 =back
444
445 Returns a subset of elements from the resultset.
446
447 =cut
448
449 sub slice {
450   my ($self, $min, $max) = @_;
451   my $attrs = { %{ $self->{attrs} || {} } };
452   $attrs->{offset} ||= 0;
453   $attrs->{offset} += $min;
454   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
455   my $slice = (ref $self)->new($self->result_source, $attrs);
456   return (wantarray ? $slice->all : $slice);
457 }
458
459 =head2 next
460
461 Returns the next element in the resultset (C<undef> is there is none).
462
463 Can be used to efficiently iterate over records in the resultset:
464
465   my $rs = $schema->resultset('CD')->search;
466   while (my $cd = $rs->next) {
467     print $cd->title;
468   }
469
470 =cut
471
472 sub next {
473   my ($self) = @_;
474   if (@{$self->{all_cache} || []}) {
475     $self->{all_cache_position} ||= 0;
476     return $self->{all_cache}->[$self->{all_cache_position}++];
477   }
478   if ($self->{attrs}{cache}) {
479     $self->{all_cache_position} = 1;
480     return ($self->all)[0];
481   }
482   my @row = (exists $self->{stashed_row} ?
483                @{delete $self->{stashed_row}} :
484                $self->cursor->next
485   );
486 #  warn Dumper(\@row); use Data::Dumper;
487   return unless (@row);
488   return $self->_construct_object(@row);
489 }
490
491 sub _construct_object {
492   my ($self, @row) = @_;
493   my @as = @{ $self->{attrs}{as} };
494   
495   my $info = $self->_collapse_result(\@as, \@row);
496   
497   my $new = $self->result_class->inflate_result($self->result_source, @$info);
498   
499   $new = $self->{attrs}{record_filter}->($new)
500     if exists $self->{attrs}{record_filter};
501   return $new;
502 }
503
504 sub _collapse_result {
505   my ($self, $as, $row, $prefix) = @_;
506
507   my %const;
508
509   my @copy = @$row;
510   foreach my $this_as (@$as) {
511     my $val = shift @copy;
512     if (defined $prefix) {
513       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
514         my $remain = $1;
515         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
516         $const{$1||''}{$2} = $val;
517       }
518     } else {
519       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
520       $const{$1||''}{$2} = $val;
521     }
522   }
523
524   my $info = [ {}, {} ];
525   foreach my $key (keys %const) {
526     if (length $key) {
527       my $target = $info;
528       my @parts = split(/\./, $key);
529       foreach my $p (@parts) {
530         $target = $target->[1]->{$p} ||= [];
531       }
532       $target->[0] = $const{$key};
533     } else {
534       $info->[0] = $const{$key};
535     }
536   }
537
538   my @collapse;
539   if (defined $prefix) {
540     @collapse = map {
541         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
542     } keys %{$self->{collapse}}
543   } else {
544     @collapse = keys %{$self->{collapse}};
545   };
546
547   if (@collapse) {
548     my ($c) = sort { length $a <=> length $b } @collapse;
549     my $target = $info;
550     foreach my $p (split(/\./, $c)) {
551       $target = $target->[1]->{$p} ||= [];
552     }
553     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
554     my @co_key = @{$self->{collapse}{$c_prefix}};
555     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
556     my $tree = $self->_collapse_result($as, $row, $c_prefix);
557     my (@final, @raw);
558     while ( !(grep {
559                 !defined($tree->[0]->{$_}) ||
560                 $co_check{$_} ne $tree->[0]->{$_}
561               } @co_key) ) {
562       push(@final, $tree);
563       last unless (@raw = $self->cursor->next);
564       $row = $self->{stashed_row} = \@raw;
565       $tree = $self->_collapse_result($as, $row, $c_prefix);
566       #warn Data::Dumper::Dumper($tree, $row);
567     }
568     @$target = @final;
569   }
570
571   return $info;
572 }
573
574 =head2 result_source
575
576 Returns a reference to the result source for this recordset.
577
578 =cut
579
580
581 =head2 count
582
583 Performs an SQL C<COUNT> with the same query as the resultset was built
584 with to find the number of elements. If passed arguments, does a search
585 on the resultset and counts the results of that.
586
587 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
588 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
589 not support C<DISTINCT> with multiple columns. If you are using such a
590 database, you should only use columns from the main table in your C<group_by>
591 clause.
592
593 =cut
594
595 sub count {
596   my $self = shift;
597   return $self->search(@_)->count if @_ and defined $_[0];
598   return scalar @{ $self->get_cache } if @{ $self->get_cache };
599
600   my $count = $self->_count;
601   return 0 unless $count;
602
603   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
604   $count = $self->{attrs}{rows} if
605     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
606   return $count;
607 }
608
609 sub _count { # Separated out so pager can get the full count
610   my $self = shift;
611   my $select = { count => '*' };
612   my $attrs = { %{ $self->{attrs} } };
613   if (my $group_by = delete $attrs->{group_by}) {
614     delete $attrs->{having};
615     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
616     # todo: try CONCAT for multi-column pk
617     my @pk = $self->result_source->primary_columns;
618     if (@pk == 1) {
619       foreach my $column (@distinct) {
620         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
621           @distinct = ($column);
622           last;
623         }
624       } 
625     }
626
627     $select = { count => { distinct => \@distinct } };
628     #use Data::Dumper; die Dumper $select;
629   }
630
631   $attrs->{select} = $select;
632   $attrs->{as} = [qw/count/];
633
634   # offset, order by and page are not needed to count. record_filter is cdbi
635   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
636         
637   my ($count) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
638   return $count;
639 }
640
641 =head2 count_literal
642
643 Counts the results in a literal query. Equivalent to calling L</search_literal>
644 with the passed arguments, then L</count>.
645
646 =cut
647
648 sub count_literal { shift->search_literal(@_)->count; }
649
650 =head2 all
651
652 Returns all elements in the resultset. Called implicitly if the resultset
653 is returned in list context.
654
655 =cut
656
657 sub all {
658   my ($self) = @_;
659   return @{ $self->get_cache } if @{ $self->get_cache };
660
661   my @obj;
662
663   if (keys %{$self->{collapse}}) {
664       # Using $self->cursor->all is really just an optimisation.
665       # If we're collapsing has_many prefetches it probably makes
666       # very little difference, and this is cleaner than hacking
667       # _construct_object to survive the approach
668     $self->cursor->reset;
669     my @row = $self->cursor->next;
670     while (@row) {
671       push(@obj, $self->_construct_object(@row));
672       @row = (exists $self->{stashed_row}
673                ? @{delete $self->{stashed_row}}
674                : $self->cursor->next);
675     }
676   } else {
677     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
678   }
679
680   $self->set_cache(\@obj) if $self->{attrs}{cache};
681   return @obj;
682 }
683
684 =head2 reset
685
686 Resets the resultset's cursor, so you can iterate through the elements again.
687
688 =cut
689
690 sub reset {
691   my ($self) = @_;
692   $self->{all_cache_position} = 0;
693   $self->cursor->reset;
694   return $self;
695 }
696
697 =head2 first
698
699 Resets the resultset and returns the first element.
700
701 =cut
702
703 sub first {
704   return $_[0]->reset->next;
705 }
706
707 =head2 update
708
709 =over 4
710
711 =item Arguments: (\%values)
712
713 =back
714
715 Sets the specified columns in the resultset to the supplied values.
716
717 =cut
718
719 sub update {
720   my ($self, $values) = @_;
721   $self->throw_exception("Values for update must be a hash")
722     unless ref $values eq 'HASH';
723   return $self->result_source->storage->update(
724     $self->result_source->from, $values, $self->{cond}
725   );
726 }
727
728 =head2 update_all
729
730 =over 4
731
732 =item Arguments: (\%values)
733
734 =back
735
736 Fetches all objects and updates them one at a time.  Note that C<update_all>
737 will run cascade triggers while L</update> will not.
738
739 =cut
740
741 sub update_all {
742   my ($self, $values) = @_;
743   $self->throw_exception("Values for update must be a hash")
744     unless ref $values eq 'HASH';
745   foreach my $obj ($self->all) {
746     $obj->set_columns($values)->update;
747   }
748   return 1;
749 }
750
751 =head2 delete
752
753 Deletes the contents of the resultset from its result source. Note that this
754 will not run cascade triggers. See L</delete_all> if you need triggers to run.
755
756 =cut
757
758 sub delete {
759   my ($self) = @_;
760   my $del = {};
761
762   if (!ref($self->{cond})) {
763
764     # No-op. No condition, we're deleting everything
765
766   } elsif (ref $self->{cond} eq 'ARRAY') {
767
768     $del = [ map { my %hash;
769       foreach my $key (keys %{$_}) {
770         $key =~ /([^.]+)$/;
771         $hash{$1} = $_->{$key};
772       }; \%hash; } @{$self->{cond}} ];
773
774   } elsif (ref $self->{cond} eq 'HASH') {
775
776     if ((keys %{$self->{cond}})[0] eq '-and') {
777
778       $del->{-and} = [ map { my %hash;
779         foreach my $key (keys %{$_}) {
780           $key =~ /([^.]+)$/;
781           $hash{$1} = $_->{$key};
782         }; \%hash; } @{$self->{cond}{-and}} ];
783
784     } else {
785
786       foreach my $key (keys %{$self->{cond}}) {
787         $key =~ /([^.]+)$/;
788         $del->{$1} = $self->{cond}{$key};
789       }
790     }
791
792   } else {
793     $self->throw_exception(
794       "Can't delete on resultset with condition unless hash or array"
795     );
796   }
797
798   $self->result_source->storage->delete($self->result_source->from, $del);
799   return 1;
800 }
801
802 =head2 delete_all
803
804 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
805 will run cascade triggers while L</delete> will not.
806
807 =cut
808
809 sub delete_all {
810   my ($self) = @_;
811   $_->delete for $self->all;
812   return 1;
813 }
814
815 =head2 pager
816
817 Returns a L<Data::Page> object for the current resultset. Only makes
818 sense for queries with a C<page> attribute.
819
820 =cut
821
822 sub pager {
823   my ($self) = @_;
824   my $attrs = $self->{attrs};
825   $self->throw_exception("Can't create pager for non-paged rs")
826     unless $self->{page};
827   $attrs->{rows} ||= 10;
828   return $self->{pager} ||= Data::Page->new(
829     $self->_count, $attrs->{rows}, $self->{page});
830 }
831
832 =head2 page
833
834 =over 4
835
836 =item Arguments: ($page_num)
837
838 =back
839
840 Returns a new resultset for the specified page.
841
842 =cut
843
844 sub page {
845   my ($self, $page) = @_;
846   my $attrs = { %{$self->{attrs}} };
847   $attrs->{page} = $page;
848   return (ref $self)->new($self->result_source, $attrs);
849 }
850
851 =head2 new_result
852
853 =over 4
854
855 =item Arguments: (\%vals)
856
857 =back
858
859 Creates a result in the resultset's result class.
860
861 =cut
862
863 sub new_result {
864   my ($self, $values) = @_;
865   $self->throw_exception( "new_result needs a hash" )
866     unless (ref $values eq 'HASH');
867   $self->throw_exception(
868     "Can't abstract implicit construct, condition not a hash"
869   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
870   my %new = %$values;
871   my $alias = $self->{attrs}{alias};
872   foreach my $key (keys %{$self->{cond}||{}}) {
873     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
874   }
875   my $obj = $self->result_class->new(\%new);
876   $obj->result_source($self->result_source) if $obj->can('result_source');
877   return $obj;
878 }
879
880 =head2 create
881
882 =over 4
883
884 =item Arguments: (\%vals)
885
886 =back
887
888 Inserts a record into the resultset and returns the object.
889
890 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
891
892 =cut
893
894 sub create {
895   my ($self, $attrs) = @_;
896   $self->throw_exception( "create needs a hashref" )
897     unless ref $attrs eq 'HASH';
898   return $self->new_result($attrs)->insert;
899 }
900
901 =head2 find_or_create
902
903 =over 4
904
905 =item Arguments: (\%vals, \%attrs?)
906
907 =back
908
909   $class->find_or_create({ key => $val, ... });
910
911 Searches for a record matching the search condition; if it doesn't find one,
912 creates one and returns that instead.
913
914   my $cd = $schema->resultset('CD')->find_or_create({
915     cdid   => 5,
916     artist => 'Massive Attack',
917     title  => 'Mezzanine',
918     year   => 2005,
919   });
920
921 Also takes an optional C<key> attribute, to search by a specific key or unique
922 constraint. For example:
923
924   my $cd = $schema->resultset('CD')->find_or_create(
925     {
926       artist => 'Massive Attack',
927       title  => 'Mezzanine',
928     },
929     { key => 'artist_title' }
930   );
931
932 See also L</find> and L</update_or_create>.
933
934 =cut
935
936 sub find_or_create {
937   my $self     = shift;
938   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
939   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
940   my $exists   = $self->find($hash, $attrs);
941   return defined $exists ? $exists : $self->create($hash);
942 }
943
944 =head2 update_or_create
945
946   $class->update_or_create({ key => $val, ... });
947
948 First, search for an existing row matching one of the unique constraints
949 (including the primary key) on the source of this resultset.  If a row is
950 found, update it with the other given column values.  Otherwise, create a new
951 row.
952
953 Takes an optional C<key> attribute to search on a specific unique constraint.
954 For example:
955
956   # In your application
957   my $cd = $schema->resultset('CD')->update_or_create(
958     {
959       artist => 'Massive Attack',
960       title  => 'Mezzanine',
961       year   => 1998,
962     },
963     { key => 'artist_title' }
964   );
965
966 If no C<key> is specified, it searches on all unique constraints defined on the
967 source, including the primary key.
968
969 If the C<key> is specified as C<primary>, search only on the primary key.
970
971 See also L</find> and L</find_or_create>.
972
973 =cut
974
975 sub update_or_create {
976   my $self = shift;
977   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
978   my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
979
980   my %unique_constraints = $self->result_source->unique_constraints;
981   my @constraint_names   = (exists $attrs->{key}
982                             ? ($attrs->{key})
983                             : keys %unique_constraints);
984
985   my @unique_hashes;
986   foreach my $name (@constraint_names) {
987     my @unique_cols = @{ $unique_constraints{$name} };
988     my %unique_hash =
989       map  { $_ => $hash->{$_} }
990       grep { exists $hash->{$_} }
991       @unique_cols;
992
993     push @unique_hashes, \%unique_hash
994       if (scalar keys %unique_hash == scalar @unique_cols);
995   }
996
997   if (@unique_hashes) {
998     my $row = $self->single(\@unique_hashes);
999     if (defined $row) {
1000       $row->set_columns($hash);
1001       $row->update;
1002       return $row;
1003     }
1004   }
1005
1006   return $self->create($hash);
1007 }
1008
1009 =head2 get_cache
1010
1011 Gets the contents of the cache for the resultset.
1012
1013 =cut
1014
1015 sub get_cache {
1016   shift->{all_cache} || [];
1017 }
1018
1019 =head2 set_cache
1020
1021 Sets the contents of the cache for the resultset. Expects an arrayref
1022 of objects of the same class as those produced by the resultset.
1023
1024 =cut
1025
1026 sub set_cache {
1027   my ( $self, $data ) = @_;
1028   $self->throw_exception("set_cache requires an arrayref")
1029     if ref $data ne 'ARRAY';
1030   my $result_class = $self->result_class;
1031   foreach( @$data ) {
1032     $self->throw_exception(
1033       "cannot cache object of type '$_', expected '$result_class'"
1034     ) if ref $_ ne $result_class;
1035   }
1036   $self->{all_cache} = $data;
1037 }
1038
1039 =head2 clear_cache
1040
1041 Clears the cache for the resultset.
1042
1043 =cut
1044
1045 sub clear_cache {
1046   shift->set_cache([]);
1047 }
1048
1049 =head2 related_resultset
1050
1051 Returns a related resultset for the supplied relationship name.
1052
1053   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1054
1055 =cut
1056
1057 sub related_resultset {
1058   my ( $self, $rel, @rest ) = @_;
1059   $self->{related_resultsets} ||= {};
1060   return $self->{related_resultsets}{$rel} ||= do {
1061       #warn "fetching related resultset for rel '$rel'";
1062       my $rel_obj = $self->result_source->relationship_info($rel);
1063       $self->throw_exception(
1064         "search_related: result source '" . $self->result_source->name .
1065         "' has no such relationship ${rel}")
1066         unless $rel_obj; #die Dumper $self->{attrs};
1067
1068       my $rs = $self->search(undef, { join => $rel });
1069       my $alias = defined $rs->{attrs}{seen_join}{$rel}
1070                     && $rs->{attrs}{seen_join}{$rel} > 1
1071                   ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
1072                   : $rel;
1073
1074       $self->result_source->schema->resultset($rel_obj->{class}
1075            )->search( undef,
1076              { %{$rs->{attrs}},
1077                alias => $alias,
1078                select => undef,
1079                as => undef }
1080            )->search(@rest);      
1081   };
1082 }
1083
1084 =head2 throw_exception
1085
1086 See Schema's throw_exception
1087
1088 =cut
1089
1090 sub throw_exception {
1091   my $self=shift;
1092   $self->result_source->schema->throw_exception(@_);
1093 }
1094
1095 =head1 ATTRIBUTES
1096
1097 XXX: FIXME: Attributes docs need clearing up
1098
1099 The resultset takes various attributes that modify its behavior. Here's an
1100 overview of them:
1101
1102 =head2 order_by
1103
1104 Which column(s) to order the results by. This is currently passed
1105 through directly to SQL, so you can give e.g. C<year DESC> for a
1106 descending order on the column `year'.
1107
1108 =head2 columns
1109
1110 =over 4
1111
1112 =item Arguments: (\@columns)
1113
1114 =back
1115
1116 Shortcut to request a particular set of columns to be retrieved.  Adds
1117 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1118 from that, then auto-populates C<as> from C<select> as normal. (You may also
1119 use the C<cols> attribute, as in earlier versions of DBIC.)
1120
1121 =head2 include_columns
1122
1123 =over 4
1124
1125 =item Arguments: (\@columns)
1126
1127 =back
1128
1129 Shortcut to include additional columns in the returned results - for example
1130
1131   $schema->resultset('CD')->search(undef, {
1132     include_columns => ['artist.name'],
1133     join => ['artist']
1134   });
1135
1136 would return all CDs and include a 'name' column to the information
1137 passed to object inflation
1138
1139 =head2 select
1140
1141 =over 4
1142
1143 =item Arguments: (\@columns)
1144
1145 =back
1146
1147 Indicates which columns should be selected from the storage. You can use
1148 column names, or in the case of RDBMS back ends, function or stored procedure
1149 names:
1150
1151   $rs = $schema->resultset('Employee')->search(undef, {
1152     select => [
1153       'name',
1154       { count => 'employeeid' },
1155       { sum => 'salary' }
1156     ]
1157   });
1158
1159 When you use function/stored procedure names and do not supply an C<as>
1160 attribute, the column names returned are storage-dependent. E.g. MySQL would
1161 return a column named C<count(employeeid)> in the above example.
1162
1163 =head2 as
1164
1165 =over 4
1166
1167 =item Arguments: (\@names)
1168
1169 =back
1170
1171 Indicates column names for object inflation. This is used in conjunction with
1172 C<select>, usually when C<select> contains one or more function or stored
1173 procedure names:
1174
1175   $rs = $schema->resultset('Employee')->search(undef, {
1176     select => [
1177       'name',
1178       { count => 'employeeid' }
1179     ],
1180     as => ['name', 'employee_count'],
1181   });
1182
1183   my $employee = $rs->first(); # get the first Employee
1184
1185 If the object against which the search is performed already has an accessor
1186 matching a column name specified in C<as>, the value can be retrieved using
1187 the accessor as normal:
1188
1189   my $name = $employee->name();
1190
1191 If on the other hand an accessor does not exist in the object, you need to
1192 use C<get_column> instead:
1193
1194   my $employee_count = $employee->get_column('employee_count');
1195
1196 You can create your own accessors if required - see
1197 L<DBIx::Class::Manual::Cookbook> for details.
1198
1199 =head2 join
1200
1201 Contains a list of relationships that should be joined for this query.  For
1202 example:
1203
1204   # Get CDs by Nine Inch Nails
1205   my $rs = $schema->resultset('CD')->search(
1206     { 'artist.name' => 'Nine Inch Nails' },
1207     { join => 'artist' }
1208   );
1209
1210 Can also contain a hash reference to refer to the other relation's relations.
1211 For example:
1212
1213   package MyApp::Schema::Track;
1214   use base qw/DBIx::Class/;
1215   __PACKAGE__->table('track');
1216   __PACKAGE__->add_columns(qw/trackid cd position title/);
1217   __PACKAGE__->set_primary_key('trackid');
1218   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1219   1;
1220
1221   # In your application
1222   my $rs = $schema->resultset('Artist')->search(
1223     { 'track.title' => 'Teardrop' },
1224     {
1225       join     => { cd => 'track' },
1226       order_by => 'artist.name',
1227     }
1228   );
1229
1230 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1231 similarly for a third time). For e.g.
1232
1233   my $rs = $schema->resultset('Artist')->search({
1234     'cds.title'   => 'Down to Earth',
1235     'cds_2.title' => 'Popular',
1236   }, {
1237     join => [ qw/cds cds/ ],
1238   });
1239
1240 will return a set of all artists that have both a cd with title 'Down
1241 to Earth' and a cd with title 'Popular'.
1242
1243 If you want to fetch related objects from other tables as well, see C<prefetch>
1244 below.
1245
1246 =head2 prefetch
1247
1248 =over 4
1249
1250 =item Arguments: (\@relationships)
1251
1252 =back
1253
1254 Contains one or more relationships that should be fetched along with the main 
1255 query (when they are accessed afterwards they will have already been
1256 "prefetched").  This is useful for when you know you will need the related
1257 objects, because it saves at least one query:
1258
1259   my $rs = $schema->resultset('Tag')->search(
1260     undef,
1261     {
1262       prefetch => {
1263         cd => 'artist'
1264       }
1265     }
1266   );
1267
1268 The initial search results in SQL like the following:
1269
1270   SELECT tag.*, cd.*, artist.* FROM tag
1271   JOIN cd ON tag.cd = cd.cdid
1272   JOIN artist ON cd.artist = artist.artistid
1273
1274 L<DBIx::Class> has no need to go back to the database when we access the
1275 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1276 case.
1277
1278 Simple prefetches will be joined automatically, so there is no need
1279 for a C<join> attribute in the above search. If you're prefetching to
1280 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1281 specify the join as well.
1282
1283 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1284 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1285 with an accessor type of 'single' or 'filter').
1286
1287 =head2 from
1288
1289 =over 4
1290
1291 =item Arguments: (\@array)
1292
1293 =back
1294
1295 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1296 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1297 clauses.
1298
1299 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1300 C<join> will usually do what you need and it is strongly recommended that you
1301 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1302
1303 In simple terms, C<from> works as follows:
1304
1305     [
1306         { <alias> => <table>, -join-type => 'inner|left|right' }
1307         [] # nested JOIN (optional)
1308         { <table.column> => <foreign_table.foreign_key> }
1309     ]
1310
1311     JOIN
1312         <alias> <table>
1313         [JOIN ...]
1314     ON <table.column> = <foreign_table.foreign_key>
1315
1316 An easy way to follow the examples below is to remember the following:
1317
1318     Anything inside "[]" is a JOIN
1319     Anything inside "{}" is a condition for the enclosing JOIN
1320
1321 The following examples utilize a "person" table in a family tree application.
1322 In order to express parent->child relationships, this table is self-joined:
1323
1324     # Person->belongs_to('father' => 'Person');
1325     # Person->belongs_to('mother' => 'Person');
1326
1327 C<from> can be used to nest joins. Here we return all children with a father,
1328 then search against all mothers of those children:
1329
1330   $rs = $schema->resultset('Person')->search(
1331       undef,
1332       {
1333           alias => 'mother', # alias columns in accordance with "from"
1334           from => [
1335               { mother => 'person' },
1336               [
1337                   [
1338                       { child => 'person' },
1339                       [
1340                           { father => 'person' },
1341                           { 'father.person_id' => 'child.father_id' }
1342                       ]
1343                   ],
1344                   { 'mother.person_id' => 'child.mother_id' }
1345               ],
1346           ]
1347       },
1348   );
1349
1350   # Equivalent SQL:
1351   # SELECT mother.* FROM person mother
1352   # JOIN (
1353   #   person child
1354   #   JOIN person father
1355   #   ON ( father.person_id = child.father_id )
1356   # )
1357   # ON ( mother.person_id = child.mother_id )
1358
1359 The type of any join can be controlled manually. To search against only people
1360 with a father in the person table, we could explicitly use C<INNER JOIN>:
1361
1362     $rs = $schema->resultset('Person')->search(
1363         undef,
1364         {
1365             alias => 'child', # alias columns in accordance with "from"
1366             from => [
1367                 { child => 'person' },
1368                 [
1369                     { father => 'person', -join-type => 'inner' },
1370                     { 'father.id' => 'child.father_id' }
1371                 ],
1372             ]
1373         },
1374     );
1375
1376     # Equivalent SQL:
1377     # SELECT child.* FROM person child
1378     # INNER JOIN person father ON child.father_id = father.id
1379
1380 =head2 page
1381
1382 =over 4
1383
1384 =item Arguments: ($page)
1385
1386 =back
1387
1388 For a paged resultset, specifies which page to retrieve.  Leave unset
1389 for an unpaged resultset.
1390
1391 =head2 rows
1392
1393 =over 4
1394
1395 =item Arguments: ($rows)
1396
1397 =back
1398
1399 For a paged resultset, specifies how many rows are in each page:
1400
1401   rows => 10
1402
1403 Can also be used to simulate an SQL C<LIMIT>.
1404
1405 =head2 group_by
1406
1407 =over 4
1408
1409 =item Arguments: (\@columns)
1410
1411 =back
1412
1413 A arrayref of columns to group by. Can include columns of joined tables.
1414
1415   group_by => [qw/ column1 column2 ... /]
1416
1417 =head2 distinct
1418
1419 Set to 1 to group by all columns.
1420
1421 =head2 cache
1422
1423 Set to 1 to cache search results. This prevents extra SQL queries if you
1424 revisit rows in your ResultSet:
1425
1426   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1427   
1428   while( my $artist = $resultset->next ) {
1429     ... do stuff ...
1430   }
1431
1432   $rs->first; # without cache, this would issue a query 
1433
1434 By default, searches are not cached.
1435
1436 For more examples of using these attributes, see
1437 L<DBIx::Class::Manual::Cookbook>.
1438
1439 =cut
1440
1441 1;