Applied mst fixes for delete on resultsetin [839] to update. Factored out common...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11 use Scalar::Util qw/weaken/;
12
13 use base qw/DBIx::Class/;
14 __PACKAGE__->load_components(qw/AccessorGroup/);
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('CD')->search(year => 2005);
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =over 4
57
58 =item Arguments: $source, \%$attrs
59
60 =item Return Value: $rs
61
62 =back
63
64 The resultset constructor. Takes a source object (usually a
65 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
66 L</ATTRIBUTES> below).  Does not perform any queries -- these are
67 executed as needed by the other methods.
68
69 Generally you won't need to construct a resultset manually.  You'll
70 automatically get one from e.g. a L</search> called in scalar context:
71
72   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
73
74 IMPORTANT: If called on an object, proxies to new_result instead so
75
76   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
77
78 will return a CD object, not a ResultSet.
79
80 =cut
81
82 sub new {
83   my $class = shift;
84   return $class->new_result(@_) if ref $class;
85   
86   my ($source, $attrs) = @_;
87   weaken $source;
88   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
89   #use Data::Dumper; warn Dumper($attrs);
90   my $alias = ($attrs->{alias} ||= 'me');
91   
92   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
93   delete $attrs->{as} if $attrs->{columns};
94   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
95   $attrs->{select} = [
96     map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}}
97   ] if $attrs->{columns};
98   $attrs->{as} ||= [
99     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
100   ];
101   if (my $include = delete $attrs->{include_columns}) {
102     push(@{$attrs->{select}}, @$include);
103     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
104   }
105   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
106
107   $attrs->{from} ||= [ { $alias => $source->from } ];
108   $attrs->{seen_join} ||= {};
109   my %seen;
110   if (my $join = delete $attrs->{join}) {
111     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
112       if (ref $j eq 'HASH') {
113         $seen{$_} = 1 foreach keys %$j;
114       } else {
115         $seen{$j} = 1;
116       }
117     }
118     push(@{$attrs->{from}}, $source->resolve_join(
119       $join, $attrs->{alias}, $attrs->{seen_join})
120     );
121   }
122   
123   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
124   $attrs->{order_by} = [ $attrs->{order_by} ] if
125     $attrs->{order_by} and !ref($attrs->{order_by});
126   $attrs->{order_by} ||= [];
127
128   my $collapse = $attrs->{collapse} || {};
129   if (my $prefetch = delete $attrs->{prefetch}) {
130     my @pre_order;
131     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
132       if ( ref $p eq 'HASH' ) {
133         foreach my $key (keys %$p) {
134           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
135             unless $seen{$key};
136         }
137       } else {
138         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
139             unless $seen{$p};
140       }
141       my @prefetch = $source->resolve_prefetch(
142            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
143       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
144       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
145     }
146     push(@{$attrs->{order_by}}, @pre_order);
147   }
148   $attrs->{collapse} = $collapse;
149 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
150
151   if ($attrs->{page}) {
152     $attrs->{rows} ||= 10;
153     $attrs->{offset} ||= 0;
154     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
155   }
156
157   bless {
158     result_source => $source,
159     result_class => $attrs->{result_class} || $source->result_class,
160     cond => $attrs->{where},
161     from => $attrs->{from},
162     collapse => $collapse,
163     count => undef,
164     page => delete $attrs->{page},
165     pager => undef,
166     attrs => $attrs
167   }, $class;
168 }
169
170 =head2 search
171
172 =over 4
173
174 =item Arguments: $cond, \%attrs?
175
176 =item Return Value: $resultset (scalar context), @row_objs (list context)
177
178 =back
179
180   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
181   my $new_rs = $cd_rs->search({ year => 2005 });
182
183   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
184                  # year = 2005 OR year = 2004
185
186 If you need to pass in additional attributes but no additional condition,
187 call it as C<search(undef, \%attrs)>.
188
189   # "SELECT name, artistid FROM $artist_table"
190   my @all_artists = $schema->resultset('Artist')->search(undef, {
191     columns => [qw/name artistid/],
192   });
193
194 =cut
195
196 sub search {
197   my $self = shift;
198
199   my $rs;
200   if( @_ ) {
201     
202     my $attrs = { %{$self->{attrs}} };
203     my $having = delete $attrs->{having};
204     $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
205
206     my $where = (@_
207                   ? ((@_ == 1 || ref $_[0] eq "HASH")
208                       ? shift
209                       : ((@_ % 2)
210                           ? $self->throw_exception(
211                               "Odd number of arguments to search")
212                           : {@_}))
213                   : undef());
214     if (defined $where) {
215       $attrs->{where} = (defined $attrs->{where}
216                 ? { '-and' =>
217                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
218                         $where, $attrs->{where} ] }
219                 : $where);
220     }
221
222     if (defined $having) {
223       $attrs->{having} = (defined $attrs->{having}
224                 ? { '-and' =>
225                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
226                         $having, $attrs->{having} ] }
227                 : $having);
228     }
229
230     $rs = (ref $self)->new($self->result_source, $attrs);
231   }
232   else {
233     $rs = $self;
234     $rs->reset;
235   }
236   return (wantarray ? $rs->all : $rs);
237 }
238
239 =head2 search_literal
240
241 =over 4
242
243 =item Arguments: $sql_fragment, @bind_values
244
245 =item Return Value: $resultset (scalar context), @row_objs (list context)
246
247 =back
248
249   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
250   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
251
252 Pass a literal chunk of SQL to be added to the conditional part of the
253 resultset query.
254
255 =cut
256
257 sub search_literal {
258   my ($self, $cond, @vals) = @_;
259   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
260   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
261   return $self->search(\$cond, $attrs);
262 }
263
264 =head2 find
265
266 =over 4
267
268 =item Arguments: @values | \%cols, \%attrs?
269
270 =item Return Value: $row_object
271
272 =back
273
274 Finds a row based on its primary key or unique constraint. For example:
275
276   my $cd = $schema->resultset('CD')->find(5);
277
278 Also takes an optional C<key> attribute, to search by a specific key or unique
279 constraint. For example:
280
281   my $cd = $schema->resultset('CD')->find(
282     {
283       artist => 'Massive Attack',
284       title  => 'Mezzanine',
285     },
286     { key => 'artist_title' }
287   );
288
289 See also L</find_or_create> and L</update_or_create>.
290
291 =cut
292
293 sub find {
294   my ($self, @vals) = @_;
295   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
296
297   my @cols = $self->result_source->primary_columns;
298   if (exists $attrs->{key}) {
299     my %uniq = $self->result_source->unique_constraints;
300     $self->throw_exception(
301       "Unknown key $attrs->{key} on '" . $self->result_source->name . "'"
302     ) unless exists $uniq{$attrs->{key}};
303     @cols = @{ $uniq{$attrs->{key}} };
304   }
305   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
306   $self->throw_exception(
307     "Can't find unless a primary key or unique constraint is defined"
308   ) unless @cols;
309
310   my $query;
311   if (ref $vals[0] eq 'HASH') {
312     $query = { %{$vals[0]} };
313   } elsif (@cols == @vals) {
314     $query = {};
315     @{$query}{@cols} = @vals;
316   } else {
317     $query = {@vals};
318   }
319   foreach my $key (grep { ! m/\./ } keys %$query) {
320     $query->{"$self->{attrs}{alias}.$key"} = delete $query->{$key};
321   }
322   #warn Dumper($query);
323   
324   if (keys %$attrs) {
325       my $rs = $self->search($query,$attrs);
326       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
327   } else {
328       return keys %{$self->{collapse}} ?
329         $self->search($query)->next :
330         $self->single($query);
331   }
332 }
333
334 =head2 search_related
335
336 =over 4
337
338 =item Arguments: $cond, \%attrs?
339
340 =item Return Value: $new_resultset
341
342 =back
343
344   $new_rs = $cd_rs->search_related('artist', {
345     name => 'Emo-R-Us',
346   });
347
348 Searches the specified relationship, optionally specifying a condition and
349 attributes for matching records. See L</ATTRIBUTES> for more information.
350
351 =cut
352
353 sub search_related {
354   return shift->related_resultset(shift)->search(@_);
355 }
356
357 =head2 cursor
358
359 =over 4
360
361 =item Arguments: none
362
363 =item Return Value: $cursor
364
365 =back
366
367 Returns a storage-driven cursor to the given resultset. See
368 L<DBIx::Class::Cursor> for more information.
369
370 =cut
371
372 sub cursor {
373   my ($self) = @_;
374   my $attrs = { %{$self->{attrs}} };
375   return $self->{cursor}
376     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
377           $attrs->{where},$attrs);
378 }
379
380 =head2 single
381
382 =over 4
383
384 =item Arguments: $cond?
385
386 =item Return Value: $row_object?
387
388 =back
389
390   my $cd = $schema->resultset('CD')->single({ year => 2001 });
391
392 Inflates the first result without creating a cursor if the resultset has
393 any records in it; if not returns nothing. Used by find() as an optimisation.
394
395 =cut
396
397 sub single {
398   my ($self, $where) = @_;
399   my $attrs = { %{$self->{attrs}} };
400   if ($where) {
401     if (defined $attrs->{where}) {
402       $attrs->{where} = {
403         '-and' =>
404             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
405                $where, delete $attrs->{where} ]
406       };
407     } else {
408       $attrs->{where} = $where;
409     }
410   }
411   my @data = $self->result_source->storage->select_single(
412           $self->{from}, $attrs->{select},
413           $attrs->{where},$attrs);
414   return (@data ? $self->_construct_object(@data) : ());
415 }
416
417
418 =head2 search_like
419
420 =over 4
421
422 =item Arguments: $cond, \%attrs?
423
424 =item Return Value: $resultset (scalar context), @row_objs (list context)
425
426 =back
427
428   # WHERE title LIKE '%blue%'
429   $cd_rs = $rs->search_like({ title => '%blue%'});
430
431 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
432 that this is simply a convenience method. You most likely want to use
433 L</search> with specific operators.
434
435 For more information, see L<DBIx::Class::Manual::Cookbook>.
436
437 =cut
438
439 sub search_like {
440   my $class = shift;
441   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
442   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
443   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
444   return $class->search($query, { %$attrs });
445 }
446
447 =head2 slice
448
449 =over 4
450
451 =item Arguments: $first, $last
452
453 =item Return Value: $resultset (scalar context), @row_objs (list context)
454
455 =back
456
457 Returns a resultset or object list representing a subset of elements from the
458 resultset slice is called on. Indexes are from 0, i.e., to get the first
459 three records, call:
460
461   my ($one, $two, $three) = $rs->slice(0, 2);
462
463 =cut
464
465 sub slice {
466   my ($self, $min, $max) = @_;
467   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
468   $attrs->{offset} = $self->{attrs}{offset} || 0;
469   $attrs->{offset} += $min;
470   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
471   return $self->search(undef(), $attrs);
472   #my $slice = (ref $self)->new($self->result_source, $attrs);
473   #return (wantarray ? $slice->all : $slice);
474 }
475
476 =head2 next
477
478 =over 4
479
480 =item Arguments: none
481
482 =item Return Value: $result?
483
484 =back
485
486 Returns the next element in the resultset (C<undef> is there is none).
487
488 Can be used to efficiently iterate over records in the resultset:
489
490   my $rs = $schema->resultset('CD')->search;
491   while (my $cd = $rs->next) {
492     print $cd->title;
493   }
494
495 Note that you need to store the resultset object, and call C<next> on it. 
496 Calling C<< resultset('Table')->next >> repeatedly will always return the
497 first record from the resultset.
498
499 =cut
500
501 sub next {
502   my ($self) = @_;
503   if (@{$self->{all_cache} || []}) {
504     $self->{all_cache_position} ||= 0;
505     return $self->{all_cache}->[$self->{all_cache_position}++];
506   }
507   if ($self->{attrs}{cache}) {
508     $self->{all_cache_position} = 1;
509     return ($self->all)[0];
510   }
511   my @row = (exists $self->{stashed_row} ?
512                @{delete $self->{stashed_row}} :
513                $self->cursor->next
514   );
515 #  warn Dumper(\@row); use Data::Dumper;
516   return unless (@row);
517   return $self->_construct_object(@row);
518 }
519
520 sub _construct_object {
521   my ($self, @row) = @_;
522   my @as = @{ $self->{attrs}{as} };
523   
524   my $info = $self->_collapse_result(\@as, \@row);
525   
526   my $new = $self->result_class->inflate_result($self->result_source, @$info);
527   
528   $new = $self->{attrs}{record_filter}->($new)
529     if exists $self->{attrs}{record_filter};
530   return $new;
531 }
532
533 sub _collapse_result {
534   my ($self, $as, $row, $prefix) = @_;
535
536   my %const;
537
538   my @copy = @$row;
539   foreach my $this_as (@$as) {
540     my $val = shift @copy;
541     if (defined $prefix) {
542       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
543         my $remain = $1;
544         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
545         $const{$1||''}{$2} = $val;
546       }
547     } else {
548       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
549       $const{$1||''}{$2} = $val;
550     }
551   }
552
553   my $info = [ {}, {} ];
554   foreach my $key (keys %const) {
555     if (length $key) {
556       my $target = $info;
557       my @parts = split(/\./, $key);
558       foreach my $p (@parts) {
559         $target = $target->[1]->{$p} ||= [];
560       }
561       $target->[0] = $const{$key};
562     } else {
563       $info->[0] = $const{$key};
564     }
565   }
566
567   my @collapse;
568   if (defined $prefix) {
569     @collapse = map {
570         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
571     } keys %{$self->{collapse}}
572   } else {
573     @collapse = keys %{$self->{collapse}};
574   };
575
576   if (@collapse) {
577     my ($c) = sort { length $a <=> length $b } @collapse;
578     my $target = $info;
579     foreach my $p (split(/\./, $c)) {
580       $target = $target->[1]->{$p} ||= [];
581     }
582     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
583     my @co_key = @{$self->{collapse}{$c_prefix}};
584     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
585     my $tree = $self->_collapse_result($as, $row, $c_prefix);
586     my (@final, @raw);
587     while ( !(grep {
588                 !defined($tree->[0]->{$_}) ||
589                 $co_check{$_} ne $tree->[0]->{$_}
590               } @co_key) ) {
591       push(@final, $tree);
592       last unless (@raw = $self->cursor->next);
593       $row = $self->{stashed_row} = \@raw;
594       $tree = $self->_collapse_result($as, $row, $c_prefix);
595       #warn Data::Dumper::Dumper($tree, $row);
596     }
597     @$target = @final;
598   }
599
600   return $info;
601 }
602
603 =head2 result_source
604
605 =over 4
606
607 =item Arguments: $result_source?
608
609 =item Return Value: $result_source
610
611 =back
612
613 An accessor for the primary ResultSource object from which this ResultSet
614 is derived.
615
616 =cut
617
618
619 =head2 count
620
621 =over 4
622
623 =item Arguments: $cond, \%attrs??
624
625 =item Return Value: $count
626
627 =back
628
629 Performs an SQL C<COUNT> with the same query as the resultset was built
630 with to find the number of elements. If passed arguments, does a search
631 on the resultset and counts the results of that.
632
633 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
634 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
635 not support C<DISTINCT> with multiple columns. If you are using such a
636 database, you should only use columns from the main table in your C<group_by>
637 clause.
638
639 =cut
640
641 sub count {
642   my $self = shift;
643   return $self->search(@_)->count if @_ and defined $_[0];
644   return scalar @{ $self->get_cache } if @{ $self->get_cache };
645
646   my $count = $self->_count;
647   return 0 unless $count;
648
649   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
650   $count = $self->{attrs}{rows} if
651     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
652   return $count;
653 }
654
655 sub _count { # Separated out so pager can get the full count
656   my $self = shift;
657   my $select = { count => '*' };
658   my $attrs = { %{ $self->{attrs} } };
659   if (my $group_by = delete $attrs->{group_by}) {
660     delete $attrs->{having};
661     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
662     # todo: try CONCAT for multi-column pk
663     my @pk = $self->result_source->primary_columns;
664     if (@pk == 1) {
665       foreach my $column (@distinct) {
666         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
667           @distinct = ($column);
668           last;
669         }
670       }
671     }
672
673     $select = { count => { distinct => \@distinct } };
674     #use Data::Dumper; die Dumper $select;
675   }
676
677   $attrs->{select} = $select;
678   $attrs->{as} = [qw/count/];
679
680   # offset, order by and page are not needed to count. record_filter is cdbi
681   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
682         
683   my ($count) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
684   return $count;
685 }
686
687 =head2 count_literal
688
689 =over 4
690
691 =item Arguments: $sql_fragment, @bind_values
692
693 =item Return Value: $count
694
695 =back
696
697 Counts the results in a literal query. Equivalent to calling L</search_literal>
698 with the passed arguments, then L</count>.
699
700 =cut
701
702 sub count_literal { shift->search_literal(@_)->count; }
703
704 =head2 all
705
706 =over 4
707
708 =item Arguments: none
709
710 =item Return Value: @objects
711
712 =back
713
714 Returns all elements in the resultset. Called implicitly if the resultset
715 is returned in list context.
716
717 =cut
718
719 sub all {
720   my ($self) = @_;
721   return @{ $self->get_cache } if @{ $self->get_cache };
722
723   my @obj;
724
725   if (keys %{$self->{collapse}}) {
726       # Using $self->cursor->all is really just an optimisation.
727       # If we're collapsing has_many prefetches it probably makes
728       # very little difference, and this is cleaner than hacking
729       # _construct_object to survive the approach
730     $self->cursor->reset;
731     my @row = $self->cursor->next;
732     while (@row) {
733       push(@obj, $self->_construct_object(@row));
734       @row = (exists $self->{stashed_row}
735                ? @{delete $self->{stashed_row}}
736                : $self->cursor->next);
737     }
738   } else {
739     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
740   }
741
742   $self->set_cache(\@obj) if $self->{attrs}{cache};
743   return @obj;
744 }
745
746 =head2 reset
747
748 =over 4
749
750 =item Arguments: none
751
752 =item Return Value: $self
753
754 =back
755
756 Resets the resultset's cursor, so you can iterate through the elements again.
757
758 =cut
759
760 sub reset {
761   my ($self) = @_;
762   $self->{all_cache_position} = 0;
763   $self->cursor->reset;
764   return $self;
765 }
766
767 =head2 first
768
769 =over 4
770
771 =item Arguments: none
772
773 =item Return Value: $object?
774
775 =back
776
777 Resets the resultset and returns an object for the first result (if the
778 resultset returns anything).
779
780 =cut
781
782 sub first {
783   return $_[0]->reset->next;
784 }
785
786 # _cond_for_update_delete
787 #
788 # update/delete require the condition to be modified to handle
789 # the differing SQL syntax available.  This transforms the $self->{cond}
790 # appropriately, returning the new condition
791
792 sub _cond_for_update_delete {
793   my ($self) = @_;
794   my $cond = {};
795
796   if (!ref($self->{cond})) {
797     # No-op. No condition, we're update/deleting everything
798   }
799   elsif (ref $self->{cond} eq 'ARRAY') {
800     $cond = [
801       map {
802         my %hash;
803         foreach my $key (keys %{$_}) {
804           $key =~ /([^.]+)$/;
805           $hash{$1} = $_->{$key};
806         }
807         \%hash;
808         } @{$self->{cond}}
809     ];
810   }
811   elsif (ref $self->{cond} eq 'HASH') {
812     if ((keys %{$self->{cond}})[0] eq '-and') {
813       $cond->{-and} = [
814         map {
815           my %hash;
816           foreach my $key (keys %{$_}) {
817             $key =~ /([^.]+)$/;
818             $hash{$1} = $_->{$key};
819           }
820           \%hash;
821           } @{$self->{cond}{-and}}
822       ];
823     }
824     else {
825       foreach my $key (keys %{$self->{cond}}) {
826         $key =~ /([^.]+)$/;
827         $cond->{$1} = $self->{cond}{$key};
828       }
829     }
830   }
831   else {
832     $self->throw_exception(
833                "Can't update/delete on resultset with condition unless hash or array");
834   }
835   return $cond;
836 }
837
838
839 =head2 update
840
841 =over 4
842
843 =item Arguments: \%values
844
845 =item Return Value: $storage_rv
846
847 =back
848
849 Sets the specified columns in the resultset to the supplied values in a
850 single query. Return value will be true if the update succeeded or false
851 if no records were updated; exact type of success value is storage-dependent.
852
853 =cut
854
855 sub update {
856   my ($self, $values) = @_;
857   $self->throw_exception("Values for update must be a hash")
858     unless ref $values eq 'HASH';
859
860   my $cond = $self->_cond_for_update_delete;
861
862   return $self->result_source->storage->update(
863     $self->result_source->from, $values, $cond
864   );
865 }
866
867 =head2 update_all
868
869 =over 4
870
871 =item Arguments: \%values
872
873 =item Return Value: 1
874
875 =back
876
877 Fetches all objects and updates them one at a time. Note that C<update_all>
878 will run DBIC cascade triggers, while L</update> will not.
879
880 =cut
881
882 sub update_all {
883   my ($self, $values) = @_;
884   $self->throw_exception("Values for update must be a hash")
885     unless ref $values eq 'HASH';
886   foreach my $obj ($self->all) {
887     $obj->set_columns($values)->update;
888   }
889   return 1;
890 }
891
892 =head2 delete
893
894 =over 4
895
896 =item Arguments: none
897
898 =item Return Value: 1
899
900 =back
901
902 Deletes the contents of the resultset from its result source. Note that this
903 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
904 to run.
905
906 =cut
907
908 sub delete {
909   my ($self) = @_;
910   my $del = {};
911
912   my $cond = $self->_cond_for_update_delete;
913
914   $self->result_source->storage->delete($self->result_source->from, $cond);
915   return 1;
916 }
917
918 =head2 delete_all
919
920 =over 4
921
922 =item Arguments: none
923
924 =item Return Value: 1
925
926 =back
927
928 Fetches all objects and deletes them one at a time. Note that C<delete_all>
929 will run DBIC cascade triggers, while L</delete> will not.
930
931 =cut
932
933 sub delete_all {
934   my ($self) = @_;
935   $_->delete for $self->all;
936   return 1;
937 }
938
939 =head2 pager
940
941 =over 4
942
943 =item Arguments: none
944
945 =item Return Value: $pager
946
947 =back
948
949 Return Value a L<Data::Page> object for the current resultset. Only makes
950 sense for queries with a C<page> attribute.
951
952 =cut
953
954 sub pager {
955   my ($self) = @_;
956   my $attrs = $self->{attrs};
957   $self->throw_exception("Can't create pager for non-paged rs")
958     unless $self->{page};
959   $attrs->{rows} ||= 10;
960   return $self->{pager} ||= Data::Page->new(
961     $self->_count, $attrs->{rows}, $self->{page});
962 }
963
964 =head2 page
965
966 =over 4
967
968 =item Arguments: $page_number
969
970 =item Return Value: $rs
971
972 =back
973
974 Returns a resultset for the $page_number page of the resultset on which page
975 is called, where each page contains a number of rows equal to the 'rows'
976 attribute set on the resultset (10 by default).
977
978 =cut
979
980 sub page {
981   my ($self, $page) = @_;
982   my $attrs = { %{$self->{attrs}} };
983   $attrs->{page} = $page;
984   return (ref $self)->new($self->result_source, $attrs);
985 }
986
987 =head2 new_result
988
989 =over 4
990
991 =item Arguments: \%vals
992
993 =item Return Value: $object
994
995 =back
996
997 Creates an object in the resultset's result class and returns it.
998
999 =cut
1000
1001 sub new_result {
1002   my ($self, $values) = @_;
1003   $self->throw_exception( "new_result needs a hash" )
1004     unless (ref $values eq 'HASH');
1005   $self->throw_exception(
1006     "Can't abstract implicit construct, condition not a hash"
1007   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1008   my %new = %$values;
1009   my $alias = $self->{attrs}{alias};
1010   foreach my $key (keys %{$self->{cond}||{}}) {
1011     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1012   }
1013   my $obj = $self->result_class->new(\%new);
1014   $obj->result_source($self->result_source) if $obj->can('result_source');
1015   return $obj;
1016 }
1017
1018 =head2 create
1019
1020 =over 4
1021
1022 =item Arguments: \%vals
1023
1024 =item Return Value: $object
1025
1026 =back
1027
1028 Inserts a record into the resultset and returns the object representing it.
1029
1030 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1031
1032 =cut
1033
1034 sub create {
1035   my ($self, $attrs) = @_;
1036   $self->throw_exception( "create needs a hashref" )
1037     unless ref $attrs eq 'HASH';
1038   return $self->new_result($attrs)->insert;
1039 }
1040
1041 =head2 find_or_create
1042
1043 =over 4
1044
1045 =item Arguments: \%vals, \%attrs?
1046
1047 =item Return Value: $object
1048
1049 =back
1050
1051   $class->find_or_create({ key => $val, ... });
1052
1053 Searches for a record matching the search condition; if it doesn't find one,
1054 creates one and returns that instead.
1055
1056   my $cd = $schema->resultset('CD')->find_or_create({
1057     cdid   => 5,
1058     artist => 'Massive Attack',
1059     title  => 'Mezzanine',
1060     year   => 2005,
1061   });
1062
1063 Also takes an optional C<key> attribute, to search by a specific key or unique
1064 constraint. For example:
1065
1066   my $cd = $schema->resultset('CD')->find_or_create(
1067     {
1068       artist => 'Massive Attack',
1069       title  => 'Mezzanine',
1070     },
1071     { key => 'artist_title' }
1072   );
1073
1074 See also L</find> and L</update_or_create>.
1075
1076 =cut
1077
1078 sub find_or_create {
1079   my $self     = shift;
1080   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1081   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1082   my $exists   = $self->find($hash, $attrs);
1083   return defined $exists ? $exists : $self->create($hash);
1084 }
1085
1086 =head2 update_or_create
1087
1088 =over 4
1089
1090 =item Arguments: \%col_values, { key => $unique_constraint }?
1091
1092 =item Return Value: $object
1093
1094 =back
1095
1096   $class->update_or_create({ col => $val, ... });
1097
1098 First, searches for an existing row matching one of the unique constraints
1099 (including the primary key) on the source of this resultset. If a row is
1100 found, updates it with the other given column values. Otherwise, creates a new
1101 row.
1102
1103 Takes an optional C<key> attribute to search on a specific unique constraint.
1104 For example:
1105
1106   # In your application
1107   my $cd = $schema->resultset('CD')->update_or_create(
1108     {
1109       artist => 'Massive Attack',
1110       title  => 'Mezzanine',
1111       year   => 1998,
1112     },
1113     { key => 'artist_title' }
1114   );
1115
1116 If no C<key> is specified, it searches on all unique constraints defined on the
1117 source, including the primary key.
1118
1119 If the C<key> is specified as C<primary>, it searches only on the primary key.
1120
1121 See also L</find> and L</find_or_create>.
1122
1123 =cut
1124
1125 sub update_or_create {
1126   my $self = shift;
1127   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1128   my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
1129
1130   my %unique_constraints = $self->result_source->unique_constraints;
1131   my @constraint_names   = (exists $attrs->{key}
1132                             ? ($attrs->{key})
1133                             : keys %unique_constraints);
1134
1135   my @unique_hashes;
1136   foreach my $name (@constraint_names) {
1137     my @unique_cols = @{ $unique_constraints{$name} };
1138     my %unique_hash =
1139       map  { $_ => $hash->{$_} }
1140       grep { exists $hash->{$_} }
1141       @unique_cols;
1142
1143     push @unique_hashes, \%unique_hash
1144       if (scalar keys %unique_hash == scalar @unique_cols);
1145   }
1146
1147   if (@unique_hashes) {
1148     my $row = $self->single(\@unique_hashes);
1149     if (defined $row) {
1150       $row->set_columns($hash);
1151       $row->update;
1152       return $row;
1153     }
1154   }
1155
1156   return $self->create($hash);
1157 }
1158
1159 =head2 get_cache
1160
1161 =over 4
1162
1163 =item Arguments: none
1164
1165 =item Return Value: \@cache_objects?
1166
1167 =back
1168
1169 Gets the contents of the cache for the resultset, if the cache is set.
1170
1171 =cut
1172
1173 sub get_cache {
1174   shift->{all_cache} || [];
1175 }
1176
1177 =head2 set_cache
1178
1179 =over 4
1180
1181 =item Arguments: \@cache_objects
1182
1183 =item Return Value: \@cache_objects
1184
1185 =back
1186
1187 Sets the contents of the cache for the resultset. Expects an arrayref
1188 of objects of the same class as those produced by the resultset. Note that
1189 if the cache is set the resultset will return the cached objects rather
1190 than re-querying the database even if the cache attr is not set.
1191
1192 =cut
1193
1194 sub set_cache {
1195   my ( $self, $data ) = @_;
1196   $self->throw_exception("set_cache requires an arrayref")
1197     if ref $data ne 'ARRAY';
1198   my $result_class = $self->result_class;
1199   foreach( @$data ) {
1200     $self->throw_exception(
1201       "cannot cache object of type '$_', expected '$result_class'"
1202     ) if ref $_ ne $result_class;
1203   }
1204   $self->{all_cache} = $data;
1205 }
1206
1207 =head2 clear_cache
1208
1209 =over 4
1210
1211 =item Arguments: none
1212
1213 =item Return Value: []
1214
1215 =back
1216
1217 Clears the cache for the resultset.
1218
1219 =cut
1220
1221 sub clear_cache {
1222   shift->set_cache([]);
1223 }
1224
1225 =head2 related_resultset
1226
1227 =over 4
1228
1229 =item Arguments: $relationship_name
1230
1231 =item Return Value: $resultset
1232
1233 =back
1234
1235 Returns a related resultset for the supplied relationship name.
1236
1237   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1238
1239 =cut
1240
1241 sub related_resultset {
1242   my ( $self, $rel ) = @_;
1243   $self->{related_resultsets} ||= {};
1244   return $self->{related_resultsets}{$rel} ||= do {
1245       #warn "fetching related resultset for rel '$rel'";
1246       my $rel_obj = $self->result_source->relationship_info($rel);
1247       $self->throw_exception(
1248         "search_related: result source '" . $self->result_source->name .
1249         "' has no such relationship ${rel}")
1250         unless $rel_obj; #die Dumper $self->{attrs};
1251
1252       my $rs = $self->search(undef, { join => $rel });
1253       my $alias = defined $rs->{attrs}{seen_join}{$rel}
1254                     && $rs->{attrs}{seen_join}{$rel} > 1
1255                   ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
1256                   : $rel;
1257
1258       $self->result_source->schema->resultset($rel_obj->{class}
1259            )->search( undef,
1260              { %{$rs->{attrs}},
1261                alias => $alias,
1262                select => undef,
1263                as => undef }
1264            );
1265   };
1266 }
1267
1268 =head2 throw_exception
1269
1270 See L<DBIx::Class::Schema/throw_exception> for details.
1271
1272 =cut
1273
1274 sub throw_exception {
1275   my $self=shift;
1276   $self->result_source->schema->throw_exception(@_);
1277 }
1278
1279 # XXX: FIXME: Attributes docs need clearing up
1280
1281 =head1 ATTRIBUTES
1282
1283 The resultset takes various attributes that modify its behavior. Here's an
1284 overview of them:
1285
1286 =head2 order_by
1287
1288 =over 4
1289
1290 =item Value: ($order_by | \@order_by)
1291
1292 =back
1293
1294 Which column(s) to order the results by. This is currently passed
1295 through directly to SQL, so you can give e.g. C<year DESC> for a
1296 descending order on the column `year'.
1297
1298 =head2 columns
1299
1300 =over 4
1301
1302 =item Value: \@columns
1303
1304 =back
1305
1306 Shortcut to request a particular set of columns to be retrieved.  Adds
1307 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1308 from that, then auto-populates C<as> from C<select> as normal. (You may also
1309 use the C<cols> attribute, as in earlier versions of DBIC.)
1310
1311 =head2 include_columns
1312
1313 =over 4
1314
1315 =item Value: \@columns
1316
1317 =back
1318
1319 Shortcut to include additional columns in the returned results - for example
1320
1321   $schema->resultset('CD')->search(undef, {
1322     include_columns => ['artist.name'],
1323     join => ['artist']
1324   });
1325
1326 would return all CDs and include a 'name' column to the information
1327 passed to object inflation
1328
1329 =head2 select
1330
1331 =over 4
1332
1333 =item Value: \@select_columns
1334
1335 =back
1336
1337 Indicates which columns should be selected from the storage. You can use
1338 column names, or in the case of RDBMS back ends, function or stored procedure
1339 names:
1340
1341   $rs = $schema->resultset('Employee')->search(undef, {
1342     select => [
1343       'name',
1344       { count => 'employeeid' },
1345       { sum => 'salary' }
1346     ]
1347   });
1348
1349 When you use function/stored procedure names and do not supply an C<as>
1350 attribute, the column names returned are storage-dependent. E.g. MySQL would
1351 return a column named C<count(employeeid)> in the above example.
1352
1353 =head2 as
1354
1355 =over 4
1356
1357 =item Value: \@inflation_names
1358
1359 =back
1360
1361 Indicates column names for object inflation. This is used in conjunction with
1362 C<select>, usually when C<select> contains one or more function or stored
1363 procedure names:
1364
1365   $rs = $schema->resultset('Employee')->search(undef, {
1366     select => [
1367       'name',
1368       { count => 'employeeid' }
1369     ],
1370     as => ['name', 'employee_count'],
1371   });
1372
1373   my $employee = $rs->first(); # get the first Employee
1374
1375 If the object against which the search is performed already has an accessor
1376 matching a column name specified in C<as>, the value can be retrieved using
1377 the accessor as normal:
1378
1379   my $name = $employee->name();
1380
1381 If on the other hand an accessor does not exist in the object, you need to
1382 use C<get_column> instead:
1383
1384   my $employee_count = $employee->get_column('employee_count');
1385
1386 You can create your own accessors if required - see
1387 L<DBIx::Class::Manual::Cookbook> for details.
1388
1389 =head2 join
1390
1391 =over 4
1392
1393 =item Value: ($rel_name | \@rel_names | \%rel_names)
1394
1395 =back
1396
1397 Contains a list of relationships that should be joined for this query.  For
1398 example:
1399
1400   # Get CDs by Nine Inch Nails
1401   my $rs = $schema->resultset('CD')->search(
1402     { 'artist.name' => 'Nine Inch Nails' },
1403     { join => 'artist' }
1404   );
1405
1406 Can also contain a hash reference to refer to the other relation's relations.
1407 For example:
1408
1409   package MyApp::Schema::Track;
1410   use base qw/DBIx::Class/;
1411   __PACKAGE__->table('track');
1412   __PACKAGE__->add_columns(qw/trackid cd position title/);
1413   __PACKAGE__->set_primary_key('trackid');
1414   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1415   1;
1416
1417   # In your application
1418   my $rs = $schema->resultset('Artist')->search(
1419     { 'track.title' => 'Teardrop' },
1420     {
1421       join     => { cd => 'track' },
1422       order_by => 'artist.name',
1423     }
1424   );
1425
1426 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1427 similarly for a third time). For e.g.
1428
1429   my $rs = $schema->resultset('Artist')->search({
1430     'cds.title'   => 'Down to Earth',
1431     'cds_2.title' => 'Popular',
1432   }, {
1433     join => [ qw/cds cds/ ],
1434   });
1435
1436 will return a set of all artists that have both a cd with title 'Down
1437 to Earth' and a cd with title 'Popular'.
1438
1439 If you want to fetch related objects from other tables as well, see C<prefetch>
1440 below.
1441
1442 =head2 prefetch
1443
1444 =over 4
1445
1446 =item Value: ($rel_name | \@rel_names | \%rel_names)
1447
1448 =back
1449
1450 Contains one or more relationships that should be fetched along with the main
1451 query (when they are accessed afterwards they will have already been
1452 "prefetched").  This is useful for when you know you will need the related
1453 objects, because it saves at least one query:
1454
1455   my $rs = $schema->resultset('Tag')->search(
1456     undef,
1457     {
1458       prefetch => {
1459         cd => 'artist'
1460       }
1461     }
1462   );
1463
1464 The initial search results in SQL like the following:
1465
1466   SELECT tag.*, cd.*, artist.* FROM tag
1467   JOIN cd ON tag.cd = cd.cdid
1468   JOIN artist ON cd.artist = artist.artistid
1469
1470 L<DBIx::Class> has no need to go back to the database when we access the
1471 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1472 case.
1473
1474 Simple prefetches will be joined automatically, so there is no need
1475 for a C<join> attribute in the above search. If you're prefetching to
1476 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1477 specify the join as well.
1478
1479 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1480 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1481 with an accessor type of 'single' or 'filter').
1482
1483 =head2 from
1484
1485 =over 4
1486
1487 =item Value: \@from_clause
1488
1489 =back
1490
1491 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1492 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1493 clauses.
1494
1495 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1496 C<join> will usually do what you need and it is strongly recommended that you
1497 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1498
1499 In simple terms, C<from> works as follows:
1500
1501     [
1502         { <alias> => <table>, -join_type => 'inner|left|right' }
1503         [] # nested JOIN (optional)
1504         { <table.column> => <foreign_table.foreign_key> }
1505     ]
1506
1507     JOIN
1508         <alias> <table>
1509         [JOIN ...]
1510     ON <table.column> = <foreign_table.foreign_key>
1511
1512 An easy way to follow the examples below is to remember the following:
1513
1514     Anything inside "[]" is a JOIN
1515     Anything inside "{}" is a condition for the enclosing JOIN
1516
1517 The following examples utilize a "person" table in a family tree application.
1518 In order to express parent->child relationships, this table is self-joined:
1519
1520     # Person->belongs_to('father' => 'Person');
1521     # Person->belongs_to('mother' => 'Person');
1522
1523 C<from> can be used to nest joins. Here we return all children with a father,
1524 then search against all mothers of those children:
1525
1526   $rs = $schema->resultset('Person')->search(
1527       undef,
1528       {
1529           alias => 'mother', # alias columns in accordance with "from"
1530           from => [
1531               { mother => 'person' },
1532               [
1533                   [
1534                       { child => 'person' },
1535                       [
1536                           { father => 'person' },
1537                           { 'father.person_id' => 'child.father_id' }
1538                       ]
1539                   ],
1540                   { 'mother.person_id' => 'child.mother_id' }
1541               ],
1542           ]
1543       },
1544   );
1545
1546   # Equivalent SQL:
1547   # SELECT mother.* FROM person mother
1548   # JOIN (
1549   #   person child
1550   #   JOIN person father
1551   #   ON ( father.person_id = child.father_id )
1552   # )
1553   # ON ( mother.person_id = child.mother_id )
1554
1555 The type of any join can be controlled manually. To search against only people
1556 with a father in the person table, we could explicitly use C<INNER JOIN>:
1557
1558     $rs = $schema->resultset('Person')->search(
1559         undef,
1560         {
1561             alias => 'child', # alias columns in accordance with "from"
1562             from => [
1563                 { child => 'person' },
1564                 [
1565                     { father => 'person', -join_type => 'inner' },
1566                     { 'father.id' => 'child.father_id' }
1567                 ],
1568             ]
1569         },
1570     );
1571
1572     # Equivalent SQL:
1573     # SELECT child.* FROM person child
1574     # INNER JOIN person father ON child.father_id = father.id
1575
1576 =head2 page
1577
1578 =over 4
1579
1580 =item Value: $page
1581
1582 =back
1583
1584 Makes the resultset paged and specifies the page to retrieve. Effectively
1585 identical to creating a non-pages resultset and then calling ->page($page)
1586 on it.
1587
1588 =head2 rows
1589
1590 =over 4
1591
1592 =item Value: $rows
1593
1594 =back
1595
1596 Specifes the maximum number of rows for direct retrieval or the number of
1597 rows per page if the page attribute or method is used.
1598
1599 =head2 group_by
1600
1601 =over 4
1602
1603 =item Value: \@columns
1604
1605 =back
1606
1607 A arrayref of columns to group by. Can include columns of joined tables.
1608
1609   group_by => [qw/ column1 column2 ... /]
1610
1611 =head2 having
1612
1613 =over 4
1614
1615 =item Value: $condition
1616
1617 =back
1618
1619 HAVING is a select statement attribute that is applied between GROUP BY and
1620 ORDER BY. It is applied to the after the grouping calculations have been
1621 done. 
1622
1623   having => { 'count(employee)' => { '>=', 100 } }
1624
1625 =head2 distinct
1626
1627 =over 4
1628
1629 =item Value: (0 | 1)
1630
1631 =back
1632
1633 Set to 1 to group by all columns.
1634
1635 =head2 cache
1636
1637 Set to 1 to cache search results. This prevents extra SQL queries if you
1638 revisit rows in your ResultSet:
1639
1640   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1641   
1642   while( my $artist = $resultset->next ) {
1643     ... do stuff ...
1644   }
1645
1646   $rs->first; # without cache, this would issue a query
1647
1648 By default, searches are not cached.
1649
1650 For more examples of using these attributes, see
1651 L<DBIx::Class::Manual::Cookbook>.
1652
1653 =cut
1654
1655 1;