Fix for delete on full-table resultsets
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11 use Scalar::Util qw/weaken/;
12
13 use base qw/DBIx::Class/;
14 __PACKAGE__->load_components(qw/AccessorGroup/);
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =head3 Arguments: ($source, \%$attrs)
57
58 The resultset constructor. Takes a source object (usually a
59 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
60 below).  Does not perform any queries -- these are executed as needed by the
61 other methods.
62
63 Generally you won't need to construct a resultset manually.  You'll
64 automatically get one from e.g. a L</search> called in scalar context:
65
66   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
67
68 =cut
69
70 sub new {
71   my $class = shift;
72   return $class->new_result(@_) if ref $class;
73   
74   my ($source, $attrs) = @_;
75   weaken $source;
76   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
77   #use Data::Dumper; warn Dumper($attrs);
78   my $alias = ($attrs->{alias} ||= 'me');
79   
80   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
81   delete $attrs->{as} if $attrs->{columns};
82   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
83   $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
84     if $attrs->{columns};
85   $attrs->{as} ||= [ map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ];
86   if (my $include = delete $attrs->{include_columns}) {
87     push(@{$attrs->{select}}, @$include);
88     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
89   }
90   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
91
92   $attrs->{from} ||= [ { $alias => $source->from } ];
93   $attrs->{seen_join} ||= {};
94   my %seen;
95   if (my $join = delete $attrs->{join}) {
96     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
97       if (ref $j eq 'HASH') {
98         $seen{$_} = 1 foreach keys %$j;
99       } else {
100         $seen{$j} = 1;
101       }
102     }
103     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
104   }
105   
106   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
107   $attrs->{order_by} = [ $attrs->{order_by} ] if $attrs->{order_by} and !ref($attrs->{order_by});
108   $attrs->{order_by} ||= [];
109
110   my $collapse = $attrs->{collapse} || {};
111   if (my $prefetch = delete $attrs->{prefetch}) {
112     my @pre_order;
113     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
114       if ( ref $p eq 'HASH' ) {
115         foreach my $key (keys %$p) {
116           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
117             unless $seen{$key};
118         }
119       } else {
120         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
121             unless $seen{$p};
122       }
123       my @prefetch = $source->resolve_prefetch(
124            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
125       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
126       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
127     }
128     push(@{$attrs->{order_by}}, @pre_order);
129   }
130   $attrs->{collapse} = $collapse;
131 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
132
133   if ($attrs->{page}) {
134     $attrs->{rows} ||= 10;
135     $attrs->{offset} ||= 0;
136     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
137   }
138
139   bless {
140     result_source => $source,
141     result_class => $attrs->{result_class} || $source->result_class,
142     cond => $attrs->{where},
143     from => $attrs->{from},
144     collapse => $collapse,
145     count => undef,
146     page => delete $attrs->{page},
147     pager => undef,
148     attrs => $attrs
149   }, $class;
150 }
151
152 =head2 search
153
154   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
155   my $new_rs = $rs->search({ foo => 3 });
156
157 If you need to pass in additional attributes but no additional condition,
158 call it as C<search(undef, \%attrs);>.
159
160   # "SELECT foo, bar FROM $class_table"
161   my @all = $class->search(undef, { columns => [qw/foo bar/] });
162
163 =cut
164
165 sub search {
166   my $self = shift;
167
168   my $rs;
169   if( @_ ) {
170     
171     my $attrs = { %{$self->{attrs}} };
172     my $having = delete $attrs->{having};
173     $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
174
175     my $where = (@_
176                   ? ((@_ == 1 || ref $_[0] eq "HASH")
177                       ? shift
178                       : ((@_ % 2)
179                           ? $self->throw_exception(
180                               "Odd number of arguments to search")
181                           : {@_}))
182                   : undef());
183     if (defined $where) {
184       $attrs->{where} = (defined $attrs->{where}
185                 ? { '-and' =>
186                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
187                         $where, $attrs->{where} ] }
188                 : $where);
189     }
190
191     if (defined $having) {
192       $attrs->{having} = (defined $attrs->{having}
193                 ? { '-and' =>
194                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
195                         $having, $attrs->{having} ] }
196                 : $having);
197     }
198
199     $rs = (ref $self)->new($self->result_source, $attrs);
200   }
201   else {
202     $rs = $self;
203     $rs->reset;
204   }
205   return (wantarray ? $rs->all : $rs);
206 }
207
208 =head2 search_literal
209
210   my @obj    = $rs->search_literal($literal_where_cond, @bind);
211   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
212
213 Pass a literal chunk of SQL to be added to the conditional part of the
214 resultset.
215
216 =cut
217
218 sub search_literal {
219   my ($self, $cond, @vals) = @_;
220   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
221   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
222   return $self->search(\$cond, $attrs);
223 }
224
225 =head2 find
226
227 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
228
229 Finds a row based on its primary key or unique constraint. For example:
230
231   my $cd = $schema->resultset('CD')->find(5);
232
233 Also takes an optional C<key> attribute, to search by a specific key or unique
234 constraint. For example:
235
236   my $cd = $schema->resultset('CD')->find(
237     {
238       artist => 'Massive Attack',
239       title  => 'Mezzanine',
240     },
241     { key => 'artist_title' }
242   );
243
244 See also L</find_or_create> and L</update_or_create>.
245
246 =cut
247
248 sub find {
249   my ($self, @vals) = @_;
250   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
251
252   my @cols = $self->result_source->primary_columns;
253   if (exists $attrs->{key}) {
254     my %uniq = $self->result_source->unique_constraints;
255     $self->throw_exception( "Unknown key $attrs->{key} on $self->name" )
256       unless exists $uniq{$attrs->{key}};
257     @cols = @{ $uniq{$attrs->{key}} };
258   }
259   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
260   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
261     unless @cols;
262
263   my $query;
264   if (ref $vals[0] eq 'HASH') {
265     $query = { %{$vals[0]} };
266   } elsif (@cols == @vals) {
267     $query = {};
268     @{$query}{@cols} = @vals;
269   } else {
270     $query = {@vals};
271   }
272   foreach my $key (grep { ! m/\./ } keys %$query) {
273     $query->{"$self->{attrs}{alias}.$key"} = delete $query->{$key};
274   }
275   #warn Dumper($query);
276   
277   if (keys %$attrs) {
278       my $rs = $self->search($query,$attrs);
279       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
280   } else {
281       return keys %{$self->{collapse}} ? $self->search($query)->next : $self->single($query);
282   }
283 }
284
285 =head2 search_related
286
287   $rs->search_related('relname', $cond?, $attrs?);
288
289 Search the specified relationship. Optionally specify a condition for matching
290 records.
291
292 =cut
293
294 sub search_related {
295   return shift->related_resultset(shift)->search(@_);
296 }
297
298 =head2 cursor
299
300 Returns a storage-driven cursor to the given resultset.
301
302 =cut
303
304 sub cursor {
305   my ($self) = @_;
306   my $attrs = { %{$self->{attrs}} };
307   return $self->{cursor}
308     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
309           $attrs->{where},$attrs);
310 }
311
312 =head2 single
313
314 Inflates the first result without creating a cursor
315
316 =cut
317
318 sub single {
319   my ($self, $where) = @_;
320   my $attrs = { %{$self->{attrs}} };
321   if ($where) {
322     if (defined $attrs->{where}) {
323       $attrs->{where} = {
324         '-and' => 
325             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
326                $where, delete $attrs->{where} ]
327       };
328     } else {
329       $attrs->{where} = $where;
330     }
331   }
332   my @data = $self->result_source->storage->select_single(
333           $self->{from}, $attrs->{select},
334           $attrs->{where},$attrs);
335   return (@data ? $self->_construct_object(@data) : ());
336 }
337
338
339 =head2 search_like
340
341 Perform a search, but use C<LIKE> instead of equality as the condition. Note
342 that this is simply a convenience method; you most likely want to use
343 L</search> with specific operators.
344
345 For more information, see L<DBIx::Class::Manual::Cookbook>.
346
347 =cut
348
349 sub search_like {
350   my $class = shift;
351   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
352   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
353   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
354   return $class->search($query, { %$attrs });
355 }
356
357 =head2 slice
358
359 =head3 Arguments: ($first, $last)
360
361 Returns a subset of elements from the resultset.
362
363 =cut
364
365 sub slice {
366   my ($self, $min, $max) = @_;
367   my $attrs = { %{ $self->{attrs} || {} } };
368   $attrs->{offset} ||= 0;
369   $attrs->{offset} += $min;
370   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
371   my $slice = (ref $self)->new($self->result_source, $attrs);
372   return (wantarray ? $slice->all : $slice);
373 }
374
375 =head2 next
376
377 Returns the next element in the resultset (C<undef> is there is none).
378
379 Can be used to efficiently iterate over records in the resultset:
380
381   my $rs = $schema->resultset('CD')->search;
382   while (my $cd = $rs->next) {
383     print $cd->title;
384   }
385
386 =cut
387
388 sub next {
389   my ($self) = @_;
390   if (@{$self->{all_cache} || []}) {
391     $self->{all_cache_position} ||= 0;
392     return $self->{all_cache}->[$self->{all_cache_position}++];
393   }
394   if ($self->{attrs}{cache}) {
395     $self->{all_cache_position} = 1;
396     return ($self->all)[0];
397   }
398   my @row = (exists $self->{stashed_row}
399                ? @{delete $self->{stashed_row}}
400                : $self->cursor->next);
401 #  warn Dumper(\@row); use Data::Dumper;
402   return unless (@row);
403   return $self->_construct_object(@row);
404 }
405
406 sub _construct_object {
407   my ($self, @row) = @_;
408   my @as = @{ $self->{attrs}{as} };
409   
410   my $info = $self->_collapse_result(\@as, \@row);
411   
412   my $new = $self->result_class->inflate_result($self->result_source, @$info);
413   
414   $new = $self->{attrs}{record_filter}->($new)
415     if exists $self->{attrs}{record_filter};
416   return $new;
417 }
418
419 sub _collapse_result {
420   my ($self, $as, $row, $prefix) = @_;
421
422   my %const;
423
424   my @copy = @$row;
425   foreach my $this_as (@$as) {
426     my $val = shift @copy;
427     if (defined $prefix) {
428       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
429         my $remain = $1;
430         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
431         $const{$1||''}{$2} = $val;
432       }
433     } else {
434       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
435       $const{$1||''}{$2} = $val;
436     }
437   }
438
439   my $info = [ {}, {} ];
440   foreach my $key (keys %const) {
441     if (length $key) {
442       my $target = $info;
443       my @parts = split(/\./, $key);
444       foreach my $p (@parts) {
445         $target = $target->[1]->{$p} ||= [];
446       }
447       $target->[0] = $const{$key};
448     } else {
449       $info->[0] = $const{$key};
450     }
451   }
452
453   my @collapse = (defined($prefix)
454                    ? (map { (m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()); }
455                        keys %{$self->{collapse}})
456                    : keys %{$self->{collapse}});
457   if (@collapse) {
458     my ($c) = sort { length $a <=> length $b } @collapse;
459     my $target = $info;
460     foreach my $p (split(/\./, $c)) {
461       $target = $target->[1]->{$p} ||= [];
462     }
463     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
464     my @co_key = @{$self->{collapse}{$c_prefix}};
465     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
466     my $tree = $self->_collapse_result($as, $row, $c_prefix);
467     my (@final, @raw);
468     while ( !(grep {
469                 !defined($tree->[0]->{$_})
470                 || $co_check{$_} ne $tree->[0]->{$_}
471               } @co_key) ) {
472       push(@final, $tree);
473       last unless (@raw = $self->cursor->next);
474       $row = $self->{stashed_row} = \@raw;
475       $tree = $self->_collapse_result($as, $row, $c_prefix);
476       #warn Data::Dumper::Dumper($tree, $row);
477     }
478     @$target = @final;
479   }
480
481   return $info;
482 }
483
484 =head2 result_source
485
486 Returns a reference to the result source for this recordset.
487
488 =cut
489
490
491 =head2 count
492
493 Performs an SQL C<COUNT> with the same query as the resultset was built
494 with to find the number of elements. If passed arguments, does a search
495 on the resultset and counts the results of that.
496
497 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
498 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
499 not support C<DISTINCT> with multiple columns. If you are using such a
500 database, you should only use columns from the main table in your C<group_by>
501 clause.
502
503 =cut
504
505 sub count {
506   my $self = shift;
507   return $self->search(@_)->count if @_ and defined $_[0];
508   return scalar @{ $self->get_cache } if @{ $self->get_cache };
509
510   my $count = $self->_count;
511   return 0 unless $count;
512
513   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
514   $count = $self->{attrs}{rows} if
515     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
516   return $count;
517 }
518
519 sub _count { # Separated out so pager can get the full count
520   my $self = shift;
521   my $select = { count => '*' };
522   my $attrs = { %{ $self->{attrs} } };
523   if (my $group_by = delete $attrs->{group_by}) {
524     delete $attrs->{having};
525     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
526     # todo: try CONCAT for multi-column pk
527     my @pk = $self->result_source->primary_columns;
528     if (@pk == 1) {
529       foreach my $column (@distinct) {
530         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
531           @distinct = ($column);
532           last;
533         }
534       } 
535     }
536
537     $select = { count => { distinct => \@distinct } };
538     #use Data::Dumper; die Dumper $select;
539   }
540
541   $attrs->{select} = $select;
542   $attrs->{as} = [qw/count/];
543
544   # offset, order by and page are not needed to count. record_filter is cdbi
545   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
546         
547   my ($count) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
548   return $count;
549 }
550
551 =head2 count_literal
552
553 Calls L</search_literal> with the passed arguments, then L</count>.
554
555 =cut
556
557 sub count_literal { shift->search_literal(@_)->count; }
558
559 =head2 all
560
561 Returns all elements in the resultset. Called implictly if the resultset
562 is returned in list context.
563
564 =cut
565
566 sub all {
567   my ($self) = @_;
568   return @{ $self->get_cache } if @{ $self->get_cache };
569
570   my @obj;
571
572   if (keys %{$self->{collapse}}) {
573       # Using $self->cursor->all is really just an optimisation.
574       # If we're collapsing has_many prefetches it probably makes
575       # very little difference, and this is cleaner than hacking
576       # _construct_object to survive the approach
577     $self->cursor->reset;
578     my @row = $self->cursor->next;
579     while (@row) {
580       push(@obj, $self->_construct_object(@row));
581       @row = (exists $self->{stashed_row}
582                ? @{delete $self->{stashed_row}}
583                : $self->cursor->next);
584     }
585   } else {
586     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
587   }
588
589   $self->set_cache(\@obj) if $self->{attrs}{cache};
590   return @obj;
591 }
592
593 =head2 reset
594
595 Resets the resultset's cursor, so you can iterate through the elements again.
596
597 =cut
598
599 sub reset {
600   my ($self) = @_;
601   $self->{all_cache_position} = 0;
602   $self->cursor->reset;
603   return $self;
604 }
605
606 =head2 first
607
608 Resets the resultset and returns the first element.
609
610 =cut
611
612 sub first {
613   return $_[0]->reset->next;
614 }
615
616 =head2 update
617
618 =head3 Arguments: (\%values)
619
620 Sets the specified columns in the resultset to the supplied values.
621
622 =cut
623
624 sub update {
625   my ($self, $values) = @_;
626   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
627   return $self->result_source->storage->update(
628            $self->result_source->from, $values, $self->{cond});
629 }
630
631 =head2 update_all
632
633 =head3 Arguments: (\%values)
634
635 Fetches all objects and updates them one at a time.  Note that C<update_all>
636 will run cascade triggers while L</update> will not.
637
638 =cut
639
640 sub update_all {
641   my ($self, $values) = @_;
642   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
643   foreach my $obj ($self->all) {
644     $obj->set_columns($values)->update;
645   }
646   return 1;
647 }
648
649 =head2 delete
650
651 Deletes the contents of the resultset from its result source.
652
653 =cut
654
655 sub delete {
656   my ($self) = @_;
657   my $del = {};
658
659   if (!ref($self->{cond})) {
660
661     # No-op. No condition, we're deleting everything
662
663   } elsif (ref $self->{cond} eq 'ARRAY') {
664
665     $del = [ map { my %hash;
666       foreach my $key (keys %{$_}) {
667         $key =~ /([^.]+)$/;
668         $hash{$1} = $_->{$key};
669       }; \%hash; } @{$self->{cond}} ];
670
671   } elsif (ref $self->{cond} eq 'HASH') {
672
673     if ((keys %{$self->{cond}})[0] eq '-and') {
674
675       $del->{-and} = [ map { my %hash;
676         foreach my $key (keys %{$_}) {
677           $key =~ /([^.]+)$/;
678           $hash{$1} = $_->{$key};
679         }; \%hash; } @{$self->{cond}{-and}} ];
680
681     } else {
682
683       foreach my $key (keys %{$self->{cond}}) {
684         $key =~ /([^.]+)$/;
685         $del->{$1} = $self->{cond}{$key};
686       }
687     }
688   } else {
689     $self->throw_exception(
690       "Can't delete on resultset with condition unless hash or array");
691   }
692
693   $self->result_source->storage->delete($self->result_source->from, $del);
694   return 1;
695 }
696
697 =head2 delete_all
698
699 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
700 will run cascade triggers while L</delete> will not.
701
702 =cut
703
704 sub delete_all {
705   my ($self) = @_;
706   $_->delete for $self->all;
707   return 1;
708 }
709
710 =head2 pager
711
712 Returns a L<Data::Page> object for the current resultset. Only makes
713 sense for queries with a C<page> attribute.
714
715 =cut
716
717 sub pager {
718   my ($self) = @_;
719   my $attrs = $self->{attrs};
720   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
721   $attrs->{rows} ||= 10;
722   return $self->{pager} ||= Data::Page->new(
723     $self->_count, $attrs->{rows}, $self->{page});
724 }
725
726 =head2 page
727
728 =head3 Arguments: ($page_num)
729
730 Returns a new resultset for the specified page.
731
732 =cut
733
734 sub page {
735   my ($self, $page) = @_;
736   my $attrs = { %{$self->{attrs}} };
737   $attrs->{page} = $page;
738   return (ref $self)->new($self->result_source, $attrs);
739 }
740
741 =head2 new_result
742
743 =head3 Arguments: (\%vals)
744
745 Creates a result in the resultset's result class.
746
747 =cut
748
749 sub new_result {
750   my ($self, $values) = @_;
751   $self->throw_exception( "new_result needs a hash" )
752     unless (ref $values eq 'HASH');
753   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
754     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
755   my %new = %$values;
756   my $alias = $self->{attrs}{alias};
757   foreach my $key (keys %{$self->{cond}||{}}) {
758     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
759   }
760   my $obj = $self->result_class->new(\%new);
761   $obj->result_source($self->result_source) if $obj->can('result_source');
762   return $obj;
763 }
764
765 =head2 create
766
767 =head3 Arguments: (\%vals)
768
769 Inserts a record into the resultset and returns the object.
770
771 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
772
773 =cut
774
775 sub create {
776   my ($self, $attrs) = @_;
777   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
778   return $self->new_result($attrs)->insert;
779 }
780
781 =head2 find_or_create
782
783 =head3 Arguments: (\%vals, \%attrs?)
784
785   $class->find_or_create({ key => $val, ... });
786
787 Searches for a record matching the search condition; if it doesn't find one,
788 creates one and returns that instead.
789
790   my $cd = $schema->resultset('CD')->find_or_create({
791     cdid   => 5,
792     artist => 'Massive Attack',
793     title  => 'Mezzanine',
794     year   => 2005,
795   });
796
797 Also takes an optional C<key> attribute, to search by a specific key or unique
798 constraint. For example:
799
800   my $cd = $schema->resultset('CD')->find_or_create(
801     {
802       artist => 'Massive Attack',
803       title  => 'Mezzanine',
804     },
805     { key => 'artist_title' }
806   );
807
808 See also L</find> and L</update_or_create>.
809
810 =cut
811
812 sub find_or_create {
813   my $self     = shift;
814   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
815   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
816   my $exists   = $self->find($hash, $attrs);
817   return defined $exists ? $exists : $self->create($hash);
818 }
819
820 =head2 update_or_create
821
822   $class->update_or_create({ key => $val, ... });
823
824 First, search for an existing row matching one of the unique constraints
825 (including the primary key) on the source of this resultset.  If a row is
826 found, update it with the other given column values.  Otherwise, create a new
827 row.
828
829 Takes an optional C<key> attribute to search on a specific unique constraint.
830 For example:
831
832   # In your application
833   my $cd = $schema->resultset('CD')->update_or_create(
834     {
835       artist => 'Massive Attack',
836       title  => 'Mezzanine',
837       year   => 1998,
838     },
839     { key => 'artist_title' }
840   );
841
842 If no C<key> is specified, it searches on all unique constraints defined on the
843 source, including the primary key.
844
845 If the C<key> is specified as C<primary>, search only on the primary key.
846
847 See also L</find> and L</find_or_create>.
848
849 =cut
850
851 sub update_or_create {
852   my $self = shift;
853   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
854   my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
855
856   my %unique_constraints = $self->result_source->unique_constraints;
857   my @constraint_names   = (exists $attrs->{key}
858                             ? ($attrs->{key})
859                             : keys %unique_constraints);
860
861   my @unique_hashes;
862   foreach my $name (@constraint_names) {
863     my @unique_cols = @{ $unique_constraints{$name} };
864     my %unique_hash =
865       map  { $_ => $hash->{$_} }
866       grep { exists $hash->{$_} }
867       @unique_cols;
868
869     push @unique_hashes, \%unique_hash
870       if (scalar keys %unique_hash == scalar @unique_cols);
871   }
872
873   if (@unique_hashes) {
874     my $row = $self->single(\@unique_hashes);
875     if (defined $row) {
876       $row->set_columns($hash);
877       $row->update;
878       return $row;
879     }
880   }
881
882   return $self->create($hash);
883 }
884
885 =head2 get_cache
886
887 Gets the contents of the cache for the resultset.
888
889 =cut
890
891 sub get_cache {
892   shift->{all_cache} || [];
893 }
894
895 =head2 set_cache
896
897 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
898
899 =cut
900
901 sub set_cache {
902   my ( $self, $data ) = @_;
903   $self->throw_exception("set_cache requires an arrayref")
904     if ref $data ne 'ARRAY';
905   my $result_class = $self->result_class;
906   foreach( @$data ) {
907     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
908       if ref $_ ne $result_class;
909   }
910   $self->{all_cache} = $data;
911 }
912
913 =head2 clear_cache
914
915 Clears the cache for the resultset.
916
917 =cut
918
919 sub clear_cache {
920   shift->set_cache([]);
921 }
922
923 =head2 related_resultset
924
925 Returns a related resultset for the supplied relationship name.
926
927   $rs = $rs->related_resultset('foo');
928
929 =cut
930
931 sub related_resultset {
932   my ( $self, $rel, @rest ) = @_;
933   $self->{related_resultsets} ||= {};
934   return $self->{related_resultsets}{$rel} ||= do {
935       #warn "fetching related resultset for rel '$rel'";
936       my $rel_obj = $self->result_source->relationship_info($rel);
937       $self->throw_exception(
938         "search_related: result source '" . $self->result_source->name .
939         "' has no such relationship ${rel}")
940         unless $rel_obj; #die Dumper $self->{attrs};
941
942       my $rs = $self->search(undef, { join => $rel });
943       my $alias = defined $rs->{attrs}{seen_join}{$rel}
944                     && $rs->{attrs}{seen_join}{$rel} > 1
945                   ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
946                   : $rel;
947
948       $self->result_source->schema->resultset($rel_obj->{class}
949            )->search( undef,
950              { %{$rs->{attrs}},
951                alias => $alias,
952                select => undef,
953                as => undef }
954            )->search(@rest);      
955   };
956 }
957
958 =head2 throw_exception
959
960 See Schema's throw_exception
961
962 =cut
963
964 sub throw_exception {
965   my $self=shift;
966   $self->result_source->schema->throw_exception(@_);
967 }
968
969 =head1 ATTRIBUTES
970
971 The resultset takes various attributes that modify its behavior. Here's an
972 overview of them:
973
974 =head2 order_by
975
976 Which column(s) to order the results by. This is currently passed through
977 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
978
979 =head2 columns
980
981 =head3 Arguments: (arrayref)
982
983 Shortcut to request a particular set of columns to be retrieved.  Adds
984 C<me.> onto the start of any column without a C<.> in it and sets C<select>
985 from that, then auto-populates C<as> from C<select> as normal. (You may also
986 use the C<cols> attribute, as in earlier versions of DBIC.)
987
988 =head2 include_columns
989
990 =head3 Arguments: (arrayref)
991
992 Shortcut to include additional columns in the returned results - for example
993
994   { include_columns => ['foo.name'], join => ['foo'] }
995
996 would add a 'name' column to the information passed to object inflation
997
998 =head2 select
999
1000 =head3 Arguments: (arrayref)
1001
1002 Indicates which columns should be selected from the storage. You can use
1003 column names, or in the case of RDBMS back ends, function or stored procedure
1004 names:
1005
1006   $rs = $schema->resultset('Foo')->search(
1007     undef,
1008     {
1009       select => [
1010         'column_name',
1011         { count => 'column_to_count' },
1012         { sum => 'column_to_sum' }
1013       ]
1014     }
1015   );
1016
1017 When you use function/stored procedure names and do not supply an C<as>
1018 attribute, the column names returned are storage-dependent. E.g. MySQL would
1019 return a column named C<count(column_to_count)> in the above example.
1020
1021 =head2 as
1022
1023 =head3 Arguments: (arrayref)
1024
1025 Indicates column names for object inflation. This is used in conjunction with
1026 C<select>, usually when C<select> contains one or more function or stored
1027 procedure names:
1028
1029   $rs = $schema->resultset('Foo')->search(
1030     undef,
1031     {
1032       select => [
1033         'column1',
1034         { count => 'column2' }
1035       ],
1036       as => [qw/ column1 column2_count /]
1037     }
1038   );
1039
1040   my $foo = $rs->first(); # get the first Foo
1041
1042 If the object against which the search is performed already has an accessor
1043 matching a column name specified in C<as>, the value can be retrieved using
1044 the accessor as normal:
1045
1046   my $column1 = $foo->column1();
1047
1048 If on the other hand an accessor does not exist in the object, you need to
1049 use C<get_column> instead:
1050
1051   my $column2_count = $foo->get_column('column2_count');
1052
1053 You can create your own accessors if required - see
1054 L<DBIx::Class::Manual::Cookbook> for details.
1055
1056 =head2 join
1057
1058 Contains a list of relationships that should be joined for this query.  For
1059 example:
1060
1061   # Get CDs by Nine Inch Nails
1062   my $rs = $schema->resultset('CD')->search(
1063     { 'artist.name' => 'Nine Inch Nails' },
1064     { join => 'artist' }
1065   );
1066
1067 Can also contain a hash reference to refer to the other relation's relations.
1068 For example:
1069
1070   package MyApp::Schema::Track;
1071   use base qw/DBIx::Class/;
1072   __PACKAGE__->table('track');
1073   __PACKAGE__->add_columns(qw/trackid cd position title/);
1074   __PACKAGE__->set_primary_key('trackid');
1075   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1076   1;
1077
1078   # In your application
1079   my $rs = $schema->resultset('Artist')->search(
1080     { 'track.title' => 'Teardrop' },
1081     {
1082       join     => { cd => 'track' },
1083       order_by => 'artist.name',
1084     }
1085   );
1086
1087 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1088 similarly for a third time). For e.g.
1089
1090   my $rs = $schema->resultset('Artist')->search(
1091     { 'cds.title'   => 'Foo',
1092       'cds_2.title' => 'Bar' },
1093     { join => [ qw/cds cds/ ] });
1094
1095 will return a set of all artists that have both a cd with title Foo and a cd
1096 with title Bar.
1097
1098 If you want to fetch related objects from other tables as well, see C<prefetch>
1099 below.
1100
1101 =head2 prefetch
1102
1103 =head3 Arguments: arrayref/hashref
1104
1105 Contains one or more relationships that should be fetched along with the main 
1106 query (when they are accessed afterwards they will have already been
1107 "prefetched").  This is useful for when you know you will need the related
1108 objects, because it saves at least one query:
1109
1110   my $rs = $schema->resultset('Tag')->search(
1111     undef,
1112     {
1113       prefetch => {
1114         cd => 'artist'
1115       }
1116     }
1117   );
1118
1119 The initial search results in SQL like the following:
1120
1121   SELECT tag.*, cd.*, artist.* FROM tag
1122   JOIN cd ON tag.cd = cd.cdid
1123   JOIN artist ON cd.artist = artist.artistid
1124
1125 L<DBIx::Class> has no need to go back to the database when we access the
1126 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1127 case.
1128
1129 Simple prefetches will be joined automatically, so there is no need
1130 for a C<join> attribute in the above search. If you're prefetching to
1131 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1132 specify the join as well.
1133
1134 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1135 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1136 with an accessor type of 'single' or 'filter').
1137
1138 =head2 from
1139
1140 =head3 Arguments: (arrayref)
1141
1142 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1143 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1144 clauses.
1145
1146 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1147 C<join> will usually do what you need and it is strongly recommended that you
1148 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1149
1150 In simple terms, C<from> works as follows:
1151
1152     [
1153         { <alias> => <table>, -join-type => 'inner|left|right' }
1154         [] # nested JOIN (optional)
1155         { <table.column> = <foreign_table.foreign_key> }
1156     ]
1157
1158     JOIN
1159         <alias> <table>
1160         [JOIN ...]
1161     ON <table.column> = <foreign_table.foreign_key>
1162
1163 An easy way to follow the examples below is to remember the following:
1164
1165     Anything inside "[]" is a JOIN
1166     Anything inside "{}" is a condition for the enclosing JOIN
1167
1168 The following examples utilize a "person" table in a family tree application.
1169 In order to express parent->child relationships, this table is self-joined:
1170
1171     # Person->belongs_to('father' => 'Person');
1172     # Person->belongs_to('mother' => 'Person');
1173
1174 C<from> can be used to nest joins. Here we return all children with a father,
1175 then search against all mothers of those children:
1176
1177   $rs = $schema->resultset('Person')->search(
1178       undef,
1179       {
1180           alias => 'mother', # alias columns in accordance with "from"
1181           from => [
1182               { mother => 'person' },
1183               [
1184                   [
1185                       { child => 'person' },
1186                       [
1187                           { father => 'person' },
1188                           { 'father.person_id' => 'child.father_id' }
1189                       ]
1190                   ],
1191                   { 'mother.person_id' => 'child.mother_id' }
1192               ],
1193           ]
1194       },
1195   );
1196
1197   # Equivalent SQL:
1198   # SELECT mother.* FROM person mother
1199   # JOIN (
1200   #   person child
1201   #   JOIN person father
1202   #   ON ( father.person_id = child.father_id )
1203   # )
1204   # ON ( mother.person_id = child.mother_id )
1205
1206 The type of any join can be controlled manually. To search against only people
1207 with a father in the person table, we could explicitly use C<INNER JOIN>:
1208
1209     $rs = $schema->resultset('Person')->search(
1210         undef,
1211         {
1212             alias => 'child', # alias columns in accordance with "from"
1213             from => [
1214                 { child => 'person' },
1215                 [
1216                     { father => 'person', -join-type => 'inner' },
1217                     { 'father.id' => 'child.father_id' }
1218                 ],
1219             ]
1220         },
1221     );
1222
1223     # Equivalent SQL:
1224     # SELECT child.* FROM person child
1225     # INNER JOIN person father ON child.father_id = father.id
1226
1227 =head2 page
1228
1229 For a paged resultset, specifies which page to retrieve.  Leave unset
1230 for an unpaged resultset.
1231
1232 =head2 rows
1233
1234 For a paged resultset, how many rows per page:
1235
1236   rows => 10
1237
1238 Can also be used to simulate an SQL C<LIMIT>.
1239
1240 =head2 group_by
1241
1242 =head3 Arguments: (arrayref)
1243
1244 A arrayref of columns to group by. Can include columns of joined tables.
1245
1246   group_by => [qw/ column1 column2 ... /]
1247
1248 =head2 distinct
1249
1250 Set to 1 to group by all columns.
1251
1252 For more examples of using these attributes, see
1253 L<DBIx::Class::Manual::Cookbook>.
1254
1255 =cut
1256
1257 1;