Row::update encapsulates this when passed a hashref; using set_columns bypasses deflation
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11 use Scalar::Util qw/weaken/;
12
13 use base qw/DBIx::Class/;
14 __PACKAGE__->load_components(qw/AccessorGroup/);
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('CD')->search(year => 2005);
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =over 4
57
58 =item Arguments: $source, \%$attrs
59
60 =item Return Value: $rs
61
62 =back
63
64 The resultset constructor. Takes a source object (usually a
65 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
66 L</ATTRIBUTES> below).  Does not perform any queries -- these are
67 executed as needed by the other methods.
68
69 Generally you won't need to construct a resultset manually.  You'll
70 automatically get one from e.g. a L</search> called in scalar context:
71
72   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
73
74 IMPORTANT: If called on an object, proxies to new_result instead so
75
76   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
77
78 will return a CD object, not a ResultSet.
79
80 =cut
81
82 sub new {
83   my $class = shift;
84   return $class->new_result(@_) if ref $class;
85   
86   my ($source, $attrs) = @_;
87   weaken $source;
88   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
89   #use Data::Dumper; warn Dumper($attrs);
90   my $alias = ($attrs->{alias} ||= 'me');
91   
92   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
93   delete $attrs->{as} if $attrs->{columns};
94   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
95   $attrs->{select} = [
96     map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}}
97   ] if $attrs->{columns};
98   $attrs->{as} ||= [
99     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
100   ];
101   if (my $include = delete $attrs->{include_columns}) {
102     push(@{$attrs->{select}}, @$include);
103     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
104   }
105   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
106
107   $attrs->{from} ||= [ { $alias => $source->from } ];
108   $attrs->{seen_join} ||= {};
109   my %seen;
110   if (my $join = delete $attrs->{join}) {
111     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
112       if (ref $j eq 'HASH') {
113         $seen{$_} = 1 foreach keys %$j;
114       } else {
115         $seen{$j} = 1;
116       }
117     }
118     push(@{$attrs->{from}}, $source->resolve_join(
119       $join, $attrs->{alias}, $attrs->{seen_join})
120     );
121   }
122   
123   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
124   $attrs->{order_by} = [ $attrs->{order_by} ] if
125     $attrs->{order_by} and !ref($attrs->{order_by});
126   $attrs->{order_by} ||= [];
127
128   my $collapse = $attrs->{collapse} || {};
129   if (my $prefetch = delete $attrs->{prefetch}) {
130     my @pre_order;
131     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
132       if ( ref $p eq 'HASH' ) {
133         foreach my $key (keys %$p) {
134           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
135             unless $seen{$key};
136         }
137       } else {
138         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
139             unless $seen{$p};
140       }
141       my @prefetch = $source->resolve_prefetch(
142            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
143       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
144       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
145     }
146     push(@{$attrs->{order_by}}, @pre_order);
147   }
148   $attrs->{collapse} = $collapse;
149 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
150
151   if ($attrs->{page}) {
152     $attrs->{rows} ||= 10;
153     $attrs->{offset} ||= 0;
154     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
155   }
156
157   bless {
158     result_source => $source,
159     result_class => $attrs->{result_class} || $source->result_class,
160     cond => $attrs->{where},
161     from => $attrs->{from},
162     collapse => $collapse,
163     count => undef,
164     page => delete $attrs->{page},
165     pager => undef,
166     attrs => $attrs
167   }, $class;
168 }
169
170 =head2 search
171
172 =over 4
173
174 =item Arguments: $cond, \%attrs?
175
176 =item Return Value: $resultset (scalar context), @row_objs (list context)
177
178 =back
179
180   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
181   my $new_rs = $cd_rs->search({ year => 2005 });
182
183   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
184                  # year = 2005 OR year = 2004
185
186 If you need to pass in additional attributes but no additional condition,
187 call it as C<search(undef, \%attrs)>.
188
189   # "SELECT name, artistid FROM $artist_table"
190   my @all_artists = $schema->resultset('Artist')->search(undef, {
191     columns => [qw/name artistid/],
192   });
193
194 =cut
195
196 sub search {
197   my $self = shift;
198     
199   my $attrs = { %{$self->{attrs}} };
200   my $having = delete $attrs->{having};
201   $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
202
203   my $where = (@_
204                 ? ((@_ == 1 || ref $_[0] eq "HASH")
205                     ? shift
206                     : ((@_ % 2)
207                         ? $self->throw_exception(
208                             "Odd number of arguments to search")
209                         : {@_}))
210                 : undef());
211   if (defined $where) {
212     $attrs->{where} = (defined $attrs->{where}
213               ? { '-and' =>
214                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
215                       $where, $attrs->{where} ] }
216               : $where);
217   }
218
219   if (defined $having) {
220     $attrs->{having} = (defined $attrs->{having}
221               ? { '-and' =>
222                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
223                       $having, $attrs->{having} ] }
224               : $having);
225   }
226
227   my $rs = (ref $self)->new($self->result_source, $attrs);
228
229   unless (@_) { # no search, effectively just a clone
230     my $rows = $self->get_cache;
231     if ($rows) {
232       $rs->set_cache($rows);
233     }
234   }
235   
236   return (wantarray ? $rs->all : $rs);
237 }
238
239 =head2 search_literal
240
241 =over 4
242
243 =item Arguments: $sql_fragment, @bind_values
244
245 =item Return Value: $resultset (scalar context), @row_objs (list context)
246
247 =back
248
249   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
250   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
251
252 Pass a literal chunk of SQL to be added to the conditional part of the
253 resultset query.
254
255 =cut
256
257 sub search_literal {
258   my ($self, $cond, @vals) = @_;
259   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
260   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
261   return $self->search(\$cond, $attrs);
262 }
263
264 =head2 find
265
266 =over 4
267
268 =item Arguments: @values | \%cols, \%attrs?
269
270 =item Return Value: $row_object
271
272 =back
273
274 Finds a row based on its primary key or unique constraint. For example:
275
276   my $cd = $schema->resultset('CD')->find(5);
277
278 Also takes an optional C<key> attribute, to search by a specific key or unique
279 constraint. For example:
280
281   my $cd = $schema->resultset('CD')->find(
282     {
283       artist => 'Massive Attack',
284       title  => 'Mezzanine',
285     },
286     { key => 'artist_title' }
287   );
288
289 See also L</find_or_create> and L</update_or_create>.
290
291 =cut
292
293 sub find {
294   my ($self, @vals) = @_;
295   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
296
297   my @cols = $self->result_source->primary_columns;
298   if (exists $attrs->{key}) {
299     my %uniq = $self->result_source->unique_constraints;
300     $self->throw_exception(
301       "Unknown key $attrs->{key} on '" . $self->result_source->name . "'"
302     ) unless exists $uniq{$attrs->{key}};
303     @cols = @{ $uniq{$attrs->{key}} };
304   }
305   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
306   $self->throw_exception(
307     "Can't find unless a primary key or unique constraint is defined"
308   ) unless @cols;
309
310   my $query;
311   if (ref $vals[0] eq 'HASH') {
312     $query = { %{$vals[0]} };
313   } elsif (@cols == @vals) {
314     $query = {};
315     @{$query}{@cols} = @vals;
316   } else {
317     $query = {@vals};
318   }
319   foreach my $key (grep { ! m/\./ } keys %$query) {
320     $query->{"$self->{attrs}{alias}.$key"} = delete $query->{$key};
321   }
322   #warn Dumper($query);
323   
324   if (keys %$attrs) {
325       my $rs = $self->search($query,$attrs);
326       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
327   } else {
328       return keys %{$self->{collapse}} ?
329         $self->search($query)->next :
330         $self->single($query);
331   }
332 }
333
334 =head2 search_related
335
336 =over 4
337
338 =item Arguments: $cond, \%attrs?
339
340 =item Return Value: $new_resultset
341
342 =back
343
344   $new_rs = $cd_rs->search_related('artist', {
345     name => 'Emo-R-Us',
346   });
347
348 Searches the specified relationship, optionally specifying a condition and
349 attributes for matching records. See L</ATTRIBUTES> for more information.
350
351 =cut
352
353 sub search_related {
354   return shift->related_resultset(shift)->search(@_);
355 }
356
357 =head2 cursor
358
359 =over 4
360
361 =item Arguments: none
362
363 =item Return Value: $cursor
364
365 =back
366
367 Returns a storage-driven cursor to the given resultset. See
368 L<DBIx::Class::Cursor> for more information.
369
370 =cut
371
372 sub cursor {
373   my ($self) = @_;
374   my $attrs = { %{$self->{attrs}} };
375   return $self->{cursor}
376     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
377           $attrs->{where},$attrs);
378 }
379
380 =head2 single
381
382 =over 4
383
384 =item Arguments: $cond?
385
386 =item Return Value: $row_object?
387
388 =back
389
390   my $cd = $schema->resultset('CD')->single({ year => 2001 });
391
392 Inflates the first result without creating a cursor if the resultset has
393 any records in it; if not returns nothing. Used by find() as an optimisation.
394
395 =cut
396
397 sub single {
398   my ($self, $where) = @_;
399   my $attrs = { %{$self->{attrs}} };
400   if ($where) {
401     if (defined $attrs->{where}) {
402       $attrs->{where} = {
403         '-and' =>
404             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
405                $where, delete $attrs->{where} ]
406       };
407     } else {
408       $attrs->{where} = $where;
409     }
410   }
411   my @data = $self->result_source->storage->select_single(
412           $self->{from}, $attrs->{select},
413           $attrs->{where},$attrs);
414   return (@data ? $self->_construct_object(@data) : ());
415 }
416
417
418 =head2 search_like
419
420 =over 4
421
422 =item Arguments: $cond, \%attrs?
423
424 =item Return Value: $resultset (scalar context), @row_objs (list context)
425
426 =back
427
428   # WHERE title LIKE '%blue%'
429   $cd_rs = $rs->search_like({ title => '%blue%'});
430
431 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
432 that this is simply a convenience method. You most likely want to use
433 L</search> with specific operators.
434
435 For more information, see L<DBIx::Class::Manual::Cookbook>.
436
437 =cut
438
439 sub search_like {
440   my $class = shift;
441   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
442   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
443   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
444   return $class->search($query, { %$attrs });
445 }
446
447 =head2 slice
448
449 =over 4
450
451 =item Arguments: $first, $last
452
453 =item Return Value: $resultset (scalar context), @row_objs (list context)
454
455 =back
456
457 Returns a resultset or object list representing a subset of elements from the
458 resultset slice is called on. Indexes are from 0, i.e., to get the first
459 three records, call:
460
461   my ($one, $two, $three) = $rs->slice(0, 2);
462
463 =cut
464
465 sub slice {
466   my ($self, $min, $max) = @_;
467   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
468   $attrs->{offset} = $self->{attrs}{offset} || 0;
469   $attrs->{offset} += $min;
470   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
471   return $self->search(undef(), $attrs);
472   #my $slice = (ref $self)->new($self->result_source, $attrs);
473   #return (wantarray ? $slice->all : $slice);
474 }
475
476 =head2 next
477
478 =over 4
479
480 =item Arguments: none
481
482 =item Return Value: $result?
483
484 =back
485
486 Returns the next element in the resultset (C<undef> is there is none).
487
488 Can be used to efficiently iterate over records in the resultset:
489
490   my $rs = $schema->resultset('CD')->search;
491   while (my $cd = $rs->next) {
492     print $cd->title;
493   }
494
495 Note that you need to store the resultset object, and call C<next> on it. 
496 Calling C<< resultset('Table')->next >> repeatedly will always return the
497 first record from the resultset.
498
499 =cut
500
501 sub next {
502   my ($self) = @_;
503   if (my $cache = $self->get_cache) {
504     $self->{all_cache_position} ||= 0;
505     return $cache->[$self->{all_cache_position}++];
506   }
507   if ($self->{attrs}{cache}) {
508     $self->{all_cache_position} = 1;
509     return ($self->all)[0];
510   }
511   my @row = (exists $self->{stashed_row} ?
512                @{delete $self->{stashed_row}} :
513                $self->cursor->next
514   );
515 #  warn Dumper(\@row); use Data::Dumper;
516   return unless (@row);
517   return $self->_construct_object(@row);
518 }
519
520 sub _construct_object {
521   my ($self, @row) = @_;
522   my @as = @{ $self->{attrs}{as} };
523   
524   my $info = $self->_collapse_result(\@as, \@row);
525   
526   my $new = $self->result_class->inflate_result($self->result_source, @$info);
527   
528   $new = $self->{attrs}{record_filter}->($new)
529     if exists $self->{attrs}{record_filter};
530   return $new;
531 }
532
533 sub _collapse_result {
534   my ($self, $as, $row, $prefix) = @_;
535
536   my %const;
537
538   my @copy = @$row;
539   foreach my $this_as (@$as) {
540     my $val = shift @copy;
541     if (defined $prefix) {
542       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
543         my $remain = $1;
544         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
545         $const{$1||''}{$2} = $val;
546       }
547     } else {
548       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
549       $const{$1||''}{$2} = $val;
550     }
551   }
552
553   my $info = [ {}, {} ];
554   foreach my $key (keys %const) {
555     if (length $key) {
556       my $target = $info;
557       my @parts = split(/\./, $key);
558       foreach my $p (@parts) {
559         $target = $target->[1]->{$p} ||= [];
560       }
561       $target->[0] = $const{$key};
562     } else {
563       $info->[0] = $const{$key};
564     }
565   }
566
567   my @collapse;
568   if (defined $prefix) {
569     @collapse = map {
570         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
571     } keys %{$self->{collapse}}
572   } else {
573     @collapse = keys %{$self->{collapse}};
574   };
575
576   if (@collapse) {
577     my ($c) = sort { length $a <=> length $b } @collapse;
578     my $target = $info;
579     foreach my $p (split(/\./, $c)) {
580       $target = $target->[1]->{$p} ||= [];
581     }
582     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
583     my @co_key = @{$self->{collapse}{$c_prefix}};
584     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
585     my $tree = $self->_collapse_result($as, $row, $c_prefix);
586     my (@final, @raw);
587     while ( !(grep {
588                 !defined($tree->[0]->{$_}) ||
589                 $co_check{$_} ne $tree->[0]->{$_}
590               } @co_key) ) {
591       push(@final, $tree);
592       last unless (@raw = $self->cursor->next);
593       $row = $self->{stashed_row} = \@raw;
594       $tree = $self->_collapse_result($as, $row, $c_prefix);
595     }
596     @$target = (@final ? @final : [ {}, {} ]);
597       # single empty result to indicate an empty prefetched has_many
598   }
599
600   return $info;
601 }
602
603 =head2 result_source
604
605 =over 4
606
607 =item Arguments: $result_source?
608
609 =item Return Value: $result_source
610
611 =back
612
613 An accessor for the primary ResultSource object from which this ResultSet
614 is derived.
615
616 =cut
617
618
619 =head2 count
620
621 =over 4
622
623 =item Arguments: $cond, \%attrs??
624
625 =item Return Value: $count
626
627 =back
628
629 Performs an SQL C<COUNT> with the same query as the resultset was built
630 with to find the number of elements. If passed arguments, does a search
631 on the resultset and counts the results of that.
632
633 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
634 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
635 not support C<DISTINCT> with multiple columns. If you are using such a
636 database, you should only use columns from the main table in your C<group_by>
637 clause.
638
639 =cut
640
641 sub count {
642   my $self = shift;
643   return $self->search(@_)->count if @_ and defined $_[0];
644   return scalar @{ $self->get_cache } if $self->get_cache;
645
646   my $count = $self->_count;
647   return 0 unless $count;
648
649   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
650   $count = $self->{attrs}{rows} if
651     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
652   return $count;
653 }
654
655 sub _count { # Separated out so pager can get the full count
656   my $self = shift;
657   my $select = { count => '*' };
658   my $attrs = { %{ $self->{attrs} } };
659   if (my $group_by = delete $attrs->{group_by}) {
660     delete $attrs->{having};
661     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
662     # todo: try CONCAT for multi-column pk
663     my @pk = $self->result_source->primary_columns;
664     if (@pk == 1) {
665       foreach my $column (@distinct) {
666         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
667           @distinct = ($column);
668           last;
669         }
670       }
671     }
672
673     $select = { count => { distinct => \@distinct } };
674     #use Data::Dumper; die Dumper $select;
675   }
676
677   $attrs->{select} = $select;
678   $attrs->{as} = [qw/count/];
679
680   # offset, order by and page are not needed to count. record_filter is cdbi
681   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
682         
683   my ($count) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
684   return $count;
685 }
686
687 =head2 count_literal
688
689 =over 4
690
691 =item Arguments: $sql_fragment, @bind_values
692
693 =item Return Value: $count
694
695 =back
696
697 Counts the results in a literal query. Equivalent to calling L</search_literal>
698 with the passed arguments, then L</count>.
699
700 =cut
701
702 sub count_literal { shift->search_literal(@_)->count; }
703
704 =head2 all
705
706 =over 4
707
708 =item Arguments: none
709
710 =item Return Value: @objects
711
712 =back
713
714 Returns all elements in the resultset. Called implicitly if the resultset
715 is returned in list context.
716
717 =cut
718
719 sub all {
720   my ($self) = @_;
721   return @{ $self->get_cache } if $self->get_cache;
722
723   my @obj;
724
725   if (keys %{$self->{collapse}}) {
726       # Using $self->cursor->all is really just an optimisation.
727       # If we're collapsing has_many prefetches it probably makes
728       # very little difference, and this is cleaner than hacking
729       # _construct_object to survive the approach
730     $self->cursor->reset;
731     my @row = $self->cursor->next;
732     while (@row) {
733       push(@obj, $self->_construct_object(@row));
734       @row = (exists $self->{stashed_row}
735                ? @{delete $self->{stashed_row}}
736                : $self->cursor->next);
737     }
738   } else {
739     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
740   }
741
742   $self->set_cache(\@obj) if $self->{attrs}{cache};
743   return @obj;
744 }
745
746 =head2 reset
747
748 =over 4
749
750 =item Arguments: none
751
752 =item Return Value: $self
753
754 =back
755
756 Resets the resultset's cursor, so you can iterate through the elements again.
757
758 =cut
759
760 sub reset {
761   my ($self) = @_;
762   $self->{all_cache_position} = 0;
763   $self->cursor->reset;
764   return $self;
765 }
766
767 =head2 first
768
769 =over 4
770
771 =item Arguments: none
772
773 =item Return Value: $object?
774
775 =back
776
777 Resets the resultset and returns an object for the first result (if the
778 resultset returns anything).
779
780 =cut
781
782 sub first {
783   return $_[0]->reset->next;
784 }
785
786 # _cond_for_update_delete
787 #
788 # update/delete require the condition to be modified to handle
789 # the differing SQL syntax available.  This transforms the $self->{cond}
790 # appropriately, returning the new condition.
791
792 sub _cond_for_update_delete {
793   my ($self) = @_;
794   my $cond = {};
795
796   if (!ref($self->{cond})) {
797     # No-op. No condition, we're updating/deleting everything
798   }
799   elsif (ref $self->{cond} eq 'ARRAY') {
800     $cond = [
801       map {
802         my %hash;
803         foreach my $key (keys %{$_}) {
804           $key =~ /([^.]+)$/;
805           $hash{$1} = $_->{$key};
806         }
807         \%hash;
808       } @{$self->{cond}}
809     ];
810   }
811   elsif (ref $self->{cond} eq 'HASH') {
812     if ((keys %{$self->{cond}})[0] eq '-and') {
813       $cond->{-and} = [];
814
815       my @cond = @{$self->{cond}{-and}};
816       for (my $i = 0; $i < @cond - 1; $i++) {
817         my $entry = $cond[$i];
818
819         my %hash;
820         if (ref $entry eq 'HASH') {
821           foreach my $key (keys %{$entry}) {
822             $key =~ /([^.]+)$/;
823             $hash{$1} = $entry->{$key};
824           }
825         }
826         else {
827           $entry =~ /([^.]+)$/;
828           $hash{$entry} = $cond[++$i];
829         }
830
831         push @{$cond->{-and}}, \%hash;
832       }
833     }
834     else {
835       foreach my $key (keys %{$self->{cond}}) {
836         $key =~ /([^.]+)$/;
837         $cond->{$1} = $self->{cond}{$key};
838       }
839     }
840   }
841   else {
842     $self->throw_exception(
843       "Can't update/delete on resultset with condition unless hash or array"
844     );
845   }
846
847   return $cond;
848 }
849
850
851 =head2 update
852
853 =over 4
854
855 =item Arguments: \%values
856
857 =item Return Value: $storage_rv
858
859 =back
860
861 Sets the specified columns in the resultset to the supplied values in a
862 single query. Return value will be true if the update succeeded or false
863 if no records were updated; exact type of success value is storage-dependent.
864
865 =cut
866
867 sub update {
868   my ($self, $values) = @_;
869   $self->throw_exception("Values for update must be a hash")
870     unless ref $values eq 'HASH';
871
872   my $cond = $self->_cond_for_update_delete;
873
874   return $self->result_source->storage->update(
875     $self->result_source->from, $values, $cond
876   );
877 }
878
879 =head2 update_all
880
881 =over 4
882
883 =item Arguments: \%values
884
885 =item Return Value: 1
886
887 =back
888
889 Fetches all objects and updates them one at a time. Note that C<update_all>
890 will run DBIC cascade triggers, while L</update> will not.
891
892 =cut
893
894 sub update_all {
895   my ($self, $values) = @_;
896   $self->throw_exception("Values for update must be a hash")
897     unless ref $values eq 'HASH';
898   foreach my $obj ($self->all) {
899     $obj->set_columns($values)->update;
900   }
901   return 1;
902 }
903
904 =head2 delete
905
906 =over 4
907
908 =item Arguments: none
909
910 =item Return Value: 1
911
912 =back
913
914 Deletes the contents of the resultset from its result source. Note that this
915 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
916 to run.
917
918 =cut
919
920 sub delete {
921   my ($self) = @_;
922   my $del = {};
923
924   my $cond = $self->_cond_for_update_delete;
925
926   $self->result_source->storage->delete($self->result_source->from, $cond);
927   return 1;
928 }
929
930 =head2 delete_all
931
932 =over 4
933
934 =item Arguments: none
935
936 =item Return Value: 1
937
938 =back
939
940 Fetches all objects and deletes them one at a time. Note that C<delete_all>
941 will run DBIC cascade triggers, while L</delete> will not.
942
943 =cut
944
945 sub delete_all {
946   my ($self) = @_;
947   $_->delete for $self->all;
948   return 1;
949 }
950
951 =head2 pager
952
953 =over 4
954
955 =item Arguments: none
956
957 =item Return Value: $pager
958
959 =back
960
961 Return Value a L<Data::Page> object for the current resultset. Only makes
962 sense for queries with a C<page> attribute.
963
964 =cut
965
966 sub pager {
967   my ($self) = @_;
968   my $attrs = $self->{attrs};
969   $self->throw_exception("Can't create pager for non-paged rs")
970     unless $self->{page};
971   $attrs->{rows} ||= 10;
972   return $self->{pager} ||= Data::Page->new(
973     $self->_count, $attrs->{rows}, $self->{page});
974 }
975
976 =head2 page
977
978 =over 4
979
980 =item Arguments: $page_number
981
982 =item Return Value: $rs
983
984 =back
985
986 Returns a resultset for the $page_number page of the resultset on which page
987 is called, where each page contains a number of rows equal to the 'rows'
988 attribute set on the resultset (10 by default).
989
990 =cut
991
992 sub page {
993   my ($self, $page) = @_;
994   my $attrs = { %{$self->{attrs}} };
995   $attrs->{page} = $page;
996   return (ref $self)->new($self->result_source, $attrs);
997 }
998
999 =head2 new_result
1000
1001 =over 4
1002
1003 =item Arguments: \%vals
1004
1005 =item Return Value: $object
1006
1007 =back
1008
1009 Creates an object in the resultset's result class and returns it.
1010
1011 =cut
1012
1013 sub new_result {
1014   my ($self, $values) = @_;
1015   $self->throw_exception( "new_result needs a hash" )
1016     unless (ref $values eq 'HASH');
1017   $self->throw_exception(
1018     "Can't abstract implicit construct, condition not a hash"
1019   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1020   my %new = %$values;
1021   my $alias = $self->{attrs}{alias};
1022   foreach my $key (keys %{$self->{cond}||{}}) {
1023     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1024   }
1025   my $obj = $self->result_class->new(\%new);
1026   $obj->result_source($self->result_source) if $obj->can('result_source');
1027   return $obj;
1028 }
1029
1030 =head2 create
1031
1032 =over 4
1033
1034 =item Arguments: \%vals
1035
1036 =item Return Value: $object
1037
1038 =back
1039
1040 Inserts a record into the resultset and returns the object representing it.
1041
1042 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1043
1044 =cut
1045
1046 sub create {
1047   my ($self, $attrs) = @_;
1048   $self->throw_exception( "create needs a hashref" )
1049     unless ref $attrs eq 'HASH';
1050   return $self->new_result($attrs)->insert;
1051 }
1052
1053 =head2 find_or_create
1054
1055 =over 4
1056
1057 =item Arguments: \%vals, \%attrs?
1058
1059 =item Return Value: $object
1060
1061 =back
1062
1063   $class->find_or_create({ key => $val, ... });
1064
1065 Searches for a record matching the search condition; if it doesn't find one,
1066 creates one and returns that instead.
1067
1068   my $cd = $schema->resultset('CD')->find_or_create({
1069     cdid   => 5,
1070     artist => 'Massive Attack',
1071     title  => 'Mezzanine',
1072     year   => 2005,
1073   });
1074
1075 Also takes an optional C<key> attribute, to search by a specific key or unique
1076 constraint. For example:
1077
1078   my $cd = $schema->resultset('CD')->find_or_create(
1079     {
1080       artist => 'Massive Attack',
1081       title  => 'Mezzanine',
1082     },
1083     { key => 'artist_title' }
1084   );
1085
1086 See also L</find> and L</update_or_create>.
1087
1088 =cut
1089
1090 sub find_or_create {
1091   my $self     = shift;
1092   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1093   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1094   my $exists   = $self->find($hash, $attrs);
1095   return defined $exists ? $exists : $self->create($hash);
1096 }
1097
1098 =head2 update_or_create
1099
1100 =over 4
1101
1102 =item Arguments: \%col_values, { key => $unique_constraint }?
1103
1104 =item Return Value: $object
1105
1106 =back
1107
1108   $class->update_or_create({ col => $val, ... });
1109
1110 First, searches for an existing row matching one of the unique constraints
1111 (including the primary key) on the source of this resultset. If a row is
1112 found, updates it with the other given column values. Otherwise, creates a new
1113 row.
1114
1115 Takes an optional C<key> attribute to search on a specific unique constraint.
1116 For example:
1117
1118   # In your application
1119   my $cd = $schema->resultset('CD')->update_or_create(
1120     {
1121       artist => 'Massive Attack',
1122       title  => 'Mezzanine',
1123       year   => 1998,
1124     },
1125     { key => 'artist_title' }
1126   );
1127
1128 If no C<key> is specified, it searches on all unique constraints defined on the
1129 source, including the primary key.
1130
1131 If the C<key> is specified as C<primary>, it searches only on the primary key.
1132
1133 See also L</find> and L</find_or_create>.
1134
1135 =cut
1136
1137 sub update_or_create {
1138   my $self = shift;
1139   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1140   my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
1141
1142   my %unique_constraints = $self->result_source->unique_constraints;
1143   my @constraint_names   = (exists $attrs->{key}
1144                             ? ($attrs->{key})
1145                             : keys %unique_constraints);
1146
1147   my @unique_hashes;
1148   foreach my $name (@constraint_names) {
1149     my @unique_cols = @{ $unique_constraints{$name} };
1150     my %unique_hash =
1151       map  { $_ => $hash->{$_} }
1152       grep { exists $hash->{$_} }
1153       @unique_cols;
1154
1155     push @unique_hashes, \%unique_hash
1156       if (scalar keys %unique_hash == scalar @unique_cols);
1157   }
1158
1159   if (@unique_hashes) {
1160     my $row = $self->single(\@unique_hashes);
1161     if (defined $row) {
1162       $row->update($hash);
1163       return $row;
1164     }
1165   }
1166
1167   return $self->create($hash);
1168 }
1169
1170 =head2 get_cache
1171
1172 =over 4
1173
1174 =item Arguments: none
1175
1176 =item Return Value: \@cache_objects?
1177
1178 =back
1179
1180 Gets the contents of the cache for the resultset, if the cache is set.
1181
1182 =cut
1183
1184 sub get_cache {
1185   shift->{all_cache};
1186 }
1187
1188 =head2 set_cache
1189
1190 =over 4
1191
1192 =item Arguments: \@cache_objects
1193
1194 =item Return Value: \@cache_objects
1195
1196 =back
1197
1198 Sets the contents of the cache for the resultset. Expects an arrayref
1199 of objects of the same class as those produced by the resultset. Note that
1200 if the cache is set the resultset will return the cached objects rather
1201 than re-querying the database even if the cache attr is not set.
1202
1203 =cut
1204
1205 sub set_cache {
1206   my ( $self, $data ) = @_;
1207   $self->throw_exception("set_cache requires an arrayref")
1208     if defined($data) && (ref $data ne 'ARRAY');
1209   $self->{all_cache} = $data;
1210 }
1211
1212 =head2 clear_cache
1213
1214 =over 4
1215
1216 =item Arguments: none
1217
1218 =item Return Value: []
1219
1220 =back
1221
1222 Clears the cache for the resultset.
1223
1224 =cut
1225
1226 sub clear_cache {
1227   shift->set_cache(undef);
1228 }
1229
1230 =head2 related_resultset
1231
1232 =over 4
1233
1234 =item Arguments: $relationship_name
1235
1236 =item Return Value: $resultset
1237
1238 =back
1239
1240 Returns a related resultset for the supplied relationship name.
1241
1242   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1243
1244 =cut
1245
1246 sub related_resultset {
1247   my ( $self, $rel ) = @_;
1248   $self->{related_resultsets} ||= {};
1249   return $self->{related_resultsets}{$rel} ||= do {
1250       #warn "fetching related resultset for rel '$rel'";
1251       my $rel_obj = $self->result_source->relationship_info($rel);
1252       $self->throw_exception(
1253         "search_related: result source '" . $self->result_source->name .
1254         "' has no such relationship ${rel}")
1255         unless $rel_obj; #die Dumper $self->{attrs};
1256
1257       my $rs = $self->search(undef, { join => $rel });
1258       my $alias = defined $rs->{attrs}{seen_join}{$rel}
1259                     && $rs->{attrs}{seen_join}{$rel} > 1
1260                   ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
1261                   : $rel;
1262
1263       $self->result_source->schema->resultset($rel_obj->{class}
1264            )->search( undef,
1265              { %{$rs->{attrs}},
1266                alias => $alias,
1267                select => undef,
1268                as => undef }
1269            );
1270   };
1271 }
1272
1273 =head2 throw_exception
1274
1275 See L<DBIx::Class::Schema/throw_exception> for details.
1276
1277 =cut
1278
1279 sub throw_exception {
1280   my $self=shift;
1281   $self->result_source->schema->throw_exception(@_);
1282 }
1283
1284 # XXX: FIXME: Attributes docs need clearing up
1285
1286 =head1 ATTRIBUTES
1287
1288 The resultset takes various attributes that modify its behavior. Here's an
1289 overview of them:
1290
1291 =head2 order_by
1292
1293 =over 4
1294
1295 =item Value: ($order_by | \@order_by)
1296
1297 =back
1298
1299 Which column(s) to order the results by. This is currently passed
1300 through directly to SQL, so you can give e.g. C<year DESC> for a
1301 descending order on the column `year'.
1302
1303 =head2 columns
1304
1305 =over 4
1306
1307 =item Value: \@columns
1308
1309 =back
1310
1311 Shortcut to request a particular set of columns to be retrieved.  Adds
1312 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1313 from that, then auto-populates C<as> from C<select> as normal. (You may also
1314 use the C<cols> attribute, as in earlier versions of DBIC.)
1315
1316 =head2 include_columns
1317
1318 =over 4
1319
1320 =item Value: \@columns
1321
1322 =back
1323
1324 Shortcut to include additional columns in the returned results - for example
1325
1326   $schema->resultset('CD')->search(undef, {
1327     include_columns => ['artist.name'],
1328     join => ['artist']
1329   });
1330
1331 would return all CDs and include a 'name' column to the information
1332 passed to object inflation
1333
1334 =head2 select
1335
1336 =over 4
1337
1338 =item Value: \@select_columns
1339
1340 =back
1341
1342 Indicates which columns should be selected from the storage. You can use
1343 column names, or in the case of RDBMS back ends, function or stored procedure
1344 names:
1345
1346   $rs = $schema->resultset('Employee')->search(undef, {
1347     select => [
1348       'name',
1349       { count => 'employeeid' },
1350       { sum => 'salary' }
1351     ]
1352   });
1353
1354 When you use function/stored procedure names and do not supply an C<as>
1355 attribute, the column names returned are storage-dependent. E.g. MySQL would
1356 return a column named C<count(employeeid)> in the above example.
1357
1358 =head2 as
1359
1360 =over 4
1361
1362 =item Value: \@inflation_names
1363
1364 =back
1365
1366 Indicates column names for object inflation. This is used in conjunction with
1367 C<select>, usually when C<select> contains one or more function or stored
1368 procedure names:
1369
1370   $rs = $schema->resultset('Employee')->search(undef, {
1371     select => [
1372       'name',
1373       { count => 'employeeid' }
1374     ],
1375     as => ['name', 'employee_count'],
1376   });
1377
1378   my $employee = $rs->first(); # get the first Employee
1379
1380 If the object against which the search is performed already has an accessor
1381 matching a column name specified in C<as>, the value can be retrieved using
1382 the accessor as normal:
1383
1384   my $name = $employee->name();
1385
1386 If on the other hand an accessor does not exist in the object, you need to
1387 use C<get_column> instead:
1388
1389   my $employee_count = $employee->get_column('employee_count');
1390
1391 You can create your own accessors if required - see
1392 L<DBIx::Class::Manual::Cookbook> for details.
1393
1394 =head2 join
1395
1396 =over 4
1397
1398 =item Value: ($rel_name | \@rel_names | \%rel_names)
1399
1400 =back
1401
1402 Contains a list of relationships that should be joined for this query.  For
1403 example:
1404
1405   # Get CDs by Nine Inch Nails
1406   my $rs = $schema->resultset('CD')->search(
1407     { 'artist.name' => 'Nine Inch Nails' },
1408     { join => 'artist' }
1409   );
1410
1411 Can also contain a hash reference to refer to the other relation's relations.
1412 For example:
1413
1414   package MyApp::Schema::Track;
1415   use base qw/DBIx::Class/;
1416   __PACKAGE__->table('track');
1417   __PACKAGE__->add_columns(qw/trackid cd position title/);
1418   __PACKAGE__->set_primary_key('trackid');
1419   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1420   1;
1421
1422   # In your application
1423   my $rs = $schema->resultset('Artist')->search(
1424     { 'track.title' => 'Teardrop' },
1425     {
1426       join     => { cd => 'track' },
1427       order_by => 'artist.name',
1428     }
1429   );
1430
1431 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1432 similarly for a third time). For e.g.
1433
1434   my $rs = $schema->resultset('Artist')->search({
1435     'cds.title'   => 'Down to Earth',
1436     'cds_2.title' => 'Popular',
1437   }, {
1438     join => [ qw/cds cds/ ],
1439   });
1440
1441 will return a set of all artists that have both a cd with title 'Down
1442 to Earth' and a cd with title 'Popular'.
1443
1444 If you want to fetch related objects from other tables as well, see C<prefetch>
1445 below.
1446
1447 =head2 prefetch
1448
1449 =over 4
1450
1451 =item Value: ($rel_name | \@rel_names | \%rel_names)
1452
1453 =back
1454
1455 Contains one or more relationships that should be fetched along with the main
1456 query (when they are accessed afterwards they will have already been
1457 "prefetched").  This is useful for when you know you will need the related
1458 objects, because it saves at least one query:
1459
1460   my $rs = $schema->resultset('Tag')->search(
1461     undef,
1462     {
1463       prefetch => {
1464         cd => 'artist'
1465       }
1466     }
1467   );
1468
1469 The initial search results in SQL like the following:
1470
1471   SELECT tag.*, cd.*, artist.* FROM tag
1472   JOIN cd ON tag.cd = cd.cdid
1473   JOIN artist ON cd.artist = artist.artistid
1474
1475 L<DBIx::Class> has no need to go back to the database when we access the
1476 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1477 case.
1478
1479 Simple prefetches will be joined automatically, so there is no need
1480 for a C<join> attribute in the above search. If you're prefetching to
1481 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1482 specify the join as well.
1483
1484 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1485 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1486 with an accessor type of 'single' or 'filter').
1487
1488 =head2 from
1489
1490 =over 4
1491
1492 =item Value: \@from_clause
1493
1494 =back
1495
1496 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1497 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1498 clauses.
1499
1500 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1501 C<join> will usually do what you need and it is strongly recommended that you
1502 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1503
1504 In simple terms, C<from> works as follows:
1505
1506     [
1507         { <alias> => <table>, -join_type => 'inner|left|right' }
1508         [] # nested JOIN (optional)
1509         { <table.column> => <foreign_table.foreign_key> }
1510     ]
1511
1512     JOIN
1513         <alias> <table>
1514         [JOIN ...]
1515     ON <table.column> = <foreign_table.foreign_key>
1516
1517 An easy way to follow the examples below is to remember the following:
1518
1519     Anything inside "[]" is a JOIN
1520     Anything inside "{}" is a condition for the enclosing JOIN
1521
1522 The following examples utilize a "person" table in a family tree application.
1523 In order to express parent->child relationships, this table is self-joined:
1524
1525     # Person->belongs_to('father' => 'Person');
1526     # Person->belongs_to('mother' => 'Person');
1527
1528 C<from> can be used to nest joins. Here we return all children with a father,
1529 then search against all mothers of those children:
1530
1531   $rs = $schema->resultset('Person')->search(
1532       undef,
1533       {
1534           alias => 'mother', # alias columns in accordance with "from"
1535           from => [
1536               { mother => 'person' },
1537               [
1538                   [
1539                       { child => 'person' },
1540                       [
1541                           { father => 'person' },
1542                           { 'father.person_id' => 'child.father_id' }
1543                       ]
1544                   ],
1545                   { 'mother.person_id' => 'child.mother_id' }
1546               ],
1547           ]
1548       },
1549   );
1550
1551   # Equivalent SQL:
1552   # SELECT mother.* FROM person mother
1553   # JOIN (
1554   #   person child
1555   #   JOIN person father
1556   #   ON ( father.person_id = child.father_id )
1557   # )
1558   # ON ( mother.person_id = child.mother_id )
1559
1560 The type of any join can be controlled manually. To search against only people
1561 with a father in the person table, we could explicitly use C<INNER JOIN>:
1562
1563     $rs = $schema->resultset('Person')->search(
1564         undef,
1565         {
1566             alias => 'child', # alias columns in accordance with "from"
1567             from => [
1568                 { child => 'person' },
1569                 [
1570                     { father => 'person', -join_type => 'inner' },
1571                     { 'father.id' => 'child.father_id' }
1572                 ],
1573             ]
1574         },
1575     );
1576
1577     # Equivalent SQL:
1578     # SELECT child.* FROM person child
1579     # INNER JOIN person father ON child.father_id = father.id
1580
1581 =head2 page
1582
1583 =over 4
1584
1585 =item Value: $page
1586
1587 =back
1588
1589 Makes the resultset paged and specifies the page to retrieve. Effectively
1590 identical to creating a non-pages resultset and then calling ->page($page)
1591 on it.
1592
1593 =head2 rows
1594
1595 =over 4
1596
1597 =item Value: $rows
1598
1599 =back
1600
1601 Specifes the maximum number of rows for direct retrieval or the number of
1602 rows per page if the page attribute or method is used.
1603
1604 =head2 group_by
1605
1606 =over 4
1607
1608 =item Value: \@columns
1609
1610 =back
1611
1612 A arrayref of columns to group by. Can include columns of joined tables.
1613
1614   group_by => [qw/ column1 column2 ... /]
1615
1616 =head2 having
1617
1618 =over 4
1619
1620 =item Value: $condition
1621
1622 =back
1623
1624 HAVING is a select statement attribute that is applied between GROUP BY and
1625 ORDER BY. It is applied to the after the grouping calculations have been
1626 done. 
1627
1628   having => { 'count(employee)' => { '>=', 100 } }
1629
1630 =head2 distinct
1631
1632 =over 4
1633
1634 =item Value: (0 | 1)
1635
1636 =back
1637
1638 Set to 1 to group by all columns.
1639
1640 =head2 cache
1641
1642 Set to 1 to cache search results. This prevents extra SQL queries if you
1643 revisit rows in your ResultSet:
1644
1645   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1646   
1647   while( my $artist = $resultset->next ) {
1648     ... do stuff ...
1649   }
1650
1651   $rs->first; # without cache, this would issue a query
1652
1653 By default, searches are not cached.
1654
1655 For more examples of using these attributes, see
1656 L<DBIx::Class::Manual::Cookbook>.
1657
1658 =cut
1659
1660 1;