add test for last fix
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->load_components(qw/Core/);
36   __PACKAGE__->table('artist');
37   __PACKAGE__->add_columns(qw/artistid name/);
38   __PACKAGE__->set_primary_key('artistid');
39   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
40   1;
41
42   package MyApp::Schema::CD;
43   use base qw/DBIx::Class/;
44   __PACKAGE__->load_components(qw/Core/);
45   __PACKAGE__->table('cd');
46   __PACKAGE__->add_columns(qw/cdid artist title year/);
47   __PACKAGE__->set_primary_key('cdid');
48   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
49   1;
50
51 =head1 METHODS
52
53 =head2 new
54
55 =head3 Arguments: ($source, \%$attrs)
56
57 The resultset constructor. Takes a source object (usually a
58 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
59 below).  Does not perform any queries -- these are executed as needed by the
60 other methods.
61
62 Generally you won't need to construct a resultset manually.  You'll
63 automatically get one from e.g. a L</search> called in scalar context:
64
65   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
66
67 =cut
68
69 sub new {
70   my $class = shift;
71   return $class->new_result(@_) if ref $class;
72   my ($source, $attrs) = @_;
73   #use Data::Dumper; warn Dumper($attrs);
74   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
75   my %seen;
76   my $alias = ($attrs->{alias} ||= 'me');
77   if ($attrs->{cols} || !$attrs->{select}) {
78     delete $attrs->{as} if $attrs->{cols};
79     my @cols = ($attrs->{cols}
80                  ? @{delete $attrs->{cols}}
81                  : $source->columns);
82     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @cols ];
83   }
84   $attrs->{as} ||= [ map { m/^$alias\.(.*)$/ ? $1 : $_ } @{$attrs->{select}} ];
85   if (my $include = delete $attrs->{include_columns}) {
86     push(@{$attrs->{select}}, @$include);
87     push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
88   }
89   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
90   $attrs->{from} ||= [ { $alias => $source->from } ];
91   $attrs->{seen_join} ||= {};
92   if (my $join = delete $attrs->{join}) {
93     foreach my $j (ref $join eq 'ARRAY'
94               ? (@{$join}) : ($join)) {
95       if (ref $j eq 'HASH') {
96         $seen{$_} = 1 foreach keys %$j;
97       } else {
98         $seen{$j} = 1;
99       }
100     }
101     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
102   }
103   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
104
105   $attrs->{order_by} = [ $attrs->{order_by} ]
106     if $attrs->{order_by} && !ref($attrs->{order_by});
107   $attrs->{order_by} ||= [];
108
109   my $collapse = {};
110
111   if (my $prefetch = delete $attrs->{prefetch}) {
112     my @pre_order;
113     foreach my $p (ref $prefetch eq 'ARRAY'
114               ? (@{$prefetch}) : ($prefetch)) {
115       if( ref $p eq 'HASH' ) {
116         foreach my $key (keys %$p) {
117           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
118             unless $seen{$key};
119         }
120       }
121       else {
122         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
123             unless $seen{$p};
124       }
125       my @prefetch = $source->resolve_prefetch(
126            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
127       #die Dumper \@cols;
128       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
129       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
130     }
131     push(@{$attrs->{order_by}}, @pre_order);
132   }
133
134   if ($attrs->{page}) {
135     $attrs->{rows} ||= 10;
136     $attrs->{offset} ||= 0;
137     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
138   }
139
140 #if (keys %{$collapse}) {
141 #  use Data::Dumper; warn Dumper($collapse);
142 #}
143
144   my $new = {
145     result_source => $source,
146     result_class => $attrs->{result_class} || $source->result_class,
147     cond => $attrs->{where},
148     from => $attrs->{from},
149     collapse => $collapse,
150     count => undef,
151     page => delete $attrs->{page},
152     pager => undef,
153     attrs => $attrs };
154   bless ($new, $class);
155   return $new;
156 }
157
158 =head2 search
159
160   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
161   my $new_rs = $rs->search({ foo => 3 });
162
163 If you need to pass in additional attributes but no additional condition,
164 call it as C<search({}, \%attrs);>.
165
166   # "SELECT foo, bar FROM $class_table"
167   my @all = $class->search({}, { cols => [qw/foo bar/] });
168
169 =cut
170
171 sub search {
172   my $self = shift;
173
174   my $rs;
175   if( @_ ) {
176     
177     my $attrs = { %{$self->{attrs}} };
178     my $having = delete $attrs->{having};
179     if (@_ > 1 && ref $_[$#_] eq 'HASH') {
180      $attrs = { %$attrs, %{ pop(@_) } };
181     }
182
183     my $where = (@_
184                   ? ((@_ == 1 || ref $_[0] eq "HASH")
185                       ? shift
186                       : ((@_ % 2)
187                           ? $self->throw_exception(
188                               "Odd number of arguments to search")
189                           : {@_}))
190                   : undef());
191     if (defined $where) {
192       $where = (defined $attrs->{where}
193                 ? { '-and' =>
194                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
195                         $where, $attrs->{where} ] }
196                 : $where);
197       $attrs->{where} = $where;
198     }
199
200     if (defined $having) {
201       $having = (defined $attrs->{having}
202                 ? { '-and' =>
203                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
204                         $having, $attrs->{having} ] }
205                 : $having);
206       $attrs->{having} = $having;
207     }
208
209     $rs = (ref $self)->new($self->result_source, $attrs);
210   }
211   else {
212     $rs = $self;
213     $rs->reset();
214   }
215   return (wantarray ? $rs->all : $rs);
216 }
217
218 =head2 search_literal
219
220   my @obj    = $rs->search_literal($literal_where_cond, @bind);
221   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
222
223 Pass a literal chunk of SQL to be added to the conditional part of the
224 resultset.
225
226 =cut
227
228 sub search_literal {
229   my ($self, $cond, @vals) = @_;
230   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
231   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
232   return $self->search(\$cond, $attrs);
233 }
234
235 =head2 find
236
237 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
238
239 Finds a row based on its primary key or unique constraint. For example:
240
241   my $cd = $schema->resultset('CD')->find(5);
242
243 Also takes an optional C<key> attribute, to search by a specific key or unique
244 constraint. For example:
245
246   my $cd = $schema->resultset('CD')->find(
247     {
248       artist => 'Massive Attack',
249       title  => 'Mezzanine',
250     },
251     { key => 'artist_title' }
252   );
253
254 See also L</find_or_create> and L</update_or_create>.
255
256 =cut
257
258 sub find {
259   my ($self, @vals) = @_;
260   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
261
262   my @cols = $self->result_source->primary_columns;
263   if (exists $attrs->{key}) {
264     my %uniq = $self->result_source->unique_constraints;
265     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
266       unless exists $uniq{$attrs->{key}};
267     @cols = @{ $uniq{$attrs->{key}} };
268   }
269   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
270   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
271     unless @cols;
272
273   my $query;
274   if (ref $vals[0] eq 'HASH') {
275     $query = { %{$vals[0]} };
276   } elsif (@cols == @vals) {
277     $query = {};
278     @{$query}{@cols} = @vals;
279   } else {
280     $query = {@vals};
281   }
282   foreach (keys %$query) {
283     next if m/\./;
284     $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
285   }
286   #warn Dumper($query);
287   
288   if (keys %$attrs) {
289       my $rs = $self->search($query,$attrs);
290       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
291   } else {
292       return keys %{$self->{collapse}} ? $self->search($query)->next : $self->single($query);
293   }
294 }
295
296 =head2 search_related
297
298   $rs->search_related('relname', $cond?, $attrs?);
299
300 Search the specified relationship. Optionally specify a condition for matching
301 records.
302
303 =cut
304
305 sub search_related {
306   return shift->related_resultset(shift)->search(@_);
307 }
308
309 =head2 cursor
310
311 Returns a storage-driven cursor to the given resultset.
312
313 =cut
314
315 sub cursor {
316   my ($self) = @_;
317   my ($attrs) = $self->{attrs};
318   $attrs = { %$attrs };
319   return $self->{cursor}
320     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
321           $attrs->{where},$attrs);
322 }
323
324 =head2 single
325
326 Inflates the first result without creating a cursor
327
328 =cut
329
330 sub single {
331   my ($self, $extra) = @_;
332   my ($attrs) = $self->{attrs};
333   $attrs = { %$attrs };
334   if ($extra) {
335     if (defined $attrs->{where}) {
336       $attrs->{where} = {
337         '-and'
338           => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
339                delete $attrs->{where}, $extra ]
340       };
341     } else {
342       $attrs->{where} = $extra;
343     }
344   }
345   my @data = $self->result_source->storage->select_single(
346           $self->{from}, $attrs->{select},
347           $attrs->{where},$attrs);
348   return (@data ? $self->_construct_object(@data) : ());
349 }
350
351
352 =head2 search_like
353
354 Perform a search, but use C<LIKE> instead of equality as the condition. Note
355 that this is simply a convenience method; you most likely want to use
356 L</search> with specific operators.
357
358 For more information, see L<DBIx::Class::Manual::Cookbook>.
359
360 =cut
361
362 sub search_like {
363   my $class    = shift;
364   my $attrs = { };
365   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
366     $attrs = pop(@_);
367   }
368   my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
369   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
370   return $class->search($query, { %$attrs });
371 }
372
373 =head2 slice
374
375 =head3 Arguments: ($first, $last)
376
377 Returns a subset of elements from the resultset.
378
379 =cut
380
381 sub slice {
382   my ($self, $min, $max) = @_;
383   my $attrs = { %{ $self->{attrs} || {} } };
384   $attrs->{offset} ||= 0;
385   $attrs->{offset} += $min;
386   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
387   my $slice = (ref $self)->new($self->result_source, $attrs);
388   return (wantarray ? $slice->all : $slice);
389 }
390
391 =head2 next
392
393 Returns the next element in the resultset (C<undef> is there is none).
394
395 Can be used to efficiently iterate over records in the resultset:
396
397   my $rs = $schema->resultset('CD')->search({});
398   while (my $cd = $rs->next) {
399     print $cd->title;
400   }
401
402 =cut
403
404 sub next {
405   my ($self) = @_;
406   my $cache;
407   if( @{$cache = $self->{all_cache} || []}) {
408     $self->{all_cache_position} ||= 0;
409     my $obj = $cache->[$self->{all_cache_position}];
410     $self->{all_cache_position}++;
411     return $obj;
412   }
413   if ($self->{attrs}{cache}) {
414     $self->{all_cache_position} = 1;
415     return ($self->all)[0];
416   }
417   my @row = (exists $self->{stashed_row}
418                ? @{delete $self->{stashed_row}}
419                : $self->cursor->next);
420 #  warn Dumper(\@row); use Data::Dumper;
421   return unless (@row);
422   return $self->_construct_object(@row);
423 }
424
425 sub _construct_object {
426   my ($self, @row) = @_;
427   my @as = @{ $self->{attrs}{as} };
428
429   my $info = $self->_collapse_result(\@as, \@row);
430
431   my $new = $self->result_class->inflate_result($self->result_source, @$info);
432
433   $new = $self->{attrs}{record_filter}->($new)
434     if exists $self->{attrs}{record_filter};
435  
436   return $new;
437 }
438
439 sub _collapse_result {
440   my ($self, $as, $row, $prefix) = @_;
441
442   my %const;
443
444   my @copy = @$row;
445   foreach my $this_as (@$as) {
446     my $val = shift @copy;
447     if (defined $prefix) {
448       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
449         my $remain = $1;
450         $remain =~ /^(?:(.*)\.)?([^\.]+)$/;
451         $const{$1||''}{$2} = $val;
452       }
453     } else {
454       $this_as =~ /^(?:(.*)\.)?([^\.]+)$/;
455       $const{$1||''}{$2} = $val;
456     }
457   }
458
459   my $info = [ {}, {} ];
460   foreach my $key (keys %const) {
461     if (length $key) {
462       my $target = $info;
463       my @parts = split(/\./, $key);
464       foreach my $p (@parts) {
465         $target = $target->[1]->{$p} ||= [];
466       }
467       $target->[0] = $const{$key};
468     } else {
469       $info->[0] = $const{$key};
470     }
471   }
472
473   my @collapse = (defined($prefix)
474                    ? (map { (m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()); }
475                        keys %{$self->{collapse}})
476                    : keys %{$self->{collapse}});
477   if (@collapse) {
478     my ($c) = sort { length $a <=> length $b } @collapse;
479     my $target = $info;
480     foreach my $p (split(/\./, $c)) {
481       $target = $target->[1]->{$p} ||= [];
482     }
483     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
484     my @co_key = @{$self->{collapse}{$c_prefix}};
485     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
486     my $tree = $self->_collapse_result($as, $row, $c_prefix);
487     my (@final, @raw);
488     while ( !(grep {
489                 !defined($tree->[0]->{$_})
490                 || $co_check{$_} ne $tree->[0]->{$_}
491               } @co_key) ) {
492       push(@final, $tree);
493       last unless (@raw = $self->cursor->next);
494       $row = $self->{stashed_row} = \@raw;
495       $tree = $self->_collapse_result($as, $row, $c_prefix);
496       #warn Data::Dumper::Dumper($tree, $row);
497     }
498     @{$target} = @final;
499   }
500
501   return $info;
502 }
503
504 =head2 result_source
505
506 Returns a reference to the result source for this recordset.
507
508 =cut
509
510
511 =head2 count
512
513 Performs an SQL C<COUNT> with the same query as the resultset was built
514 with to find the number of elements. If passed arguments, does a search
515 on the resultset and counts the results of that.
516
517 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
518 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
519 not support C<DISTINCT> with multiple columns. If you are using such a
520 database, you should only use columns from the main table in your C<group_by>
521 clause.
522
523 =cut
524
525 sub count {
526   my $self = shift;
527   return $self->search(@_)->count if @_ && defined $_[0];
528   unless (defined $self->{count}) {
529     return scalar @{ $self->get_cache }
530       if @{ $self->get_cache };
531     my $group_by;
532     my $select = { 'count' => '*' };
533     my $attrs = { %{ $self->{attrs} } };
534     if( $group_by = delete $attrs->{group_by} ) {
535       delete $attrs->{having};
536       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
537       # todo: try CONCAT for multi-column pk
538       my @pk = $self->result_source->primary_columns;
539       if( scalar(@pk) == 1 ) {
540         my $pk = shift(@pk);
541         my $alias = $attrs->{alias};
542         my $re = qr/^($alias\.)?$pk$/;
543         foreach my $column ( @distinct) {
544           if( $column =~ $re ) {
545             @distinct = ( $column );
546             last;
547           }
548         } 
549       }
550
551       $select = { count => { 'distinct' => \@distinct } };
552       #use Data::Dumper; die Dumper $select;
553     }
554
555     $attrs->{select} = $select;
556     $attrs->{as} = [ 'count' ];
557     # offset, order by and page are not needed to count. record_filter is cdbi
558     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
559         
560     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
561   }
562   return 0 unless $self->{count};
563   my $count = $self->{count};
564   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
565   $count = $self->{attrs}{rows} if
566     ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
567   return $count;
568 }
569
570 =head2 count_literal
571
572 Calls L</search_literal> with the passed arguments, then L</count>.
573
574 =cut
575
576 sub count_literal { shift->search_literal(@_)->count; }
577
578 =head2 all
579
580 Returns all elements in the resultset. Called implictly if the resultset
581 is returned in list context.
582
583 =cut
584
585 sub all {
586   my ($self) = @_;
587   return @{ $self->get_cache }
588     if @{ $self->get_cache };
589
590   my @obj;
591
592   if (keys %{$self->{collapse}}) {
593       # Using $self->cursor->all is really just an optimisation.
594       # If we're collapsing has_many prefetches it probably makes
595       # very little difference, and this is cleaner than hacking
596       # _construct_object to survive the approach
597     my @row;
598     $self->cursor->reset;
599     while (@row = $self->cursor->next) {
600       push(@obj, $self->_construct_object(@row));
601     }
602   } else {
603     @obj = map { $self->_construct_object(@$_); }
604              $self->cursor->all;
605   }
606
607   if( $self->{attrs}->{cache} ) {
608     $self->set_cache( \@obj );
609   }
610
611   return @obj;
612 }
613
614 =head2 reset
615
616 Resets the resultset's cursor, so you can iterate through the elements again.
617
618 =cut
619
620 sub reset {
621   my ($self) = @_;
622   $self->{all_cache_position} = 0;
623   $self->cursor->reset;
624   return $self;
625 }
626
627 =head2 first
628
629 Resets the resultset and returns the first element.
630
631 =cut
632
633 sub first {
634   return $_[0]->reset->next;
635 }
636
637 =head2 update
638
639 =head3 Arguments: (\%values)
640
641 Sets the specified columns in the resultset to the supplied values.
642
643 =cut
644
645 sub update {
646   my ($self, $values) = @_;
647   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
648   return $self->result_source->storage->update(
649            $self->result_source->from, $values, $self->{cond});
650 }
651
652 =head2 update_all
653
654 =head3 Arguments: (\%values)
655
656 Fetches all objects and updates them one at a time.  Note that C<update_all>
657 will run cascade triggers while L</update> will not.
658
659 =cut
660
661 sub update_all {
662   my ($self, $values) = @_;
663   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
664   foreach my $obj ($self->all) {
665     $obj->set_columns($values)->update;
666   }
667   return 1;
668 }
669
670 =head2 delete
671
672 Deletes the contents of the resultset from its result source.
673
674 =cut
675
676 sub delete {
677   my ($self) = @_;
678   my $del = {};
679   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
680     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
681   if (ref $self->{cond} eq 'ARRAY') {
682     $del = [ map { my %hash;
683       foreach my $key (keys %{$_}) {
684         $key =~ /([^\.]+)$/;
685         $hash{$1} = $_->{$key};
686       }; \%hash; } @{$self->{cond}} ];
687   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
688     $del->{-and} = [ map { my %hash;
689       foreach my $key (keys %{$_}) {
690         $key =~ /([^\.]+)$/;
691         $hash{$1} = $_->{$key};
692       }; \%hash; } @{$self->{cond}{-and}} ];
693   } else {
694     foreach my $key (keys %{$self->{cond}}) {
695       $key =~ /([^\.]+)$/;
696       $del->{$1} = $self->{cond}{$key};
697     }
698   }
699   $self->result_source->storage->delete($self->result_source->from, $del);
700   return 1;
701 }
702
703 =head2 delete_all
704
705 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
706 will run cascade triggers while L</delete> will not.
707
708 =cut
709
710 sub delete_all {
711   my ($self) = @_;
712   $_->delete for $self->all;
713   return 1;
714 }
715
716 =head2 pager
717
718 Returns a L<Data::Page> object for the current resultset. Only makes
719 sense for queries with a C<page> attribute.
720
721 =cut
722
723 sub pager {
724   my ($self) = @_;
725   my $attrs = $self->{attrs};
726   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
727   $attrs->{rows} ||= 10;
728   $self->count;
729   return $self->{pager} ||= Data::Page->new(
730     $self->{count}, $attrs->{rows}, $self->{page});
731 }
732
733 =head2 page
734
735 =head3 Arguments: ($page_num)
736
737 Returns a new resultset for the specified page.
738
739 =cut
740
741 sub page {
742   my ($self, $page) = @_;
743   my $attrs = { %{$self->{attrs}} };
744   $attrs->{page} = $page;
745   return (ref $self)->new($self->result_source, $attrs);
746 }
747
748 =head2 new_result
749
750 =head3 Arguments: (\%vals)
751
752 Creates a result in the resultset's result class.
753
754 =cut
755
756 sub new_result {
757   my ($self, $values) = @_;
758   $self->throw_exception( "new_result needs a hash" )
759     unless (ref $values eq 'HASH');
760   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
761     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
762   my %new = %$values;
763   my $alias = $self->{attrs}{alias};
764   foreach my $key (keys %{$self->{cond}||{}}) {
765     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
766   }
767   my $obj = $self->result_class->new(\%new);
768   $obj->result_source($self->result_source) if $obj->can('result_source');
769   $obj;
770 }
771
772 =head2 create
773
774 =head3 Arguments: (\%vals)
775
776 Inserts a record into the resultset and returns the object.
777
778 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
779
780 =cut
781
782 sub create {
783   my ($self, $attrs) = @_;
784   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
785   return $self->new_result($attrs)->insert;
786 }
787
788 =head2 find_or_create
789
790 =head3 Arguments: (\%vals, \%attrs?)
791
792   $class->find_or_create({ key => $val, ... });
793
794 Searches for a record matching the search condition; if it doesn't find one,
795 creates one and returns that instead.
796
797   my $cd = $schema->resultset('CD')->find_or_create({
798     cdid   => 5,
799     artist => 'Massive Attack',
800     title  => 'Mezzanine',
801     year   => 2005,
802   });
803
804 Also takes an optional C<key> attribute, to search by a specific key or unique
805 constraint. For example:
806
807   my $cd = $schema->resultset('CD')->find_or_create(
808     {
809       artist => 'Massive Attack',
810       title  => 'Mezzanine',
811     },
812     { key => 'artist_title' }
813   );
814
815 See also L</find> and L</update_or_create>.
816
817 =cut
818
819 sub find_or_create {
820   my $self     = shift;
821   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
822   my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
823   my $exists   = $self->find($hash, $attrs);
824   return defined($exists) ? $exists : $self->create($hash);
825 }
826
827 =head2 update_or_create
828
829   $class->update_or_create({ key => $val, ... });
830
831 First, search for an existing row matching one of the unique constraints
832 (including the primary key) on the source of this resultset.  If a row is
833 found, update it with the other given column values.  Otherwise, create a new
834 row.
835
836 Takes an optional C<key> attribute to search on a specific unique constraint.
837 For example:
838
839   # In your application
840   my $cd = $schema->resultset('CD')->update_or_create(
841     {
842       artist => 'Massive Attack',
843       title  => 'Mezzanine',
844       year   => 1998,
845     },
846     { key => 'artist_title' }
847   );
848
849 If no C<key> is specified, it searches on all unique constraints defined on the
850 source, including the primary key.
851
852 If the C<key> is specified as C<primary>, search only on the primary key.
853
854 See also L</find> and L</find_or_create>.
855
856 =cut
857
858 sub update_or_create {
859   my $self = shift;
860
861   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
862   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
863
864   my %unique_constraints = $self->result_source->unique_constraints;
865   my @constraint_names   = (exists $attrs->{key}
866                             ? ($attrs->{key})
867                             : keys %unique_constraints);
868
869   my @unique_hashes;
870   foreach my $name (@constraint_names) {
871     my @unique_cols = @{ $unique_constraints{$name} };
872     my %unique_hash =
873       map  { $_ => $hash->{$_} }
874       grep { exists $hash->{$_} }
875       @unique_cols;
876
877     push @unique_hashes, \%unique_hash
878       if (scalar keys %unique_hash == scalar @unique_cols);
879   }
880
881   my $row;
882   if (@unique_hashes) {
883     $row = $self->search(\@unique_hashes, { rows => 1 })->first;
884     if ($row) {
885       $row->set_columns($hash);
886       $row->update;
887     }
888   }
889
890   unless ($row) {
891     $row = $self->create($hash);
892   }
893
894   return $row;
895 }
896
897 =head2 get_cache
898
899 Gets the contents of the cache for the resultset.
900
901 =cut
902
903 sub get_cache {
904   my $self = shift;
905   return $self->{all_cache} || [];
906 }
907
908 =head2 set_cache
909
910 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
911
912 =cut
913
914 sub set_cache {
915   my ( $self, $data ) = @_;
916   $self->throw_exception("set_cache requires an arrayref")
917     if ref $data ne 'ARRAY';
918   my $result_class = $self->result_class;
919   foreach( @$data ) {
920     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
921       if ref $_ ne $result_class;
922   }
923   $self->{all_cache} = $data;
924 }
925
926 =head2 clear_cache
927
928 Clears the cache for the resultset.
929
930 =cut
931
932 sub clear_cache {
933   my $self = shift;
934   $self->set_cache([]);
935 }
936
937 =head2 related_resultset
938
939 Returns a related resultset for the supplied relationship name.
940
941   $rs = $rs->related_resultset('foo');
942
943 =cut
944
945 sub related_resultset {
946   my ( $self, $rel, @rest ) = @_;
947   $self->{related_resultsets} ||= {};
948   my $resultsets = $self->{related_resultsets};
949   if( !exists $resultsets->{$rel} ) {
950     #warn "fetching related resultset for rel '$rel'";
951     my $rel_obj = $self->result_source->relationship_info($rel);
952     $self->throw_exception(
953       "search_related: result source '" . $self->result_source->name .
954       "' has no such relationship ${rel}")
955       unless $rel_obj; #die Dumper $self->{attrs};
956     my $rs = $self->search(undef, { join => $rel });
957     #if( $self->{attrs}->{cache} ) {
958     #  $rs = $self->search(undef);
959     #}
960     #else {
961     #}
962     #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
963     #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
964     my $alias = (defined $rs->{attrs}{seen_join}{$rel}
965                   && $rs->{attrs}{seen_join}{$rel} > 1
966                 ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
967                 : $rel);
968     $resultsets->{$rel} =
969       $self->result_source->schema->resultset($rel_obj->{class}
970            )->search( undef,
971              { %{$rs->{attrs}},
972                alias => $alias,
973                select => undef(),
974                as => undef() }
975            )->search(@rest);
976   }
977   return $resultsets->{$rel};
978 }
979
980 =head2 throw_exception
981
982 See Schema's throw_exception
983
984 =cut
985
986 sub throw_exception {
987   my $self=shift;
988   $self->result_source->schema->throw_exception(@_);
989 }
990
991 =head1 ATTRIBUTES
992
993 The resultset takes various attributes that modify its behavior. Here's an
994 overview of them:
995
996 =head2 order_by
997
998 Which column(s) to order the results by. This is currently passed through
999 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
1000
1001 =head2 cols
1002
1003 =head3 Arguments: (arrayref)
1004
1005 Shortcut to request a particular set of columns to be retrieved.  Adds
1006 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1007 from that, then auto-populates C<as> from C<select> as normal.
1008
1009 =head2 include_columns
1010
1011 =head3 Arguments: (arrayref)
1012
1013 Shortcut to include additional columns in the returned results - for example
1014
1015   { include_columns => ['foo.name'], join => ['foo'] }
1016
1017 would add a 'name' column to the information passed to object inflation
1018
1019 =head2 select
1020
1021 =head3 Arguments: (arrayref)
1022
1023 Indicates which columns should be selected from the storage. You can use
1024 column names, or in the case of RDBMS back ends, function or stored procedure
1025 names:
1026
1027   $rs = $schema->resultset('Foo')->search(
1028     {},
1029     {
1030       select => [
1031         'column_name',
1032         { count => 'column_to_count' },
1033         { sum => 'column_to_sum' }
1034       ]
1035     }
1036   );
1037
1038 When you use function/stored procedure names and do not supply an C<as>
1039 attribute, the column names returned are storage-dependent. E.g. MySQL would
1040 return a column named C<count(column_to_count)> in the above example.
1041
1042 =head2 as
1043
1044 =head3 Arguments: (arrayref)
1045
1046 Indicates column names for object inflation. This is used in conjunction with
1047 C<select>, usually when C<select> contains one or more function or stored
1048 procedure names:
1049
1050   $rs = $schema->resultset('Foo')->search(
1051     {},
1052     {
1053       select => [
1054         'column1',
1055         { count => 'column2' }
1056       ],
1057       as => [qw/ column1 column2_count /]
1058     }
1059   );
1060
1061   my $foo = $rs->first(); # get the first Foo
1062
1063 If the object against which the search is performed already has an accessor
1064 matching a column name specified in C<as>, the value can be retrieved using
1065 the accessor as normal:
1066
1067   my $column1 = $foo->column1();
1068
1069 If on the other hand an accessor does not exist in the object, you need to
1070 use C<get_column> instead:
1071
1072   my $column2_count = $foo->get_column('column2_count');
1073
1074 You can create your own accessors if required - see
1075 L<DBIx::Class::Manual::Cookbook> for details.
1076
1077 =head2 join
1078
1079 Contains a list of relationships that should be joined for this query.  For
1080 example:
1081
1082   # Get CDs by Nine Inch Nails
1083   my $rs = $schema->resultset('CD')->search(
1084     { 'artist.name' => 'Nine Inch Nails' },
1085     { join => 'artist' }
1086   );
1087
1088 Can also contain a hash reference to refer to the other relation's relations.
1089 For example:
1090
1091   package MyApp::Schema::Track;
1092   use base qw/DBIx::Class/;
1093   __PACKAGE__->table('track');
1094   __PACKAGE__->add_columns(qw/trackid cd position title/);
1095   __PACKAGE__->set_primary_key('trackid');
1096   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1097   1;
1098
1099   # In your application
1100   my $rs = $schema->resultset('Artist')->search(
1101     { 'track.title' => 'Teardrop' },
1102     {
1103       join     => { cd => 'track' },
1104       order_by => 'artist.name',
1105     }
1106   );
1107
1108 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1109 similarly for a third time). For e.g.
1110
1111   my $rs = $schema->resultset('Artist')->search(
1112     { 'cds.title'   => 'Foo',
1113       'cds_2.title' => 'Bar' },
1114     { join => [ qw/cds cds/ ] });
1115
1116 will return a set of all artists that have both a cd with title Foo and a cd
1117 with title Bar.
1118
1119 If you want to fetch related objects from other tables as well, see C<prefetch>
1120 below.
1121
1122 =head2 prefetch
1123
1124 =head3 Arguments: arrayref/hashref
1125
1126 Contains one or more relationships that should be fetched along with the main 
1127 query (when they are accessed afterwards they will have already been
1128 "prefetched").  This is useful for when you know you will need the related
1129 objects, because it saves at least one query:
1130
1131   my $rs = $schema->resultset('Tag')->search(
1132     {},
1133     {
1134       prefetch => {
1135         cd => 'artist'
1136       }
1137     }
1138   );
1139
1140 The initial search results in SQL like the following:
1141
1142   SELECT tag.*, cd.*, artist.* FROM tag
1143   JOIN cd ON tag.cd = cd.cdid
1144   JOIN artist ON cd.artist = artist.artistid
1145
1146 L<DBIx::Class> has no need to go back to the database when we access the
1147 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1148 case.
1149
1150 Simple prefetches will be joined automatically, so there is no need
1151 for a C<join> attribute in the above search. If you're prefetching to
1152 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1153 specify the join as well.
1154
1155 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1156 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1157 with an accessor type of 'single' or 'filter').
1158
1159 =head2 from
1160
1161 =head3 Arguments: (arrayref)
1162
1163 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1164 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1165 clauses.
1166
1167 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1168 C<join> will usually do what you need and it is strongly recommended that you
1169 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1170
1171 In simple terms, C<from> works as follows:
1172
1173     [
1174         { <alias> => <table>, -join-type => 'inner|left|right' }
1175         [] # nested JOIN (optional)
1176         { <table.column> = <foreign_table.foreign_key> }
1177     ]
1178
1179     JOIN
1180         <alias> <table>
1181         [JOIN ...]
1182     ON <table.column> = <foreign_table.foreign_key>
1183
1184 An easy way to follow the examples below is to remember the following:
1185
1186     Anything inside "[]" is a JOIN
1187     Anything inside "{}" is a condition for the enclosing JOIN
1188
1189 The following examples utilize a "person" table in a family tree application.
1190 In order to express parent->child relationships, this table is self-joined:
1191
1192     # Person->belongs_to('father' => 'Person');
1193     # Person->belongs_to('mother' => 'Person');
1194
1195 C<from> can be used to nest joins. Here we return all children with a father,
1196 then search against all mothers of those children:
1197
1198   $rs = $schema->resultset('Person')->search(
1199       {},
1200       {
1201           alias => 'mother', # alias columns in accordance with "from"
1202           from => [
1203               { mother => 'person' },
1204               [
1205                   [
1206                       { child => 'person' },
1207                       [
1208                           { father => 'person' },
1209                           { 'father.person_id' => 'child.father_id' }
1210                       ]
1211                   ],
1212                   { 'mother.person_id' => 'child.mother_id' }
1213               ],
1214           ]
1215       },
1216   );
1217
1218   # Equivalent SQL:
1219   # SELECT mother.* FROM person mother
1220   # JOIN (
1221   #   person child
1222   #   JOIN person father
1223   #   ON ( father.person_id = child.father_id )
1224   # )
1225   # ON ( mother.person_id = child.mother_id )
1226
1227 The type of any join can be controlled manually. To search against only people
1228 with a father in the person table, we could explicitly use C<INNER JOIN>:
1229
1230     $rs = $schema->resultset('Person')->search(
1231         {},
1232         {
1233             alias => 'child', # alias columns in accordance with "from"
1234             from => [
1235                 { child => 'person' },
1236                 [
1237                     { father => 'person', -join-type => 'inner' },
1238                     { 'father.id' => 'child.father_id' }
1239                 ],
1240             ]
1241         },
1242     );
1243
1244     # Equivalent SQL:
1245     # SELECT child.* FROM person child
1246     # INNER JOIN person father ON child.father_id = father.id
1247
1248 =head2 page
1249
1250 For a paged resultset, specifies which page to retrieve.  Leave unset
1251 for an unpaged resultset.
1252
1253 =head2 rows
1254
1255 For a paged resultset, how many rows per page:
1256
1257   rows => 10
1258
1259 Can also be used to simulate an SQL C<LIMIT>.
1260
1261 =head2 group_by
1262
1263 =head3 Arguments: (arrayref)
1264
1265 A arrayref of columns to group by. Can include columns of joined tables.
1266
1267   group_by => [qw/ column1 column2 ... /]
1268
1269 =head2 distinct
1270
1271 Set to 1 to group by all columns.
1272
1273 For more examples of using these attributes, see
1274 L<DBIx::Class::Manual::Cookbook>.
1275
1276 =cut
1277
1278 1;