fixed a bug with search on related resultset
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => \&count,
7         'bool'   => sub { 1; },
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use Data::Dumper;
13 use Scalar::Util qw/weaken/;
14
15 use DBIx::Class::ResultSetColumn;
16 use base qw/DBIx::Class/;
17 __PACKAGE__->load_components(qw/AccessorGroup/);
18 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
23
24 =head1 SYNOPSIS
25
26   my $rs   = $schema->resultset('User')->search(registered => 1);
27   my @rows = $schema->resultset('CD')->search(year => 2005);
28
29 =head1 DESCRIPTION
30
31 The resultset is also known as an iterator. It is responsible for handling
32 queries that may return an arbitrary number of rows, e.g. via L</search>
33 or a C<has_many> relationship.
34
35 In the examples below, the following table classes are used:
36
37   package MyApp::Schema::Artist;
38   use base qw/DBIx::Class/;
39   __PACKAGE__->load_components(qw/Core/);
40   __PACKAGE__->table('artist');
41   __PACKAGE__->add_columns(qw/artistid name/);
42   __PACKAGE__->set_primary_key('artistid');
43   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
44   1;
45
46   package MyApp::Schema::CD;
47   use base qw/DBIx::Class/;
48   __PACKAGE__->load_components(qw/Core/);
49   __PACKAGE__->table('cd');
50   __PACKAGE__->add_columns(qw/cdid artist title year/);
51   __PACKAGE__->set_primary_key('cdid');
52   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
53   1;
54
55 =head1 METHODS
56
57 =head2 new
58
59 =over 4
60
61 =item Arguments: $source, \%$attrs
62
63 =item Return Value: $rs
64
65 =back
66
67 The resultset constructor. Takes a source object (usually a
68 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
69 L</ATTRIBUTES> below).  Does not perform any queries -- these are
70 executed as needed by the other methods.
71
72 Generally you won't need to construct a resultset manually.  You'll
73 automatically get one from e.g. a L</search> called in scalar context:
74
75   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
76
77 IMPORTANT: If called on an object, proxies to new_result instead so
78
79   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
80
81 will return a CD object, not a ResultSet.
82
83 =cut
84
85 sub new {
86   my $class = shift;
87   return $class->new_result(@_) if ref $class;
88   
89   my ($source, $attrs) = @_;
90   weaken $source;
91
92   if ($attrs->{page}) {
93     $attrs->{rows} ||= 10;
94     $attrs->{offset} ||= 0;
95     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
96   }
97
98   $attrs->{alias} ||= 'me';
99
100   bless {
101     result_source => $source,
102     result_class => $attrs->{result_class} || $source->result_class,
103     cond => $attrs->{where},
104 #    from => $attrs->{from},
105 #    collapse => $collapse,
106     count => undef,
107     page => delete $attrs->{page},
108     pager => undef,
109     attrs => $attrs
110   }, $class;
111 }
112
113 =head2 search
114
115 =over 4
116
117 =item Arguments: $cond, \%attrs?
118
119 =item Return Value: $resultset (scalar context), @row_objs (list context)
120
121 =back
122
123   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
124   my $new_rs = $cd_rs->search({ year => 2005 });
125
126   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
127                  # year = 2005 OR year = 2004
128
129 If you need to pass in additional attributes but no additional condition,
130 call it as C<search(undef, \%attrs)>.
131
132   # "SELECT name, artistid FROM $artist_table"
133   my @all_artists = $schema->resultset('Artist')->search(undef, {
134     columns => [qw/name artistid/],
135   });
136
137 =cut
138
139 sub search {
140   my $self = shift;
141   my $rs = $self->search_rs( @_ );
142   return (wantarray ? $rs->all : $rs);
143 }
144
145 =head2 search_rs
146
147 =over 4
148
149 =item Arguments: $cond, \%attrs?
150
151 =item Return Value: $resultset
152
153 =back
154
155 This method does the same exact thing as search() except it will 
156 always return a resultset, even in list context.
157
158 =cut
159
160 sub search_rs {
161   my $self = shift;
162
163   my $our_attrs = { %{$self->{attrs}} };
164   my $having = delete $our_attrs->{having};
165   my $attrs = {};
166   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
167   
168   # merge new attrs into old
169   foreach my $key (qw/join prefetch/) {
170     next unless (exists $attrs->{$key});
171     if (exists $our_attrs->{$key}) {
172       $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
173     } else {
174       $our_attrs->{$key} = $attrs->{$key};
175     }
176     delete $attrs->{$key};
177   }
178
179   if (exists $our_attrs->{prefetch}) {
180       $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $our_attrs->{prefetch}, 1);
181   }
182
183   my $new_attrs = { %{$our_attrs}, %{$attrs} };
184
185   # merge new where and having into old
186   my $where = (@_
187                 ? ((@_ == 1 || ref $_[0] eq "HASH")
188                     ? shift
189                     : ((@_ % 2)
190                         ? $self->throw_exception(
191                             "Odd number of arguments to search")
192                         : {@_}))
193                 : undef());
194   if (defined $where) {
195     $new_attrs->{where} = (defined $new_attrs->{where}
196               ? { '-and' =>
197                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
198                       $where, $new_attrs->{where} ] }
199               : $where);
200   }
201
202   if (defined $having) {
203     $new_attrs->{having} = (defined $new_attrs->{having}
204               ? { '-and' =>
205                   [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
206                       $having, $new_attrs->{having} ] }
207               : $having);
208   }
209
210   my $rs = (ref $self)->new($self->result_source, $new_attrs);
211   $rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
212
213   unless (@_) { # no search, effectively just a clone
214     my $rows = $self->get_cache;
215     if ($rows) {
216       $rs->set_cache($rows);
217     }
218   }
219   
220   return $rs;
221 }
222
223 =head2 search_literal
224
225 =over 4
226
227 =item Arguments: $sql_fragment, @bind_values
228
229 =item Return Value: $resultset (scalar context), @row_objs (list context)
230
231 =back
232
233   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
234   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
235
236 Pass a literal chunk of SQL to be added to the conditional part of the
237 resultset query.
238
239 =cut
240
241 sub search_literal {
242   my ($self, $cond, @vals) = @_;
243   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
244   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
245   return $self->search(\$cond, $attrs);
246 }
247
248 =head2 find
249
250 =over 4
251
252 =item Arguments: @values | \%cols, \%attrs?
253
254 =item Return Value: $row_object
255
256 =back
257
258 Finds a row based on its primary key or unique constraint. For example, to find
259 a row by its primary key:
260
261   my $cd = $schema->resultset('CD')->find(5);
262
263 You can also find a row by a specific unique constraint using the C<key>
264 attribute. For example:
265
266   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', { key => 'cd_artist_title' });
267
268 Additionally, you can specify the columns explicitly by name:
269
270   my $cd = $schema->resultset('CD')->find(
271     {
272       artist => 'Massive Attack',
273       title  => 'Mezzanine',
274     },
275     { key => 'cd_artist_title' }
276   );
277
278 If the C<key> is specified as C<primary>, it searches only on the primary key.
279
280 If no C<key> is specified, it searches on all unique constraints defined on the
281 source, including the primary key.
282
283 See also L</find_or_create> and L</update_or_create>. For information on how to
284 declare unique constraints, see
285 L<DBIx::Class::ResultSource/add_unique_constraint>.
286
287 =cut
288
289 sub find {
290   my $self = shift;
291   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
292
293   # Default to the primary key, but allow a specific key
294   my @cols = exists $attrs->{key}
295     ? $self->result_source->unique_constraint_columns($attrs->{key})
296     : $self->result_source->primary_columns;
297   $self->throw_exception(
298     "Can't find unless a primary key or unique constraint is defined"
299   ) unless @cols;
300
301   # Parse out a hashref from input
302   my $input_query;
303   if (ref $_[0] eq 'HASH') {
304     $input_query = { %{$_[0]} };
305   }
306   elsif (@_ == @cols) {
307     $input_query = {};
308     @{$input_query}{@cols} = @_;
309   }
310   else {
311     # Compatibility: Allow e.g. find(id => $value)
312     carp "Find by key => value deprecated; please use a hashref instead";
313     $input_query = {@_};
314   }
315
316   my @unique_queries = $self->_unique_queries($input_query, $attrs);
317
318   # Handle cases where the ResultSet defines the query, or where the user is
319   # abusing find
320   my $query = @unique_queries ? \@unique_queries : $input_query;
321
322   # Run the query
323   if (keys %$attrs) {
324     my $rs = $self->search($query, $attrs);
325     $rs->_resolve;
326     return keys %{$rs->{_attrs}->{collapse}} ? $rs->next : $rs->single;
327   }
328   else {
329     $self->_resolve;  
330     return (keys %{$self->{_attrs}->{collapse}})
331       ? $self->search($query)->next
332       : $self->single($query);
333   }
334 }
335
336 # _unique_queries
337 #
338 # Build a list of queries which satisfy unique constraints.
339
340 sub _unique_queries {
341   my ($self, $query, $attrs) = @_;
342
343   my @constraint_names = exists $attrs->{key}
344     ? ($attrs->{key})
345     : $self->result_source->unique_constraint_names;
346
347   my @unique_queries;
348   foreach my $name (@constraint_names) {
349     my @unique_cols = $self->result_source->unique_constraint_columns($name);
350     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
351
352     next unless scalar keys %$unique_query;
353
354     # Add the ResultSet's alias
355     foreach my $key (grep { ! m/\./ } keys %$unique_query) {
356       $unique_query->{"$self->{attrs}->{alias}.$key"} = delete $unique_query->{$key};
357     }
358
359     push @unique_queries, $unique_query;
360   }
361
362   return @unique_queries;
363 }
364
365 # _build_unique_query
366 #
367 # Constrain the specified query hash based on the specified column names.
368
369 sub _build_unique_query {
370   my ($self, $query, $unique_cols) = @_;
371
372   my %unique_query =
373     map  { $_ => $query->{$_} }
374     grep { exists $query->{$_} }
375     @$unique_cols;
376
377   return \%unique_query;
378 }
379
380 =head2 search_related
381
382 =over 4
383
384 =item Arguments: $cond, \%attrs?
385
386 =item Return Value: $new_resultset
387
388 =back
389
390   $new_rs = $cd_rs->search_related('artist', {
391     name => 'Emo-R-Us',
392   });
393
394 Searches the specified relationship, optionally specifying a condition and
395 attributes for matching records. See L</ATTRIBUTES> for more information.
396
397 =cut
398
399 sub search_related {
400   return shift->related_resultset(shift)->search(@_);
401 }
402
403 =head2 cursor
404
405 =over 4
406
407 =item Arguments: none
408
409 =item Return Value: $cursor
410
411 =back
412
413 Returns a storage-driven cursor to the given resultset. See
414 L<DBIx::Class::Cursor> for more information.
415
416 =cut
417
418 sub cursor {
419   my ($self) = @_;
420
421   $self->_resolve;
422   my $attrs = { %{$self->{_attrs}} };
423   return $self->{cursor}
424     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
425           $attrs->{where},$attrs);
426 }
427
428 =head2 single
429
430 =over 4
431
432 =item Arguments: $cond?
433
434 =item Return Value: $row_object?
435
436 =back
437
438   my $cd = $schema->resultset('CD')->single({ year => 2001 });
439
440 Inflates the first result without creating a cursor if the resultset has
441 any records in it; if not returns nothing. Used by L</find> as an optimisation.
442
443 Can optionally take an additional condition *only* - this is a fast-code-path
444 method; if you need to add extra joins or similar call ->search and then
445 ->single without a condition on the $rs returned from that.
446
447 =cut
448
449 sub single {
450   my ($self, $where) = @_;
451   $self->_resolve;
452   my $attrs = { %{$self->{_attrs}} };
453   if ($where) {
454     if (defined $attrs->{where}) {
455       $attrs->{where} = {
456         '-and' =>
457             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
458                $where, delete $attrs->{where} ]
459       };
460     } else {
461       $attrs->{where} = $where;
462     }
463   }
464
465   unless ($self->_is_unique_query($attrs->{where})) {
466     carp "Query not guarnteed to return a single row"
467       . "; please declare your unique constraints or use search instead";
468   }
469
470   my @data = $self->result_source->storage->select_single(
471           $attrs->{from}, $attrs->{select},
472           $attrs->{where},$attrs);
473   return (@data ? $self->_construct_object(@data) : ());
474 }
475
476 # _is_unique_query
477 #
478 # Try to determine if the specified query is guaranteed to be unique, based on
479 # the declared unique constraints.
480
481 sub _is_unique_query {
482   my ($self, $query) = @_;
483
484   my $collapsed = $self->_collapse_query($query);
485
486   foreach my $name ($self->result_source->unique_constraint_names) {
487     my @unique_cols = map { "$self->{attrs}->{alias}.$_" }
488       $self->result_source->unique_constraint_columns($name);
489
490     # Count the values for each unique column
491     my %seen = map { $_ => 0 } @unique_cols;
492
493     foreach my $key (keys %$collapsed) {
494       my $aliased = $key;
495       $aliased = "$self->{attrs}->{alias}.$key" unless $key =~ /\./;
496
497       next unless exists $seen{$aliased};  # Additional constraints are okay
498       $seen{$aliased} = scalar @{ $collapsed->{$key} };
499     }
500
501     # If we get 0 or more than 1 value for a column, it's not necessarily unique
502     return 1 unless grep { $_ != 1 } values %seen;
503   }
504
505   return 0;
506 }
507
508 # _collapse_query
509 #
510 # Recursively collapse the query, accumulating values for each column.
511
512 sub _collapse_query {
513   my ($self, $query, $collapsed) = @_;
514
515   $collapsed ||= {};
516
517   if (ref $query eq 'ARRAY') {
518     foreach my $subquery (@$query) {
519       next unless ref $subquery;  # -or
520 #      warn "ARRAY: " . Dumper $subquery;
521       $collapsed = $self->_collapse_query($subquery, $collapsed);
522     }
523   }
524   elsif (ref $query eq 'HASH') {
525     if (keys %$query and (keys %$query)[0] eq '-and') {
526       foreach my $subquery (@{$query->{-and}}) {
527 #        warn "HASH: " . Dumper $subquery;
528         $collapsed = $self->_collapse_query($subquery, $collapsed);
529       }
530     }
531     else {
532 #      warn "LEAF: " . Dumper $query;
533       foreach my $key (keys %$query) {
534         push @{$collapsed->{$key}}, $query->{$key};
535       }
536     }
537   }
538
539   return $collapsed;
540 }
541
542 =head2 get_column
543
544 =over 4
545
546 =item Arguments: $cond?
547
548 =item Return Value: $resultsetcolumn
549
550 =back
551
552   my $max_length = $rs->get_column('length')->max;
553
554 Returns a ResultSetColumn instance for $column based on $self
555
556 =cut
557
558 sub get_column {
559   my ($self, $column) = @_;
560
561   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
562   return $new;
563 }
564
565 =head2 search_like
566
567 =over 4
568
569 =item Arguments: $cond, \%attrs?
570
571 =item Return Value: $resultset (scalar context), @row_objs (list context)
572
573 =back
574
575   # WHERE title LIKE '%blue%'
576   $cd_rs = $rs->search_like({ title => '%blue%'});
577
578 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
579 that this is simply a convenience method. You most likely want to use
580 L</search> with specific operators.
581
582 For more information, see L<DBIx::Class::Manual::Cookbook>.
583
584 =cut
585
586 sub search_like {
587   my $class = shift;
588   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
589   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
590   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
591   return $class->search($query, { %$attrs });
592 }
593
594 =head2 slice
595
596 =over 4
597
598 =item Arguments: $first, $last
599
600 =item Return Value: $resultset (scalar context), @row_objs (list context)
601
602 =back
603
604 Returns a resultset or object list representing a subset of elements from the
605 resultset slice is called on. Indexes are from 0, i.e., to get the first
606 three records, call:
607
608   my ($one, $two, $three) = $rs->slice(0, 2);
609
610 =cut
611
612 sub slice {
613   my ($self, $min, $max) = @_;
614   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
615   $attrs->{offset} = $self->{attrs}{offset} || 0;
616   $attrs->{offset} += $min;
617   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
618   return $self->search(undef(), $attrs);
619   #my $slice = (ref $self)->new($self->result_source, $attrs);
620   #return (wantarray ? $slice->all : $slice);
621 }
622
623 =head2 next
624
625 =over 4
626
627 =item Arguments: none
628
629 =item Return Value: $result?
630
631 =back
632
633 Returns the next element in the resultset (C<undef> is there is none).
634
635 Can be used to efficiently iterate over records in the resultset:
636
637   my $rs = $schema->resultset('CD')->search;
638   while (my $cd = $rs->next) {
639     print $cd->title;
640   }
641
642 Note that you need to store the resultset object, and call C<next> on it. 
643 Calling C<< resultset('Table')->next >> repeatedly will always return the
644 first record from the resultset.
645
646 =cut
647
648 sub next {
649   my ($self) = @_;
650   if (my $cache = $self->get_cache) {
651     $self->{all_cache_position} ||= 0;
652     return $cache->[$self->{all_cache_position}++];
653   }
654   if ($self->{attrs}{cache}) {
655     $self->{all_cache_position} = 1;
656     return ($self->all)[0];
657   }
658   my @row = (exists $self->{stashed_row} ?
659                @{delete $self->{stashed_row}} :
660                $self->cursor->next
661   );
662   return unless (@row);
663   return $self->_construct_object(@row);
664 }
665
666 sub _resolve {
667   my $self = shift;
668
669   return if(exists $self->{_attrs}); #return if _resolve has already been called
670
671   my $attrs = $self->{attrs};  
672   my $source = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->{result_source};
673
674   # XXX - lose storable dclone
675   my $record_filter = delete $attrs->{record_filter} if (defined $attrs->{record_filter});
676   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
677   $attrs->{record_filter} = $record_filter if ($record_filter);
678   $self->{attrs}->{record_filter} = $record_filter if ($record_filter);
679
680   my $alias = $attrs->{alias};
681  
682   $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
683   delete $attrs->{as} if $attrs->{columns};
684   $attrs->{columns} ||= [ $self->{result_source}->columns ] unless $attrs->{select};
685   my $select_alias = ($self->{_parent_rs}) ? $self->{attrs}->{_live_join} : $alias;
686   $attrs->{select} = [
687                       map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
688                       ] if $attrs->{columns};
689   $attrs->{as} ||= [
690                     map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
691                     ];
692   if (my $include = delete $attrs->{include_columns}) {
693       push(@{$attrs->{select}}, @$include);
694       push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
695   }
696
697   $attrs->{from} ||= [ { $alias => $source->from } ];
698   $attrs->{seen_join} ||= {};
699   my %seen;
700   if (my $join = delete $attrs->{join}) {
701                 foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
702                         if (ref $j eq 'HASH') {
703               $seen{$_} = 1 foreach keys %$j;
704                         } else {
705               $seen{$j} = 1;
706                         }
707                 }
708
709                 push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
710   }
711   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
712   $attrs->{order_by} = [ $attrs->{order_by} ] if
713       $attrs->{order_by} and !ref($attrs->{order_by});
714   $attrs->{order_by} ||= [];
715
716  if(my $seladds = delete($attrs->{'+select'})) {
717    my @seladds = (ref($seladds) eq 'ARRAY' ? @$seladds : ($seladds));
718    $attrs->{select} = [
719      @{ $attrs->{select} },
720      map { (m/\./ || ref($_)) ? $_ : "${alias}.$_" } $seladds
721    ];
722  }
723  if(my $asadds = delete($attrs->{'+as'})) {
724    my @asadds = (ref($asadds) eq 'ARRAY' ? @$asadds : ($asadds));
725    $attrs->{as} = [ @{ $attrs->{as} }, @asadds ];
726  }
727   
728   my $collapse = $attrs->{collapse} || {};
729   if (my $prefetch = delete $attrs->{prefetch}) {
730       my @pre_order;
731       foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
732           if ( ref $p eq 'HASH' ) {
733               foreach my $key (keys %$p) {
734                   push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
735                       unless $seen{$key};
736               }
737           } else {
738               push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
739                   unless $seen{$p};
740           }
741           my @prefetch = $source->resolve_prefetch(
742                                                    $p, $attrs->{alias}, {}, \@pre_order, $collapse);
743           push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
744           push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
745       }
746       push(@{$attrs->{order_by}}, @pre_order);
747   }
748   $attrs->{collapse} = $collapse;
749   $self->{_attrs} = $attrs;
750 }
751
752 sub _merge_attr {
753   my ($self, $a, $b, $is_prefetch) = @_;
754     
755   return $b unless $a;
756   if (ref $b eq 'HASH' && ref $a eq 'HASH') {
757                 foreach my $key (keys %{$b}) {
758                         if (exists $a->{$key}) {
759               $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key}, $is_prefetch);
760                         } else {
761               $a->{$key} = delete $b->{$key};
762                         }
763                 }
764                 return $a;
765   } else {
766                 $a = [$a] unless (ref $a eq 'ARRAY');
767                 $b = [$b] unless (ref $b eq 'ARRAY');
768
769                 my $hash = {};
770                 my $array = [];      
771                 foreach ($a, $b) {
772                         foreach my $element (@{$_}) {
773               if (ref $element eq 'HASH') {
774                                         $hash = $self->_merge_attr($hash, $element, $is_prefetch);
775               } elsif (ref $element eq 'ARRAY') {
776                                         $array = [@{$array}, @{$element}];
777               } else {  
778                                         if (($b == $_) && $is_prefetch) {
779                                                 $self->_merge_array($array, $element, $is_prefetch);
780                                         } else {
781                                                 push(@{$array}, $element);
782                                         }
783               }
784                         }
785                 }
786
787                 if ((keys %{$hash}) && (scalar(@{$array} > 0))) {
788                         return [$hash, @{$array}];
789                 } else {        
790                         return (keys %{$hash}) ? $hash : $array;
791                 }
792   }
793 }
794
795 sub _merge_array {
796         my ($self, $a, $b) = @_;
797  
798         $b = [$b] unless (ref $b eq 'ARRAY');
799         # add elements from @{$b} to @{$a} which aren't already in @{$a}
800         foreach my $b_element (@{$b}) {
801                 push(@{$a}, $b_element) unless grep {$b_element eq $_} @{$a};
802         }
803 }
804
805 sub _construct_object {
806   my ($self, @row) = @_;
807   my @as = @{ $self->{_attrs}{as} };
808
809   my $info = $self->_collapse_result(\@as, \@row);
810   my $new = $self->result_class->inflate_result($self->result_source, @$info);
811   $new = $self->{_attrs}{record_filter}->($new)
812     if exists $self->{_attrs}{record_filter};
813   return $new;
814 }
815
816 sub _collapse_result {
817   my ($self, $as, $row, $prefix) = @_;
818
819   my $live_join = $self->{attrs}->{_live_join} ||="";
820   my %const;
821
822   my @copy = @$row;
823   foreach my $this_as (@$as) {
824     my $val = shift @copy;
825     if (defined $prefix) {
826       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
827         my $remain = $1;
828         $remain =~ /^(?:(.*)\.)?([^.]+)$/;
829         $const{$1||''}{$2} = $val;
830       }
831     } else {
832       $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
833       $const{$1||''}{$2} = $val;
834     }
835   }
836
837   my $info = [ {}, {} ];
838   foreach my $key (keys %const) {
839     if (length $key && $key ne $live_join) {
840       my $target = $info;
841       my @parts = split(/\./, $key);
842       foreach my $p (@parts) {
843         $target = $target->[1]->{$p} ||= [];
844       }
845       $target->[0] = $const{$key};
846     } else {
847       $info->[0] = $const{$key};
848     }
849   }
850
851   my @collapse;
852   if (defined $prefix) {
853     @collapse = map {
854         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
855     } keys %{$self->{_attrs}->{collapse}}
856   } else {
857     @collapse = keys %{$self->{_attrs}->{collapse}};
858   };
859
860   if (@collapse) {
861     my ($c) = sort { length $a <=> length $b } @collapse;
862     my $target = $info;
863     foreach my $p (split(/\./, $c)) {
864       $target = $target->[1]->{$p} ||= [];
865     }
866     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
867     my @co_key = @{$self->{_attrs}->{collapse}{$c_prefix}};
868     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
869     my $tree = $self->_collapse_result($as, $row, $c_prefix);
870     my (@final, @raw);
871     while ( !(grep {
872                 !defined($tree->[0]->{$_}) ||
873                 $co_check{$_} ne $tree->[0]->{$_}
874               } @co_key) ) {
875       push(@final, $tree);
876       last unless (@raw = $self->cursor->next);
877       $row = $self->{stashed_row} = \@raw;
878       $tree = $self->_collapse_result($as, $row, $c_prefix);
879     }
880     @$target = (@final ? @final : [ {}, {} ]); 
881       # single empty result to indicate an empty prefetched has_many
882   }
883   return $info;
884 }
885
886 =head2 result_source
887
888 =over 4
889
890 =item Arguments: $result_source?
891
892 =item Return Value: $result_source
893
894 =back
895
896 An accessor for the primary ResultSource object from which this ResultSet
897 is derived.
898
899 =cut
900
901
902 =head2 count
903
904 =over 4
905
906 =item Arguments: $cond, \%attrs??
907
908 =item Return Value: $count
909
910 =back
911
912 Performs an SQL C<COUNT> with the same query as the resultset was built
913 with to find the number of elements. If passed arguments, does a search
914 on the resultset and counts the results of that.
915
916 Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
917 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
918 not support C<DISTINCT> with multiple columns. If you are using such a
919 database, you should only use columns from the main table in your C<group_by>
920 clause.
921
922 =cut
923
924 sub count {
925   my $self = shift;
926   return $self->search(@_)->count if @_ and defined $_[0];
927   return scalar @{ $self->get_cache } if $self->get_cache;
928   my $count = $self->_count;
929   return 0 unless $count;
930
931   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
932   $count = $self->{attrs}{rows} if
933     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
934   return $count;
935 }
936
937 sub _count { # Separated out so pager can get the full count
938   my $self = shift;
939   my $select = { count => '*' };
940   
941   $self->_resolve;
942   my $attrs = { %{ $self->{_attrs} } };
943   if (my $group_by = delete $attrs->{group_by}) {
944     delete $attrs->{having};
945     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
946     # todo: try CONCAT for multi-column pk
947     my @pk = $self->result_source->primary_columns;
948     if (@pk == 1) {
949       foreach my $column (@distinct) {
950         if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
951           @distinct = ($column);
952           last;
953         }
954       }
955     }
956
957     $select = { count => { distinct => \@distinct } };
958   }
959
960   $attrs->{select} = $select;
961   $attrs->{as} = [qw/count/];
962
963   # offset, order by and page are not needed to count. record_filter is cdbi
964   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
965         my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
966         $tmp_rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
967
968   my ($count) = $tmp_rs->cursor->next;
969   return $count;
970 }
971
972 =head2 count_literal
973
974 =over 4
975
976 =item Arguments: $sql_fragment, @bind_values
977
978 =item Return Value: $count
979
980 =back
981
982 Counts the results in a literal query. Equivalent to calling L</search_literal>
983 with the passed arguments, then L</count>.
984
985 =cut
986
987 sub count_literal { shift->search_literal(@_)->count; }
988
989 =head2 all
990
991 =over 4
992
993 =item Arguments: none
994
995 =item Return Value: @objects
996
997 =back
998
999 Returns all elements in the resultset. Called implicitly if the resultset
1000 is returned in list context.
1001
1002 =cut
1003
1004 sub all {
1005   my ($self) = @_;
1006   return @{ $self->get_cache } if $self->get_cache;
1007
1008   my @obj;
1009
1010   # TODO: don't call resolve here
1011   $self->_resolve;
1012   if (keys %{$self->{_attrs}->{collapse}}) {
1013 #  if ($self->{attrs}->{prefetch}) {
1014       # Using $self->cursor->all is really just an optimisation.
1015       # If we're collapsing has_many prefetches it probably makes
1016       # very little difference, and this is cleaner than hacking
1017       # _construct_object to survive the approach
1018     my @row = $self->cursor->next;
1019     while (@row) {
1020       push(@obj, $self->_construct_object(@row));
1021       @row = (exists $self->{stashed_row}
1022                ? @{delete $self->{stashed_row}}
1023                : $self->cursor->next);
1024     }
1025   } else {
1026     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1027   }
1028
1029   $self->set_cache(\@obj) if $self->{attrs}{cache};
1030   return @obj;
1031 }
1032
1033 =head2 reset
1034
1035 =over 4
1036
1037 =item Arguments: none
1038
1039 =item Return Value: $self
1040
1041 =back
1042
1043 Resets the resultset's cursor, so you can iterate through the elements again.
1044
1045 =cut
1046
1047 sub reset {
1048   my ($self) = @_;
1049   delete $self->{_attrs} if (exists $self->{_attrs});
1050
1051   $self->{all_cache_position} = 0;
1052   $self->cursor->reset;
1053   return $self;
1054 }
1055
1056 =head2 first
1057
1058 =over 4
1059
1060 =item Arguments: none
1061
1062 =item Return Value: $object?
1063
1064 =back
1065
1066 Resets the resultset and returns an object for the first result (if the
1067 resultset returns anything).
1068
1069 =cut
1070
1071 sub first {
1072   return $_[0]->reset->next;
1073 }
1074
1075 # _cond_for_update_delete
1076 #
1077 # update/delete require the condition to be modified to handle
1078 # the differing SQL syntax available.  This transforms the $self->{cond}
1079 # appropriately, returning the new condition.
1080
1081 sub _cond_for_update_delete {
1082   my ($self) = @_;
1083   my $cond = {};
1084
1085   if (!ref($self->{cond})) {
1086     # No-op. No condition, we're updating/deleting everything
1087   }
1088   elsif (ref $self->{cond} eq 'ARRAY') {
1089     $cond = [
1090       map {
1091         my %hash;
1092         foreach my $key (keys %{$_}) {
1093           $key =~ /([^.]+)$/;
1094           $hash{$1} = $_->{$key};
1095         }
1096         \%hash;
1097       } @{$self->{cond}}
1098     ];
1099   }
1100   elsif (ref $self->{cond} eq 'HASH') {
1101     if ((keys %{$self->{cond}})[0] eq '-and') {
1102       $cond->{-and} = [];
1103
1104       my @cond = @{$self->{cond}{-and}};
1105       for (my $i = 0; $i <= @cond - 1; $i++) {
1106         my $entry = $cond[$i];
1107
1108         my %hash;
1109         if (ref $entry eq 'HASH') {
1110           foreach my $key (keys %{$entry}) {
1111             $key =~ /([^.]+)$/;
1112             $hash{$1} = $entry->{$key};
1113           }
1114         }
1115         else {
1116           $entry =~ /([^.]+)$/;
1117           $hash{$1} = $cond[++$i];
1118         }
1119
1120         push @{$cond->{-and}}, \%hash;
1121       }
1122     }
1123     else {
1124       foreach my $key (keys %{$self->{cond}}) {
1125         $key =~ /([^.]+)$/;
1126         $cond->{$1} = $self->{cond}{$key};
1127       }
1128     }
1129   }
1130   else {
1131     $self->throw_exception(
1132       "Can't update/delete on resultset with condition unless hash or array"
1133     );
1134   }
1135
1136   return $cond;
1137 }
1138
1139
1140 =head2 update
1141
1142 =over 4
1143
1144 =item Arguments: \%values
1145
1146 =item Return Value: $storage_rv
1147
1148 =back
1149
1150 Sets the specified columns in the resultset to the supplied values in a
1151 single query. Return value will be true if the update succeeded or false
1152 if no records were updated; exact type of success value is storage-dependent.
1153
1154 =cut
1155
1156 sub update {
1157   my ($self, $values) = @_;
1158   $self->throw_exception("Values for update must be a hash")
1159     unless ref $values eq 'HASH';
1160
1161   my $cond = $self->_cond_for_update_delete;
1162
1163   return $self->result_source->storage->update(
1164     $self->result_source->from, $values, $cond
1165   );
1166 }
1167
1168 =head2 update_all
1169
1170 =over 4
1171
1172 =item Arguments: \%values
1173
1174 =item Return Value: 1
1175
1176 =back
1177
1178 Fetches all objects and updates them one at a time. Note that C<update_all>
1179 will run DBIC cascade triggers, while L</update> will not.
1180
1181 =cut
1182
1183 sub update_all {
1184   my ($self, $values) = @_;
1185   $self->throw_exception("Values for update must be a hash")
1186     unless ref $values eq 'HASH';
1187   foreach my $obj ($self->all) {
1188     $obj->set_columns($values)->update;
1189   }
1190   return 1;
1191 }
1192
1193 =head2 delete
1194
1195 =over 4
1196
1197 =item Arguments: none
1198
1199 =item Return Value: 1
1200
1201 =back
1202
1203 Deletes the contents of the resultset from its result source. Note that this
1204 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1205 to run.
1206
1207 =cut
1208
1209 sub delete {
1210   my ($self) = @_;
1211   my $del = {};
1212
1213   my $cond = $self->_cond_for_update_delete;
1214
1215   $self->result_source->storage->delete($self->result_source->from, $cond);
1216   return 1;
1217 }
1218
1219 =head2 delete_all
1220
1221 =over 4
1222
1223 =item Arguments: none
1224
1225 =item Return Value: 1
1226
1227 =back
1228
1229 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1230 will run DBIC cascade triggers, while L</delete> will not.
1231
1232 =cut
1233
1234 sub delete_all {
1235   my ($self) = @_;
1236   $_->delete for $self->all;
1237   return 1;
1238 }
1239
1240 =head2 pager
1241
1242 =over 4
1243
1244 =item Arguments: none
1245
1246 =item Return Value: $pager
1247
1248 =back
1249
1250 Return Value a L<Data::Page> object for the current resultset. Only makes
1251 sense for queries with a C<page> attribute.
1252
1253 =cut
1254
1255 sub pager {
1256   my ($self) = @_;
1257   my $attrs = $self->{attrs};
1258   $self->throw_exception("Can't create pager for non-paged rs")
1259     unless $self->{page};
1260   $attrs->{rows} ||= 10;
1261   return $self->{pager} ||= Data::Page->new(
1262     $self->_count, $attrs->{rows}, $self->{page});
1263 }
1264
1265 =head2 page
1266
1267 =over 4
1268
1269 =item Arguments: $page_number
1270
1271 =item Return Value: $rs
1272
1273 =back
1274
1275 Returns a resultset for the $page_number page of the resultset on which page
1276 is called, where each page contains a number of rows equal to the 'rows'
1277 attribute set on the resultset (10 by default).
1278
1279 =cut
1280
1281 sub page {
1282   my ($self, $page) = @_;
1283   my $attrs = { %{$self->{attrs}} };
1284   $attrs->{page} = $page;
1285   return (ref $self)->new($self->result_source, $attrs);
1286 }
1287
1288 =head2 new_result
1289
1290 =over 4
1291
1292 =item Arguments: \%vals
1293
1294 =item Return Value: $object
1295
1296 =back
1297
1298 Creates an object in the resultset's result class and returns it.
1299
1300 =cut
1301
1302 sub new_result {
1303   my ($self, $values) = @_;
1304   $self->throw_exception( "new_result needs a hash" )
1305     unless (ref $values eq 'HASH');
1306   $self->throw_exception(
1307     "Can't abstract implicit construct, condition not a hash"
1308   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1309   my %new = %$values;
1310   my $alias = $self->{attrs}{alias};
1311   foreach my $key (keys %{$self->{cond}||{}}) {
1312     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
1313   }
1314   my $obj = $self->result_class->new(\%new);
1315   $obj->result_source($self->result_source) if $obj->can('result_source');
1316   return $obj;
1317 }
1318
1319 =head2 find_or_new
1320
1321 =over 4
1322
1323 =item Arguments: \%vals, \%attrs?
1324
1325 =item Return Value: $object
1326
1327 =back
1328
1329 Find an existing record from this resultset. If none exists, instantiate a new
1330 result object and return it. The object will not be saved into your storage
1331 until you call L<DBIx::Class::Row/insert> on it.
1332
1333 If you want objects to be saved immediately, use L</find_or_create> instead.
1334
1335 =cut
1336
1337 sub find_or_new {
1338   my $self     = shift;
1339   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1340   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1341   my $exists   = $self->find($hash, $attrs);
1342   return defined $exists ? $exists : $self->new_result($hash);
1343 }
1344
1345 =head2 create
1346
1347 =over 4
1348
1349 =item Arguments: \%vals
1350
1351 =item Return Value: $object
1352
1353 =back
1354
1355 Inserts a record into the resultset and returns the object representing it.
1356
1357 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1358
1359 =cut
1360
1361 sub create {
1362   my ($self, $attrs) = @_;
1363   $self->throw_exception( "create needs a hashref" )
1364     unless ref $attrs eq 'HASH';
1365   return $self->new_result($attrs)->insert;
1366 }
1367
1368 =head2 find_or_create
1369
1370 =over 4
1371
1372 =item Arguments: \%vals, \%attrs?
1373
1374 =item Return Value: $object
1375
1376 =back
1377
1378   $class->find_or_create({ key => $val, ... });
1379
1380 Tries to find a record based on its primary key or unique constraint; if none
1381 is found, creates one and returns that instead.
1382
1383   my $cd = $schema->resultset('CD')->find_or_create({
1384     cdid   => 5,
1385     artist => 'Massive Attack',
1386     title  => 'Mezzanine',
1387     year   => 2005,
1388   });
1389
1390 Also takes an optional C<key> attribute, to search by a specific key or unique
1391 constraint. For example:
1392
1393   my $cd = $schema->resultset('CD')->find_or_create(
1394     {
1395       artist => 'Massive Attack',
1396       title  => 'Mezzanine',
1397     },
1398     { key => 'cd_artist_title' }
1399   );
1400
1401 See also L</find> and L</update_or_create>. For information on how to declare
1402 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1403
1404 =cut
1405
1406 sub find_or_create {
1407   my $self     = shift;
1408   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1409   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1410   my $exists   = $self->find($hash, $attrs);
1411   return defined $exists ? $exists : $self->create($hash);
1412 }
1413
1414 =head2 update_or_create
1415
1416 =over 4
1417
1418 =item Arguments: \%col_values, { key => $unique_constraint }?
1419
1420 =item Return Value: $object
1421
1422 =back
1423
1424   $class->update_or_create({ col => $val, ... });
1425
1426 First, searches for an existing row matching one of the unique constraints
1427 (including the primary key) on the source of this resultset. If a row is
1428 found, updates it with the other given column values. Otherwise, creates a new
1429 row.
1430
1431 Takes an optional C<key> attribute to search on a specific unique constraint.
1432 For example:
1433
1434   # In your application
1435   my $cd = $schema->resultset('CD')->update_or_create(
1436     {
1437       artist => 'Massive Attack',
1438       title  => 'Mezzanine',
1439       year   => 1998,
1440     },
1441     { key => 'cd_artist_title' }
1442   );
1443
1444 If no C<key> is specified, it searches on all unique constraints defined on the
1445 source, including the primary key.
1446
1447 If the C<key> is specified as C<primary>, it searches only on the primary key.
1448
1449 See also L</find> and L</find_or_create>. For information on how to declare
1450 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1451
1452 =cut
1453
1454 sub update_or_create {
1455   my $self = shift;
1456   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1457   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1458
1459   my $row = $self->find($cond);
1460   if (defined $row) {
1461     $row->update($cond);
1462     return $row;
1463   }
1464
1465   return $self->create($cond);
1466 }
1467
1468 =head2 get_cache
1469
1470 =over 4
1471
1472 =item Arguments: none
1473
1474 =item Return Value: \@cache_objects?
1475
1476 =back
1477
1478 Gets the contents of the cache for the resultset, if the cache is set.
1479
1480 =cut
1481
1482 sub get_cache {
1483   shift->{all_cache};
1484 }
1485
1486 =head2 set_cache
1487
1488 =over 4
1489
1490 =item Arguments: \@cache_objects
1491
1492 =item Return Value: \@cache_objects
1493
1494 =back
1495
1496 Sets the contents of the cache for the resultset. Expects an arrayref
1497 of objects of the same class as those produced by the resultset. Note that
1498 if the cache is set the resultset will return the cached objects rather
1499 than re-querying the database even if the cache attr is not set.
1500
1501 =cut
1502
1503 sub set_cache {
1504   my ( $self, $data ) = @_;
1505   $self->throw_exception("set_cache requires an arrayref")
1506       if defined($data) && (ref $data ne 'ARRAY');
1507   $self->{all_cache} = $data;
1508 }
1509
1510 =head2 clear_cache
1511
1512 =over 4
1513
1514 =item Arguments: none
1515
1516 =item Return Value: []
1517
1518 =back
1519
1520 Clears the cache for the resultset.
1521
1522 =cut
1523
1524 sub clear_cache {
1525   shift->set_cache(undef);
1526 }
1527
1528 =head2 related_resultset
1529
1530 =over 4
1531
1532 =item Arguments: $relationship_name
1533
1534 =item Return Value: $resultset
1535
1536 =back
1537
1538 Returns a related resultset for the supplied relationship name.
1539
1540   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
1541
1542 =cut
1543
1544 sub related_resultset {
1545   my ( $self, $rel ) = @_;
1546
1547   $self->{related_resultsets} ||= {};
1548   return $self->{related_resultsets}{$rel} ||= do {
1549       #warn "fetching related resultset for rel '$rel' " . $self->result_source->{name};
1550       my $rel_obj = $self->result_source->relationship_info($rel);
1551       $self->throw_exception(
1552         "search_related: result source '" . $self->result_source->name .
1553         "' has no such relationship ${rel}")
1554         unless $rel_obj; #die Dumper $self->{attrs};
1555
1556       my $rs = $self->result_source->schema->resultset($rel_obj->{class}
1557            )->search( undef,
1558              { %{$self->{attrs}},
1559                select => undef,
1560                as => undef,
1561                                                          join => $rel,
1562                                                          _live_join => $rel }
1563            );
1564
1565       # keep reference of the original resultset
1566       $rs->{_parent_rs} = $self->result_source;
1567       return $rs;
1568   };
1569 }
1570
1571 =head2 throw_exception
1572
1573 See L<DBIx::Class::Schema/throw_exception> for details.
1574
1575 =cut
1576
1577 sub throw_exception {
1578   my $self=shift;
1579   $self->result_source->schema->throw_exception(@_);
1580 }
1581
1582 # XXX: FIXME: Attributes docs need clearing up
1583
1584 =head1 ATTRIBUTES
1585
1586 The resultset takes various attributes that modify its behavior. Here's an
1587 overview of them:
1588
1589 =head2 order_by
1590
1591 =over 4
1592
1593 =item Value: ($order_by | \@order_by)
1594
1595 =back
1596
1597 Which column(s) to order the results by. This is currently passed
1598 through directly to SQL, so you can give e.g. C<year DESC> for a
1599 descending order on the column `year'.
1600
1601 Please note that if you have quoting enabled (see 
1602 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
1603 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
1604 so you will need to manually quote things as appropriate.)
1605
1606 =head2 columns
1607
1608 =over 4
1609
1610 =item Value: \@columns
1611
1612 =back
1613
1614 Shortcut to request a particular set of columns to be retrieved.  Adds
1615 C<me.> onto the start of any column without a C<.> in it and sets C<select>
1616 from that, then auto-populates C<as> from C<select> as normal. (You may also
1617 use the C<cols> attribute, as in earlier versions of DBIC.)
1618
1619 =head2 include_columns
1620
1621 =over 4
1622
1623 =item Value: \@columns
1624
1625 =back
1626
1627 Shortcut to include additional columns in the returned results - for example
1628
1629   $schema->resultset('CD')->search(undef, {
1630     include_columns => ['artist.name'],
1631     join => ['artist']
1632   });
1633
1634 would return all CDs and include a 'name' column to the information
1635 passed to object inflation
1636
1637 =head2 select
1638
1639 =over 4
1640
1641 =item Value: \@select_columns
1642
1643 =back
1644
1645 Indicates which columns should be selected from the storage. You can use
1646 column names, or in the case of RDBMS back ends, function or stored procedure
1647 names:
1648
1649   $rs = $schema->resultset('Employee')->search(undef, {
1650     select => [
1651       'name',
1652       { count => 'employeeid' },
1653       { sum => 'salary' }
1654     ]
1655   });
1656
1657 When you use function/stored procedure names and do not supply an C<as>
1658 attribute, the column names returned are storage-dependent. E.g. MySQL would
1659 return a column named C<count(employeeid)> in the above example.
1660
1661 =head2 +select
1662
1663 =over 4
1664
1665 Indicates additional columns to be selected from storage.  Works the same as
1666 L<select> but adds columns to the selection.
1667
1668 =back
1669
1670 =head2 +as
1671
1672 =over 4
1673
1674 Indicates additional column names for those added via L<+select>.
1675
1676 =back
1677
1678 =head2 as
1679
1680 =over 4
1681
1682 =item Value: \@inflation_names
1683
1684 =back
1685
1686 Indicates column names for object inflation. This is used in conjunction with
1687 C<select>, usually when C<select> contains one or more function or stored
1688 procedure names:
1689
1690   $rs = $schema->resultset('Employee')->search(undef, {
1691     select => [
1692       'name',
1693       { count => 'employeeid' }
1694     ],
1695     as => ['name', 'employee_count'],
1696   });
1697
1698   my $employee = $rs->first(); # get the first Employee
1699
1700 If the object against which the search is performed already has an accessor
1701 matching a column name specified in C<as>, the value can be retrieved using
1702 the accessor as normal:
1703
1704   my $name = $employee->name();
1705
1706 If on the other hand an accessor does not exist in the object, you need to
1707 use C<get_column> instead:
1708
1709   my $employee_count = $employee->get_column('employee_count');
1710
1711 You can create your own accessors if required - see
1712 L<DBIx::Class::Manual::Cookbook> for details.
1713
1714 Please note: This will NOT insert an C<AS employee_count> into the SQL statement
1715 produced, it is used for internal access only. Thus attempting to use the accessor
1716 in an C<order_by> clause or similar will fail misrably.
1717
1718 =head2 join
1719
1720 =over 4
1721
1722 =item Value: ($rel_name | \@rel_names | \%rel_names)
1723
1724 =back
1725
1726 Contains a list of relationships that should be joined for this query.  For
1727 example:
1728
1729   # Get CDs by Nine Inch Nails
1730   my $rs = $schema->resultset('CD')->search(
1731     { 'artist.name' => 'Nine Inch Nails' },
1732     { join => 'artist' }
1733   );
1734
1735 Can also contain a hash reference to refer to the other relation's relations.
1736 For example:
1737
1738   package MyApp::Schema::Track;
1739   use base qw/DBIx::Class/;
1740   __PACKAGE__->table('track');
1741   __PACKAGE__->add_columns(qw/trackid cd position title/);
1742   __PACKAGE__->set_primary_key('trackid');
1743   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
1744   1;
1745
1746   # In your application
1747   my $rs = $schema->resultset('Artist')->search(
1748     { 'track.title' => 'Teardrop' },
1749     {
1750       join     => { cd => 'track' },
1751       order_by => 'artist.name',
1752     }
1753   );
1754
1755 If the same join is supplied twice, it will be aliased to <rel>_2 (and
1756 similarly for a third time). For e.g.
1757
1758   my $rs = $schema->resultset('Artist')->search({
1759     'cds.title'   => 'Down to Earth',
1760     'cds_2.title' => 'Popular',
1761   }, {
1762     join => [ qw/cds cds/ ],
1763   });
1764
1765 will return a set of all artists that have both a cd with title 'Down
1766 to Earth' and a cd with title 'Popular'.
1767
1768 If you want to fetch related objects from other tables as well, see C<prefetch>
1769 below.
1770
1771 =head2 prefetch
1772
1773 =over 4
1774
1775 =item Value: ($rel_name | \@rel_names | \%rel_names)
1776
1777 =back
1778
1779 Contains one or more relationships that should be fetched along with the main
1780 query (when they are accessed afterwards they will have already been
1781 "prefetched").  This is useful for when you know you will need the related
1782 objects, because it saves at least one query:
1783
1784   my $rs = $schema->resultset('Tag')->search(
1785     undef,
1786     {
1787       prefetch => {
1788         cd => 'artist'
1789       }
1790     }
1791   );
1792
1793 The initial search results in SQL like the following:
1794
1795   SELECT tag.*, cd.*, artist.* FROM tag
1796   JOIN cd ON tag.cd = cd.cdid
1797   JOIN artist ON cd.artist = artist.artistid
1798
1799 L<DBIx::Class> has no need to go back to the database when we access the
1800 C<cd> or C<artist> relationships, which saves us two SQL statements in this
1801 case.
1802
1803 Simple prefetches will be joined automatically, so there is no need
1804 for a C<join> attribute in the above search. If you're prefetching to
1805 depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
1806 specify the join as well.
1807
1808 C<prefetch> can be used with the following relationship types: C<belongs_to>,
1809 C<has_one> (or if you're using C<add_relationship>, any relationship declared
1810 with an accessor type of 'single' or 'filter').
1811
1812 =head2 page
1813
1814 =over 4
1815
1816 =item Value: $page
1817
1818 =back
1819
1820 Makes the resultset paged and specifies the page to retrieve. Effectively
1821 identical to creating a non-pages resultset and then calling ->page($page)
1822 on it. 
1823
1824 If L<rows> attribute is not specified it defualts to 10 rows per page.
1825
1826 =head2 rows
1827
1828 =over 4
1829
1830 =item Value: $rows
1831
1832 =back
1833
1834 Specifes the maximum number of rows for direct retrieval or the number of
1835 rows per page if the page attribute or method is used.
1836
1837 =head2 offset
1838
1839 =over 4
1840
1841 =item Value: $offset
1842
1843 =back
1844
1845 Specifies the (zero-based) row number for the  first row to be returned, or the
1846 of the first row of the first page if paging is used.
1847
1848 =head2 group_by
1849
1850 =over 4
1851
1852 =item Value: \@columns
1853
1854 =back
1855
1856 A arrayref of columns to group by. Can include columns of joined tables.
1857
1858   group_by => [qw/ column1 column2 ... /]
1859
1860 =head2 having
1861
1862 =over 4
1863
1864 =item Value: $condition
1865
1866 =back
1867
1868 HAVING is a select statement attribute that is applied between GROUP BY and
1869 ORDER BY. It is applied to the after the grouping calculations have been
1870 done. 
1871
1872   having => { 'count(employee)' => { '>=', 100 } }
1873
1874 =head2 distinct
1875
1876 =over 4
1877
1878 =item Value: (0 | 1)
1879
1880 =back
1881
1882 Set to 1 to group by all columns.
1883
1884 =head2 cache
1885
1886 Set to 1 to cache search results. This prevents extra SQL queries if you
1887 revisit rows in your ResultSet:
1888
1889   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
1890   
1891   while( my $artist = $resultset->next ) {
1892     ... do stuff ...
1893   }
1894
1895   $rs->first; # without cache, this would issue a query
1896
1897 By default, searches are not cached.
1898
1899 For more examples of using these attributes, see
1900 L<DBIx::Class::Manual::Cookbook>.
1901
1902 =head2 from
1903
1904 =over 4
1905
1906 =item Value: \@from_clause
1907
1908 =back
1909
1910 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
1911 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
1912 clauses.
1913
1914 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
1915
1916 C<join> will usually do what you need and it is strongly recommended that you
1917 avoid using C<from> unless you cannot achieve the desired result using C<join>.
1918 And we really do mean "cannot", not just tried and failed. Attempting to use
1919 this because you're having problems with C<join> is like trying to use x86
1920 ASM because you've got a syntax error in your C. Trust us on this.
1921
1922 Now, if you're still really, really sure you need to use this (and if you're
1923 not 100% sure, ask the mailing list first), here's an explanation of how this
1924 works.
1925
1926 The syntax is as follows -
1927
1928   [
1929     { <alias1> => <table1> },
1930     [
1931       { <alias2> => <table2>, -join_type => 'inner|left|right' },
1932       [], # nested JOIN (optional)
1933       { <table1.column1> => <table2.column2>, ... (more conditions) },
1934     ],
1935     # More of the above [ ] may follow for additional joins
1936   ]
1937
1938   <table1> <alias1>
1939   JOIN
1940     <table2> <alias2>
1941     [JOIN ...]
1942   ON <table1.column1> = <table2.column2>
1943   <more joins may follow>
1944
1945 An easy way to follow the examples below is to remember the following:
1946
1947     Anything inside "[]" is a JOIN
1948     Anything inside "{}" is a condition for the enclosing JOIN
1949
1950 The following examples utilize a "person" table in a family tree application.
1951 In order to express parent->child relationships, this table is self-joined:
1952
1953     # Person->belongs_to('father' => 'Person');
1954     # Person->belongs_to('mother' => 'Person');
1955
1956 C<from> can be used to nest joins. Here we return all children with a father,
1957 then search against all mothers of those children:
1958
1959   $rs = $schema->resultset('Person')->search(
1960       undef,
1961       {
1962           alias => 'mother', # alias columns in accordance with "from"
1963           from => [
1964               { mother => 'person' },
1965               [
1966                   [
1967                       { child => 'person' },
1968                       [
1969                           { father => 'person' },
1970                           { 'father.person_id' => 'child.father_id' }
1971                       ]
1972                   ],
1973                   { 'mother.person_id' => 'child.mother_id' }
1974               ],
1975           ]
1976       },
1977   );
1978
1979   # Equivalent SQL:
1980   # SELECT mother.* FROM person mother
1981   # JOIN (
1982   #   person child
1983   #   JOIN person father
1984   #   ON ( father.person_id = child.father_id )
1985   # )
1986   # ON ( mother.person_id = child.mother_id )
1987
1988 The type of any join can be controlled manually. To search against only people
1989 with a father in the person table, we could explicitly use C<INNER JOIN>:
1990
1991     $rs = $schema->resultset('Person')->search(
1992         undef,
1993         {
1994             alias => 'child', # alias columns in accordance with "from"
1995             from => [
1996                 { child => 'person' },
1997                 [
1998                     { father => 'person', -join_type => 'inner' },
1999                     { 'father.id' => 'child.father_id' }
2000                 ],
2001             ]
2002         },
2003     );
2004
2005     # Equivalent SQL:
2006     # SELECT child.* FROM person child
2007     # INNER JOIN person father ON child.father_id = father.id
2008
2009 =cut
2010
2011 1;