rename columns attr to cols
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->load_components(qw/Core/);
36   __PACKAGE__->table('artist');
37   __PACKAGE__->add_columns(qw/artistid name/);
38   __PACKAGE__->set_primary_key('artistid');
39   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
40   1;
41
42   package MyApp::Schema::CD;
43   use base qw/DBIx::Class/;
44   __PACKAGE__->load_components(qw/Core/);
45   __PACKAGE__->table('cd');
46   __PACKAGE__->add_columns(qw/cdid artist title year/);
47   __PACKAGE__->set_primary_key('cdid');
48   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
49   1;
50
51 =head1 METHODS
52
53 =head2 new
54
55 =head3 Arguments: ($source, \%$attrs)
56
57 The resultset constructor. Takes a source object (usually a
58 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
59 below).  Does not perform any queries -- these are executed as needed by the
60 other methods.
61
62 Generally you won't need to construct a resultset manually.  You'll
63 automatically get one from e.g. a L</search> called in scalar context:
64
65   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
66
67 =cut
68
69 sub new {
70   my $class = shift;
71   return $class->new_result(@_) if ref $class;
72   
73   my ($source, $attrs) = @_;
74   #use Data::Dumper; warn Dumper($attrs);
75   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
76   my $alias = ($attrs->{alias} ||= 'me');
77   
78   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
79   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
80   if ($attrs->{columns}) {
81     delete $attrs->{as};
82     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ];
83   }
84   $attrs->{as} ||= [ map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ];
85   if (my $include = delete $attrs->{include_columns}) {
86     push(@{$attrs->{select}}, @$include);
87     push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
88   }
89   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
90
91   $attrs->{from} ||= [ { $alias => $source->from } ];
92   $attrs->{seen_join} ||= {};
93   my %seen;
94   if (my $join = delete $attrs->{join}) {
95     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
96       if (ref $j eq 'HASH') {
97         $seen{$_} = 1 foreach keys %$j;
98       } else {
99         $seen{$j} = 1;
100       }
101     }
102     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
103   }
104   
105   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
106   $attrs->{order_by} = [ $attrs->{order_by} ] if !ref($attrs->{order_by});
107   $attrs->{order_by} ||= [];
108
109   my $collapse = $attrs->{collapse} || {};
110   if (my $prefetch = delete $attrs->{prefetch}) {
111     my @pre_order;
112     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
113       if ( ref $p eq 'HASH' ) {
114         foreach my $key (keys %$p) {
115           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
116             unless $seen{$key};
117         }
118       } else {
119         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
120             unless $seen{$p};
121       }
122       my @prefetch = $source->resolve_prefetch(
123            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
124       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
125       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
126     }
127     push(@{$attrs->{order_by}}, @pre_order);
128   }
129   $attrs->{collapse} = $collapse;
130 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
131
132   if ($attrs->{page}) {
133     $attrs->{rows} ||= 10;
134     $attrs->{offset} ||= 0;
135     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
136   }
137
138   bless {
139     result_source => $source,
140     result_class => $attrs->{result_class} || $source->result_class,
141     cond => $attrs->{where},
142     from => $attrs->{from},
143     collapse => $collapse,
144     count => undef,
145     page => delete $attrs->{page},
146     pager => undef,
147     attrs => $attrs
148   }, $class;
149 }
150
151 =head2 search
152
153   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
154   my $new_rs = $rs->search({ foo => 3 });
155
156 If you need to pass in additional attributes but no additional condition,
157 call it as C<search(undef, \%attrs);>.
158
159   # "SELECT foo, bar FROM $class_table"
160   my @all = $class->search(undef, { columns => [qw/foo bar/] });
161
162 =cut
163
164 sub search {
165   my $self = shift;
166
167   my $rs;
168   if( @_ ) {
169     
170     my $attrs = { %{$self->{attrs}} };
171     my $having = delete $attrs->{having};
172     if (@_ > 1 && ref $_[$#_] eq 'HASH') {
173      $attrs = { %$attrs, %{ pop(@_) } };
174     }
175
176     my $where = (@_
177                   ? ((@_ == 1 || ref $_[0] eq "HASH")
178                       ? shift
179                       : ((@_ % 2)
180                           ? $self->throw_exception(
181                               "Odd number of arguments to search")
182                           : {@_}))
183                   : undef());
184     if (defined $where) {
185       $where = (defined $attrs->{where}
186                 ? { '-and' =>
187                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
188                         $where, $attrs->{where} ] }
189                 : $where);
190       $attrs->{where} = $where;
191     }
192
193     if (defined $having) {
194       $having = (defined $attrs->{having}
195                 ? { '-and' =>
196                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
197                         $having, $attrs->{having} ] }
198                 : $having);
199       $attrs->{having} = $having;
200     }
201
202     $rs = (ref $self)->new($self->result_source, $attrs);
203   }
204   else {
205     $rs = $self;
206     $rs->reset();
207   }
208   return (wantarray ? $rs->all : $rs);
209 }
210
211 =head2 search_literal
212
213   my @obj    = $rs->search_literal($literal_where_cond, @bind);
214   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
215
216 Pass a literal chunk of SQL to be added to the conditional part of the
217 resultset.
218
219 =cut
220
221 sub search_literal {
222   my ($self, $cond, @vals) = @_;
223   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
224   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
225   return $self->search(\$cond, $attrs);
226 }
227
228 =head2 find
229
230 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
231
232 Finds a row based on its primary key or unique constraint. For example:
233
234   my $cd = $schema->resultset('CD')->find(5);
235
236 Also takes an optional C<key> attribute, to search by a specific key or unique
237 constraint. For example:
238
239   my $cd = $schema->resultset('CD')->find(
240     {
241       artist => 'Massive Attack',
242       title  => 'Mezzanine',
243     },
244     { key => 'artist_title' }
245   );
246
247 See also L</find_or_create> and L</update_or_create>.
248
249 =cut
250
251 sub find {
252   my ($self, @vals) = @_;
253   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
254
255   my @cols = $self->result_source->primary_columns;
256   if (exists $attrs->{key}) {
257     my %uniq = $self->result_source->unique_constraints;
258     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
259       unless exists $uniq{$attrs->{key}};
260     @cols = @{ $uniq{$attrs->{key}} };
261   }
262   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
263   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
264     unless @cols;
265
266   my $query;
267   if (ref $vals[0] eq 'HASH') {
268     $query = { %{$vals[0]} };
269   } elsif (@cols == @vals) {
270     $query = {};
271     @{$query}{@cols} = @vals;
272   } else {
273     $query = {@vals};
274   }
275   foreach (keys %$query) {
276     next if m/\./;
277     $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
278   }
279   #warn Dumper($query);
280   
281   if (keys %$attrs) {
282       my $rs = $self->search($query,$attrs);
283       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
284   } else {
285       return keys %{$self->{collapse}} ? $self->search($query)->next : $self->single($query);
286   }
287 }
288
289 =head2 search_related
290
291   $rs->search_related('relname', $cond?, $attrs?);
292
293 Search the specified relationship. Optionally specify a condition for matching
294 records.
295
296 =cut
297
298 sub search_related {
299   return shift->related_resultset(shift)->search(@_);
300 }
301
302 =head2 cursor
303
304 Returns a storage-driven cursor to the given resultset.
305
306 =cut
307
308 sub cursor {
309   my ($self) = @_;
310   my ($attrs) = $self->{attrs};
311   $attrs = { %$attrs };
312   return $self->{cursor}
313     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
314           $attrs->{where},$attrs);
315 }
316
317 =head2 single
318
319 Inflates the first result without creating a cursor
320
321 =cut
322
323 sub single {
324   my ($self, $extra) = @_;
325   my ($attrs) = $self->{attrs};
326   $attrs = { %$attrs };
327   if ($extra) {
328     if (defined $attrs->{where}) {
329       $attrs->{where} = {
330         '-and'
331           => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
332                delete $attrs->{where}, $extra ]
333       };
334     } else {
335       $attrs->{where} = $extra;
336     }
337   }
338   my @data = $self->result_source->storage->select_single(
339           $self->{from}, $attrs->{select},
340           $attrs->{where},$attrs);
341   return (@data ? $self->_construct_object(@data) : ());
342 }
343
344
345 =head2 search_like
346
347 Perform a search, but use C<LIKE> instead of equality as the condition. Note
348 that this is simply a convenience method; you most likely want to use
349 L</search> with specific operators.
350
351 For more information, see L<DBIx::Class::Manual::Cookbook>.
352
353 =cut
354
355 sub search_like {
356   my $class    = shift;
357   my $attrs = { };
358   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
359     $attrs = pop(@_);
360   }
361   my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
362   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
363   return $class->search($query, { %$attrs });
364 }
365
366 =head2 slice
367
368 =head3 Arguments: ($first, $last)
369
370 Returns a subset of elements from the resultset.
371
372 =cut
373
374 sub slice {
375   my ($self, $min, $max) = @_;
376   my $attrs = { %{ $self->{attrs} || {} } };
377   $attrs->{offset} ||= 0;
378   $attrs->{offset} += $min;
379   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
380   my $slice = (ref $self)->new($self->result_source, $attrs);
381   return (wantarray ? $slice->all : $slice);
382 }
383
384 =head2 next
385
386 Returns the next element in the resultset (C<undef> is there is none).
387
388 Can be used to efficiently iterate over records in the resultset:
389
390   my $rs = $schema->resultset('CD')->search;
391   while (my $cd = $rs->next) {
392     print $cd->title;
393   }
394
395 =cut
396
397 sub next {
398   my ($self) = @_;
399   my $cache;
400   if( @{$cache = $self->{all_cache} || []}) {
401     $self->{all_cache_position} ||= 0;
402     my $obj = $cache->[$self->{all_cache_position}];
403     $self->{all_cache_position}++;
404     return $obj;
405   }
406   if ($self->{attrs}{cache}) {
407     $self->{all_cache_position} = 1;
408     return ($self->all)[0];
409   }
410   my @row = (exists $self->{stashed_row}
411                ? @{delete $self->{stashed_row}}
412                : $self->cursor->next);
413 #  warn Dumper(\@row); use Data::Dumper;
414   return unless (@row);
415   return $self->_construct_object(@row);
416 }
417
418 sub _construct_object {
419   my ($self, @row) = @_;
420   my @as = @{ $self->{attrs}{as} };
421
422   my $info = $self->_collapse_result(\@as, \@row);
423
424   my $new = $self->result_class->inflate_result($self->result_source, @$info);
425
426   $new = $self->{attrs}{record_filter}->($new)
427     if exists $self->{attrs}{record_filter};
428  
429   return $new;
430 }
431
432 sub _collapse_result {
433   my ($self, $as, $row, $prefix) = @_;
434
435   my %const;
436
437   my @copy = @$row;
438   foreach my $this_as (@$as) {
439     my $val = shift @copy;
440     if (defined $prefix) {
441       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
442         my $remain = $1;
443         $remain =~ /^(?:(.*)\.)?([^\.]+)$/;
444         $const{$1||''}{$2} = $val;
445       }
446     } else {
447       $this_as =~ /^(?:(.*)\.)?([^\.]+)$/;
448       $const{$1||''}{$2} = $val;
449     }
450   }
451
452   my $info = [ {}, {} ];
453   foreach my $key (keys %const) {
454     if (length $key) {
455       my $target = $info;
456       my @parts = split(/\./, $key);
457       foreach my $p (@parts) {
458         $target = $target->[1]->{$p} ||= [];
459       }
460       $target->[0] = $const{$key};
461     } else {
462       $info->[0] = $const{$key};
463     }
464   }
465
466   my @collapse = (defined($prefix)
467                    ? (map { (m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()); }
468                        keys %{$self->{collapse}})
469                    : keys %{$self->{collapse}});
470   if (@collapse) {
471     my ($c) = sort { length $a <=> length $b } @collapse;
472     my $target = $info;
473     foreach my $p (split(/\./, $c)) {
474       $target = $target->[1]->{$p} ||= [];
475     }
476     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
477     my @co_key = @{$self->{collapse}{$c_prefix}};
478     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
479     my $tree = $self->_collapse_result($as, $row, $c_prefix);
480     my (@final, @raw);
481     while ( !(grep {
482                 !defined($tree->[0]->{$_})
483                 || $co_check{$_} ne $tree->[0]->{$_}
484               } @co_key) ) {
485       push(@final, $tree);
486       last unless (@raw = $self->cursor->next);
487       $row = $self->{stashed_row} = \@raw;
488       $tree = $self->_collapse_result($as, $row, $c_prefix);
489       #warn Data::Dumper::Dumper($tree, $row);
490     }
491     @{$target} = @final;
492   }
493
494   return $info;
495 }
496
497 =head2 result_source
498
499 Returns a reference to the result source for this recordset.
500
501 =cut
502
503
504 =head2 count
505
506 Performs an SQL C<COUNT> with the same query as the resultset was built
507 with to find the number of elements. If passed arguments, does a search
508 on the resultset and counts the results of that.
509
510 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
511 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
512 not support C<DISTINCT> with multiple columns. If you are using such a
513 database, you should only use columns from the main table in your C<group_by>
514 clause.
515
516 =cut
517
518 sub count {
519   my $self = shift;
520   return $self->search(@_)->count if @_ && defined $_[0];
521   unless (defined $self->{count}) {
522     return scalar @{ $self->get_cache }
523       if @{ $self->get_cache };
524     my $group_by;
525     my $select = { 'count' => '*' };
526     my $attrs = { %{ $self->{attrs} } };
527     if( $group_by = delete $attrs->{group_by} ) {
528       delete $attrs->{having};
529       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
530       # todo: try CONCAT for multi-column pk
531       my @pk = $self->result_source->primary_columns;
532       if( scalar(@pk) == 1 ) {
533         my $pk = shift(@pk);
534         my $alias = $attrs->{alias};
535         my $re = qr/^($alias\.)?$pk$/;
536         foreach my $column ( @distinct) {
537           if( $column =~ $re ) {
538             @distinct = ( $column );
539             last;
540           }
541         } 
542       }
543
544       $select = { count => { 'distinct' => \@distinct } };
545       #use Data::Dumper; die Dumper $select;
546     }
547
548     $attrs->{select} = $select;
549     $attrs->{as} = [ 'count' ];
550     # offset, order by and page are not needed to count. record_filter is cdbi
551     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
552         
553     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
554   }
555   return 0 unless $self->{count};
556   my $count = $self->{count};
557   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
558   $count = $self->{attrs}{rows} if
559     ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
560   return $count;
561 }
562
563 =head2 count_literal
564
565 Calls L</search_literal> with the passed arguments, then L</count>.
566
567 =cut
568
569 sub count_literal { shift->search_literal(@_)->count; }
570
571 =head2 all
572
573 Returns all elements in the resultset. Called implictly if the resultset
574 is returned in list context.
575
576 =cut
577
578 sub all {
579   my ($self) = @_;
580   return @{ $self->get_cache }
581     if @{ $self->get_cache };
582
583   my @obj;
584
585   if (keys %{$self->{collapse}}) {
586       # Using $self->cursor->all is really just an optimisation.
587       # If we're collapsing has_many prefetches it probably makes
588       # very little difference, and this is cleaner than hacking
589       # _construct_object to survive the approach
590     my @row;
591     $self->cursor->reset;
592     while (@row = $self->cursor->next) {
593       push(@obj, $self->_construct_object(@row));
594     }
595   } else {
596     @obj = map { $self->_construct_object(@$_); }
597              $self->cursor->all;
598   }
599
600   if( $self->{attrs}->{cache} ) {
601     $self->set_cache( \@obj );
602   }
603
604   return @obj;
605 }
606
607 =head2 reset
608
609 Resets the resultset's cursor, so you can iterate through the elements again.
610
611 =cut
612
613 sub reset {
614   my ($self) = @_;
615   $self->{all_cache_position} = 0;
616   $self->cursor->reset;
617   return $self;
618 }
619
620 =head2 first
621
622 Resets the resultset and returns the first element.
623
624 =cut
625
626 sub first {
627   return $_[0]->reset->next;
628 }
629
630 =head2 update
631
632 =head3 Arguments: (\%values)
633
634 Sets the specified columns in the resultset to the supplied values.
635
636 =cut
637
638 sub update {
639   my ($self, $values) = @_;
640   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
641   return $self->result_source->storage->update(
642            $self->result_source->from, $values, $self->{cond});
643 }
644
645 =head2 update_all
646
647 =head3 Arguments: (\%values)
648
649 Fetches all objects and updates them one at a time.  Note that C<update_all>
650 will run cascade triggers while L</update> will not.
651
652 =cut
653
654 sub update_all {
655   my ($self, $values) = @_;
656   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
657   foreach my $obj ($self->all) {
658     $obj->set_columns($values)->update;
659   }
660   return 1;
661 }
662
663 =head2 delete
664
665 Deletes the contents of the resultset from its result source.
666
667 =cut
668
669 sub delete {
670   my ($self) = @_;
671   my $del = {};
672   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
673     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
674   if (ref $self->{cond} eq 'ARRAY') {
675     $del = [ map { my %hash;
676       foreach my $key (keys %{$_}) {
677         $key =~ /([^\.]+)$/;
678         $hash{$1} = $_->{$key};
679       }; \%hash; } @{$self->{cond}} ];
680   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
681     $del->{-and} = [ map { my %hash;
682       foreach my $key (keys %{$_}) {
683         $key =~ /([^\.]+)$/;
684         $hash{$1} = $_->{$key};
685       }; \%hash; } @{$self->{cond}{-and}} ];
686   } else {
687     foreach my $key (keys %{$self->{cond}}) {
688       $key =~ /([^\.]+)$/;
689       $del->{$1} = $self->{cond}{$key};
690     }
691   }
692   $self->result_source->storage->delete($self->result_source->from, $del);
693   return 1;
694 }
695
696 =head2 delete_all
697
698 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
699 will run cascade triggers while L</delete> will not.
700
701 =cut
702
703 sub delete_all {
704   my ($self) = @_;
705   $_->delete for $self->all;
706   return 1;
707 }
708
709 =head2 pager
710
711 Returns a L<Data::Page> object for the current resultset. Only makes
712 sense for queries with a C<page> attribute.
713
714 =cut
715
716 sub pager {
717   my ($self) = @_;
718   my $attrs = $self->{attrs};
719   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
720   $attrs->{rows} ||= 10;
721   $self->count;
722   return $self->{pager} ||= Data::Page->new(
723     $self->{count}, $attrs->{rows}, $self->{page});
724 }
725
726 =head2 page
727
728 =head3 Arguments: ($page_num)
729
730 Returns a new resultset for the specified page.
731
732 =cut
733
734 sub page {
735   my ($self, $page) = @_;
736   my $attrs = { %{$self->{attrs}} };
737   $attrs->{page} = $page;
738   return (ref $self)->new($self->result_source, $attrs);
739 }
740
741 =head2 new_result
742
743 =head3 Arguments: (\%vals)
744
745 Creates a result in the resultset's result class.
746
747 =cut
748
749 sub new_result {
750   my ($self, $values) = @_;
751   $self->throw_exception( "new_result needs a hash" )
752     unless (ref $values eq 'HASH');
753   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
754     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
755   my %new = %$values;
756   my $alias = $self->{attrs}{alias};
757   foreach my $key (keys %{$self->{cond}||{}}) {
758     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
759   }
760   my $obj = $self->result_class->new(\%new);
761   $obj->result_source($self->result_source) if $obj->can('result_source');
762   $obj;
763 }
764
765 =head2 create
766
767 =head3 Arguments: (\%vals)
768
769 Inserts a record into the resultset and returns the object.
770
771 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
772
773 =cut
774
775 sub create {
776   my ($self, $attrs) = @_;
777   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
778   return $self->new_result($attrs)->insert;
779 }
780
781 =head2 find_or_create
782
783 =head3 Arguments: (\%vals, \%attrs?)
784
785   $class->find_or_create({ key => $val, ... });
786
787 Searches for a record matching the search condition; if it doesn't find one,
788 creates one and returns that instead.
789
790   my $cd = $schema->resultset('CD')->find_or_create({
791     cdid   => 5,
792     artist => 'Massive Attack',
793     title  => 'Mezzanine',
794     year   => 2005,
795   });
796
797 Also takes an optional C<key> attribute, to search by a specific key or unique
798 constraint. For example:
799
800   my $cd = $schema->resultset('CD')->find_or_create(
801     {
802       artist => 'Massive Attack',
803       title  => 'Mezzanine',
804     },
805     { key => 'artist_title' }
806   );
807
808 See also L</find> and L</update_or_create>.
809
810 =cut
811
812 sub find_or_create {
813   my $self     = shift;
814   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
815   my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
816   my $exists   = $self->find($hash, $attrs);
817   return defined($exists) ? $exists : $self->create($hash);
818 }
819
820 =head2 update_or_create
821
822   $class->update_or_create({ key => $val, ... });
823
824 First, search for an existing row matching one of the unique constraints
825 (including the primary key) on the source of this resultset.  If a row is
826 found, update it with the other given column values.  Otherwise, create a new
827 row.
828
829 Takes an optional C<key> attribute to search on a specific unique constraint.
830 For example:
831
832   # In your application
833   my $cd = $schema->resultset('CD')->update_or_create(
834     {
835       artist => 'Massive Attack',
836       title  => 'Mezzanine',
837       year   => 1998,
838     },
839     { key => 'artist_title' }
840   );
841
842 If no C<key> is specified, it searches on all unique constraints defined on the
843 source, including the primary key.
844
845 If the C<key> is specified as C<primary>, search only on the primary key.
846
847 See also L</find> and L</find_or_create>.
848
849 =cut
850
851 sub update_or_create {
852   my $self = shift;
853
854   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
855   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
856
857   my %unique_constraints = $self->result_source->unique_constraints;
858   my @constraint_names   = (exists $attrs->{key}
859                             ? ($attrs->{key})
860                             : keys %unique_constraints);
861
862   my @unique_hashes;
863   foreach my $name (@constraint_names) {
864     my @unique_cols = @{ $unique_constraints{$name} };
865     my %unique_hash =
866       map  { $_ => $hash->{$_} }
867       grep { exists $hash->{$_} }
868       @unique_cols;
869
870     push @unique_hashes, \%unique_hash
871       if (scalar keys %unique_hash == scalar @unique_cols);
872   }
873
874   my $row;
875   if (@unique_hashes) {
876     $row = $self->search(\@unique_hashes, { rows => 1 })->first;
877     if ($row) {
878       $row->set_columns($hash);
879       $row->update;
880     }
881   }
882
883   unless ($row) {
884     $row = $self->create($hash);
885   }
886
887   return $row;
888 }
889
890 =head2 get_cache
891
892 Gets the contents of the cache for the resultset.
893
894 =cut
895
896 sub get_cache {
897   my $self = shift;
898   return $self->{all_cache} || [];
899 }
900
901 =head2 set_cache
902
903 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
904
905 =cut
906
907 sub set_cache {
908   my ( $self, $data ) = @_;
909   $self->throw_exception("set_cache requires an arrayref")
910     if ref $data ne 'ARRAY';
911   my $result_class = $self->result_class;
912   foreach( @$data ) {
913     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
914       if ref $_ ne $result_class;
915   }
916   $self->{all_cache} = $data;
917 }
918
919 =head2 clear_cache
920
921 Clears the cache for the resultset.
922
923 =cut
924
925 sub clear_cache {
926   my $self = shift;
927   $self->set_cache([]);
928 }
929
930 =head2 related_resultset
931
932 Returns a related resultset for the supplied relationship name.
933
934   $rs = $rs->related_resultset('foo');
935
936 =cut
937
938 sub related_resultset {
939   my ( $self, $rel, @rest ) = @_;
940   $self->{related_resultsets} ||= {};
941   my $resultsets = $self->{related_resultsets};
942   if( !exists $resultsets->{$rel} ) {
943     #warn "fetching related resultset for rel '$rel'";
944     my $rel_obj = $self->result_source->relationship_info($rel);
945     $self->throw_exception(
946       "search_related: result source '" . $self->result_source->name .
947       "' has no such relationship ${rel}")
948       unless $rel_obj; #die Dumper $self->{attrs};
949     my $rs = $self->search(undef, { join => $rel });
950     #if( $self->{attrs}->{cache} ) {
951     #  $rs = $self->search(undef);
952     #}
953     #else {
954     #}
955     #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
956     #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
957     my $alias = (defined $rs->{attrs}{seen_join}{$rel}
958                   && $rs->{attrs}{seen_join}{$rel} > 1
959                 ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
960                 : $rel);
961     $resultsets->{$rel} =
962       $self->result_source->schema->resultset($rel_obj->{class}
963            )->search( undef,
964              { %{$rs->{attrs}},
965                alias => $alias,
966                select => undef(),
967                as => undef() }
968            )->search(@rest);
969   }
970   return $resultsets->{$rel};
971 }
972
973 =head2 throw_exception
974
975 See Schema's throw_exception
976
977 =cut
978
979 sub throw_exception {
980   my $self=shift;
981   $self->result_source->schema->throw_exception(@_);
982 }
983
984 =head1 ATTRIBUTES
985
986 The resultset takes various attributes that modify its behavior. Here's an
987 overview of them:
988
989 =head2 order_by
990
991 Which column(s) to order the results by. This is currently passed through
992 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
993
994 =head2 columns
995
996 =head3 Arguments: (arrayref)
997
998 Shortcut to request a particular set of columns to be retrieved.  Adds
999 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1000 from that, then auto-populates C<as> from C<select> as normal. (You may also
1001 use the C<cols> attribute, as in earlier versions of DBIC.)
1002
1003 =head2 include_columns
1004
1005 =head3 Arguments: (arrayref)
1006
1007 Shortcut to include additional columns in the returned results - for example
1008
1009   { include_columns => ['foo.name'], join => ['foo'] }
1010
1011 would add a 'name' column to the information passed to object inflation
1012
1013 =head2 select
1014
1015 =head3 Arguments: (arrayref)
1016
1017 Indicates which columns should be selected from the storage. You can use
1018 column names, or in the case of RDBMS back ends, function or stored procedure
1019 names:
1020
1021   $rs = $schema->resultset('Foo')->search(
1022     undef,
1023     {
1024       select => [
1025         'column_name',
1026         { count => 'column_to_count' },
1027         { sum => 'column_to_sum' }
1028       ]
1029     }
1030   );
1031
1032 When you use function/stored procedure names and do not supply an C<as>
1033 attribute, the column names returned are storage-dependent. E.g. MySQL would
1034 return a column named C<count(column_to_count)> in the above example.
1035
1036 =head2 as
1037
1038 =head3 Arguments: (arrayref)
1039
1040 Indicates column names for object inflation. This is used in conjunction with
1041 C<select>, usually when C<select> contains one or more function or stored
1042 procedure names:
1043
1044   $rs = $schema->resultset('Foo')->search(
1045     undef,
1046     {
1047       select => [
1048         'column1',
1049         { count => 'column2' }
1050       ],
1051       as => [qw/ column1 column2_count /]
1052     }
1053   );
1054
1055   my $foo = $rs->first(); # get the first Foo
1056
1057 If the object against which the search is performed already has an accessor
1058 matching a column name specified in C<as>, the value can be retrieved using
1059 the accessor as normal:
1060
1061   my $column1 = $foo->column1();
1062
1063 If on the other hand an accessor does not exist in the object, you need to
1064 use C<get_column> instead:
1065
1066   my $column2_count = $foo->get_column('column2_count');
1067
1068 You can create your own accessors if required - see
1069 L<DBIx::Class::Manual::Cookbook> for details.
1070
1071 =head2 join
1072
1073 Contains a list of relationships that should be joined for this query.  For
1074 example:
1075
1076   # Get CDs by Nine Inch Nails
1077   my $rs = $schema->resultset('CD')->search(
1078     { 'artist.name' => 'Nine Inch Nails' },
1079     { join => 'artist' }
1080   );
1081
1082 Can also contain a hash reference to refer to the other relation's relations.
1083 For example:
1084
1085   package MyApp::Schema::Track;
1086   use base qw/DBIx::Class/;
1087   __PACKAGE__->table('track');
1088   __PACKAGE__->add_columns(qw/trackid cd position title/);
1089   __PACKAGE__->set_primary_key('trackid');
1090   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1091   1;
1092
1093   # In your application
1094   my $rs = $schema->resultset('Artist')->search(
1095     { 'track.title' => 'Teardrop' },
1096     {
1097       join     => { cd => 'track' },
1098       order_by => 'artist.name',
1099     }
1100   );
1101
1102 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1103 similarly for a third time). For e.g.
1104
1105   my $rs = $schema->resultset('Artist')->search(
1106     { 'cds.title'   => 'Foo',
1107       'cds_2.title' => 'Bar' },
1108     { join => [ qw/cds cds/ ] });
1109
1110 will return a set of all artists that have both a cd with title Foo and a cd
1111 with title Bar.
1112
1113 If you want to fetch related objects from other tables as well, see C<prefetch>
1114 below.
1115
1116 =head2 prefetch
1117
1118 =head3 Arguments: arrayref/hashref
1119
1120 Contains one or more relationships that should be fetched along with the main 
1121 query (when they are accessed afterwards they will have already been
1122 "prefetched").  This is useful for when you know you will need the related
1123 objects, because it saves at least one query:
1124
1125   my $rs = $schema->resultset('Tag')->search(
1126     undef,
1127     {
1128       prefetch => {
1129         cd => 'artist'
1130       }
1131     }
1132   );
1133
1134 The initial search results in SQL like the following:
1135
1136   SELECT tag.*, cd.*, artist.* FROM tag
1137   JOIN cd ON tag.cd = cd.cdid
1138   JOIN artist ON cd.artist = artist.artistid
1139
1140 L<DBIx::Class> has no need to go back to the database when we access the
1141 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1142 case.
1143
1144 Simple prefetches will be joined automatically, so there is no need
1145 for a C<join> attribute in the above search. If you're prefetching to
1146 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1147 specify the join as well.
1148
1149 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1150 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1151 with an accessor type of 'single' or 'filter').
1152
1153 =head2 from
1154
1155 =head3 Arguments: (arrayref)
1156
1157 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1158 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1159 clauses.
1160
1161 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1162 C<join> will usually do what you need and it is strongly recommended that you
1163 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1164
1165 In simple terms, C<from> works as follows:
1166
1167     [
1168         { <alias> => <table>, -join-type => 'inner|left|right' }
1169         [] # nested JOIN (optional)
1170         { <table.column> = <foreign_table.foreign_key> }
1171     ]
1172
1173     JOIN
1174         <alias> <table>
1175         [JOIN ...]
1176     ON <table.column> = <foreign_table.foreign_key>
1177
1178 An easy way to follow the examples below is to remember the following:
1179
1180     Anything inside "[]" is a JOIN
1181     Anything inside "{}" is a condition for the enclosing JOIN
1182
1183 The following examples utilize a "person" table in a family tree application.
1184 In order to express parent->child relationships, this table is self-joined:
1185
1186     # Person->belongs_to('father' => 'Person');
1187     # Person->belongs_to('mother' => 'Person');
1188
1189 C<from> can be used to nest joins. Here we return all children with a father,
1190 then search against all mothers of those children:
1191
1192   $rs = $schema->resultset('Person')->search(
1193       undef,
1194       {
1195           alias => 'mother', # alias columns in accordance with "from"
1196           from => [
1197               { mother => 'person' },
1198               [
1199                   [
1200                       { child => 'person' },
1201                       [
1202                           { father => 'person' },
1203                           { 'father.person_id' => 'child.father_id' }
1204                       ]
1205                   ],
1206                   { 'mother.person_id' => 'child.mother_id' }
1207               ],
1208           ]
1209       },
1210   );
1211
1212   # Equivalent SQL:
1213   # SELECT mother.* FROM person mother
1214   # JOIN (
1215   #   person child
1216   #   JOIN person father
1217   #   ON ( father.person_id = child.father_id )
1218   # )
1219   # ON ( mother.person_id = child.mother_id )
1220
1221 The type of any join can be controlled manually. To search against only people
1222 with a father in the person table, we could explicitly use C<INNER JOIN>:
1223
1224     $rs = $schema->resultset('Person')->search(
1225         undef,
1226         {
1227             alias => 'child', # alias columns in accordance with "from"
1228             from => [
1229                 { child => 'person' },
1230                 [
1231                     { father => 'person', -join-type => 'inner' },
1232                     { 'father.id' => 'child.father_id' }
1233                 ],
1234             ]
1235         },
1236     );
1237
1238     # Equivalent SQL:
1239     # SELECT child.* FROM person child
1240     # INNER JOIN person father ON child.father_id = father.id
1241
1242 =head2 page
1243
1244 For a paged resultset, specifies which page to retrieve.  Leave unset
1245 for an unpaged resultset.
1246
1247 =head2 rows
1248
1249 For a paged resultset, how many rows per page:
1250
1251   rows => 10
1252
1253 Can also be used to simulate an SQL C<LIMIT>.
1254
1255 =head2 group_by
1256
1257 =head3 Arguments: (arrayref)
1258
1259 A arrayref of columns to group by. Can include columns of joined tables.
1260
1261   group_by => [qw/ column1 column2 ... /]
1262
1263 =head2 distinct
1264
1265 Set to 1 to group by all columns.
1266
1267 For more examples of using these attributes, see
1268 L<DBIx::Class::Manual::Cookbook>.
1269
1270 =cut
1271
1272 1;