Fix for -and conditions when updating or deleting on a ResultSet
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11 use Scalar::Util qw/weaken/;
12
13 use base qw/DBIx::Class/;
14 __PACKAGE__->load_components(qw/AccessorGroup/);
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('CD')->search(year => 2005);
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =over 4
57
58 =item Arguments: $source, \%$attrs
59
60 =item Return Value: $rs
61
62 =back
63
64 The resultset constructor. Takes a source object (usually a
65 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
66 L</ATTRIBUTES> below).  Does not perform any queries -- these are
67 executed as needed by the other methods.
68
69 Generally you won't need to construct a resultset manually.  You'll
70 automatically get one from e.g. a L</search> called in scalar context:
71
72   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
73
74 IMPORTANT: If called on an object, proxies to new_result instead so
75
76   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
77
78 will return a CD object, not a ResultSet.
79
80 =cut
81
82 sub new {
83   my $class = shift;
84   return $class->new_result(@_) if ref $class;
85   
86   my ($source, $attrs) = @_;
87   weaken $source;
88   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
89   #use Data::Dumper; warn Dumper($attrs);
90   my $alias = ($attrs->{alias} ||= 'me');
91   
92   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
93   delete $attrs->{as} if $attrs->{columns};
94   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
95   $attrs->{select} = [
96     map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}}
97   ] if $attrs->{columns};
98   $attrs->{as} ||= [
99     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
100   ];
101   if (my $include = delete $attrs->{include_columns}) {
102     push(@{$attrs->{select}}, @$include);
103     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
104   }
105   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
106
107   $attrs->{from} ||= [ { $alias => $source->from } ];
108   $attrs->{seen_join} ||= {};
109   my %seen;
110   if (my $join = delete $attrs->{join}) {
111     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
112       if (ref $j eq 'HASH') {
113         $seen{$_} = 1 foreach keys %$j;
114       } else {
115         $seen{$j} = 1;
116       }
117     }
118     push(@{$attrs->{from}}, $source->resolve_join(
119       $join, $attrs->{alias}, $attrs->{seen_join})
120     );
121   }
122   
123   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
124   $attrs->{order_by} = [ $attrs->{order_by} ] if
125     $attrs->{order_by} and !ref($attrs->{order_by});
126   $attrs->{order_by} ||= [];
127
128   my $collapse = $attrs->{collapse} || {};
129   if (my $prefetch = delete $attrs->{prefetch}) {
130     my @pre_order;
131     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
132       if ( ref $p eq 'HASH' ) {
133         foreach my $key (keys %$p) {
134           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
135             unless $seen{$key};
136         }
137       } else {
138         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
139             unless $seen{$p};
140       }
141       my @prefetch = $source->resolve_prefetch(
142            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
143       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
144       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
145     }
146     push(@{$attrs->{order_by}}, @pre_order);
147   }
148   $attrs->{collapse} = $collapse;
149 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
150
151   if ($attrs->{page}) {
152     $attrs->{rows} ||= 10;
153     $attrs->{offset} ||= 0;
154     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
155   }
156
157   bless {
158     result_source => $source,
159     result_class => $attrs->{result_class} || $source->result_class,
160     cond => $attrs->{where},
161     from => $attrs->{from},
162     collapse => $collapse,
163     count => undef,
164     page => delete $attrs->{page},
165     pager => undef,
166     attrs => $attrs
167   }, $class;
168 }
169
170 =head2 search
171
172 =over 4
173
174 =item Arguments: $cond, \%attrs?
175
176 =item Return Value: $resultset (scalar context), @row_objs (list context)
177
178 =back
179
180   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
181   my $new_rs = $cd_rs->search({ year => 2005 });
182
183   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
184                  # year = 2005 OR year = 2004
185
186 If you need to pass in additional attributes but no additional condition,
187 call it as C<search(undef, \%attrs)>.
188
189   # "SELECT name, artistid FROM $artist_table"
190   my @all_artists = $schema->resultset('Artist')->search(undef, {
191     columns => [qw/name artistid/],
192   });
193
194 =cut
195
196 sub search {
197   my $self = shift;
198
199   my $rs;
200   if( @_ ) {
201     
202     my $attrs = { %{$self->{attrs}} };
203     my $having = delete $attrs->{having};
204     $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
205
206     my $where = (@_
207                   ? ((@_ == 1 || ref $_[0] eq "HASH")
208                       ? shift
209                       : ((@_ % 2)
210                           ? $self->throw_exception(
211                               "Odd number of arguments to search")
212                           : {@_}))
213                   : undef());
214     if (defined $where) {
215       $attrs->{where} = (defined $attrs->{where}
216                 ? { '-and' =>
217                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
218                         $where, $attrs->{where} ] }
219                 : $where);
220     }
221
222     if (defined $having) {
223       $attrs->{having} = (defined $attrs->{having}
224                 ? { '-and' =>
225                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
226                         $having, $attrs->{having} ] }
227                 : $having);
228     }
229
230     $rs = (ref $self)->new($self->result_source, $attrs);
231   }
232   else {
233     $rs = $self;
234     $rs->reset;
235   }
236   return (wantarray ? $rs->all : $rs);
237 }
238
239 =head2 search_literal
240
241 =over 4
242
243 =item Arguments: $sql_fragment, @bind_values
244
245 =item Return Value: $resultset (scalar context), @row_objs (list context)
246
247 =back
248
249   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
250   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
251
252 Pass a literal chunk of SQL to be added to the conditional part of the
253 resultset query.
254
255 =cut
256
257 sub search_literal {
258   my ($self, $cond, @vals) = @_;
259   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
260   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
261   return $self->search(\$cond, $attrs);
262 }
263
264 =head2 find
265
266 =over 4
267
268 =item Arguments: @values | \%cols, \%attrs?
269
270 =item Return Value: $row_object
271
272 =back
273
274 Finds a row based on its primary key or unique constraint. For example:
275
276   my $cd = $schema->resultset('CD')->find(5);
277
278 Also takes an optional C<key> attribute, to search by a specific key or unique
279 constraint. For example:
280
281   my $cd = $schema->resultset('CD')->find(
282     {
283       artist => 'Massive Attack',
284       title  => 'Mezzanine',
285     },
286     { key => 'artist_title' }
287   );
288
289 See also L</find_or_create> and L</update_or_create>.
290
291 =cut
292
293 sub find {
294   my ($self, @vals) = @_;
295   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
296
297   my @cols = $self->result_source->primary_columns;
298   if (exists $attrs->{key}) {
299     my %uniq = $self->result_source->unique_constraints;
300     $self->throw_exception(
301       "Unknown key $attrs->{key} on '" . $self->result_source->name . "'"
302     ) unless exists $uniq{$attrs->{key}};
303     @cols = @{ $uniq{$attrs->{key}} };
304   }
305   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
306   $self->throw_exception(
307     "Can't find unless a primary key or unique constraint is defined"
308   ) unless @cols;
309
310   my $query;
311   if (ref $vals[0] eq 'HASH') {
312     $query = { %{$vals[0]} };
313   } elsif (@cols == @vals) {
314     $query = {};
315     @{$query}{@cols} = @vals;
316   } else {
317     $query = {@vals};
318   }
319   foreach my $key (grep { ! m/\./ } keys %$query) {
320     $query->{"$self->{attrs}{alias}.$key"} = delete $query->{$key};
321   }
322   #warn Dumper($query);
323   
324   if (keys %$attrs) {
325       my $rs = $self->search($query,$attrs);
326       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
327   } else {
328       return keys %{$self->{collapse}} ?
329         $self->search($query)->next :
330         $self->single($query);
331   }
332 }
333
334 =head2 search_related
335
336 =over 4
337
338 =item Arguments: $cond, \%attrs?
339
340 =item Return Value: $new_resultset
341
342 =back
343
344   $new_rs = $cd_rs->search_related('artist', {
345     name => 'Emo-R-Us',
346   });
347
348 Searches the specified relationship, optionally specifying a condition and
349 attributes for matching records. See L</ATTRIBUTES> for more information.
350
351 =cut
352
353 sub search_related {
354   return shift->related_resultset(shift)->search(@_);
355 }
356
357 =head2 cursor
358
359 =over 4
360
361 =item Arguments: none
362
363 =item Return Value: $cursor
364
365 =back
366
367 Returns a storage-driven cursor to the given resultset. See
368 L<DBIx::Class::Cursor> for more information.
369
370 =cut
371
372 sub cursor {
373   my ($self) = @_;
374   my $attrs = { %{$self->{attrs}} };
375   return $self->{cursor}
376     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
377           $attrs->{where},$attrs);
378 }
379
380 =head2 single
381
382 =over 4
383
384 =item Arguments: $cond?
385
386 =item Return Value: $row_object?
387
388 =back
389
390   my $cd = $schema->resultset('CD')->single({ year => 2001 });
391
392 Inflates the first result without creating a cursor if the resultset has
393 any records in it; if not returns nothing. Used by find() as an optimisation.
394
395 =cut
396
397 sub single {
398   my ($self, $where) = @_;
399   my $attrs = { %{$self->{attrs}} };
400   if ($where) {
401     if (defined $attrs->{where}) {
402       $attrs->{where} = {
403         '-and' =>
404             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
405                $where, delete $attrs->{where} ]
406       };
407     } else {
408       $attrs->{where} = $where;
409     }
410   }
411   my @data = $self->result_source->storage->select_single(
412           $self->{from}, $attrs->{select},
413           $attrs->{where},$attrs);
414   return (@data ? $self->_construct_object(@data) : ());
415 }
416
417
418 =head2 search_like
419
420 =over 4
421
422 =item Arguments: $cond, \%attrs?
423
424 =item Return Value: $resultset (scalar context), @row_objs (list context)
425
426 =back
427
428   # WHERE title LIKE '%blue%'
429   $cd_rs = $rs->search_like({ title => '%blue%'});
430
431 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
432 that this is simply a convenience method. You most likely want to use
433 L</search> with specific operators.
434
435 For more information, see L<DBIx::Class::Manual::Cookbook>.
436
437 =cut
438
439 sub search_like {
440   my $class = shift;
441   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
442   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
443   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
444   return $class->search($query, { %$attrs });
445 }
446
447 =head2 slice
448
449 =over 4
450
451 =item Arguments: $first, $last
452
453 =item Return Value: $resultset (scalar context), @row_objs (list context)
454
455 =back
456
457 Returns a resultset or object list representing a subset of elements from the
458 resultset slice is called on. Indexes are from 0, i.e., to get the first
459 three records, call:
460
461   my ($one, $two, $three) = $rs->slice(0, 2);
462
463 =cut
464
465 sub slice {
466   my ($self, $min, $max) = @_;
467   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
468   $attrs->{offset} = $self->{attrs}{offset} || 0;
469   $attrs->{offset} += $min;
470   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
471   return $self->search(undef(), $attrs);
472   #my $slice = (ref $self)->new($self->result_source, $attrs);
473   #return (wantarray ? $slice->all : $slice);
474 }
475
476 =head2 next
477
478 =over 4
479
480 =item Arguments: none
481
482 =item Return Value: $result?
483
484 =back
485
486 Returns the next element in the resultset (C<undef> is there is none).
487
488 Can be used to efficiently iterate over records in the resultset:
489
490   my $rs = $schema->resultset('CD')->search;
491   while (my $cd = $rs->next) {
492     print $cd->title;
493   }
494
495 Note that you need to store the resultset object, and call C<next> on it. 
496 Calling C<< resultset('Table')->next >> repeatedly will always return the
497 first record from the resultset.
498
499 =cut
500
501 sub next {
502   my ($self) = @_;
503   if (@{$self->{all_cache} || []}) {
504     $self->{all_cache_position} ||= 0;
505     return $self->{all_cache}->[$self->{all_cache_position}++];
506   }
507   if ($self->{attrs}{cache}) {
508     $self->{all_cache_position} = 1;
509     return ($self->all)[0];
510   }
511   my @row = (exists $self->{stashed_row} ?
512                @{delete $self->{stashed_row}} :
513                $self->cursor->next
514   );
515 #  warn Dumper(\@row); use Data::Dumper;
516   return unless (@row);
517   return $self->_construct_object(@row);
518 }
519
520 sub _construct_object {
521   my ($self, @row) = @_;
522   my @as = @{ $self->{attrs}{as} };
523   
524   my $info = $self->_collapse_result(\@as, \@row);
525   
526   my $new = $self->result_class->inflate_result($self->result_source, @$info);
527   
528   $new = $self->{attrs}{record_filter}->($new)
529     if exists $self->{attrs}{record_filter};
530   return $new;
531 }
532
533 sub _collapse_result {
534   my ($self, $as, $row, $prefix) = @_;
535
536   my %const;
537
538   my @copy = @$row;
539   foreach my $this_as (@$as) {
540     my $val = shift @copy;
541     if (defined $prefix) {
542       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
543         my $remain = $1;
544         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
545         $const{$1||''}{$2} = $val;
546       }
547     } else {
548       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
549       $const{$1||''}{$2} = $val;
550     }
551   }
552
553   my $info = [ {}, {} ];
554   foreach my $key (keys %const) {
555     if (length $key) {
556       my $target = $info;
557       my @parts = split(/\./, $key);
558       foreach my $p (@parts) {
559         $target = $target->[1]->{$p} ||= [];
560       }
561       $target->[0] = $const{$key};
562     } else {
563       $info->[0] = $const{$key};
564     }
565   }
566
567   my @collapse;
568   if (defined $prefix) {
569     @collapse = map {
570         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
571     } keys %{$self->{collapse}}
572   } else {
573     @collapse = keys %{$self->{collapse}};
574   };
575
576   if (@collapse) {
577     my ($c) = sort { length $a <=> length $b } @collapse;
578     my $target = $info;
579     foreach my $p (split(/\./, $c)) {
580       $target = $target->[1]->{$p} ||= [];
581     }
582     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
583     my @co_key = @{$self->{collapse}{$c_prefix}};
584     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
585     my $tree = $self->_collapse_result($as, $row, $c_prefix);
586     my (@final, @raw);
587     while ( !(grep {
588                 !defined($tree->[0]->{$_}) ||
589                 $co_check{$_} ne $tree->[0]->{$_}
590               } @co_key) ) {
591       push(@final, $tree);
592       last unless (@raw = $self->cursor->next);
593       $row = $self->{stashed_row} = \@raw;
594       $tree = $self->_collapse_result($as, $row, $c_prefix);
595       #warn Data::Dumper::Dumper($tree, $row);
596     }
597     @$target = @final;
598   }
599
600   return $info;
601 }
602
603 =head2 result_source
604
605 =over 4
606
607 =item Arguments: $result_source?
608
609 =item Return Value: $result_source
610
611 =back
612
613 An accessor for the primary ResultSource object from which this ResultSet
614 is derived.
615
616 =cut
617
618
619 =head2 count
620
621 =over 4
622
623 =item Arguments: $cond, \%attrs??
624
625 =item Return Value: $count
626
627 =back
628
629 Performs an SQL C<COUNT> with the same query as the resultset was built
630 with to find the number of elements. If passed arguments, does a search
631 on the resultset and counts the results of that.
632
633 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
634 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
635 not support C<DISTINCT> with multiple columns. If you are using such a
636 database, you should only use columns from the main table in your C<group_by>
637 clause.
638
639 =cut
640
641 sub count {
642   my $self = shift;
643   return $self->search(@_)->count if @_ and defined $_[0];
644   return scalar @{ $self->get_cache } if @{ $self->get_cache };
645
646   my $count = $self->_count;
647   return 0 unless $count;
648
649   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
650   $count = $self->{attrs}{rows} if
651     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
652   return $count;
653 }
654
655 sub _count { # Separated out so pager can get the full count
656   my $self = shift;
657   my $select = { count => '*' };
658   my $attrs = { %{ $self->{attrs} } };
659   if (my $group_by = delete $attrs->{group_by}) {
660     delete $attrs->{having};
661     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
662     # todo: try CONCAT for multi-column pk
663     my @pk = $self->result_source->primary_columns;
664     if (@pk == 1) {
665       foreach my $column (@distinct) {
666         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
667           @distinct = ($column);
668           last;
669         }
670       }
671     }
672
673     $select = { count => { distinct => \@distinct } };
674     #use Data::Dumper; die Dumper $select;
675   }
676
677   $attrs->{select} = $select;
678   $attrs->{as} = [qw/count/];
679
680   # offset, order by and page are not needed to count. record_filter is cdbi
681   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
682         
683   my ($count) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
684   return $count;
685 }
686
687 =head2 count_literal
688
689 =over 4
690
691 =item Arguments: $sql_fragment, @bind_values
692
693 =item Return Value: $count
694
695 =back
696
697 Counts the results in a literal query. Equivalent to calling L</search_literal>
698 with the passed arguments, then L</count>.
699
700 =cut
701
702 sub count_literal { shift->search_literal(@_)->count; }
703
704 =head2 all
705
706 =over 4
707
708 =item Arguments: none
709
710 =item Return Value: @objects
711
712 =back
713
714 Returns all elements in the resultset. Called implicitly if the resultset
715 is returned in list context.
716
717 =cut
718
719 sub all {
720   my ($self) = @_;
721   return @{ $self->get_cache } if @{ $self->get_cache };
722
723   my @obj;
724
725   if (keys %{$self->{collapse}}) {
726       # Using $self->cursor->all is really just an optimisation.
727       # If we're collapsing has_many prefetches it probably makes
728       # very little difference, and this is cleaner than hacking
729       # _construct_object to survive the approach
730     $self->cursor->reset;
731     my @row = $self->cursor->next;
732     while (@row) {
733       push(@obj, $self->_construct_object(@row));
734       @row = (exists $self->{stashed_row}
735                ? @{delete $self->{stashed_row}}
736                : $self->cursor->next);
737     }
738   } else {
739     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
740   }
741
742   $self->set_cache(\@obj) if $self->{attrs}{cache};
743   return @obj;
744 }
745
746 =head2 reset
747
748 =over 4
749
750 =item Arguments: none
751
752 =item Return Value: $self
753
754 =back
755
756 Resets the resultset's cursor, so you can iterate through the elements again.
757
758 =cut
759
760 sub reset {
761   my ($self) = @_;
762   $self->{all_cache_position} = 0;
763   $self->cursor->reset;
764   return $self;
765 }
766
767 =head2 first
768
769 =over 4
770
771 =item Arguments: none
772
773 =item Return Value: $object?
774
775 =back
776
777 Resets the resultset and returns an object for the first result (if the
778 resultset returns anything).
779
780 =cut
781
782 sub first {
783   return $_[0]->reset->next;
784 }
785
786 # _cond_for_update_delete
787 #
788 # update/delete require the condition to be modified to handle
789 # the differing SQL syntax available.  This transforms the $self->{cond}
790 # appropriately, returning the new condition.
791
792 sub _cond_for_update_delete {
793   my ($self) = @_;
794   my $cond = {};
795
796   if (!ref($self->{cond})) {
797     # No-op. No condition, we're updating/deleting everything
798   }
799   elsif (ref $self->{cond} eq 'ARRAY') {
800     $cond = [
801       map {
802         my %hash;
803         foreach my $key (keys %{$_}) {
804           $key =~ /([^.]+)$/;
805           $hash{$1} = $_->{$key};
806         }
807         \%hash;
808       } @{$self->{cond}}
809     ];
810   }
811   elsif (ref $self->{cond} eq 'HASH') {
812     if ((keys %{$self->{cond}})[0] eq '-and') {
813       $cond->{-and} = [];
814
815       my @cond = @{$self->{cond}{-and}};
816       for (my $i = 0; $i < @cond - 1; $i++) {
817         my $entry = $cond[$i];
818
819         my %hash;
820         if (ref $entry eq 'HASH') {
821           foreach my $key (keys %{$entry}) {
822             $key =~ /([^.]+)$/;
823             $hash{$1} = $entry->{$key};
824           }
825         }
826         else {
827           $entry =~ /([^.]+)$/;
828           $hash{$entry} = $cond[++$i];
829         }
830
831         push @{$cond->{-and}}, \%hash;
832       }
833     }
834     else {
835       foreach my $key (keys %{$self->{cond}}) {
836         $key =~ /([^.]+)$/;
837         $cond->{$1} = $self->{cond}{$key};
838       }
839     }
840   }
841   else {
842     $self->throw_exception(
843       "Can't update/delete on resultset with condition unless hash or array"
844     );
845   }
846
847   return $cond;
848 }
849
850
851 =head2 update
852
853 =over 4
854
855 =item Arguments: \%values
856
857 =item Return Value: $storage_rv
858
859 =back
860
861 Sets the specified columns in the resultset to the supplied values in a
862 single query. Return value will be true if the update succeeded or false
863 if no records were updated; exact type of success value is storage-dependent.
864
865 =cut
866
867 sub update {
868   my ($self, $values) = @_;
869   $self->throw_exception("Values for update must be a hash")
870     unless ref $values eq 'HASH';
871
872   my $cond = $self->_cond_for_update_delete;
873
874   return $self->result_source->storage->update(
875     $self->result_source->from, $values, $cond
876   );
877 }
878
879 =head2 update_all
880
881 =over 4
882
883 =item Arguments: \%values
884
885 =item Return Value: 1
886
887 =back
888
889 Fetches all objects and updates them one at a time. Note that C<update_all>
890 will run DBIC cascade triggers, while L</update> will not.
891
892 =cut
893
894 sub update_all {
895   my ($self, $values) = @_;
896   $self->throw_exception("Values for update must be a hash")
897     unless ref $values eq 'HASH';
898   foreach my $obj ($self->all) {
899     $obj->set_columns($values)->update;
900   }
901   return 1;
902 }
903
904 =head2 delete
905
906 =over 4
907
908 =item Arguments: none
909
910 =item Return Value: 1
911
912 =back
913
914 Deletes the contents of the resultset from its result source. Note that this
915 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
916 to run.
917
918 =cut
919
920 sub delete {
921   my ($self) = @_;
922   my $del = {};
923
924   my $cond = $self->_cond_for_update_delete;
925
926   $self->result_source->storage->delete($self->result_source->from, $cond);
927   return 1;
928 }
929
930 =head2 delete_all
931
932 =over 4
933
934 =item Arguments: none
935
936 =item Return Value: 1
937
938 =back
939
940 Fetches all objects and deletes them one at a time. Note that C<delete_all>
941 will run DBIC cascade triggers, while L</delete> will not.
942
943 =cut
944
945 sub delete_all {
946   my ($self) = @_;
947   $_->delete for $self->all;
948   return 1;
949 }
950
951 =head2 pager
952
953 =over 4
954
955 =item Arguments: none
956
957 =item Return Value: $pager
958
959 =back
960
961 Return Value a L<Data::Page> object for the current resultset. Only makes
962 sense for queries with a C<page> attribute.
963
964 =cut
965
966 sub pager {
967   my ($self) = @_;
968   my $attrs = $self->{attrs};
969   $self->throw_exception("Can't create pager for non-paged rs")
970     unless $self->{page};
971   $attrs->{rows} ||= 10;
972   return $self->{pager} ||= Data::Page->new(
973     $self->_count, $attrs->{rows}, $self->{page});
974 }
975
976 =head2 page
977
978 =over 4
979
980 =item Arguments: $page_number
981
982 =item Return Value: $rs
983
984 =back
985
986 Returns a resultset for the $page_number page of the resultset on which page
987 is called, where each page contains a number of rows equal to the 'rows'
988 attribute set on the resultset (10 by default).
989
990 =cut
991
992 sub page {
993   my ($self, $page) = @_;
994   my $attrs = { %{$self->{attrs}} };
995   $attrs->{page} = $page;
996   return (ref $self)->new($self->result_source, $attrs);
997 }
998
999 =head2 new_result
1000
1001 =over 4
1002
1003 =item Arguments: \%vals
1004
1005 =item Return Value: $object
1006
1007 =back
1008
1009 Creates an object in the resultset's result class and returns it.
1010
1011 =cut
1012
1013 sub new_result {
1014   my ($self, $values) = @_;
1015   $self->throw_exception( "new_result needs a hash" )
1016     unless (ref $values eq 'HASH');
1017   $self->throw_exception(
1018     "Can't abstract implicit construct, condition not a hash"
1019   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1020   my %new = %$values;
1021   my $alias = $self->{attrs}{alias};
1022   foreach my $key (keys %{$self->{cond}||{}}) {
1023     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1024   }
1025   my $obj = $self->result_class->new(\%new);
1026   $obj->result_source($self->result_source) if $obj->can('result_source');
1027   return $obj;
1028 }
1029
1030 =head2 create
1031
1032 =over 4
1033
1034 =item Arguments: \%vals
1035
1036 =item Return Value: $object
1037
1038 =back
1039
1040 Inserts a record into the resultset and returns the object representing it.
1041
1042 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1043
1044 =cut
1045
1046 sub create {
1047   my ($self, $attrs) = @_;
1048   $self->throw_exception( "create needs a hashref" )
1049     unless ref $attrs eq 'HASH';
1050   return $self->new_result($attrs)->insert;
1051 }
1052
1053 =head2 find_or_create
1054
1055 =over 4
1056
1057 =item Arguments: \%vals, \%attrs?
1058
1059 =item Return Value: $object
1060
1061 =back
1062
1063   $class->find_or_create({ key => $val, ... });
1064
1065 Searches for a record matching the search condition; if it doesn't find one,
1066 creates one and returns that instead.
1067
1068   my $cd = $schema->resultset('CD')->find_or_create({
1069     cdid   => 5,
1070     artist => 'Massive Attack',
1071     title  => 'Mezzanine',
1072     year   => 2005,
1073   });
1074
1075 Also takes an optional C<key> attribute, to search by a specific key or unique
1076 constraint. For example:
1077
1078   my $cd = $schema->resultset('CD')->find_or_create(
1079     {
1080       artist => 'Massive Attack',
1081       title  => 'Mezzanine',
1082     },
1083     { key => 'artist_title' }
1084   );
1085
1086 See also L</find> and L</update_or_create>.
1087
1088 =cut
1089
1090 sub find_or_create {
1091   my $self     = shift;
1092   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1093   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1094   my $exists   = $self->find($hash, $attrs);
1095   return defined $exists ? $exists : $self->create($hash);
1096 }
1097
1098 =head2 update_or_create
1099
1100 =over 4
1101
1102 =item Arguments: \%col_values, { key => $unique_constraint }?
1103
1104 =item Return Value: $object
1105
1106 =back
1107
1108   $class->update_or_create({ col => $val, ... });
1109
1110 First, searches for an existing row matching one of the unique constraints
1111 (including the primary key) on the source of this resultset. If a row is
1112 found, updates it with the other given column values. Otherwise, creates a new
1113 row.
1114
1115 Takes an optional C<key> attribute to search on a specific unique constraint.
1116 For example:
1117
1118   # In your application
1119   my $cd = $schema->resultset('CD')->update_or_create(
1120     {
1121       artist => 'Massive Attack',
1122       title  => 'Mezzanine',
1123       year   => 1998,
1124     },
1125     { key => 'artist_title' }
1126   );
1127
1128 If no C<key> is specified, it searches on all unique constraints defined on the
1129 source, including the primary key.
1130
1131 If the C<key> is specified as C<primary>, it searches only on the primary key.
1132
1133 See also L</find> and L</find_or_create>.
1134
1135 =cut
1136
1137 sub update_or_create {
1138   my $self = shift;
1139   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1140   my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
1141
1142   my %unique_constraints = $self->result_source->unique_constraints;
1143   my @constraint_names   = (exists $attrs->{key}
1144                             ? ($attrs->{key})
1145                             : keys %unique_constraints);
1146
1147   my @unique_hashes;
1148   foreach my $name (@constraint_names) {
1149     my @unique_cols = @{ $unique_constraints{$name} };
1150     my %unique_hash =
1151       map  { $_ => $hash->{$_} }
1152       grep { exists $hash->{$_} }
1153       @unique_cols;
1154
1155     push @unique_hashes, \%unique_hash
1156       if (scalar keys %unique_hash == scalar @unique_cols);
1157   }
1158
1159   if (@unique_hashes) {
1160     my $row = $self->single(\@unique_hashes);
1161     if (defined $row) {
1162       $row->set_columns($hash);
1163       $row->update;
1164       return $row;
1165     }
1166   }
1167
1168   return $self->create($hash);
1169 }
1170
1171 =head2 get_cache
1172
1173 =over 4
1174
1175 =item Arguments: none
1176
1177 =item Return Value: \@cache_objects?
1178
1179 =back
1180
1181 Gets the contents of the cache for the resultset, if the cache is set.
1182
1183 =cut
1184
1185 sub get_cache {
1186   shift->{all_cache} || [];
1187 }
1188
1189 =head2 set_cache
1190
1191 =over 4
1192
1193 =item Arguments: \@cache_objects
1194
1195 =item Return Value: \@cache_objects
1196
1197 =back
1198
1199 Sets the contents of the cache for the resultset. Expects an arrayref
1200 of objects of the same class as those produced by the resultset. Note that
1201 if the cache is set the resultset will return the cached objects rather
1202 than re-querying the database even if the cache attr is not set.
1203
1204 =cut
1205
1206 sub set_cache {
1207   my ( $self, $data ) = @_;
1208   $self->throw_exception("set_cache requires an arrayref")
1209     if ref $data ne 'ARRAY';
1210   my $result_class = $self->result_class;
1211   foreach( @$data ) {
1212     $self->throw_exception(
1213       "cannot cache object of type '$_', expected '$result_class'"
1214     ) if ref $_ ne $result_class;
1215   }
1216   $self->{all_cache} = $data;
1217 }
1218
1219 =head2 clear_cache
1220
1221 =over 4
1222
1223 =item Arguments: none
1224
1225 =item Return Value: []
1226
1227 =back
1228
1229 Clears the cache for the resultset.
1230
1231 =cut
1232
1233 sub clear_cache {
1234   shift->set_cache([]);
1235 }
1236
1237 =head2 related_resultset
1238
1239 =over 4
1240
1241 =item Arguments: $relationship_name
1242
1243 =item Return Value: $resultset
1244
1245 =back
1246
1247 Returns a related resultset for the supplied relationship name.
1248
1249   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1250
1251 =cut
1252
1253 sub related_resultset {
1254   my ( $self, $rel ) = @_;
1255   $self->{related_resultsets} ||= {};
1256   return $self->{related_resultsets}{$rel} ||= do {
1257       #warn "fetching related resultset for rel '$rel'";
1258       my $rel_obj = $self->result_source->relationship_info($rel);
1259       $self->throw_exception(
1260         "search_related: result source '" . $self->result_source->name .
1261         "' has no such relationship ${rel}")
1262         unless $rel_obj; #die Dumper $self->{attrs};
1263
1264       my $rs = $self->search(undef, { join => $rel });
1265       my $alias = defined $rs->{attrs}{seen_join}{$rel}
1266                     && $rs->{attrs}{seen_join}{$rel} > 1
1267                   ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
1268                   : $rel;
1269
1270       $self->result_source->schema->resultset($rel_obj->{class}
1271            )->search( undef,
1272              { %{$rs->{attrs}},
1273                alias => $alias,
1274                select => undef,
1275                as => undef }
1276            );
1277   };
1278 }
1279
1280 =head2 throw_exception
1281
1282 See L<DBIx::Class::Schema/throw_exception> for details.
1283
1284 =cut
1285
1286 sub throw_exception {
1287   my $self=shift;
1288   $self->result_source->schema->throw_exception(@_);
1289 }
1290
1291 # XXX: FIXME: Attributes docs need clearing up
1292
1293 =head1 ATTRIBUTES
1294
1295 The resultset takes various attributes that modify its behavior. Here's an
1296 overview of them:
1297
1298 =head2 order_by
1299
1300 =over 4
1301
1302 =item Value: ($order_by | \@order_by)
1303
1304 =back
1305
1306 Which column(s) to order the results by. This is currently passed
1307 through directly to SQL, so you can give e.g. C<year DESC> for a
1308 descending order on the column `year'.
1309
1310 =head2 columns
1311
1312 =over 4
1313
1314 =item Value: \@columns
1315
1316 =back
1317
1318 Shortcut to request a particular set of columns to be retrieved.  Adds
1319 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1320 from that, then auto-populates C<as> from C<select> as normal. (You may also
1321 use the C<cols> attribute, as in earlier versions of DBIC.)
1322
1323 =head2 include_columns
1324
1325 =over 4
1326
1327 =item Value: \@columns
1328
1329 =back
1330
1331 Shortcut to include additional columns in the returned results - for example
1332
1333   $schema->resultset('CD')->search(undef, {
1334     include_columns => ['artist.name'],
1335     join => ['artist']
1336   });
1337
1338 would return all CDs and include a 'name' column to the information
1339 passed to object inflation
1340
1341 =head2 select
1342
1343 =over 4
1344
1345 =item Value: \@select_columns
1346
1347 =back
1348
1349 Indicates which columns should be selected from the storage. You can use
1350 column names, or in the case of RDBMS back ends, function or stored procedure
1351 names:
1352
1353   $rs = $schema->resultset('Employee')->search(undef, {
1354     select => [
1355       'name',
1356       { count => 'employeeid' },
1357       { sum => 'salary' }
1358     ]
1359   });
1360
1361 When you use function/stored procedure names and do not supply an C<as>
1362 attribute, the column names returned are storage-dependent. E.g. MySQL would
1363 return a column named C<count(employeeid)> in the above example.
1364
1365 =head2 as
1366
1367 =over 4
1368
1369 =item Value: \@inflation_names
1370
1371 =back
1372
1373 Indicates column names for object inflation. This is used in conjunction with
1374 C<select>, usually when C<select> contains one or more function or stored
1375 procedure names:
1376
1377   $rs = $schema->resultset('Employee')->search(undef, {
1378     select => [
1379       'name',
1380       { count => 'employeeid' }
1381     ],
1382     as => ['name', 'employee_count'],
1383   });
1384
1385   my $employee = $rs->first(); # get the first Employee
1386
1387 If the object against which the search is performed already has an accessor
1388 matching a column name specified in C<as>, the value can be retrieved using
1389 the accessor as normal:
1390
1391   my $name = $employee->name();
1392
1393 If on the other hand an accessor does not exist in the object, you need to
1394 use C<get_column> instead:
1395
1396   my $employee_count = $employee->get_column('employee_count');
1397
1398 You can create your own accessors if required - see
1399 L<DBIx::Class::Manual::Cookbook> for details.
1400
1401 =head2 join
1402
1403 =over 4
1404
1405 =item Value: ($rel_name | \@rel_names | \%rel_names)
1406
1407 =back
1408
1409 Contains a list of relationships that should be joined for this query.  For
1410 example:
1411
1412   # Get CDs by Nine Inch Nails
1413   my $rs = $schema->resultset('CD')->search(
1414     { 'artist.name' => 'Nine Inch Nails' },
1415     { join => 'artist' }
1416   );
1417
1418 Can also contain a hash reference to refer to the other relation's relations.
1419 For example:
1420
1421   package MyApp::Schema::Track;
1422   use base qw/DBIx::Class/;
1423   __PACKAGE__->table('track');
1424   __PACKAGE__->add_columns(qw/trackid cd position title/);
1425   __PACKAGE__->set_primary_key('trackid');
1426   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1427   1;
1428
1429   # In your application
1430   my $rs = $schema->resultset('Artist')->search(
1431     { 'track.title' => 'Teardrop' },
1432     {
1433       join     => { cd => 'track' },
1434       order_by => 'artist.name',
1435     }
1436   );
1437
1438 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1439 similarly for a third time). For e.g.
1440
1441   my $rs = $schema->resultset('Artist')->search({
1442     'cds.title'   => 'Down to Earth',
1443     'cds_2.title' => 'Popular',
1444   }, {
1445     join => [ qw/cds cds/ ],
1446   });
1447
1448 will return a set of all artists that have both a cd with title 'Down
1449 to Earth' and a cd with title 'Popular'.
1450
1451 If you want to fetch related objects from other tables as well, see C<prefetch>
1452 below.
1453
1454 =head2 prefetch
1455
1456 =over 4
1457
1458 =item Value: ($rel_name | \@rel_names | \%rel_names)
1459
1460 =back
1461
1462 Contains one or more relationships that should be fetched along with the main
1463 query (when they are accessed afterwards they will have already been
1464 "prefetched").  This is useful for when you know you will need the related
1465 objects, because it saves at least one query:
1466
1467   my $rs = $schema->resultset('Tag')->search(
1468     undef,
1469     {
1470       prefetch => {
1471         cd => 'artist'
1472       }
1473     }
1474   );
1475
1476 The initial search results in SQL like the following:
1477
1478   SELECT tag.*, cd.*, artist.* FROM tag
1479   JOIN cd ON tag.cd = cd.cdid
1480   JOIN artist ON cd.artist = artist.artistid
1481
1482 L<DBIx::Class> has no need to go back to the database when we access the
1483 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1484 case.
1485
1486 Simple prefetches will be joined automatically, so there is no need
1487 for a C<join> attribute in the above search. If you're prefetching to
1488 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1489 specify the join as well.
1490
1491 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1492 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1493 with an accessor type of 'single' or 'filter').
1494
1495 =head2 from
1496
1497 =over 4
1498
1499 =item Value: \@from_clause
1500
1501 =back
1502
1503 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1504 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1505 clauses.
1506
1507 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1508 C<join> will usually do what you need and it is strongly recommended that you
1509 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1510
1511 In simple terms, C<from> works as follows:
1512
1513     [
1514         { <alias> => <table>, -join_type => 'inner|left|right' }
1515         [] # nested JOIN (optional)
1516         { <table.column> => <foreign_table.foreign_key> }
1517     ]
1518
1519     JOIN
1520         <alias> <table>
1521         [JOIN ...]
1522     ON <table.column> = <foreign_table.foreign_key>
1523
1524 An easy way to follow the examples below is to remember the following:
1525
1526     Anything inside "[]" is a JOIN
1527     Anything inside "{}" is a condition for the enclosing JOIN
1528
1529 The following examples utilize a "person" table in a family tree application.
1530 In order to express parent->child relationships, this table is self-joined:
1531
1532     # Person->belongs_to('father' => 'Person');
1533     # Person->belongs_to('mother' => 'Person');
1534
1535 C<from> can be used to nest joins. Here we return all children with a father,
1536 then search against all mothers of those children:
1537
1538   $rs = $schema->resultset('Person')->search(
1539       undef,
1540       {
1541           alias => 'mother', # alias columns in accordance with "from"
1542           from => [
1543               { mother => 'person' },
1544               [
1545                   [
1546                       { child => 'person' },
1547                       [
1548                           { father => 'person' },
1549                           { 'father.person_id' => 'child.father_id' }
1550                       ]
1551                   ],
1552                   { 'mother.person_id' => 'child.mother_id' }
1553               ],
1554           ]
1555       },
1556   );
1557
1558   # Equivalent SQL:
1559   # SELECT mother.* FROM person mother
1560   # JOIN (
1561   #   person child
1562   #   JOIN person father
1563   #   ON ( father.person_id = child.father_id )
1564   # )
1565   # ON ( mother.person_id = child.mother_id )
1566
1567 The type of any join can be controlled manually. To search against only people
1568 with a father in the person table, we could explicitly use C<INNER JOIN>:
1569
1570     $rs = $schema->resultset('Person')->search(
1571         undef,
1572         {
1573             alias => 'child', # alias columns in accordance with "from"
1574             from => [
1575                 { child => 'person' },
1576                 [
1577                     { father => 'person', -join_type => 'inner' },
1578                     { 'father.id' => 'child.father_id' }
1579                 ],
1580             ]
1581         },
1582     );
1583
1584     # Equivalent SQL:
1585     # SELECT child.* FROM person child
1586     # INNER JOIN person father ON child.father_id = father.id
1587
1588 =head2 page
1589
1590 =over 4
1591
1592 =item Value: $page
1593
1594 =back
1595
1596 Makes the resultset paged and specifies the page to retrieve. Effectively
1597 identical to creating a non-pages resultset and then calling ->page($page)
1598 on it.
1599
1600 =head2 rows
1601
1602 =over 4
1603
1604 =item Value: $rows
1605
1606 =back
1607
1608 Specifes the maximum number of rows for direct retrieval or the number of
1609 rows per page if the page attribute or method is used.
1610
1611 =head2 group_by
1612
1613 =over 4
1614
1615 =item Value: \@columns
1616
1617 =back
1618
1619 A arrayref of columns to group by. Can include columns of joined tables.
1620
1621   group_by => [qw/ column1 column2 ... /]
1622
1623 =head2 having
1624
1625 =over 4
1626
1627 =item Value: $condition
1628
1629 =back
1630
1631 HAVING is a select statement attribute that is applied between GROUP BY and
1632 ORDER BY. It is applied to the after the grouping calculations have been
1633 done. 
1634
1635   having => { 'count(employee)' => { '>=', 100 } }
1636
1637 =head2 distinct
1638
1639 =over 4
1640
1641 =item Value: (0 | 1)
1642
1643 =back
1644
1645 Set to 1 to group by all columns.
1646
1647 =head2 cache
1648
1649 Set to 1 to cache search results. This prevents extra SQL queries if you
1650 revisit rows in your ResultSet:
1651
1652   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1653   
1654   while( my $artist = $resultset->next ) {
1655     ... do stuff ...
1656   }
1657
1658   $rs->first; # without cache, this would issue a query
1659
1660 By default, searches are not cached.
1661
1662 For more examples of using these attributes, see
1663 L<DBIx::Class::Manual::Cookbook>.
1664
1665 =cut
1666
1667 1;