Fixup for count
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11 use Scalar::Util qw/weaken/;
12
13 use base qw/DBIx::Class/;
14 __PACKAGE__->load_components(qw/AccessorGroup/);
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =head3 Arguments: ($source, \%$attrs)
57
58 The resultset constructor. Takes a source object (usually a
59 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
60 below).  Does not perform any queries -- these are executed as needed by the
61 other methods.
62
63 Generally you won't need to construct a resultset manually.  You'll
64 automatically get one from e.g. a L</search> called in scalar context:
65
66   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
67
68 =cut
69
70 sub new {
71   my $class = shift;
72   return $class->new_result(@_) if ref $class;
73   
74   my ($source, $attrs) = @_;
75   weaken $source;
76   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
77   #use Data::Dumper; warn Dumper($attrs);
78   my $alias = ($attrs->{alias} ||= 'me');
79   
80   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
81   delete $attrs->{as} if $attrs->{columns};
82   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
83   $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
84     if $attrs->{columns};
85   $attrs->{as} ||= [ map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ];
86   if (my $include = delete $attrs->{include_columns}) {
87     push(@{$attrs->{select}}, @$include);
88     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
89   }
90   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
91
92   $attrs->{from} ||= [ { $alias => $source->from } ];
93   $attrs->{seen_join} ||= {};
94   my %seen;
95   if (my $join = delete $attrs->{join}) {
96     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
97       if (ref $j eq 'HASH') {
98         $seen{$_} = 1 foreach keys %$j;
99       } else {
100         $seen{$j} = 1;
101       }
102     }
103     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
104   }
105   
106   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
107   $attrs->{order_by} = [ $attrs->{order_by} ] if $attrs->{order_by} and !ref($attrs->{order_by});
108   $attrs->{order_by} ||= [];
109
110   my $collapse = $attrs->{collapse} || {};
111   if (my $prefetch = delete $attrs->{prefetch}) {
112     my @pre_order;
113     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
114       if ( ref $p eq 'HASH' ) {
115         foreach my $key (keys %$p) {
116           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
117             unless $seen{$key};
118         }
119       } else {
120         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
121             unless $seen{$p};
122       }
123       my @prefetch = $source->resolve_prefetch(
124            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
125       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
126       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
127     }
128     push(@{$attrs->{order_by}}, @pre_order);
129   }
130   $attrs->{collapse} = $collapse;
131 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
132
133   if ($attrs->{page}) {
134     $attrs->{rows} ||= 10;
135     $attrs->{offset} ||= 0;
136     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
137   }
138
139   bless {
140     result_source => $source,
141     result_class => $attrs->{result_class} || $source->result_class,
142     cond => $attrs->{where},
143     from => $attrs->{from},
144     collapse => $collapse,
145     count => undef,
146     page => delete $attrs->{page},
147     pager => undef,
148     attrs => $attrs
149   }, $class;
150 }
151
152 =head2 search
153
154   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
155   my $new_rs = $rs->search({ foo => 3 });
156
157 If you need to pass in additional attributes but no additional condition,
158 call it as C<search(undef, \%attrs);>.
159
160   # "SELECT foo, bar FROM $class_table"
161   my @all = $class->search(undef, { columns => [qw/foo bar/] });
162
163 =cut
164
165 sub search {
166   my $self = shift;
167
168   my $rs;
169   if( @_ ) {
170     
171     my $attrs = { %{$self->{attrs}} };
172     my $having = delete $attrs->{having};
173     $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
174
175     my $where = (@_
176                   ? ((@_ == 1 || ref $_[0] eq "HASH")
177                       ? shift
178                       : ((@_ % 2)
179                           ? $self->throw_exception(
180                               "Odd number of arguments to search")
181                           : {@_}))
182                   : undef());
183     if (defined $where) {
184       $attrs->{where} = (defined $attrs->{where}
185                 ? { '-and' =>
186                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
187                         $where, $attrs->{where} ] }
188                 : $where);
189     }
190
191     if (defined $having) {
192       $attrs->{having} = (defined $attrs->{having}
193                 ? { '-and' =>
194                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
195                         $having, $attrs->{having} ] }
196                 : $having);
197     }
198
199     $rs = (ref $self)->new($self->result_source, $attrs);
200   }
201   else {
202     $rs = $self;
203     $rs->reset;
204   }
205   return (wantarray ? $rs->all : $rs);
206 }
207
208 =head2 search_literal
209
210   my @obj    = $rs->search_literal($literal_where_cond, @bind);
211   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
212
213 Pass a literal chunk of SQL to be added to the conditional part of the
214 resultset.
215
216 =cut
217
218 sub search_literal {
219   my ($self, $cond, @vals) = @_;
220   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
221   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
222   return $self->search(\$cond, $attrs);
223 }
224
225 =head2 find
226
227 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
228
229 Finds a row based on its primary key or unique constraint. For example:
230
231   my $cd = $schema->resultset('CD')->find(5);
232
233 Also takes an optional C<key> attribute, to search by a specific key or unique
234 constraint. For example:
235
236   my $cd = $schema->resultset('CD')->find(
237     {
238       artist => 'Massive Attack',
239       title  => 'Mezzanine',
240     },
241     { key => 'artist_title' }
242   );
243
244 See also L</find_or_create> and L</update_or_create>.
245
246 =cut
247
248 sub find {
249   my ($self, @vals) = @_;
250   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
251
252   my @cols = $self->result_source->primary_columns;
253   if (exists $attrs->{key}) {
254     my %uniq = $self->result_source->unique_constraints;
255     $self->throw_exception( "Unknown key $attrs->{key} on $self->name" )
256       unless exists $uniq{$attrs->{key}};
257     @cols = @{ $uniq{$attrs->{key}} };
258   }
259   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
260   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
261     unless @cols;
262
263   my $query;
264   if (ref $vals[0] eq 'HASH') {
265     $query = { %{$vals[0]} };
266   } elsif (@cols == @vals) {
267     $query = {};
268     @{$query}{@cols} = @vals;
269   } else {
270     $query = {@vals};
271   }
272   foreach my $key (grep { ! m/\./ } keys %$query) {
273     $query->{"$self->{attrs}{alias}.$key"} = delete $query->{$key};
274   }
275   #warn Dumper($query);
276   
277   if (keys %$attrs) {
278       my $rs = $self->search($query,$attrs);
279       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
280   } else {
281       return keys %{$self->{collapse}} ? $self->search($query)->next : $self->single($query);
282   }
283 }
284
285 =head2 search_related
286
287   $rs->search_related('relname', $cond?, $attrs?);
288
289 Search the specified relationship. Optionally specify a condition for matching
290 records.
291
292 =cut
293
294 sub search_related {
295   return shift->related_resultset(shift)->search(@_);
296 }
297
298 =head2 cursor
299
300 Returns a storage-driven cursor to the given resultset.
301
302 =cut
303
304 sub cursor {
305   my ($self) = @_;
306   my $attrs = { %{$self->{attrs}} };
307   return $self->{cursor}
308     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
309           $attrs->{where},$attrs);
310 }
311
312 =head2 single
313
314 Inflates the first result without creating a cursor
315
316 =cut
317
318 sub single {
319   my ($self, $where) = @_;
320   my $attrs = { %{$self->{attrs}} };
321   if ($where) {
322     if (defined $attrs->{where}) {
323       $attrs->{where} = {
324         '-and' => 
325             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
326                $where, delete $attrs->{where} ]
327       };
328     } else {
329       $attrs->{where} = $where;
330     }
331   }
332   my @data = $self->result_source->storage->select_single(
333           $self->{from}, $attrs->{select},
334           $attrs->{where},$attrs);
335   return (@data ? $self->_construct_object(@data) : ());
336 }
337
338
339 =head2 search_like
340
341 Perform a search, but use C<LIKE> instead of equality as the condition. Note
342 that this is simply a convenience method; you most likely want to use
343 L</search> with specific operators.
344
345 For more information, see L<DBIx::Class::Manual::Cookbook>.
346
347 =cut
348
349 sub search_like {
350   my $class = shift;
351   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
352   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
353   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
354   return $class->search($query, { %$attrs });
355 }
356
357 =head2 slice
358
359 =head3 Arguments: ($first, $last)
360
361 Returns a subset of elements from the resultset.
362
363 =cut
364
365 sub slice {
366   my ($self, $min, $max) = @_;
367   my $attrs = { %{ $self->{attrs} || {} } };
368   $attrs->{offset} ||= 0;
369   $attrs->{offset} += $min;
370   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
371   my $slice = (ref $self)->new($self->result_source, $attrs);
372   return (wantarray ? $slice->all : $slice);
373 }
374
375 =head2 next
376
377 Returns the next element in the resultset (C<undef> is there is none).
378
379 Can be used to efficiently iterate over records in the resultset:
380
381   my $rs = $schema->resultset('CD')->search;
382   while (my $cd = $rs->next) {
383     print $cd->title;
384   }
385
386 =cut
387
388 sub next {
389   my ($self) = @_;
390   if (@{$self->{all_cache} || []}) {
391     $self->{all_cache_position} ||= 0;
392     return $self->{all_cache}->[$self->{all_cache_position}++];
393   }
394   if ($self->{attrs}{cache}) {
395     $self->{all_cache_position} = 1;
396     return ($self->all)[0];
397   }
398   my @row = (exists $self->{stashed_row}
399                ? @{delete $self->{stashed_row}}
400                : $self->cursor->next);
401 #  warn Dumper(\@row); use Data::Dumper;
402   return unless (@row);
403   return $self->_construct_object(@row);
404 }
405
406 sub _construct_object {
407   my ($self, @row) = @_;
408   my @as = @{ $self->{attrs}{as} };
409   
410   my $info = $self->_collapse_result(\@as, \@row);
411   
412   my $new = $self->result_class->inflate_result($self->result_source, @$info);
413   
414   $new = $self->{attrs}{record_filter}->($new)
415     if exists $self->{attrs}{record_filter};
416   return $new;
417 }
418
419 sub _collapse_result {
420   my ($self, $as, $row, $prefix) = @_;
421
422   my %const;
423
424   my @copy = @$row;
425   foreach my $this_as (@$as) {
426     my $val = shift @copy;
427     if (defined $prefix) {
428       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
429         my $remain = $1;
430         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
431         $const{$1||''}{$2} = $val;
432       }
433     } else {
434       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
435       $const{$1||''}{$2} = $val;
436     }
437   }
438
439   my $info = [ {}, {} ];
440   foreach my $key (keys %const) {
441     if (length $key) {
442       my $target = $info;
443       my @parts = split(/\./, $key);
444       foreach my $p (@parts) {
445         $target = $target->[1]->{$p} ||= [];
446       }
447       $target->[0] = $const{$key};
448     } else {
449       $info->[0] = $const{$key};
450     }
451   }
452
453   my @collapse = (defined($prefix)
454                    ? (map { (m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()); }
455                        keys %{$self->{collapse}})
456                    : keys %{$self->{collapse}});
457   if (@collapse) {
458     my ($c) = sort { length $a <=> length $b } @collapse;
459     my $target = $info;
460     foreach my $p (split(/\./, $c)) {
461       $target = $target->[1]->{$p} ||= [];
462     }
463     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
464     my @co_key = @{$self->{collapse}{$c_prefix}};
465     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
466     my $tree = $self->_collapse_result($as, $row, $c_prefix);
467     my (@final, @raw);
468     while ( !(grep {
469                 !defined($tree->[0]->{$_})
470                 || $co_check{$_} ne $tree->[0]->{$_}
471               } @co_key) ) {
472       push(@final, $tree);
473       last unless (@raw = $self->cursor->next);
474       $row = $self->{stashed_row} = \@raw;
475       $tree = $self->_collapse_result($as, $row, $c_prefix);
476       #warn Data::Dumper::Dumper($tree, $row);
477     }
478     @$target = @final;
479   }
480
481   return $info;
482 }
483
484 =head2 result_source
485
486 Returns a reference to the result source for this recordset.
487
488 =cut
489
490
491 =head2 count
492
493 Performs an SQL C<COUNT> with the same query as the resultset was built
494 with to find the number of elements. If passed arguments, does a search
495 on the resultset and counts the results of that.
496
497 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
498 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
499 not support C<DISTINCT> with multiple columns. If you are using such a
500 database, you should only use columns from the main table in your C<group_by>
501 clause.
502
503 =cut
504
505 sub count {
506   my $self = shift;
507   return $self->search(@_)->count if @_ and defined $_[0];
508   return scalar @{ $self->get_cache } if @{ $self->get_cache };
509
510   my $count = $self->_count;
511   return 0 unless $count;
512
513   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
514   $count = $self->{attrs}{rows} if
515     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
516   return $count;
517 }
518
519 sub _count { # Separated out so pager can get the full count
520   my $self = shift;
521   my $select = { count => '*' };
522   my $attrs = { %{ $self->{attrs} } };
523   if (my $group_by = delete $attrs->{group_by}) {
524     delete $attrs->{having};
525     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
526     # todo: try CONCAT for multi-column pk
527     my @pk = $self->result_source->primary_columns;
528     if (@pk == 1) {
529       foreach my $column (@distinct) {
530         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
531           @distinct = ($column);
532           last;
533         }
534       } 
535     }
536
537     $select = { count => { distinct => \@distinct } };
538     #use Data::Dumper; die Dumper $select;
539   }
540
541   $attrs->{select} = $select;
542   $attrs->{as} = [qw/count/];
543
544   # offset, order by and page are not needed to count. record_filter is cdbi
545   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
546         
547   my ($count) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
548   return $count;
549 }
550
551 =head2 count_literal
552
553 Calls L</search_literal> with the passed arguments, then L</count>.
554
555 =cut
556
557 sub count_literal { shift->search_literal(@_)->count; }
558
559 =head2 all
560
561 Returns all elements in the resultset. Called implictly if the resultset
562 is returned in list context.
563
564 =cut
565
566 sub all {
567   my ($self) = @_;
568   return @{ $self->get_cache } if @{ $self->get_cache };
569
570   my @obj;
571
572   if (keys %{$self->{collapse}}) {
573       # Using $self->cursor->all is really just an optimisation.
574       # If we're collapsing has_many prefetches it probably makes
575       # very little difference, and this is cleaner than hacking
576       # _construct_object to survive the approach
577     $self->cursor->reset;
578     my @row = $self->cursor->next;
579     while (@row) {
580       push(@obj, $self->_construct_object(@row));
581       @row = (exists $self->{stashed_row}
582                ? @{delete $self->{stashed_row}}
583                : $self->cursor->next);
584     }
585   } else {
586     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
587   }
588
589   $self->set_cache(\@obj) if $self->{attrs}{cache};
590   return @obj;
591 }
592
593 =head2 reset
594
595 Resets the resultset's cursor, so you can iterate through the elements again.
596
597 =cut
598
599 sub reset {
600   my ($self) = @_;
601   $self->{all_cache_position} = 0;
602   $self->cursor->reset;
603   return $self;
604 }
605
606 =head2 first
607
608 Resets the resultset and returns the first element.
609
610 =cut
611
612 sub first {
613   return $_[0]->reset->next;
614 }
615
616 =head2 update
617
618 =head3 Arguments: (\%values)
619
620 Sets the specified columns in the resultset to the supplied values.
621
622 =cut
623
624 sub update {
625   my ($self, $values) = @_;
626   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
627   return $self->result_source->storage->update(
628            $self->result_source->from, $values, $self->{cond});
629 }
630
631 =head2 update_all
632
633 =head3 Arguments: (\%values)
634
635 Fetches all objects and updates them one at a time.  Note that C<update_all>
636 will run cascade triggers while L</update> will not.
637
638 =cut
639
640 sub update_all {
641   my ($self, $values) = @_;
642   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
643   foreach my $obj ($self->all) {
644     $obj->set_columns($values)->update;
645   }
646   return 1;
647 }
648
649 =head2 delete
650
651 Deletes the contents of the resultset from its result source.
652
653 =cut
654
655 sub delete {
656   my ($self) = @_;
657   my $del = {};
658   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
659     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
660   if (ref $self->{cond} eq 'ARRAY') {
661     $del = [ map { my %hash;
662       foreach my $key (keys %{$_}) {
663         $key =~ /([^.]+)$/;
664         $hash{$1} = $_->{$key};
665       }; \%hash; } @{$self->{cond}} ];
666   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
667     $del->{-and} = [ map { my %hash;
668       foreach my $key (keys %{$_}) {
669         $key =~ /([^.]+)$/;
670         $hash{$1} = $_->{$key};
671       }; \%hash; } @{$self->{cond}{-and}} ];
672   } else {
673     foreach my $key (keys %{$self->{cond}}) {
674       $key =~ /([^.]+)$/;
675       $del->{$1} = $self->{cond}{$key};
676     }
677   }
678   $self->result_source->storage->delete($self->result_source->from, $del);
679   return 1;
680 }
681
682 =head2 delete_all
683
684 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
685 will run cascade triggers while L</delete> will not.
686
687 =cut
688
689 sub delete_all {
690   my ($self) = @_;
691   $_->delete for $self->all;
692   return 1;
693 }
694
695 =head2 pager
696
697 Returns a L<Data::Page> object for the current resultset. Only makes
698 sense for queries with a C<page> attribute.
699
700 =cut
701
702 sub pager {
703   my ($self) = @_;
704   my $attrs = $self->{attrs};
705   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
706   $attrs->{rows} ||= 10;
707   return $self->{pager} ||= Data::Page->new(
708     $self->_count, $attrs->{rows}, $self->{page});
709 }
710
711 =head2 page
712
713 =head3 Arguments: ($page_num)
714
715 Returns a new resultset for the specified page.
716
717 =cut
718
719 sub page {
720   my ($self, $page) = @_;
721   my $attrs = { %{$self->{attrs}} };
722   $attrs->{page} = $page;
723   return (ref $self)->new($self->result_source, $attrs);
724 }
725
726 =head2 new_result
727
728 =head3 Arguments: (\%vals)
729
730 Creates a result in the resultset's result class.
731
732 =cut
733
734 sub new_result {
735   my ($self, $values) = @_;
736   $self->throw_exception( "new_result needs a hash" )
737     unless (ref $values eq 'HASH');
738   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
739     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
740   my %new = %$values;
741   my $alias = $self->{attrs}{alias};
742   foreach my $key (keys %{$self->{cond}||{}}) {
743     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
744   }
745   my $obj = $self->result_class->new(\%new);
746   $obj->result_source($self->result_source) if $obj->can('result_source');
747   return $obj;
748 }
749
750 =head2 create
751
752 =head3 Arguments: (\%vals)
753
754 Inserts a record into the resultset and returns the object.
755
756 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
757
758 =cut
759
760 sub create {
761   my ($self, $attrs) = @_;
762   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
763   return $self->new_result($attrs)->insert;
764 }
765
766 =head2 find_or_create
767
768 =head3 Arguments: (\%vals, \%attrs?)
769
770   $class->find_or_create({ key => $val, ... });
771
772 Searches for a record matching the search condition; if it doesn't find one,
773 creates one and returns that instead.
774
775   my $cd = $schema->resultset('CD')->find_or_create({
776     cdid   => 5,
777     artist => 'Massive Attack',
778     title  => 'Mezzanine',
779     year   => 2005,
780   });
781
782 Also takes an optional C<key> attribute, to search by a specific key or unique
783 constraint. For example:
784
785   my $cd = $schema->resultset('CD')->find_or_create(
786     {
787       artist => 'Massive Attack',
788       title  => 'Mezzanine',
789     },
790     { key => 'artist_title' }
791   );
792
793 See also L</find> and L</update_or_create>.
794
795 =cut
796
797 sub find_or_create {
798   my $self     = shift;
799   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
800   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
801   my $exists   = $self->find($hash, $attrs);
802   return defined $exists ? $exists : $self->create($hash);
803 }
804
805 =head2 update_or_create
806
807   $class->update_or_create({ key => $val, ... });
808
809 First, search for an existing row matching one of the unique constraints
810 (including the primary key) on the source of this resultset.  If a row is
811 found, update it with the other given column values.  Otherwise, create a new
812 row.
813
814 Takes an optional C<key> attribute to search on a specific unique constraint.
815 For example:
816
817   # In your application
818   my $cd = $schema->resultset('CD')->update_or_create(
819     {
820       artist => 'Massive Attack',
821       title  => 'Mezzanine',
822       year   => 1998,
823     },
824     { key => 'artist_title' }
825   );
826
827 If no C<key> is specified, it searches on all unique constraints defined on the
828 source, including the primary key.
829
830 If the C<key> is specified as C<primary>, search only on the primary key.
831
832 See also L</find> and L</find_or_create>.
833
834 =cut
835
836 sub update_or_create {
837   my $self = shift;
838   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
839   my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
840
841   my %unique_constraints = $self->result_source->unique_constraints;
842   my @constraint_names   = (exists $attrs->{key}
843                             ? ($attrs->{key})
844                             : keys %unique_constraints);
845
846   my @unique_hashes;
847   foreach my $name (@constraint_names) {
848     my @unique_cols = @{ $unique_constraints{$name} };
849     my %unique_hash =
850       map  { $_ => $hash->{$_} }
851       grep { exists $hash->{$_} }
852       @unique_cols;
853
854     push @unique_hashes, \%unique_hash
855       if (scalar keys %unique_hash == scalar @unique_cols);
856   }
857
858   if (@unique_hashes) {
859     my $row = $self->single(\@unique_hashes);
860     if (defined $row) {
861       $row->set_columns($hash);
862       $row->update;
863       return $row;
864     }
865   }
866
867   return $self->create($hash);
868 }
869
870 =head2 get_cache
871
872 Gets the contents of the cache for the resultset.
873
874 =cut
875
876 sub get_cache {
877   shift->{all_cache} || [];
878 }
879
880 =head2 set_cache
881
882 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
883
884 =cut
885
886 sub set_cache {
887   my ( $self, $data ) = @_;
888   $self->throw_exception("set_cache requires an arrayref")
889     if ref $data ne 'ARRAY';
890   my $result_class = $self->result_class;
891   foreach( @$data ) {
892     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
893       if ref $_ ne $result_class;
894   }
895   $self->{all_cache} = $data;
896 }
897
898 =head2 clear_cache
899
900 Clears the cache for the resultset.
901
902 =cut
903
904 sub clear_cache {
905   shift->set_cache([]);
906 }
907
908 =head2 related_resultset
909
910 Returns a related resultset for the supplied relationship name.
911
912   $rs = $rs->related_resultset('foo');
913
914 =cut
915
916 sub related_resultset {
917   my ( $self, $rel, @rest ) = @_;
918   $self->{related_resultsets} ||= {};
919   return $self->{related_resultsets}{$rel} ||= do {
920       #warn "fetching related resultset for rel '$rel'";
921       my $rel_obj = $self->result_source->relationship_info($rel);
922       $self->throw_exception(
923         "search_related: result source '" . $self->result_source->name .
924         "' has no such relationship ${rel}")
925         unless $rel_obj; #die Dumper $self->{attrs};
926
927       my $rs = $self->search(undef, { join => $rel });
928       my $alias = defined $rs->{attrs}{seen_join}{$rel}
929                     && $rs->{attrs}{seen_join}{$rel} > 1
930                   ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
931                   : $rel;
932
933       $self->result_source->schema->resultset($rel_obj->{class}
934            )->search( undef,
935              { %{$rs->{attrs}},
936                alias => $alias,
937                select => undef,
938                as => undef }
939            )->search(@rest);      
940   };
941 }
942
943 =head2 throw_exception
944
945 See Schema's throw_exception
946
947 =cut
948
949 sub throw_exception {
950   my $self=shift;
951   $self->result_source->schema->throw_exception(@_);
952 }
953
954 =head1 ATTRIBUTES
955
956 The resultset takes various attributes that modify its behavior. Here's an
957 overview of them:
958
959 =head2 order_by
960
961 Which column(s) to order the results by. This is currently passed through
962 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
963
964 =head2 columns
965
966 =head3 Arguments: (arrayref)
967
968 Shortcut to request a particular set of columns to be retrieved.  Adds
969 C<me.> onto the start of any column without a C<.> in it and sets C<select>
970 from that, then auto-populates C<as> from C<select> as normal. (You may also
971 use the C<cols> attribute, as in earlier versions of DBIC.)
972
973 =head2 include_columns
974
975 =head3 Arguments: (arrayref)
976
977 Shortcut to include additional columns in the returned results - for example
978
979   { include_columns => ['foo.name'], join => ['foo'] }
980
981 would add a 'name' column to the information passed to object inflation
982
983 =head2 select
984
985 =head3 Arguments: (arrayref)
986
987 Indicates which columns should be selected from the storage. You can use
988 column names, or in the case of RDBMS back ends, function or stored procedure
989 names:
990
991   $rs = $schema->resultset('Foo')->search(
992     undef,
993     {
994       select => [
995         'column_name',
996         { count => 'column_to_count' },
997         { sum => 'column_to_sum' }
998       ]
999     }
1000   );
1001
1002 When you use function/stored procedure names and do not supply an C<as>
1003 attribute, the column names returned are storage-dependent. E.g. MySQL would
1004 return a column named C<count(column_to_count)> in the above example.
1005
1006 =head2 as
1007
1008 =head3 Arguments: (arrayref)
1009
1010 Indicates column names for object inflation. This is used in conjunction with
1011 C<select>, usually when C<select> contains one or more function or stored
1012 procedure names:
1013
1014   $rs = $schema->resultset('Foo')->search(
1015     undef,
1016     {
1017       select => [
1018         'column1',
1019         { count => 'column2' }
1020       ],
1021       as => [qw/ column1 column2_count /]
1022     }
1023   );
1024
1025   my $foo = $rs->first(); # get the first Foo
1026
1027 If the object against which the search is performed already has an accessor
1028 matching a column name specified in C<as>, the value can be retrieved using
1029 the accessor as normal:
1030
1031   my $column1 = $foo->column1();
1032
1033 If on the other hand an accessor does not exist in the object, you need to
1034 use C<get_column> instead:
1035
1036   my $column2_count = $foo->get_column('column2_count');
1037
1038 You can create your own accessors if required - see
1039 L<DBIx::Class::Manual::Cookbook> for details.
1040
1041 =head2 join
1042
1043 Contains a list of relationships that should be joined for this query.  For
1044 example:
1045
1046   # Get CDs by Nine Inch Nails
1047   my $rs = $schema->resultset('CD')->search(
1048     { 'artist.name' => 'Nine Inch Nails' },
1049     { join => 'artist' }
1050   );
1051
1052 Can also contain a hash reference to refer to the other relation's relations.
1053 For example:
1054
1055   package MyApp::Schema::Track;
1056   use base qw/DBIx::Class/;
1057   __PACKAGE__->table('track');
1058   __PACKAGE__->add_columns(qw/trackid cd position title/);
1059   __PACKAGE__->set_primary_key('trackid');
1060   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1061   1;
1062
1063   # In your application
1064   my $rs = $schema->resultset('Artist')->search(
1065     { 'track.title' => 'Teardrop' },
1066     {
1067       join     => { cd => 'track' },
1068       order_by => 'artist.name',
1069     }
1070   );
1071
1072 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1073 similarly for a third time). For e.g.
1074
1075   my $rs = $schema->resultset('Artist')->search(
1076     { 'cds.title'   => 'Foo',
1077       'cds_2.title' => 'Bar' },
1078     { join => [ qw/cds cds/ ] });
1079
1080 will return a set of all artists that have both a cd with title Foo and a cd
1081 with title Bar.
1082
1083 If you want to fetch related objects from other tables as well, see C<prefetch>
1084 below.
1085
1086 =head2 prefetch
1087
1088 =head3 Arguments: arrayref/hashref
1089
1090 Contains one or more relationships that should be fetched along with the main 
1091 query (when they are accessed afterwards they will have already been
1092 "prefetched").  This is useful for when you know you will need the related
1093 objects, because it saves at least one query:
1094
1095   my $rs = $schema->resultset('Tag')->search(
1096     undef,
1097     {
1098       prefetch => {
1099         cd => 'artist'
1100       }
1101     }
1102   );
1103
1104 The initial search results in SQL like the following:
1105
1106   SELECT tag.*, cd.*, artist.* FROM tag
1107   JOIN cd ON tag.cd = cd.cdid
1108   JOIN artist ON cd.artist = artist.artistid
1109
1110 L<DBIx::Class> has no need to go back to the database when we access the
1111 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1112 case.
1113
1114 Simple prefetches will be joined automatically, so there is no need
1115 for a C<join> attribute in the above search. If you're prefetching to
1116 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1117 specify the join as well.
1118
1119 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1120 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1121 with an accessor type of 'single' or 'filter').
1122
1123 =head2 from
1124
1125 =head3 Arguments: (arrayref)
1126
1127 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1128 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1129 clauses.
1130
1131 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1132 C<join> will usually do what you need and it is strongly recommended that you
1133 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1134
1135 In simple terms, C<from> works as follows:
1136
1137     [
1138         { <alias> => <table>, -join-type => 'inner|left|right' }
1139         [] # nested JOIN (optional)
1140         { <table.column> = <foreign_table.foreign_key> }
1141     ]
1142
1143     JOIN
1144         <alias> <table>
1145         [JOIN ...]
1146     ON <table.column> = <foreign_table.foreign_key>
1147
1148 An easy way to follow the examples below is to remember the following:
1149
1150     Anything inside "[]" is a JOIN
1151     Anything inside "{}" is a condition for the enclosing JOIN
1152
1153 The following examples utilize a "person" table in a family tree application.
1154 In order to express parent->child relationships, this table is self-joined:
1155
1156     # Person->belongs_to('father' => 'Person');
1157     # Person->belongs_to('mother' => 'Person');
1158
1159 C<from> can be used to nest joins. Here we return all children with a father,
1160 then search against all mothers of those children:
1161
1162   $rs = $schema->resultset('Person')->search(
1163       undef,
1164       {
1165           alias => 'mother', # alias columns in accordance with "from"
1166           from => [
1167               { mother => 'person' },
1168               [
1169                   [
1170                       { child => 'person' },
1171                       [
1172                           { father => 'person' },
1173                           { 'father.person_id' => 'child.father_id' }
1174                       ]
1175                   ],
1176                   { 'mother.person_id' => 'child.mother_id' }
1177               ],
1178           ]
1179       },
1180   );
1181
1182   # Equivalent SQL:
1183   # SELECT mother.* FROM person mother
1184   # JOIN (
1185   #   person child
1186   #   JOIN person father
1187   #   ON ( father.person_id = child.father_id )
1188   # )
1189   # ON ( mother.person_id = child.mother_id )
1190
1191 The type of any join can be controlled manually. To search against only people
1192 with a father in the person table, we could explicitly use C<INNER JOIN>:
1193
1194     $rs = $schema->resultset('Person')->search(
1195         undef,
1196         {
1197             alias => 'child', # alias columns in accordance with "from"
1198             from => [
1199                 { child => 'person' },
1200                 [
1201                     { father => 'person', -join-type => 'inner' },
1202                     { 'father.id' => 'child.father_id' }
1203                 ],
1204             ]
1205         },
1206     );
1207
1208     # Equivalent SQL:
1209     # SELECT child.* FROM person child
1210     # INNER JOIN person father ON child.father_id = father.id
1211
1212 =head2 page
1213
1214 For a paged resultset, specifies which page to retrieve.  Leave unset
1215 for an unpaged resultset.
1216
1217 =head2 rows
1218
1219 For a paged resultset, how many rows per page:
1220
1221   rows => 10
1222
1223 Can also be used to simulate an SQL C<LIMIT>.
1224
1225 =head2 group_by
1226
1227 =head3 Arguments: (arrayref)
1228
1229 A arrayref of columns to group by. Can include columns of joined tables.
1230
1231   group_by => [qw/ column1 column2 ... /]
1232
1233 =head2 distinct
1234
1235 Set to 1 to group by all columns.
1236
1237 For more examples of using these attributes, see
1238 L<DBIx::Class::Manual::Cookbook>.
1239
1240 =cut
1241
1242 1;