fix page-within-page bug
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use base qw/DBIx::Class/;
15
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   $source = $source->handle 
89     unless $source->isa('DBIx::Class::ResultSourceHandle');
90   $attrs = { %{$attrs||{}} };
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= ($attrs->{rows} * ($attrs->{page} - 1));
95   }
96
97   $attrs->{alias} ||= 'me';
98
99   my $self = {
100     _source_handle => $source,
101     result_class => $attrs->{result_class} || $source->resolve->result_class,
102     cond => $attrs->{where},
103     count => undef,
104     pager => undef,
105     attrs => $attrs
106   };
107
108   bless $self, $class;
109
110   return $self;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 For a list of attributes that can be passed to C<search>, see
138 L</ATTRIBUTES>. For more examples of using this function, see
139 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
140 documentation for the first argument, see L<SQL::Abstract>.
141
142 =cut
143
144 sub search {
145   my $self = shift;
146   my $rs = $self->search_rs( @_ );
147   return (wantarray ? $rs->all : $rs);
148 }
149
150 =head2 search_rs
151
152 =over 4
153
154 =item Arguments: $cond, \%attrs?
155
156 =item Return Value: $resultset
157
158 =back
159
160 This method does the same exact thing as search() except it will
161 always return a resultset, even in list context.
162
163 =cut
164
165 sub search_rs {
166   my $self = shift;
167
168   my $rows;
169
170   unless (@_) {                 # no search, effectively just a clone
171     $rows = $self->get_cache;
172   }
173
174   my $attrs = {};
175   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
176   my $our_attrs = { %{$self->{attrs}} };
177   my $having = delete $our_attrs->{having};
178   my $where = delete $our_attrs->{where};
179
180   my $new_attrs = { %{$our_attrs}, %{$attrs} };
181
182   # merge new attrs into inherited
183   foreach my $key (qw/join prefetch/) {
184     next unless exists $attrs->{$key};
185     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
186   }
187
188   my $cond = (@_
189     ? (
190         (@_ == 1 || ref $_[0] eq "HASH")
191           ? (
192               (ref $_[0] eq 'HASH')
193                 ? (
194                     (keys %{ $_[0] }  > 0)
195                       ? shift
196                       : undef
197                    )
198                 :  shift
199              )
200           : (
201               (@_ % 2)
202                 ? $self->throw_exception("Odd number of arguments to search")
203                 : {@_}
204              )
205       )
206     : undef
207   );
208
209   if (defined $where) {
210     $new_attrs->{where} = (
211       defined $new_attrs->{where}
212         ? { '-and' => [
213               map {
214                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
215               } $where, $new_attrs->{where}
216             ]
217           }
218         : $where);
219   }
220
221   if (defined $cond) {
222     $new_attrs->{where} = (
223       defined $new_attrs->{where}
224         ? { '-and' => [
225               map {
226                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
227               } $cond, $new_attrs->{where}
228             ]
229           }
230         : $cond);
231   }
232
233   if (defined $having) {
234     $new_attrs->{having} = (
235       defined $new_attrs->{having}
236         ? { '-and' => [
237               map {
238                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
239               } $having, $new_attrs->{having}
240             ]
241           }
242         : $having);
243   }
244
245   my $rs = (ref $self)->new($self->result_source, $new_attrs);
246   if ($rows) {
247     $rs->set_cache($rows);
248   }
249   return $rs;
250 }
251
252 =head2 search_literal
253
254 =over 4
255
256 =item Arguments: $sql_fragment, @bind_values
257
258 =item Return Value: $resultset (scalar context), @row_objs (list context)
259
260 =back
261
262   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
263   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
264
265 Pass a literal chunk of SQL to be added to the conditional part of the
266 resultset query.
267
268 =cut
269
270 sub search_literal {
271   my ($self, $cond, @vals) = @_;
272   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
273   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
274   return $self->search(\$cond, $attrs);
275 }
276
277 =head2 find
278
279 =over 4
280
281 =item Arguments: @values | \%cols, \%attrs?
282
283 =item Return Value: $row_object
284
285 =back
286
287 Finds a row based on its primary key or unique constraint. For example, to find
288 a row by its primary key:
289
290   my $cd = $schema->resultset('CD')->find(5);
291
292 You can also find a row by a specific unique constraint using the C<key>
293 attribute. For example:
294
295   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
296     key => 'cd_artist_title'
297   });
298
299 Additionally, you can specify the columns explicitly by name:
300
301   my $cd = $schema->resultset('CD')->find(
302     {
303       artist => 'Massive Attack',
304       title  => 'Mezzanine',
305     },
306     { key => 'cd_artist_title' }
307   );
308
309 If the C<key> is specified as C<primary>, it searches only on the primary key.
310
311 If no C<key> is specified, it searches on all unique constraints defined on the
312 source, including the primary key.
313
314 If your table does not have a primary key, you B<must> provide a value for the
315 C<key> attribute matching one of the unique constraints on the source.
316
317 See also L</find_or_create> and L</update_or_create>. For information on how to
318 declare unique constraints, see
319 L<DBIx::Class::ResultSource/add_unique_constraint>.
320
321 =cut
322
323 sub find {
324   my $self = shift;
325   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
326
327   # Default to the primary key, but allow a specific key
328   my @cols = exists $attrs->{key}
329     ? $self->result_source->unique_constraint_columns($attrs->{key})
330     : $self->result_source->primary_columns;
331   $self->throw_exception(
332     "Can't find unless a primary key is defined or unique constraint is specified"
333   ) unless @cols;
334
335   # Parse out a hashref from input
336   my $input_query;
337   if (ref $_[0] eq 'HASH') {
338     $input_query = { %{$_[0]} };
339   }
340   elsif (@_ == @cols) {
341     $input_query = {};
342     @{$input_query}{@cols} = @_;
343   }
344   else {
345     # Compatibility: Allow e.g. find(id => $value)
346     carp "Find by key => value deprecated; please use a hashref instead";
347     $input_query = {@_};
348   }
349
350   my (%related, $info);
351
352   KEY: foreach my $key (keys %$input_query) {
353     if (ref($input_query->{$key})
354         && ($info = $self->result_source->relationship_info($key))) {
355       my $val = delete $input_query->{$key};
356       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
357       my $rel_q = $self->result_source->resolve_condition(
358                     $info->{cond}, $val, $key
359                   );
360       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
361       @related{keys %$rel_q} = values %$rel_q;
362     }
363   }
364   if (my @keys = keys %related) {
365     @{$input_query}{@keys} = values %related;
366   }
367
368   my @unique_queries = $self->_unique_queries($input_query, $attrs);
369
370   # Build the final query: Default to the disjunction of the unique queries,
371   # but allow the input query in case the ResultSet defines the query or the
372   # user is abusing find
373   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
374   my $query = @unique_queries
375     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
376     : $self->_add_alias($input_query, $alias);
377
378   # Run the query
379   if (keys %$attrs) {
380     my $rs = $self->search($query, $attrs);
381     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
382   }
383   else {
384     return keys %{$self->_resolved_attrs->{collapse}}
385       ? $self->search($query)->next
386       : $self->single($query);
387   }
388 }
389
390 # _add_alias
391 #
392 # Add the specified alias to the specified query hash. A copy is made so the
393 # original query is not modified.
394
395 sub _add_alias {
396   my ($self, $query, $alias) = @_;
397
398   my %aliased = %$query;
399   foreach my $col (grep { ! m/\./ } keys %aliased) {
400     $aliased{"$alias.$col"} = delete $aliased{$col};
401   }
402
403   return \%aliased;
404 }
405
406 # _unique_queries
407 #
408 # Build a list of queries which satisfy unique constraints.
409
410 sub _unique_queries {
411   my ($self, $query, $attrs) = @_;
412
413   my @constraint_names = exists $attrs->{key}
414     ? ($attrs->{key})
415     : $self->result_source->unique_constraint_names;
416
417   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
418   my $num_where = scalar keys %$where;
419
420   my @unique_queries;
421   foreach my $name (@constraint_names) {
422     my @unique_cols = $self->result_source->unique_constraint_columns($name);
423     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
424
425     my $num_cols = scalar @unique_cols;
426     my $num_query = scalar keys %$unique_query;
427
428     my $total = $num_query + $num_where;
429     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
430       # The query is either unique on its own or is unique in combination with
431       # the existing where clause
432       push @unique_queries, $unique_query;
433     }
434   }
435
436   return @unique_queries;
437 }
438
439 # _build_unique_query
440 #
441 # Constrain the specified query hash based on the specified column names.
442
443 sub _build_unique_query {
444   my ($self, $query, $unique_cols) = @_;
445
446   return {
447     map  { $_ => $query->{$_} }
448     grep { exists $query->{$_} }
449       @$unique_cols
450   };
451 }
452
453 =head2 search_related
454
455 =over 4
456
457 =item Arguments: $rel, $cond, \%attrs?
458
459 =item Return Value: $new_resultset
460
461 =back
462
463   $new_rs = $cd_rs->search_related('artist', {
464     name => 'Emo-R-Us',
465   });
466
467 Searches the specified relationship, optionally specifying a condition and
468 attributes for matching records. See L</ATTRIBUTES> for more information.
469
470 =cut
471
472 sub search_related {
473   return shift->related_resultset(shift)->search(@_);
474 }
475
476 =head2 cursor
477
478 =over 4
479
480 =item Arguments: none
481
482 =item Return Value: $cursor
483
484 =back
485
486 Returns a storage-driven cursor to the given resultset. See
487 L<DBIx::Class::Cursor> for more information.
488
489 =cut
490
491 sub cursor {
492   my ($self) = @_;
493
494   my $attrs = { %{$self->_resolved_attrs} };
495   return $self->{cursor}
496     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
497           $attrs->{where},$attrs);
498 }
499
500 =head2 single
501
502 =over 4
503
504 =item Arguments: $cond?
505
506 =item Return Value: $row_object?
507
508 =back
509
510   my $cd = $schema->resultset('CD')->single({ year => 2001 });
511
512 Inflates the first result without creating a cursor if the resultset has
513 any records in it; if not returns nothing. Used by L</find> as an optimisation.
514
515 Can optionally take an additional condition *only* - this is a fast-code-path
516 method; if you need to add extra joins or similar call ->search and then
517 ->single without a condition on the $rs returned from that.
518
519 =cut
520
521 sub single {
522   my ($self, $where) = @_;
523   my $attrs = { %{$self->_resolved_attrs} };
524   if ($where) {
525     if (defined $attrs->{where}) {
526       $attrs->{where} = {
527         '-and' =>
528             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
529                $where, delete $attrs->{where} ]
530       };
531     } else {
532       $attrs->{where} = $where;
533     }
534   }
535
536 #  XXX: Disabled since it doesn't infer uniqueness in all cases
537 #  unless ($self->_is_unique_query($attrs->{where})) {
538 #    carp "Query not guaranteed to return a single row"
539 #      . "; please declare your unique constraints or use search instead";
540 #  }
541
542   my @data = $self->result_source->storage->select_single(
543     $attrs->{from}, $attrs->{select},
544     $attrs->{where}, $attrs
545   );
546
547   return (@data ? ($self->_construct_object(@data))[0] : undef);
548 }
549
550 # _is_unique_query
551 #
552 # Try to determine if the specified query is guaranteed to be unique, based on
553 # the declared unique constraints.
554
555 sub _is_unique_query {
556   my ($self, $query) = @_;
557
558   my $collapsed = $self->_collapse_query($query);
559   my $alias = $self->{attrs}{alias};
560
561   foreach my $name ($self->result_source->unique_constraint_names) {
562     my @unique_cols = map {
563       "$alias.$_"
564     } $self->result_source->unique_constraint_columns($name);
565
566     # Count the values for each unique column
567     my %seen = map { $_ => 0 } @unique_cols;
568
569     foreach my $key (keys %$collapsed) {
570       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
571       next unless exists $seen{$aliased};  # Additional constraints are okay
572       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
573     }
574
575     # If we get 0 or more than 1 value for a column, it's not necessarily unique
576     return 1 unless grep { $_ != 1 } values %seen;
577   }
578
579   return 0;
580 }
581
582 # _collapse_query
583 #
584 # Recursively collapse the query, accumulating values for each column.
585
586 sub _collapse_query {
587   my ($self, $query, $collapsed) = @_;
588
589   $collapsed ||= {};
590
591   if (ref $query eq 'ARRAY') {
592     foreach my $subquery (@$query) {
593       next unless ref $subquery;  # -or
594 #      warn "ARRAY: " . Dumper $subquery;
595       $collapsed = $self->_collapse_query($subquery, $collapsed);
596     }
597   }
598   elsif (ref $query eq 'HASH') {
599     if (keys %$query and (keys %$query)[0] eq '-and') {
600       foreach my $subquery (@{$query->{-and}}) {
601 #        warn "HASH: " . Dumper $subquery;
602         $collapsed = $self->_collapse_query($subquery, $collapsed);
603       }
604     }
605     else {
606 #      warn "LEAF: " . Dumper $query;
607       foreach my $col (keys %$query) {
608         my $value = $query->{$col};
609         $collapsed->{$col}{$value}++;
610       }
611     }
612   }
613
614   return $collapsed;
615 }
616
617 =head2 get_column
618
619 =over 4
620
621 =item Arguments: $cond?
622
623 =item Return Value: $resultsetcolumn
624
625 =back
626
627   my $max_length = $rs->get_column('length')->max;
628
629 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
630
631 =cut
632
633 sub get_column {
634   my ($self, $column) = @_;
635   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
636   return $new;
637 }
638
639 =head2 search_like
640
641 =over 4
642
643 =item Arguments: $cond, \%attrs?
644
645 =item Return Value: $resultset (scalar context), @row_objs (list context)
646
647 =back
648
649   # WHERE title LIKE '%blue%'
650   $cd_rs = $rs->search_like({ title => '%blue%'});
651
652 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
653 that this is simply a convenience method. You most likely want to use
654 L</search> with specific operators.
655
656 For more information, see L<DBIx::Class::Manual::Cookbook>.
657
658 =cut
659
660 sub search_like {
661   my $class = shift;
662   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
663   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
664   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
665   return $class->search($query, { %$attrs });
666 }
667
668 =head2 slice
669
670 =over 4
671
672 =item Arguments: $first, $last
673
674 =item Return Value: $resultset (scalar context), @row_objs (list context)
675
676 =back
677
678 Returns a resultset or object list representing a subset of elements from the
679 resultset slice is called on. Indexes are from 0, i.e., to get the first
680 three records, call:
681
682   my ($one, $two, $three) = $rs->slice(0, 2);
683
684 =cut
685
686 sub slice {
687   my ($self, $min, $max) = @_;
688   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
689   $attrs->{offset} = $self->{attrs}{offset} || 0;
690   $attrs->{offset} += $min;
691   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
692   return $self->search(undef(), $attrs);
693   #my $slice = (ref $self)->new($self->result_source, $attrs);
694   #return (wantarray ? $slice->all : $slice);
695 }
696
697 =head2 next
698
699 =over 4
700
701 =item Arguments: none
702
703 =item Return Value: $result?
704
705 =back
706
707 Returns the next element in the resultset (C<undef> is there is none).
708
709 Can be used to efficiently iterate over records in the resultset:
710
711   my $rs = $schema->resultset('CD')->search;
712   while (my $cd = $rs->next) {
713     print $cd->title;
714   }
715
716 Note that you need to store the resultset object, and call C<next> on it.
717 Calling C<< resultset('Table')->next >> repeatedly will always return the
718 first record from the resultset.
719
720 =cut
721
722 sub next {
723   my ($self) = @_;
724   if (my $cache = $self->get_cache) {
725     $self->{all_cache_position} ||= 0;
726     return $cache->[$self->{all_cache_position}++];
727   }
728   if ($self->{attrs}{cache}) {
729     $self->{all_cache_position} = 1;
730     return ($self->all)[0];
731   }
732   if ($self->{stashed_objects}) {
733     my $obj = shift(@{$self->{stashed_objects}});
734     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
735     return $obj;
736   }
737   my @row = (
738     exists $self->{stashed_row}
739       ? @{delete $self->{stashed_row}}
740       : $self->cursor->next
741   );
742   return undef unless (@row);
743   my ($row, @more) = $self->_construct_object(@row);
744   $self->{stashed_objects} = \@more if @more;
745   return $row;
746 }
747
748 sub _construct_object {
749   my ($self, @row) = @_;
750   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
751   my @new = $self->result_class->inflate_result($self->result_source, @$info);
752   @new = $self->{_attrs}{record_filter}->(@new)
753     if exists $self->{_attrs}{record_filter};
754   return @new;
755 }
756
757 sub _collapse_result {
758   my ($self, $as_proto, $row) = @_;
759
760   my @copy = @$row;
761
762   # 'foo'         => [ undef, 'foo' ]
763   # 'foo.bar'     => [ 'foo', 'bar' ]
764   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
765
766   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
767
768   my %collapse = %{$self->{_attrs}{collapse}||{}};
769
770   my @pri_index;
771
772   # if we're doing collapsing (has_many prefetch) we need to grab records
773   # until the PK changes, so fill @pri_index. if not, we leave it empty so
774   # we know we don't have to bother.
775
776   # the reason for not using the collapse stuff directly is because if you
777   # had for e.g. two artists in a row with no cds, the collapse info for
778   # both would be NULL (undef) so you'd lose the second artist
779
780   # store just the index so we can check the array positions from the row
781   # without having to contruct the full hash
782
783   if (keys %collapse) {
784     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
785     foreach my $i (0 .. $#construct_as) {
786       next if defined($construct_as[$i][0]); # only self table
787       if (delete $pri{$construct_as[$i][1]}) {
788         push(@pri_index, $i);
789       }
790       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
791     }
792   }
793
794   # no need to do an if, it'll be empty if @pri_index is empty anyway
795
796   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
797
798   my @const_rows;
799
800   do { # no need to check anything at the front, we always want the first row
801
802     my %const;
803   
804     foreach my $this_as (@construct_as) {
805       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
806     }
807
808     push(@const_rows, \%const);
809
810   } until ( # no pri_index => no collapse => drop straight out
811       !@pri_index
812     or
813       do { # get another row, stash it, drop out if different PK
814
815         @copy = $self->cursor->next;
816         $self->{stashed_row} = \@copy;
817
818         # last thing in do block, counts as true if anything doesn't match
819
820         # check xor defined first for NULL vs. NOT NULL then if one is
821         # defined the other must be so check string equality
822
823         grep {
824           (defined $pri_vals{$_} ^ defined $copy[$_])
825           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
826         } @pri_index;
827       }
828   );
829
830   my $alias = $self->{attrs}{alias};
831   my $info = [];
832
833   my %collapse_pos;
834
835   my @const_keys;
836
837   use Data::Dumper;
838
839   foreach my $const (@const_rows) {
840     scalar @const_keys or do {
841       @const_keys = sort { length($a) <=> length($b) } keys %$const;
842     };
843     foreach my $key (@const_keys) {
844       if (length $key) {
845         my $target = $info;
846         my @parts = split(/\./, $key);
847         my $cur = '';
848         my $data = $const->{$key};
849         foreach my $p (@parts) {
850           $target = $target->[1]->{$p} ||= [];
851           $cur .= ".${p}";
852           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
853             # collapsing at this point and on final part
854             my $pos = $collapse_pos{$cur};
855             CK: foreach my $ck (@ckey) {
856               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
857                 $collapse_pos{$cur} = $data;
858                 delete @collapse_pos{ # clear all positioning for sub-entries
859                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
860                 };
861                 push(@$target, []);
862                 last CK;
863               }
864             }
865           }
866           if (exists $collapse{$cur}) {
867             $target = $target->[-1];
868           }
869         }
870         $target->[0] = $data;
871       } else {
872         $info->[0] = $const->{$key};
873       }
874     }
875   }
876
877   return $info;
878 }
879
880 =head2 result_source
881
882 =over 4
883
884 =item Arguments: $result_source?
885
886 =item Return Value: $result_source
887
888 =back
889
890 An accessor for the primary ResultSource object from which this ResultSet
891 is derived.
892
893 =head2 result_class
894
895 =over 4
896
897 =item Arguments: $result_class?
898
899 =item Return Value: $result_class
900
901 =back
902
903 An accessor for the class to use when creating row objects. Defaults to 
904 C<< result_source->result_class >> - which in most cases is the name of the 
905 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
906
907 =cut
908
909
910 =head2 count
911
912 =over 4
913
914 =item Arguments: $cond, \%attrs??
915
916 =item Return Value: $count
917
918 =back
919
920 Performs an SQL C<COUNT> with the same query as the resultset was built
921 with to find the number of elements. If passed arguments, does a search
922 on the resultset and counts the results of that.
923
924 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
925 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
926 not support C<DISTINCT> with multiple columns. If you are using such a
927 database, you should only use columns from the main table in your C<group_by>
928 clause.
929
930 =cut
931
932 sub count {
933   my $self = shift;
934   return $self->search(@_)->count if @_ and defined $_[0];
935   return scalar @{ $self->get_cache } if $self->get_cache;
936   my $count = $self->_count;
937   return 0 unless $count;
938
939   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
940   $count = $self->{attrs}{rows} if
941     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
942   return $count;
943 }
944
945 sub _count { # Separated out so pager can get the full count
946   my $self = shift;
947   my $select = { count => '*' };
948
949   my $attrs = { %{$self->_resolved_attrs} };
950   if (my $group_by = delete $attrs->{group_by}) {
951     delete $attrs->{having};
952     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
953     # todo: try CONCAT for multi-column pk
954     my @pk = $self->result_source->primary_columns;
955     if (@pk == 1) {
956       my $alias = $attrs->{alias};
957       foreach my $column (@distinct) {
958         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
959           @distinct = ($column);
960           last;
961         }
962       }
963     }
964
965     $select = { count => { distinct => \@distinct } };
966   }
967
968   $attrs->{select} = $select;
969   $attrs->{as} = [qw/count/];
970
971   # offset, order by and page are not needed to count. record_filter is cdbi
972   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
973
974   my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
975   my ($count) = $tmp_rs->cursor->next;
976   return $count;
977 }
978
979 =head2 count_literal
980
981 =over 4
982
983 =item Arguments: $sql_fragment, @bind_values
984
985 =item Return Value: $count
986
987 =back
988
989 Counts the results in a literal query. Equivalent to calling L</search_literal>
990 with the passed arguments, then L</count>.
991
992 =cut
993
994 sub count_literal { shift->search_literal(@_)->count; }
995
996 =head2 all
997
998 =over 4
999
1000 =item Arguments: none
1001
1002 =item Return Value: @objects
1003
1004 =back
1005
1006 Returns all elements in the resultset. Called implicitly if the resultset
1007 is returned in list context.
1008
1009 =cut
1010
1011 sub all {
1012   my ($self) = @_;
1013   return @{ $self->get_cache } if $self->get_cache;
1014
1015   my @obj;
1016
1017   # TODO: don't call resolve here
1018   if (keys %{$self->_resolved_attrs->{collapse}}) {
1019 #  if ($self->{attrs}{prefetch}) {
1020       # Using $self->cursor->all is really just an optimisation.
1021       # If we're collapsing has_many prefetches it probably makes
1022       # very little difference, and this is cleaner than hacking
1023       # _construct_object to survive the approach
1024     my @row = $self->cursor->next;
1025     while (@row) {
1026       push(@obj, $self->_construct_object(@row));
1027       @row = (exists $self->{stashed_row}
1028                ? @{delete $self->{stashed_row}}
1029                : $self->cursor->next);
1030     }
1031   } else {
1032     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1033   }
1034
1035   $self->set_cache(\@obj) if $self->{attrs}{cache};
1036   return @obj;
1037 }
1038
1039 =head2 reset
1040
1041 =over 4
1042
1043 =item Arguments: none
1044
1045 =item Return Value: $self
1046
1047 =back
1048
1049 Resets the resultset's cursor, so you can iterate through the elements again.
1050
1051 =cut
1052
1053 sub reset {
1054   my ($self) = @_;
1055   delete $self->{_attrs} if exists $self->{_attrs};
1056   $self->{all_cache_position} = 0;
1057   $self->cursor->reset;
1058   return $self;
1059 }
1060
1061 =head2 first
1062
1063 =over 4
1064
1065 =item Arguments: none
1066
1067 =item Return Value: $object?
1068
1069 =back
1070
1071 Resets the resultset and returns an object for the first result (if the
1072 resultset returns anything).
1073
1074 =cut
1075
1076 sub first {
1077   return $_[0]->reset->next;
1078 }
1079
1080 # _cond_for_update_delete
1081 #
1082 # update/delete require the condition to be modified to handle
1083 # the differing SQL syntax available.  This transforms the $self->{cond}
1084 # appropriately, returning the new condition.
1085
1086 sub _cond_for_update_delete {
1087   my ($self, $full_cond) = @_;
1088   my $cond = {};
1089
1090   $full_cond ||= $self->{cond};
1091   # No-op. No condition, we're updating/deleting everything
1092   return $cond unless ref $full_cond;
1093
1094   if (ref $full_cond eq 'ARRAY') {
1095     $cond = [
1096       map {
1097         my %hash;
1098         foreach my $key (keys %{$_}) {
1099           $key =~ /([^.]+)$/;
1100           $hash{$1} = $_->{$key};
1101         }
1102         \%hash;
1103       } @{$full_cond}
1104     ];
1105   }
1106   elsif (ref $full_cond eq 'HASH') {
1107     if ((keys %{$full_cond})[0] eq '-and') {
1108       $cond->{-and} = [];
1109
1110       my @cond = @{$full_cond->{-and}};
1111       for (my $i = 0; $i < @cond; $i++) {
1112         my $entry = $cond[$i];
1113
1114         my $hash;
1115         if (ref $entry eq 'HASH') {
1116           $hash = $self->_cond_for_update_delete($entry);
1117         }
1118         else {
1119           $entry =~ /([^.]+)$/;
1120           $hash->{$1} = $cond[++$i];
1121         }
1122
1123         push @{$cond->{-and}}, $hash;
1124       }
1125     }
1126     else {
1127       foreach my $key (keys %{$full_cond}) {
1128         $key =~ /([^.]+)$/;
1129         $cond->{$1} = $full_cond->{$key};
1130       }
1131     }
1132   }
1133   else {
1134     $self->throw_exception(
1135       "Can't update/delete on resultset with condition unless hash or array"
1136     );
1137   }
1138
1139   return $cond;
1140 }
1141
1142
1143 =head2 update
1144
1145 =over 4
1146
1147 =item Arguments: \%values
1148
1149 =item Return Value: $storage_rv
1150
1151 =back
1152
1153 Sets the specified columns in the resultset to the supplied values in a
1154 single query. Return value will be true if the update succeeded or false
1155 if no records were updated; exact type of success value is storage-dependent.
1156
1157 =cut
1158
1159 sub update {
1160   my ($self, $values) = @_;
1161   $self->throw_exception("Values for update must be a hash")
1162     unless ref $values eq 'HASH';
1163
1164   my $cond = $self->_cond_for_update_delete;
1165    
1166   return $self->result_source->storage->update(
1167     $self->result_source, $values, $cond
1168   );
1169 }
1170
1171 =head2 update_all
1172
1173 =over 4
1174
1175 =item Arguments: \%values
1176
1177 =item Return Value: 1
1178
1179 =back
1180
1181 Fetches all objects and updates them one at a time. Note that C<update_all>
1182 will run DBIC cascade triggers, while L</update> will not.
1183
1184 =cut
1185
1186 sub update_all {
1187   my ($self, $values) = @_;
1188   $self->throw_exception("Values for update must be a hash")
1189     unless ref $values eq 'HASH';
1190   foreach my $obj ($self->all) {
1191     $obj->set_columns($values)->update;
1192   }
1193   return 1;
1194 }
1195
1196 =head2 delete
1197
1198 =over 4
1199
1200 =item Arguments: none
1201
1202 =item Return Value: 1
1203
1204 =back
1205
1206 Deletes the contents of the resultset from its result source. Note that this
1207 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1208 to run. See also L<DBIx::Class::Row/delete>.
1209
1210 =cut
1211
1212 sub delete {
1213   my ($self) = @_;
1214
1215   my $cond = $self->_cond_for_update_delete;
1216
1217   $self->result_source->storage->delete($self->result_source, $cond);
1218   return 1;
1219 }
1220
1221 =head2 delete_all
1222
1223 =over 4
1224
1225 =item Arguments: none
1226
1227 =item Return Value: 1
1228
1229 =back
1230
1231 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1232 will run DBIC cascade triggers, while L</delete> will not.
1233
1234 =cut
1235
1236 sub delete_all {
1237   my ($self) = @_;
1238   $_->delete for $self->all;
1239   return 1;
1240 }
1241
1242 =head2 populate
1243
1244 =over 4
1245
1246 =item Arguments: \@data;
1247
1248 =back
1249
1250 Pass an arrayref of hashrefs. Each hashref should be a structure suitable for
1251 submitting to a $resultset->create(...) method.
1252
1253 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1254 to insert the data, as this is a faster method.
1255
1256 Otherwise, each set of data is inserted into the database using
1257 L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
1258 objects is returned.
1259
1260 Example:  Assuming an Artist Class that has many CDs Classes relating:
1261
1262   my $Artist_rs = $schema->resultset("Artist");
1263   
1264   ## Void Context Example 
1265   $Artist_rs->populate([
1266      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1267         { title => 'My First CD', year => 2006 },
1268         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1269       ],
1270      },
1271      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1272         { title => 'My parents sold me to a record company' ,year => 2005 },
1273         { title => 'Why Am I So Ugly?', year => 2006 },
1274         { title => 'I Got Surgery and am now Popular', year => 2007 }
1275       ],
1276      },
1277   ]);
1278   
1279   ## Array Context Example
1280   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1281     { name => "Artist One"},
1282     { name => "Artist Two"},
1283     { name => "Artist Three", cds=> [
1284     { title => "First CD", year => 2007},
1285     { title => "Second CD", year => 2008},
1286   ]}
1287   ]);
1288   
1289   print $ArtistOne->name; ## response is 'Artist One'
1290   print $ArtistThree->cds->count ## reponse is '2'
1291
1292 =cut
1293
1294 sub populate {
1295   my ($self, $data) = @_;
1296   
1297   if(defined wantarray) {
1298     my @created;
1299     foreach my $item (@$data) {
1300       push(@created, $self->create($item));
1301     }
1302     return @created;
1303   } else {
1304     my ($first, @rest) = @$data;
1305
1306     my @names = grep {!ref $first->{$_}} keys %$first;
1307     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1308     my @pks = $self->result_source->primary_columns;  
1309
1310     ## do the belongs_to relationships  
1311     foreach my $index (0..$#$data) {
1312       if( grep { !defined $data->[$index]->{$_} } @pks ) {
1313         my @ret = $self->populate($data);
1314         return;
1315       }
1316     
1317       foreach my $rel (@rels) {
1318         next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
1319         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1320         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1321         my $related = $result->result_source->resolve_condition(
1322           $result->result_source->relationship_info($reverse)->{cond},
1323           $self,        
1324           $result,        
1325         );
1326
1327         delete $data->[$index]->{$rel};
1328         $data->[$index] = {%{$data->[$index]}, %$related};
1329       
1330         push @names, keys %$related if $index == 0;
1331       }
1332     }
1333
1334     ## do bulk insert on current row
1335     my @values = map {
1336       [ map {
1337          defined $_ ? $_ : $self->throw_exception("Undefined value for column!")
1338       } @$_{@names} ]
1339     } @$data;
1340
1341     $self->result_source->storage->insert_bulk(
1342       $self->result_source, 
1343       \@names, 
1344       \@values,
1345     );
1346
1347     ## do the has_many relationships
1348     foreach my $item (@$data) {
1349
1350       foreach my $rel (@rels) {
1351         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1352
1353         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
1354      || $self->throw_exception('Cannot find the relating object.');
1355      
1356         my $child = $parent->$rel;
1357     
1358         my $related = $child->result_source->resolve_condition(
1359           $parent->result_source->relationship_info($rel)->{cond},
1360           $child,
1361           $parent,
1362         );
1363
1364         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1365         my @populate = map { {%$_, %$related} } @rows_to_add;
1366
1367         $child->populate( \@populate );
1368       }
1369     }
1370   }
1371 }
1372
1373 =head2 pager
1374
1375 =over 4
1376
1377 =item Arguments: none
1378
1379 =item Return Value: $pager
1380
1381 =back
1382
1383 Return Value a L<Data::Page> object for the current resultset. Only makes
1384 sense for queries with a C<page> attribute.
1385
1386 =cut
1387
1388 sub pager {
1389   my ($self) = @_;
1390   my $attrs = $self->{attrs};
1391   $self->throw_exception("Can't create pager for non-paged rs")
1392     unless $self->{attrs}{page};
1393   $attrs->{rows} ||= 10;
1394   return $self->{pager} ||= Data::Page->new(
1395     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1396 }
1397
1398 =head2 page
1399
1400 =over 4
1401
1402 =item Arguments: $page_number
1403
1404 =item Return Value: $rs
1405
1406 =back
1407
1408 Returns a resultset for the $page_number page of the resultset on which page
1409 is called, where each page contains a number of rows equal to the 'rows'
1410 attribute set on the resultset (10 by default).
1411
1412 =cut
1413
1414 sub page {
1415   my ($self, $page) = @_;
1416   return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
1417 }
1418
1419 =head2 new_result
1420
1421 =over 4
1422
1423 =item Arguments: \%vals
1424
1425 =item Return Value: $object
1426
1427 =back
1428
1429 Creates an object in the resultset's result class and returns it.
1430
1431 =cut
1432
1433 sub new_result {
1434   my ($self, $values) = @_;
1435   $self->throw_exception( "new_result needs a hash" )
1436     unless (ref $values eq 'HASH');
1437   $self->throw_exception(
1438     "Can't abstract implicit construct, condition not a hash"
1439   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1440
1441   my $alias = $self->{attrs}{alias};
1442   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1443   my %new = (
1444     %{ $self->_remove_alias($values, $alias) },
1445     %{ $self->_remove_alias($collapsed_cond, $alias) },
1446     -source_handle => $self->_source_handle,
1447     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1448   );
1449
1450   return $self->result_class->new(\%new);
1451 }
1452
1453 # _collapse_cond
1454 #
1455 # Recursively collapse the condition.
1456
1457 sub _collapse_cond {
1458   my ($self, $cond, $collapsed) = @_;
1459
1460   $collapsed ||= {};
1461
1462   if (ref $cond eq 'ARRAY') {
1463     foreach my $subcond (@$cond) {
1464       next unless ref $subcond;  # -or
1465 #      warn "ARRAY: " . Dumper $subcond;
1466       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1467     }
1468   }
1469   elsif (ref $cond eq 'HASH') {
1470     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1471       foreach my $subcond (@{$cond->{-and}}) {
1472 #        warn "HASH: " . Dumper $subcond;
1473         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1474       }
1475     }
1476     else {
1477 #      warn "LEAF: " . Dumper $cond;
1478       foreach my $col (keys %$cond) {
1479         my $value = $cond->{$col};
1480         $collapsed->{$col} = $value;
1481       }
1482     }
1483   }
1484
1485   return $collapsed;
1486 }
1487
1488 # _remove_alias
1489 #
1490 # Remove the specified alias from the specified query hash. A copy is made so
1491 # the original query is not modified.
1492
1493 sub _remove_alias {
1494   my ($self, $query, $alias) = @_;
1495
1496   my %orig = %{ $query || {} };
1497   my %unaliased;
1498
1499   foreach my $key (keys %orig) {
1500     if ($key !~ /\./) {
1501       $unaliased{$key} = $orig{$key};
1502       next;
1503     }
1504     $unaliased{$1} = $orig{$key}
1505       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1506   }
1507
1508   return \%unaliased;
1509 }
1510
1511 =head2 find_or_new
1512
1513 =over 4
1514
1515 =item Arguments: \%vals, \%attrs?
1516
1517 =item Return Value: $object
1518
1519 =back
1520
1521 Find an existing record from this resultset. If none exists, instantiate a new
1522 result object and return it. The object will not be saved into your storage
1523 until you call L<DBIx::Class::Row/insert> on it.
1524
1525 If you want objects to be saved immediately, use L</find_or_create> instead.
1526
1527 =cut
1528
1529 sub find_or_new {
1530   my $self     = shift;
1531   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1532   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1533   my $exists   = $self->find($hash, $attrs);
1534   return defined $exists ? $exists : $self->new_result($hash);
1535 }
1536
1537 =head2 create
1538
1539 =over 4
1540
1541 =item Arguments: \%vals
1542
1543 =item Return Value: $object
1544
1545 =back
1546
1547 Inserts a record into the resultset and returns the object representing it.
1548
1549 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1550
1551 =cut
1552
1553 sub create {
1554   my ($self, $attrs) = @_;
1555   $self->throw_exception( "create needs a hashref" )
1556     unless ref $attrs eq 'HASH';
1557   return $self->new_result($attrs)->insert;
1558 }
1559
1560 =head2 find_or_create
1561
1562 =over 4
1563
1564 =item Arguments: \%vals, \%attrs?
1565
1566 =item Return Value: $object
1567
1568 =back
1569
1570   $class->find_or_create({ key => $val, ... });
1571
1572 Tries to find a record based on its primary key or unique constraint; if none
1573 is found, creates one and returns that instead.
1574
1575   my $cd = $schema->resultset('CD')->find_or_create({
1576     cdid   => 5,
1577     artist => 'Massive Attack',
1578     title  => 'Mezzanine',
1579     year   => 2005,
1580   });
1581
1582 Also takes an optional C<key> attribute, to search by a specific key or unique
1583 constraint. For example:
1584
1585   my $cd = $schema->resultset('CD')->find_or_create(
1586     {
1587       artist => 'Massive Attack',
1588       title  => 'Mezzanine',
1589     },
1590     { key => 'cd_artist_title' }
1591   );
1592
1593 See also L</find> and L</update_or_create>. For information on how to declare
1594 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1595
1596 =cut
1597
1598 sub find_or_create {
1599   my $self     = shift;
1600   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1601   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1602   my $exists   = $self->find($hash, $attrs);
1603   return defined $exists ? $exists : $self->create($hash);
1604 }
1605
1606 =head2 update_or_create
1607
1608 =over 4
1609
1610 =item Arguments: \%col_values, { key => $unique_constraint }?
1611
1612 =item Return Value: $object
1613
1614 =back
1615
1616   $class->update_or_create({ col => $val, ... });
1617
1618 First, searches for an existing row matching one of the unique constraints
1619 (including the primary key) on the source of this resultset. If a row is
1620 found, updates it with the other given column values. Otherwise, creates a new
1621 row.
1622
1623 Takes an optional C<key> attribute to search on a specific unique constraint.
1624 For example:
1625
1626   # In your application
1627   my $cd = $schema->resultset('CD')->update_or_create(
1628     {
1629       artist => 'Massive Attack',
1630       title  => 'Mezzanine',
1631       year   => 1998,
1632     },
1633     { key => 'cd_artist_title' }
1634   );
1635
1636 If no C<key> is specified, it searches on all unique constraints defined on the
1637 source, including the primary key.
1638
1639 If the C<key> is specified as C<primary>, it searches only on the primary key.
1640
1641 See also L</find> and L</find_or_create>. For information on how to declare
1642 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1643
1644 =cut
1645
1646 sub update_or_create {
1647   my $self = shift;
1648   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1649   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1650
1651   my $row = $self->find($cond, $attrs);
1652   if (defined $row) {
1653     $row->update($cond);
1654     return $row;
1655   }
1656
1657   return $self->create($cond);
1658 }
1659
1660 =head2 get_cache
1661
1662 =over 4
1663
1664 =item Arguments: none
1665
1666 =item Return Value: \@cache_objects?
1667
1668 =back
1669
1670 Gets the contents of the cache for the resultset, if the cache is set.
1671
1672 =cut
1673
1674 sub get_cache {
1675   shift->{all_cache};
1676 }
1677
1678 =head2 set_cache
1679
1680 =over 4
1681
1682 =item Arguments: \@cache_objects
1683
1684 =item Return Value: \@cache_objects
1685
1686 =back
1687
1688 Sets the contents of the cache for the resultset. Expects an arrayref
1689 of objects of the same class as those produced by the resultset. Note that
1690 if the cache is set the resultset will return the cached objects rather
1691 than re-querying the database even if the cache attr is not set.
1692
1693 =cut
1694
1695 sub set_cache {
1696   my ( $self, $data ) = @_;
1697   $self->throw_exception("set_cache requires an arrayref")
1698       if defined($data) && (ref $data ne 'ARRAY');
1699   $self->{all_cache} = $data;
1700 }
1701
1702 =head2 clear_cache
1703
1704 =over 4
1705
1706 =item Arguments: none
1707
1708 =item Return Value: []
1709
1710 =back
1711
1712 Clears the cache for the resultset.
1713
1714 =cut
1715
1716 sub clear_cache {
1717   shift->set_cache(undef);
1718 }
1719
1720 =head2 related_resultset
1721
1722 =over 4
1723
1724 =item Arguments: $relationship_name
1725
1726 =item Return Value: $resultset
1727
1728 =back
1729
1730 Returns a related resultset for the supplied relationship name.
1731
1732   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1733
1734 =cut
1735
1736 sub related_resultset {
1737   my ($self, $rel) = @_;
1738
1739   $self->{related_resultsets} ||= {};
1740   return $self->{related_resultsets}{$rel} ||= do {
1741     my $rel_obj = $self->result_source->relationship_info($rel);
1742
1743     $self->throw_exception(
1744       "search_related: result source '" . $self->_source_handle->source_moniker .
1745         "' has no such relationship $rel")
1746       unless $rel_obj;
1747     
1748     my ($from,$seen) = $self->_resolve_from($rel);
1749
1750     my $join_count = $seen->{$rel};
1751     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1752
1753     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
1754     my %attrs = %{$self->{attrs}||{}};
1755     delete $attrs{result_class};
1756
1757     my $new_cache;
1758
1759     if (my $cache = $self->get_cache) {
1760       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
1761         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
1762                         @$cache ];
1763       }
1764     }
1765
1766     my $new = $self->_source_handle
1767                    ->schema
1768                    ->resultset($rel_obj->{class})
1769                    ->search_rs(
1770                        undef, {
1771                          %attrs,
1772                          join => undef,
1773                          prefetch => undef,
1774                          select => undef,
1775                          as => undef,
1776                          alias => $alias,
1777                          where => $self->{cond},
1778                          seen_join => $seen,
1779                          from => $from,
1780                      });
1781     $new->set_cache($new_cache) if $new_cache;
1782     $new;
1783   };
1784 }
1785
1786 sub _resolve_from {
1787   my ($self, $extra_join) = @_;
1788   my $source = $self->result_source;
1789   my $attrs = $self->{attrs};
1790   
1791   my $from = $attrs->{from}
1792     || [ { $attrs->{alias} => $source->from } ];
1793     
1794   my $seen = { %{$attrs->{seen_join}||{}} };
1795
1796   my $join = ($attrs->{join}
1797                ? [ $attrs->{join}, $extra_join ]
1798                : $extra_join);
1799   $from = [
1800     @$from,
1801     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1802   ];
1803
1804   return ($from,$seen);
1805 }
1806
1807 sub _resolved_attrs {
1808   my $self = shift;
1809   return $self->{_attrs} if $self->{_attrs};
1810
1811   my $attrs = { %{$self->{attrs}||{}} };
1812   my $source = $self->result_source;
1813   my $alias = $attrs->{alias};
1814
1815   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1816   if ($attrs->{columns}) {
1817     delete $attrs->{as};
1818   } elsif (!$attrs->{select}) {
1819     $attrs->{columns} = [ $source->columns ];
1820   }
1821  
1822   $attrs->{select} = 
1823     ($attrs->{select}
1824       ? (ref $attrs->{select} eq 'ARRAY'
1825           ? [ @{$attrs->{select}} ]
1826           : [ $attrs->{select} ])
1827       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1828     );
1829   $attrs->{as} =
1830     ($attrs->{as}
1831       ? (ref $attrs->{as} eq 'ARRAY'
1832           ? [ @{$attrs->{as}} ]
1833           : [ $attrs->{as} ])
1834       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1835     );
1836   
1837   my $adds;
1838   if ($adds = delete $attrs->{include_columns}) {
1839     $adds = [$adds] unless ref $adds eq 'ARRAY';
1840     push(@{$attrs->{select}}, @$adds);
1841     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1842   }
1843   if ($adds = delete $attrs->{'+select'}) {
1844     $adds = [$adds] unless ref $adds eq 'ARRAY';
1845     push(@{$attrs->{select}},
1846            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1847   }
1848   if (my $adds = delete $attrs->{'+as'}) {
1849     $adds = [$adds] unless ref $adds eq 'ARRAY';
1850     push(@{$attrs->{as}}, @$adds);
1851   }
1852
1853   $attrs->{from} ||= [ { 'me' => $source->from } ];
1854
1855   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1856     my $join = delete $attrs->{join} || {};
1857
1858     if (defined $attrs->{prefetch}) {
1859       $join = $self->_merge_attr(
1860         $join, $attrs->{prefetch}
1861       );
1862     }
1863
1864     $attrs->{from} =   # have to copy here to avoid corrupting the original
1865       [
1866         @{$attrs->{from}}, 
1867         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1868       ];
1869   }
1870
1871   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1872   if ($attrs->{order_by}) {
1873     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1874                            ? [ @{$attrs->{order_by}} ]
1875                            : [ $attrs->{order_by} ]);
1876   } else {
1877     $attrs->{order_by} = [];    
1878   }
1879
1880   my $collapse = $attrs->{collapse} || {};
1881   if (my $prefetch = delete $attrs->{prefetch}) {
1882     $prefetch = $self->_merge_attr({}, $prefetch);
1883     my @pre_order;
1884     my $seen = $attrs->{seen_join} || {};
1885     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1886       # bring joins back to level of current class
1887       my @prefetch = $source->resolve_prefetch(
1888         $p, $alias, $seen, \@pre_order, $collapse
1889       );
1890       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1891       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1892     }
1893     push(@{$attrs->{order_by}}, @pre_order);
1894   }
1895   $attrs->{collapse} = $collapse;
1896
1897   return $self->{_attrs} = $attrs;
1898 }
1899
1900 sub _merge_attr {
1901   my ($self, $a, $b) = @_;
1902   return $b unless defined($a);
1903   return $a unless defined($b);
1904   
1905   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1906     foreach my $key (keys %{$b}) {
1907       if (exists $a->{$key}) {
1908         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1909       } else {
1910         $a->{$key} = $b->{$key};
1911       }
1912     }
1913     return $a;
1914   } else {
1915     $a = [$a] unless ref $a eq 'ARRAY';
1916     $b = [$b] unless ref $b eq 'ARRAY';
1917
1918     my $hash = {};
1919     my @array;
1920     foreach my $x ($a, $b) {
1921       foreach my $element (@{$x}) {
1922         if (ref $element eq 'HASH') {
1923           $hash = $self->_merge_attr($hash, $element);
1924         } elsif (ref $element eq 'ARRAY') {
1925           push(@array, @{$element});
1926         } else {
1927           push(@array, $element) unless $b == $x
1928             && grep { $_ eq $element } @array;
1929         }
1930       }
1931     }
1932     
1933     @array = grep { !exists $hash->{$_} } @array;
1934
1935     return keys %{$hash}
1936       ? ( scalar(@array)
1937             ? [$hash, @array]
1938             : $hash
1939         )
1940       : \@array;
1941   }
1942 }
1943
1944 sub result_source {
1945     my $self = shift;
1946
1947     if (@_) {
1948         $self->_source_handle($_[0]->handle);
1949     } else {
1950         $self->_source_handle->resolve;
1951     }
1952 }
1953
1954 =head2 throw_exception
1955
1956 See L<DBIx::Class::Schema/throw_exception> for details.
1957
1958 =cut
1959
1960 sub throw_exception {
1961   my $self=shift;
1962   $self->_source_handle->schema->throw_exception(@_);
1963 }
1964
1965 # XXX: FIXME: Attributes docs need clearing up
1966
1967 =head1 ATTRIBUTES
1968
1969 The resultset takes various attributes that modify its behavior. Here's an
1970 overview of them:
1971
1972 =head2 order_by
1973
1974 =over 4
1975
1976 =item Value: ($order_by | \@order_by)
1977
1978 =back
1979
1980 Which column(s) to order the results by. This is currently passed
1981 through directly to SQL, so you can give e.g. C<year DESC> for a
1982 descending order on the column `year'.
1983
1984 Please note that if you have C<quote_char> enabled (see
1985 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1986 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1987 so you will need to manually quote things as appropriate.)
1988
1989 =head2 columns
1990
1991 =over 4
1992
1993 =item Value: \@columns
1994
1995 =back
1996
1997 Shortcut to request a particular set of columns to be retrieved.  Adds
1998 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1999 from that, then auto-populates C<as> from C<select> as normal. (You may also
2000 use the C<cols> attribute, as in earlier versions of DBIC.)
2001
2002 =head2 include_columns
2003
2004 =over 4
2005
2006 =item Value: \@columns
2007
2008 =back
2009
2010 Shortcut to include additional columns in the returned results - for example
2011
2012   $schema->resultset('CD')->search(undef, {
2013     include_columns => ['artist.name'],
2014     join => ['artist']
2015   });
2016
2017 would return all CDs and include a 'name' column to the information
2018 passed to object inflation. Note that the 'artist' is the name of the
2019 column (or relationship) accessor, and 'name' is the name of the column
2020 accessor in the related table.
2021
2022 =head2 select
2023
2024 =over 4
2025
2026 =item Value: \@select_columns
2027
2028 =back
2029
2030 Indicates which columns should be selected from the storage. You can use
2031 column names, or in the case of RDBMS back ends, function or stored procedure
2032 names:
2033
2034   $rs = $schema->resultset('Employee')->search(undef, {
2035     select => [
2036       'name',
2037       { count => 'employeeid' },
2038       { sum => 'salary' }
2039     ]
2040   });
2041
2042 When you use function/stored procedure names and do not supply an C<as>
2043 attribute, the column names returned are storage-dependent. E.g. MySQL would
2044 return a column named C<count(employeeid)> in the above example.
2045
2046 =head2 +select
2047
2048 =over 4
2049
2050 Indicates additional columns to be selected from storage.  Works the same as
2051 L<select> but adds columns to the selection.
2052
2053 =back
2054
2055 =head2 +as
2056
2057 =over 4
2058
2059 Indicates additional column names for those added via L<+select>.
2060
2061 =back
2062
2063 =head2 as
2064
2065 =over 4
2066
2067 =item Value: \@inflation_names
2068
2069 =back
2070
2071 Indicates column names for object inflation. That is, c< as >
2072 indicates the name that the column can be accessed as via the
2073 C<get_column> method (or via the object accessor, B<if one already
2074 exists>).  It has nothing to do with the SQL code C< SELECT foo AS bar
2075 >.
2076
2077 The C< as > attribute is used in conjunction with C<select>,
2078 usually when C<select> contains one or more function or stored
2079 procedure names:
2080
2081   $rs = $schema->resultset('Employee')->search(undef, {
2082     select => [
2083       'name',
2084       { count => 'employeeid' }
2085     ],
2086     as => ['name', 'employee_count'],
2087   });
2088
2089   my $employee = $rs->first(); # get the first Employee
2090
2091 If the object against which the search is performed already has an accessor
2092 matching a column name specified in C<as>, the value can be retrieved using
2093 the accessor as normal:
2094
2095   my $name = $employee->name();
2096
2097 If on the other hand an accessor does not exist in the object, you need to
2098 use C<get_column> instead:
2099
2100   my $employee_count = $employee->get_column('employee_count');
2101
2102 You can create your own accessors if required - see
2103 L<DBIx::Class::Manual::Cookbook> for details.
2104
2105 Please note: This will NOT insert an C<AS employee_count> into the SQL
2106 statement produced, it is used for internal access only. Thus
2107 attempting to use the accessor in an C<order_by> clause or similar
2108 will fail miserably.
2109
2110 To get around this limitation, you can supply literal SQL to your
2111 C<select> attibute that contains the C<AS alias> text, eg:
2112
2113   select => [\'myfield AS alias']
2114
2115 =head2 join
2116
2117 =over 4
2118
2119 =item Value: ($rel_name | \@rel_names | \%rel_names)
2120
2121 =back
2122
2123 Contains a list of relationships that should be joined for this query.  For
2124 example:
2125
2126   # Get CDs by Nine Inch Nails
2127   my $rs = $schema->resultset('CD')->search(
2128     { 'artist.name' => 'Nine Inch Nails' },
2129     { join => 'artist' }
2130   );
2131
2132 Can also contain a hash reference to refer to the other relation's relations.
2133 For example:
2134
2135   package MyApp::Schema::Track;
2136   use base qw/DBIx::Class/;
2137   __PACKAGE__->table('track');
2138   __PACKAGE__->add_columns(qw/trackid cd position title/);
2139   __PACKAGE__->set_primary_key('trackid');
2140   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
2141   1;
2142
2143   # In your application
2144   my $rs = $schema->resultset('Artist')->search(
2145     { 'track.title' => 'Teardrop' },
2146     {
2147       join     => { cd => 'track' },
2148       order_by => 'artist.name',
2149     }
2150   );
2151
2152 You need to use the relationship (not the table) name in  conditions, 
2153 because they are aliased as such. The current table is aliased as "me", so 
2154 you need to use me.column_name in order to avoid ambiguity. For example:
2155
2156   # Get CDs from 1984 with a 'Foo' track 
2157   my $rs = $schema->resultset('CD')->search(
2158     { 
2159       'me.year' => 1984,
2160       'tracks.name' => 'Foo'
2161     },
2162     { join => 'tracks' }
2163   );
2164   
2165 If the same join is supplied twice, it will be aliased to <rel>_2 (and
2166 similarly for a third time). For e.g.
2167
2168   my $rs = $schema->resultset('Artist')->search({
2169     'cds.title'   => 'Down to Earth',
2170     'cds_2.title' => 'Popular',
2171   }, {
2172     join => [ qw/cds cds/ ],
2173   });
2174
2175 will return a set of all artists that have both a cd with title 'Down
2176 to Earth' and a cd with title 'Popular'.
2177
2178 If you want to fetch related objects from other tables as well, see C<prefetch>
2179 below.
2180
2181 =head2 prefetch
2182
2183 =over 4
2184
2185 =item Value: ($rel_name | \@rel_names | \%rel_names)
2186
2187 =back
2188
2189 Contains one or more relationships that should be fetched along with the main
2190 query (when they are accessed afterwards they will have already been
2191 "prefetched").  This is useful for when you know you will need the related
2192 objects, because it saves at least one query:
2193
2194   my $rs = $schema->resultset('Tag')->search(
2195     undef,
2196     {
2197       prefetch => {
2198         cd => 'artist'
2199       }
2200     }
2201   );
2202
2203 The initial search results in SQL like the following:
2204
2205   SELECT tag.*, cd.*, artist.* FROM tag
2206   JOIN cd ON tag.cd = cd.cdid
2207   JOIN artist ON cd.artist = artist.artistid
2208
2209 L<DBIx::Class> has no need to go back to the database when we access the
2210 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2211 case.
2212
2213 Simple prefetches will be joined automatically, so there is no need
2214 for a C<join> attribute in the above search. If you're prefetching to
2215 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2216 specify the join as well.
2217
2218 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2219 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2220 with an accessor type of 'single' or 'filter').
2221
2222 =head2 page
2223
2224 =over 4
2225
2226 =item Value: $page
2227
2228 =back
2229
2230 Makes the resultset paged and specifies the page to retrieve. Effectively
2231 identical to creating a non-pages resultset and then calling ->page($page)
2232 on it.
2233
2234 If L<rows> attribute is not specified it defualts to 10 rows per page.
2235
2236 =head2 rows
2237
2238 =over 4
2239
2240 =item Value: $rows
2241
2242 =back
2243
2244 Specifes the maximum number of rows for direct retrieval or the number of
2245 rows per page if the page attribute or method is used.
2246
2247 =head2 offset
2248
2249 =over 4
2250
2251 =item Value: $offset
2252
2253 =back
2254
2255 Specifies the (zero-based) row number for the  first row to be returned, or the
2256 of the first row of the first page if paging is used.
2257
2258 =head2 group_by
2259
2260 =over 4
2261
2262 =item Value: \@columns
2263
2264 =back
2265
2266 A arrayref of columns to group by. Can include columns of joined tables.
2267
2268   group_by => [qw/ column1 column2 ... /]
2269
2270 =head2 having
2271
2272 =over 4
2273
2274 =item Value: $condition
2275
2276 =back
2277
2278 HAVING is a select statement attribute that is applied between GROUP BY and
2279 ORDER BY. It is applied to the after the grouping calculations have been
2280 done.
2281
2282   having => { 'count(employee)' => { '>=', 100 } }
2283
2284 =head2 distinct
2285
2286 =over 4
2287
2288 =item Value: (0 | 1)
2289
2290 =back
2291
2292 Set to 1 to group by all columns.
2293
2294 =head2 where
2295
2296 =over 4
2297
2298 Adds to the WHERE clause.
2299
2300   # only return rows WHERE deleted IS NULL for all searches
2301   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2302
2303 Can be overridden by passing C<{ where => undef }> as an attribute
2304 to a resulset.
2305
2306 =back
2307
2308 =head2 cache
2309
2310 Set to 1 to cache search results. This prevents extra SQL queries if you
2311 revisit rows in your ResultSet:
2312
2313   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2314
2315   while( my $artist = $resultset->next ) {
2316     ... do stuff ...
2317   }
2318
2319   $rs->first; # without cache, this would issue a query
2320
2321 By default, searches are not cached.
2322
2323 For more examples of using these attributes, see
2324 L<DBIx::Class::Manual::Cookbook>.
2325
2326 =head2 from
2327
2328 =over 4
2329
2330 =item Value: \@from_clause
2331
2332 =back
2333
2334 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2335 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2336 clauses.
2337
2338 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2339
2340 C<join> will usually do what you need and it is strongly recommended that you
2341 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2342 And we really do mean "cannot", not just tried and failed. Attempting to use
2343 this because you're having problems with C<join> is like trying to use x86
2344 ASM because you've got a syntax error in your C. Trust us on this.
2345
2346 Now, if you're still really, really sure you need to use this (and if you're
2347 not 100% sure, ask the mailing list first), here's an explanation of how this
2348 works.
2349
2350 The syntax is as follows -
2351
2352   [
2353     { <alias1> => <table1> },
2354     [
2355       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2356       [], # nested JOIN (optional)
2357       { <table1.column1> => <table2.column2>, ... (more conditions) },
2358     ],
2359     # More of the above [ ] may follow for additional joins
2360   ]
2361
2362   <table1> <alias1>
2363   JOIN
2364     <table2> <alias2>
2365     [JOIN ...]
2366   ON <table1.column1> = <table2.column2>
2367   <more joins may follow>
2368
2369 An easy way to follow the examples below is to remember the following:
2370
2371     Anything inside "[]" is a JOIN
2372     Anything inside "{}" is a condition for the enclosing JOIN
2373
2374 The following examples utilize a "person" table in a family tree application.
2375 In order to express parent->child relationships, this table is self-joined:
2376
2377     # Person->belongs_to('father' => 'Person');
2378     # Person->belongs_to('mother' => 'Person');
2379
2380 C<from> can be used to nest joins. Here we return all children with a father,
2381 then search against all mothers of those children:
2382
2383   $rs = $schema->resultset('Person')->search(
2384       undef,
2385       {
2386           alias => 'mother', # alias columns in accordance with "from"
2387           from => [
2388               { mother => 'person' },
2389               [
2390                   [
2391                       { child => 'person' },
2392                       [
2393                           { father => 'person' },
2394                           { 'father.person_id' => 'child.father_id' }
2395                       ]
2396                   ],
2397                   { 'mother.person_id' => 'child.mother_id' }
2398               ],
2399           ]
2400       },
2401   );
2402
2403   # Equivalent SQL:
2404   # SELECT mother.* FROM person mother
2405   # JOIN (
2406   #   person child
2407   #   JOIN person father
2408   #   ON ( father.person_id = child.father_id )
2409   # )
2410   # ON ( mother.person_id = child.mother_id )
2411
2412 The type of any join can be controlled manually. To search against only people
2413 with a father in the person table, we could explicitly use C<INNER JOIN>:
2414
2415     $rs = $schema->resultset('Person')->search(
2416         undef,
2417         {
2418             alias => 'child', # alias columns in accordance with "from"
2419             from => [
2420                 { child => 'person' },
2421                 [
2422                     { father => 'person', -join_type => 'inner' },
2423                     { 'father.id' => 'child.father_id' }
2424                 ],
2425             ]
2426         },
2427     );
2428
2429     # Equivalent SQL:
2430     # SELECT child.* FROM person child
2431     # INNER JOIN person father ON child.father_id = father.id
2432
2433 =cut
2434
2435 1;