64d71f94a33602549966ab86ac723488986d5bb2
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use base qw/DBIx::Class/;
15
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   $source = $source->handle 
89     unless $source->isa('DBIx::Class::ResultSourceHandle');
90   $attrs = { %{$attrs||{}} };
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   my $self = {
101     _source_handle => $source,
102     result_class => $attrs->{result_class} || $source->resolve->result_class,
103     cond => $attrs->{where},
104     count => undef,
105     pager => undef,
106     attrs => $attrs
107   };
108
109   bless $self, $class;
110
111   return $self;
112 }
113
114 =head2 search
115
116 =over 4
117
118 =item Arguments: $cond, \%attrs?
119
120 =item Return Value: $resultset (scalar context), @row_objs (list context)
121
122 =back
123
124   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
125   my $new_rs = $cd_rs->search({ year => 2005 });
126
127   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
128                  # year = 2005 OR year = 2004
129
130 If you need to pass in additional attributes but no additional condition,
131 call it as C<search(undef, \%attrs)>.
132
133   # "SELECT name, artistid FROM $artist_table"
134   my @all_artists = $schema->resultset('Artist')->search(undef, {
135     columns => [qw/name artistid/],
136   });
137
138 For a list of attributes that can be passed to C<search>, see
139 L</ATTRIBUTES>. For more examples of using this function, see
140 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
141 documentation for the first argument, see L<SQL::Abstract>.
142
143 =cut
144
145 sub search {
146   my $self = shift;
147   my $rs = $self->search_rs( @_ );
148   return (wantarray ? $rs->all : $rs);
149 }
150
151 =head2 search_rs
152
153 =over 4
154
155 =item Arguments: $cond, \%attrs?
156
157 =item Return Value: $resultset
158
159 =back
160
161 This method does the same exact thing as search() except it will
162 always return a resultset, even in list context.
163
164 =cut
165
166 sub search_rs {
167   my $self = shift;
168
169   my $rows;
170
171   unless (@_) {                 # no search, effectively just a clone
172     $rows = $self->get_cache;
173   }
174
175   my $attrs = {};
176   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
177   my $our_attrs = { %{$self->{attrs}} };
178   my $having = delete $our_attrs->{having};
179   my $where = delete $our_attrs->{where};
180
181   my $new_attrs = { %{$our_attrs}, %{$attrs} };
182
183   # merge new attrs into inherited
184   foreach my $key (qw/join prefetch/) {
185     next unless exists $attrs->{$key};
186     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
187   }
188
189   my $cond = (@_
190     ? (
191         (@_ == 1 || ref $_[0] eq "HASH")
192           ? (
193               (ref $_[0] eq 'HASH')
194                 ? (
195                     (keys %{ $_[0] }  > 0)
196                       ? shift
197                       : undef
198                    )
199                 :  shift
200              )
201           : (
202               (@_ % 2)
203                 ? $self->throw_exception("Odd number of arguments to search")
204                 : {@_}
205              )
206       )
207     : undef
208   );
209
210   if (defined $where) {
211     $new_attrs->{where} = (
212       defined $new_attrs->{where}
213         ? { '-and' => [
214               map {
215                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
216               } $where, $new_attrs->{where}
217             ]
218           }
219         : $where);
220   }
221
222   if (defined $cond) {
223     $new_attrs->{where} = (
224       defined $new_attrs->{where}
225         ? { '-and' => [
226               map {
227                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
228               } $cond, $new_attrs->{where}
229             ]
230           }
231         : $cond);
232   }
233
234   if (defined $having) {
235     $new_attrs->{having} = (
236       defined $new_attrs->{having}
237         ? { '-and' => [
238               map {
239                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
240               } $having, $new_attrs->{having}
241             ]
242           }
243         : $having);
244   }
245
246   my $rs = (ref $self)->new($self->result_source, $new_attrs);
247   if ($rows) {
248     $rs->set_cache($rows);
249   }
250   return $rs;
251 }
252
253 =head2 search_literal
254
255 =over 4
256
257 =item Arguments: $sql_fragment, @bind_values
258
259 =item Return Value: $resultset (scalar context), @row_objs (list context)
260
261 =back
262
263   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
264   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
265
266 Pass a literal chunk of SQL to be added to the conditional part of the
267 resultset query.
268
269 =cut
270
271 sub search_literal {
272   my ($self, $cond, @vals) = @_;
273   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
274   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
275   return $self->search(\$cond, $attrs);
276 }
277
278 =head2 find
279
280 =over 4
281
282 =item Arguments: @values | \%cols, \%attrs?
283
284 =item Return Value: $row_object
285
286 =back
287
288 Finds a row based on its primary key or unique constraint. For example, to find
289 a row by its primary key:
290
291   my $cd = $schema->resultset('CD')->find(5);
292
293 You can also find a row by a specific unique constraint using the C<key>
294 attribute. For example:
295
296   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
297     key => 'cd_artist_title'
298   });
299
300 Additionally, you can specify the columns explicitly by name:
301
302   my $cd = $schema->resultset('CD')->find(
303     {
304       artist => 'Massive Attack',
305       title  => 'Mezzanine',
306     },
307     { key => 'cd_artist_title' }
308   );
309
310 If the C<key> is specified as C<primary>, it searches only on the primary key.
311
312 If no C<key> is specified, it searches on all unique constraints defined on the
313 source, including the primary key.
314
315 If your table does not have a primary key, you B<must> provide a value for the
316 C<key> attribute matching one of the unique constraints on the source.
317
318 See also L</find_or_create> and L</update_or_create>. For information on how to
319 declare unique constraints, see
320 L<DBIx::Class::ResultSource/add_unique_constraint>.
321
322 =cut
323
324 sub find {
325   my $self = shift;
326   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
327
328   # Default to the primary key, but allow a specific key
329   my @cols = exists $attrs->{key}
330     ? $self->result_source->unique_constraint_columns($attrs->{key})
331     : $self->result_source->primary_columns;
332   $self->throw_exception(
333     "Can't find unless a primary key is defined or unique constraint is specified"
334   ) unless @cols;
335
336   # Parse out a hashref from input
337   my $input_query;
338   if (ref $_[0] eq 'HASH') {
339     $input_query = { %{$_[0]} };
340   }
341   elsif (@_ == @cols) {
342     $input_query = {};
343     @{$input_query}{@cols} = @_;
344   }
345   else {
346     # Compatibility: Allow e.g. find(id => $value)
347     carp "Find by key => value deprecated; please use a hashref instead";
348     $input_query = {@_};
349   }
350
351   my (%related, $info);
352
353   foreach my $key (keys %$input_query) {
354     if (ref($input_query->{$key})
355         && ($info = $self->result_source->relationship_info($key))) {
356       my $rel_q = $self->result_source->resolve_condition(
357                     $info->{cond}, delete $input_query->{$key}, $key
358                   );
359       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
360       @related{keys %$rel_q} = values %$rel_q;
361     }
362   }
363   if (my @keys = keys %related) {
364     @{$input_query}{@keys} = values %related;
365   }
366
367   my @unique_queries = $self->_unique_queries($input_query, $attrs);
368
369   # Build the final query: Default to the disjunction of the unique queries,
370   # but allow the input query in case the ResultSet defines the query or the
371   # user is abusing find
372   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
373   my $query = @unique_queries
374     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
375     : $self->_add_alias($input_query, $alias);
376
377   # Run the query
378   if (keys %$attrs) {
379     my $rs = $self->search($query, $attrs);
380     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
381   }
382   else {
383     return keys %{$self->_resolved_attrs->{collapse}}
384       ? $self->search($query)->next
385       : $self->single($query);
386   }
387 }
388
389 # _add_alias
390 #
391 # Add the specified alias to the specified query hash. A copy is made so the
392 # original query is not modified.
393
394 sub _add_alias {
395   my ($self, $query, $alias) = @_;
396
397   my %aliased = %$query;
398   foreach my $col (grep { ! m/\./ } keys %aliased) {
399     $aliased{"$alias.$col"} = delete $aliased{$col};
400   }
401
402   return \%aliased;
403 }
404
405 # _unique_queries
406 #
407 # Build a list of queries which satisfy unique constraints.
408
409 sub _unique_queries {
410   my ($self, $query, $attrs) = @_;
411
412   my @constraint_names = exists $attrs->{key}
413     ? ($attrs->{key})
414     : $self->result_source->unique_constraint_names;
415
416   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
417   my $num_where = scalar keys %$where;
418
419   my @unique_queries;
420   foreach my $name (@constraint_names) {
421     my @unique_cols = $self->result_source->unique_constraint_columns($name);
422     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
423
424     my $num_cols = scalar @unique_cols;
425     my $num_query = scalar keys %$unique_query;
426
427     my $total = $num_query + $num_where;
428     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
429       # The query is either unique on its own or is unique in combination with
430       # the existing where clause
431       push @unique_queries, $unique_query;
432     }
433   }
434
435   return @unique_queries;
436 }
437
438 # _build_unique_query
439 #
440 # Constrain the specified query hash based on the specified column names.
441
442 sub _build_unique_query {
443   my ($self, $query, $unique_cols) = @_;
444
445   return {
446     map  { $_ => $query->{$_} }
447     grep { exists $query->{$_} }
448       @$unique_cols
449   };
450 }
451
452 =head2 search_related
453
454 =over 4
455
456 =item Arguments: $rel, $cond, \%attrs?
457
458 =item Return Value: $new_resultset
459
460 =back
461
462   $new_rs = $cd_rs->search_related('artist', {
463     name => 'Emo-R-Us',
464   });
465
466 Searches the specified relationship, optionally specifying a condition and
467 attributes for matching records. See L</ATTRIBUTES> for more information.
468
469 =cut
470
471 sub search_related {
472   return shift->related_resultset(shift)->search(@_);
473 }
474
475 =head2 cursor
476
477 =over 4
478
479 =item Arguments: none
480
481 =item Return Value: $cursor
482
483 =back
484
485 Returns a storage-driven cursor to the given resultset. See
486 L<DBIx::Class::Cursor> for more information.
487
488 =cut
489
490 sub cursor {
491   my ($self) = @_;
492
493   my $attrs = { %{$self->_resolved_attrs} };
494   return $self->{cursor}
495     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
496           $attrs->{where},$attrs);
497 }
498
499 =head2 single
500
501 =over 4
502
503 =item Arguments: $cond?
504
505 =item Return Value: $row_object?
506
507 =back
508
509   my $cd = $schema->resultset('CD')->single({ year => 2001 });
510
511 Inflates the first result without creating a cursor if the resultset has
512 any records in it; if not returns nothing. Used by L</find> as an optimisation.
513
514 Can optionally take an additional condition *only* - this is a fast-code-path
515 method; if you need to add extra joins or similar call ->search and then
516 ->single without a condition on the $rs returned from that.
517
518 =cut
519
520 sub single {
521   my ($self, $where) = @_;
522   my $attrs = { %{$self->_resolved_attrs} };
523   if ($where) {
524     if (defined $attrs->{where}) {
525       $attrs->{where} = {
526         '-and' =>
527             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
528                $where, delete $attrs->{where} ]
529       };
530     } else {
531       $attrs->{where} = $where;
532     }
533   }
534
535 #  XXX: Disabled since it doesn't infer uniqueness in all cases
536 #  unless ($self->_is_unique_query($attrs->{where})) {
537 #    carp "Query not guaranteed to return a single row"
538 #      . "; please declare your unique constraints or use search instead";
539 #  }
540
541   my @data = $self->result_source->storage->select_single(
542     $attrs->{from}, $attrs->{select},
543     $attrs->{where}, $attrs
544   );
545
546   return (@data ? ($self->_construct_object(@data))[0] : undef);
547 }
548
549 # _is_unique_query
550 #
551 # Try to determine if the specified query is guaranteed to be unique, based on
552 # the declared unique constraints.
553
554 sub _is_unique_query {
555   my ($self, $query) = @_;
556
557   my $collapsed = $self->_collapse_query($query);
558   my $alias = $self->{attrs}{alias};
559
560   foreach my $name ($self->result_source->unique_constraint_names) {
561     my @unique_cols = map {
562       "$alias.$_"
563     } $self->result_source->unique_constraint_columns($name);
564
565     # Count the values for each unique column
566     my %seen = map { $_ => 0 } @unique_cols;
567
568     foreach my $key (keys %$collapsed) {
569       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
570       next unless exists $seen{$aliased};  # Additional constraints are okay
571       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
572     }
573
574     # If we get 0 or more than 1 value for a column, it's not necessarily unique
575     return 1 unless grep { $_ != 1 } values %seen;
576   }
577
578   return 0;
579 }
580
581 # _collapse_query
582 #
583 # Recursively collapse the query, accumulating values for each column.
584
585 sub _collapse_query {
586   my ($self, $query, $collapsed) = @_;
587
588   $collapsed ||= {};
589
590   if (ref $query eq 'ARRAY') {
591     foreach my $subquery (@$query) {
592       next unless ref $subquery;  # -or
593 #      warn "ARRAY: " . Dumper $subquery;
594       $collapsed = $self->_collapse_query($subquery, $collapsed);
595     }
596   }
597   elsif (ref $query eq 'HASH') {
598     if (keys %$query and (keys %$query)[0] eq '-and') {
599       foreach my $subquery (@{$query->{-and}}) {
600 #        warn "HASH: " . Dumper $subquery;
601         $collapsed = $self->_collapse_query($subquery, $collapsed);
602       }
603     }
604     else {
605 #      warn "LEAF: " . Dumper $query;
606       foreach my $col (keys %$query) {
607         my $value = $query->{$col};
608         $collapsed->{$col}{$value}++;
609       }
610     }
611   }
612
613   return $collapsed;
614 }
615
616 =head2 get_column
617
618 =over 4
619
620 =item Arguments: $cond?
621
622 =item Return Value: $resultsetcolumn
623
624 =back
625
626   my $max_length = $rs->get_column('length')->max;
627
628 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
629
630 =cut
631
632 sub get_column {
633   my ($self, $column) = @_;
634   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
635   return $new;
636 }
637
638 =head2 search_like
639
640 =over 4
641
642 =item Arguments: $cond, \%attrs?
643
644 =item Return Value: $resultset (scalar context), @row_objs (list context)
645
646 =back
647
648   # WHERE title LIKE '%blue%'
649   $cd_rs = $rs->search_like({ title => '%blue%'});
650
651 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
652 that this is simply a convenience method. You most likely want to use
653 L</search> with specific operators.
654
655 For more information, see L<DBIx::Class::Manual::Cookbook>.
656
657 =cut
658
659 sub search_like {
660   my $class = shift;
661   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
662   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
663   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
664   return $class->search($query, { %$attrs });
665 }
666
667 =head2 slice
668
669 =over 4
670
671 =item Arguments: $first, $last
672
673 =item Return Value: $resultset (scalar context), @row_objs (list context)
674
675 =back
676
677 Returns a resultset or object list representing a subset of elements from the
678 resultset slice is called on. Indexes are from 0, i.e., to get the first
679 three records, call:
680
681   my ($one, $two, $three) = $rs->slice(0, 2);
682
683 =cut
684
685 sub slice {
686   my ($self, $min, $max) = @_;
687   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
688   $attrs->{offset} = $self->{attrs}{offset} || 0;
689   $attrs->{offset} += $min;
690   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
691   return $self->search(undef(), $attrs);
692   #my $slice = (ref $self)->new($self->result_source, $attrs);
693   #return (wantarray ? $slice->all : $slice);
694 }
695
696 =head2 next
697
698 =over 4
699
700 =item Arguments: none
701
702 =item Return Value: $result?
703
704 =back
705
706 Returns the next element in the resultset (C<undef> is there is none).
707
708 Can be used to efficiently iterate over records in the resultset:
709
710   my $rs = $schema->resultset('CD')->search;
711   while (my $cd = $rs->next) {
712     print $cd->title;
713   }
714
715 Note that you need to store the resultset object, and call C<next> on it.
716 Calling C<< resultset('Table')->next >> repeatedly will always return the
717 first record from the resultset.
718
719 =cut
720
721 sub next {
722   my ($self) = @_;
723   if (my $cache = $self->get_cache) {
724     $self->{all_cache_position} ||= 0;
725     return $cache->[$self->{all_cache_position}++];
726   }
727   if ($self->{attrs}{cache}) {
728     $self->{all_cache_position} = 1;
729     return ($self->all)[0];
730   }
731   if ($self->{stashed_objects}) {
732     my $obj = shift(@{$self->{stashed_objects}});
733     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
734     return $obj;
735   }
736   my @row = (
737     exists $self->{stashed_row}
738       ? @{delete $self->{stashed_row}}
739       : $self->cursor->next
740   );
741   return undef unless (@row);
742   my ($row, @more) = $self->_construct_object(@row);
743   $self->{stashed_objects} = \@more if @more;
744   return $row;
745 }
746
747 sub _construct_object {
748   my ($self, @row) = @_;
749   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
750   my @new = $self->result_class->inflate_result($self->result_source, @$info);
751   @new = $self->{_attrs}{record_filter}->(@new)
752     if exists $self->{_attrs}{record_filter};
753   return @new;
754 }
755
756 sub _collapse_result {
757   my ($self, $as_proto, $row) = @_;
758
759   my @copy = @$row;
760
761   # 'foo'         => [ undef, 'foo' ]
762   # 'foo.bar'     => [ 'foo', 'bar' ]
763   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
764
765   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
766
767   my %collapse = %{$self->{_attrs}{collapse}||{}};
768
769   my @pri_index;
770
771   # if we're doing collapsing (has_many prefetch) we need to grab records
772   # until the PK changes, so fill @pri_index. if not, we leave it empty so
773   # we know we don't have to bother.
774
775   # the reason for not using the collapse stuff directly is because if you
776   # had for e.g. two artists in a row with no cds, the collapse info for
777   # both would be NULL (undef) so you'd lose the second artist
778
779   # store just the index so we can check the array positions from the row
780   # without having to contruct the full hash
781
782   if (keys %collapse) {
783     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
784     foreach my $i (0 .. $#construct_as) {
785       next if defined($construct_as[$i][0]); # only self table
786       if (delete $pri{$construct_as[$i][1]}) {
787         push(@pri_index, $i);
788       }
789       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
790     }
791   }
792
793   # no need to do an if, it'll be empty if @pri_index is empty anyway
794
795   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
796
797   my @const_rows;
798
799   do { # no need to check anything at the front, we always want the first row
800
801     my %const;
802   
803     foreach my $this_as (@construct_as) {
804       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
805     }
806
807     push(@const_rows, \%const);
808
809   } until ( # no pri_index => no collapse => drop straight out
810       !@pri_index
811     or
812       do { # get another row, stash it, drop out if different PK
813
814         @copy = $self->cursor->next;
815         $self->{stashed_row} = \@copy;
816
817         # last thing in do block, counts as true if anything doesn't match
818
819         # check xor defined first for NULL vs. NOT NULL then if one is
820         # defined the other must be so check string equality
821
822         grep {
823           (defined $pri_vals{$_} ^ defined $copy[$_])
824           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
825         } @pri_index;
826       }
827   );
828
829   my $alias = $self->{attrs}{alias};
830   my $info = [];
831
832   my %collapse_pos;
833
834   my @const_keys;
835
836   use Data::Dumper;
837
838   foreach my $const (@const_rows) {
839     scalar @const_keys or do {
840       @const_keys = sort { length($a) <=> length($b) } keys %$const;
841     };
842     foreach my $key (@const_keys) {
843       if (length $key) {
844         my $target = $info;
845         my @parts = split(/\./, $key);
846         my $cur = '';
847         my $data = $const->{$key};
848         foreach my $p (@parts) {
849           $target = $target->[1]->{$p} ||= [];
850           $cur .= ".${p}";
851           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
852             # collapsing at this point and on final part
853             my $pos = $collapse_pos{$cur};
854             CK: foreach my $ck (@ckey) {
855               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
856                 $collapse_pos{$cur} = $data;
857                 delete @collapse_pos{ # clear all positioning for sub-entries
858                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
859                 };
860                 push(@$target, []);
861                 last CK;
862               }
863             }
864           }
865           if (exists $collapse{$cur}) {
866             $target = $target->[-1];
867           }
868         }
869         $target->[0] = $data;
870       } else {
871         $info->[0] = $const->{$key};
872       }
873     }
874   }
875
876   return $info;
877 }
878
879 =head2 result_source
880
881 =over 4
882
883 =item Arguments: $result_source?
884
885 =item Return Value: $result_source
886
887 =back
888
889 An accessor for the primary ResultSource object from which this ResultSet
890 is derived.
891
892 =head2 result_class
893
894 =over 4
895
896 =item Arguments: $result_class?
897
898 =item Return Value: $result_class
899
900 =back
901
902 An accessor for the class to use when creating row objects. Defaults to 
903 C<< result_source->result_class >> - which in most cases is the name of the 
904 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
905
906 =cut
907
908
909 =head2 count
910
911 =over 4
912
913 =item Arguments: $cond, \%attrs??
914
915 =item Return Value: $count
916
917 =back
918
919 Performs an SQL C<COUNT> with the same query as the resultset was built
920 with to find the number of elements. If passed arguments, does a search
921 on the resultset and counts the results of that.
922
923 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
924 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
925 not support C<DISTINCT> with multiple columns. If you are using such a
926 database, you should only use columns from the main table in your C<group_by>
927 clause.
928
929 =cut
930
931 sub count {
932   my $self = shift;
933   return $self->search(@_)->count if @_ and defined $_[0];
934   return scalar @{ $self->get_cache } if $self->get_cache;
935   my $count = $self->_count;
936   return 0 unless $count;
937
938   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
939   $count = $self->{attrs}{rows} if
940     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
941   return $count;
942 }
943
944 sub _count { # Separated out so pager can get the full count
945   my $self = shift;
946   my $select = { count => '*' };
947
948   my $attrs = { %{$self->_resolved_attrs} };
949   if (my $group_by = delete $attrs->{group_by}) {
950     delete $attrs->{having};
951     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
952     # todo: try CONCAT for multi-column pk
953     my @pk = $self->result_source->primary_columns;
954     if (@pk == 1) {
955       my $alias = $attrs->{alias};
956       foreach my $column (@distinct) {
957         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
958           @distinct = ($column);
959           last;
960         }
961       }
962     }
963
964     $select = { count => { distinct => \@distinct } };
965   }
966
967   $attrs->{select} = $select;
968   $attrs->{as} = [qw/count/];
969
970   # offset, order by and page are not needed to count. record_filter is cdbi
971   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
972
973   my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
974   my ($count) = $tmp_rs->cursor->next;
975   return $count;
976 }
977
978 =head2 count_literal
979
980 =over 4
981
982 =item Arguments: $sql_fragment, @bind_values
983
984 =item Return Value: $count
985
986 =back
987
988 Counts the results in a literal query. Equivalent to calling L</search_literal>
989 with the passed arguments, then L</count>.
990
991 =cut
992
993 sub count_literal { shift->search_literal(@_)->count; }
994
995 =head2 all
996
997 =over 4
998
999 =item Arguments: none
1000
1001 =item Return Value: @objects
1002
1003 =back
1004
1005 Returns all elements in the resultset. Called implicitly if the resultset
1006 is returned in list context.
1007
1008 =cut
1009
1010 sub all {
1011   my ($self) = @_;
1012   return @{ $self->get_cache } if $self->get_cache;
1013
1014   my @obj;
1015
1016   # TODO: don't call resolve here
1017   if (keys %{$self->_resolved_attrs->{collapse}}) {
1018 #  if ($self->{attrs}{prefetch}) {
1019       # Using $self->cursor->all is really just an optimisation.
1020       # If we're collapsing has_many prefetches it probably makes
1021       # very little difference, and this is cleaner than hacking
1022       # _construct_object to survive the approach
1023     my @row = $self->cursor->next;
1024     while (@row) {
1025       push(@obj, $self->_construct_object(@row));
1026       @row = (exists $self->{stashed_row}
1027                ? @{delete $self->{stashed_row}}
1028                : $self->cursor->next);
1029     }
1030   } else {
1031     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1032   }
1033
1034   $self->set_cache(\@obj) if $self->{attrs}{cache};
1035   return @obj;
1036 }
1037
1038 =head2 reset
1039
1040 =over 4
1041
1042 =item Arguments: none
1043
1044 =item Return Value: $self
1045
1046 =back
1047
1048 Resets the resultset's cursor, so you can iterate through the elements again.
1049
1050 =cut
1051
1052 sub reset {
1053   my ($self) = @_;
1054   delete $self->{_attrs} if exists $self->{_attrs};
1055   $self->{all_cache_position} = 0;
1056   $self->cursor->reset;
1057   return $self;
1058 }
1059
1060 =head2 first
1061
1062 =over 4
1063
1064 =item Arguments: none
1065
1066 =item Return Value: $object?
1067
1068 =back
1069
1070 Resets the resultset and returns an object for the first result (if the
1071 resultset returns anything).
1072
1073 =cut
1074
1075 sub first {
1076   return $_[0]->reset->next;
1077 }
1078
1079 # _cond_for_update_delete
1080 #
1081 # update/delete require the condition to be modified to handle
1082 # the differing SQL syntax available.  This transforms the $self->{cond}
1083 # appropriately, returning the new condition.
1084
1085 sub _cond_for_update_delete {
1086   my ($self, $full_cond) = @_;
1087   my $cond = {};
1088
1089   $full_cond ||= $self->{cond};
1090   # No-op. No condition, we're updating/deleting everything
1091   return $cond unless ref $full_cond;
1092
1093   if (ref $full_cond eq 'ARRAY') {
1094     $cond = [
1095       map {
1096         my %hash;
1097         foreach my $key (keys %{$_}) {
1098           $key =~ /([^.]+)$/;
1099           $hash{$1} = $_->{$key};
1100         }
1101         \%hash;
1102       } @{$full_cond}
1103     ];
1104   }
1105   elsif (ref $full_cond eq 'HASH') {
1106     if ((keys %{$full_cond})[0] eq '-and') {
1107       $cond->{-and} = [];
1108
1109       my @cond = @{$full_cond->{-and}};
1110       for (my $i = 0; $i < @cond; $i++) {
1111         my $entry = $cond[$i];
1112
1113         my $hash;
1114         if (ref $entry eq 'HASH') {
1115           $hash = $self->_cond_for_update_delete($entry);
1116         }
1117         else {
1118           $entry =~ /([^.]+)$/;
1119           $hash->{$1} = $cond[++$i];
1120         }
1121
1122         push @{$cond->{-and}}, $hash;
1123       }
1124     }
1125     else {
1126       foreach my $key (keys %{$full_cond}) {
1127         $key =~ /([^.]+)$/;
1128         $cond->{$1} = $full_cond->{$key};
1129       }
1130     }
1131   }
1132   else {
1133     $self->throw_exception(
1134       "Can't update/delete on resultset with condition unless hash or array"
1135     );
1136   }
1137
1138   return $cond;
1139 }
1140
1141
1142 =head2 update
1143
1144 =over 4
1145
1146 =item Arguments: \%values
1147
1148 =item Return Value: $storage_rv
1149
1150 =back
1151
1152 Sets the specified columns in the resultset to the supplied values in a
1153 single query. Return value will be true if the update succeeded or false
1154 if no records were updated; exact type of success value is storage-dependent.
1155
1156 =cut
1157
1158 sub update {
1159   my ($self, $values) = @_;
1160   $self->throw_exception("Values for update must be a hash")
1161     unless ref $values eq 'HASH';
1162
1163   my $cond = $self->_cond_for_update_delete;
1164    
1165   return $self->result_source->storage->update(
1166     $self->result_source, $values, $cond
1167   );
1168 }
1169
1170 =head2 update_all
1171
1172 =over 4
1173
1174 =item Arguments: \%values
1175
1176 =item Return Value: 1
1177
1178 =back
1179
1180 Fetches all objects and updates them one at a time. Note that C<update_all>
1181 will run DBIC cascade triggers, while L</update> will not.
1182
1183 =cut
1184
1185 sub update_all {
1186   my ($self, $values) = @_;
1187   $self->throw_exception("Values for update must be a hash")
1188     unless ref $values eq 'HASH';
1189   foreach my $obj ($self->all) {
1190     $obj->set_columns($values)->update;
1191   }
1192   return 1;
1193 }
1194
1195 =head2 delete
1196
1197 =over 4
1198
1199 =item Arguments: none
1200
1201 =item Return Value: 1
1202
1203 =back
1204
1205 Deletes the contents of the resultset from its result source. Note that this
1206 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1207 to run. See also L<DBIx::Class::Row/delete>.
1208
1209 =cut
1210
1211 sub delete {
1212   my ($self) = @_;
1213
1214   my $cond = $self->_cond_for_update_delete;
1215
1216   $self->result_source->storage->delete($self->result_source, $cond);
1217   return 1;
1218 }
1219
1220 =head2 delete_all
1221
1222 =over 4
1223
1224 =item Arguments: none
1225
1226 =item Return Value: 1
1227
1228 =back
1229
1230 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1231 will run DBIC cascade triggers, while L</delete> will not.
1232
1233 =cut
1234
1235 sub delete_all {
1236   my ($self) = @_;
1237   $_->delete for $self->all;
1238   return 1;
1239 }
1240
1241 =head2 pager
1242
1243 =over 4
1244
1245 =item Arguments: none
1246
1247 =item Return Value: $pager
1248
1249 =back
1250
1251 Return Value a L<Data::Page> object for the current resultset. Only makes
1252 sense for queries with a C<page> attribute.
1253
1254 =cut
1255
1256 sub pager {
1257   my ($self) = @_;
1258   my $attrs = $self->{attrs};
1259   $self->throw_exception("Can't create pager for non-paged rs")
1260     unless $self->{attrs}{page};
1261   $attrs->{rows} ||= 10;
1262   return $self->{pager} ||= Data::Page->new(
1263     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1264 }
1265
1266 =head2 page
1267
1268 =over 4
1269
1270 =item Arguments: $page_number
1271
1272 =item Return Value: $rs
1273
1274 =back
1275
1276 Returns a resultset for the $page_number page of the resultset on which page
1277 is called, where each page contains a number of rows equal to the 'rows'
1278 attribute set on the resultset (10 by default).
1279
1280 =cut
1281
1282 sub page {
1283   my ($self, $page) = @_;
1284   return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
1285 }
1286
1287 =head2 new_result
1288
1289 =over 4
1290
1291 =item Arguments: \%vals
1292
1293 =item Return Value: $object
1294
1295 =back
1296
1297 Creates an object in the resultset's result class and returns it.
1298
1299 =cut
1300
1301 sub new_result {
1302   my ($self, $values) = @_;
1303   $self->throw_exception( "new_result needs a hash" )
1304     unless (ref $values eq 'HASH');
1305   $self->throw_exception(
1306     "Can't abstract implicit construct, condition not a hash"
1307   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1308
1309   my $alias = $self->{attrs}{alias};
1310   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1311   my %new = (
1312     %{ $self->_remove_alias($values, $alias) },
1313     %{ $self->_remove_alias($collapsed_cond, $alias) },
1314     -source_handle => $self->_source_handle,
1315     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1316   );
1317
1318   return $self->result_class->new(\%new);
1319 }
1320
1321 # _collapse_cond
1322 #
1323 # Recursively collapse the condition.
1324
1325 sub _collapse_cond {
1326   my ($self, $cond, $collapsed) = @_;
1327
1328   $collapsed ||= {};
1329
1330   if (ref $cond eq 'ARRAY') {
1331     foreach my $subcond (@$cond) {
1332       next unless ref $subcond;  # -or
1333 #      warn "ARRAY: " . Dumper $subcond;
1334       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1335     }
1336   }
1337   elsif (ref $cond eq 'HASH') {
1338     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1339       foreach my $subcond (@{$cond->{-and}}) {
1340 #        warn "HASH: " . Dumper $subcond;
1341         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1342       }
1343     }
1344     else {
1345 #      warn "LEAF: " . Dumper $cond;
1346       foreach my $col (keys %$cond) {
1347         my $value = $cond->{$col};
1348         $collapsed->{$col} = $value;
1349       }
1350     }
1351   }
1352
1353   return $collapsed;
1354 }
1355
1356 # _remove_alias
1357 #
1358 # Remove the specified alias from the specified query hash. A copy is made so
1359 # the original query is not modified.
1360
1361 sub _remove_alias {
1362   my ($self, $query, $alias) = @_;
1363
1364   my %orig = %{ $query || {} };
1365   my %unaliased;
1366
1367   foreach my $key (keys %orig) {
1368     if ($key !~ /\./) {
1369       $unaliased{$key} = $orig{$key};
1370       next;
1371     }
1372     $unaliased{$1} = $orig{$key}
1373       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1374   }
1375
1376   return \%unaliased;
1377 }
1378
1379 =head2 find_or_new
1380
1381 =over 4
1382
1383 =item Arguments: \%vals, \%attrs?
1384
1385 =item Return Value: $object
1386
1387 =back
1388
1389 Find an existing record from this resultset. If none exists, instantiate a new
1390 result object and return it. The object will not be saved into your storage
1391 until you call L<DBIx::Class::Row/insert> on it.
1392
1393 If you want objects to be saved immediately, use L</find_or_create> instead.
1394
1395 =cut
1396
1397 sub find_or_new {
1398   my $self     = shift;
1399   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1400   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1401   my $exists   = $self->find($hash, $attrs);
1402   return defined $exists ? $exists : $self->new_result($hash);
1403 }
1404
1405 =head2 create
1406
1407 =over 4
1408
1409 =item Arguments: \%vals
1410
1411 =item Return Value: $object
1412
1413 =back
1414
1415 Inserts a record into the resultset and returns the object representing it.
1416
1417 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1418
1419 =cut
1420
1421 sub create {
1422   my ($self, $attrs) = @_;
1423   $self->throw_exception( "create needs a hashref" )
1424     unless ref $attrs eq 'HASH';
1425   return $self->new_result($attrs)->insert;
1426 }
1427
1428 =head2 find_or_create
1429
1430 =over 4
1431
1432 =item Arguments: \%vals, \%attrs?
1433
1434 =item Return Value: $object
1435
1436 =back
1437
1438   $class->find_or_create({ key => $val, ... });
1439
1440 Tries to find a record based on its primary key or unique constraint; if none
1441 is found, creates one and returns that instead.
1442
1443   my $cd = $schema->resultset('CD')->find_or_create({
1444     cdid   => 5,
1445     artist => 'Massive Attack',
1446     title  => 'Mezzanine',
1447     year   => 2005,
1448   });
1449
1450 Also takes an optional C<key> attribute, to search by a specific key or unique
1451 constraint. For example:
1452
1453   my $cd = $schema->resultset('CD')->find_or_create(
1454     {
1455       artist => 'Massive Attack',
1456       title  => 'Mezzanine',
1457     },
1458     { key => 'cd_artist_title' }
1459   );
1460
1461 See also L</find> and L</update_or_create>. For information on how to declare
1462 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1463
1464 =cut
1465
1466 sub find_or_create {
1467   my $self     = shift;
1468   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1469   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1470   my $exists   = $self->find($hash, $attrs);
1471   return defined $exists ? $exists : $self->create($hash);
1472 }
1473
1474 =head2 update_or_create
1475
1476 =over 4
1477
1478 =item Arguments: \%col_values, { key => $unique_constraint }?
1479
1480 =item Return Value: $object
1481
1482 =back
1483
1484   $class->update_or_create({ col => $val, ... });
1485
1486 First, searches for an existing row matching one of the unique constraints
1487 (including the primary key) on the source of this resultset. If a row is
1488 found, updates it with the other given column values. Otherwise, creates a new
1489 row.
1490
1491 Takes an optional C<key> attribute to search on a specific unique constraint.
1492 For example:
1493
1494   # In your application
1495   my $cd = $schema->resultset('CD')->update_or_create(
1496     {
1497       artist => 'Massive Attack',
1498       title  => 'Mezzanine',
1499       year   => 1998,
1500     },
1501     { key => 'cd_artist_title' }
1502   );
1503
1504 If no C<key> is specified, it searches on all unique constraints defined on the
1505 source, including the primary key.
1506
1507 If the C<key> is specified as C<primary>, it searches only on the primary key.
1508
1509 See also L</find> and L</find_or_create>. For information on how to declare
1510 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1511
1512 =cut
1513
1514 sub update_or_create {
1515   my $self = shift;
1516   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1517   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1518
1519   my $row = $self->find($cond, $attrs);
1520   if (defined $row) {
1521     $row->update($cond);
1522     return $row;
1523   }
1524
1525   return $self->create($cond);
1526 }
1527
1528 =head2 get_cache
1529
1530 =over 4
1531
1532 =item Arguments: none
1533
1534 =item Return Value: \@cache_objects?
1535
1536 =back
1537
1538 Gets the contents of the cache for the resultset, if the cache is set.
1539
1540 =cut
1541
1542 sub get_cache {
1543   shift->{all_cache};
1544 }
1545
1546 =head2 set_cache
1547
1548 =over 4
1549
1550 =item Arguments: \@cache_objects
1551
1552 =item Return Value: \@cache_objects
1553
1554 =back
1555
1556 Sets the contents of the cache for the resultset. Expects an arrayref
1557 of objects of the same class as those produced by the resultset. Note that
1558 if the cache is set the resultset will return the cached objects rather
1559 than re-querying the database even if the cache attr is not set.
1560
1561 =cut
1562
1563 sub set_cache {
1564   my ( $self, $data ) = @_;
1565   $self->throw_exception("set_cache requires an arrayref")
1566       if defined($data) && (ref $data ne 'ARRAY');
1567   $self->{all_cache} = $data;
1568 }
1569
1570 =head2 clear_cache
1571
1572 =over 4
1573
1574 =item Arguments: none
1575
1576 =item Return Value: []
1577
1578 =back
1579
1580 Clears the cache for the resultset.
1581
1582 =cut
1583
1584 sub clear_cache {
1585   shift->set_cache(undef);
1586 }
1587
1588 =head2 related_resultset
1589
1590 =over 4
1591
1592 =item Arguments: $relationship_name
1593
1594 =item Return Value: $resultset
1595
1596 =back
1597
1598 Returns a related resultset for the supplied relationship name.
1599
1600   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1601
1602 =cut
1603
1604 sub related_resultset {
1605   my ($self, $rel) = @_;
1606
1607   $self->{related_resultsets} ||= {};
1608   return $self->{related_resultsets}{$rel} ||= do {
1609     my $rel_obj = $self->result_source->relationship_info($rel);
1610
1611     $self->throw_exception(
1612       "search_related: result source '" . $self->_source_handle->source_moniker .
1613         "' has no such relationship $rel")
1614       unless $rel_obj;
1615     
1616     my ($from,$seen) = $self->_resolve_from($rel);
1617
1618     my $join_count = $seen->{$rel};
1619     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1620
1621     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
1622     my %attrs = %{$self->{attrs}||{}};
1623     delete $attrs{result_class};
1624
1625     my $new_cache;
1626
1627     if (my $cache = $self->get_cache) {
1628       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
1629         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
1630                         @$cache ];
1631       }
1632     }
1633
1634     my $new = $self->_source_handle
1635                    ->schema
1636                    ->resultset($rel_obj->{class})
1637                    ->search_rs(
1638                        undef, {
1639                          %attrs,
1640                          join => undef,
1641                          prefetch => undef,
1642                          select => undef,
1643                          as => undef,
1644                          alias => $alias,
1645                          where => $self->{cond},
1646                          seen_join => $seen,
1647                          from => $from,
1648                      });
1649     $new->set_cache($new_cache) if $new_cache;
1650     $new;
1651   };
1652 }
1653
1654 sub _resolve_from {
1655   my ($self, $extra_join) = @_;
1656   my $source = $self->result_source;
1657   my $attrs = $self->{attrs};
1658   
1659   my $from = $attrs->{from}
1660     || [ { $attrs->{alias} => $source->from } ];
1661     
1662   my $seen = { %{$attrs->{seen_join}||{}} };
1663
1664   my $join = ($attrs->{join}
1665                ? [ $attrs->{join}, $extra_join ]
1666                : $extra_join);
1667   $from = [
1668     @$from,
1669     ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
1670   ];
1671
1672   return ($from,$seen);
1673 }
1674
1675 sub _resolved_attrs {
1676   my $self = shift;
1677   return $self->{_attrs} if $self->{_attrs};
1678
1679   my $attrs = { %{$self->{attrs}||{}} };
1680   my $source = $self->result_source;
1681   my $alias = $attrs->{alias};
1682
1683   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1684   if ($attrs->{columns}) {
1685     delete $attrs->{as};
1686   } elsif (!$attrs->{select}) {
1687     $attrs->{columns} = [ $source->columns ];
1688   }
1689  
1690   $attrs->{select} = 
1691     ($attrs->{select}
1692       ? (ref $attrs->{select} eq 'ARRAY'
1693           ? [ @{$attrs->{select}} ]
1694           : [ $attrs->{select} ])
1695       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1696     );
1697   $attrs->{as} =
1698     ($attrs->{as}
1699       ? (ref $attrs->{as} eq 'ARRAY'
1700           ? [ @{$attrs->{as}} ]
1701           : [ $attrs->{as} ])
1702       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1703     );
1704   
1705   my $adds;
1706   if ($adds = delete $attrs->{include_columns}) {
1707     $adds = [$adds] unless ref $adds eq 'ARRAY';
1708     push(@{$attrs->{select}}, @$adds);
1709     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1710   }
1711   if ($adds = delete $attrs->{'+select'}) {
1712     $adds = [$adds] unless ref $adds eq 'ARRAY';
1713     push(@{$attrs->{select}},
1714            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1715   }
1716   if (my $adds = delete $attrs->{'+as'}) {
1717     $adds = [$adds] unless ref $adds eq 'ARRAY';
1718     push(@{$attrs->{as}}, @$adds);
1719   }
1720
1721   $attrs->{from} ||= [ { 'me' => $source->from } ];
1722
1723   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1724     my $join = delete $attrs->{join} || {};
1725
1726     if (defined $attrs->{prefetch}) {
1727       $join = $self->_merge_attr(
1728         $join, $attrs->{prefetch}
1729       );
1730     }
1731
1732     $attrs->{from} =   # have to copy here to avoid corrupting the original
1733       [
1734         @{$attrs->{from}}, 
1735         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1736       ];
1737   }
1738
1739   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1740   if ($attrs->{order_by}) {
1741     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1742                            ? [ @{$attrs->{order_by}} ]
1743                            : [ $attrs->{order_by} ]);
1744   } else {
1745     $attrs->{order_by} = [];    
1746   }
1747
1748   my $collapse = $attrs->{collapse} || {};
1749   if (my $prefetch = delete $attrs->{prefetch}) {
1750     $prefetch = $self->_merge_attr({}, $prefetch);
1751     my @pre_order;
1752     my $seen = $attrs->{seen_join} || {};
1753     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1754       # bring joins back to level of current class
1755       my @prefetch = $source->resolve_prefetch(
1756         $p, $alias, $seen, \@pre_order, $collapse
1757       );
1758       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1759       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1760     }
1761     push(@{$attrs->{order_by}}, @pre_order);
1762   }
1763   $attrs->{collapse} = $collapse;
1764
1765   return $self->{_attrs} = $attrs;
1766 }
1767
1768 sub _merge_attr {
1769   my ($self, $a, $b) = @_;
1770   return $b unless defined($a);
1771   return $a unless defined($b);
1772   
1773   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
1774     foreach my $key (keys %{$b}) {
1775       if (exists $a->{$key}) {
1776         $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
1777       } else {
1778         $a->{$key} = $b->{$key};
1779       }
1780     }
1781     return $a;
1782   } else {
1783     $a = [$a] unless ref $a eq 'ARRAY';
1784     $b = [$b] unless ref $b eq 'ARRAY';
1785
1786     my $hash = {};
1787     my @array;
1788     foreach my $x ($a, $b) {
1789       foreach my $element (@{$x}) {
1790         if (ref $element eq 'HASH') {
1791           $hash = $self->_merge_attr($hash, $element);
1792         } elsif (ref $element eq 'ARRAY') {
1793           push(@array, @{$element});
1794         } else {
1795           push(@array, $element) unless $b == $x
1796             && grep { $_ eq $element } @array;
1797         }
1798       }
1799     }
1800     
1801     @array = grep { !exists $hash->{$_} } @array;
1802
1803     return keys %{$hash}
1804       ? ( scalar(@array)
1805             ? [$hash, @array]
1806             : $hash
1807         )
1808       : \@array;
1809   }
1810 }
1811
1812 sub result_source {
1813     my $self = shift;
1814
1815     if (@_) {
1816         $self->_source_handle($_[0]->handle);
1817     } else {
1818         $self->_source_handle->resolve;
1819     }
1820 }
1821
1822 =head2 throw_exception
1823
1824 See L<DBIx::Class::Schema/throw_exception> for details.
1825
1826 =cut
1827
1828 sub throw_exception {
1829   my $self=shift;
1830   $self->_source_handle->schema->throw_exception(@_);
1831 }
1832
1833 # XXX: FIXME: Attributes docs need clearing up
1834
1835 =head1 ATTRIBUTES
1836
1837 The resultset takes various attributes that modify its behavior. Here's an
1838 overview of them:
1839
1840 =head2 order_by
1841
1842 =over 4
1843
1844 =item Value: ($order_by | \@order_by)
1845
1846 =back
1847
1848 Which column(s) to order the results by. This is currently passed
1849 through directly to SQL, so you can give e.g. C<year DESC> for a
1850 descending order on the column `year'.
1851
1852 Please note that if you have C<quote_char> enabled (see
1853 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
1854 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1855 so you will need to manually quote things as appropriate.)
1856
1857 =head2 columns
1858
1859 =over 4
1860
1861 =item Value: \@columns
1862
1863 =back
1864
1865 Shortcut to request a particular set of columns to be retrieved.  Adds
1866 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1867 from that, then auto-populates C<as> from C<select> as normal. (You may also
1868 use the C<cols> attribute, as in earlier versions of DBIC.)
1869
1870 =head2 include_columns
1871
1872 =over 4
1873
1874 =item Value: \@columns
1875
1876 =back
1877
1878 Shortcut to include additional columns in the returned results - for example
1879
1880   $schema->resultset('CD')->search(undef, {
1881     include_columns => ['artist.name'],
1882     join => ['artist']
1883   });
1884
1885 would return all CDs and include a 'name' column to the information
1886 passed to object inflation. Note that the 'artist' is the name of the
1887 column (or relationship) accessor, and 'name' is the name of the column
1888 accessor in the related table.
1889
1890 =head2 select
1891
1892 =over 4
1893
1894 =item Value: \@select_columns
1895
1896 =back
1897
1898 Indicates which columns should be selected from the storage. You can use
1899 column names, or in the case of RDBMS back ends, function or stored procedure
1900 names:
1901
1902   $rs = $schema->resultset('Employee')->search(undef, {
1903     select => [
1904       'name',
1905       { count => 'employeeid' },
1906       { sum => 'salary' }
1907     ]
1908   });
1909
1910 When you use function/stored procedure names and do not supply an C<as>
1911 attribute, the column names returned are storage-dependent. E.g. MySQL would
1912 return a column named C<count(employeeid)> in the above example.
1913
1914 =head2 +select
1915
1916 =over 4
1917
1918 Indicates additional columns to be selected from storage.  Works the same as
1919 L<select> but adds columns to the selection.
1920
1921 =back
1922
1923 =head2 +as
1924
1925 =over 4
1926
1927 Indicates additional column names for those added via L<+select>.
1928
1929 =back
1930
1931 =head2 as
1932
1933 =over 4
1934
1935 =item Value: \@inflation_names
1936
1937 =back
1938
1939 Indicates column names for object inflation. That is, c< as >
1940 indicates the name that the column can be accessed as via the
1941 C<get_column> method (or via the object accessor, B<if one already
1942 exists>).  It has nothing to do with the SQL code C< SELECT foo AS bar
1943 >.
1944
1945 The C< as > attribute is used in conjunction with C<select>,
1946 usually when C<select> contains one or more function or stored
1947 procedure names:
1948
1949   $rs = $schema->resultset('Employee')->search(undef, {
1950     select => [
1951       'name',
1952       { count => 'employeeid' }
1953     ],
1954     as => ['name', 'employee_count'],
1955   });
1956
1957   my $employee = $rs->first(); # get the first Employee
1958
1959 If the object against which the search is performed already has an accessor
1960 matching a column name specified in C<as>, the value can be retrieved using
1961 the accessor as normal:
1962
1963   my $name = $employee->name();
1964
1965 If on the other hand an accessor does not exist in the object, you need to
1966 use C<get_column> instead:
1967
1968   my $employee_count = $employee->get_column('employee_count');
1969
1970 You can create your own accessors if required - see
1971 L<DBIx::Class::Manual::Cookbook> for details.
1972
1973 Please note: This will NOT insert an C<AS employee_count> into the SQL
1974 statement produced, it is used for internal access only. Thus
1975 attempting to use the accessor in an C<order_by> clause or similar
1976 will fail miserably.
1977
1978 To get around this limitation, you can supply literal SQL to your
1979 C<select> attibute that contains the C<AS alias> text, eg:
1980
1981   select => [\'myfield AS alias']
1982
1983 =head2 join
1984
1985 =over 4
1986
1987 =item Value: ($rel_name | \@rel_names | \%rel_names)
1988
1989 =back
1990
1991 Contains a list of relationships that should be joined for this query.  For
1992 example:
1993
1994   # Get CDs by Nine Inch Nails
1995   my $rs = $schema->resultset('CD')->search(
1996     { 'artist.name' => 'Nine Inch Nails' },
1997     { join => 'artist' }
1998   );
1999
2000 Can also contain a hash reference to refer to the other relation's relations.
2001 For example:
2002
2003   package MyApp::Schema::Track;
2004   use base qw/DBIx::Class/;
2005   __PACKAGE__->table('track');
2006   __PACKAGE__->add_columns(qw/trackid cd position title/);
2007   __PACKAGE__->set_primary_key('trackid');
2008   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
2009   1;
2010
2011   # In your application
2012   my $rs = $schema->resultset('Artist')->search(
2013     { 'track.title' => 'Teardrop' },
2014     {
2015       join     => { cd => 'track' },
2016       order_by => 'artist.name',
2017     }
2018   );
2019
2020 You need to use the relationship (not the table) name in  conditions, 
2021 because they are aliased as such. The current table is aliased as "me", so 
2022 you need to use me.column_name in order to avoid ambiguity. For example:
2023
2024   # Get CDs from 1984 with a 'Foo' track 
2025   my $rs = $schema->resultset('CD')->search(
2026     { 
2027       'me.year' => 1984,
2028       'tracks.name' => 'Foo'
2029     },
2030     { join => 'tracks' }
2031   );
2032   
2033 If the same join is supplied twice, it will be aliased to <rel>_2 (and
2034 similarly for a third time). For e.g.
2035
2036   my $rs = $schema->resultset('Artist')->search({
2037     'cds.title'   => 'Down to Earth',
2038     'cds_2.title' => 'Popular',
2039   }, {
2040     join => [ qw/cds cds/ ],
2041   });
2042
2043 will return a set of all artists that have both a cd with title 'Down
2044 to Earth' and a cd with title 'Popular'.
2045
2046 If you want to fetch related objects from other tables as well, see C<prefetch>
2047 below.
2048
2049 =head2 prefetch
2050
2051 =over 4
2052
2053 =item Value: ($rel_name | \@rel_names | \%rel_names)
2054
2055 =back
2056
2057 Contains one or more relationships that should be fetched along with the main
2058 query (when they are accessed afterwards they will have already been
2059 "prefetched").  This is useful for when you know you will need the related
2060 objects, because it saves at least one query:
2061
2062   my $rs = $schema->resultset('Tag')->search(
2063     undef,
2064     {
2065       prefetch => {
2066         cd => 'artist'
2067       }
2068     }
2069   );
2070
2071 The initial search results in SQL like the following:
2072
2073   SELECT tag.*, cd.*, artist.* FROM tag
2074   JOIN cd ON tag.cd = cd.cdid
2075   JOIN artist ON cd.artist = artist.artistid
2076
2077 L<DBIx::Class> has no need to go back to the database when we access the
2078 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2079 case.
2080
2081 Simple prefetches will be joined automatically, so there is no need
2082 for a C<join> attribute in the above search. If you're prefetching to
2083 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2084 specify the join as well.
2085
2086 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2087 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2088 with an accessor type of 'single' or 'filter').
2089
2090 =head2 page
2091
2092 =over 4
2093
2094 =item Value: $page
2095
2096 =back
2097
2098 Makes the resultset paged and specifies the page to retrieve. Effectively
2099 identical to creating a non-pages resultset and then calling ->page($page)
2100 on it.
2101
2102 If L<rows> attribute is not specified it defualts to 10 rows per page.
2103
2104 =head2 rows
2105
2106 =over 4
2107
2108 =item Value: $rows
2109
2110 =back
2111
2112 Specifes the maximum number of rows for direct retrieval or the number of
2113 rows per page if the page attribute or method is used.
2114
2115 =head2 offset
2116
2117 =over 4
2118
2119 =item Value: $offset
2120
2121 =back
2122
2123 Specifies the (zero-based) row number for the  first row to be returned, or the
2124 of the first row of the first page if paging is used.
2125
2126 =head2 group_by
2127
2128 =over 4
2129
2130 =item Value: \@columns
2131
2132 =back
2133
2134 A arrayref of columns to group by. Can include columns of joined tables.
2135
2136   group_by => [qw/ column1 column2 ... /]
2137
2138 =head2 having
2139
2140 =over 4
2141
2142 =item Value: $condition
2143
2144 =back
2145
2146 HAVING is a select statement attribute that is applied between GROUP BY and
2147 ORDER BY. It is applied to the after the grouping calculations have been
2148 done.
2149
2150   having => { 'count(employee)' => { '>=', 100 } }
2151
2152 =head2 distinct
2153
2154 =over 4
2155
2156 =item Value: (0 | 1)
2157
2158 =back
2159
2160 Set to 1 to group by all columns.
2161
2162 =head2 where
2163
2164 =over 4
2165
2166 Adds to the WHERE clause.
2167
2168   # only return rows WHERE deleted IS NULL for all searches
2169   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2170
2171 Can be overridden by passing C<{ where => undef }> as an attribute
2172 to a resulset.
2173
2174 =back
2175
2176 =head2 cache
2177
2178 Set to 1 to cache search results. This prevents extra SQL queries if you
2179 revisit rows in your ResultSet:
2180
2181   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2182
2183   while( my $artist = $resultset->next ) {
2184     ... do stuff ...
2185   }
2186
2187   $rs->first; # without cache, this would issue a query
2188
2189 By default, searches are not cached.
2190
2191 For more examples of using these attributes, see
2192 L<DBIx::Class::Manual::Cookbook>.
2193
2194 =head2 from
2195
2196 =over 4
2197
2198 =item Value: \@from_clause
2199
2200 =back
2201
2202 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2203 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2204 clauses.
2205
2206 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2207
2208 C<join> will usually do what you need and it is strongly recommended that you
2209 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2210 And we really do mean "cannot", not just tried and failed. Attempting to use
2211 this because you're having problems with C<join> is like trying to use x86
2212 ASM because you've got a syntax error in your C. Trust us on this.
2213
2214 Now, if you're still really, really sure you need to use this (and if you're
2215 not 100% sure, ask the mailing list first), here's an explanation of how this
2216 works.
2217
2218 The syntax is as follows -
2219
2220   [
2221     { <alias1> => <table1> },
2222     [
2223       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2224       [], # nested JOIN (optional)
2225       { <table1.column1> => <table2.column2>, ... (more conditions) },
2226     ],
2227     # More of the above [ ] may follow for additional joins
2228   ]
2229
2230   <table1> <alias1>
2231   JOIN
2232     <table2> <alias2>
2233     [JOIN ...]
2234   ON <table1.column1> = <table2.column2>
2235   <more joins may follow>
2236
2237 An easy way to follow the examples below is to remember the following:
2238
2239     Anything inside "[]" is a JOIN
2240     Anything inside "{}" is a condition for the enclosing JOIN
2241
2242 The following examples utilize a "person" table in a family tree application.
2243 In order to express parent->child relationships, this table is self-joined:
2244
2245     # Person->belongs_to('father' => 'Person');
2246     # Person->belongs_to('mother' => 'Person');
2247
2248 C<from> can be used to nest joins. Here we return all children with a father,
2249 then search against all mothers of those children:
2250
2251   $rs = $schema->resultset('Person')->search(
2252       undef,
2253       {
2254           alias => 'mother', # alias columns in accordance with "from"
2255           from => [
2256               { mother => 'person' },
2257               [
2258                   [
2259                       { child => 'person' },
2260                       [
2261                           { father => 'person' },
2262                           { 'father.person_id' => 'child.father_id' }
2263                       ]
2264                   ],
2265                   { 'mother.person_id' => 'child.mother_id' }
2266               ],
2267           ]
2268       },
2269   );
2270
2271   # Equivalent SQL:
2272   # SELECT mother.* FROM person mother
2273   # JOIN (
2274   #   person child
2275   #   JOIN person father
2276   #   ON ( father.person_id = child.father_id )
2277   # )
2278   # ON ( mother.person_id = child.mother_id )
2279
2280 The type of any join can be controlled manually. To search against only people
2281 with a father in the person table, we could explicitly use C<INNER JOIN>:
2282
2283     $rs = $schema->resultset('Person')->search(
2284         undef,
2285         {
2286             alias => 'child', # alias columns in accordance with "from"
2287             from => [
2288                 { child => 'person' },
2289                 [
2290                     { father => 'person', -join_type => 'inner' },
2291                     { 'father.id' => 'child.father_id' }
2292                 ],
2293             ]
2294         },
2295     );
2296
2297     # Equivalent SQL:
2298     # SELECT child.* FROM person child
2299     # INNER JOIN person father ON child.father_id = father.id
2300
2301 =cut
2302
2303 1;