fix for count after slice (report and tests from JOHANL)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use base qw/DBIx::Class/;
15
16 __PACKAGE__->mk_group_accessors('simple' => qw/result_class _source_handle/);
17
18 =head1 NAME
19
20 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
21
22 =head1 SYNOPSIS
23
24   my $rs   = $schema->resultset('User')->search(registered => 1);
25   my @rows = $schema->resultset('CD')->search(year => 2005);
26
27 =head1 DESCRIPTION
28
29 The resultset is also known as an iterator. It is responsible for handling
30 queries that may return an arbitrary number of rows, e.g. via L</search>
31 or a C<has_many> relationship.
32
33 In the examples below, the following table classes are used:
34
35   package MyApp::Schema::Artist;
36   use base qw/DBIx::Class/;
37   __PACKAGE__->load_components(qw/Core/);
38   __PACKAGE__->table('artist');
39   __PACKAGE__->add_columns(qw/artistid name/);
40   __PACKAGE__->set_primary_key('artistid');
41   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
42   1;
43
44   package MyApp::Schema::CD;
45   use base qw/DBIx::Class/;
46   __PACKAGE__->load_components(qw/Core/);
47   __PACKAGE__->table('cd');
48   __PACKAGE__->add_columns(qw/cdid artist title year/);
49   __PACKAGE__->set_primary_key('cdid');
50   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
51   1;
52
53 =head1 METHODS
54
55 =head2 new
56
57 =over 4
58
59 =item Arguments: $source, \%$attrs
60
61 =item Return Value: $rs
62
63 =back
64
65 The resultset constructor. Takes a source object (usually a
66 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
67 L</ATTRIBUTES> below).  Does not perform any queries -- these are
68 executed as needed by the other methods.
69
70 Generally you won't need to construct a resultset manually.  You'll
71 automatically get one from e.g. a L</search> called in scalar context:
72
73   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
74
75 IMPORTANT: If called on an object, proxies to new_result instead so
76
77   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
78
79 will return a CD object, not a ResultSet.
80
81 =cut
82
83 sub new {
84   my $class = shift;
85   return $class->new_result(@_) if ref $class;
86
87   my ($source, $attrs) = @_;
88   $source = $source->handle 
89     unless $source->isa('DBIx::Class::ResultSourceHandle');
90   $attrs = { %{$attrs||{}} };
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94   }
95
96   $attrs->{alias} ||= 'me';
97
98   my $self = {
99     _source_handle => $source,
100     result_class => $attrs->{result_class} || $source->resolve->result_class,
101     cond => $attrs->{where},
102     count => undef,
103     pager => undef,
104     attrs => $attrs
105   };
106
107   bless $self, $class;
108
109   return $self;
110 }
111
112 =head2 search
113
114 =over 4
115
116 =item Arguments: $cond, \%attrs?
117
118 =item Return Value: $resultset (scalar context), @row_objs (list context)
119
120 =back
121
122   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
123   my $new_rs = $cd_rs->search({ year => 2005 });
124
125   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
126                  # year = 2005 OR year = 2004
127
128 If you need to pass in additional attributes but no additional condition,
129 call it as C<search(undef, \%attrs)>.
130
131   # "SELECT name, artistid FROM $artist_table"
132   my @all_artists = $schema->resultset('Artist')->search(undef, {
133     columns => [qw/name artistid/],
134   });
135
136 For a list of attributes that can be passed to C<search>, see
137 L</ATTRIBUTES>. For more examples of using this function, see
138 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
139 documentation for the first argument, see L<SQL::Abstract>.
140
141 =cut
142
143 sub search {
144   my $self = shift;
145   my $rs = $self->search_rs( @_ );
146   return (wantarray ? $rs->all : $rs);
147 }
148
149 =head2 search_rs
150
151 =over 4
152
153 =item Arguments: $cond, \%attrs?
154
155 =item Return Value: $resultset
156
157 =back
158
159 This method does the same exact thing as search() except it will
160 always return a resultset, even in list context.
161
162 =cut
163
164 sub search_rs {
165   my $self = shift;
166
167   my $rows;
168
169   unless (@_) {                 # no search, effectively just a clone
170     $rows = $self->get_cache;
171   }
172
173   my $attrs = {};
174   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
175   my $our_attrs = { %{$self->{attrs}} };
176   my $having = delete $our_attrs->{having};
177   my $where = delete $our_attrs->{where};
178
179   my $new_attrs = { %{$our_attrs}, %{$attrs} };
180
181   # merge new attrs into inherited
182   foreach my $key (qw/join prefetch/) {
183     next unless exists $attrs->{$key};
184     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
185   }
186
187   my $cond = (@_
188     ? (
189         (@_ == 1 || ref $_[0] eq "HASH")
190           ? (
191               (ref $_[0] eq 'HASH')
192                 ? (
193                     (keys %{ $_[0] }  > 0)
194                       ? shift
195                       : undef
196                    )
197                 :  shift
198              )
199           : (
200               (@_ % 2)
201                 ? $self->throw_exception("Odd number of arguments to search")
202                 : {@_}
203              )
204       )
205     : undef
206   );
207
208   if (defined $where) {
209     $new_attrs->{where} = (
210       defined $new_attrs->{where}
211         ? { '-and' => [
212               map {
213                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
214               } $where, $new_attrs->{where}
215             ]
216           }
217         : $where);
218   }
219
220   if (defined $cond) {
221     $new_attrs->{where} = (
222       defined $new_attrs->{where}
223         ? { '-and' => [
224               map {
225                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
226               } $cond, $new_attrs->{where}
227             ]
228           }
229         : $cond);
230   }
231
232   if (defined $having) {
233     $new_attrs->{having} = (
234       defined $new_attrs->{having}
235         ? { '-and' => [
236               map {
237                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
238               } $having, $new_attrs->{having}
239             ]
240           }
241         : $having);
242   }
243
244   my $rs = (ref $self)->new($self->result_source, $new_attrs);
245   if ($rows) {
246     $rs->set_cache($rows);
247   }
248   return $rs;
249 }
250
251 =head2 search_literal
252
253 =over 4
254
255 =item Arguments: $sql_fragment, @bind_values
256
257 =item Return Value: $resultset (scalar context), @row_objs (list context)
258
259 =back
260
261   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
262   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
263
264 Pass a literal chunk of SQL to be added to the conditional part of the
265 resultset query.
266
267 =cut
268
269 sub search_literal {
270   my ($self, $cond, @vals) = @_;
271   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
272   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
273   return $self->search(\$cond, $attrs);
274 }
275
276 =head2 find
277
278 =over 4
279
280 =item Arguments: @values | \%cols, \%attrs?
281
282 =item Return Value: $row_object
283
284 =back
285
286 Finds a row based on its primary key or unique constraint. For example, to find
287 a row by its primary key:
288
289   my $cd = $schema->resultset('CD')->find(5);
290
291 You can also find a row by a specific unique constraint using the C<key>
292 attribute. For example:
293
294   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
295     key => 'cd_artist_title'
296   });
297
298 Additionally, you can specify the columns explicitly by name:
299
300   my $cd = $schema->resultset('CD')->find(
301     {
302       artist => 'Massive Attack',
303       title  => 'Mezzanine',
304     },
305     { key => 'cd_artist_title' }
306   );
307
308 If the C<key> is specified as C<primary>, it searches only on the primary key.
309
310 If no C<key> is specified, it searches on all unique constraints defined on the
311 source, including the primary key.
312
313 If your table does not have a primary key, you B<must> provide a value for the
314 C<key> attribute matching one of the unique constraints on the source.
315
316 See also L</find_or_create> and L</update_or_create>. For information on how to
317 declare unique constraints, see
318 L<DBIx::Class::ResultSource/add_unique_constraint>.
319
320 =cut
321
322 sub find {
323   my $self = shift;
324   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
325
326   # Default to the primary key, but allow a specific key
327   my @cols = exists $attrs->{key}
328     ? $self->result_source->unique_constraint_columns($attrs->{key})
329     : $self->result_source->primary_columns;
330   $self->throw_exception(
331     "Can't find unless a primary key is defined or unique constraint is specified"
332   ) unless @cols;
333
334   # Parse out a hashref from input
335   my $input_query;
336   if (ref $_[0] eq 'HASH') {
337     $input_query = { %{$_[0]} };
338   }
339   elsif (@_ == @cols) {
340     $input_query = {};
341     @{$input_query}{@cols} = @_;
342   }
343   else {
344     # Compatibility: Allow e.g. find(id => $value)
345     carp "Find by key => value deprecated; please use a hashref instead";
346     $input_query = {@_};
347   }
348
349   my (%related, $info);
350
351   KEY: foreach my $key (keys %$input_query) {
352     if (ref($input_query->{$key})
353         && ($info = $self->result_source->relationship_info($key))) {
354       my $val = delete $input_query->{$key};
355       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
356       my $rel_q = $self->result_source->resolve_condition(
357                     $info->{cond}, $val, $key
358                   );
359       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
360       @related{keys %$rel_q} = values %$rel_q;
361     }
362   }
363   if (my @keys = keys %related) {
364     @{$input_query}{@keys} = values %related;
365   }
366
367   my @unique_queries = $self->_unique_queries($input_query, $attrs);
368
369   # Build the final query: Default to the disjunction of the unique queries,
370   # but allow the input query in case the ResultSet defines the query or the
371   # user is abusing find
372   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
373   my $query = @unique_queries
374     ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
375     : $self->_add_alias($input_query, $alias);
376
377   # Run the query
378   if (keys %$attrs) {
379     my $rs = $self->search($query, $attrs);
380     return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
381   }
382   else {
383     return keys %{$self->_resolved_attrs->{collapse}}
384       ? $self->search($query)->next
385       : $self->single($query);
386   }
387 }
388
389 # _add_alias
390 #
391 # Add the specified alias to the specified query hash. A copy is made so the
392 # original query is not modified.
393
394 sub _add_alias {
395   my ($self, $query, $alias) = @_;
396
397   my %aliased = %$query;
398   foreach my $col (grep { ! m/\./ } keys %aliased) {
399     $aliased{"$alias.$col"} = delete $aliased{$col};
400   }
401
402   return \%aliased;
403 }
404
405 # _unique_queries
406 #
407 # Build a list of queries which satisfy unique constraints.
408
409 sub _unique_queries {
410   my ($self, $query, $attrs) = @_;
411
412   my @constraint_names = exists $attrs->{key}
413     ? ($attrs->{key})
414     : $self->result_source->unique_constraint_names;
415
416   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
417   my $num_where = scalar keys %$where;
418
419   my @unique_queries;
420   foreach my $name (@constraint_names) {
421     my @unique_cols = $self->result_source->unique_constraint_columns($name);
422     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
423
424     my $num_cols = scalar @unique_cols;
425     my $num_query = scalar keys %$unique_query;
426
427     my $total = $num_query + $num_where;
428     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
429       # The query is either unique on its own or is unique in combination with
430       # the existing where clause
431       push @unique_queries, $unique_query;
432     }
433   }
434
435   return @unique_queries;
436 }
437
438 # _build_unique_query
439 #
440 # Constrain the specified query hash based on the specified column names.
441
442 sub _build_unique_query {
443   my ($self, $query, $unique_cols) = @_;
444
445   return {
446     map  { $_ => $query->{$_} }
447     grep { exists $query->{$_} }
448       @$unique_cols
449   };
450 }
451
452 =head2 search_related
453
454 =over 4
455
456 =item Arguments: $rel, $cond, \%attrs?
457
458 =item Return Value: $new_resultset
459
460 =back
461
462   $new_rs = $cd_rs->search_related('artist', {
463     name => 'Emo-R-Us',
464   });
465
466 Searches the specified relationship, optionally specifying a condition and
467 attributes for matching records. See L</ATTRIBUTES> for more information.
468
469 =cut
470
471 sub search_related {
472   return shift->related_resultset(shift)->search(@_);
473 }
474
475 =head2 cursor
476
477 =over 4
478
479 =item Arguments: none
480
481 =item Return Value: $cursor
482
483 =back
484
485 Returns a storage-driven cursor to the given resultset. See
486 L<DBIx::Class::Cursor> for more information.
487
488 =cut
489
490 sub cursor {
491   my ($self) = @_;
492
493   my $attrs = { %{$self->_resolved_attrs} };
494   return $self->{cursor}
495     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
496           $attrs->{where},$attrs);
497 }
498
499 =head2 single
500
501 =over 4
502
503 =item Arguments: $cond?
504
505 =item Return Value: $row_object?
506
507 =back
508
509   my $cd = $schema->resultset('CD')->single({ year => 2001 });
510
511 Inflates the first result without creating a cursor if the resultset has
512 any records in it; if not returns nothing. Used by L</find> as an optimisation.
513
514 Can optionally take an additional condition *only* - this is a fast-code-path
515 method; if you need to add extra joins or similar call ->search and then
516 ->single without a condition on the $rs returned from that.
517
518 =cut
519
520 sub single {
521   my ($self, $where) = @_;
522   my $attrs = { %{$self->_resolved_attrs} };
523   if ($where) {
524     if (defined $attrs->{where}) {
525       $attrs->{where} = {
526         '-and' =>
527             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
528                $where, delete $attrs->{where} ]
529       };
530     } else {
531       $attrs->{where} = $where;
532     }
533   }
534
535 #  XXX: Disabled since it doesn't infer uniqueness in all cases
536 #  unless ($self->_is_unique_query($attrs->{where})) {
537 #    carp "Query not guaranteed to return a single row"
538 #      . "; please declare your unique constraints or use search instead";
539 #  }
540
541   my @data = $self->result_source->storage->select_single(
542     $attrs->{from}, $attrs->{select},
543     $attrs->{where}, $attrs
544   );
545
546   return (@data ? ($self->_construct_object(@data))[0] : undef);
547 }
548
549 # _is_unique_query
550 #
551 # Try to determine if the specified query is guaranteed to be unique, based on
552 # the declared unique constraints.
553
554 sub _is_unique_query {
555   my ($self, $query) = @_;
556
557   my $collapsed = $self->_collapse_query($query);
558   my $alias = $self->{attrs}{alias};
559
560   foreach my $name ($self->result_source->unique_constraint_names) {
561     my @unique_cols = map {
562       "$alias.$_"
563     } $self->result_source->unique_constraint_columns($name);
564
565     # Count the values for each unique column
566     my %seen = map { $_ => 0 } @unique_cols;
567
568     foreach my $key (keys %$collapsed) {
569       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
570       next unless exists $seen{$aliased};  # Additional constraints are okay
571       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
572     }
573
574     # If we get 0 or more than 1 value for a column, it's not necessarily unique
575     return 1 unless grep { $_ != 1 } values %seen;
576   }
577
578   return 0;
579 }
580
581 # _collapse_query
582 #
583 # Recursively collapse the query, accumulating values for each column.
584
585 sub _collapse_query {
586   my ($self, $query, $collapsed) = @_;
587
588   $collapsed ||= {};
589
590   if (ref $query eq 'ARRAY') {
591     foreach my $subquery (@$query) {
592       next unless ref $subquery;  # -or
593 #      warn "ARRAY: " . Dumper $subquery;
594       $collapsed = $self->_collapse_query($subquery, $collapsed);
595     }
596   }
597   elsif (ref $query eq 'HASH') {
598     if (keys %$query and (keys %$query)[0] eq '-and') {
599       foreach my $subquery (@{$query->{-and}}) {
600 #        warn "HASH: " . Dumper $subquery;
601         $collapsed = $self->_collapse_query($subquery, $collapsed);
602       }
603     }
604     else {
605 #      warn "LEAF: " . Dumper $query;
606       foreach my $col (keys %$query) {
607         my $value = $query->{$col};
608         $collapsed->{$col}{$value}++;
609       }
610     }
611   }
612
613   return $collapsed;
614 }
615
616 =head2 get_column
617
618 =over 4
619
620 =item Arguments: $cond?
621
622 =item Return Value: $resultsetcolumn
623
624 =back
625
626   my $max_length = $rs->get_column('length')->max;
627
628 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
629
630 =cut
631
632 sub get_column {
633   my ($self, $column) = @_;
634   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
635   return $new;
636 }
637
638 =head2 search_like
639
640 =over 4
641
642 =item Arguments: $cond, \%attrs?
643
644 =item Return Value: $resultset (scalar context), @row_objs (list context)
645
646 =back
647
648   # WHERE title LIKE '%blue%'
649   $cd_rs = $rs->search_like({ title => '%blue%'});
650
651 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
652 that this is simply a convenience method. You most likely want to use
653 L</search> with specific operators.
654
655 For more information, see L<DBIx::Class::Manual::Cookbook>.
656
657 =cut
658
659 sub search_like {
660   my $class = shift;
661   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
662   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
663   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
664   return $class->search($query, { %$attrs });
665 }
666
667 =head2 slice
668
669 =over 4
670
671 =item Arguments: $first, $last
672
673 =item Return Value: $resultset (scalar context), @row_objs (list context)
674
675 =back
676
677 Returns a resultset or object list representing a subset of elements from the
678 resultset slice is called on. Indexes are from 0, i.e., to get the first
679 three records, call:
680
681   my ($one, $two, $three) = $rs->slice(0, 2);
682
683 =cut
684
685 sub slice {
686   my ($self, $min, $max) = @_;
687   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
688   $attrs->{offset} = $self->{attrs}{offset} || 0;
689   $attrs->{offset} += $min;
690   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
691   return $self->search(undef(), $attrs);
692   #my $slice = (ref $self)->new($self->result_source, $attrs);
693   #return (wantarray ? $slice->all : $slice);
694 }
695
696 =head2 next
697
698 =over 4
699
700 =item Arguments: none
701
702 =item Return Value: $result?
703
704 =back
705
706 Returns the next element in the resultset (C<undef> is there is none).
707
708 Can be used to efficiently iterate over records in the resultset:
709
710   my $rs = $schema->resultset('CD')->search;
711   while (my $cd = $rs->next) {
712     print $cd->title;
713   }
714
715 Note that you need to store the resultset object, and call C<next> on it.
716 Calling C<< resultset('Table')->next >> repeatedly will always return the
717 first record from the resultset.
718
719 =cut
720
721 sub next {
722   my ($self) = @_;
723   if (my $cache = $self->get_cache) {
724     $self->{all_cache_position} ||= 0;
725     return $cache->[$self->{all_cache_position}++];
726   }
727   if ($self->{attrs}{cache}) {
728     $self->{all_cache_position} = 1;
729     return ($self->all)[0];
730   }
731   if ($self->{stashed_objects}) {
732     my $obj = shift(@{$self->{stashed_objects}});
733     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
734     return $obj;
735   }
736   my @row = (
737     exists $self->{stashed_row}
738       ? @{delete $self->{stashed_row}}
739       : $self->cursor->next
740   );
741   return undef unless (@row);
742   my ($row, @more) = $self->_construct_object(@row);
743   $self->{stashed_objects} = \@more if @more;
744   return $row;
745 }
746
747 sub _construct_object {
748   my ($self, @row) = @_;
749   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
750   my @new = $self->result_class->inflate_result($self->result_source, @$info);
751   @new = $self->{_attrs}{record_filter}->(@new)
752     if exists $self->{_attrs}{record_filter};
753   return @new;
754 }
755
756 sub _collapse_result {
757   my ($self, $as_proto, $row) = @_;
758
759   my @copy = @$row;
760
761   # 'foo'         => [ undef, 'foo' ]
762   # 'foo.bar'     => [ 'foo', 'bar' ]
763   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
764
765   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
766
767   my %collapse = %{$self->{_attrs}{collapse}||{}};
768
769   my @pri_index;
770
771   # if we're doing collapsing (has_many prefetch) we need to grab records
772   # until the PK changes, so fill @pri_index. if not, we leave it empty so
773   # we know we don't have to bother.
774
775   # the reason for not using the collapse stuff directly is because if you
776   # had for e.g. two artists in a row with no cds, the collapse info for
777   # both would be NULL (undef) so you'd lose the second artist
778
779   # store just the index so we can check the array positions from the row
780   # without having to contruct the full hash
781
782   if (keys %collapse) {
783     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
784     foreach my $i (0 .. $#construct_as) {
785       next if defined($construct_as[$i][0]); # only self table
786       if (delete $pri{$construct_as[$i][1]}) {
787         push(@pri_index, $i);
788       }
789       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
790     }
791   }
792
793   # no need to do an if, it'll be empty if @pri_index is empty anyway
794
795   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
796
797   my @const_rows;
798
799   do { # no need to check anything at the front, we always want the first row
800
801     my %const;
802   
803     foreach my $this_as (@construct_as) {
804       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
805     }
806
807     push(@const_rows, \%const);
808
809   } until ( # no pri_index => no collapse => drop straight out
810       !@pri_index
811     or
812       do { # get another row, stash it, drop out if different PK
813
814         @copy = $self->cursor->next;
815         $self->{stashed_row} = \@copy;
816
817         # last thing in do block, counts as true if anything doesn't match
818
819         # check xor defined first for NULL vs. NOT NULL then if one is
820         # defined the other must be so check string equality
821
822         grep {
823           (defined $pri_vals{$_} ^ defined $copy[$_])
824           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
825         } @pri_index;
826       }
827   );
828
829   my $alias = $self->{attrs}{alias};
830   my $info = [];
831
832   my %collapse_pos;
833
834   my @const_keys;
835
836   foreach my $const (@const_rows) {
837     scalar @const_keys or do {
838       @const_keys = sort { length($a) <=> length($b) } keys %$const;
839     };
840     foreach my $key (@const_keys) {
841       if (length $key) {
842         my $target = $info;
843         my @parts = split(/\./, $key);
844         my $cur = '';
845         my $data = $const->{$key};
846         foreach my $p (@parts) {
847           $target = $target->[1]->{$p} ||= [];
848           $cur .= ".${p}";
849           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
850             # collapsing at this point and on final part
851             my $pos = $collapse_pos{$cur};
852             CK: foreach my $ck (@ckey) {
853               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
854                 $collapse_pos{$cur} = $data;
855                 delete @collapse_pos{ # clear all positioning for sub-entries
856                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
857                 };
858                 push(@$target, []);
859                 last CK;
860               }
861             }
862           }
863           if (exists $collapse{$cur}) {
864             $target = $target->[-1];
865           }
866         }
867         $target->[0] = $data;
868       } else {
869         $info->[0] = $const->{$key};
870       }
871     }
872   }
873
874   return $info;
875 }
876
877 =head2 result_source
878
879 =over 4
880
881 =item Arguments: $result_source?
882
883 =item Return Value: $result_source
884
885 =back
886
887 An accessor for the primary ResultSource object from which this ResultSet
888 is derived.
889
890 =head2 result_class
891
892 =over 4
893
894 =item Arguments: $result_class?
895
896 =item Return Value: $result_class
897
898 =back
899
900 An accessor for the class to use when creating row objects. Defaults to 
901 C<< result_source->result_class >> - which in most cases is the name of the 
902 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
903
904 =cut
905
906
907 =head2 count
908
909 =over 4
910
911 =item Arguments: $cond, \%attrs??
912
913 =item Return Value: $count
914
915 =back
916
917 Performs an SQL C<COUNT> with the same query as the resultset was built
918 with to find the number of elements. If passed arguments, does a search
919 on the resultset and counts the results of that.
920
921 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
922 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
923 not support C<DISTINCT> with multiple columns. If you are using such a
924 database, you should only use columns from the main table in your C<group_by>
925 clause.
926
927 =cut
928
929 sub count {
930   my $self = shift;
931   return $self->search(@_)->count if @_ and defined $_[0];
932   return scalar @{ $self->get_cache } if $self->get_cache;
933   my $count = $self->_count;
934   return 0 unless $count;
935
936   # need to take offset from resolved attrs
937
938   $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
939   $count = $self->{attrs}{rows} if
940     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
941   $count = 0 if ($count < 0);
942   return $count;
943 }
944
945 sub _count { # Separated out so pager can get the full count
946   my $self = shift;
947   my $select = { count => '*' };
948
949   my $attrs = { %{$self->_resolved_attrs} };
950   if (my $group_by = delete $attrs->{group_by}) {
951     delete $attrs->{having};
952     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
953     # todo: try CONCAT for multi-column pk
954     my @pk = $self->result_source->primary_columns;
955     if (@pk == 1) {
956       my $alias = $attrs->{alias};
957       foreach my $column (@distinct) {
958         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
959           @distinct = ($column);
960           last;
961         }
962       }
963     }
964
965     $select = { count => { distinct => \@distinct } };
966   }
967
968   $attrs->{select} = $select;
969   $attrs->{as} = [qw/count/];
970
971   # offset, order by and page are not needed to count. record_filter is cdbi
972   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
973
974   my $tmp_rs = (ref $self)->new($self->_source_handle, $attrs);
975   my ($count) = $tmp_rs->cursor->next;
976   return $count;
977 }
978
979 =head2 count_literal
980
981 =over 4
982
983 =item Arguments: $sql_fragment, @bind_values
984
985 =item Return Value: $count
986
987 =back
988
989 Counts the results in a literal query. Equivalent to calling L</search_literal>
990 with the passed arguments, then L</count>.
991
992 =cut
993
994 sub count_literal { shift->search_literal(@_)->count; }
995
996 =head2 all
997
998 =over 4
999
1000 =item Arguments: none
1001
1002 =item Return Value: @objects
1003
1004 =back
1005
1006 Returns all elements in the resultset. Called implicitly if the resultset
1007 is returned in list context.
1008
1009 =cut
1010
1011 sub all {
1012   my ($self) = @_;
1013   return @{ $self->get_cache } if $self->get_cache;
1014
1015   my @obj;
1016
1017   # TODO: don't call resolve here
1018   if (keys %{$self->_resolved_attrs->{collapse}}) {
1019 #  if ($self->{attrs}{prefetch}) {
1020       # Using $self->cursor->all is really just an optimisation.
1021       # If we're collapsing has_many prefetches it probably makes
1022       # very little difference, and this is cleaner than hacking
1023       # _construct_object to survive the approach
1024     my @row = $self->cursor->next;
1025     while (@row) {
1026       push(@obj, $self->_construct_object(@row));
1027       @row = (exists $self->{stashed_row}
1028                ? @{delete $self->{stashed_row}}
1029                : $self->cursor->next);
1030     }
1031   } else {
1032     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1033   }
1034
1035   $self->set_cache(\@obj) if $self->{attrs}{cache};
1036   return @obj;
1037 }
1038
1039 =head2 reset
1040
1041 =over 4
1042
1043 =item Arguments: none
1044
1045 =item Return Value: $self
1046
1047 =back
1048
1049 Resets the resultset's cursor, so you can iterate through the elements again.
1050
1051 =cut
1052
1053 sub reset {
1054   my ($self) = @_;
1055   delete $self->{_attrs} if exists $self->{_attrs};
1056   $self->{all_cache_position} = 0;
1057   $self->cursor->reset;
1058   return $self;
1059 }
1060
1061 =head2 first
1062
1063 =over 4
1064
1065 =item Arguments: none
1066
1067 =item Return Value: $object?
1068
1069 =back
1070
1071 Resets the resultset and returns an object for the first result (if the
1072 resultset returns anything).
1073
1074 =cut
1075
1076 sub first {
1077   return $_[0]->reset->next;
1078 }
1079
1080 # _cond_for_update_delete
1081 #
1082 # update/delete require the condition to be modified to handle
1083 # the differing SQL syntax available.  This transforms the $self->{cond}
1084 # appropriately, returning the new condition.
1085
1086 sub _cond_for_update_delete {
1087   my ($self, $full_cond) = @_;
1088   my $cond = {};
1089
1090   $full_cond ||= $self->{cond};
1091   # No-op. No condition, we're updating/deleting everything
1092   return $cond unless ref $full_cond;
1093
1094   if (ref $full_cond eq 'ARRAY') {
1095     $cond = [
1096       map {
1097         my %hash;
1098         foreach my $key (keys %{$_}) {
1099           $key =~ /([^.]+)$/;
1100           $hash{$1} = $_->{$key};
1101         }
1102         \%hash;
1103       } @{$full_cond}
1104     ];
1105   }
1106   elsif (ref $full_cond eq 'HASH') {
1107     if ((keys %{$full_cond})[0] eq '-and') {
1108       $cond->{-and} = [];
1109
1110       my @cond = @{$full_cond->{-and}};
1111       for (my $i = 0; $i < @cond; $i++) {
1112         my $entry = $cond[$i];
1113
1114         my $hash;
1115         if (ref $entry eq 'HASH') {
1116           $hash = $self->_cond_for_update_delete($entry);
1117         }
1118         else {
1119           $entry =~ /([^.]+)$/;
1120           $hash->{$1} = $cond[++$i];
1121         }
1122
1123         push @{$cond->{-and}}, $hash;
1124       }
1125     }
1126     else {
1127       foreach my $key (keys %{$full_cond}) {
1128         $key =~ /([^.]+)$/;
1129         $cond->{$1} = $full_cond->{$key};
1130       }
1131     }
1132   }
1133   else {
1134     $self->throw_exception(
1135       "Can't update/delete on resultset with condition unless hash or array"
1136     );
1137   }
1138
1139   return $cond;
1140 }
1141
1142
1143 =head2 update
1144
1145 =over 4
1146
1147 =item Arguments: \%values
1148
1149 =item Return Value: $storage_rv
1150
1151 =back
1152
1153 Sets the specified columns in the resultset to the supplied values in a
1154 single query. Return value will be true if the update succeeded or false
1155 if no records were updated; exact type of success value is storage-dependent.
1156
1157 =cut
1158
1159 sub update {
1160   my ($self, $values) = @_;
1161   $self->throw_exception("Values for update must be a hash")
1162     unless ref $values eq 'HASH';
1163
1164   my $cond = $self->_cond_for_update_delete;
1165    
1166   return $self->result_source->storage->update(
1167     $self->result_source, $values, $cond
1168   );
1169 }
1170
1171 =head2 update_all
1172
1173 =over 4
1174
1175 =item Arguments: \%values
1176
1177 =item Return Value: 1
1178
1179 =back
1180
1181 Fetches all objects and updates them one at a time. Note that C<update_all>
1182 will run DBIC cascade triggers, while L</update> will not.
1183
1184 =cut
1185
1186 sub update_all {
1187   my ($self, $values) = @_;
1188   $self->throw_exception("Values for update must be a hash")
1189     unless ref $values eq 'HASH';
1190   foreach my $obj ($self->all) {
1191     $obj->set_columns($values)->update;
1192   }
1193   return 1;
1194 }
1195
1196 =head2 delete
1197
1198 =over 4
1199
1200 =item Arguments: none
1201
1202 =item Return Value: 1
1203
1204 =back
1205
1206 Deletes the contents of the resultset from its result source. Note that this
1207 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1208 to run. See also L<DBIx::Class::Row/delete>.
1209
1210 =cut
1211
1212 sub delete {
1213   my ($self) = @_;
1214
1215   my $cond = $self->_cond_for_update_delete;
1216
1217   $self->result_source->storage->delete($self->result_source, $cond);
1218   return 1;
1219 }
1220
1221 =head2 delete_all
1222
1223 =over 4
1224
1225 =item Arguments: none
1226
1227 =item Return Value: 1
1228
1229 =back
1230
1231 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1232 will run DBIC cascade triggers, while L</delete> will not.
1233
1234 =cut
1235
1236 sub delete_all {
1237   my ($self) = @_;
1238   $_->delete for $self->all;
1239   return 1;
1240 }
1241
1242 =head2 populate
1243
1244 =over 4
1245
1246 =item Arguments: \@data;
1247
1248 =back
1249
1250 Pass an arrayref of hashrefs. Each hashref should be a structure suitable for
1251 submitting to a $resultset->create(...) method.
1252
1253 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1254 to insert the data, as this is a faster method.
1255
1256 Otherwise, each set of data is inserted into the database using
1257 L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
1258 objects is returned.
1259
1260 Example:  Assuming an Artist Class that has many CDs Classes relating:
1261
1262   my $Artist_rs = $schema->resultset("Artist");
1263   
1264   ## Void Context Example 
1265   $Artist_rs->populate([
1266      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1267         { title => 'My First CD', year => 2006 },
1268         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1269       ],
1270      },
1271      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1272         { title => 'My parents sold me to a record company' ,year => 2005 },
1273         { title => 'Why Am I So Ugly?', year => 2006 },
1274         { title => 'I Got Surgery and am now Popular', year => 2007 }
1275       ],
1276      },
1277   ]);
1278   
1279   ## Array Context Example
1280   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1281     { name => "Artist One"},
1282     { name => "Artist Two"},
1283     { name => "Artist Three", cds=> [
1284     { title => "First CD", year => 2007},
1285     { title => "Second CD", year => 2008},
1286   ]}
1287   ]);
1288   
1289   print $ArtistOne->name; ## response is 'Artist One'
1290   print $ArtistThree->cds->count ## reponse is '2'
1291
1292 =cut
1293
1294 sub populate {
1295   my ($self, $data) = @_;
1296   
1297   if(defined wantarray) {
1298     my @created;
1299     foreach my $item (@$data) {
1300       push(@created, $self->create($item));
1301     }
1302     return @created;
1303   } else {
1304     my ($first, @rest) = @$data;
1305
1306     my @names = grep {!ref $first->{$_}} keys %$first;
1307     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1308     my @pks = $self->result_source->primary_columns;  
1309
1310     ## do the belongs_to relationships  
1311     foreach my $index (0..$#$data) {
1312       if( grep { !defined $data->[$index]->{$_} } @pks ) {
1313         my @ret = $self->populate($data);
1314         return;
1315       }
1316     
1317       foreach my $rel (@rels) {
1318         next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
1319         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1320         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1321         my $related = $result->result_source->resolve_condition(
1322           $result->result_source->relationship_info($reverse)->{cond},
1323           $self,        
1324           $result,        
1325         );
1326
1327         delete $data->[$index]->{$rel};
1328         $data->[$index] = {%{$data->[$index]}, %$related};
1329       
1330         push @names, keys %$related if $index == 0;
1331       }
1332     }
1333
1334     ## do bulk insert on current row
1335     my @values = map { [ @$_{@names} ] } @$data;
1336
1337     $self->result_source->storage->insert_bulk(
1338       $self->result_source, 
1339       \@names, 
1340       \@values,
1341     );
1342
1343     ## do the has_many relationships
1344     foreach my $item (@$data) {
1345
1346       foreach my $rel (@rels) {
1347         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1348
1349         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
1350      || $self->throw_exception('Cannot find the relating object.');
1351      
1352         my $child = $parent->$rel;
1353     
1354         my $related = $child->result_source->resolve_condition(
1355           $parent->result_source->relationship_info($rel)->{cond},
1356           $child,
1357           $parent,
1358         );
1359
1360         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1361         my @populate = map { {%$_, %$related} } @rows_to_add;
1362
1363         $child->populate( \@populate );
1364       }
1365     }
1366   }
1367 }
1368
1369 =head2 pager
1370
1371 =over 4
1372
1373 =item Arguments: none
1374
1375 =item Return Value: $pager
1376
1377 =back
1378
1379 Return Value a L<Data::Page> object for the current resultset. Only makes
1380 sense for queries with a C<page> attribute.
1381
1382 =cut
1383
1384 sub pager {
1385   my ($self) = @_;
1386   my $attrs = $self->{attrs};
1387   $self->throw_exception("Can't create pager for non-paged rs")
1388     unless $self->{attrs}{page};
1389   $attrs->{rows} ||= 10;
1390   return $self->{pager} ||= Data::Page->new(
1391     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1392 }
1393
1394 =head2 page
1395
1396 =over 4
1397
1398 =item Arguments: $page_number
1399
1400 =item Return Value: $rs
1401
1402 =back
1403
1404 Returns a resultset for the $page_number page of the resultset on which page
1405 is called, where each page contains a number of rows equal to the 'rows'
1406 attribute set on the resultset (10 by default).
1407
1408 =cut
1409
1410 sub page {
1411   my ($self, $page) = @_;
1412   return (ref $self)->new($self->_source_handle, { %{$self->{attrs}}, page => $page });
1413 }
1414
1415 =head2 new_result
1416
1417 =over 4
1418
1419 =item Arguments: \%vals
1420
1421 =item Return Value: $object
1422
1423 =back
1424
1425 Creates an object in the resultset's result class and returns it.
1426
1427 =cut
1428
1429 sub new_result {
1430   my ($self, $values) = @_;
1431   $self->throw_exception( "new_result needs a hash" )
1432     unless (ref $values eq 'HASH');
1433   $self->throw_exception(
1434     "Can't abstract implicit construct, condition not a hash"
1435   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1436
1437   my $alias = $self->{attrs}{alias};
1438   my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
1439   my %new = (
1440     %{ $self->_remove_alias($values, $alias) },
1441     %{ $self->_remove_alias($collapsed_cond, $alias) },
1442     -source_handle => $self->_source_handle,
1443     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1444   );
1445
1446   return $self->result_class->new(\%new);
1447 }
1448
1449 # _collapse_cond
1450 #
1451 # Recursively collapse the condition.
1452
1453 sub _collapse_cond {
1454   my ($self, $cond, $collapsed) = @_;
1455
1456   $collapsed ||= {};
1457
1458   if (ref $cond eq 'ARRAY') {
1459     foreach my $subcond (@$cond) {
1460       next unless ref $subcond;  # -or
1461 #      warn "ARRAY: " . Dumper $subcond;
1462       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1463     }
1464   }
1465   elsif (ref $cond eq 'HASH') {
1466     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1467       foreach my $subcond (@{$cond->{-and}}) {
1468 #        warn "HASH: " . Dumper $subcond;
1469         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1470       }
1471     }
1472     else {
1473 #      warn "LEAF: " . Dumper $cond;
1474       foreach my $col (keys %$cond) {
1475         my $value = $cond->{$col};
1476         $collapsed->{$col} = $value;
1477       }
1478     }
1479   }
1480
1481   return $collapsed;
1482 }
1483
1484 # _remove_alias
1485 #
1486 # Remove the specified alias from the specified query hash. A copy is made so
1487 # the original query is not modified.
1488
1489 sub _remove_alias {
1490   my ($self, $query, $alias) = @_;
1491
1492   my %orig = %{ $query || {} };
1493   my %unaliased;
1494
1495   foreach my $key (keys %orig) {
1496     if ($key !~ /\./) {
1497       $unaliased{$key} = $orig{$key};
1498       next;
1499     }
1500     $unaliased{$1} = $orig{$key}
1501       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1502   }
1503
1504   return \%unaliased;
1505 }
1506
1507 =head2 find_or_new
1508
1509 =over 4
1510
1511 =item Arguments: \%vals, \%attrs?
1512
1513 =item Return Value: $object
1514
1515 =back
1516
1517 Find an existing record from this resultset. If none exists, instantiate a new
1518 result object and return it. The object will not be saved into your storage
1519 until you call L<DBIx::Class::Row/insert> on it.
1520
1521 If you want objects to be saved immediately, use L</find_or_create> instead.
1522
1523 =cut
1524
1525 sub find_or_new {
1526   my $self     = shift;
1527   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1528   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1529   my $exists   = $self->find($hash, $attrs);
1530   return defined $exists ? $exists : $self->new_result($hash);
1531 }
1532
1533 =head2 create
1534
1535 =over 4
1536
1537 =item Arguments: \%vals
1538
1539 =item Return Value: $object
1540
1541 =back
1542
1543 Inserts a record into the resultset and returns the object representing it.
1544
1545 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1546
1547 =cut
1548
1549 sub create {
1550   my ($self, $attrs) = @_;
1551   $self->throw_exception( "create needs a hashref" )
1552     unless ref $attrs eq 'HASH';
1553   return $self->new_result($attrs)->insert;
1554 }
1555
1556 =head2 find_or_create
1557
1558 =over 4
1559
1560 =item Arguments: \%vals, \%attrs?
1561
1562 =item Return Value: $object
1563
1564 =back
1565
1566   $class->find_or_create({ key => $val, ... });
1567
1568 Tries to find a record based on its primary key or unique constraint; if none
1569 is found, creates one and returns that instead.
1570
1571   my $cd = $schema->resultset('CD')->find_or_create({
1572     cdid   => 5,
1573     artist => 'Massive Attack',
1574     title  => 'Mezzanine',
1575     year   => 2005,
1576   });
1577
1578 Also takes an optional C<key> attribute, to search by a specific key or unique
1579 constraint. For example:
1580
1581   my $cd = $schema->resultset('CD')->find_or_create(
1582     {
1583       artist => 'Massive Attack',
1584       title  => 'Mezzanine',
1585     },
1586     { key => 'cd_artist_title' }
1587   );
1588
1589 See also L</find> and L</update_or_create>. For information on how to declare
1590 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1591
1592 =cut
1593
1594 sub find_or_create {
1595   my $self     = shift;
1596   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1597   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1598   my $exists   = $self->find($hash, $attrs);
1599   return defined $exists ? $exists : $self->create($hash);
1600 }
1601
1602 =head2 update_or_create
1603
1604 =over 4
1605
1606 =item Arguments: \%col_values, { key => $unique_constraint }?
1607
1608 =item Return Value: $object
1609
1610 =back
1611
1612   $class->update_or_create({ col => $val, ... });
1613
1614 First, searches for an existing row matching one of the unique constraints
1615 (including the primary key) on the source of this resultset. If a row is
1616 found, updates it with the other given column values. Otherwise, creates a new
1617 row.
1618
1619 Takes an optional C<key> attribute to search on a specific unique constraint.
1620 For example:
1621
1622   # In your application
1623   my $cd = $schema->resultset('CD')->update_or_create(
1624     {
1625       artist => 'Massive Attack',
1626       title  => 'Mezzanine',
1627       year   => 1998,
1628     },
1629     { key => 'cd_artist_title' }
1630   );
1631
1632 If no C<key> is specified, it searches on all unique constraints defined on the
1633 source, including the primary key.
1634
1635 If the C<key> is specified as C<primary>, it searches only on the primary key.
1636
1637 See also L</find> and L</find_or_create>. For information on how to declare
1638 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1639
1640 =cut
1641
1642 sub update_or_create {
1643   my $self = shift;
1644   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1645   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1646
1647   my $row = $self->find($cond, $attrs);
1648   if (defined $row) {
1649     $row->update($cond);
1650     return $row;
1651   }
1652
1653   return $self->create($cond);
1654 }
1655
1656 =head2 get_cache
1657
1658 =over 4
1659
1660 =item Arguments: none
1661
1662 =item Return Value: \@cache_objects?
1663
1664 =back
1665
1666 Gets the contents of the cache for the resultset, if the cache is set.
1667
1668 =cut
1669
1670 sub get_cache {
1671   shift->{all_cache};
1672 }
1673
1674 =head2 set_cache
1675
1676 =over 4
1677
1678 =item Arguments: \@cache_objects
1679
1680 =item Return Value: \@cache_objects
1681
1682 =back
1683
1684 Sets the contents of the cache for the resultset. Expects an arrayref
1685 of objects of the same class as those produced by the resultset. Note that
1686 if the cache is set the resultset will return the cached objects rather
1687 than re-querying the database even if the cache attr is not set.
1688
1689 =cut
1690
1691 sub set_cache {
1692   my ( $self, $data ) = @_;
1693   $self->throw_exception("set_cache requires an arrayref")
1694       if defined($data) && (ref $data ne 'ARRAY');
1695   $self->{all_cache} = $data;
1696 }
1697
1698 =head2 clear_cache
1699
1700 =over 4
1701
1702 =item Arguments: none
1703
1704 =item Return Value: []
1705
1706 =back
1707
1708 Clears the cache for the resultset.
1709
1710 =cut
1711
1712 sub clear_cache {
1713   shift->set_cache(undef);
1714 }
1715
1716 =head2 related_resultset
1717
1718 =over 4
1719
1720 =item Arguments: $relationship_name
1721
1722 =item Return Value: $resultset
1723
1724 =back
1725
1726 Returns a related resultset for the supplied relationship name.
1727
1728   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1729
1730 =cut
1731
1732 sub related_resultset {
1733   my ($self, $rel) = @_;
1734
1735   $self->{related_resultsets} ||= {};
1736   return $self->{related_resultsets}{$rel} ||= do {
1737     my $rel_obj = $self->result_source->relationship_info($rel);
1738
1739     $self->throw_exception(
1740       "search_related: result source '" . $self->_source_handle->source_moniker .
1741         "' has no such relationship $rel")
1742       unless $rel_obj;
1743     
1744     my ($from,$seen) = $self->_resolve_from($rel);
1745
1746     my $join_count = $seen->{$rel};
1747     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
1748
1749     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
1750     my %attrs = %{$self->{attrs}||{}};
1751     delete $attrs{result_class};
1752
1753     my $new_cache;
1754
1755     if (my $cache = $self->get_cache) {
1756       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
1757         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
1758                         @$cache ];
1759       }
1760     }
1761
1762     my $new = $self->_source_handle
1763                    ->schema
1764                    ->resultset($rel_obj->{class})
1765                    ->search_rs(
1766                        undef, {
1767                          %attrs,
1768                          join => undef,
1769                          prefetch => undef,
1770                          select => undef,
1771                          as => undef,
1772                          alias => $alias,
1773                          where => $self->{cond},
1774                          seen_join => $seen,
1775                          from => $from,
1776                      });
1777     $new->set_cache($new_cache) if $new_cache;
1778     $new;
1779   };
1780 }
1781
1782 sub _resolve_from {
1783   my ($self, $extra_join) = @_;
1784   my $source = $self->result_source;
1785   my $attrs = $self->{attrs};
1786   
1787   my $from = $attrs->{from}
1788     || [ { $attrs->{alias} => $source->from } ];
1789     
1790   my $seen = { %{$attrs->{seen_join}||{}} };
1791
1792   my $join = ($attrs->{join}
1793                ? [ $attrs->{join}, $extra_join ]
1794                : $extra_join);
1795
1796   # we need to take the prefetch the attrs into account before we 
1797   # ->resolve_join as otherwise they get lost - captainL
1798   my $merged = $self->_merge_attr( $join, $attrs->{prefetch} );
1799
1800   $from = [
1801     @$from,
1802     ($join ? $source->resolve_join($merged, $attrs->{alias}, $seen) : ()),
1803   ];
1804
1805   return ($from,$seen);
1806 }
1807
1808 sub _resolved_attrs {
1809   my $self = shift;
1810   return $self->{_attrs} if $self->{_attrs};
1811
1812   my $attrs = { %{$self->{attrs}||{}} };
1813   my $source = $self->result_source;
1814   my $alias = $attrs->{alias};
1815
1816   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
1817   if ($attrs->{columns}) {
1818     delete $attrs->{as};
1819   } elsif (!$attrs->{select}) {
1820     $attrs->{columns} = [ $source->columns ];
1821   }
1822  
1823   $attrs->{select} = 
1824     ($attrs->{select}
1825       ? (ref $attrs->{select} eq 'ARRAY'
1826           ? [ @{$attrs->{select}} ]
1827           : [ $attrs->{select} ])
1828       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
1829     );
1830   $attrs->{as} =
1831     ($attrs->{as}
1832       ? (ref $attrs->{as} eq 'ARRAY'
1833           ? [ @{$attrs->{as}} ]
1834           : [ $attrs->{as} ])
1835       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
1836     );
1837   
1838   my $adds;
1839   if ($adds = delete $attrs->{include_columns}) {
1840     $adds = [$adds] unless ref $adds eq 'ARRAY';
1841     push(@{$attrs->{select}}, @$adds);
1842     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
1843   }
1844   if ($adds = delete $attrs->{'+select'}) {
1845     $adds = [$adds] unless ref $adds eq 'ARRAY';
1846     push(@{$attrs->{select}},
1847            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
1848   }
1849   if (my $adds = delete $attrs->{'+as'}) {
1850     $adds = [$adds] unless ref $adds eq 'ARRAY';
1851     push(@{$attrs->{as}}, @$adds);
1852   }
1853
1854   $attrs->{from} ||= [ { 'me' => $source->from } ];
1855
1856   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
1857     my $join = delete $attrs->{join} || {};
1858
1859     if (defined $attrs->{prefetch}) {
1860       $join = $self->_merge_attr(
1861         $join, $attrs->{prefetch}
1862       );
1863       
1864     }
1865
1866     $attrs->{from} =   # have to copy here to avoid corrupting the original
1867       [
1868         @{$attrs->{from}}, 
1869         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
1870       ];
1871
1872   }
1873
1874   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
1875   if ($attrs->{order_by}) {
1876     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
1877                            ? [ @{$attrs->{order_by}} ]
1878                            : [ $attrs->{order_by} ]);
1879   } else {
1880     $attrs->{order_by} = [];    
1881   }
1882
1883   my $collapse = $attrs->{collapse} || {};
1884   if (my $prefetch = delete $attrs->{prefetch}) {
1885     $prefetch = $self->_merge_attr({}, $prefetch);
1886     my @pre_order;
1887     my $seen = $attrs->{seen_join} || {};
1888     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
1889       # bring joins back to level of current class
1890       my @prefetch = $source->resolve_prefetch(
1891         $p, $alias, $seen, \@pre_order, $collapse
1892       );
1893       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
1894       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
1895     }
1896     push(@{$attrs->{order_by}}, @pre_order);
1897   }
1898   $attrs->{collapse} = $collapse;
1899
1900   if ($attrs->{page}) {
1901     $attrs->{offset} ||= 0;
1902     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
1903   }
1904
1905   return $self->{_attrs} = $attrs;
1906 }
1907
1908 sub _rollout_attr {
1909   my ($self, $attr) = @_;
1910   
1911   if (ref $attr eq 'HASH') {
1912     return $self->_rollout_hash($attr);
1913   } elsif (ref $attr eq 'ARRAY') {
1914     return $self->_rollout_array($attr);
1915   } else {
1916     return [$attr];
1917   }
1918 }
1919
1920 sub _rollout_array {
1921   my ($self, $attr) = @_;
1922
1923   my @rolled_array;
1924   foreach my $element (@{$attr}) {
1925     if (ref $element eq 'HASH') {
1926       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
1927     } elsif (ref $element eq 'ARRAY') {
1928       #  XXX - should probably recurse here
1929       push( @rolled_array, @{$self->_rollout_array($element)} );
1930     } else {
1931       push( @rolled_array, $element );
1932     }
1933   }
1934   return \@rolled_array;
1935 }
1936
1937 sub _rollout_hash {
1938   my ($self, $attr) = @_;
1939
1940   my @rolled_array;
1941   foreach my $key (keys %{$attr}) {
1942     push( @rolled_array, { $key => $attr->{$key} } );
1943   }
1944   return \@rolled_array;
1945 }
1946
1947 sub _calculate_score {
1948   my ($self, $a, $b) = @_;
1949
1950   if (ref $b eq 'HASH') {
1951     my ($b_key) = keys %{$b};
1952     if (ref $a eq 'HASH') {
1953       my ($a_key) = keys %{$a};
1954       if ($a_key eq $b_key) {
1955         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
1956       } else {
1957         return 0;
1958       }
1959     } else {
1960       return ($a eq $b_key) ? 1 : 0;
1961     }       
1962   } else {
1963     if (ref $a eq 'HASH') {
1964       my ($a_key) = keys %{$a};
1965       return ($b eq $a_key) ? 1 : 0;
1966     } else {
1967       return ($b eq $a) ? 1 : 0;
1968     }
1969   }
1970 }
1971
1972 sub _merge_attr {
1973   my ($self, $a, $b) = @_;
1974
1975   return $b unless defined($a);
1976   return $a unless defined($b);
1977   
1978   $a = $self->_rollout_attr($a);
1979   $b = $self->_rollout_attr($b);
1980
1981   my $seen_keys;
1982   foreach my $b_element ( @{$b} ) {
1983     # find best candidate from $a to merge $b_element into
1984     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
1985     foreach my $a_element ( @{$a} ) {
1986       my $score = $self->_calculate_score( $a_element, $b_element );
1987       if ($score > $best_candidate->{score}) {
1988         $best_candidate->{position} = $position;
1989         $best_candidate->{score} = $score;
1990       }
1991       $position++;
1992     }
1993     my ($b_key) = ( ref $b_element eq 'HASH' ) ? keys %{$b_element} : ($b_element);
1994     if ($best_candidate->{score} == 0 || exists $seen_keys->{$b_key}) {
1995       push( @{$a}, $b_element );
1996     } else {
1997       $seen_keys->{$b_key} = 1; # don't merge the same key twice
1998       my $a_best = $a->[$best_candidate->{position}];
1999       # merge a_best and b_element together and replace original with merged
2000       if (ref $a_best ne 'HASH') {
2001         $a->[$best_candidate->{position}] = $b_element;
2002       } elsif (ref $b_element eq 'HASH') {
2003         my ($key) = keys %{$a_best};
2004         $a->[$best_candidate->{position}] = { $key => $self->_merge_attr($a_best->{$key}, $b_element->{$key}) };
2005       }
2006     }
2007   }
2008
2009   return $a;
2010 }
2011
2012 sub result_source {
2013     my $self = shift;
2014
2015     if (@_) {
2016         $self->_source_handle($_[0]->handle);
2017     } else {
2018         $self->_source_handle->resolve;
2019     }
2020 }
2021
2022 =head2 throw_exception
2023
2024 See L<DBIx::Class::Schema/throw_exception> for details.
2025
2026 =cut
2027
2028 sub throw_exception {
2029   my $self=shift;
2030   $self->_source_handle->schema->throw_exception(@_);
2031 }
2032
2033 # XXX: FIXME: Attributes docs need clearing up
2034
2035 =head1 ATTRIBUTES
2036
2037 The resultset takes various attributes that modify its behavior. Here's an
2038 overview of them:
2039
2040 =head2 order_by
2041
2042 =over 4
2043
2044 =item Value: ($order_by | \@order_by)
2045
2046 =back
2047
2048 Which column(s) to order the results by. This is currently passed
2049 through directly to SQL, so you can give e.g. C<year DESC> for a
2050 descending order on the column `year'.
2051
2052 Please note that if you have C<quote_char> enabled (see
2053 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
2054 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
2055 so you will need to manually quote things as appropriate.)
2056
2057 =head2 columns
2058
2059 =over 4
2060
2061 =item Value: \@columns
2062
2063 =back
2064
2065 Shortcut to request a particular set of columns to be retrieved.  Adds
2066 C<me.> onto the start of any column without a C<.> in it and sets C<select>
2067 from that, then auto-populates C<as> from C<select> as normal. (You may also
2068 use the C<cols> attribute, as in earlier versions of DBIC.)
2069
2070 =head2 include_columns
2071
2072 =over 4
2073
2074 =item Value: \@columns
2075
2076 =back
2077
2078 Shortcut to include additional columns in the returned results - for example
2079
2080   $schema->resultset('CD')->search(undef, {
2081     include_columns => ['artist.name'],
2082     join => ['artist']
2083   });
2084
2085 would return all CDs and include a 'name' column to the information
2086 passed to object inflation. Note that the 'artist' is the name of the
2087 column (or relationship) accessor, and 'name' is the name of the column
2088 accessor in the related table.
2089
2090 =head2 select
2091
2092 =over 4
2093
2094 =item Value: \@select_columns
2095
2096 =back
2097
2098 Indicates which columns should be selected from the storage. You can use
2099 column names, or in the case of RDBMS back ends, function or stored procedure
2100 names:
2101
2102   $rs = $schema->resultset('Employee')->search(undef, {
2103     select => [
2104       'name',
2105       { count => 'employeeid' },
2106       { sum => 'salary' }
2107     ]
2108   });
2109
2110 When you use function/stored procedure names and do not supply an C<as>
2111 attribute, the column names returned are storage-dependent. E.g. MySQL would
2112 return a column named C<count(employeeid)> in the above example.
2113
2114 =head2 +select
2115
2116 =over 4
2117
2118 Indicates additional columns to be selected from storage.  Works the same as
2119 L<select> but adds columns to the selection.
2120
2121 =back
2122
2123 =head2 +as
2124
2125 =over 4
2126
2127 Indicates additional column names for those added via L<+select>.
2128
2129 =back
2130
2131 =head2 as
2132
2133 =over 4
2134
2135 =item Value: \@inflation_names
2136
2137 =back
2138
2139 Indicates column names for object inflation. That is, c< as >
2140 indicates the name that the column can be accessed as via the
2141 C<get_column> method (or via the object accessor, B<if one already
2142 exists>).  It has nothing to do with the SQL code C< SELECT foo AS bar
2143 >.
2144
2145 The C< as > attribute is used in conjunction with C<select>,
2146 usually when C<select> contains one or more function or stored
2147 procedure names:
2148
2149   $rs = $schema->resultset('Employee')->search(undef, {
2150     select => [
2151       'name',
2152       { count => 'employeeid' }
2153     ],
2154     as => ['name', 'employee_count'],
2155   });
2156
2157   my $employee = $rs->first(); # get the first Employee
2158
2159 If the object against which the search is performed already has an accessor
2160 matching a column name specified in C<as>, the value can be retrieved using
2161 the accessor as normal:
2162
2163   my $name = $employee->name();
2164
2165 If on the other hand an accessor does not exist in the object, you need to
2166 use C<get_column> instead:
2167
2168   my $employee_count = $employee->get_column('employee_count');
2169
2170 You can create your own accessors if required - see
2171 L<DBIx::Class::Manual::Cookbook> for details.
2172
2173 Please note: This will NOT insert an C<AS employee_count> into the SQL
2174 statement produced, it is used for internal access only. Thus
2175 attempting to use the accessor in an C<order_by> clause or similar
2176 will fail miserably.
2177
2178 To get around this limitation, you can supply literal SQL to your
2179 C<select> attibute that contains the C<AS alias> text, eg:
2180
2181   select => [\'myfield AS alias']
2182
2183 =head2 join
2184
2185 =over 4
2186
2187 =item Value: ($rel_name | \@rel_names | \%rel_names)
2188
2189 =back
2190
2191 Contains a list of relationships that should be joined for this query.  For
2192 example:
2193
2194   # Get CDs by Nine Inch Nails
2195   my $rs = $schema->resultset('CD')->search(
2196     { 'artist.name' => 'Nine Inch Nails' },
2197     { join => 'artist' }
2198   );
2199
2200 Can also contain a hash reference to refer to the other relation's relations.
2201 For example:
2202
2203   package MyApp::Schema::Track;
2204   use base qw/DBIx::Class/;
2205   __PACKAGE__->table('track');
2206   __PACKAGE__->add_columns(qw/trackid cd position title/);
2207   __PACKAGE__->set_primary_key('trackid');
2208   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
2209   1;
2210
2211   # In your application
2212   my $rs = $schema->resultset('Artist')->search(
2213     { 'track.title' => 'Teardrop' },
2214     {
2215       join     => { cd => 'track' },
2216       order_by => 'artist.name',
2217     }
2218   );
2219
2220 You need to use the relationship (not the table) name in  conditions, 
2221 because they are aliased as such. The current table is aliased as "me", so 
2222 you need to use me.column_name in order to avoid ambiguity. For example:
2223
2224   # Get CDs from 1984 with a 'Foo' track 
2225   my $rs = $schema->resultset('CD')->search(
2226     { 
2227       'me.year' => 1984,
2228       'tracks.name' => 'Foo'
2229     },
2230     { join => 'tracks' }
2231   );
2232   
2233 If the same join is supplied twice, it will be aliased to <rel>_2 (and
2234 similarly for a third time). For e.g.
2235
2236   my $rs = $schema->resultset('Artist')->search({
2237     'cds.title'   => 'Down to Earth',
2238     'cds_2.title' => 'Popular',
2239   }, {
2240     join => [ qw/cds cds/ ],
2241   });
2242
2243 will return a set of all artists that have both a cd with title 'Down
2244 to Earth' and a cd with title 'Popular'.
2245
2246 If you want to fetch related objects from other tables as well, see C<prefetch>
2247 below.
2248
2249 =head2 prefetch
2250
2251 =over 4
2252
2253 =item Value: ($rel_name | \@rel_names | \%rel_names)
2254
2255 =back
2256
2257 Contains one or more relationships that should be fetched along with the main
2258 query (when they are accessed afterwards they will have already been
2259 "prefetched").  This is useful for when you know you will need the related
2260 objects, because it saves at least one query:
2261
2262   my $rs = $schema->resultset('Tag')->search(
2263     undef,
2264     {
2265       prefetch => {
2266         cd => 'artist'
2267       }
2268     }
2269   );
2270
2271 The initial search results in SQL like the following:
2272
2273   SELECT tag.*, cd.*, artist.* FROM tag
2274   JOIN cd ON tag.cd = cd.cdid
2275   JOIN artist ON cd.artist = artist.artistid
2276
2277 L<DBIx::Class> has no need to go back to the database when we access the
2278 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2279 case.
2280
2281 Simple prefetches will be joined automatically, so there is no need
2282 for a C<join> attribute in the above search. If you're prefetching to
2283 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
2284 specify the join as well.
2285
2286 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2287 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2288 with an accessor type of 'single' or 'filter').
2289
2290 =head2 page
2291
2292 =over 4
2293
2294 =item Value: $page
2295
2296 =back
2297
2298 Makes the resultset paged and specifies the page to retrieve. Effectively
2299 identical to creating a non-pages resultset and then calling ->page($page)
2300 on it.
2301
2302 If L<rows> attribute is not specified it defualts to 10 rows per page.
2303
2304 =head2 rows
2305
2306 =over 4
2307
2308 =item Value: $rows
2309
2310 =back
2311
2312 Specifes the maximum number of rows for direct retrieval or the number of
2313 rows per page if the page attribute or method is used.
2314
2315 =head2 offset
2316
2317 =over 4
2318
2319 =item Value: $offset
2320
2321 =back
2322
2323 Specifies the (zero-based) row number for the  first row to be returned, or the
2324 of the first row of the first page if paging is used.
2325
2326 =head2 group_by
2327
2328 =over 4
2329
2330 =item Value: \@columns
2331
2332 =back
2333
2334 A arrayref of columns to group by. Can include columns of joined tables.
2335
2336   group_by => [qw/ column1 column2 ... /]
2337
2338 =head2 having
2339
2340 =over 4
2341
2342 =item Value: $condition
2343
2344 =back
2345
2346 HAVING is a select statement attribute that is applied between GROUP BY and
2347 ORDER BY. It is applied to the after the grouping calculations have been
2348 done.
2349
2350   having => { 'count(employee)' => { '>=', 100 } }
2351
2352 =head2 distinct
2353
2354 =over 4
2355
2356 =item Value: (0 | 1)
2357
2358 =back
2359
2360 Set to 1 to group by all columns.
2361
2362 =head2 where
2363
2364 =over 4
2365
2366 Adds to the WHERE clause.
2367
2368   # only return rows WHERE deleted IS NULL for all searches
2369   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2370
2371 Can be overridden by passing C<{ where => undef }> as an attribute
2372 to a resulset.
2373
2374 =back
2375
2376 =head2 cache
2377
2378 Set to 1 to cache search results. This prevents extra SQL queries if you
2379 revisit rows in your ResultSet:
2380
2381   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2382
2383   while( my $artist = $resultset->next ) {
2384     ... do stuff ...
2385   }
2386
2387   $rs->first; # without cache, this would issue a query
2388
2389 By default, searches are not cached.
2390
2391 For more examples of using these attributes, see
2392 L<DBIx::Class::Manual::Cookbook>.
2393
2394 =head2 from
2395
2396 =over 4
2397
2398 =item Value: \@from_clause
2399
2400 =back
2401
2402 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2403 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2404 clauses.
2405
2406 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2407
2408 C<join> will usually do what you need and it is strongly recommended that you
2409 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2410 And we really do mean "cannot", not just tried and failed. Attempting to use
2411 this because you're having problems with C<join> is like trying to use x86
2412 ASM because you've got a syntax error in your C. Trust us on this.
2413
2414 Now, if you're still really, really sure you need to use this (and if you're
2415 not 100% sure, ask the mailing list first), here's an explanation of how this
2416 works.
2417
2418 The syntax is as follows -
2419
2420   [
2421     { <alias1> => <table1> },
2422     [
2423       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2424       [], # nested JOIN (optional)
2425       { <table1.column1> => <table2.column2>, ... (more conditions) },
2426     ],
2427     # More of the above [ ] may follow for additional joins
2428   ]
2429
2430   <table1> <alias1>
2431   JOIN
2432     <table2> <alias2>
2433     [JOIN ...]
2434   ON <table1.column1> = <table2.column2>
2435   <more joins may follow>
2436
2437 An easy way to follow the examples below is to remember the following:
2438
2439     Anything inside "[]" is a JOIN
2440     Anything inside "{}" is a condition for the enclosing JOIN
2441
2442 The following examples utilize a "person" table in a family tree application.
2443 In order to express parent->child relationships, this table is self-joined:
2444
2445     # Person->belongs_to('father' => 'Person');
2446     # Person->belongs_to('mother' => 'Person');
2447
2448 C<from> can be used to nest joins. Here we return all children with a father,
2449 then search against all mothers of those children:
2450
2451   $rs = $schema->resultset('Person')->search(
2452       undef,
2453       {
2454           alias => 'mother', # alias columns in accordance with "from"
2455           from => [
2456               { mother => 'person' },
2457               [
2458                   [
2459                       { child => 'person' },
2460                       [
2461                           { father => 'person' },
2462                           { 'father.person_id' => 'child.father_id' }
2463                       ]
2464                   ],
2465                   { 'mother.person_id' => 'child.mother_id' }
2466               ],
2467           ]
2468       },
2469   );
2470
2471   # Equivalent SQL:
2472   # SELECT mother.* FROM person mother
2473   # JOIN (
2474   #   person child
2475   #   JOIN person father
2476   #   ON ( father.person_id = child.father_id )
2477   # )
2478   # ON ( mother.person_id = child.mother_id )
2479
2480 The type of any join can be controlled manually. To search against only people
2481 with a father in the person table, we could explicitly use C<INNER JOIN>:
2482
2483     $rs = $schema->resultset('Person')->search(
2484         undef,
2485         {
2486             alias => 'child', # alias columns in accordance with "from"
2487             from => [
2488                 { child => 'person' },
2489                 [
2490                     { father => 'person', -join_type => 'inner' },
2491                     { 'father.id' => 'child.father_id' }
2492                 ],
2493             ]
2494         },
2495     );
2496
2497     # Equivalent SQL:
2498     # SELECT child.* FROM person child
2499     # INNER JOIN person father ON child.father_id = father.id
2500
2501 =cut
2502
2503 1;