syntax fix: $#{@x} is invalid syntax that happens to work on released perls, but...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use base qw/DBIx::Class/;
15
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   $source = $source->handle 
89     unless $source->isa('DBIx::Class::ResultSourceHandle');
90   $attrs = { %{$attrs||{}} };
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   my $self = {
101     _source_handle => $source,
102     result_class => $attrs->{result_class} || $source->resolve->result_class,
103     cond => $attrs->{where},
104     count => undef,
105     pager => undef,
106     attrs => $attrs
107   };
108
109   bless $self, $class;
110
111   return $self;
112 }
113
114 =head2 search
115
116 =over 4
117
118 =item Arguments: $cond, \%attrs?
119
120 =item Return Value: $resultset (scalar context), @row_objs (list context)
121
122 =back
123
124   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
125   my $new_rs = $cd_rs->search({ year => 2005 });
126
127   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
128                  # year = 2005 OR year = 2004
129
130 If you need to pass in additional attributes but no additional condition,
131 call it as C<search(undef, \%attrs)>.
132
133   # "SELECT name, artistid FROM $artist_table"
134   my @all_artists = $schema->resultset('Artist')->search(undef, {
135     columns => [qw/name artistid/],
136   });
137
138 For a list of attributes that can be passed to C<search>, see
139 L</ATTRIBUTES>. For more examples of using this function, see
140 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
141 documentation for the first argument, see L<SQL::Abstract>.
142
143 =cut
144
145 sub search {
146   my $self = shift;
147   my $rs = $self->search_rs( @_ );
148   return (wantarray ? $rs->all : $rs);
149 }
150
151 =head2 search_rs
152
153 =over 4
154
155 =item Arguments: $cond, \%attrs?
156
157 =item Return Value: $resultset
158
159 =back
160
161 This method does the same exact thing as search() except it will
162 always return a resultset, even in list context.
163
164 =cut
165
166 sub search_rs {
167   my $self = shift;
168
169   my $rows;
170
171   unless (@_) {                 # no search, effectively just a clone
172     $rows = $self->get_cache;
173   }
174
175   my $attrs = {};
176   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
177   my $our_attrs = { %{$self->{attrs}} };
178   my $having = delete $our_attrs->{having};
179   my $where = delete $our_attrs->{where};
180
181   my $new_attrs = { %{$our_attrs}, %{$attrs} };
182
183   # merge new attrs into inherited
184   foreach my $key (qw/join prefetch/) {
185     next unless exists $attrs->{$key};
186     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
187   }
188
189   my $cond = (@_
190     ? (
191         (@_ == 1 || ref $_[0] eq "HASH")
192           ? (
193               (ref $_[0] eq 'HASH')
194                 ? (
195                     (keys %{ $_[0] }  > 0)
196                       ? shift
197                       : undef
198                    )
199                 :  shift
200              )
201           : (
202               (@_ % 2)
203                 ? $self->throw_exception("Odd number of arguments to search")
204                 : {@_}
205              )
206       )
207     : undef
208   );
209
210   if (defined $where) {
211     $new_attrs->{where} = (
212       defined $new_attrs->{where}
213         ? { '-and' => [
214               map {
215                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
216               } $where, $new_attrs->{where}
217             ]
218           }
219         : $where);
220   }
221
222   if (defined $cond) {
223     $new_attrs->{where} = (
224       defined $new_attrs->{where}
225         ? { '-and' => [
226               map {
227                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
228               } $cond, $new_attrs->{where}
229             ]
230           }
231         : $cond);
232   }
233
234   if (defined $having) {
235     $new_attrs->{having} = (
236       defined $new_attrs->{having}
237         ? { '-and' => [
238               map {
239                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
240               } $having, $new_attrs->{having}
241             ]
242           }
243         : $having);
244   }
245
246   my $rs = (ref $self)->new($self->result_source, $new_attrs);
247   if ($rows) {
248     $rs->set_cache($rows);
249   }
250   return $rs;
251 }
252
253 =head2 search_literal
254
255 =over 4
256
257 =item Arguments: $sql_fragment, @bind_values
258
259 =item Return Value: $resultset (scalar context), @row_objs (list context)
260
261 =back
262
263   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
264   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
265
266 Pass a literal chunk of SQL to be added to the conditional part of the
267 resultset query.
268
269 =cut
270
271 sub search_literal {
272   my ($self, $cond, @vals) = @_;
273   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
274   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
275   return $self->search(\$cond, $attrs);
276 }
277
278 =head2 find
279
280 =over 4
281
282 =item Arguments: @values | \%cols, \%attrs?
283
284 =item Return Value: $row_object
285
286 =back
287
288 Finds a row based on its primary key or unique constraint. For example, to find
289 a row by its primary key:
290
291   my $cd = $schema->resultset('CD')->find(5);
292
293 You can also find a row by a specific unique constraint using the C<key>
294 attribute. For example:
295
296   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
297     key => 'cd_artist_title'
298   });
299
300 Additionally, you can specify the columns explicitly by name:
301
302   my $cd = $schema->resultset('CD')->find(
303     {
304       artist => 'Massive Attack',
305       title  => 'Mezzanine',
306     },
307     { key => 'cd_artist_title' }
308   );
309
310 If the C<key> is specified as C<primary>, it searches only on the primary key.
311
312 If no C<key> is specified, it searches on all unique constraints defined on the
313 source, including the primary key.
314
315 If your table does not have a primary key, you B<must> provide a value for the
316 C<key> attribute matching one of the unique constraints on the source.
317
318 See also L</find_or_create> and L</update_or_create>. For information on how to
319 declare unique constraints, see
320 L<DBIx::Class::ResultSource/add_unique_constraint>.
321
322 =cut
323
324 sub find {
325   my $self = shift;
326   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
327
328   # Default to the primary key, but allow a specific key
329   my @cols = exists $attrs->{key}
330     ? $self->result_source->unique_constraint_columns($attrs->{key})
331     : $self->result_source->primary_columns;
332   $self->throw_exception(
333     "Can't find unless a primary key is defined or unique constraint is specified"
334   ) unless @cols;
335
336   # Parse out a hashref from input
337   my $input_query;
338   if (ref $_[0] eq 'HASH') {
339     $input_query = { %{$_[0]} };
340   }
341   elsif (@_ == @cols) {
342     $input_query = {};
343     @{$input_query}{@cols} = @_;
344   }
345   else {
346     # Compatibility: Allow e.g. find(id => $value)
347     carp "Find by key => value deprecated; please use a hashref instead";
348     $input_query = {@_};
349   }
350
351   my (%related, $info);
352
353   KEY: foreach my $key (keys %$input_query) {
354     if (ref($input_query->{$key})
355         && ($info = $self->result_source->relationship_info($key))) {
356       my $val = delete $input_query->{$key};
357       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
358       my $rel_q = $self->result_source->resolve_condition(
359                     $info->{cond}, $val, $key
360                   );
361       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
362       @related{keys %$rel_q} = values %$rel_q;
363     }
364   }
365   if (my @keys = keys %related) {
366     @{$input_query}{@keys} = values %related;
367   }
368
369   my @unique_queries = $self->_unique_queries($input_query, $attrs);
370
371   # Build the final query: Default to the disjunction of the unique queries,
372   # but allow the input query in case the ResultSet defines the query or the
373   # user is abusing find
374   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
375   my $query = @unique_queries
376     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
377     : $self->_add_alias($input_query, $alias);
378
379   # Run the query
380   if (keys %$attrs) {
381     my $rs = $self->search($query, $attrs);
382     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
383   }
384   else {
385     return keys %{$self->_resolved_attrs->{collapse}}
386       ? $self->search($query)->next
387       : $self->single($query);
388   }
389 }
390
391 # _add_alias
392 #
393 # Add the specified alias to the specified query hash. A copy is made so the
394 # original query is not modified.
395
396 sub _add_alias {
397   my ($self, $query, $alias) = @_;
398
399   my %aliased = %$query;
400   foreach my $col (grep { ! m/\./ } keys %aliased) {
401     $aliased{"$alias.$col"} = delete $aliased{$col};
402   }
403
404   return \%aliased;
405 }
406
407 # _unique_queries
408 #
409 # Build a list of queries which satisfy unique constraints.
410
411 sub _unique_queries {
412   my ($self, $query, $attrs) = @_;
413
414   my @constraint_names = exists $attrs->{key}
415     ? ($attrs->{key})
416     : $self->result_source->unique_constraint_names;
417
418   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
419   my $num_where = scalar keys %$where;
420
421   my @unique_queries;
422   foreach my $name (@constraint_names) {
423     my @unique_cols = $self->result_source->unique_constraint_columns($name);
424     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
425
426     my $num_cols = scalar @unique_cols;
427     my $num_query = scalar keys %$unique_query;
428
429     my $total = $num_query + $num_where;
430     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
431       # The query is either unique on its own or is unique in combination with
432       # the existing where clause
433       push @unique_queries, $unique_query;
434     }
435   }
436
437   return @unique_queries;
438 }
439
440 # _build_unique_query
441 #
442 # Constrain the specified query hash based on the specified column names.
443
444 sub _build_unique_query {
445   my ($self, $query, $unique_cols) = @_;
446
447   return {
448     map  { $_ => $query->{$_} }
449     grep { exists $query->{$_} }
450       @$unique_cols
451   };
452 }
453
454 =head2 search_related
455
456 =over 4
457
458 =item Arguments: $rel, $cond, \%attrs?
459
460 =item Return Value: $new_resultset
461
462 =back
463
464   $new_rs = $cd_rs->search_related('artist', {
465     name => 'Emo-R-Us',
466   });
467
468 Searches the specified relationship, optionally specifying a condition and
469 attributes for matching records. See L</ATTRIBUTES> for more information.
470
471 =cut
472
473 sub search_related {
474   return shift->related_resultset(shift)->search(@_);
475 }
476
477 =head2 cursor
478
479 =over 4
480
481 =item Arguments: none
482
483 =item Return Value: $cursor
484
485 =back
486
487 Returns a storage-driven cursor to the given resultset. See
488 L<DBIx::Class::Cursor> for more information.
489
490 =cut
491
492 sub cursor {
493   my ($self) = @_;
494
495   my $attrs = { %{$self->_resolved_attrs} };
496   return $self->{cursor}
497     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
498           $attrs->{where},$attrs);
499 }
500
501 =head2 single
502
503 =over 4
504
505 =item Arguments: $cond?
506
507 =item Return Value: $row_object?
508
509 =back
510
511   my $cd = $schema->resultset('CD')->single({ year => 2001 });
512
513 Inflates the first result without creating a cursor if the resultset has
514 any records in it; if not returns nothing. Used by L</find> as an optimisation.
515
516 Can optionally take an additional condition *only* - this is a fast-code-path
517 method; if you need to add extra joins or similar call ->search and then
518 ->single without a condition on the $rs returned from that.
519
520 =cut
521
522 sub single {
523   my ($self, $where) = @_;
524   my $attrs = { %{$self->_resolved_attrs} };
525   if ($where) {
526     if (defined $attrs->{where}) {
527       $attrs->{where} = {
528         '-and' =>
529             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
530                $where, delete $attrs->{where} ]
531       };
532     } else {
533       $attrs->{where} = $where;
534     }
535   }
536
537 #  XXX: Disabled since it doesn't infer uniqueness in all cases
538 #  unless ($self->_is_unique_query($attrs->{where})) {
539 #    carp "Query not guaranteed to return a single row"
540 #      . "; please declare your unique constraints or use search instead";
541 #  }
542
543   my @data = $self->result_source->storage->select_single(
544     $attrs->{from}, $attrs->{select},
545     $attrs->{where}, $attrs
546   );
547
548   return (@data ? ($self->_construct_object(@data))[0] : undef);
549 }
550
551 # _is_unique_query
552 #
553 # Try to determine if the specified query is guaranteed to be unique, based on
554 # the declared unique constraints.
555
556 sub _is_unique_query {
557   my ($self, $query) = @_;
558
559   my $collapsed = $self->_collapse_query($query);
560   my $alias = $self->{attrs}{alias};
561
562   foreach my $name ($self->result_source->unique_constraint_names) {
563     my @unique_cols = map {
564       "$alias.$_"
565     } $self->result_source->unique_constraint_columns($name);
566
567     # Count the values for each unique column
568     my %seen = map { $_ => 0 } @unique_cols;
569
570     foreach my $key (keys %$collapsed) {
571       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
572       next unless exists $seen{$aliased};  # Additional constraints are okay
573       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
574     }
575
576     # If we get 0 or more than 1 value for a column, it's not necessarily unique
577     return 1 unless grep { $_ != 1 } values %seen;
578   }
579
580   return 0;
581 }
582
583 # _collapse_query
584 #
585 # Recursively collapse the query, accumulating values for each column.
586
587 sub _collapse_query {
588   my ($self, $query, $collapsed) = @_;
589
590   $collapsed ||= {};
591
592   if (ref $query eq 'ARRAY') {
593     foreach my $subquery (@$query) {
594       next unless ref $subquery;  # -or
595 #      warn "ARRAY: " . Dumper $subquery;
596       $collapsed = $self->_collapse_query($subquery, $collapsed);
597     }
598   }
599   elsif (ref $query eq 'HASH') {
600     if (keys %$query and (keys %$query)[0] eq '-and') {
601       foreach my $subquery (@{$query->{-and}}) {
602 #        warn "HASH: " . Dumper $subquery;
603         $collapsed = $self->_collapse_query($subquery, $collapsed);
604       }
605     }
606     else {
607 #      warn "LEAF: " . Dumper $query;
608       foreach my $col (keys %$query) {
609         my $value = $query->{$col};
610         $collapsed->{$col}{$value}++;
611       }
612     }
613   }
614
615   return $collapsed;
616 }
617
618 =head2 get_column
619
620 =over 4
621
622 =item Arguments: $cond?
623
624 =item Return Value: $resultsetcolumn
625
626 =back
627
628   my $max_length = $rs->get_column('length')->max;
629
630 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
631
632 =cut
633
634 sub get_column {
635   my ($self, $column) = @_;
636   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
637   return $new;
638 }
639
640 =head2 search_like
641
642 =over 4
643
644 =item Arguments: $cond, \%attrs?
645
646 =item Return Value: $resultset (scalar context), @row_objs (list context)
647
648 =back
649
650   # WHERE title LIKE '%blue%'
651   $cd_rs = $rs->search_like({ title => '%blue%'});
652
653 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
654 that this is simply a convenience method. You most likely want to use
655 L</search> with specific operators.
656
657 For more information, see L<DBIx::Class::Manual::Cookbook>.
658
659 =cut
660
661 sub search_like {
662   my $class = shift;
663   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
664   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
665   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
666   return $class->search($query, { %$attrs });
667 }
668
669 =head2 slice
670
671 =over 4
672
673 =item Arguments: $first, $last
674
675 =item Return Value: $resultset (scalar context), @row_objs (list context)
676
677 =back
678
679 Returns a resultset or object list representing a subset of elements from the
680 resultset slice is called on. Indexes are from 0, i.e., to get the first
681 three records, call:
682
683   my ($one, $two, $three) = $rs->slice(0, 2);
684
685 =cut
686
687 sub slice {
688   my ($self, $min, $max) = @_;
689   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
690   $attrs->{offset} = $self->{attrs}{offset} || 0;
691   $attrs->{offset} += $min;
692   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
693   return $self->search(undef(), $attrs);
694   #my $slice = (ref $self)->new($self->result_source, $attrs);
695   #return (wantarray ? $slice->all : $slice);
696 }
697
698 =head2 next
699
700 =over 4
701
702 =item Arguments: none
703
704 =item Return Value: $result?
705
706 =back
707
708 Returns the next element in the resultset (C<undef> is there is none).
709
710 Can be used to efficiently iterate over records in the resultset:
711
712   my $rs = $schema->resultset('CD')->search;
713   while (my $cd = $rs->next) {
714     print $cd->title;
715   }
716
717 Note that you need to store the resultset object, and call C<next> on it.
718 Calling C<< resultset('Table')->next >> repeatedly will always return the
719 first record from the resultset.
720
721 =cut
722
723 sub next {
724   my ($self) = @_;
725   if (my $cache = $self->get_cache) {
726     $self->{all_cache_position} ||= 0;
727     return $cache->[$self->{all_cache_position}++];
728   }
729   if ($self->{attrs}{cache}) {
730     $self->{all_cache_position} = 1;
731     return ($self->all)[0];
732   }
733   if ($self->{stashed_objects}) {
734     my $obj = shift(@{$self->{stashed_objects}});
735     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
736     return $obj;
737   }
738   my @row = (
739     exists $self->{stashed_row}
740       ? @{delete $self->{stashed_row}}
741       : $self->cursor->next
742   );
743   return undef unless (@row);
744   my ($row, @more) = $self->_construct_object(@row);
745   $self->{stashed_objects} = \@more if @more;
746   return $row;
747 }
748
749 sub _construct_object {
750   my ($self, @row) = @_;
751   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
752   my @new = $self->result_class->inflate_result($self->result_source, @$info);
753   @new = $self->{_attrs}{record_filter}->(@new)
754     if exists $self->{_attrs}{record_filter};
755   return @new;
756 }
757
758 sub _collapse_result {
759   my ($self, $as_proto, $row) = @_;
760
761   my @copy = @$row;
762
763   # 'foo'         => [ undef, 'foo' ]
764   # 'foo.bar'     => [ 'foo', 'bar' ]
765   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
766
767   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
768
769   my %collapse = %{$self->{_attrs}{collapse}||{}};
770
771   my @pri_index;
772
773   # if we're doing collapsing (has_many prefetch) we need to grab records
774   # until the PK changes, so fill @pri_index. if not, we leave it empty so
775   # we know we don't have to bother.
776
777   # the reason for not using the collapse stuff directly is because if you
778   # had for e.g. two artists in a row with no cds, the collapse info for
779   # both would be NULL (undef) so you'd lose the second artist
780
781   # store just the index so we can check the array positions from the row
782   # without having to contruct the full hash
783
784   if (keys %collapse) {
785     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
786     foreach my $i (0 .. $#construct_as) {
787       next if defined($construct_as[$i][0]); # only self table
788       if (delete $pri{$construct_as[$i][1]}) {
789         push(@pri_index, $i);
790       }
791       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
792     }
793   }
794
795   # no need to do an if, it'll be empty if @pri_index is empty anyway
796
797   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
798
799   my @const_rows;
800
801   do { # no need to check anything at the front, we always want the first row
802
803     my %const;
804   
805     foreach my $this_as (@construct_as) {
806       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
807     }
808
809     push(@const_rows, \%const);
810
811   } until ( # no pri_index => no collapse => drop straight out
812       !@pri_index
813     or
814       do { # get another row, stash it, drop out if different PK
815
816         @copy = $self->cursor->next;
817         $self->{stashed_row} = \@copy;
818
819         # last thing in do block, counts as true if anything doesn't match
820
821         # check xor defined first for NULL vs. NOT NULL then if one is
822         # defined the other must be so check string equality
823
824         grep {
825           (defined $pri_vals{$_} ^ defined $copy[$_])
826           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
827         } @pri_index;
828       }
829   );
830
831   my $alias = $self->{attrs}{alias};
832   my $info = [];
833
834   my %collapse_pos;
835
836   my @const_keys;
837
838   use Data::Dumper;
839
840   foreach my $const (@const_rows) {
841     scalar @const_keys or do {
842       @const_keys = sort { length($a) <=> length($b) } keys %$const;
843     };
844     foreach my $key (@const_keys) {
845       if (length $key) {
846         my $target = $info;
847         my @parts = split(/\./, $key);
848         my $cur = '';
849         my $data = $const->{$key};
850         foreach my $p (@parts) {
851           $target = $target->[1]->{$p} ||= [];
852           $cur .= ".${p}";
853           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
854             # collapsing at this point and on final part
855             my $pos = $collapse_pos{$cur};
856             CK: foreach my $ck (@ckey) {
857               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
858                 $collapse_pos{$cur} = $data;
859                 delete @collapse_pos{ # clear all positioning for sub-entries
860                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
861                 };
862                 push(@$target, []);
863                 last CK;
864               }
865             }
866           }
867           if (exists $collapse{$cur}) {
868             $target = $target->[-1];
869           }
870         }
871         $target->[0] = $data;
872       } else {
873         $info->[0] = $const->{$key};
874       }
875     }
876   }
877
878   return $info;
879 }
880
881 =head2 result_source
882
883 =over 4
884
885 =item Arguments: $result_source?
886
887 =item Return Value: $result_source
888
889 =back
890
891 An accessor for the primary ResultSource object from which this ResultSet
892 is derived.
893
894 =head2 result_class
895
896 =over 4
897
898 =item Arguments: $result_class?
899
900 =item Return Value: $result_class
901
902 =back
903
904 An accessor for the class to use when creating row objects. Defaults to 
905 C<< result_source->result_class >> - which in most cases is the name of the 
906 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
907
908 =cut
909
910
911 =head2 count
912
913 =over 4
914
915 =item Arguments: $cond, \%attrs??
916
917 =item Return Value: $count
918
919 =back
920
921 Performs an SQL C<COUNT> with the same query as the resultset was built
922 with to find the number of elements. If passed arguments, does a search
923 on the resultset and counts the results of that.
924
925 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
926 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
927 not support C<DISTINCT> with multiple columns. If you are using such a
928 database, you should only use columns from the main table in your C<group_by>
929 clause.
930
931 =cut
932
933 sub count {
934   my $self = shift;
935   return $self->search(@_)->count if @_ and defined $_[0];
936   return scalar @{ $self->get_cache } if $self->get_cache;
937   my $count = $self->_count;
938   return 0 unless $count;
939
940   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
941   $count = $self->{attrs}{rows} if
942     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
943   return $count;
944 }
945
946 sub _count { # Separated out so pager can get the full count
947   my $self = shift;
948   my $select = { count => '*' };
949
950   my $attrs = { %{$self->_resolved_attrs} };
951   if (my $group_by = delete $attrs->{group_by}) {
952     delete $attrs->{having};
953     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
954     # todo: try CONCAT for multi-column pk
955     my @pk = $self->result_source->primary_columns;
956     if (@pk == 1) {
957       my $alias = $attrs->{alias};
958       foreach my $column (@distinct) {
959         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
960           @distinct = ($column);
961           last;
962         }
963       }
964     }
965
966     $select = { count => { distinct => \@distinct } };
967   }
968
969   $attrs->{select} = $select;
970   $attrs->{as} = [qw/count/];
971
972   # offset, order by and page are not needed to count. record_filter is cdbi
973   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
974
975   my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
976   my ($count) = $tmp_rs->cursor->next;
977   return $count;
978 }
979
980 =head2 count_literal
981
982 =over 4
983
984 =item Arguments: $sql_fragment, @bind_values
985
986 =item Return Value: $count
987
988 =back
989
990 Counts the results in a literal query. Equivalent to calling L</search_literal>
991 with the passed arguments, then L</count>.
992
993 =cut
994
995 sub count_literal { shift->search_literal(@_)->count; }
996
997 =head2 all
998
999 =over 4
1000
1001 =item Arguments: none
1002
1003 =item Return Value: @objects
1004
1005 =back
1006
1007 Returns all elements in the resultset. Called implicitly if the resultset
1008 is returned in list context.
1009
1010 =cut
1011
1012 sub all {
1013   my ($self) = @_;
1014   return @{ $self->get_cache } if $self->get_cache;
1015
1016   my @obj;
1017
1018   # TODO: don't call resolve here
1019   if (keys %{$self->_resolved_attrs->{collapse}}) {
1020 #  if ($self->{attrs}{prefetch}) {
1021       # Using $self->cursor->all is really just an optimisation.
1022       # If we're collapsing has_many prefetches it probably makes
1023       # very little difference, and this is cleaner than hacking
1024       # _construct_object to survive the approach
1025     my @row = $self->cursor->next;
1026     while (@row) {
1027       push(@obj, $self->_construct_object(@row));
1028       @row = (exists $self->{stashed_row}
1029                ? @{delete $self->{stashed_row}}
1030                : $self->cursor->next);
1031     }
1032   } else {
1033     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1034   }
1035
1036   $self->set_cache(\@obj) if $self->{attrs}{cache};
1037   return @obj;
1038 }
1039
1040 =head2 reset
1041
1042 =over 4
1043
1044 =item Arguments: none
1045
1046 =item Return Value: $self
1047
1048 =back
1049
1050 Resets the resultset's cursor, so you can iterate through the elements again.
1051
1052 =cut
1053
1054 sub reset {
1055   my ($self) = @_;
1056   delete $self->{_attrs} if exists $self->{_attrs};
1057   $self->{all_cache_position} = 0;
1058   $self->cursor->reset;
1059   return $self;
1060 }
1061
1062 =head2 first
1063
1064 =over 4
1065
1066 =item Arguments: none
1067
1068 =item Return Value: $object?
1069
1070 =back
1071
1072 Resets the resultset and returns an object for the first result (if the
1073 resultset returns anything).
1074
1075 =cut
1076
1077 sub first {
1078   return $_[0]->reset->next;
1079 }
1080
1081 # _cond_for_update_delete
1082 #
1083 # update/delete require the condition to be modified to handle
1084 # the differing SQL syntax available.  This transforms the $self->{cond}
1085 # appropriately, returning the new condition.
1086
1087 sub _cond_for_update_delete {
1088   my ($self, $full_cond) = @_;
1089   my $cond = {};
1090
1091   $full_cond ||= $self->{cond};
1092   # No-op. No condition, we're updating/deleting everything
1093   return $cond unless ref $full_cond;
1094
1095   if (ref $full_cond eq 'ARRAY') {
1096     $cond = [
1097       map {
1098         my %hash;
1099         foreach my $key (keys %{$_}) {
1100           $key =~ /([^.]+)$/;
1101           $hash{$1} = $_->{$key};
1102         }
1103         \%hash;
1104       } @{$full_cond}
1105     ];
1106   }
1107   elsif (ref $full_cond eq 'HASH') {
1108     if ((keys %{$full_cond})[0] eq '-and') {
1109       $cond->{-and} = [];
1110
1111       my @cond = @{$full_cond->{-and}};
1112       for (my $i = 0; $i < @cond; $i++) {
1113         my $entry = $cond[$i];
1114
1115         my $hash;
1116         if (ref $entry eq 'HASH') {
1117           $hash = $self->_cond_for_update_delete($entry);
1118         }
1119         else {
1120           $entry =~ /([^.]+)$/;
1121           $hash->{$1} = $cond[++$i];
1122         }
1123
1124         push @{$cond->{-and}}, $hash;
1125       }
1126     }
1127     else {
1128       foreach my $key (keys %{$full_cond}) {
1129         $key =~ /([^.]+)$/;
1130         $cond->{$1} = $full_cond->{$key};
1131       }
1132     }
1133   }
1134   else {
1135     $self->throw_exception(
1136       "Can't update/delete on resultset with condition unless hash or array"
1137     );
1138   }
1139
1140   return $cond;
1141 }
1142
1143
1144 =head2 update
1145
1146 =over 4
1147
1148 =item Arguments: \%values
1149
1150 =item Return Value: $storage_rv
1151
1152 =back
1153
1154 Sets the specified columns in the resultset to the supplied values in a
1155 single query. Return value will be true if the update succeeded or false
1156 if no records were updated; exact type of success value is storage-dependent.
1157
1158 =cut
1159
1160 sub update {
1161   my ($self, $values) = @_;
1162   $self->throw_exception("Values for update must be a hash")
1163     unless ref $values eq 'HASH';
1164
1165   my $cond = $self->_cond_for_update_delete;
1166    
1167   return $self->result_source->storage->update(
1168     $self->result_source, $values, $cond
1169   );
1170 }
1171
1172 =head2 update_all
1173
1174 =over 4
1175
1176 =item Arguments: \%values
1177
1178 =item Return Value: 1
1179
1180 =back
1181
1182 Fetches all objects and updates them one at a time. Note that C<update_all>
1183 will run DBIC cascade triggers, while L</update> will not.
1184
1185 =cut
1186
1187 sub update_all {
1188   my ($self, $values) = @_;
1189   $self->throw_exception("Values for update must be a hash")
1190     unless ref $values eq 'HASH';
1191   foreach my $obj ($self->all) {
1192     $obj->set_columns($values)->update;
1193   }
1194   return 1;
1195 }
1196
1197 =head2 delete
1198
1199 =over 4
1200
1201 =item Arguments: none
1202
1203 =item Return Value: 1
1204
1205 =back
1206
1207 Deletes the contents of the resultset from its result source. Note that this
1208 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1209 to run. See also L<DBIx::Class::Row/delete>.
1210
1211 =cut
1212
1213 sub delete {
1214   my ($self) = @_;
1215
1216   my $cond = $self->_cond_for_update_delete;
1217
1218   $self->result_source->storage->delete($self->result_source, $cond);
1219   return 1;
1220 }
1221
1222 =head2 delete_all
1223
1224 =over 4
1225
1226 =item Arguments: none
1227
1228 =item Return Value: 1
1229
1230 =back
1231
1232 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1233 will run DBIC cascade triggers, while L</delete> will not.
1234
1235 =cut
1236
1237 sub delete_all {
1238   my ($self) = @_;
1239   $_->delete for $self->all;
1240   return 1;
1241 }
1242
1243 =head2 populate
1244
1245 =over 4
1246
1247 =item Arguments: \@data;
1248
1249 =back
1250
1251 Pass an arrayref of hashrefs. Each hashref should be a structure suitable for
1252 submitting to a $resultset->create(...) method.
1253
1254 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1255 to insert the data, as this is a faster method.
1256
1257 Otherwise, each set of data is inserted into the database using
1258 L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
1259 objects is returned.
1260
1261 Example:  Assuming an Artist Class that has many CDs Classes relating:
1262
1263   my $Artist_rs = $schema->resultset("Artist");
1264   
1265   ## Void Context Example 
1266   $Artist_rs->populate([
1267      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1268         { title => 'My First CD', year => 2006 },
1269         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1270       ],
1271      },
1272      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1273         { title => 'My parents sold me to a record company' ,year => 2005 },
1274         { title => 'Why Am I So Ugly?', year => 2006 },
1275         { title => 'I Got Surgery and am now Popular', year => 2007 }
1276       ],
1277      },
1278   ]);
1279   
1280   ## Array Context Example
1281   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1282     { name => "Artist One"},
1283     { name => "Artist Two"},
1284     { name => "Artist Three", cds=> [
1285     { title => "First CD", year => 2007},
1286     { title => "Second CD", year => 2008},
1287   ]}
1288   ]);
1289   
1290   print $ArtistOne->name; ## response is 'Artist One'
1291   print $ArtistThree->cds->count ## reponse is '2'
1292
1293 =cut
1294
1295 sub populate {
1296   my ($self, $data) = @_;
1297   
1298   if(defined wantarray) {
1299     my @created;
1300     foreach my $item (@$data) {
1301       push(@created, $self->create($item));
1302     }
1303     return @created;
1304   } else {
1305     my ($first, @rest) = @$data;
1306
1307     my @names = grep {!ref $first->{$_}} keys %$first;
1308     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1309     my @pks = $self->result_source->primary_columns;  
1310
1311     ## do the belongs_to relationships  
1312     foreach my $index (0..$#$data) {
1313       if( grep { !defined $data->[$index]->{$_} } @pks ) {
1314         my @ret = $self->populate($data);
1315         return;
1316       }
1317     
1318       foreach my $rel (@rels) {
1319         next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
1320         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1321         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1322         my $related = $result->result_source->resolve_condition(
1323           $result->result_source->relationship_info($reverse)->{cond},
1324           $self,        
1325           $result,        
1326         );
1327
1328         delete $data->[$index]->{$rel};
1329         $data->[$index] = {%{$data->[$index]}, %$related};
1330       
1331         push @names, keys %$related if $index == 0;
1332       }
1333     }
1334
1335     ## do bulk insert on current row
1336     my @values = map {
1337       [ map {
1338          defined $_ ? $_ : $self->throw_exception("Undefined value for column!")
1339       } @$_{@names} ]
1340     } @$data;
1341
1342     $self->result_source->storage->insert_bulk(
1343       $self->result_source, 
1344       \@names, 
1345       \@values,
1346     );
1347
1348     ## do the has_many relationships
1349     foreach my $item (@$data) {
1350
1351       foreach my $rel (@rels) {
1352         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1353
1354         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
1355      || $self->throw_exception('Cannot find the relating object.');
1356      
1357         my $child = $parent->$rel;
1358     
1359         my $related = $child->result_source->resolve_condition(
1360           $parent->result_source->relationship_info($rel)->{cond},
1361           $child,
1362           $parent,
1363         );
1364
1365         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1366         my @populate = map { {%$_, %$related} } @rows_to_add;
1367
1368         $child->populate( \@populate );
1369       }
1370     }
1371   }
1372 }
1373
1374 =head2 pager
1375
1376 =over 4
1377
1378 =item Arguments: none
1379
1380 =item Return Value: $pager
1381
1382 =back
1383
1384 Return Value a L<Data::Page> object for the current resultset. Only makes
1385 sense for queries with a C<page> attribute.
1386
1387 =cut
1388
1389 sub pager {
1390   my ($self) = @_;
1391   my $attrs = $self->{attrs};
1392   $self->throw_exception("Can't create pager for non-paged rs")
1393     unless $self->{attrs}{page};
1394   $attrs->{rows} ||= 10;
1395   return $self->{pager} ||= Data::Page->new(
1396     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1397 }
1398
1399 =head2 page
1400
1401 =over 4
1402
1403 =item Arguments: $page_number
1404
1405 =item Return Value: $rs
1406
1407 =back
1408
1409 Returns a resultset for the $page_number page of the resultset on which page
1410 is called, where each page contains a number of rows equal to the 'rows'
1411 attribute set on the resultset (10 by default).
1412
1413 =cut
1414
1415 sub page {
1416   my ($self, $page) = @_;
1417   return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
1418 }
1419
1420 =head2 new_result
1421
1422 =over 4
1423
1424 =item Arguments: \%vals
1425
1426 =item Return Value: $object
1427
1428 =back
1429
1430 Creates an object in the resultset's result class and returns it.
1431
1432 =cut
1433
1434 sub new_result {
1435   my ($self, $values) = @_;
1436   $self->throw_exception( "new_result needs a hash" )
1437     unless (ref $values eq 'HASH');
1438   $self->throw_exception(
1439     "Can't abstract implicit construct, condition not a hash"
1440   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1441
1442   my $alias = $self->{attrs}{alias};
1443   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1444   my %new = (
1445     %{ $self->_remove_alias($values, $alias) },
1446     %{ $self->_remove_alias($collapsed_cond, $alias) },
1447     -source_handle => $self->_source_handle,
1448     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1449   );
1450
1451   return $self->result_class->new(\%new);
1452 }
1453
1454 # _collapse_cond
1455 #
1456 # Recursively collapse the condition.
1457
1458 sub _collapse_cond {
1459   my ($self, $cond, $collapsed) = @_;
1460
1461   $collapsed ||= {};
1462
1463   if (ref $cond eq 'ARRAY') {
1464     foreach my $subcond (@$cond) {
1465       next unless ref $subcond;  # -or
1466 #      warn "ARRAY: " . Dumper $subcond;
1467       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1468     }
1469   }
1470   elsif (ref $cond eq 'HASH') {
1471     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1472       foreach my $subcond (@{$cond->{-and}}) {
1473 #        warn "HASH: " . Dumper $subcond;
1474         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1475       }
1476     }
1477     else {
1478 #      warn "LEAF: " . Dumper $cond;
1479       foreach my $col (keys %$cond) {
1480         my $value = $cond->{$col};
1481         $collapsed->{$col} = $value;
1482       }
1483     }
1484   }
1485
1486   return $collapsed;
1487 }
1488
1489 # _remove_alias
1490 #
1491 # Remove the specified alias from the specified query hash. A copy is made so
1492 # the original query is not modified.
1493
1494 sub _remove_alias {
1495   my ($self, $query, $alias) = @_;
1496
1497   my %orig = %{ $query || {} };
1498   my %unaliased;
1499
1500   foreach my $key (keys %orig) {
1501     if ($key !~ /\./) {
1502       $unaliased{$key} = $orig{$key};
1503       next;
1504     }
1505     $unaliased{$1} = $orig{$key}
1506       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1507   }
1508
1509   return \%unaliased;
1510 }
1511
1512 =head2 find_or_new
1513
1514 =over 4
1515
1516 =item Arguments: \%vals, \%attrs?
1517
1518 =item Return Value: $object
1519
1520 =back
1521
1522 Find an existing record from this resultset. If none exists, instantiate a new
1523 result object and return it. The object will not be saved into your storage
1524 until you call L<DBIx::Class::Row/insert> on it.
1525
1526 If you want objects to be saved immediately, use L</find_or_create> instead.
1527
1528 =cut
1529
1530 sub find_or_new {
1531   my $self     = shift;
1532   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1533   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1534   my $exists   = $self->find($hash, $attrs);
1535   return defined $exists ? $exists : $self->new_result($hash);
1536 }
1537
1538 =head2 create
1539
1540 =over 4
1541
1542 =item Arguments: \%vals
1543
1544 =item Return Value: $object
1545
1546 =back
1547
1548 Inserts a record into the resultset and returns the object representing it.
1549
1550 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1551
1552 =cut
1553
1554 sub create {
1555   my ($self, $attrs) = @_;
1556   $self->throw_exception( "create needs a hashref" )
1557     unless ref $attrs eq 'HASH';
1558   return $self->new_result($attrs)->insert;
1559 }
1560
1561 =head2 find_or_create
1562
1563 =over 4
1564
1565 =item Arguments: \%vals, \%attrs?
1566
1567 =item Return Value: $object
1568
1569 =back
1570
1571   $class->find_or_create({ key => $val, ... });
1572
1573 Tries to find a record based on its primary key or unique constraint; if none
1574 is found, creates one and returns that instead.
1575
1576   my $cd = $schema->resultset('CD')->find_or_create({
1577     cdid   => 5,
1578     artist => 'Massive Attack',
1579     title  => 'Mezzanine',
1580     year   => 2005,
1581   });
1582
1583 Also takes an optional C<key> attribute, to search by a specific key or unique
1584 constraint. For example:
1585
1586   my $cd = $schema->resultset('CD')->find_or_create(
1587     {
1588       artist => 'Massive Attack',
1589       title  => 'Mezzanine',
1590     },
1591     { key => 'cd_artist_title' }
1592   );
1593
1594 See also L</find> and L</update_or_create>. For information on how to declare
1595 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1596
1597 =cut
1598
1599 sub find_or_create {
1600   my $self     = shift;
1601   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1602   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1603   my $exists   = $self->find($hash, $attrs);
1604   return defined $exists ? $exists : $self->create($hash);
1605 }
1606
1607 =head2 update_or_create
1608
1609 =over 4
1610
1611 =item Arguments: \%col_values, { key => $unique_constraint }?
1612
1613 =item Return Value: $object
1614
1615 =back
1616
1617   $class->update_or_create({ col => $val, ... });
1618
1619 First, searches for an existing row matching one of the unique constraints
1620 (including the primary key) on the source of this resultset. If a row is
1621 found, updates it with the other given column values. Otherwise, creates a new
1622 row.
1623
1624 Takes an optional C<key> attribute to search on a specific unique constraint.
1625 For example:
1626
1627   # In your application
1628   my $cd = $schema->resultset('CD')->update_or_create(
1629     {
1630       artist => 'Massive Attack',
1631       title  => 'Mezzanine',
1632       year   => 1998,
1633     },
1634     { key => 'cd_artist_title' }
1635   );
1636
1637 If no C<key> is specified, it searches on all unique constraints defined on the
1638 source, including the primary key.
1639
1640 If the C<key> is specified as C<primary>, it searches only on the primary key.
1641
1642 See also L</find> and L</find_or_create>. For information on how to declare
1643 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1644
1645 =cut
1646
1647 sub update_or_create {
1648   my $self = shift;
1649   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1650   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1651
1652   my $row = $self->find($cond, $attrs);
1653   if (defined $row) {
1654     $row->update($cond);
1655     return $row;
1656   }
1657
1658   return $self->create($cond);
1659 }
1660
1661 =head2 get_cache
1662
1663 =over 4
1664
1665 =item Arguments: none
1666
1667 =item Return Value: \@cache_objects?
1668
1669 =back
1670
1671 Gets the contents of the cache for the resultset, if the cache is set.
1672
1673 =cut
1674
1675 sub get_cache {
1676   shift->{all_cache};
1677 }
1678
1679 =head2 set_cache
1680
1681 =over 4
1682
1683 =item Arguments: \@cache_objects
1684
1685 =item Return Value: \@cache_objects
1686
1687 =back
1688
1689 Sets the contents of the cache for the resultset. Expects an arrayref
1690 of objects of the same class as those produced by the resultset. Note that
1691 if the cache is set the resultset will return the cached objects rather
1692 than re-querying the database even if the cache attr is not set.
1693
1694 =cut
1695
1696 sub set_cache {
1697   my ( $self, $data ) = @_;
1698   $self->throw_exception("set_cache requires an arrayref")
1699       if defined($data) && (ref $data ne 'ARRAY');
1700   $self->{all_cache} = $data;
1701 }
1702
1703 =head2 clear_cache
1704
1705 =over 4
1706
1707 =item Arguments: none
1708
1709 =item Return Value: []
1710
1711 =back
1712
1713 Clears the cache for the resultset.
1714
1715 =cut
1716
1717 sub clear_cache {
1718   shift->set_cache(undef);
1719 }
1720
1721 =head2 related_resultset
1722
1723 =over 4
1724
1725 =item Arguments: $relationship_name
1726
1727 =item Return Value: $resultset
1728
1729 =back
1730
1731 Returns a related resultset for the supplied relationship name.
1732
1733   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1734
1735 =cut
1736
1737 sub related_resultset {
1738   my ($self, $rel) = @_;
1739
1740   $self->{related_resultsets} ||= {};
1741   return $self->{related_resultsets}{$rel} ||= do {
1742     my $rel_obj = $self->result_source->relationship_info($rel);
1743
1744     $self->throw_exception(
1745       "search_related: result source '" . $self->_source_handle->source_moniker .
1746         "' has no such relationship $rel")
1747       unless $rel_obj;
1748     
1749     my ($from,$seen) = $self->_resolve_from($rel);
1750
1751     my $join_count = $seen->{$rel};
1752     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1753
1754     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
1755     my %attrs = %{$self->{attrs}||{}};
1756     delete $attrs{result_class};
1757
1758     my $new_cache;
1759
1760     if (my $cache = $self->get_cache) {
1761       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
1762         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
1763                         @$cache ];
1764       }
1765     }
1766
1767     my $new = $self->_source_handle
1768                    ->schema
1769                    ->resultset($rel_obj->{class})
1770                    ->search_rs(
1771                        undef, {
1772                          %attrs,
1773                          join => undef,
1774                          prefetch => undef,
1775                          select => undef,
1776                          as => undef,
1777                          alias => $alias,
1778                          where => $self->{cond},
1779                          seen_join => $seen,
1780                          from => $from,
1781                      });
1782     $new->set_cache($new_cache) if $new_cache;
1783     $new;
1784   };
1785 }
1786
1787 sub _resolve_from {
1788   my ($self, $extra_join) = @_;
1789   my $source = $self->result_source;
1790   my $attrs = $self->{attrs};
1791   
1792   my $from = $attrs->{from}
1793     || [ { $attrs->{alias} => $source->from } ];
1794     
1795   my $seen = { %{$attrs->{seen_join}||{}} };
1796
1797   my $join = ($attrs->{join}
1798                ? [ $attrs->{join}, $extra_join ]
1799                : $extra_join);
1800   $from = [
1801     @$from,
1802     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1803   ];
1804
1805   return ($from,$seen);
1806 }
1807
1808 sub _resolved_attrs {
1809   my $self = shift;
1810   return $self->{_attrs} if $self->{_attrs};
1811
1812   my $attrs = { %{$self->{attrs}||{}} };
1813   my $source = $self->result_source;
1814   my $alias = $attrs->{alias};
1815
1816   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1817   if ($attrs->{columns}) {
1818     delete $attrs->{as};
1819   } elsif (!$attrs->{select}) {
1820     $attrs->{columns} = [ $source->columns ];
1821   }
1822  
1823   $attrs->{select} = 
1824     ($attrs->{select}
1825       ? (ref $attrs->{select} eq 'ARRAY'
1826           ? [ @{$attrs->{select}} ]
1827           : [ $attrs->{select} ])
1828       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1829     );
1830   $attrs->{as} =
1831     ($attrs->{as}
1832       ? (ref $attrs->{as} eq 'ARRAY'
1833           ? [ @{$attrs->{as}} ]
1834           : [ $attrs->{as} ])
1835       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1836     );
1837   
1838   my $adds;
1839   if ($adds = delete $attrs->{include_columns}) {
1840     $adds = [$adds] unless ref $adds eq 'ARRAY';
1841     push(@{$attrs->{select}}, @$adds);
1842     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1843   }
1844   if ($adds = delete $attrs->{'+select'}) {
1845     $adds = [$adds] unless ref $adds eq 'ARRAY';
1846     push(@{$attrs->{select}},
1847            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1848   }
1849   if (my $adds = delete $attrs->{'+as'}) {
1850     $adds = [$adds] unless ref $adds eq 'ARRAY';
1851     push(@{$attrs->{as}}, @$adds);
1852   }
1853
1854   $attrs->{from} ||= [ { 'me' => $source->from } ];
1855
1856   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1857     my $join = delete $attrs->{join} || {};
1858
1859     if (defined $attrs->{prefetch}) {
1860       $join = $self->_merge_attr(
1861         $join, $attrs->{prefetch}
1862       );
1863     }
1864
1865     $attrs->{from} =   # have to copy here to avoid corrupting the original
1866       [
1867         @{$attrs->{from}}, 
1868         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1869       ];
1870   }
1871
1872   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1873   if ($attrs->{order_by}) {
1874     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1875                            ? [ @{$attrs->{order_by}} ]
1876                            : [ $attrs->{order_by} ]);
1877   } else {
1878     $attrs->{order_by} = [];    
1879   }
1880
1881   my $collapse = $attrs->{collapse} || {};
1882   if (my $prefetch = delete $attrs->{prefetch}) {
1883     $prefetch = $self->_merge_attr({}, $prefetch);
1884     my @pre_order;
1885     my $seen = $attrs->{seen_join} || {};
1886     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1887       # bring joins back to level of current class
1888       my @prefetch = $source->resolve_prefetch(
1889         $p, $alias, $seen, \@pre_order, $collapse
1890       );
1891       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1892       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1893     }
1894     push(@{$attrs->{order_by}}, @pre_order);
1895   }
1896   $attrs->{collapse} = $collapse;
1897
1898   return $self->{_attrs} = $attrs;
1899 }
1900
1901 sub _merge_attr {
1902   my ($self, $a, $b) = @_;
1903   return $b unless defined($a);
1904   return $a unless defined($b);
1905   
1906   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1907     foreach my $key (keys %{$b}) {
1908       if (exists $a->{$key}) {
1909         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1910       } else {
1911         $a->{$key} = $b->{$key};
1912       }
1913     }
1914     return $a;
1915   } else {
1916     $a = [$a] unless ref $a eq 'ARRAY';
1917     $b = [$b] unless ref $b eq 'ARRAY';
1918
1919     my $hash = {};
1920     my @array;
1921     foreach my $x ($a, $b) {
1922       foreach my $element (@{$x}) {
1923         if (ref $element eq 'HASH') {
1924           $hash = $self->_merge_attr($hash, $element);
1925         } elsif (ref $element eq 'ARRAY') {
1926           push(@array, @{$element});
1927         } else {
1928           push(@array, $element) unless $b == $x
1929             && grep { $_ eq $element } @array;
1930         }
1931       }
1932     }
1933     
1934     @array = grep { !exists $hash->{$_} } @array;
1935
1936     return keys %{$hash}
1937       ? ( scalar(@array)
1938             ? [$hash, @array]
1939             : $hash
1940         )
1941       : \@array;
1942   }
1943 }
1944
1945 sub result_source {
1946     my $self = shift;
1947
1948     if (@_) {
1949         $self->_source_handle($_[0]->handle);
1950     } else {
1951         $self->_source_handle->resolve;
1952     }
1953 }
1954
1955 =head2 throw_exception
1956
1957 See L<DBIx::Class::Schema/throw_exception> for details.
1958
1959 =cut
1960
1961 sub throw_exception {
1962   my $self=shift;
1963   $self->_source_handle->schema->throw_exception(@_);
1964 }
1965
1966 # XXX: FIXME: Attributes docs need clearing up
1967
1968 =head1 ATTRIBUTES
1969
1970 The resultset takes various attributes that modify its behavior. Here's an
1971 overview of them:
1972
1973 =head2 order_by
1974
1975 =over 4
1976
1977 =item Value: ($order_by | \@order_by)
1978
1979 =back
1980
1981 Which column(s) to order the results by. This is currently passed
1982 through directly to SQL, so you can give e.g. C<year DESC> for a
1983 descending order on the column `year'.
1984
1985 Please note that if you have C<quote_char> enabled (see
1986 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1987 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1988 so you will need to manually quote things as appropriate.)
1989
1990 =head2 columns
1991
1992 =over 4
1993
1994 =item Value: \@columns
1995
1996 =back
1997
1998 Shortcut to request a particular set of columns to be retrieved.  Adds
1999 C<me.> onto the start of any column without a C<.> in it and sets C<select>
2000 from that, then auto-populates C<as> from C<select> as normal. (You may also
2001 use the C<cols> attribute, as in earlier versions of DBIC.)
2002
2003 =head2 include_columns
2004
2005 =over 4
2006
2007 =item Value: \@columns
2008
2009 =back
2010
2011 Shortcut to include additional columns in the returned results - for example
2012
2013   $schema->resultset('CD')->search(undef, {
2014     include_columns => ['artist.name'],
2015     join => ['artist']
2016   });
2017
2018 would return all CDs and include a 'name' column to the information
2019 passed to object inflation. Note that the 'artist' is the name of the
2020 column (or relationship) accessor, and 'name' is the name of the column
2021 accessor in the related table.
2022
2023 =head2 select
2024
2025 =over 4
2026
2027 =item Value: \@select_columns
2028
2029 =back
2030
2031 Indicates which columns should be selected from the storage. You can use
2032 column names, or in the case of RDBMS back ends, function or stored procedure
2033 names:
2034
2035   $rs = $schema->resultset('Employee')->search(undef, {
2036     select => [
2037       'name',
2038       { count => 'employeeid' },
2039       { sum => 'salary' }
2040     ]
2041   });
2042
2043 When you use function/stored procedure names and do not supply an C<as>
2044 attribute, the column names returned are storage-dependent. E.g. MySQL would
2045 return a column named C<count(employeeid)> in the above example.
2046
2047 =head2 +select
2048
2049 =over 4
2050
2051 Indicates additional columns to be selected from storage.  Works the same as
2052 L<select> but adds columns to the selection.
2053
2054 =back
2055
2056 =head2 +as
2057
2058 =over 4
2059
2060 Indicates additional column names for those added via L<+select>.
2061
2062 =back
2063
2064 =head2 as
2065
2066 =over 4
2067
2068 =item Value: \@inflation_names
2069
2070 =back
2071
2072 Indicates column names for object inflation. That is, c< as >
2073 indicates the name that the column can be accessed as via the
2074 C<get_column> method (or via the object accessor, B<if one already
2075 exists>).  It has nothing to do with the SQL code C< SELECT foo AS bar
2076 >.
2077
2078 The C< as > attribute is used in conjunction with C<select>,
2079 usually when C<select> contains one or more function or stored
2080 procedure names:
2081
2082   $rs = $schema->resultset('Employee')->search(undef, {
2083     select => [
2084       'name',
2085       { count => 'employeeid' }
2086     ],
2087     as => ['name', 'employee_count'],
2088   });
2089
2090   my $employee = $rs->first(); # get the first Employee
2091
2092 If the object against which the search is performed already has an accessor
2093 matching a column name specified in C<as>, the value can be retrieved using
2094 the accessor as normal:
2095
2096   my $name = $employee->name();
2097
2098 If on the other hand an accessor does not exist in the object, you need to
2099 use C<get_column> instead:
2100
2101   my $employee_count = $employee->get_column('employee_count');
2102
2103 You can create your own accessors if required - see
2104 L<DBIx::Class::Manual::Cookbook> for details.
2105
2106 Please note: This will NOT insert an C<AS employee_count> into the SQL
2107 statement produced, it is used for internal access only. Thus
2108 attempting to use the accessor in an C<order_by> clause or similar
2109 will fail miserably.
2110
2111 To get around this limitation, you can supply literal SQL to your
2112 C<select> attibute that contains the C<AS alias> text, eg:
2113
2114   select => [\'myfield AS alias']
2115
2116 =head2 join
2117
2118 =over 4
2119
2120 =item Value: ($rel_name | \@rel_names | \%rel_names)
2121
2122 =back
2123
2124 Contains a list of relationships that should be joined for this query.  For
2125 example:
2126
2127   # Get CDs by Nine Inch Nails
2128   my $rs = $schema->resultset('CD')->search(
2129     { 'artist.name' => 'Nine Inch Nails' },
2130     { join => 'artist' }
2131   );
2132
2133 Can also contain a hash reference to refer to the other relation's relations.
2134 For example:
2135
2136   package MyApp::Schema::Track;
2137   use base qw/DBIx::Class/;
2138   __PACKAGE__->table('track');
2139   __PACKAGE__->add_columns(qw/trackid cd position title/);
2140   __PACKAGE__->set_primary_key('trackid');
2141   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
2142   1;
2143
2144   # In your application
2145   my $rs = $schema->resultset('Artist')->search(
2146     { 'track.title' => 'Teardrop' },
2147     {
2148       join     => { cd => 'track' },
2149       order_by => 'artist.name',
2150     }
2151   );
2152
2153 You need to use the relationship (not the table) name in  conditions, 
2154 because they are aliased as such. The current table is aliased as "me", so 
2155 you need to use me.column_name in order to avoid ambiguity. For example:
2156
2157   # Get CDs from 1984 with a 'Foo' track 
2158   my $rs = $schema->resultset('CD')->search(
2159     { 
2160       'me.year' => 1984,
2161       'tracks.name' => 'Foo'
2162     },
2163     { join => 'tracks' }
2164   );
2165   
2166 If the same join is supplied twice, it will be aliased to <rel>_2 (and
2167 similarly for a third time). For e.g.
2168
2169   my $rs = $schema->resultset('Artist')->search({
2170     'cds.title'   => 'Down to Earth',
2171     'cds_2.title' => 'Popular',
2172   }, {
2173     join => [ qw/cds cds/ ],
2174   });
2175
2176 will return a set of all artists that have both a cd with title 'Down
2177 to Earth' and a cd with title 'Popular'.
2178
2179 If you want to fetch related objects from other tables as well, see C<prefetch>
2180 below.
2181
2182 =head2 prefetch
2183
2184 =over 4
2185
2186 =item Value: ($rel_name | \@rel_names | \%rel_names)
2187
2188 =back
2189
2190 Contains one or more relationships that should be fetched along with the main
2191 query (when they are accessed afterwards they will have already been
2192 "prefetched").  This is useful for when you know you will need the related
2193 objects, because it saves at least one query:
2194
2195   my $rs = $schema->resultset('Tag')->search(
2196     undef,
2197     {
2198       prefetch => {
2199         cd => 'artist'
2200       }
2201     }
2202   );
2203
2204 The initial search results in SQL like the following:
2205
2206   SELECT tag.*, cd.*, artist.* FROM tag
2207   JOIN cd ON tag.cd = cd.cdid
2208   JOIN artist ON cd.artist = artist.artistid
2209
2210 L<DBIx::Class> has no need to go back to the database when we access the
2211 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2212 case.
2213
2214 Simple prefetches will be joined automatically, so there is no need
2215 for a C<join> attribute in the above search. If you're prefetching to
2216 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2217 specify the join as well.
2218
2219 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2220 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2221 with an accessor type of 'single' or 'filter').
2222
2223 =head2 page
2224
2225 =over 4
2226
2227 =item Value: $page
2228
2229 =back
2230
2231 Makes the resultset paged and specifies the page to retrieve. Effectively
2232 identical to creating a non-pages resultset and then calling ->page($page)
2233 on it.
2234
2235 If L<rows> attribute is not specified it defualts to 10 rows per page.
2236
2237 =head2 rows
2238
2239 =over 4
2240
2241 =item Value: $rows
2242
2243 =back
2244
2245 Specifes the maximum number of rows for direct retrieval or the number of
2246 rows per page if the page attribute or method is used.
2247
2248 =head2 offset
2249
2250 =over 4
2251
2252 =item Value: $offset
2253
2254 =back
2255
2256 Specifies the (zero-based) row number for the  first row to be returned, or the
2257 of the first row of the first page if paging is used.
2258
2259 =head2 group_by
2260
2261 =over 4
2262
2263 =item Value: \@columns
2264
2265 =back
2266
2267 A arrayref of columns to group by. Can include columns of joined tables.
2268
2269   group_by => [qw/ column1 column2 ... /]
2270
2271 =head2 having
2272
2273 =over 4
2274
2275 =item Value: $condition
2276
2277 =back
2278
2279 HAVING is a select statement attribute that is applied between GROUP BY and
2280 ORDER BY. It is applied to the after the grouping calculations have been
2281 done.
2282
2283   having => { 'count(employee)' => { '>=', 100 } }
2284
2285 =head2 distinct
2286
2287 =over 4
2288
2289 =item Value: (0 | 1)
2290
2291 =back
2292
2293 Set to 1 to group by all columns.
2294
2295 =head2 where
2296
2297 =over 4
2298
2299 Adds to the WHERE clause.
2300
2301   # only return rows WHERE deleted IS NULL for all searches
2302   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2303
2304 Can be overridden by passing C<{ where => undef }> as an attribute
2305 to a resulset.
2306
2307 =back
2308
2309 =head2 cache
2310
2311 Set to 1 to cache search results. This prevents extra SQL queries if you
2312 revisit rows in your ResultSet:
2313
2314   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2315
2316   while( my $artist = $resultset->next ) {
2317     ... do stuff ...
2318   }
2319
2320   $rs->first; # without cache, this would issue a query
2321
2322 By default, searches are not cached.
2323
2324 For more examples of using these attributes, see
2325 L<DBIx::Class::Manual::Cookbook>.
2326
2327 =head2 from
2328
2329 =over 4
2330
2331 =item Value: \@from_clause
2332
2333 =back
2334
2335 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2336 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2337 clauses.
2338
2339 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2340
2341 C<join> will usually do what you need and it is strongly recommended that you
2342 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2343 And we really do mean "cannot", not just tried and failed. Attempting to use
2344 this because you're having problems with C<join> is like trying to use x86
2345 ASM because you've got a syntax error in your C. Trust us on this.
2346
2347 Now, if you're still really, really sure you need to use this (and if you're
2348 not 100% sure, ask the mailing list first), here's an explanation of how this
2349 works.
2350
2351 The syntax is as follows -
2352
2353   [
2354     { <alias1> => <table1> },
2355     [
2356       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2357       [], # nested JOIN (optional)
2358       { <table1.column1> => <table2.column2>, ... (more conditions) },
2359     ],
2360     # More of the above [ ] may follow for additional joins
2361   ]
2362
2363   <table1> <alias1>
2364   JOIN
2365     <table2> <alias2>
2366     [JOIN ...]
2367   ON <table1.column1> = <table2.column2>
2368   <more joins may follow>
2369
2370 An easy way to follow the examples below is to remember the following:
2371
2372     Anything inside "[]" is a JOIN
2373     Anything inside "{}" is a condition for the enclosing JOIN
2374
2375 The following examples utilize a "person" table in a family tree application.
2376 In order to express parent->child relationships, this table is self-joined:
2377
2378     # Person->belongs_to('father' => 'Person');
2379     # Person->belongs_to('mother' => 'Person');
2380
2381 C<from> can be used to nest joins. Here we return all children with a father,
2382 then search against all mothers of those children:
2383
2384   $rs = $schema->resultset('Person')->search(
2385       undef,
2386       {
2387           alias => 'mother', # alias columns in accordance with "from"
2388           from => [
2389               { mother => 'person' },
2390               [
2391                   [
2392                       { child => 'person' },
2393                       [
2394                           { father => 'person' },
2395                           { 'father.person_id' => 'child.father_id' }
2396                       ]
2397                   ],
2398                   { 'mother.person_id' => 'child.mother_id' }
2399               ],
2400           ]
2401       },
2402   );
2403
2404   # Equivalent SQL:
2405   # SELECT mother.* FROM person mother
2406   # JOIN (
2407   #   person child
2408   #   JOIN person father
2409   #   ON ( father.person_id = child.father_id )
2410   # )
2411   # ON ( mother.person_id = child.mother_id )
2412
2413 The type of any join can be controlled manually. To search against only people
2414 with a father in the person table, we could explicitly use C<INNER JOIN>:
2415
2416     $rs = $schema->resultset('Person')->search(
2417         undef,
2418         {
2419             alias => 'child', # alias columns in accordance with "from"
2420             from => [
2421                 { child => 'person' },
2422                 [
2423                     { father => 'person', -join_type => 'inner' },
2424                     { 'father.id' => 'child.father_id' }
2425                 ],
2426             ]
2427         },
2428     );
2429
2430     # Equivalent SQL:
2431     # SELECT child.* FROM person child
2432     # INNER JOIN person father ON child.father_id = father.id
2433
2434 =cut
2435
2436 1;