Nuked a couple of pointless warns and a Dumper load
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->load_components(qw/Core/);
36   __PACKAGE__->table('artist');
37   __PACKAGE__->add_columns(qw/artistid name/);
38   __PACKAGE__->set_primary_key('artistid');
39   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
40   1;
41
42   package MyApp::Schema::CD;
43   use base qw/DBIx::Class/;
44   __PACKAGE__->load_components(qw/Core/);
45   __PACKAGE__->table('cd');
46   __PACKAGE__->add_columns(qw/cdid artist title year/);
47   __PACKAGE__->set_primary_key('cdid');
48   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
49   1;
50
51 =head1 METHODS
52
53 =head2 new
54
55 =head3 Arguments: ($source, \%$attrs)
56
57 The resultset constructor. Takes a source object (usually a
58 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
59 below).  Does not perform any queries -- these are executed as needed by the
60 other methods.
61
62 Generally you won't need to construct a resultset manually.  You'll
63 automatically get one from e.g. a L</search> called in scalar context:
64
65   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
66
67 =cut
68
69 sub new {
70   my $class = shift;
71   return $class->new_result(@_) if ref $class;
72   my ($source, $attrs) = @_;
73   #use Data::Dumper; warn Dumper($attrs);
74   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
75   my %seen;
76   my $alias = ($attrs->{alias} ||= 'me');
77   if ($attrs->{cols} || !$attrs->{select}) {
78     delete $attrs->{as} if $attrs->{cols};
79     my @cols = ($attrs->{cols}
80                  ? @{delete $attrs->{cols}}
81                  : $source->columns);
82     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @cols ];
83   }
84   $attrs->{as} ||= [ map { m/^$alias\.(.*)$/ ? $1 : $_ } @{$attrs->{select}} ];
85   if (my $include = delete $attrs->{include_columns}) {
86     push(@{$attrs->{select}}, @$include);
87     push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
88   }
89   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
90   $attrs->{from} ||= [ { $alias => $source->from } ];
91   $attrs->{seen_join} ||= {};
92   if (my $join = delete $attrs->{join}) {
93     foreach my $j (ref $join eq 'ARRAY'
94               ? (@{$join}) : ($join)) {
95       if (ref $j eq 'HASH') {
96         $seen{$_} = 1 foreach keys %$j;
97       } else {
98         $seen{$j} = 1;
99       }
100     }
101     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
102   }
103   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
104
105   $attrs->{order_by} = [ $attrs->{order_by} ]
106     if $attrs->{order_by} && !ref($attrs->{order_by});
107   $attrs->{order_by} ||= [];
108
109   my $collapse = {};
110
111   if (my $prefetch = delete $attrs->{prefetch}) {
112     my @pre_order;
113     foreach my $p (ref $prefetch eq 'ARRAY'
114               ? (@{$prefetch}) : ($prefetch)) {
115       if( ref $p eq 'HASH' ) {
116         foreach my $key (keys %$p) {
117           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
118             unless $seen{$key};
119         }
120       }
121       else {
122         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
123             unless $seen{$p};
124       }
125       my @prefetch = $source->resolve_prefetch(
126            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
127       #die Dumper \@cols;
128       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
129       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
130     }
131     push(@{$attrs->{order_by}}, @pre_order);
132   }
133
134   if ($attrs->{page}) {
135     $attrs->{rows} ||= 10;
136     $attrs->{offset} ||= 0;
137     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
138   }
139
140 #if (keys %{$collapse}) {
141 #  use Data::Dumper; warn Dumper($collapse);
142 #}
143
144   my $new = {
145     result_source => $source,
146     result_class => $attrs->{result_class} || $source->result_class,
147     cond => $attrs->{where},
148     from => $attrs->{from},
149     collapse => $collapse,
150     count => undef,
151     page => delete $attrs->{page},
152     pager => undef,
153     attrs => $attrs };
154   bless ($new, $class);
155   return $new;
156 }
157
158 =head2 search
159
160   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
161   my $new_rs = $rs->search({ foo => 3 });
162
163 If you need to pass in additional attributes but no additional condition,
164 call it as C<search({}, \%attrs);>.
165
166   # "SELECT foo, bar FROM $class_table"
167   my @all = $class->search({}, { cols => [qw/foo bar/] });
168
169 =cut
170
171 sub search {
172   my $self = shift;
173
174   my $rs;
175   if( @_ ) {
176     
177     my $attrs = { %{$self->{attrs}} };
178     my $having = delete $attrs->{having};
179     if (@_ > 1 && ref $_[$#_] eq 'HASH') {
180      $attrs = { %$attrs, %{ pop(@_) } };
181     }
182
183     my $where = (@_
184                   ? ((@_ == 1 || ref $_[0] eq "HASH")
185                       ? shift
186                       : ((@_ % 2)
187                           ? $self->throw_exception(
188                               "Odd number of arguments to search")
189                           : {@_}))
190                   : undef());
191     if (defined $where) {
192       $where = (defined $attrs->{where}
193                 ? { '-and' =>
194                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
195                         $where, $attrs->{where} ] }
196                 : $where);
197       $attrs->{where} = $where;
198     }
199
200     if (defined $having) {
201       $having = (defined $attrs->{having}
202                 ? { '-and' =>
203                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
204                         $having, $attrs->{having} ] }
205                 : $having);
206       $attrs->{having} = $having;
207     }
208
209     $rs = (ref $self)->new($self->result_source, $attrs);
210   }
211   else {
212     $rs = $self;
213     $rs->reset();
214   }
215   return (wantarray ? $rs->all : $rs);
216 }
217
218 =head2 search_literal
219
220   my @obj    = $rs->search_literal($literal_where_cond, @bind);
221   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
222
223 Pass a literal chunk of SQL to be added to the conditional part of the
224 resultset.
225
226 =cut
227
228 sub search_literal {
229   my ($self, $cond, @vals) = @_;
230   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
231   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
232   return $self->search(\$cond, $attrs);
233 }
234
235 =head2 find
236
237 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
238
239 Finds a row based on its primary key or unique constraint. For example:
240
241   my $cd = $schema->resultset('CD')->find(5);
242
243 Also takes an optional C<key> attribute, to search by a specific key or unique
244 constraint. For example:
245
246   my $cd = $schema->resultset('CD')->find(
247     {
248       artist => 'Massive Attack',
249       title  => 'Mezzanine',
250     },
251     { key => 'artist_title' }
252   );
253
254 See also L</find_or_create> and L</update_or_create>.
255
256 =cut
257
258 sub find {
259   my ($self, @vals) = @_;
260   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
261
262   my @cols = $self->result_source->primary_columns;
263   if (exists $attrs->{key}) {
264     my %uniq = $self->result_source->unique_constraints;
265     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
266       unless exists $uniq{$attrs->{key}};
267     @cols = @{ $uniq{$attrs->{key}} };
268   }
269   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
270   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
271     unless @cols;
272
273   my $query;
274   if (ref $vals[0] eq 'HASH') {
275     $query = { %{$vals[0]} };
276   } elsif (@cols == @vals) {
277     $query = {};
278     @{$query}{@cols} = @vals;
279   } else {
280     $query = {@vals};
281   }
282   foreach (keys %$query) {
283     next if m/\./;
284     $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
285   }
286   #warn Dumper($query);
287   return (keys %$attrs
288            ? $self->search($query,$attrs)->single
289            : $self->single($query));
290 }
291
292 =head2 search_related
293
294   $rs->search_related('relname', $cond?, $attrs?);
295
296 Search the specified relationship. Optionally specify a condition for matching
297 records.
298
299 =cut
300
301 sub search_related {
302   return shift->related_resultset(shift)->search(@_);
303 }
304
305 =head2 cursor
306
307 Returns a storage-driven cursor to the given resultset.
308
309 =cut
310
311 sub cursor {
312   my ($self) = @_;
313   my ($attrs) = $self->{attrs};
314   $attrs = { %$attrs };
315   return $self->{cursor}
316     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
317           $attrs->{where},$attrs);
318 }
319
320 =head2 single
321
322 Inflates the first result without creating a cursor
323
324 =cut
325
326 sub single {
327   my ($self, $extra) = @_;
328   my ($attrs) = $self->{attrs};
329   $attrs = { %$attrs };
330   if ($extra) {
331     if (defined $attrs->{where}) {
332       $attrs->{where} = {
333         '-and'
334           => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
335                delete $attrs->{where}, $extra ]
336       };
337     } else {
338       $attrs->{where} = $extra;
339     }
340   }
341   my @data = $self->result_source->storage->select_single(
342           $self->{from}, $attrs->{select},
343           $attrs->{where},$attrs);
344   return (@data ? $self->_construct_object(@data) : ());
345 }
346
347
348 =head2 search_like
349
350 Perform a search, but use C<LIKE> instead of equality as the condition. Note
351 that this is simply a convenience method; you most likely want to use
352 L</search> with specific operators.
353
354 For more information, see L<DBIx::Class::Manual::Cookbook>.
355
356 =cut
357
358 sub search_like {
359   my $class    = shift;
360   my $attrs = { };
361   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
362     $attrs = pop(@_);
363   }
364   my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
365   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
366   return $class->search($query, { %$attrs });
367 }
368
369 =head2 slice
370
371 =head3 Arguments: ($first, $last)
372
373 Returns a subset of elements from the resultset.
374
375 =cut
376
377 sub slice {
378   my ($self, $min, $max) = @_;
379   my $attrs = { %{ $self->{attrs} || {} } };
380   $attrs->{offset} ||= 0;
381   $attrs->{offset} += $min;
382   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
383   my $slice = (ref $self)->new($self->result_source, $attrs);
384   return (wantarray ? $slice->all : $slice);
385 }
386
387 =head2 next
388
389 Returns the next element in the resultset (C<undef> is there is none).
390
391 Can be used to efficiently iterate over records in the resultset:
392
393   my $rs = $schema->resultset('CD')->search({});
394   while (my $cd = $rs->next) {
395     print $cd->title;
396   }
397
398 =cut
399
400 sub next {
401   my ($self) = @_;
402   my $cache;
403   if( @{$cache = $self->{all_cache} || []}) {
404     $self->{all_cache_position} ||= 0;
405     my $obj = $cache->[$self->{all_cache_position}];
406     $self->{all_cache_position}++;
407     return $obj;
408   }
409   if ($self->{attrs}{cache}) {
410     $self->{all_cache_position} = 1;
411     return ($self->all)[0];
412   }
413   my @row = (exists $self->{stashed_row}
414                ? @{delete $self->{stashed_row}}
415                : $self->cursor->next);
416 #  warn Dumper(\@row); use Data::Dumper;
417   return unless (@row);
418   return $self->_construct_object(@row);
419 }
420
421 sub _construct_object {
422   my ($self, @row) = @_;
423   my @as = @{ $self->{attrs}{as} };
424
425   my $info = $self->_collapse_result(\@as, \@row);
426
427   my $new = $self->result_class->inflate_result($self->result_source, @$info);
428
429   $new = $self->{attrs}{record_filter}->($new)
430     if exists $self->{attrs}{record_filter};
431  
432   return $new;
433 }
434
435 sub _collapse_result {
436   my ($self, $as, $row, $prefix) = @_;
437
438   my %const;
439
440   my @copy = @$row;
441   foreach my $this_as (@$as) {
442     my $val = shift @copy;
443     if (defined $prefix) {
444       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
445         my $remain = $1;
446         $remain =~ /^(?:(.*)\.)?([^\.]+)$/;
447         $const{$1||''}{$2} = $val;
448       }
449     } else {
450       $this_as =~ /^(?:(.*)\.)?([^\.]+)$/;
451       $const{$1||''}{$2} = $val;
452     }
453   }
454
455   my $info = [ {}, {} ];
456   foreach my $key (keys %const) {
457     if (length $key) {
458       my $target = $info;
459       my @parts = split(/\./, $key);
460       foreach my $p (@parts) {
461         $target = $target->[1]->{$p} ||= [];
462       }
463       $target->[0] = $const{$key};
464     } else {
465       $info->[0] = $const{$key};
466     }
467   }
468
469   my @collapse = (defined($prefix)
470                    ? (map { (m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()); }
471                        keys %{$self->{collapse}})
472                    : keys %{$self->{collapse}});
473   if (@collapse) {
474     my ($c) = sort { length $a <=> length $b } @collapse;
475     my $target = $info;
476     foreach my $p (split(/\./, $c)) {
477       $target = $target->[1]->{$p} ||= [];
478     }
479     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
480     my @co_key = @{$self->{collapse}{$c_prefix}};
481     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
482     my $tree = $self->_collapse_result($as, $row, $c_prefix);
483     my (@final, @raw);
484     while ( !(grep {
485                 !defined($tree->[0]->{$_})
486                 || $co_check{$_} ne $tree->[0]->{$_}
487               } @co_key) ) {
488       push(@final, $tree);
489       last unless (@raw = $self->cursor->next);
490       $row = $self->{stashed_row} = \@raw;
491       $tree = $self->_collapse_result($as, $row, $c_prefix);
492       #warn Data::Dumper::Dumper($tree, $row);
493     }
494     @{$target} = @final;
495   }
496
497   return $info;
498 }
499
500 =head2 result_source
501
502 Returns a reference to the result source for this recordset.
503
504 =cut
505
506
507 =head2 count
508
509 Performs an SQL C<COUNT> with the same query as the resultset was built
510 with to find the number of elements. If passed arguments, does a search
511 on the resultset and counts the results of that.
512
513 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
514 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
515 not support C<DISTINCT> with multiple columns. If you are using such a
516 database, you should only use columns from the main table in your C<group_by>
517 clause.
518
519 =cut
520
521 sub count {
522   my $self = shift;
523   return $self->search(@_)->count if @_ && defined $_[0];
524   unless (defined $self->{count}) {
525     return scalar @{ $self->get_cache }
526       if @{ $self->get_cache };
527     my $group_by;
528     my $select = { 'count' => '*' };
529     my $attrs = { %{ $self->{attrs} } };
530     if( $group_by = delete $attrs->{group_by} ) {
531       delete $attrs->{having};
532       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
533       # todo: try CONCAT for multi-column pk
534       my @pk = $self->result_source->primary_columns;
535       if( scalar(@pk) == 1 ) {
536         my $pk = shift(@pk);
537         my $alias = $attrs->{alias};
538         my $re = qr/^($alias\.)?$pk$/;
539         foreach my $column ( @distinct) {
540           if( $column =~ $re ) {
541             @distinct = ( $column );
542             last;
543           }
544         } 
545       }
546
547       $select = { count => { 'distinct' => \@distinct } };
548       #use Data::Dumper; die Dumper $select;
549     }
550
551     $attrs->{select} = $select;
552     $attrs->{as} = [ 'count' ];
553     # offset, order by and page are not needed to count. record_filter is cdbi
554     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
555         
556     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
557   }
558   return 0 unless $self->{count};
559   my $count = $self->{count};
560   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
561   $count = $self->{attrs}{rows} if
562     ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
563   return $count;
564 }
565
566 =head2 count_literal
567
568 Calls L</search_literal> with the passed arguments, then L</count>.
569
570 =cut
571
572 sub count_literal { shift->search_literal(@_)->count; }
573
574 =head2 all
575
576 Returns all elements in the resultset. Called implictly if the resultset
577 is returned in list context.
578
579 =cut
580
581 sub all {
582   my ($self) = @_;
583   return @{ $self->get_cache }
584     if @{ $self->get_cache };
585
586   my @obj;
587
588   if (keys %{$self->{collapse}}) {
589       # Using $self->cursor->all is really just an optimisation.
590       # If we're collapsing has_many prefetches it probably makes
591       # very little difference, and this is cleaner than hacking
592       # _construct_object to survive the approach
593     my @row;
594     $self->cursor->reset;
595     while (@row = $self->cursor->next) {
596       push(@obj, $self->_construct_object(@row));
597     }
598   } else {
599     @obj = map { $self->_construct_object(@$_); }
600              $self->cursor->all;
601   }
602
603   if( $self->{attrs}->{cache} ) {
604     $self->set_cache( \@obj );
605   }
606
607   return @obj;
608 }
609
610 =head2 reset
611
612 Resets the resultset's cursor, so you can iterate through the elements again.
613
614 =cut
615
616 sub reset {
617   my ($self) = @_;
618   $self->{all_cache_position} = 0;
619   $self->cursor->reset;
620   return $self;
621 }
622
623 =head2 first
624
625 Resets the resultset and returns the first element.
626
627 =cut
628
629 sub first {
630   return $_[0]->reset->next;
631 }
632
633 =head2 update
634
635 =head3 Arguments: (\%values)
636
637 Sets the specified columns in the resultset to the supplied values.
638
639 =cut
640
641 sub update {
642   my ($self, $values) = @_;
643   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
644   return $self->result_source->storage->update(
645            $self->result_source->from, $values, $self->{cond});
646 }
647
648 =head2 update_all
649
650 =head3 Arguments: (\%values)
651
652 Fetches all objects and updates them one at a time.  Note that C<update_all>
653 will run cascade triggers while L</update> will not.
654
655 =cut
656
657 sub update_all {
658   my ($self, $values) = @_;
659   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
660   foreach my $obj ($self->all) {
661     $obj->set_columns($values)->update;
662   }
663   return 1;
664 }
665
666 =head2 delete
667
668 Deletes the contents of the resultset from its result source.
669
670 =cut
671
672 sub delete {
673   my ($self) = @_;
674   my $del = {};
675   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
676     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
677   if (ref $self->{cond} eq 'ARRAY') {
678     $del = [ map { my %hash;
679       foreach my $key (keys %{$_}) {
680         $key =~ /([^\.]+)$/;
681         $hash{$1} = $_->{$key};
682       }; \%hash; } @{$self->{cond}} ];
683   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
684     $del->{-and} = [ map { my %hash;
685       foreach my $key (keys %{$_}) {
686         $key =~ /([^\.]+)$/;
687         $hash{$1} = $_->{$key};
688       }; \%hash; } @{$self->{cond}{-and}} ];
689   } else {
690     foreach my $key (keys %{$self->{cond}}) {
691       $key =~ /([^\.]+)$/;
692       $del->{$1} = $self->{cond}{$key};
693     }
694   }
695   $self->result_source->storage->delete($self->result_source->from, $del);
696   return 1;
697 }
698
699 =head2 delete_all
700
701 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
702 will run cascade triggers while L</delete> will not.
703
704 =cut
705
706 sub delete_all {
707   my ($self) = @_;
708   $_->delete for $self->all;
709   return 1;
710 }
711
712 =head2 pager
713
714 Returns a L<Data::Page> object for the current resultset. Only makes
715 sense for queries with a C<page> attribute.
716
717 =cut
718
719 sub pager {
720   my ($self) = @_;
721   my $attrs = $self->{attrs};
722   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
723   $attrs->{rows} ||= 10;
724   $self->count;
725   return $self->{pager} ||= Data::Page->new(
726     $self->{count}, $attrs->{rows}, $self->{page});
727 }
728
729 =head2 page
730
731 =head3 Arguments: ($page_num)
732
733 Returns a new resultset for the specified page.
734
735 =cut
736
737 sub page {
738   my ($self, $page) = @_;
739   my $attrs = { %{$self->{attrs}} };
740   $attrs->{page} = $page;
741   return (ref $self)->new($self->result_source, $attrs);
742 }
743
744 =head2 new_result
745
746 =head3 Arguments: (\%vals)
747
748 Creates a result in the resultset's result class.
749
750 =cut
751
752 sub new_result {
753   my ($self, $values) = @_;
754   $self->throw_exception( "new_result needs a hash" )
755     unless (ref $values eq 'HASH');
756   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
757     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
758   my %new = %$values;
759   my $alias = $self->{attrs}{alias};
760   foreach my $key (keys %{$self->{cond}||{}}) {
761     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
762   }
763   my $obj = $self->result_class->new(\%new);
764   $obj->result_source($self->result_source) if $obj->can('result_source');
765   $obj;
766 }
767
768 =head2 create
769
770 =head3 Arguments: (\%vals)
771
772 Inserts a record into the resultset and returns the object.
773
774 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
775
776 =cut
777
778 sub create {
779   my ($self, $attrs) = @_;
780   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
781   return $self->new_result($attrs)->insert;
782 }
783
784 =head2 find_or_create
785
786 =head3 Arguments: (\%vals, \%attrs?)
787
788   $class->find_or_create({ key => $val, ... });
789
790 Searches for a record matching the search condition; if it doesn't find one,
791 creates one and returns that instead.
792
793   my $cd = $schema->resultset('CD')->find_or_create({
794     cdid   => 5,
795     artist => 'Massive Attack',
796     title  => 'Mezzanine',
797     year   => 2005,
798   });
799
800 Also takes an optional C<key> attribute, to search by a specific key or unique
801 constraint. For example:
802
803   my $cd = $schema->resultset('CD')->find_or_create(
804     {
805       artist => 'Massive Attack',
806       title  => 'Mezzanine',
807     },
808     { key => 'artist_title' }
809   );
810
811 See also L</find> and L</update_or_create>.
812
813 =cut
814
815 sub find_or_create {
816   my $self     = shift;
817   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
818   my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
819   my $exists   = $self->find($hash, $attrs);
820   return defined($exists) ? $exists : $self->create($hash);
821 }
822
823 =head2 update_or_create
824
825   $class->update_or_create({ key => $val, ... });
826
827 First, search for an existing row matching one of the unique constraints
828 (including the primary key) on the source of this resultset.  If a row is
829 found, update it with the other given column values.  Otherwise, create a new
830 row.
831
832 Takes an optional C<key> attribute to search on a specific unique constraint.
833 For example:
834
835   # In your application
836   my $cd = $schema->resultset('CD')->update_or_create(
837     {
838       artist => 'Massive Attack',
839       title  => 'Mezzanine',
840       year   => 1998,
841     },
842     { key => 'artist_title' }
843   );
844
845 If no C<key> is specified, it searches on all unique constraints defined on the
846 source, including the primary key.
847
848 If the C<key> is specified as C<primary>, search only on the primary key.
849
850 See also L</find> and L</find_or_create>.
851
852 =cut
853
854 sub update_or_create {
855   my $self = shift;
856
857   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
858   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
859
860   my %unique_constraints = $self->result_source->unique_constraints;
861   my @constraint_names   = (exists $attrs->{key}
862                             ? ($attrs->{key})
863                             : keys %unique_constraints);
864
865   my @unique_hashes;
866   foreach my $name (@constraint_names) {
867     my @unique_cols = @{ $unique_constraints{$name} };
868     my %unique_hash =
869       map  { $_ => $hash->{$_} }
870       grep { exists $hash->{$_} }
871       @unique_cols;
872
873     push @unique_hashes, \%unique_hash
874       if (scalar keys %unique_hash == scalar @unique_cols);
875   }
876
877   my $row;
878   if (@unique_hashes) {
879     $row = $self->search(\@unique_hashes, { rows => 1 })->first;
880     if ($row) {
881       $row->set_columns($hash);
882       $row->update;
883     }
884   }
885
886   unless ($row) {
887     $row = $self->create($hash);
888   }
889
890   return $row;
891 }
892
893 =head2 get_cache
894
895 Gets the contents of the cache for the resultset.
896
897 =cut
898
899 sub get_cache {
900   my $self = shift;
901   return $self->{all_cache} || [];
902 }
903
904 =head2 set_cache
905
906 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
907
908 =cut
909
910 sub set_cache {
911   my ( $self, $data ) = @_;
912   $self->throw_exception("set_cache requires an arrayref")
913     if ref $data ne 'ARRAY';
914   my $result_class = $self->result_class;
915   foreach( @$data ) {
916     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
917       if ref $_ ne $result_class;
918   }
919   $self->{all_cache} = $data;
920 }
921
922 =head2 clear_cache
923
924 Clears the cache for the resultset.
925
926 =cut
927
928 sub clear_cache {
929   my $self = shift;
930   $self->set_cache([]);
931 }
932
933 =head2 related_resultset
934
935 Returns a related resultset for the supplied relationship name.
936
937   $rs = $rs->related_resultset('foo');
938
939 =cut
940
941 sub related_resultset {
942   my ( $self, $rel, @rest ) = @_;
943   $self->{related_resultsets} ||= {};
944   my $resultsets = $self->{related_resultsets};
945   if( !exists $resultsets->{$rel} ) {
946     #warn "fetching related resultset for rel '$rel'";
947     my $rel_obj = $self->result_source->relationship_info($rel);
948     $self->throw_exception(
949       "search_related: result source '" . $self->result_source->name .
950       "' has no such relationship ${rel}")
951       unless $rel_obj; #die Dumper $self->{attrs};
952     my $rs = $self->search(undef, { join => $rel });
953     #if( $self->{attrs}->{cache} ) {
954     #  $rs = $self->search(undef);
955     #}
956     #else {
957     #}
958     #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
959     #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
960     my $alias = (defined $rs->{attrs}{seen_join}{$rel}
961                   && $rs->{attrs}{seen_join}{$rel} > 1
962                 ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
963                 : $rel);
964     $resultsets->{$rel} =
965       $self->result_source->schema->resultset($rel_obj->{class}
966            )->search( undef,
967              { %{$rs->{attrs}},
968                alias => $alias,
969                select => undef(),
970                as => undef() }
971            )->search(@rest);
972   }
973   return $resultsets->{$rel};
974 }
975
976 =head2 throw_exception
977
978 See Schema's throw_exception
979
980 =cut
981
982 sub throw_exception {
983   my $self=shift;
984   $self->result_source->schema->throw_exception(@_);
985 }
986
987 =head1 ATTRIBUTES
988
989 The resultset takes various attributes that modify its behavior. Here's an
990 overview of them:
991
992 =head2 order_by
993
994 Which column(s) to order the results by. This is currently passed through
995 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
996
997 =head2 cols
998
999 =head3 Arguments: (arrayref)
1000
1001 Shortcut to request a particular set of columns to be retrieved.  Adds
1002 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1003 from that, then auto-populates C<as> from C<select> as normal.
1004
1005 =head2 include_columns
1006
1007 =head3 Arguments: (arrayref)
1008
1009 Shortcut to include additional columns in the returned results - for example
1010
1011   { include_columns => ['foo.name'], join => ['foo'] }
1012
1013 would add a 'name' column to the information passed to object inflation
1014
1015 =head2 select
1016
1017 =head3 Arguments: (arrayref)
1018
1019 Indicates which columns should be selected from the storage. You can use
1020 column names, or in the case of RDBMS back ends, function or stored procedure
1021 names:
1022
1023   $rs = $schema->resultset('Foo')->search(
1024     {},
1025     {
1026       select => [
1027         'column_name',
1028         { count => 'column_to_count' },
1029         { sum => 'column_to_sum' }
1030       ]
1031     }
1032   );
1033
1034 When you use function/stored procedure names and do not supply an C<as>
1035 attribute, the column names returned are storage-dependent. E.g. MySQL would
1036 return a column named C<count(column_to_count)> in the above example.
1037
1038 =head2 as
1039
1040 =head3 Arguments: (arrayref)
1041
1042 Indicates column names for object inflation. This is used in conjunction with
1043 C<select>, usually when C<select> contains one or more function or stored
1044 procedure names:
1045
1046   $rs = $schema->resultset('Foo')->search(
1047     {},
1048     {
1049       select => [
1050         'column1',
1051         { count => 'column2' }
1052       ],
1053       as => [qw/ column1 column2_count /]
1054     }
1055   );
1056
1057   my $foo = $rs->first(); # get the first Foo
1058
1059 If the object against which the search is performed already has an accessor
1060 matching a column name specified in C<as>, the value can be retrieved using
1061 the accessor as normal:
1062
1063   my $column1 = $foo->column1();
1064
1065 If on the other hand an accessor does not exist in the object, you need to
1066 use C<get_column> instead:
1067
1068   my $column2_count = $foo->get_column('column2_count');
1069
1070 You can create your own accessors if required - see
1071 L<DBIx::Class::Manual::Cookbook> for details.
1072
1073 =head2 join
1074
1075 Contains a list of relationships that should be joined for this query.  For
1076 example:
1077
1078   # Get CDs by Nine Inch Nails
1079   my $rs = $schema->resultset('CD')->search(
1080     { 'artist.name' => 'Nine Inch Nails' },
1081     { join => 'artist' }
1082   );
1083
1084 Can also contain a hash reference to refer to the other relation's relations.
1085 For example:
1086
1087   package MyApp::Schema::Track;
1088   use base qw/DBIx::Class/;
1089   __PACKAGE__->table('track');
1090   __PACKAGE__->add_columns(qw/trackid cd position title/);
1091   __PACKAGE__->set_primary_key('trackid');
1092   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1093   1;
1094
1095   # In your application
1096   my $rs = $schema->resultset('Artist')->search(
1097     { 'track.title' => 'Teardrop' },
1098     {
1099       join     => { cd => 'track' },
1100       order_by => 'artist.name',
1101     }
1102   );
1103
1104 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1105 similarly for a third time). For e.g.
1106
1107   my $rs = $schema->resultset('Artist')->search(
1108     { 'cds.title'   => 'Foo',
1109       'cds_2.title' => 'Bar' },
1110     { join => [ qw/cds cds/ ] });
1111
1112 will return a set of all artists that have both a cd with title Foo and a cd
1113 with title Bar.
1114
1115 If you want to fetch related objects from other tables as well, see C<prefetch>
1116 below.
1117
1118 =head2 prefetch
1119
1120 =head3 Arguments: arrayref/hashref
1121
1122 Contains one or more relationships that should be fetched along with the main 
1123 query (when they are accessed afterwards they will have already been
1124 "prefetched").  This is useful for when you know you will need the related
1125 objects, because it saves at least one query:
1126
1127   my $rs = $schema->resultset('Tag')->search(
1128     {},
1129     {
1130       prefetch => {
1131         cd => 'artist'
1132       }
1133     }
1134   );
1135
1136 The initial search results in SQL like the following:
1137
1138   SELECT tag.*, cd.*, artist.* FROM tag
1139   JOIN cd ON tag.cd = cd.cdid
1140   JOIN artist ON cd.artist = artist.artistid
1141
1142 L<DBIx::Class> has no need to go back to the database when we access the
1143 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1144 case.
1145
1146 Simple prefetches will be joined automatically, so there is no need
1147 for a C<join> attribute in the above search. If you're prefetching to
1148 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1149 specify the join as well.
1150
1151 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1152 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1153 with an accessor type of 'single' or 'filter').
1154
1155 =head2 from
1156
1157 =head3 Arguments: (arrayref)
1158
1159 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1160 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1161 clauses.
1162
1163 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1164 C<join> will usually do what you need and it is strongly recommended that you
1165 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1166
1167 In simple terms, C<from> works as follows:
1168
1169     [
1170         { <alias> => <table>, -join-type => 'inner|left|right' }
1171         [] # nested JOIN (optional)
1172         { <table.column> = <foreign_table.foreign_key> }
1173     ]
1174
1175     JOIN
1176         <alias> <table>
1177         [JOIN ...]
1178     ON <table.column> = <foreign_table.foreign_key>
1179
1180 An easy way to follow the examples below is to remember the following:
1181
1182     Anything inside "[]" is a JOIN
1183     Anything inside "{}" is a condition for the enclosing JOIN
1184
1185 The following examples utilize a "person" table in a family tree application.
1186 In order to express parent->child relationships, this table is self-joined:
1187
1188     # Person->belongs_to('father' => 'Person');
1189     # Person->belongs_to('mother' => 'Person');
1190
1191 C<from> can be used to nest joins. Here we return all children with a father,
1192 then search against all mothers of those children:
1193
1194   $rs = $schema->resultset('Person')->search(
1195       {},
1196       {
1197           alias => 'mother', # alias columns in accordance with "from"
1198           from => [
1199               { mother => 'person' },
1200               [
1201                   [
1202                       { child => 'person' },
1203                       [
1204                           { father => 'person' },
1205                           { 'father.person_id' => 'child.father_id' }
1206                       ]
1207                   ],
1208                   { 'mother.person_id' => 'child.mother_id' }
1209               ],
1210           ]
1211       },
1212   );
1213
1214   # Equivalent SQL:
1215   # SELECT mother.* FROM person mother
1216   # JOIN (
1217   #   person child
1218   #   JOIN person father
1219   #   ON ( father.person_id = child.father_id )
1220   # )
1221   # ON ( mother.person_id = child.mother_id )
1222
1223 The type of any join can be controlled manually. To search against only people
1224 with a father in the person table, we could explicitly use C<INNER JOIN>:
1225
1226     $rs = $schema->resultset('Person')->search(
1227         {},
1228         {
1229             alias => 'child', # alias columns in accordance with "from"
1230             from => [
1231                 { child => 'person' },
1232                 [
1233                     { father => 'person', -join-type => 'inner' },
1234                     { 'father.id' => 'child.father_id' }
1235                 ],
1236             ]
1237         },
1238     );
1239
1240     # Equivalent SQL:
1241     # SELECT child.* FROM person child
1242     # INNER JOIN person father ON child.father_id = father.id
1243
1244 =head2 page
1245
1246 For a paged resultset, specifies which page to retrieve.  Leave unset
1247 for an unpaged resultset.
1248
1249 =head2 rows
1250
1251 For a paged resultset, how many rows per page:
1252
1253   rows => 10
1254
1255 Can also be used to simulate an SQL C<LIMIT>.
1256
1257 =head2 group_by
1258
1259 =head3 Arguments: (arrayref)
1260
1261 A arrayref of columns to group by. Can include columns of joined tables.
1262
1263   group_by => [qw/ column1 column2 ... /]
1264
1265 =head2 distinct
1266
1267 Set to 1 to group by all columns.
1268
1269 For more examples of using these attributes, see
1270 L<DBIx::Class::Manual::Cookbook>.
1271
1272 =cut
1273
1274 1;