Fixed hm prefetch
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->load_components(qw/Core/);
36   __PACKAGE__->table('artist');
37   __PACKAGE__->add_columns(qw/artistid name/);
38   __PACKAGE__->set_primary_key('artistid');
39   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
40   1;
41
42   package MyApp::Schema::CD;
43   use base qw/DBIx::Class/;
44   __PACKAGE__->load_components(qw/Core/);
45   __PACKAGE__->table('cd');
46   __PACKAGE__->add_columns(qw/cdid artist title year/);
47   __PACKAGE__->set_primary_key('cdid');
48   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
49   1;
50
51 =head1 METHODS
52
53 =head2 new
54
55 =head3 Arguments: ($source, \%$attrs)
56
57 The resultset constructor. Takes a source object (usually a
58 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
59 below).  Does not perform any queries -- these are executed as needed by the
60 other methods.
61
62 Generally you won't need to construct a resultset manually.  You'll
63 automatically get one from e.g. a L</search> called in scalar context:
64
65   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
66
67 =cut
68
69 sub new {
70   my $class = shift;
71   return $class->new_result(@_) if ref $class;
72   
73   my ($source, $attrs) = @_;
74   #use Data::Dumper; warn Dumper($attrs);
75   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
76   my $alias = ($attrs->{alias} ||= 'me');
77   
78   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
79   delete $attrs->{as} if $attrs->{columns};
80   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
81   $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
82     if $attrs->{columns};
83   $attrs->{as} ||= [ map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ];
84   if (my $include = delete $attrs->{include_columns}) {
85     push(@{$attrs->{select}}, @$include);
86     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
87   }
88   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
89
90   $attrs->{from} ||= [ { $alias => $source->from } ];
91   $attrs->{seen_join} ||= {};
92   my %seen;
93   if (my $join = delete $attrs->{join}) {
94     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
95       if (ref $j eq 'HASH') {
96         $seen{$_} = 1 foreach keys %$j;
97       } else {
98         $seen{$j} = 1;
99       }
100     }
101     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
102   }
103   
104   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
105   $attrs->{order_by} = [ $attrs->{order_by} ] if $attrs->{order_by} and !ref($attrs->{order_by});
106   $attrs->{order_by} ||= [];
107
108   my $collapse = $attrs->{collapse} || {};
109   if (my $prefetch = delete $attrs->{prefetch}) {
110     my @pre_order;
111     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
112       if ( ref $p eq 'HASH' ) {
113         foreach my $key (keys %$p) {
114           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
115             unless $seen{$key};
116         }
117       } else {
118         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
119             unless $seen{$p};
120       }
121       my @prefetch = $source->resolve_prefetch(
122            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
123       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
124       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
125     }
126     push(@{$attrs->{order_by}}, @pre_order);
127   }
128   $attrs->{collapse} = $collapse;
129 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
130
131   if ($attrs->{page}) {
132     $attrs->{rows} ||= 10;
133     $attrs->{offset} ||= 0;
134     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
135   }
136
137   bless {
138     result_source => $source,
139     result_class => $attrs->{result_class} || $source->result_class,
140     cond => $attrs->{where},
141     from => $attrs->{from},
142     collapse => $collapse,
143     count => undef,
144     page => delete $attrs->{page},
145     pager => undef,
146     attrs => $attrs
147   }, $class;
148 }
149
150 =head2 search
151
152   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
153   my $new_rs = $rs->search({ foo => 3 });
154
155 If you need to pass in additional attributes but no additional condition,
156 call it as C<search(undef, \%attrs);>.
157
158   # "SELECT foo, bar FROM $class_table"
159   my @all = $class->search(undef, { columns => [qw/foo bar/] });
160
161 =cut
162
163 sub search {
164   my $self = shift;
165
166   my $rs;
167   if( @_ ) {
168     
169     my $attrs = { %{$self->{attrs}} };
170     my $having = delete $attrs->{having};
171     $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
172
173     my $where = (@_
174                   ? ((@_ == 1 || ref $_[0] eq "HASH")
175                       ? shift
176                       : ((@_ % 2)
177                           ? $self->throw_exception(
178                               "Odd number of arguments to search")
179                           : {@_}))
180                   : undef());
181     if (defined $where) {
182       $attrs->{where} = (defined $attrs->{where}
183                 ? { '-and' =>
184                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
185                         $where, $attrs->{where} ] }
186                 : $where);
187     }
188
189     if (defined $having) {
190       $attrs->{having} = (defined $attrs->{having}
191                 ? { '-and' =>
192                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
193                         $having, $attrs->{having} ] }
194                 : $having);
195     }
196
197     $rs = (ref $self)->new($self->result_source, $attrs);
198   }
199   else {
200     $rs = $self;
201     $rs->reset;
202   }
203   return (wantarray ? $rs->all : $rs);
204 }
205
206 =head2 search_literal
207
208   my @obj    = $rs->search_literal($literal_where_cond, @bind);
209   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
210
211 Pass a literal chunk of SQL to be added to the conditional part of the
212 resultset.
213
214 =cut
215
216 sub search_literal {
217   my ($self, $cond, @vals) = @_;
218   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
219   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
220   return $self->search(\$cond, $attrs);
221 }
222
223 =head2 find
224
225 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
226
227 Finds a row based on its primary key or unique constraint. For example:
228
229   my $cd = $schema->resultset('CD')->find(5);
230
231 Also takes an optional C<key> attribute, to search by a specific key or unique
232 constraint. For example:
233
234   my $cd = $schema->resultset('CD')->find(
235     {
236       artist => 'Massive Attack',
237       title  => 'Mezzanine',
238     },
239     { key => 'artist_title' }
240   );
241
242 See also L</find_or_create> and L</update_or_create>.
243
244 =cut
245
246 sub find {
247   my ($self, @vals) = @_;
248   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
249
250   my @cols = $self->result_source->primary_columns;
251   if (exists $attrs->{key}) {
252     my %uniq = $self->result_source->unique_constraints;
253     $self->throw_exception( "Unknown key $attrs->{key} on $self->name" )
254       unless exists $uniq{$attrs->{key}};
255     @cols = @{ $uniq{$attrs->{key}} };
256   }
257   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
258   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
259     unless @cols;
260
261   my $query;
262   if (ref $vals[0] eq 'HASH') {
263     $query = { %{$vals[0]} };
264   } elsif (@cols == @vals) {
265     $query = {};
266     @{$query}{@cols} = @vals;
267   } else {
268     $query = {@vals};
269   }
270   foreach my $key (grep { ! m/\./ } keys %$query) {
271     $query->{"$self->{attrs}{alias}.$key"} = delete $query->{$key};
272   }
273   #warn Dumper($query);
274   
275   if (keys %$attrs) {
276       my $rs = $self->search($query,$attrs);
277       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
278   } else {
279       return keys %{$self->{collapse}} ? $self->search($query)->next : $self->single($query);
280   }
281 }
282
283 =head2 search_related
284
285   $rs->search_related('relname', $cond?, $attrs?);
286
287 Search the specified relationship. Optionally specify a condition for matching
288 records.
289
290 =cut
291
292 sub search_related {
293   return shift->related_resultset(shift)->search(@_);
294 }
295
296 =head2 cursor
297
298 Returns a storage-driven cursor to the given resultset.
299
300 =cut
301
302 sub cursor {
303   my ($self) = @_;
304   my $attrs = { %{$self->{attrs}} };
305   return $self->{cursor}
306     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
307           $attrs->{where},$attrs);
308 }
309
310 =head2 single
311
312 Inflates the first result without creating a cursor
313
314 =cut
315
316 sub single {
317   my ($self, $where) = @_;
318   my $attrs = { %{$self->{attrs}} };
319   if ($where) {
320     if (defined $attrs->{where}) {
321       $attrs->{where} = {
322         '-and' => 
323             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
324                $where, delete $attrs->{where} ]
325       };
326     } else {
327       $attrs->{where} = $where;
328     }
329   }
330   my @data = $self->result_source->storage->select_single(
331           $self->{from}, $attrs->{select},
332           $attrs->{where},$attrs);
333   return (@data ? $self->_construct_object(@data) : ());
334 }
335
336
337 =head2 search_like
338
339 Perform a search, but use C<LIKE> instead of equality as the condition. Note
340 that this is simply a convenience method; you most likely want to use
341 L</search> with specific operators.
342
343 For more information, see L<DBIx::Class::Manual::Cookbook>.
344
345 =cut
346
347 sub search_like {
348   my $class = shift;
349   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
350   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
351   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
352   return $class->search($query, { %$attrs });
353 }
354
355 =head2 slice
356
357 =head3 Arguments: ($first, $last)
358
359 Returns a subset of elements from the resultset.
360
361 =cut
362
363 sub slice {
364   my ($self, $min, $max) = @_;
365   my $attrs = { %{ $self->{attrs} || {} } };
366   $attrs->{offset} ||= 0;
367   $attrs->{offset} += $min;
368   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
369   my $slice = (ref $self)->new($self->result_source, $attrs);
370   return (wantarray ? $slice->all : $slice);
371 }
372
373 =head2 next
374
375 Returns the next element in the resultset (C<undef> is there is none).
376
377 Can be used to efficiently iterate over records in the resultset:
378
379   my $rs = $schema->resultset('CD')->search;
380   while (my $cd = $rs->next) {
381     print $cd->title;
382   }
383
384 =cut
385
386 sub next {
387   my ($self) = @_;
388   if (@{$self->{all_cache} || []}) {
389     $self->{all_cache_position} ||= 0;
390     return $self->{all_cache}->[$self->{all_cache_position}++];
391   }
392   if ($self->{attrs}{cache}) {
393     $self->{all_cache_position} = 1;
394     return ($self->all)[0];
395   }
396   my @row = (exists $self->{stashed_row}
397                ? @{delete $self->{stashed_row}}
398                : $self->cursor->next);
399 #  warn Dumper(\@row); use Data::Dumper;
400   return unless (@row);
401   return $self->_construct_object(@row);
402 }
403
404 sub _construct_object {
405   my ($self, @row) = @_;
406   my @as = @{ $self->{attrs}{as} };
407   
408   my $info = $self->_collapse_result(\@as, \@row);
409   
410   my $new = $self->result_class->inflate_result($self->result_source, @$info);
411   
412   $new = $self->{attrs}{record_filter}->($new)
413     if exists $self->{attrs}{record_filter};
414   return $new;
415 }
416
417 sub _collapse_result {
418   my ($self, $as, $row, $prefix) = @_;
419
420   my %const;
421
422   my @copy = @$row;
423   foreach my $this_as (@$as) {
424     my $val = shift @copy;
425     if (defined $prefix) {
426       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
427         my $remain = $1;
428         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
429         $const{$1||''}{$2} = $val;
430       }
431     } else {
432       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
433       $const{$1||''}{$2} = $val;
434     }
435   }
436
437   my $info = [ {}, {} ];
438   foreach my $key (keys %const) {
439     if (length $key) {
440       my $target = $info;
441       my @parts = split(/\./, $key);
442       foreach my $p (@parts) {
443         $target = $target->[1]->{$p} ||= [];
444       }
445       $target->[0] = $const{$key};
446     } else {
447       $info->[0] = $const{$key};
448     }
449   }
450
451   my @collapse = (defined($prefix)
452                    ? (map { (m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()); }
453                        keys %{$self->{collapse}})
454                    : keys %{$self->{collapse}});
455   if (@collapse) {
456     my ($c) = sort { length $a <=> length $b } @collapse;
457     my $target = $info;
458     foreach my $p (split(/\./, $c)) {
459       $target = $target->[1]->{$p} ||= [];
460     }
461     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
462     my @co_key = @{$self->{collapse}{$c_prefix}};
463     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
464     my $tree = $self->_collapse_result($as, $row, $c_prefix);
465     my (@final, @raw);
466     while ( !(grep {
467                 !defined($tree->[0]->{$_})
468                 || $co_check{$_} ne $tree->[0]->{$_}
469               } @co_key) ) {
470       push(@final, $tree);
471       last unless (@raw = $self->cursor->next);
472       $row = $self->{stashed_row} = \@raw;
473       $tree = $self->_collapse_result($as, $row, $c_prefix);
474       #warn Data::Dumper::Dumper($tree, $row);
475     }
476     @$target = @final;
477   }
478
479   return $info;
480 }
481
482 =head2 result_source
483
484 Returns a reference to the result source for this recordset.
485
486 =cut
487
488
489 =head2 count
490
491 Performs an SQL C<COUNT> with the same query as the resultset was built
492 with to find the number of elements. If passed arguments, does a search
493 on the resultset and counts the results of that.
494
495 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
496 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
497 not support C<DISTINCT> with multiple columns. If you are using such a
498 database, you should only use columns from the main table in your C<group_by>
499 clause.
500
501 =cut
502
503 sub count {
504   my $self = shift;
505   return $self->search(@_)->count if @_ and defined $_[0];
506   unless (defined $self->{count}) {
507     return scalar @{ $self->get_cache } if @{ $self->get_cache };
508     my $select = { count => '*' };
509     my $attrs = { %{ $self->{attrs} } };
510     if (my $group_by = delete $attrs->{group_by}) {
511       delete $attrs->{having};
512       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
513       # todo: try CONCAT for multi-column pk
514       my @pk = $self->result_source->primary_columns;
515       if (@pk == 1) {
516         foreach my $column (@distinct) {
517           if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
518             @distinct = ($column);
519             last;
520           }
521         } 
522       }
523
524       $select = { count => { distinct => \@distinct } };
525       #use Data::Dumper; die Dumper $select;
526     }
527
528     $attrs->{select} = $select;
529     $attrs->{as} = [qw/count/];
530     # offset, order by and page are not needed to count. record_filter is cdbi
531     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
532         
533     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
534   }
535   return 0 unless $self->{count};
536   my $count = $self->{count};
537   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
538   $count = $self->{attrs}{rows} if
539     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
540   return $count;
541 }
542
543 =head2 count_literal
544
545 Calls L</search_literal> with the passed arguments, then L</count>.
546
547 =cut
548
549 sub count_literal { shift->search_literal(@_)->count; }
550
551 =head2 all
552
553 Returns all elements in the resultset. Called implictly if the resultset
554 is returned in list context.
555
556 =cut
557
558 sub all {
559   my ($self) = @_;
560   return @{ $self->get_cache } if @{ $self->get_cache };
561
562   my @obj;
563
564   if (keys %{$self->{collapse}}) {
565       # Using $self->cursor->all is really just an optimisation.
566       # If we're collapsing has_many prefetches it probably makes
567       # very little difference, and this is cleaner than hacking
568       # _construct_object to survive the approach
569     $self->cursor->reset;
570     my @row = $self->cursor->next;
571     while (@row) {
572       push(@obj, $self->_construct_object(@row));
573       @row = (exists $self->{stashed_row}
574                ? @{delete $self->{stashed_row}}
575                : $self->cursor->next);
576     }
577   } else {
578     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
579   }
580
581   $self->set_cache(\@obj) if $self->{attrs}{cache};
582   return @obj;
583 }
584
585 =head2 reset
586
587 Resets the resultset's cursor, so you can iterate through the elements again.
588
589 =cut
590
591 sub reset {
592   my ($self) = @_;
593   $self->{all_cache_position} = 0;
594   $self->cursor->reset;
595   return $self;
596 }
597
598 =head2 first
599
600 Resets the resultset and returns the first element.
601
602 =cut
603
604 sub first {
605   return $_[0]->reset->next;
606 }
607
608 =head2 update
609
610 =head3 Arguments: (\%values)
611
612 Sets the specified columns in the resultset to the supplied values.
613
614 =cut
615
616 sub update {
617   my ($self, $values) = @_;
618   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
619   return $self->result_source->storage->update(
620            $self->result_source->from, $values, $self->{cond});
621 }
622
623 =head2 update_all
624
625 =head3 Arguments: (\%values)
626
627 Fetches all objects and updates them one at a time.  Note that C<update_all>
628 will run cascade triggers while L</update> will not.
629
630 =cut
631
632 sub update_all {
633   my ($self, $values) = @_;
634   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
635   foreach my $obj ($self->all) {
636     $obj->set_columns($values)->update;
637   }
638   return 1;
639 }
640
641 =head2 delete
642
643 Deletes the contents of the resultset from its result source.
644
645 =cut
646
647 sub delete {
648   my ($self) = @_;
649   my $del = {};
650   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
651     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
652   if (ref $self->{cond} eq 'ARRAY') {
653     $del = [ map { my %hash;
654       foreach my $key (keys %{$_}) {
655         $key =~ /([^.]+)$/;
656         $hash{$1} = $_->{$key};
657       }; \%hash; } @{$self->{cond}} ];
658   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
659     $del->{-and} = [ map { my %hash;
660       foreach my $key (keys %{$_}) {
661         $key =~ /([^.]+)$/;
662         $hash{$1} = $_->{$key};
663       }; \%hash; } @{$self->{cond}{-and}} ];
664   } else {
665     foreach my $key (keys %{$self->{cond}}) {
666       $key =~ /([^.]+)$/;
667       $del->{$1} = $self->{cond}{$key};
668     }
669   }
670   $self->result_source->storage->delete($self->result_source->from, $del);
671   return 1;
672 }
673
674 =head2 delete_all
675
676 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
677 will run cascade triggers while L</delete> will not.
678
679 =cut
680
681 sub delete_all {
682   my ($self) = @_;
683   $_->delete for $self->all;
684   return 1;
685 }
686
687 =head2 pager
688
689 Returns a L<Data::Page> object for the current resultset. Only makes
690 sense for queries with a C<page> attribute.
691
692 =cut
693
694 sub pager {
695   my ($self) = @_;
696   my $attrs = $self->{attrs};
697   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
698   $attrs->{rows} ||= 10;
699   $self->count;
700   return $self->{pager} ||= Data::Page->new(
701     $self->{count}, $attrs->{rows}, $self->{page});
702 }
703
704 =head2 page
705
706 =head3 Arguments: ($page_num)
707
708 Returns a new resultset for the specified page.
709
710 =cut
711
712 sub page {
713   my ($self, $page) = @_;
714   my $attrs = { %{$self->{attrs}} };
715   $attrs->{page} = $page;
716   return (ref $self)->new($self->result_source, $attrs);
717 }
718
719 =head2 new_result
720
721 =head3 Arguments: (\%vals)
722
723 Creates a result in the resultset's result class.
724
725 =cut
726
727 sub new_result {
728   my ($self, $values) = @_;
729   $self->throw_exception( "new_result needs a hash" )
730     unless (ref $values eq 'HASH');
731   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
732     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
733   my %new = %$values;
734   my $alias = $self->{attrs}{alias};
735   foreach my $key (keys %{$self->{cond}||{}}) {
736     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
737   }
738   my $obj = $self->result_class->new(\%new);
739   $obj->result_source($self->result_source) if $obj->can('result_source');
740   return $obj;
741 }
742
743 =head2 create
744
745 =head3 Arguments: (\%vals)
746
747 Inserts a record into the resultset and returns the object.
748
749 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
750
751 =cut
752
753 sub create {
754   my ($self, $attrs) = @_;
755   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
756   return $self->new_result($attrs)->insert;
757 }
758
759 =head2 find_or_create
760
761 =head3 Arguments: (\%vals, \%attrs?)
762
763   $class->find_or_create({ key => $val, ... });
764
765 Searches for a record matching the search condition; if it doesn't find one,
766 creates one and returns that instead.
767
768   my $cd = $schema->resultset('CD')->find_or_create({
769     cdid   => 5,
770     artist => 'Massive Attack',
771     title  => 'Mezzanine',
772     year   => 2005,
773   });
774
775 Also takes an optional C<key> attribute, to search by a specific key or unique
776 constraint. For example:
777
778   my $cd = $schema->resultset('CD')->find_or_create(
779     {
780       artist => 'Massive Attack',
781       title  => 'Mezzanine',
782     },
783     { key => 'artist_title' }
784   );
785
786 See also L</find> and L</update_or_create>.
787
788 =cut
789
790 sub find_or_create {
791   my $self     = shift;
792   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
793   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
794   my $exists   = $self->find($hash, $attrs);
795   return defined $exists ? $exists : $self->create($hash);
796 }
797
798 =head2 update_or_create
799
800   $class->update_or_create({ key => $val, ... });
801
802 First, search for an existing row matching one of the unique constraints
803 (including the primary key) on the source of this resultset.  If a row is
804 found, update it with the other given column values.  Otherwise, create a new
805 row.
806
807 Takes an optional C<key> attribute to search on a specific unique constraint.
808 For example:
809
810   # In your application
811   my $cd = $schema->resultset('CD')->update_or_create(
812     {
813       artist => 'Massive Attack',
814       title  => 'Mezzanine',
815       year   => 1998,
816     },
817     { key => 'artist_title' }
818   );
819
820 If no C<key> is specified, it searches on all unique constraints defined on the
821 source, including the primary key.
822
823 If the C<key> is specified as C<primary>, search only on the primary key.
824
825 See also L</find> and L</find_or_create>.
826
827 =cut
828
829 sub update_or_create {
830   my $self = shift;
831   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
832   my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
833
834   my %unique_constraints = $self->result_source->unique_constraints;
835   my @constraint_names   = (exists $attrs->{key}
836                             ? ($attrs->{key})
837                             : keys %unique_constraints);
838
839   my @unique_hashes;
840   foreach my $name (@constraint_names) {
841     my @unique_cols = @{ $unique_constraints{$name} };
842     my %unique_hash =
843       map  { $_ => $hash->{$_} }
844       grep { exists $hash->{$_} }
845       @unique_cols;
846
847     push @unique_hashes, \%unique_hash
848       if (scalar keys %unique_hash == scalar @unique_cols);
849   }
850
851   if (@unique_hashes) {
852     my $row = $self->single(\@unique_hashes);
853     if (defined $row) {
854       $row->set_columns($hash);
855       $row->update;
856       return $row;
857     }
858   }
859
860   return $self->create($hash);
861 }
862
863 =head2 get_cache
864
865 Gets the contents of the cache for the resultset.
866
867 =cut
868
869 sub get_cache {
870   shift->{all_cache} || [];
871 }
872
873 =head2 set_cache
874
875 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
876
877 =cut
878
879 sub set_cache {
880   my ( $self, $data ) = @_;
881   $self->throw_exception("set_cache requires an arrayref")
882     if ref $data ne 'ARRAY';
883   my $result_class = $self->result_class;
884   foreach( @$data ) {
885     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
886       if ref $_ ne $result_class;
887   }
888   $self->{all_cache} = $data;
889 }
890
891 =head2 clear_cache
892
893 Clears the cache for the resultset.
894
895 =cut
896
897 sub clear_cache {
898   shift->set_cache([]);
899 }
900
901 =head2 related_resultset
902
903 Returns a related resultset for the supplied relationship name.
904
905   $rs = $rs->related_resultset('foo');
906
907 =cut
908
909 sub related_resultset {
910   my ( $self, $rel, @rest ) = @_;
911   $self->{related_resultsets} ||= {};
912   return $self->{related_resultsets}{$rel} ||= do {
913       #warn "fetching related resultset for rel '$rel'";
914       my $rel_obj = $self->result_source->relationship_info($rel);
915       $self->throw_exception(
916         "search_related: result source '" . $self->result_source->name .
917         "' has no such relationship ${rel}")
918         unless $rel_obj; #die Dumper $self->{attrs};
919
920       my $rs = $self->search(undef, { join => $rel });
921       my $alias = defined $rs->{attrs}{seen_join}{$rel}
922                     && $rs->{attrs}{seen_join}{$rel} > 1
923                   ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
924                   : $rel;
925
926       $self->result_source->schema->resultset($rel_obj->{class}
927            )->search( undef,
928              { %{$rs->{attrs}},
929                alias => $alias,
930                select => undef,
931                as => undef }
932            )->search(@rest);      
933   };
934 }
935
936 =head2 throw_exception
937
938 See Schema's throw_exception
939
940 =cut
941
942 sub throw_exception {
943   my $self=shift;
944   $self->result_source->schema->throw_exception(@_);
945 }
946
947 =head1 ATTRIBUTES
948
949 The resultset takes various attributes that modify its behavior. Here's an
950 overview of them:
951
952 =head2 order_by
953
954 Which column(s) to order the results by. This is currently passed through
955 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
956
957 =head2 columns
958
959 =head3 Arguments: (arrayref)
960
961 Shortcut to request a particular set of columns to be retrieved.  Adds
962 C<me.> onto the start of any column without a C<.> in it and sets C<select>
963 from that, then auto-populates C<as> from C<select> as normal. (You may also
964 use the C<cols> attribute, as in earlier versions of DBIC.)
965
966 =head2 include_columns
967
968 =head3 Arguments: (arrayref)
969
970 Shortcut to include additional columns in the returned results - for example
971
972   { include_columns => ['foo.name'], join => ['foo'] }
973
974 would add a 'name' column to the information passed to object inflation
975
976 =head2 select
977
978 =head3 Arguments: (arrayref)
979
980 Indicates which columns should be selected from the storage. You can use
981 column names, or in the case of RDBMS back ends, function or stored procedure
982 names:
983
984   $rs = $schema->resultset('Foo')->search(
985     undef,
986     {
987       select => [
988         'column_name',
989         { count => 'column_to_count' },
990         { sum => 'column_to_sum' }
991       ]
992     }
993   );
994
995 When you use function/stored procedure names and do not supply an C<as>
996 attribute, the column names returned are storage-dependent. E.g. MySQL would
997 return a column named C<count(column_to_count)> in the above example.
998
999 =head2 as
1000
1001 =head3 Arguments: (arrayref)
1002
1003 Indicates column names for object inflation. This is used in conjunction with
1004 C<select>, usually when C<select> contains one or more function or stored
1005 procedure names:
1006
1007   $rs = $schema->resultset('Foo')->search(
1008     undef,
1009     {
1010       select => [
1011         'column1',
1012         { count => 'column2' }
1013       ],
1014       as => [qw/ column1 column2_count /]
1015     }
1016   );
1017
1018   my $foo = $rs->first(); # get the first Foo
1019
1020 If the object against which the search is performed already has an accessor
1021 matching a column name specified in C<as>, the value can be retrieved using
1022 the accessor as normal:
1023
1024   my $column1 = $foo->column1();
1025
1026 If on the other hand an accessor does not exist in the object, you need to
1027 use C<get_column> instead:
1028
1029   my $column2_count = $foo->get_column('column2_count');
1030
1031 You can create your own accessors if required - see
1032 L<DBIx::Class::Manual::Cookbook> for details.
1033
1034 =head2 join
1035
1036 Contains a list of relationships that should be joined for this query.  For
1037 example:
1038
1039   # Get CDs by Nine Inch Nails
1040   my $rs = $schema->resultset('CD')->search(
1041     { 'artist.name' => 'Nine Inch Nails' },
1042     { join => 'artist' }
1043   );
1044
1045 Can also contain a hash reference to refer to the other relation's relations.
1046 For example:
1047
1048   package MyApp::Schema::Track;
1049   use base qw/DBIx::Class/;
1050   __PACKAGE__->table('track');
1051   __PACKAGE__->add_columns(qw/trackid cd position title/);
1052   __PACKAGE__->set_primary_key('trackid');
1053   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1054   1;
1055
1056   # In your application
1057   my $rs = $schema->resultset('Artist')->search(
1058     { 'track.title' => 'Teardrop' },
1059     {
1060       join     => { cd => 'track' },
1061       order_by => 'artist.name',
1062     }
1063   );
1064
1065 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1066 similarly for a third time). For e.g.
1067
1068   my $rs = $schema->resultset('Artist')->search(
1069     { 'cds.title'   => 'Foo',
1070       'cds_2.title' => 'Bar' },
1071     { join => [ qw/cds cds/ ] });
1072
1073 will return a set of all artists that have both a cd with title Foo and a cd
1074 with title Bar.
1075
1076 If you want to fetch related objects from other tables as well, see C<prefetch>
1077 below.
1078
1079 =head2 prefetch
1080
1081 =head3 Arguments: arrayref/hashref
1082
1083 Contains one or more relationships that should be fetched along with the main 
1084 query (when they are accessed afterwards they will have already been
1085 "prefetched").  This is useful for when you know you will need the related
1086 objects, because it saves at least one query:
1087
1088   my $rs = $schema->resultset('Tag')->search(
1089     undef,
1090     {
1091       prefetch => {
1092         cd => 'artist'
1093       }
1094     }
1095   );
1096
1097 The initial search results in SQL like the following:
1098
1099   SELECT tag.*, cd.*, artist.* FROM tag
1100   JOIN cd ON tag.cd = cd.cdid
1101   JOIN artist ON cd.artist = artist.artistid
1102
1103 L<DBIx::Class> has no need to go back to the database when we access the
1104 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1105 case.
1106
1107 Simple prefetches will be joined automatically, so there is no need
1108 for a C<join> attribute in the above search. If you're prefetching to
1109 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1110 specify the join as well.
1111
1112 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1113 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1114 with an accessor type of 'single' or 'filter').
1115
1116 =head2 from
1117
1118 =head3 Arguments: (arrayref)
1119
1120 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1121 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1122 clauses.
1123
1124 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1125 C<join> will usually do what you need and it is strongly recommended that you
1126 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1127
1128 In simple terms, C<from> works as follows:
1129
1130     [
1131         { <alias> => <table>, -join-type => 'inner|left|right' }
1132         [] # nested JOIN (optional)
1133         { <table.column> = <foreign_table.foreign_key> }
1134     ]
1135
1136     JOIN
1137         <alias> <table>
1138         [JOIN ...]
1139     ON <table.column> = <foreign_table.foreign_key>
1140
1141 An easy way to follow the examples below is to remember the following:
1142
1143     Anything inside "[]" is a JOIN
1144     Anything inside "{}" is a condition for the enclosing JOIN
1145
1146 The following examples utilize a "person" table in a family tree application.
1147 In order to express parent->child relationships, this table is self-joined:
1148
1149     # Person->belongs_to('father' => 'Person');
1150     # Person->belongs_to('mother' => 'Person');
1151
1152 C<from> can be used to nest joins. Here we return all children with a father,
1153 then search against all mothers of those children:
1154
1155   $rs = $schema->resultset('Person')->search(
1156       undef,
1157       {
1158           alias => 'mother', # alias columns in accordance with "from"
1159           from => [
1160               { mother => 'person' },
1161               [
1162                   [
1163                       { child => 'person' },
1164                       [
1165                           { father => 'person' },
1166                           { 'father.person_id' => 'child.father_id' }
1167                       ]
1168                   ],
1169                   { 'mother.person_id' => 'child.mother_id' }
1170               ],
1171           ]
1172       },
1173   );
1174
1175   # Equivalent SQL:
1176   # SELECT mother.* FROM person mother
1177   # JOIN (
1178   #   person child
1179   #   JOIN person father
1180   #   ON ( father.person_id = child.father_id )
1181   # )
1182   # ON ( mother.person_id = child.mother_id )
1183
1184 The type of any join can be controlled manually. To search against only people
1185 with a father in the person table, we could explicitly use C<INNER JOIN>:
1186
1187     $rs = $schema->resultset('Person')->search(
1188         undef,
1189         {
1190             alias => 'child', # alias columns in accordance with "from"
1191             from => [
1192                 { child => 'person' },
1193                 [
1194                     { father => 'person', -join-type => 'inner' },
1195                     { 'father.id' => 'child.father_id' }
1196                 ],
1197             ]
1198         },
1199     );
1200
1201     # Equivalent SQL:
1202     # SELECT child.* FROM person child
1203     # INNER JOIN person father ON child.father_id = father.id
1204
1205 =head2 page
1206
1207 For a paged resultset, specifies which page to retrieve.  Leave unset
1208 for an unpaged resultset.
1209
1210 =head2 rows
1211
1212 For a paged resultset, how many rows per page:
1213
1214   rows => 10
1215
1216 Can also be used to simulate an SQL C<LIMIT>.
1217
1218 =head2 group_by
1219
1220 =head3 Arguments: (arrayref)
1221
1222 A arrayref of columns to group by. Can include columns of joined tables.
1223
1224   group_by => [qw/ column1 column2 ... /]
1225
1226 =head2 distinct
1227
1228 Set to 1 to group by all columns.
1229
1230 For more examples of using these attributes, see
1231 L<DBIx::Class::Manual::Cookbook>.
1232
1233 =cut
1234
1235 1;