weaken result_source in all resultsets
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11 use Scalar::Util qw/weaken/;
12
13 use base qw/DBIx::Class/;
14 __PACKAGE__->load_components(qw/AccessorGroup/);
15 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
16
17 =head1 NAME
18
19 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
20
21 =head1 SYNOPSIS
22
23   my $rs   = $schema->resultset('User')->search(registered => 1);
24   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
25
26 =head1 DESCRIPTION
27
28 The resultset is also known as an iterator. It is responsible for handling
29 queries that may return an arbitrary number of rows, e.g. via L</search>
30 or a C<has_many> relationship.
31
32 In the examples below, the following table classes are used:
33
34   package MyApp::Schema::Artist;
35   use base qw/DBIx::Class/;
36   __PACKAGE__->load_components(qw/Core/);
37   __PACKAGE__->table('artist');
38   __PACKAGE__->add_columns(qw/artistid name/);
39   __PACKAGE__->set_primary_key('artistid');
40   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
41   1;
42
43   package MyApp::Schema::CD;
44   use base qw/DBIx::Class/;
45   __PACKAGE__->load_components(qw/Core/);
46   __PACKAGE__->table('cd');
47   __PACKAGE__->add_columns(qw/cdid artist title year/);
48   __PACKAGE__->set_primary_key('cdid');
49   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
50   1;
51
52 =head1 METHODS
53
54 =head2 new
55
56 =head3 Arguments: ($source, \%$attrs)
57
58 The resultset constructor. Takes a source object (usually a
59 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
60 below).  Does not perform any queries -- these are executed as needed by the
61 other methods.
62
63 Generally you won't need to construct a resultset manually.  You'll
64 automatically get one from e.g. a L</search> called in scalar context:
65
66   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
67
68 =cut
69
70 sub new {
71   my $class = shift;
72   return $class->new_result(@_) if ref $class;
73   
74   my ($source, $attrs) = @_;
75   weaken $source;
76   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
77   #use Data::Dumper; warn Dumper($attrs);
78   my $alias = ($attrs->{alias} ||= 'me');
79   
80   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
81   delete $attrs->{as} if $attrs->{columns};
82   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
83   $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
84     if $attrs->{columns};
85   $attrs->{as} ||= [ map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ];
86   if (my $include = delete $attrs->{include_columns}) {
87     push(@{$attrs->{select}}, @$include);
88     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
89   }
90   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
91
92   $attrs->{from} ||= [ { $alias => $source->from } ];
93   $attrs->{seen_join} ||= {};
94   my %seen;
95   if (my $join = delete $attrs->{join}) {
96     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
97       if (ref $j eq 'HASH') {
98         $seen{$_} = 1 foreach keys %$j;
99       } else {
100         $seen{$j} = 1;
101       }
102     }
103     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
104   }
105   
106   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
107   $attrs->{order_by} = [ $attrs->{order_by} ] if $attrs->{order_by} and !ref($attrs->{order_by});
108   $attrs->{order_by} ||= [];
109
110   my $collapse = $attrs->{collapse} || {};
111   if (my $prefetch = delete $attrs->{prefetch}) {
112     my @pre_order;
113     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
114       if ( ref $p eq 'HASH' ) {
115         foreach my $key (keys %$p) {
116           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
117             unless $seen{$key};
118         }
119       } else {
120         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
121             unless $seen{$p};
122       }
123       my @prefetch = $source->resolve_prefetch(
124            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
125       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
126       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
127     }
128     push(@{$attrs->{order_by}}, @pre_order);
129   }
130   $attrs->{collapse} = $collapse;
131 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
132
133   if ($attrs->{page}) {
134     $attrs->{rows} ||= 10;
135     $attrs->{offset} ||= 0;
136     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
137   }
138
139   bless {
140     result_source => $source,
141     result_class => $attrs->{result_class} || $source->result_class,
142     cond => $attrs->{where},
143     from => $attrs->{from},
144     collapse => $collapse,
145     count => undef,
146     page => delete $attrs->{page},
147     pager => undef,
148     attrs => $attrs
149   }, $class;
150 }
151
152 =head2 search
153
154   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
155   my $new_rs = $rs->search({ foo => 3 });
156
157 If you need to pass in additional attributes but no additional condition,
158 call it as C<search(undef, \%attrs);>.
159
160   # "SELECT foo, bar FROM $class_table"
161   my @all = $class->search(undef, { columns => [qw/foo bar/] });
162
163 =cut
164
165 sub search {
166   my $self = shift;
167
168   my $rs;
169   if( @_ ) {
170     
171     my $attrs = { %{$self->{attrs}} };
172     my $having = delete $attrs->{having};
173     $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
174
175     my $where = (@_
176                   ? ((@_ == 1 || ref $_[0] eq "HASH")
177                       ? shift
178                       : ((@_ % 2)
179                           ? $self->throw_exception(
180                               "Odd number of arguments to search")
181                           : {@_}))
182                   : undef());
183     if (defined $where) {
184       $attrs->{where} = (defined $attrs->{where}
185                 ? { '-and' =>
186                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
187                         $where, $attrs->{where} ] }
188                 : $where);
189     }
190
191     if (defined $having) {
192       $attrs->{having} = (defined $attrs->{having}
193                 ? { '-and' =>
194                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
195                         $having, $attrs->{having} ] }
196                 : $having);
197     }
198
199     $rs = (ref $self)->new($self->result_source, $attrs);
200   }
201   else {
202     $rs = $self;
203     $rs->reset;
204   }
205   return (wantarray ? $rs->all : $rs);
206 }
207
208 =head2 search_literal
209
210   my @obj    = $rs->search_literal($literal_where_cond, @bind);
211   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
212
213 Pass a literal chunk of SQL to be added to the conditional part of the
214 resultset.
215
216 =cut
217
218 sub search_literal {
219   my ($self, $cond, @vals) = @_;
220   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
221   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
222   return $self->search(\$cond, $attrs);
223 }
224
225 =head2 find
226
227 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
228
229 Finds a row based on its primary key or unique constraint. For example:
230
231   my $cd = $schema->resultset('CD')->find(5);
232
233 Also takes an optional C<key> attribute, to search by a specific key or unique
234 constraint. For example:
235
236   my $cd = $schema->resultset('CD')->find(
237     {
238       artist => 'Massive Attack',
239       title  => 'Mezzanine',
240     },
241     { key => 'artist_title' }
242   );
243
244 See also L</find_or_create> and L</update_or_create>.
245
246 =cut
247
248 sub find {
249   my ($self, @vals) = @_;
250   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
251
252   my @cols = $self->result_source->primary_columns;
253   if (exists $attrs->{key}) {
254     my %uniq = $self->result_source->unique_constraints;
255     $self->throw_exception( "Unknown key $attrs->{key} on $self->name" )
256       unless exists $uniq{$attrs->{key}};
257     @cols = @{ $uniq{$attrs->{key}} };
258   }
259   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
260   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
261     unless @cols;
262
263   my $query;
264   if (ref $vals[0] eq 'HASH') {
265     $query = { %{$vals[0]} };
266   } elsif (@cols == @vals) {
267     $query = {};
268     @{$query}{@cols} = @vals;
269   } else {
270     $query = {@vals};
271   }
272   foreach my $key (grep { ! m/\./ } keys %$query) {
273     $query->{"$self->{attrs}{alias}.$key"} = delete $query->{$key};
274   }
275   #warn Dumper($query);
276   
277   if (keys %$attrs) {
278       my $rs = $self->search($query,$attrs);
279       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
280   } else {
281       return keys %{$self->{collapse}} ? $self->search($query)->next : $self->single($query);
282   }
283 }
284
285 =head2 search_related
286
287   $rs->search_related('relname', $cond?, $attrs?);
288
289 Search the specified relationship. Optionally specify a condition for matching
290 records.
291
292 =cut
293
294 sub search_related {
295   return shift->related_resultset(shift)->search(@_);
296 }
297
298 =head2 cursor
299
300 Returns a storage-driven cursor to the given resultset.
301
302 =cut
303
304 sub cursor {
305   my ($self) = @_;
306   my $attrs = { %{$self->{attrs}} };
307   return $self->{cursor}
308     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
309           $attrs->{where},$attrs);
310 }
311
312 =head2 single
313
314 Inflates the first result without creating a cursor
315
316 =cut
317
318 sub single {
319   my ($self, $where) = @_;
320   my $attrs = { %{$self->{attrs}} };
321   if ($where) {
322     if (defined $attrs->{where}) {
323       $attrs->{where} = {
324         '-and' => 
325             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
326                $where, delete $attrs->{where} ]
327       };
328     } else {
329       $attrs->{where} = $where;
330     }
331   }
332   my @data = $self->result_source->storage->select_single(
333           $self->{from}, $attrs->{select},
334           $attrs->{where},$attrs);
335   return (@data ? $self->_construct_object(@data) : ());
336 }
337
338
339 =head2 search_like
340
341 Perform a search, but use C<LIKE> instead of equality as the condition. Note
342 that this is simply a convenience method; you most likely want to use
343 L</search> with specific operators.
344
345 For more information, see L<DBIx::Class::Manual::Cookbook>.
346
347 =cut
348
349 sub search_like {
350   my $class = shift;
351   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
352   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
353   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
354   return $class->search($query, { %$attrs });
355 }
356
357 =head2 slice
358
359 =head3 Arguments: ($first, $last)
360
361 Returns a subset of elements from the resultset.
362
363 =cut
364
365 sub slice {
366   my ($self, $min, $max) = @_;
367   my $attrs = { %{ $self->{attrs} || {} } };
368   $attrs->{offset} ||= 0;
369   $attrs->{offset} += $min;
370   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
371   my $slice = (ref $self)->new($self->result_source, $attrs);
372   return (wantarray ? $slice->all : $slice);
373 }
374
375 =head2 next
376
377 Returns the next element in the resultset (C<undef> is there is none).
378
379 Can be used to efficiently iterate over records in the resultset:
380
381   my $rs = $schema->resultset('CD')->search;
382   while (my $cd = $rs->next) {
383     print $cd->title;
384   }
385
386 =cut
387
388 sub next {
389   my ($self) = @_;
390   if (@{$self->{all_cache} || []}) {
391     $self->{all_cache_position} ||= 0;
392     return $self->{all_cache}->[$self->{all_cache_position}++];
393   }
394   if ($self->{attrs}{cache}) {
395     $self->{all_cache_position} = 1;
396     return ($self->all)[0];
397   }
398   my @row = (exists $self->{stashed_row}
399                ? @{delete $self->{stashed_row}}
400                : $self->cursor->next);
401 #  warn Dumper(\@row); use Data::Dumper;
402   return unless (@row);
403   return $self->_construct_object(@row);
404 }
405
406 sub _construct_object {
407   my ($self, @row) = @_;
408   my @as = @{ $self->{attrs}{as} };
409   
410   my $info = $self->_collapse_result(\@as, \@row);
411   
412   my $new = $self->result_class->inflate_result($self->result_source, @$info);
413   
414   $new = $self->{attrs}{record_filter}->($new)
415     if exists $self->{attrs}{record_filter};
416   return $new;
417 }
418
419 sub _collapse_result {
420   my ($self, $as, $row, $prefix) = @_;
421
422   my %const;
423
424   my @copy = @$row;
425   foreach my $this_as (@$as) {
426     my $val = shift @copy;
427     if (defined $prefix) {
428       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
429         my $remain = $1;
430         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
431         $const{$1||''}{$2} = $val;
432       }
433     } else {
434       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
435       $const{$1||''}{$2} = $val;
436     }
437   }
438
439   my $info = [ {}, {} ];
440   foreach my $key (keys %const) {
441     if (length $key) {
442       my $target = $info;
443       my @parts = split(/\./, $key);
444       foreach my $p (@parts) {
445         $target = $target->[1]->{$p} ||= [];
446       }
447       $target->[0] = $const{$key};
448     } else {
449       $info->[0] = $const{$key};
450     }
451   }
452
453   my @collapse = (defined($prefix)
454                    ? (map { (m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()); }
455                        keys %{$self->{collapse}})
456                    : keys %{$self->{collapse}});
457   if (@collapse) {
458     my ($c) = sort { length $a <=> length $b } @collapse;
459     my $target = $info;
460     foreach my $p (split(/\./, $c)) {
461       $target = $target->[1]->{$p} ||= [];
462     }
463     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
464     my @co_key = @{$self->{collapse}{$c_prefix}};
465     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
466     my $tree = $self->_collapse_result($as, $row, $c_prefix);
467     my (@final, @raw);
468     while ( !(grep {
469                 !defined($tree->[0]->{$_})
470                 || $co_check{$_} ne $tree->[0]->{$_}
471               } @co_key) ) {
472       push(@final, $tree);
473       last unless (@raw = $self->cursor->next);
474       $row = $self->{stashed_row} = \@raw;
475       $tree = $self->_collapse_result($as, $row, $c_prefix);
476       #warn Data::Dumper::Dumper($tree, $row);
477     }
478     @$target = @final;
479   }
480
481   return $info;
482 }
483
484 =head2 result_source
485
486 Returns a reference to the result source for this recordset.
487
488 =cut
489
490
491 =head2 count
492
493 Performs an SQL C<COUNT> with the same query as the resultset was built
494 with to find the number of elements. If passed arguments, does a search
495 on the resultset and counts the results of that.
496
497 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
498 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
499 not support C<DISTINCT> with multiple columns. If you are using such a
500 database, you should only use columns from the main table in your C<group_by>
501 clause.
502
503 =cut
504
505 sub count {
506   my $self = shift;
507   return $self->search(@_)->count if @_ and defined $_[0];
508   unless (defined $self->{count}) {
509     return scalar @{ $self->get_cache } if @{ $self->get_cache };
510     my $select = { count => '*' };
511     my $attrs = { %{ $self->{attrs} } };
512     if (my $group_by = delete $attrs->{group_by}) {
513       delete $attrs->{having};
514       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
515       # todo: try CONCAT for multi-column pk
516       my @pk = $self->result_source->primary_columns;
517       if (@pk == 1) {
518         foreach my $column (@distinct) {
519           if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
520             @distinct = ($column);
521             last;
522           }
523         } 
524       }
525
526       $select = { count => { distinct => \@distinct } };
527       #use Data::Dumper; die Dumper $select;
528     }
529
530     $attrs->{select} = $select;
531     $attrs->{as} = [qw/count/];
532     # offset, order by and page are not needed to count. record_filter is cdbi
533     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
534         
535     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
536   }
537   return 0 unless $self->{count};
538   my $count = $self->{count};
539   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
540   $count = $self->{attrs}{rows} if
541     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
542   return $count;
543 }
544
545 =head2 count_literal
546
547 Calls L</search_literal> with the passed arguments, then L</count>.
548
549 =cut
550
551 sub count_literal { shift->search_literal(@_)->count; }
552
553 =head2 all
554
555 Returns all elements in the resultset. Called implictly if the resultset
556 is returned in list context.
557
558 =cut
559
560 sub all {
561   my ($self) = @_;
562   return @{ $self->get_cache } if @{ $self->get_cache };
563
564   my @obj;
565
566   if (keys %{$self->{collapse}}) {
567       # Using $self->cursor->all is really just an optimisation.
568       # If we're collapsing has_many prefetches it probably makes
569       # very little difference, and this is cleaner than hacking
570       # _construct_object to survive the approach
571     $self->cursor->reset;
572     my @row = $self->cursor->next;
573     while (@row) {
574       push(@obj, $self->_construct_object(@row));
575       @row = (exists $self->{stashed_row}
576                ? @{delete $self->{stashed_row}}
577                : $self->cursor->next);
578     }
579   } else {
580     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
581   }
582
583   $self->set_cache(\@obj) if $self->{attrs}{cache};
584   return @obj;
585 }
586
587 =head2 reset
588
589 Resets the resultset's cursor, so you can iterate through the elements again.
590
591 =cut
592
593 sub reset {
594   my ($self) = @_;
595   $self->{all_cache_position} = 0;
596   $self->cursor->reset;
597   return $self;
598 }
599
600 =head2 first
601
602 Resets the resultset and returns the first element.
603
604 =cut
605
606 sub first {
607   return $_[0]->reset->next;
608 }
609
610 =head2 update
611
612 =head3 Arguments: (\%values)
613
614 Sets the specified columns in the resultset to the supplied values.
615
616 =cut
617
618 sub update {
619   my ($self, $values) = @_;
620   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
621   return $self->result_source->storage->update(
622            $self->result_source->from, $values, $self->{cond});
623 }
624
625 =head2 update_all
626
627 =head3 Arguments: (\%values)
628
629 Fetches all objects and updates them one at a time.  Note that C<update_all>
630 will run cascade triggers while L</update> will not.
631
632 =cut
633
634 sub update_all {
635   my ($self, $values) = @_;
636   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
637   foreach my $obj ($self->all) {
638     $obj->set_columns($values)->update;
639   }
640   return 1;
641 }
642
643 =head2 delete
644
645 Deletes the contents of the resultset from its result source.
646
647 =cut
648
649 sub delete {
650   my ($self) = @_;
651   my $del = {};
652   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
653     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
654   if (ref $self->{cond} eq 'ARRAY') {
655     $del = [ map { my %hash;
656       foreach my $key (keys %{$_}) {
657         $key =~ /([^.]+)$/;
658         $hash{$1} = $_->{$key};
659       }; \%hash; } @{$self->{cond}} ];
660   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
661     $del->{-and} = [ map { my %hash;
662       foreach my $key (keys %{$_}) {
663         $key =~ /([^.]+)$/;
664         $hash{$1} = $_->{$key};
665       }; \%hash; } @{$self->{cond}{-and}} ];
666   } else {
667     foreach my $key (keys %{$self->{cond}}) {
668       $key =~ /([^.]+)$/;
669       $del->{$1} = $self->{cond}{$key};
670     }
671   }
672   $self->result_source->storage->delete($self->result_source->from, $del);
673   return 1;
674 }
675
676 =head2 delete_all
677
678 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
679 will run cascade triggers while L</delete> will not.
680
681 =cut
682
683 sub delete_all {
684   my ($self) = @_;
685   $_->delete for $self->all;
686   return 1;
687 }
688
689 =head2 pager
690
691 Returns a L<Data::Page> object for the current resultset. Only makes
692 sense for queries with a C<page> attribute.
693
694 =cut
695
696 sub pager {
697   my ($self) = @_;
698   my $attrs = $self->{attrs};
699   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
700   $attrs->{rows} ||= 10;
701   $self->count;
702   return $self->{pager} ||= Data::Page->new(
703     $self->{count}, $attrs->{rows}, $self->{page});
704 }
705
706 =head2 page
707
708 =head3 Arguments: ($page_num)
709
710 Returns a new resultset for the specified page.
711
712 =cut
713
714 sub page {
715   my ($self, $page) = @_;
716   my $attrs = { %{$self->{attrs}} };
717   $attrs->{page} = $page;
718   return (ref $self)->new($self->result_source, $attrs);
719 }
720
721 =head2 new_result
722
723 =head3 Arguments: (\%vals)
724
725 Creates a result in the resultset's result class.
726
727 =cut
728
729 sub new_result {
730   my ($self, $values) = @_;
731   $self->throw_exception( "new_result needs a hash" )
732     unless (ref $values eq 'HASH');
733   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
734     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
735   my %new = %$values;
736   my $alias = $self->{attrs}{alias};
737   foreach my $key (keys %{$self->{cond}||{}}) {
738     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
739   }
740   my $obj = $self->result_class->new(\%new);
741   $obj->result_source($self->result_source) if $obj->can('result_source');
742   return $obj;
743 }
744
745 =head2 create
746
747 =head3 Arguments: (\%vals)
748
749 Inserts a record into the resultset and returns the object.
750
751 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
752
753 =cut
754
755 sub create {
756   my ($self, $attrs) = @_;
757   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
758   return $self->new_result($attrs)->insert;
759 }
760
761 =head2 find_or_create
762
763 =head3 Arguments: (\%vals, \%attrs?)
764
765   $class->find_or_create({ key => $val, ... });
766
767 Searches for a record matching the search condition; if it doesn't find one,
768 creates one and returns that instead.
769
770   my $cd = $schema->resultset('CD')->find_or_create({
771     cdid   => 5,
772     artist => 'Massive Attack',
773     title  => 'Mezzanine',
774     year   => 2005,
775   });
776
777 Also takes an optional C<key> attribute, to search by a specific key or unique
778 constraint. For example:
779
780   my $cd = $schema->resultset('CD')->find_or_create(
781     {
782       artist => 'Massive Attack',
783       title  => 'Mezzanine',
784     },
785     { key => 'artist_title' }
786   );
787
788 See also L</find> and L</update_or_create>.
789
790 =cut
791
792 sub find_or_create {
793   my $self     = shift;
794   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
795   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
796   my $exists   = $self->find($hash, $attrs);
797   return defined $exists ? $exists : $self->create($hash);
798 }
799
800 =head2 update_or_create
801
802   $class->update_or_create({ key => $val, ... });
803
804 First, search for an existing row matching one of the unique constraints
805 (including the primary key) on the source of this resultset.  If a row is
806 found, update it with the other given column values.  Otherwise, create a new
807 row.
808
809 Takes an optional C<key> attribute to search on a specific unique constraint.
810 For example:
811
812   # In your application
813   my $cd = $schema->resultset('CD')->update_or_create(
814     {
815       artist => 'Massive Attack',
816       title  => 'Mezzanine',
817       year   => 1998,
818     },
819     { key => 'artist_title' }
820   );
821
822 If no C<key> is specified, it searches on all unique constraints defined on the
823 source, including the primary key.
824
825 If the C<key> is specified as C<primary>, search only on the primary key.
826
827 See also L</find> and L</find_or_create>.
828
829 =cut
830
831 sub update_or_create {
832   my $self = shift;
833   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
834   my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
835
836   my %unique_constraints = $self->result_source->unique_constraints;
837   my @constraint_names   = (exists $attrs->{key}
838                             ? ($attrs->{key})
839                             : keys %unique_constraints);
840
841   my @unique_hashes;
842   foreach my $name (@constraint_names) {
843     my @unique_cols = @{ $unique_constraints{$name} };
844     my %unique_hash =
845       map  { $_ => $hash->{$_} }
846       grep { exists $hash->{$_} }
847       @unique_cols;
848
849     push @unique_hashes, \%unique_hash
850       if (scalar keys %unique_hash == scalar @unique_cols);
851   }
852
853   if (@unique_hashes) {
854     my $row = $self->single(\@unique_hashes);
855     if (defined $row) {
856       $row->set_columns($hash);
857       $row->update;
858       return $row;
859     }
860   }
861
862   return $self->create($hash);
863 }
864
865 =head2 get_cache
866
867 Gets the contents of the cache for the resultset.
868
869 =cut
870
871 sub get_cache {
872   shift->{all_cache} || [];
873 }
874
875 =head2 set_cache
876
877 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
878
879 =cut
880
881 sub set_cache {
882   my ( $self, $data ) = @_;
883   $self->throw_exception("set_cache requires an arrayref")
884     if ref $data ne 'ARRAY';
885   my $result_class = $self->result_class;
886   foreach( @$data ) {
887     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
888       if ref $_ ne $result_class;
889   }
890   $self->{all_cache} = $data;
891 }
892
893 =head2 clear_cache
894
895 Clears the cache for the resultset.
896
897 =cut
898
899 sub clear_cache {
900   shift->set_cache([]);
901 }
902
903 =head2 related_resultset
904
905 Returns a related resultset for the supplied relationship name.
906
907   $rs = $rs->related_resultset('foo');
908
909 =cut
910
911 sub related_resultset {
912   my ( $self, $rel, @rest ) = @_;
913   $self->{related_resultsets} ||= {};
914   return $self->{related_resultsets}{$rel} ||= do {
915       #warn "fetching related resultset for rel '$rel'";
916       my $rel_obj = $self->result_source->relationship_info($rel);
917       $self->throw_exception(
918         "search_related: result source '" . $self->result_source->name .
919         "' has no such relationship ${rel}")
920         unless $rel_obj; #die Dumper $self->{attrs};
921
922       my $rs = $self->search(undef, { join => $rel });
923       my $alias = defined $rs->{attrs}{seen_join}{$rel}
924                     && $rs->{attrs}{seen_join}{$rel} > 1
925                   ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
926                   : $rel;
927
928       $self->result_source->schema->resultset($rel_obj->{class}
929            )->search( undef,
930              { %{$rs->{attrs}},
931                alias => $alias,
932                select => undef,
933                as => undef }
934            )->search(@rest);      
935   };
936 }
937
938 =head2 throw_exception
939
940 See Schema's throw_exception
941
942 =cut
943
944 sub throw_exception {
945   my $self=shift;
946   $self->result_source->schema->throw_exception(@_);
947 }
948
949 =head1 ATTRIBUTES
950
951 The resultset takes various attributes that modify its behavior. Here's an
952 overview of them:
953
954 =head2 order_by
955
956 Which column(s) to order the results by. This is currently passed through
957 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
958
959 =head2 columns
960
961 =head3 Arguments: (arrayref)
962
963 Shortcut to request a particular set of columns to be retrieved.  Adds
964 C<me.> onto the start of any column without a C<.> in it and sets C<select>
965 from that, then auto-populates C<as> from C<select> as normal. (You may also
966 use the C<cols> attribute, as in earlier versions of DBIC.)
967
968 =head2 include_columns
969
970 =head3 Arguments: (arrayref)
971
972 Shortcut to include additional columns in the returned results - for example
973
974   { include_columns => ['foo.name'], join => ['foo'] }
975
976 would add a 'name' column to the information passed to object inflation
977
978 =head2 select
979
980 =head3 Arguments: (arrayref)
981
982 Indicates which columns should be selected from the storage. You can use
983 column names, or in the case of RDBMS back ends, function or stored procedure
984 names:
985
986   $rs = $schema->resultset('Foo')->search(
987     undef,
988     {
989       select => [
990         'column_name',
991         { count => 'column_to_count' },
992         { sum => 'column_to_sum' }
993       ]
994     }
995   );
996
997 When you use function/stored procedure names and do not supply an C<as>
998 attribute, the column names returned are storage-dependent. E.g. MySQL would
999 return a column named C<count(column_to_count)> in the above example.
1000
1001 =head2 as
1002
1003 =head3 Arguments: (arrayref)
1004
1005 Indicates column names for object inflation. This is used in conjunction with
1006 C<select>, usually when C<select> contains one or more function or stored
1007 procedure names:
1008
1009   $rs = $schema->resultset('Foo')->search(
1010     undef,
1011     {
1012       select => [
1013         'column1',
1014         { count => 'column2' }
1015       ],
1016       as => [qw/ column1 column2_count /]
1017     }
1018   );
1019
1020   my $foo = $rs->first(); # get the first Foo
1021
1022 If the object against which the search is performed already has an accessor
1023 matching a column name specified in C<as>, the value can be retrieved using
1024 the accessor as normal:
1025
1026   my $column1 = $foo->column1();
1027
1028 If on the other hand an accessor does not exist in the object, you need to
1029 use C<get_column> instead:
1030
1031   my $column2_count = $foo->get_column('column2_count');
1032
1033 You can create your own accessors if required - see
1034 L<DBIx::Class::Manual::Cookbook> for details.
1035
1036 =head2 join
1037
1038 Contains a list of relationships that should be joined for this query.  For
1039 example:
1040
1041   # Get CDs by Nine Inch Nails
1042   my $rs = $schema->resultset('CD')->search(
1043     { 'artist.name' => 'Nine Inch Nails' },
1044     { join => 'artist' }
1045   );
1046
1047 Can also contain a hash reference to refer to the other relation's relations.
1048 For example:
1049
1050   package MyApp::Schema::Track;
1051   use base qw/DBIx::Class/;
1052   __PACKAGE__->table('track');
1053   __PACKAGE__->add_columns(qw/trackid cd position title/);
1054   __PACKAGE__->set_primary_key('trackid');
1055   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1056   1;
1057
1058   # In your application
1059   my $rs = $schema->resultset('Artist')->search(
1060     { 'track.title' => 'Teardrop' },
1061     {
1062       join     => { cd => 'track' },
1063       order_by => 'artist.name',
1064     }
1065   );
1066
1067 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1068 similarly for a third time). For e.g.
1069
1070   my $rs = $schema->resultset('Artist')->search(
1071     { 'cds.title'   => 'Foo',
1072       'cds_2.title' => 'Bar' },
1073     { join => [ qw/cds cds/ ] });
1074
1075 will return a set of all artists that have both a cd with title Foo and a cd
1076 with title Bar.
1077
1078 If you want to fetch related objects from other tables as well, see C<prefetch>
1079 below.
1080
1081 =head2 prefetch
1082
1083 =head3 Arguments: arrayref/hashref
1084
1085 Contains one or more relationships that should be fetched along with the main 
1086 query (when they are accessed afterwards they will have already been
1087 "prefetched").  This is useful for when you know you will need the related
1088 objects, because it saves at least one query:
1089
1090   my $rs = $schema->resultset('Tag')->search(
1091     undef,
1092     {
1093       prefetch => {
1094         cd => 'artist'
1095       }
1096     }
1097   );
1098
1099 The initial search results in SQL like the following:
1100
1101   SELECT tag.*, cd.*, artist.* FROM tag
1102   JOIN cd ON tag.cd = cd.cdid
1103   JOIN artist ON cd.artist = artist.artistid
1104
1105 L<DBIx::Class> has no need to go back to the database when we access the
1106 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1107 case.
1108
1109 Simple prefetches will be joined automatically, so there is no need
1110 for a C<join> attribute in the above search. If you're prefetching to
1111 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1112 specify the join as well.
1113
1114 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1115 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1116 with an accessor type of 'single' or 'filter').
1117
1118 =head2 from
1119
1120 =head3 Arguments: (arrayref)
1121
1122 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1123 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1124 clauses.
1125
1126 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1127 C<join> will usually do what you need and it is strongly recommended that you
1128 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1129
1130 In simple terms, C<from> works as follows:
1131
1132     [
1133         { <alias> => <table>, -join-type => 'inner|left|right' }
1134         [] # nested JOIN (optional)
1135         { <table.column> = <foreign_table.foreign_key> }
1136     ]
1137
1138     JOIN
1139         <alias> <table>
1140         [JOIN ...]
1141     ON <table.column> = <foreign_table.foreign_key>
1142
1143 An easy way to follow the examples below is to remember the following:
1144
1145     Anything inside "[]" is a JOIN
1146     Anything inside "{}" is a condition for the enclosing JOIN
1147
1148 The following examples utilize a "person" table in a family tree application.
1149 In order to express parent->child relationships, this table is self-joined:
1150
1151     # Person->belongs_to('father' => 'Person');
1152     # Person->belongs_to('mother' => 'Person');
1153
1154 C<from> can be used to nest joins. Here we return all children with a father,
1155 then search against all mothers of those children:
1156
1157   $rs = $schema->resultset('Person')->search(
1158       undef,
1159       {
1160           alias => 'mother', # alias columns in accordance with "from"
1161           from => [
1162               { mother => 'person' },
1163               [
1164                   [
1165                       { child => 'person' },
1166                       [
1167                           { father => 'person' },
1168                           { 'father.person_id' => 'child.father_id' }
1169                       ]
1170                   ],
1171                   { 'mother.person_id' => 'child.mother_id' }
1172               ],
1173           ]
1174       },
1175   );
1176
1177   # Equivalent SQL:
1178   # SELECT mother.* FROM person mother
1179   # JOIN (
1180   #   person child
1181   #   JOIN person father
1182   #   ON ( father.person_id = child.father_id )
1183   # )
1184   # ON ( mother.person_id = child.mother_id )
1185
1186 The type of any join can be controlled manually. To search against only people
1187 with a father in the person table, we could explicitly use C<INNER JOIN>:
1188
1189     $rs = $schema->resultset('Person')->search(
1190         undef,
1191         {
1192             alias => 'child', # alias columns in accordance with "from"
1193             from => [
1194                 { child => 'person' },
1195                 [
1196                     { father => 'person', -join-type => 'inner' },
1197                     { 'father.id' => 'child.father_id' }
1198                 ],
1199             ]
1200         },
1201     );
1202
1203     # Equivalent SQL:
1204     # SELECT child.* FROM person child
1205     # INNER JOIN person father ON child.father_id = father.id
1206
1207 =head2 page
1208
1209 For a paged resultset, specifies which page to retrieve.  Leave unset
1210 for an unpaged resultset.
1211
1212 =head2 rows
1213
1214 For a paged resultset, how many rows per page:
1215
1216   rows => 10
1217
1218 Can also be used to simulate an SQL C<LIMIT>.
1219
1220 =head2 group_by
1221
1222 =head3 Arguments: (arrayref)
1223
1224 A arrayref of columns to group by. Can include columns of joined tables.
1225
1226   group_by => [qw/ column1 column2 ... /]
1227
1228 =head2 distinct
1229
1230 Set to 1 to group by all columns.
1231
1232 For more examples of using these attributes, see
1233 L<DBIx::Class::Manual::Cookbook>.
1234
1235 =cut
1236
1237 1;