assorted crunchy code cleanups to ResultSet.pm (you can kill me if anything broke)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Data::Page;
10 use Storable;
11
12 use base qw/DBIx::Class/;
13 __PACKAGE__->load_components(qw/AccessorGroup/);
14 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
15
16 =head1 NAME
17
18 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
19
20 =head1 SYNOPSIS
21
22   my $rs   = $schema->resultset('User')->search(registered => 1);
23   my @rows = $schema->resultset('Foo')->search(bar => 'baz');
24
25 =head1 DESCRIPTION
26
27 The resultset is also known as an iterator. It is responsible for handling
28 queries that may return an arbitrary number of rows, e.g. via L</search>
29 or a C<has_many> relationship.
30
31 In the examples below, the following table classes are used:
32
33   package MyApp::Schema::Artist;
34   use base qw/DBIx::Class/;
35   __PACKAGE__->load_components(qw/Core/);
36   __PACKAGE__->table('artist');
37   __PACKAGE__->add_columns(qw/artistid name/);
38   __PACKAGE__->set_primary_key('artistid');
39   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
40   1;
41
42   package MyApp::Schema::CD;
43   use base qw/DBIx::Class/;
44   __PACKAGE__->load_components(qw/Core/);
45   __PACKAGE__->table('cd');
46   __PACKAGE__->add_columns(qw/cdid artist title year/);
47   __PACKAGE__->set_primary_key('cdid');
48   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
49   1;
50
51 =head1 METHODS
52
53 =head2 new
54
55 =head3 Arguments: ($source, \%$attrs)
56
57 The resultset constructor. Takes a source object (usually a
58 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
59 below).  Does not perform any queries -- these are executed as needed by the
60 other methods.
61
62 Generally you won't need to construct a resultset manually.  You'll
63 automatically get one from e.g. a L</search> called in scalar context:
64
65   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
66
67 =cut
68
69 sub new {
70   my $class = shift;
71   return $class->new_result(@_) if ref $class;
72   
73   my ($source, $attrs) = @_;
74   #use Data::Dumper; warn Dumper($attrs);
75   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
76   my $alias = ($attrs->{alias} ||= 'me');
77   
78   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
79   delete $attrs->{as} if $attrs->{columns};
80   $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
81   $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
82     if $attrs->{columns};
83   $attrs->{as} ||= [ map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ];
84   if (my $include = delete $attrs->{include_columns}) {
85     push(@{$attrs->{select}}, @$include);
86     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
87   }
88   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
89
90   $attrs->{from} ||= [ { $alias => $source->from } ];
91   $attrs->{seen_join} ||= {};
92   my %seen;
93   if (my $join = delete $attrs->{join}) {
94     foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
95       if (ref $j eq 'HASH') {
96         $seen{$_} = 1 foreach keys %$j;
97       } else {
98         $seen{$j} = 1;
99       }
100     }
101     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
102   }
103   
104   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
105   $attrs->{order_by} = [ $attrs->{order_by} ] if $attrs->{order_by} and !ref($attrs->{order_by});
106   $attrs->{order_by} ||= [];
107
108   my $collapse = $attrs->{collapse} || {};
109   if (my $prefetch = delete $attrs->{prefetch}) {
110     my @pre_order;
111     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
112       if ( ref $p eq 'HASH' ) {
113         foreach my $key (keys %$p) {
114           push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
115             unless $seen{$key};
116         }
117       } else {
118         push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
119             unless $seen{$p};
120       }
121       my @prefetch = $source->resolve_prefetch(
122            $p, $attrs->{alias}, {}, \@pre_order, $collapse);
123       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
124       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
125     }
126     push(@{$attrs->{order_by}}, @pre_order);
127   }
128   $attrs->{collapse} = $collapse;
129 #  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
130
131   if ($attrs->{page}) {
132     $attrs->{rows} ||= 10;
133     $attrs->{offset} ||= 0;
134     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
135   }
136
137   bless {
138     result_source => $source,
139     result_class => $attrs->{result_class} || $source->result_class,
140     cond => $attrs->{where},
141     from => $attrs->{from},
142     collapse => $collapse,
143     count => undef,
144     page => delete $attrs->{page},
145     pager => undef,
146     attrs => $attrs
147   }, $class;
148 }
149
150 =head2 search
151
152   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
153   my $new_rs = $rs->search({ foo => 3 });
154
155 If you need to pass in additional attributes but no additional condition,
156 call it as C<search(undef, \%attrs);>.
157
158   # "SELECT foo, bar FROM $class_table"
159   my @all = $class->search(undef, { columns => [qw/foo bar/] });
160
161 =cut
162
163 sub search {
164   my $self = shift;
165
166   my $rs;
167   if( @_ ) {
168     
169     my $attrs = { %{$self->{attrs}} };
170     my $having = delete $attrs->{having};
171     $attrs = { %$attrs, %{ pop(@_) } } if @_ > 1 and ref $_[$#_] eq 'HASH';
172
173     my $where = (@_
174                   ? ((@_ == 1 || ref $_[0] eq "HASH")
175                       ? shift
176                       : ((@_ % 2)
177                           ? $self->throw_exception(
178                               "Odd number of arguments to search")
179                           : {@_}))
180                   : undef());
181     if (defined $where) {
182       $attrs->{where} = (defined $attrs->{where}
183                 ? { '-and' =>
184                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
185                         $where, $attrs->{where} ] }
186                 : $where);
187     }
188
189     if (defined $having) {
190       $attrs->{having} = (defined $attrs->{having}
191                 ? { '-and' =>
192                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
193                         $having, $attrs->{having} ] }
194                 : $having);
195     }
196
197     $rs = (ref $self)->new($self->result_source, $attrs);
198   }
199   else {
200     $rs = $self;
201     $rs->reset;
202   }
203   return (wantarray ? $rs->all : $rs);
204 }
205
206 =head2 search_literal
207
208   my @obj    = $rs->search_literal($literal_where_cond, @bind);
209   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
210
211 Pass a literal chunk of SQL to be added to the conditional part of the
212 resultset.
213
214 =cut
215
216 sub search_literal {
217   my ($self, $cond, @vals) = @_;
218   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
219   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
220   return $self->search(\$cond, $attrs);
221 }
222
223 =head2 find
224
225 =head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
226
227 Finds a row based on its primary key or unique constraint. For example:
228
229   my $cd = $schema->resultset('CD')->find(5);
230
231 Also takes an optional C<key> attribute, to search by a specific key or unique
232 constraint. For example:
233
234   my $cd = $schema->resultset('CD')->find(
235     {
236       artist => 'Massive Attack',
237       title  => 'Mezzanine',
238     },
239     { key => 'artist_title' }
240   );
241
242 See also L</find_or_create> and L</update_or_create>.
243
244 =cut
245
246 sub find {
247   my ($self, @vals) = @_;
248   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
249
250   my @cols = $self->result_source->primary_columns;
251   if (exists $attrs->{key}) {
252     my %uniq = $self->result_source->unique_constraints;
253     $self->throw_exception( "Unknown key $attrs->{key} on $self->name" )
254       unless exists $uniq{$attrs->{key}};
255     @cols = @{ $uniq{$attrs->{key}} };
256   }
257   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
258   $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
259     unless @cols;
260
261   my $query;
262   if (ref $vals[0] eq 'HASH') {
263     $query = { %{$vals[0]} };
264   } elsif (@cols == @vals) {
265     $query = {};
266     @{$query}{@cols} = @vals;
267   } else {
268     $query = {@vals};
269   }
270   foreach my $key (grep { ! m/\./ } keys %$query) {
271     $query->{"$self->{attrs}{alias}.$key"} = delete $query->{$key};
272   }
273   #warn Dumper($query);
274   
275   if (keys %$attrs) {
276       my $rs = $self->search($query,$attrs);
277       return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
278   } else {
279       return keys %{$self->{collapse}} ? $self->search($query)->next : $self->single($query);
280   }
281 }
282
283 =head2 search_related
284
285   $rs->search_related('relname', $cond?, $attrs?);
286
287 Search the specified relationship. Optionally specify a condition for matching
288 records.
289
290 =cut
291
292 sub search_related {
293   return shift->related_resultset(shift)->search(@_);
294 }
295
296 =head2 cursor
297
298 Returns a storage-driven cursor to the given resultset.
299
300 =cut
301
302 sub cursor {
303   my ($self) = @_;
304   my $attrs = { %{$self->{attrs}} };
305   return $self->{cursor}
306     ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
307           $attrs->{where},$attrs);
308 }
309
310 =head2 single
311
312 Inflates the first result without creating a cursor
313
314 =cut
315
316 sub single {
317   my ($self, $where) = @_;
318   my $attrs = { %{$self->{attrs}} };
319   if ($where) {
320     if (defined $attrs->{where}) {
321       $attrs->{where} = {
322         '-and' => 
323             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
324                $where, delete $attrs->{where} ]
325       };
326     } else {
327       $attrs->{where} = $where;
328     }
329   }
330   my @data = $self->result_source->storage->select_single(
331           $self->{from}, $attrs->{select},
332           $attrs->{where},$attrs);
333   return (@data ? $self->_construct_object(@data) : ());
334 }
335
336
337 =head2 search_like
338
339 Perform a search, but use C<LIKE> instead of equality as the condition. Note
340 that this is simply a convenience method; you most likely want to use
341 L</search> with specific operators.
342
343 For more information, see L<DBIx::Class::Manual::Cookbook>.
344
345 =cut
346
347 sub search_like {
348   my $class = shift;
349   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
350   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
351   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
352   return $class->search($query, { %$attrs });
353 }
354
355 =head2 slice
356
357 =head3 Arguments: ($first, $last)
358
359 Returns a subset of elements from the resultset.
360
361 =cut
362
363 sub slice {
364   my ($self, $min, $max) = @_;
365   my $attrs = { %{ $self->{attrs} || {} } };
366   $attrs->{offset} ||= 0;
367   $attrs->{offset} += $min;
368   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
369   my $slice = (ref $self)->new($self->result_source, $attrs);
370   return (wantarray ? $slice->all : $slice);
371 }
372
373 =head2 next
374
375 Returns the next element in the resultset (C<undef> is there is none).
376
377 Can be used to efficiently iterate over records in the resultset:
378
379   my $rs = $schema->resultset('CD')->search;
380   while (my $cd = $rs->next) {
381     print $cd->title;
382   }
383
384 =cut
385
386 sub next {
387   my ($self) = @_;
388   if (@{$self->{all_cache} || []}) {
389     $self->{all_cache_position} ||= 0;
390     return $self->{all_cache}->[$self->{all_cache_position}++];
391   }
392   if ($self->{attrs}{cache}) {
393     $self->{all_cache_position} = 1;
394     return ($self->all)[0];
395   }
396   my @row = (exists $self->{stashed_row}
397                ? @{delete $self->{stashed_row}}
398                : $self->cursor->next);
399 #  warn Dumper(\@row); use Data::Dumper;
400   return unless (@row);
401   return $self->_construct_object(@row);
402 }
403
404 sub _construct_object {
405   my ($self, @row) = @_;
406   my @as = @{ $self->{attrs}{as} };
407   
408   my $info = $self->_collapse_result(\@as, \@row);
409   
410   my $new = $self->result_class->inflate_result($self->result_source, @$info);
411   
412   $new = $self->{attrs}{record_filter}->($new)
413     if exists $self->{attrs}{record_filter};
414   return $new;
415 }
416
417 sub _collapse_result {
418   my ($self, $as, $row, $prefix) = @_;
419
420   my %const;
421
422   my @copy = @$row;
423   foreach my $this_as (@$as) {
424     my $val = shift @copy;
425     if (defined $prefix) {
426       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
427         my $remain = $1;
428         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
429         $const{$1||''}{$2} = $val;
430       }
431     } else {
432       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
433       $const{$1||''}{$2} = $val;
434     }
435   }
436
437   my $info = [ {}, {} ];
438   foreach my $key (keys %const) {
439     if (length $key) {
440       my $target = $info;
441       my @parts = split(/\./, $key);
442       foreach my $p (@parts) {
443         $target = $target->[1]->{$p} ||= [];
444       }
445       $target->[0] = $const{$key};
446     } else {
447       $info->[0] = $const{$key};
448     }
449   }
450
451   my @collapse = (defined($prefix)
452                    ? (map { (m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()); }
453                        keys %{$self->{collapse}})
454                    : keys %{$self->{collapse}});
455   if (@collapse) {
456     my ($c) = sort { length $a <=> length $b } @collapse;
457     my $target = $info;
458     foreach my $p (split(/\./, $c)) {
459       $target = $target->[1]->{$p} ||= [];
460     }
461     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
462     my @co_key = @{$self->{collapse}{$c_prefix}};
463     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
464     my $tree = $self->_collapse_result($as, $row, $c_prefix);
465     my (@final, @raw);
466     while ( !(grep {
467                 !defined($tree->[0]->{$_})
468                 || $co_check{$_} ne $tree->[0]->{$_}
469               } @co_key) ) {
470       push(@final, $tree);
471       last unless (@raw = $self->cursor->next);
472       $row = $self->{stashed_row} = \@raw;
473       $tree = $self->_collapse_result($as, $row, $c_prefix);
474       #warn Data::Dumper::Dumper($tree, $row);
475     }
476     @$target = @final;
477   }
478
479   return $info;
480 }
481
482 =head2 result_source
483
484 Returns a reference to the result source for this recordset.
485
486 =cut
487
488
489 =head2 count
490
491 Performs an SQL C<COUNT> with the same query as the resultset was built
492 with to find the number of elements. If passed arguments, does a search
493 on the resultset and counts the results of that.
494
495 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
496 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
497 not support C<DISTINCT> with multiple columns. If you are using such a
498 database, you should only use columns from the main table in your C<group_by>
499 clause.
500
501 =cut
502
503 sub count {
504   my $self = shift;
505   return $self->search(@_)->count if @_ and defined $_[0];
506   unless (defined $self->{count}) {
507     return scalar @{ $self->get_cache } if @{ $self->get_cache };
508     my $select = { count => '*' };
509     my $attrs = { %{ $self->{attrs} } };
510     if (my $group_by = delete $attrs->{group_by}) {
511       delete $attrs->{having};
512       my @distinct = (ref $group_by ?  @$group_by : ($group_by));
513       # todo: try CONCAT for multi-column pk
514       my @pk = $self->result_source->primary_columns;
515       if (@pk == 1) {
516         foreach my $column (@distinct) {
517           if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
518             @distinct = ($column);
519             last;
520           }
521         } 
522       }
523
524       $select = { count => { distinct => \@distinct } };
525       #use Data::Dumper; die Dumper $select;
526     }
527
528     $attrs->{select} = $select;
529     $attrs->{as} = [qw/count/];
530     # offset, order by and page are not needed to count. record_filter is cdbi
531     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
532         
533     ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
534   }
535   return 0 unless $self->{count};
536   my $count = $self->{count};
537   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
538   $count = $self->{attrs}{rows} if
539     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
540   return $count;
541 }
542
543 =head2 count_literal
544
545 Calls L</search_literal> with the passed arguments, then L</count>.
546
547 =cut
548
549 sub count_literal { shift->search_literal(@_)->count; }
550
551 =head2 all
552
553 Returns all elements in the resultset. Called implictly if the resultset
554 is returned in list context.
555
556 =cut
557
558 sub all {
559   my ($self) = @_;
560   return @{ $self->get_cache } if @{ $self->get_cache };
561
562   my @obj;
563
564   if (keys %{$self->{collapse}}) {
565       # Using $self->cursor->all is really just an optimisation.
566       # If we're collapsing has_many prefetches it probably makes
567       # very little difference, and this is cleaner than hacking
568       # _construct_object to survive the approach
569     my @row;
570     $self->cursor->reset;
571     while (@row = $self->cursor->next) {
572       push(@obj, $self->_construct_object(@row));
573     }
574   } else {
575     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
576   }
577
578   $self->set_cache(\@obj) if $self->{attrs}{cache};
579   return @obj;
580 }
581
582 =head2 reset
583
584 Resets the resultset's cursor, so you can iterate through the elements again.
585
586 =cut
587
588 sub reset {
589   my ($self) = @_;
590   $self->{all_cache_position} = 0;
591   $self->cursor->reset;
592   return $self;
593 }
594
595 =head2 first
596
597 Resets the resultset and returns the first element.
598
599 =cut
600
601 sub first {
602   return $_[0]->reset->next;
603 }
604
605 =head2 update
606
607 =head3 Arguments: (\%values)
608
609 Sets the specified columns in the resultset to the supplied values.
610
611 =cut
612
613 sub update {
614   my ($self, $values) = @_;
615   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
616   return $self->result_source->storage->update(
617            $self->result_source->from, $values, $self->{cond});
618 }
619
620 =head2 update_all
621
622 =head3 Arguments: (\%values)
623
624 Fetches all objects and updates them one at a time.  Note that C<update_all>
625 will run cascade triggers while L</update> will not.
626
627 =cut
628
629 sub update_all {
630   my ($self, $values) = @_;
631   $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
632   foreach my $obj ($self->all) {
633     $obj->set_columns($values)->update;
634   }
635   return 1;
636 }
637
638 =head2 delete
639
640 Deletes the contents of the resultset from its result source.
641
642 =cut
643
644 sub delete {
645   my ($self) = @_;
646   my $del = {};
647   $self->throw_exception("Can't delete on resultset with condition unless hash or array")
648     unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
649   if (ref $self->{cond} eq 'ARRAY') {
650     $del = [ map { my %hash;
651       foreach my $key (keys %{$_}) {
652         $key =~ /([^.]+)$/;
653         $hash{$1} = $_->{$key};
654       }; \%hash; } @{$self->{cond}} ];
655   } elsif ((keys %{$self->{cond}})[0] eq '-and') {
656     $del->{-and} = [ map { my %hash;
657       foreach my $key (keys %{$_}) {
658         $key =~ /([^.]+)$/;
659         $hash{$1} = $_->{$key};
660       }; \%hash; } @{$self->{cond}{-and}} ];
661   } else {
662     foreach my $key (keys %{$self->{cond}}) {
663       $key =~ /([^.]+)$/;
664       $del->{$1} = $self->{cond}{$key};
665     }
666   }
667   $self->result_source->storage->delete($self->result_source->from, $del);
668   return 1;
669 }
670
671 =head2 delete_all
672
673 Fetches all objects and deletes them one at a time.  Note that C<delete_all>
674 will run cascade triggers while L</delete> will not.
675
676 =cut
677
678 sub delete_all {
679   my ($self) = @_;
680   $_->delete for $self->all;
681   return 1;
682 }
683
684 =head2 pager
685
686 Returns a L<Data::Page> object for the current resultset. Only makes
687 sense for queries with a C<page> attribute.
688
689 =cut
690
691 sub pager {
692   my ($self) = @_;
693   my $attrs = $self->{attrs};
694   $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
695   $attrs->{rows} ||= 10;
696   $self->count;
697   return $self->{pager} ||= Data::Page->new(
698     $self->{count}, $attrs->{rows}, $self->{page});
699 }
700
701 =head2 page
702
703 =head3 Arguments: ($page_num)
704
705 Returns a new resultset for the specified page.
706
707 =cut
708
709 sub page {
710   my ($self, $page) = @_;
711   my $attrs = { %{$self->{attrs}} };
712   $attrs->{page} = $page;
713   return (ref $self)->new($self->result_source, $attrs);
714 }
715
716 =head2 new_result
717
718 =head3 Arguments: (\%vals)
719
720 Creates a result in the resultset's result class.
721
722 =cut
723
724 sub new_result {
725   my ($self, $values) = @_;
726   $self->throw_exception( "new_result needs a hash" )
727     unless (ref $values eq 'HASH');
728   $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
729     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
730   my %new = %$values;
731   my $alias = $self->{attrs}{alias};
732   foreach my $key (keys %{$self->{cond}||{}}) {
733     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
734   }
735   my $obj = $self->result_class->new(\%new);
736   $obj->result_source($self->result_source) if $obj->can('result_source');
737   return $obj;
738 }
739
740 =head2 create
741
742 =head3 Arguments: (\%vals)
743
744 Inserts a record into the resultset and returns the object.
745
746 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
747
748 =cut
749
750 sub create {
751   my ($self, $attrs) = @_;
752   $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
753   return $self->new_result($attrs)->insert;
754 }
755
756 =head2 find_or_create
757
758 =head3 Arguments: (\%vals, \%attrs?)
759
760   $class->find_or_create({ key => $val, ... });
761
762 Searches for a record matching the search condition; if it doesn't find one,
763 creates one and returns that instead.
764
765   my $cd = $schema->resultset('CD')->find_or_create({
766     cdid   => 5,
767     artist => 'Massive Attack',
768     title  => 'Mezzanine',
769     year   => 2005,
770   });
771
772 Also takes an optional C<key> attribute, to search by a specific key or unique
773 constraint. For example:
774
775   my $cd = $schema->resultset('CD')->find_or_create(
776     {
777       artist => 'Massive Attack',
778       title  => 'Mezzanine',
779     },
780     { key => 'artist_title' }
781   );
782
783 See also L</find> and L</update_or_create>.
784
785 =cut
786
787 sub find_or_create {
788   my $self     = shift;
789   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
790   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
791   my $exists   = $self->find($hash, $attrs);
792   return defined $exists ? $exists : $self->create($hash);
793 }
794
795 =head2 update_or_create
796
797   $class->update_or_create({ key => $val, ... });
798
799 First, search for an existing row matching one of the unique constraints
800 (including the primary key) on the source of this resultset.  If a row is
801 found, update it with the other given column values.  Otherwise, create a new
802 row.
803
804 Takes an optional C<key> attribute to search on a specific unique constraint.
805 For example:
806
807   # In your application
808   my $cd = $schema->resultset('CD')->update_or_create(
809     {
810       artist => 'Massive Attack',
811       title  => 'Mezzanine',
812       year   => 1998,
813     },
814     { key => 'artist_title' }
815   );
816
817 If no C<key> is specified, it searches on all unique constraints defined on the
818 source, including the primary key.
819
820 If the C<key> is specified as C<primary>, search only on the primary key.
821
822 See also L</find> and L</find_or_create>.
823
824 =cut
825
826 sub update_or_create {
827   my $self = shift;
828   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
829   my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
830
831   my %unique_constraints = $self->result_source->unique_constraints;
832   my @constraint_names   = (exists $attrs->{key}
833                             ? ($attrs->{key})
834                             : keys %unique_constraints);
835
836   my @unique_hashes;
837   foreach my $name (@constraint_names) {
838     my @unique_cols = @{ $unique_constraints{$name} };
839     my %unique_hash =
840       map  { $_ => $hash->{$_} }
841       grep { exists $hash->{$_} }
842       @unique_cols;
843
844     push @unique_hashes, \%unique_hash
845       if (scalar keys %unique_hash == scalar @unique_cols);
846   }
847
848   if (@unique_hashes) {
849     my $row = $self->single(\@unique_hashes);
850     if (defined $row) {
851       $row->set_columns($hash);
852       $row->update;
853       return $row;
854     }
855   }
856
857   return $self->create($hash);
858 }
859
860 =head2 get_cache
861
862 Gets the contents of the cache for the resultset.
863
864 =cut
865
866 sub get_cache {
867   shift->{all_cache} || [];
868 }
869
870 =head2 set_cache
871
872 Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
873
874 =cut
875
876 sub set_cache {
877   my ( $self, $data ) = @_;
878   $self->throw_exception("set_cache requires an arrayref")
879     if ref $data ne 'ARRAY';
880   my $result_class = $self->result_class;
881   foreach( @$data ) {
882     $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
883       if ref $_ ne $result_class;
884   }
885   $self->{all_cache} = $data;
886 }
887
888 =head2 clear_cache
889
890 Clears the cache for the resultset.
891
892 =cut
893
894 sub clear_cache {
895   shift->set_cache([]);
896 }
897
898 =head2 related_resultset
899
900 Returns a related resultset for the supplied relationship name.
901
902   $rs = $rs->related_resultset('foo');
903
904 =cut
905
906 sub related_resultset {
907   my ( $self, $rel, @rest ) = @_;
908   $self->{related_resultsets} ||= {};
909   return $self->{related_resultsets}{$rel} ||= do {
910       #warn "fetching related resultset for rel '$rel'";
911       my $rel_obj = $self->result_source->relationship_info($rel);
912       $self->throw_exception(
913         "search_related: result source '" . $self->result_source->name .
914         "' has no such relationship ${rel}")
915         unless $rel_obj; #die Dumper $self->{attrs};
916
917       my $rs = $self->search(undef, { join => $rel });
918       my $alias = defined $rs->{attrs}{seen_join}{$rel}
919                     && $rs->{attrs}{seen_join}{$rel} > 1
920                   ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
921                   : $rel;
922
923       $self->result_source->schema->resultset($rel_obj->{class}
924            )->search( undef,
925              { %{$rs->{attrs}},
926                alias => $alias,
927                select => undef,
928                as => undef }
929            )->search(@rest);      
930   };
931 }
932
933 =head2 throw_exception
934
935 See Schema's throw_exception
936
937 =cut
938
939 sub throw_exception {
940   my $self=shift;
941   $self->result_source->schema->throw_exception(@_);
942 }
943
944 =head1 ATTRIBUTES
945
946 The resultset takes various attributes that modify its behavior. Here's an
947 overview of them:
948
949 =head2 order_by
950
951 Which column(s) to order the results by. This is currently passed through
952 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
953
954 =head2 columns
955
956 =head3 Arguments: (arrayref)
957
958 Shortcut to request a particular set of columns to be retrieved.  Adds
959 C<me.> onto the start of any column without a C<.> in it and sets C<select>
960 from that, then auto-populates C<as> from C<select> as normal. (You may also
961 use the C<cols> attribute, as in earlier versions of DBIC.)
962
963 =head2 include_columns
964
965 =head3 Arguments: (arrayref)
966
967 Shortcut to include additional columns in the returned results - for example
968
969   { include_columns => ['foo.name'], join => ['foo'] }
970
971 would add a 'name' column to the information passed to object inflation
972
973 =head2 select
974
975 =head3 Arguments: (arrayref)
976
977 Indicates which columns should be selected from the storage. You can use
978 column names, or in the case of RDBMS back ends, function or stored procedure
979 names:
980
981   $rs = $schema->resultset('Foo')->search(
982     undef,
983     {
984       select => [
985         'column_name',
986         { count => 'column_to_count' },
987         { sum => 'column_to_sum' }
988       ]
989     }
990   );
991
992 When you use function/stored procedure names and do not supply an C<as>
993 attribute, the column names returned are storage-dependent. E.g. MySQL would
994 return a column named C<count(column_to_count)> in the above example.
995
996 =head2 as
997
998 =head3 Arguments: (arrayref)
999
1000 Indicates column names for object inflation. This is used in conjunction with
1001 C<select>, usually when C<select> contains one or more function or stored
1002 procedure names:
1003
1004   $rs = $schema->resultset('Foo')->search(
1005     undef,
1006     {
1007       select => [
1008         'column1',
1009         { count => 'column2' }
1010       ],
1011       as => [qw/ column1 column2_count /]
1012     }
1013   );
1014
1015   my $foo = $rs->first(); # get the first Foo
1016
1017 If the object against which the search is performed already has an accessor
1018 matching a column name specified in C<as>, the value can be retrieved using
1019 the accessor as normal:
1020
1021   my $column1 = $foo->column1();
1022
1023 If on the other hand an accessor does not exist in the object, you need to
1024 use C<get_column> instead:
1025
1026   my $column2_count = $foo->get_column('column2_count');
1027
1028 You can create your own accessors if required - see
1029 L<DBIx::Class::Manual::Cookbook> for details.
1030
1031 =head2 join
1032
1033 Contains a list of relationships that should be joined for this query.  For
1034 example:
1035
1036   # Get CDs by Nine Inch Nails
1037   my $rs = $schema->resultset('CD')->search(
1038     { 'artist.name' => 'Nine Inch Nails' },
1039     { join => 'artist' }
1040   );
1041
1042 Can also contain a hash reference to refer to the other relation's relations.
1043 For example:
1044
1045   package MyApp::Schema::Track;
1046   use base qw/DBIx::Class/;
1047   __PACKAGE__->table('track');
1048   __PACKAGE__->add_columns(qw/trackid cd position title/);
1049   __PACKAGE__->set_primary_key('trackid');
1050   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1051   1;
1052
1053   # In your application
1054   my $rs = $schema->resultset('Artist')->search(
1055     { 'track.title' => 'Teardrop' },
1056     {
1057       join     => { cd => 'track' },
1058       order_by => 'artist.name',
1059     }
1060   );
1061
1062 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1063 similarly for a third time). For e.g.
1064
1065   my $rs = $schema->resultset('Artist')->search(
1066     { 'cds.title'   => 'Foo',
1067       'cds_2.title' => 'Bar' },
1068     { join => [ qw/cds cds/ ] });
1069
1070 will return a set of all artists that have both a cd with title Foo and a cd
1071 with title Bar.
1072
1073 If you want to fetch related objects from other tables as well, see C<prefetch>
1074 below.
1075
1076 =head2 prefetch
1077
1078 =head3 Arguments: arrayref/hashref
1079
1080 Contains one or more relationships that should be fetched along with the main 
1081 query (when they are accessed afterwards they will have already been
1082 "prefetched").  This is useful for when you know you will need the related
1083 objects, because it saves at least one query:
1084
1085   my $rs = $schema->resultset('Tag')->search(
1086     undef,
1087     {
1088       prefetch => {
1089         cd => 'artist'
1090       }
1091     }
1092   );
1093
1094 The initial search results in SQL like the following:
1095
1096   SELECT tag.*, cd.*, artist.* FROM tag
1097   JOIN cd ON tag.cd = cd.cdid
1098   JOIN artist ON cd.artist = artist.artistid
1099
1100 L<DBIx::Class> has no need to go back to the database when we access the
1101 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1102 case.
1103
1104 Simple prefetches will be joined automatically, so there is no need
1105 for a C<join> attribute in the above search. If you're prefetching to
1106 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1107 specify the join as well.
1108
1109 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1110 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1111 with an accessor type of 'single' or 'filter').
1112
1113 =head2 from
1114
1115 =head3 Arguments: (arrayref)
1116
1117 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1118 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1119 clauses.
1120
1121 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1122 C<join> will usually do what you need and it is strongly recommended that you
1123 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1124
1125 In simple terms, C<from> works as follows:
1126
1127     [
1128         { <alias> => <table>, -join-type => 'inner|left|right' }
1129         [] # nested JOIN (optional)
1130         { <table.column> = <foreign_table.foreign_key> }
1131     ]
1132
1133     JOIN
1134         <alias> <table>
1135         [JOIN ...]
1136     ON <table.column> = <foreign_table.foreign_key>
1137
1138 An easy way to follow the examples below is to remember the following:
1139
1140     Anything inside "[]" is a JOIN
1141     Anything inside "{}" is a condition for the enclosing JOIN
1142
1143 The following examples utilize a "person" table in a family tree application.
1144 In order to express parent->child relationships, this table is self-joined:
1145
1146     # Person->belongs_to('father' => 'Person');
1147     # Person->belongs_to('mother' => 'Person');
1148
1149 C<from> can be used to nest joins. Here we return all children with a father,
1150 then search against all mothers of those children:
1151
1152   $rs = $schema->resultset('Person')->search(
1153       undef,
1154       {
1155           alias => 'mother', # alias columns in accordance with "from"
1156           from => [
1157               { mother => 'person' },
1158               [
1159                   [
1160                       { child => 'person' },
1161                       [
1162                           { father => 'person' },
1163                           { 'father.person_id' => 'child.father_id' }
1164                       ]
1165                   ],
1166                   { 'mother.person_id' => 'child.mother_id' }
1167               ],
1168           ]
1169       },
1170   );
1171
1172   # Equivalent SQL:
1173   # SELECT mother.* FROM person mother
1174   # JOIN (
1175   #   person child
1176   #   JOIN person father
1177   #   ON ( father.person_id = child.father_id )
1178   # )
1179   # ON ( mother.person_id = child.mother_id )
1180
1181 The type of any join can be controlled manually. To search against only people
1182 with a father in the person table, we could explicitly use C<INNER JOIN>:
1183
1184     $rs = $schema->resultset('Person')->search(
1185         undef,
1186         {
1187             alias => 'child', # alias columns in accordance with "from"
1188             from => [
1189                 { child => 'person' },
1190                 [
1191                     { father => 'person', -join-type => 'inner' },
1192                     { 'father.id' => 'child.father_id' }
1193                 ],
1194             ]
1195         },
1196     );
1197
1198     # Equivalent SQL:
1199     # SELECT child.* FROM person child
1200     # INNER JOIN person father ON child.father_id = father.id
1201
1202 =head2 page
1203
1204 For a paged resultset, specifies which page to retrieve.  Leave unset
1205 for an unpaged resultset.
1206
1207 =head2 rows
1208
1209 For a paged resultset, how many rows per page:
1210
1211   rows => 10
1212
1213 Can also be used to simulate an SQL C<LIMIT>.
1214
1215 =head2 group_by
1216
1217 =head3 Arguments: (arrayref)
1218
1219 A arrayref of columns to group by. Can include columns of joined tables.
1220
1221   group_by => [qw/ column1 column2 ... /]
1222
1223 =head2 distinct
1224
1225 Set to 1 to group by all columns.
1226
1227 For more examples of using these attributes, see
1228 L<DBIx::Class::Manual::Cookbook>.
1229
1230 =cut
1231
1232 1;