Merge 'subquery' into 'trunk'
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => "count",
7         'bool'   => "_bool",
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use List::Util ();
15 use Scalar::Util ();
16 use base qw/DBIx::Class/;
17
18 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
23
24 =head1 SYNOPSIS
25
26   my $users_rs   = $schema->resultset('User');
27   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
28   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
29
30 =head1 DESCRIPTION
31
32 A ResultSet is an object which stores a set of conditions representing
33 a query. It is the backbone of DBIx::Class (i.e. the really
34 important/useful bit).
35
36 No SQL is executed on the database when a ResultSet is created, it
37 just stores all the conditions needed to create the query.
38
39 A basic ResultSet representing the data of an entire table is returned
40 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
41 L<Source|DBIx::Class::Manual::Glossary/Source> name.
42
43   my $users_rs = $schema->resultset('User');
44
45 A new ResultSet is returned from calling L</search> on an existing
46 ResultSet. The new one will contain all the conditions of the
47 original, plus any new conditions added in the C<search> call.
48
49 A ResultSet is also an iterator. L</next> is used to return all the
50 L<DBIx::Class::Row>s the ResultSet represents.
51
52 The query that the ResultSet represents is B<only> executed against
53 the database when these methods are called:
54
55 =over
56
57 =item L</find>
58
59 =item L</next>
60
61 =item L</all>
62
63 =item L</count>
64
65 =item L</single>
66
67 =item L</first>
68
69 =back
70
71 =head1 EXAMPLES 
72
73 =head2 Chaining resultsets
74
75 Let's say you've got a query that needs to be run to return some data
76 to the user. But, you have an authorization system in place that
77 prevents certain users from seeing certain information. So, you want
78 to construct the basic query in one method, but add constraints to it in
79 another.
80
81   sub get_data {
82     my $self = shift;
83     my $request = $self->get_request; # Get a request object somehow.
84     my $schema = $self->get_schema;   # Get the DBIC schema object somehow.
85
86     my $cd_rs = $schema->resultset('CD')->search({
87       title => $request->param('title'),
88       year => $request->param('year'),
89     });
90
91     $self->apply_security_policy( $cd_rs );
92
93     return $cd_rs->all();
94   }
95
96   sub apply_security_policy {
97     my $self = shift;
98     my ($rs) = @_;
99
100     return $rs->search({
101       subversive => 0,
102     });
103   }
104
105 =head2 Multiple queries
106
107 Since a resultset just defines a query, you can do all sorts of
108 things with it with the same object.
109
110   # Don't hit the DB yet.
111   my $cd_rs = $schema->resultset('CD')->search({
112     title => 'something',
113     year => 2009,
114   });
115
116   # Each of these hits the DB individually.
117   my $count = $cd_rs->count;
118   my $most_recent = $cd_rs->get_column('date_released')->max();
119   my @records = $cd_rs->all;
120
121 And it's not just limited to SELECT statements.
122
123   $cd_rs->delete();
124
125 This is even cooler:
126
127   $cd_rs->create({ artist => 'Fred' });
128
129 Which is the same as:
130
131   $schema->resultset('CD')->create({
132     title => 'something',
133     year => 2009,
134     artist => 'Fred'
135   });
136
137 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
138
139 =head1 OVERLOADING
140
141 If a resultset is used in a numeric context it returns the L</count>.
142 However, if it is used in a booleand context it is always true.  So if
143 you want to check if a resultset has any results use C<if $rs != 0>.
144 C<if $rs> will always be true.
145
146 =head1 METHODS
147
148 =head2 new
149
150 =over 4
151
152 =item Arguments: $source, \%$attrs
153
154 =item Return Value: $rs
155
156 =back
157
158 The resultset constructor. Takes a source object (usually a
159 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
160 L</ATTRIBUTES> below).  Does not perform any queries -- these are
161 executed as needed by the other methods.
162
163 Generally you won't need to construct a resultset manually.  You'll
164 automatically get one from e.g. a L</search> called in scalar context:
165
166   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
167
168 IMPORTANT: If called on an object, proxies to new_result instead so
169
170   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
171
172 will return a CD object, not a ResultSet.
173
174 =cut
175
176 sub new {
177   my $class = shift;
178   return $class->new_result(@_) if ref $class;
179
180   my ($source, $attrs) = @_;
181   $source = $source->handle 
182     unless $source->isa('DBIx::Class::ResultSourceHandle');
183   $attrs = { %{$attrs||{}} };
184
185   if ($attrs->{page}) {
186     $attrs->{rows} ||= 10;
187   }
188
189   $attrs->{alias} ||= 'me';
190
191   # Creation of {} and bless separated to mitigate RH perl bug
192   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
193   my $self = {
194     _source_handle => $source,
195     cond => $attrs->{where},
196     count => undef,
197     pager => undef,
198     attrs => $attrs
199   };
200
201   bless $self, $class;
202
203   $self->result_class(
204     $attrs->{result_class} || $source->resolve->result_class
205   );
206
207   return $self;
208 }
209
210 =head2 search
211
212 =over 4
213
214 =item Arguments: $cond, \%attrs?
215
216 =item Return Value: $resultset (scalar context), @row_objs (list context)
217
218 =back
219
220   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
221   my $new_rs = $cd_rs->search({ year => 2005 });
222
223   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
224                  # year = 2005 OR year = 2004
225
226 If you need to pass in additional attributes but no additional condition,
227 call it as C<search(undef, \%attrs)>.
228
229   # "SELECT name, artistid FROM $artist_table"
230   my @all_artists = $schema->resultset('Artist')->search(undef, {
231     columns => [qw/name artistid/],
232   });
233
234 For a list of attributes that can be passed to C<search>, see
235 L</ATTRIBUTES>. For more examples of using this function, see
236 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
237 documentation for the first argument, see L<SQL::Abstract>.
238
239 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
240
241 =cut
242
243 sub search {
244   my $self = shift;
245   my $rs = $self->search_rs( @_ );
246   return (wantarray ? $rs->all : $rs);
247 }
248
249 =head2 search_rs
250
251 =over 4
252
253 =item Arguments: $cond, \%attrs?
254
255 =item Return Value: $resultset
256
257 =back
258
259 This method does the same exact thing as search() except it will
260 always return a resultset, even in list context.
261
262 =cut
263
264 sub search_rs {
265   my $self = shift;
266
267   my $attrs = {};
268   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
269   my $our_attrs = { %{$self->{attrs}} };
270   my $having = delete $our_attrs->{having};
271   my $where = delete $our_attrs->{where};
272
273   my $rows;
274
275   my %safe = (alias => 1, cache => 1);
276
277   unless (
278     (@_ && defined($_[0])) # @_ == () or (undef)
279     || 
280     (keys %$attrs # empty attrs or only 'safe' attrs
281     && List::Util::first { !$safe{$_} } keys %$attrs)
282   ) {
283     # no search, effectively just a clone
284     $rows = $self->get_cache;
285   }
286
287   my $new_attrs = { %{$our_attrs}, %{$attrs} };
288
289   # merge new attrs into inherited
290   foreach my $key (qw/join prefetch +select +as/) {
291     next unless exists $attrs->{$key};
292     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
293   }
294
295   my $cond = (@_
296     ? (
297         (@_ == 1 || ref $_[0] eq "HASH")
298           ? (
299               (ref $_[0] eq 'HASH')
300                 ? (
301                     (keys %{ $_[0] }  > 0)
302                       ? shift
303                       : undef
304                    )
305                 :  shift
306              )
307           : (
308               (@_ % 2)
309                 ? $self->throw_exception("Odd number of arguments to search")
310                 : {@_}
311              )
312       )
313     : undef
314   );
315
316   if (defined $where) {
317     $new_attrs->{where} = (
318       defined $new_attrs->{where}
319         ? { '-and' => [
320               map {
321                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
322               } $where, $new_attrs->{where}
323             ]
324           }
325         : $where);
326   }
327
328   if (defined $cond) {
329     $new_attrs->{where} = (
330       defined $new_attrs->{where}
331         ? { '-and' => [
332               map {
333                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
334               } $cond, $new_attrs->{where}
335             ]
336           }
337         : $cond);
338   }
339
340   if (defined $having) {
341     $new_attrs->{having} = (
342       defined $new_attrs->{having}
343         ? { '-and' => [
344               map {
345                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
346               } $having, $new_attrs->{having}
347             ]
348           }
349         : $having);
350   }
351
352   my $rs = (ref $self)->new($self->result_source, $new_attrs);
353   if ($rows) {
354     $rs->set_cache($rows);
355   }
356   return $rs;
357 }
358
359 =head2 search_literal
360
361 =over 4
362
363 =item Arguments: $sql_fragment, @bind_values
364
365 =item Return Value: $resultset (scalar context), @row_objs (list context)
366
367 =back
368
369   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
370   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
371
372 Pass a literal chunk of SQL to be added to the conditional part of the
373 resultset query.
374
375 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
376 only be used in that context. There are known problems using C<search_literal>
377 in chained queries; it can result in bind values in the wrong order.  See
378 L<DBIx::Class::Manual::Cookbook/Searching> and
379 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
380 require C<search_literal>.
381
382 =cut
383
384 sub search_literal {
385   my ($self, $cond, @vals) = @_;
386   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
387   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
388   return $self->search(\$cond, $attrs);
389 }
390
391 =head2 find
392
393 =over 4
394
395 =item Arguments: @values | \%cols, \%attrs?
396
397 =item Return Value: $row_object | undef
398
399 =back
400
401 Finds a row based on its primary key or unique constraint. For example, to find
402 a row by its primary key:
403
404   my $cd = $schema->resultset('CD')->find(5);
405
406 You can also find a row by a specific unique constraint using the C<key>
407 attribute. For example:
408
409   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
410     key => 'cd_artist_title'
411   });
412
413 Additionally, you can specify the columns explicitly by name:
414
415   my $cd = $schema->resultset('CD')->find(
416     {
417       artist => 'Massive Attack',
418       title  => 'Mezzanine',
419     },
420     { key => 'cd_artist_title' }
421   );
422
423 If the C<key> is specified as C<primary>, it searches only on the primary key.
424
425 If no C<key> is specified, it searches on all unique constraints defined on the
426 source for which column data is provided, including the primary key.
427
428 If your table does not have a primary key, you B<must> provide a value for the
429 C<key> attribute matching one of the unique constraints on the source.
430
431 In addition to C<key>, L</find> recognizes and applies standard
432 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
433
434 Note: If your query does not return only one row, a warning is generated:
435
436   Query returned more than one row
437
438 See also L</find_or_create> and L</update_or_create>. For information on how to
439 declare unique constraints, see
440 L<DBIx::Class::ResultSource/add_unique_constraint>.
441
442 =cut
443
444 sub find {
445   my $self = shift;
446   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
447
448   # Default to the primary key, but allow a specific key
449   my @cols = exists $attrs->{key}
450     ? $self->result_source->unique_constraint_columns($attrs->{key})
451     : $self->result_source->primary_columns;
452   $self->throw_exception(
453     "Can't find unless a primary key is defined or unique constraint is specified"
454   ) unless @cols;
455
456   # Parse out a hashref from input
457   my $input_query;
458   if (ref $_[0] eq 'HASH') {
459     $input_query = { %{$_[0]} };
460   }
461   elsif (@_ == @cols) {
462     $input_query = {};
463     @{$input_query}{@cols} = @_;
464   }
465   else {
466     # Compatibility: Allow e.g. find(id => $value)
467     carp "Find by key => value deprecated; please use a hashref instead";
468     $input_query = {@_};
469   }
470
471   my (%related, $info);
472
473   KEY: foreach my $key (keys %$input_query) {
474     if (ref($input_query->{$key})
475         && ($info = $self->result_source->relationship_info($key))) {
476       my $val = delete $input_query->{$key};
477       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
478       my $rel_q = $self->result_source->resolve_condition(
479                     $info->{cond}, $val, $key
480                   );
481       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
482       @related{keys %$rel_q} = values %$rel_q;
483     }
484   }
485   if (my @keys = keys %related) {
486     @{$input_query}{@keys} = values %related;
487   }
488
489
490   # Build the final query: Default to the disjunction of the unique queries,
491   # but allow the input query in case the ResultSet defines the query or the
492   # user is abusing find
493   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
494   my $query;
495   if (exists $attrs->{key}) {
496     my @unique_cols = $self->result_source->unique_constraint_columns($attrs->{key});
497     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
498     $query = $self->_add_alias($unique_query, $alias);
499   }
500   else {
501     my @unique_queries = $self->_unique_queries($input_query, $attrs);
502     $query = @unique_queries
503       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
504       : $self->_add_alias($input_query, $alias);
505   }
506
507   # Run the query
508   if (keys %$attrs) {
509     my $rs = $self->search($query, $attrs);
510     if (keys %{$rs->_resolved_attrs->{collapse}}) {
511       my $row = $rs->next;
512       carp "Query returned more than one row" if $rs->next;
513       return $row;
514     }
515     else {
516       return $rs->single;
517     }
518   }
519   else {
520     if (keys %{$self->_resolved_attrs->{collapse}}) {
521       my $rs = $self->search($query);
522       my $row = $rs->next;
523       carp "Query returned more than one row" if $rs->next;
524       return $row;
525     }
526     else {
527       return $self->single($query);
528     }
529   }
530 }
531
532 # _add_alias
533 #
534 # Add the specified alias to the specified query hash. A copy is made so the
535 # original query is not modified.
536
537 sub _add_alias {
538   my ($self, $query, $alias) = @_;
539
540   my %aliased = %$query;
541   foreach my $col (grep { ! m/\./ } keys %aliased) {
542     $aliased{"$alias.$col"} = delete $aliased{$col};
543   }
544
545   return \%aliased;
546 }
547
548 # _unique_queries
549 #
550 # Build a list of queries which satisfy unique constraints.
551
552 sub _unique_queries {
553   my ($self, $query, $attrs) = @_;
554
555   my @constraint_names = exists $attrs->{key}
556     ? ($attrs->{key})
557     : $self->result_source->unique_constraint_names;
558
559   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
560   my $num_where = scalar keys %$where;
561
562   my @unique_queries;
563   foreach my $name (@constraint_names) {
564     my @unique_cols = $self->result_source->unique_constraint_columns($name);
565     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
566
567     my $num_cols = scalar @unique_cols;
568     my $num_query = scalar keys %$unique_query;
569
570     my $total = $num_query + $num_where;
571     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
572       # The query is either unique on its own or is unique in combination with
573       # the existing where clause
574       push @unique_queries, $unique_query;
575     }
576   }
577
578   return @unique_queries;
579 }
580
581 # _build_unique_query
582 #
583 # Constrain the specified query hash based on the specified column names.
584
585 sub _build_unique_query {
586   my ($self, $query, $unique_cols) = @_;
587
588   return {
589     map  { $_ => $query->{$_} }
590     grep { exists $query->{$_} }
591       @$unique_cols
592   };
593 }
594
595 =head2 search_related
596
597 =over 4
598
599 =item Arguments: $rel, $cond, \%attrs?
600
601 =item Return Value: $new_resultset
602
603 =back
604
605   $new_rs = $cd_rs->search_related('artist', {
606     name => 'Emo-R-Us',
607   });
608
609 Searches the specified relationship, optionally specifying a condition and
610 attributes for matching records. See L</ATTRIBUTES> for more information.
611
612 =cut
613
614 sub search_related {
615   return shift->related_resultset(shift)->search(@_);
616 }
617
618 =head2 search_related_rs
619
620 This method works exactly the same as search_related, except that
621 it guarantees a restultset, even in list context.
622
623 =cut
624
625 sub search_related_rs {
626   return shift->related_resultset(shift)->search_rs(@_);
627 }
628
629 =head2 cursor
630
631 =over 4
632
633 =item Arguments: none
634
635 =item Return Value: $cursor
636
637 =back
638
639 Returns a storage-driven cursor to the given resultset. See
640 L<DBIx::Class::Cursor> for more information.
641
642 =cut
643
644 sub cursor {
645   my ($self) = @_;
646
647   my $attrs = { %{$self->_resolved_attrs} };
648   return $self->{cursor}
649     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
650           $attrs->{where},$attrs);
651 }
652
653 =head2 single
654
655 =over 4
656
657 =item Arguments: $cond?
658
659 =item Return Value: $row_object?
660
661 =back
662
663   my $cd = $schema->resultset('CD')->single({ year => 2001 });
664
665 Inflates the first result without creating a cursor if the resultset has
666 any records in it; if not returns nothing. Used by L</find> as a lean version of
667 L</search>.
668
669 While this method can take an optional search condition (just like L</search>)
670 being a fast-code-path it does not recognize search attributes. If you need to
671 add extra joins or similar, call L</search> and then chain-call L</single> on the
672 L<DBIx::Class::ResultSet> returned.
673
674 =over
675
676 =item B<Note>
677
678 As of 0.08100, this method enforces the assumption that the preceeding
679 query returns only one row. If more than one row is returned, you will receive
680 a warning:
681
682   Query returned more than one row
683
684 In this case, you should be using L</first> or L</find> instead, or if you really
685 know what you are doing, use the L</rows> attribute to explicitly limit the size 
686 of the resultset.
687
688 =back
689
690 =cut
691
692 sub single {
693   my ($self, $where) = @_;
694   if(@_ > 2) {
695       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
696   }
697
698   my $attrs = { %{$self->_resolved_attrs} };
699   if ($where) {
700     if (defined $attrs->{where}) {
701       $attrs->{where} = {
702         '-and' =>
703             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
704                $where, delete $attrs->{where} ]
705       };
706     } else {
707       $attrs->{where} = $where;
708     }
709   }
710
711 #  XXX: Disabled since it doesn't infer uniqueness in all cases
712 #  unless ($self->_is_unique_query($attrs->{where})) {
713 #    carp "Query not guaranteed to return a single row"
714 #      . "; please declare your unique constraints or use search instead";
715 #  }
716
717   my @data = $self->result_source->storage->select_single(
718     $attrs->{from}, $attrs->{select},
719     $attrs->{where}, $attrs
720   );
721
722   return (@data ? ($self->_construct_object(@data))[0] : undef);
723 }
724
725 # _is_unique_query
726 #
727 # Try to determine if the specified query is guaranteed to be unique, based on
728 # the declared unique constraints.
729
730 sub _is_unique_query {
731   my ($self, $query) = @_;
732
733   my $collapsed = $self->_collapse_query($query);
734   my $alias = $self->{attrs}{alias};
735
736   foreach my $name ($self->result_source->unique_constraint_names) {
737     my @unique_cols = map {
738       "$alias.$_"
739     } $self->result_source->unique_constraint_columns($name);
740
741     # Count the values for each unique column
742     my %seen = map { $_ => 0 } @unique_cols;
743
744     foreach my $key (keys %$collapsed) {
745       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
746       next unless exists $seen{$aliased};  # Additional constraints are okay
747       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
748     }
749
750     # If we get 0 or more than 1 value for a column, it's not necessarily unique
751     return 1 unless grep { $_ != 1 } values %seen;
752   }
753
754   return 0;
755 }
756
757 # _collapse_query
758 #
759 # Recursively collapse the query, accumulating values for each column.
760
761 sub _collapse_query {
762   my ($self, $query, $collapsed) = @_;
763
764   $collapsed ||= {};
765
766   if (ref $query eq 'ARRAY') {
767     foreach my $subquery (@$query) {
768       next unless ref $subquery;  # -or
769 #      warn "ARRAY: " . Dumper $subquery;
770       $collapsed = $self->_collapse_query($subquery, $collapsed);
771     }
772   }
773   elsif (ref $query eq 'HASH') {
774     if (keys %$query and (keys %$query)[0] eq '-and') {
775       foreach my $subquery (@{$query->{-and}}) {
776 #        warn "HASH: " . Dumper $subquery;
777         $collapsed = $self->_collapse_query($subquery, $collapsed);
778       }
779     }
780     else {
781 #      warn "LEAF: " . Dumper $query;
782       foreach my $col (keys %$query) {
783         my $value = $query->{$col};
784         $collapsed->{$col}{$value}++;
785       }
786     }
787   }
788
789   return $collapsed;
790 }
791
792 =head2 get_column
793
794 =over 4
795
796 =item Arguments: $cond?
797
798 =item Return Value: $resultsetcolumn
799
800 =back
801
802   my $max_length = $rs->get_column('length')->max;
803
804 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
805
806 =cut
807
808 sub get_column {
809   my ($self, $column) = @_;
810   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
811   return $new;
812 }
813
814 =head2 search_like
815
816 =over 4
817
818 =item Arguments: $cond, \%attrs?
819
820 =item Return Value: $resultset (scalar context), @row_objs (list context)
821
822 =back
823
824   # WHERE title LIKE '%blue%'
825   $cd_rs = $rs->search_like({ title => '%blue%'});
826
827 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
828 that this is simply a convenience method retained for ex Class::DBI users.
829 You most likely want to use L</search> with specific operators.
830
831 For more information, see L<DBIx::Class::Manual::Cookbook>.
832
833 =cut
834
835 sub search_like {
836   my $class = shift;
837   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
838   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
839   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
840   return $class->search($query, { %$attrs });
841 }
842
843 =head2 slice
844
845 =over 4
846
847 =item Arguments: $first, $last
848
849 =item Return Value: $resultset (scalar context), @row_objs (list context)
850
851 =back
852
853 Returns a resultset or object list representing a subset of elements from the
854 resultset slice is called on. Indexes are from 0, i.e., to get the first
855 three records, call:
856
857   my ($one, $two, $three) = $rs->slice(0, 2);
858
859 =cut
860
861 sub slice {
862   my ($self, $min, $max) = @_;
863   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
864   $attrs->{offset} = $self->{attrs}{offset} || 0;
865   $attrs->{offset} += $min;
866   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
867   return $self->search(undef(), $attrs);
868   #my $slice = (ref $self)->new($self->result_source, $attrs);
869   #return (wantarray ? $slice->all : $slice);
870 }
871
872 =head2 next
873
874 =over 4
875
876 =item Arguments: none
877
878 =item Return Value: $result?
879
880 =back
881
882 Returns the next element in the resultset (C<undef> is there is none).
883
884 Can be used to efficiently iterate over records in the resultset:
885
886   my $rs = $schema->resultset('CD')->search;
887   while (my $cd = $rs->next) {
888     print $cd->title;
889   }
890
891 Note that you need to store the resultset object, and call C<next> on it.
892 Calling C<< resultset('Table')->next >> repeatedly will always return the
893 first record from the resultset.
894
895 =cut
896
897 sub next {
898   my ($self) = @_;
899   if (my $cache = $self->get_cache) {
900     $self->{all_cache_position} ||= 0;
901     return $cache->[$self->{all_cache_position}++];
902   }
903   if ($self->{attrs}{cache}) {
904     $self->{all_cache_position} = 1;
905     return ($self->all)[0];
906   }
907   if ($self->{stashed_objects}) {
908     my $obj = shift(@{$self->{stashed_objects}});
909     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
910     return $obj;
911   }
912   my @row = (
913     exists $self->{stashed_row}
914       ? @{delete $self->{stashed_row}}
915       : $self->cursor->next
916   );
917   return undef unless (@row);
918   my ($row, @more) = $self->_construct_object(@row);
919   $self->{stashed_objects} = \@more if @more;
920   return $row;
921 }
922
923 sub _construct_object {
924   my ($self, @row) = @_;
925   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
926   my @new = $self->result_class->inflate_result($self->result_source, @$info);
927   @new = $self->{_attrs}{record_filter}->(@new)
928     if exists $self->{_attrs}{record_filter};
929   return @new;
930 }
931
932 sub _collapse_result {
933   my ($self, $as_proto, $row) = @_;
934
935   my @copy = @$row;
936
937   # 'foo'         => [ undef, 'foo' ]
938   # 'foo.bar'     => [ 'foo', 'bar' ]
939   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
940
941   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
942
943   my %collapse = %{$self->{_attrs}{collapse}||{}};
944
945   my @pri_index;
946
947   # if we're doing collapsing (has_many prefetch) we need to grab records
948   # until the PK changes, so fill @pri_index. if not, we leave it empty so
949   # we know we don't have to bother.
950
951   # the reason for not using the collapse stuff directly is because if you
952   # had for e.g. two artists in a row with no cds, the collapse info for
953   # both would be NULL (undef) so you'd lose the second artist
954
955   # store just the index so we can check the array positions from the row
956   # without having to contruct the full hash
957
958   if (keys %collapse) {
959     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
960     foreach my $i (0 .. $#construct_as) {
961       next if defined($construct_as[$i][0]); # only self table
962       if (delete $pri{$construct_as[$i][1]}) {
963         push(@pri_index, $i);
964       }
965       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
966     }
967   }
968
969   # no need to do an if, it'll be empty if @pri_index is empty anyway
970
971   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
972
973   my @const_rows;
974
975   do { # no need to check anything at the front, we always want the first row
976
977     my %const;
978   
979     foreach my $this_as (@construct_as) {
980       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
981     }
982
983     push(@const_rows, \%const);
984
985   } until ( # no pri_index => no collapse => drop straight out
986       !@pri_index
987     or
988       do { # get another row, stash it, drop out if different PK
989
990         @copy = $self->cursor->next;
991         $self->{stashed_row} = \@copy;
992
993         # last thing in do block, counts as true if anything doesn't match
994
995         # check xor defined first for NULL vs. NOT NULL then if one is
996         # defined the other must be so check string equality
997
998         grep {
999           (defined $pri_vals{$_} ^ defined $copy[$_])
1000           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1001         } @pri_index;
1002       }
1003   );
1004
1005   my $alias = $self->{attrs}{alias};
1006   my $info = [];
1007
1008   my %collapse_pos;
1009
1010   my @const_keys;
1011
1012   foreach my $const (@const_rows) {
1013     scalar @const_keys or do {
1014       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1015     };
1016     foreach my $key (@const_keys) {
1017       if (length $key) {
1018         my $target = $info;
1019         my @parts = split(/\./, $key);
1020         my $cur = '';
1021         my $data = $const->{$key};
1022         foreach my $p (@parts) {
1023           $target = $target->[1]->{$p} ||= [];
1024           $cur .= ".${p}";
1025           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
1026             # collapsing at this point and on final part
1027             my $pos = $collapse_pos{$cur};
1028             CK: foreach my $ck (@ckey) {
1029               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1030                 $collapse_pos{$cur} = $data;
1031                 delete @collapse_pos{ # clear all positioning for sub-entries
1032                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1033                 };
1034                 push(@$target, []);
1035                 last CK;
1036               }
1037             }
1038           }
1039           if (exists $collapse{$cur}) {
1040             $target = $target->[-1];
1041           }
1042         }
1043         $target->[0] = $data;
1044       } else {
1045         $info->[0] = $const->{$key};
1046       }
1047     }
1048   }
1049
1050   return $info;
1051 }
1052
1053 =head2 result_source
1054
1055 =over 4
1056
1057 =item Arguments: $result_source?
1058
1059 =item Return Value: $result_source
1060
1061 =back
1062
1063 An accessor for the primary ResultSource object from which this ResultSet
1064 is derived.
1065
1066 =head2 result_class
1067
1068 =over 4
1069
1070 =item Arguments: $result_class?
1071
1072 =item Return Value: $result_class
1073
1074 =back
1075
1076 An accessor for the class to use when creating row objects. Defaults to 
1077 C<< result_source->result_class >> - which in most cases is the name of the 
1078 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1079
1080 =cut
1081
1082 sub result_class {
1083   my ($self, $result_class) = @_;
1084   if ($result_class) {
1085     $self->ensure_class_loaded($result_class);
1086     $self->_result_class($result_class);
1087   }
1088   $self->_result_class;
1089 }
1090
1091 =head2 count
1092
1093 =over 4
1094
1095 =item Arguments: $cond, \%attrs??
1096
1097 =item Return Value: $count
1098
1099 =back
1100
1101 Performs an SQL C<COUNT> with the same query as the resultset was built
1102 with to find the number of elements. If passed arguments, does a search
1103 on the resultset and counts the results of that.
1104
1105 Note: When using C<count> with C<group_by>, L<DBIx::Class> emulates C<GROUP BY>
1106 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
1107 not support C<DISTINCT> with multiple columns. If you are using such a
1108 database, you should only use columns from the main table in your C<group_by>
1109 clause.
1110
1111 =cut
1112
1113 sub count {
1114   my $self = shift;
1115   return $self->search(@_)->count if @_ and defined $_[0];
1116   return scalar @{ $self->get_cache } if $self->get_cache;
1117   my $count = $self->_count;
1118   return 0 unless $count;
1119
1120   # need to take offset from resolved attrs
1121
1122   $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
1123   $count = $self->{attrs}{rows} if
1124     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
1125   $count = 0 if ($count < 0);
1126   return $count;
1127 }
1128
1129 sub _count { # Separated out so pager can get the full count
1130   my $self = shift;
1131   my $select = { count => '*' };
1132
1133   my $attrs = { %{$self->_resolved_attrs} };
1134   if (my $group_by = delete $attrs->{group_by}) {
1135     delete $attrs->{having};
1136     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1137     # todo: try CONCAT for multi-column pk
1138     my @pk = $self->result_source->primary_columns;
1139     if (@pk == 1) {
1140       my $alias = $attrs->{alias};
1141       foreach my $column (@distinct) {
1142         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1143           @distinct = ($column);
1144           last;
1145         }
1146       }
1147     }
1148
1149     $select = { count => { distinct => \@distinct } };
1150   }
1151
1152   $attrs->{select} = $select;
1153   $attrs->{as} = [qw/count/];
1154
1155   # offset, order by and page are not needed to count. record_filter is cdbi
1156   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1157
1158   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1159   my ($count) = $tmp_rs->cursor->next;
1160   return $count;
1161 }
1162
1163 sub _bool {
1164   return 1;
1165 }
1166
1167 =head2 count_literal
1168
1169 =over 4
1170
1171 =item Arguments: $sql_fragment, @bind_values
1172
1173 =item Return Value: $count
1174
1175 =back
1176
1177 Counts the results in a literal query. Equivalent to calling L</search_literal>
1178 with the passed arguments, then L</count>.
1179
1180 =cut
1181
1182 sub count_literal { shift->search_literal(@_)->count; }
1183
1184 =head2 all
1185
1186 =over 4
1187
1188 =item Arguments: none
1189
1190 =item Return Value: @objects
1191
1192 =back
1193
1194 Returns all elements in the resultset. Called implicitly if the resultset
1195 is returned in list context.
1196
1197 =cut
1198
1199 sub all {
1200   my $self = shift;
1201   if(@_) {
1202       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1203   }
1204
1205   return @{ $self->get_cache } if $self->get_cache;
1206
1207   my @obj;
1208
1209   # TODO: don't call resolve here
1210   if (keys %{$self->_resolved_attrs->{collapse}}) {
1211 #  if ($self->{attrs}{prefetch}) {
1212       # Using $self->cursor->all is really just an optimisation.
1213       # If we're collapsing has_many prefetches it probably makes
1214       # very little difference, and this is cleaner than hacking
1215       # _construct_object to survive the approach
1216     my @row = $self->cursor->next;
1217     while (@row) {
1218       push(@obj, $self->_construct_object(@row));
1219       @row = (exists $self->{stashed_row}
1220                ? @{delete $self->{stashed_row}}
1221                : $self->cursor->next);
1222     }
1223   } else {
1224     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1225   }
1226
1227   $self->set_cache(\@obj) if $self->{attrs}{cache};
1228   return @obj;
1229 }
1230
1231 =head2 reset
1232
1233 =over 4
1234
1235 =item Arguments: none
1236
1237 =item Return Value: $self
1238
1239 =back
1240
1241 Resets the resultset's cursor, so you can iterate through the elements again.
1242
1243 =cut
1244
1245 sub reset {
1246   my ($self) = @_;
1247   delete $self->{_attrs} if exists $self->{_attrs};
1248   $self->{all_cache_position} = 0;
1249   $self->cursor->reset;
1250   return $self;
1251 }
1252
1253 =head2 first
1254
1255 =over 4
1256
1257 =item Arguments: none
1258
1259 =item Return Value: $object?
1260
1261 =back
1262
1263 Resets the resultset and returns an object for the first result (if the
1264 resultset returns anything).
1265
1266 =cut
1267
1268 sub first {
1269   return $_[0]->reset->next;
1270 }
1271
1272 # _cond_for_update_delete
1273 #
1274 # update/delete require the condition to be modified to handle
1275 # the differing SQL syntax available.  This transforms the $self->{cond}
1276 # appropriately, returning the new condition.
1277
1278 sub _cond_for_update_delete {
1279   my ($self, $full_cond) = @_;
1280   my $cond = {};
1281
1282   $full_cond ||= $self->{cond};
1283   # No-op. No condition, we're updating/deleting everything
1284   return $cond unless ref $full_cond;
1285
1286   if (ref $full_cond eq 'ARRAY') {
1287     $cond = [
1288       map {
1289         my %hash;
1290         foreach my $key (keys %{$_}) {
1291           $key =~ /([^.]+)$/;
1292           $hash{$1} = $_->{$key};
1293         }
1294         \%hash;
1295       } @{$full_cond}
1296     ];
1297   }
1298   elsif (ref $full_cond eq 'HASH') {
1299     if ((keys %{$full_cond})[0] eq '-and') {
1300       $cond->{-and} = [];
1301
1302       my @cond = @{$full_cond->{-and}};
1303       for (my $i = 0; $i < @cond; $i++) {
1304         my $entry = $cond[$i];
1305
1306         my $hash;
1307         if (ref $entry eq 'HASH') {
1308           $hash = $self->_cond_for_update_delete($entry);
1309         }
1310         else {
1311           $entry =~ /([^.]+)$/;
1312           $hash->{$1} = $cond[++$i];
1313         }
1314
1315         push @{$cond->{-and}}, $hash;
1316       }
1317     }
1318     else {
1319       foreach my $key (keys %{$full_cond}) {
1320         $key =~ /([^.]+)$/;
1321         $cond->{$1} = $full_cond->{$key};
1322       }
1323     }
1324   }
1325   else {
1326     $self->throw_exception(
1327       "Can't update/delete on resultset with condition unless hash or array"
1328     );
1329   }
1330
1331   return $cond;
1332 }
1333
1334
1335 =head2 update
1336
1337 =over 4
1338
1339 =item Arguments: \%values
1340
1341 =item Return Value: $storage_rv
1342
1343 =back
1344
1345 Sets the specified columns in the resultset to the supplied values in a
1346 single query. Return value will be true if the update succeeded or false
1347 if no records were updated; exact type of success value is storage-dependent.
1348
1349 =cut
1350
1351 sub update {
1352   my ($self, $values) = @_;
1353   $self->throw_exception("Values for update must be a hash")
1354     unless ref $values eq 'HASH';
1355
1356   carp(   'WARNING! Currently $rs->update() does not generate proper SQL'
1357         . ' on joined resultsets, and may affect rows well outside of the'
1358         . ' contents of $rs. Use at your own risk' )
1359     if ( $self->{attrs}{seen_join} );
1360
1361   my $cond = $self->_cond_for_update_delete;
1362    
1363   return $self->result_source->storage->update(
1364     $self->result_source, $values, $cond
1365   );
1366 }
1367
1368 =head2 update_all
1369
1370 =over 4
1371
1372 =item Arguments: \%values
1373
1374 =item Return Value: 1
1375
1376 =back
1377
1378 Fetches all objects and updates them one at a time. Note that C<update_all>
1379 will run DBIC cascade triggers, while L</update> will not.
1380
1381 =cut
1382
1383 sub update_all {
1384   my ($self, $values) = @_;
1385   $self->throw_exception("Values for update must be a hash")
1386     unless ref $values eq 'HASH';
1387   foreach my $obj ($self->all) {
1388     $obj->set_columns($values)->update;
1389   }
1390   return 1;
1391 }
1392
1393 =head2 delete
1394
1395 =over 4
1396
1397 =item Arguments: none
1398
1399 =item Return Value: 1
1400
1401 =back
1402
1403 Deletes the contents of the resultset from its result source. Note that this
1404 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1405 to run. See also L<DBIx::Class::Row/delete>.
1406
1407 delete may not generate correct SQL for a query with joins or a resultset
1408 chained from a related resultset.  In this case it will generate a warning:-
1409
1410   WARNING! Currently $rs->delete() does not generate proper SQL on
1411   joined resultsets, and may delete rows well outside of the contents
1412   of $rs. Use at your own risk
1413
1414 In these cases you may find that delete_all is more appropriate, or you
1415 need to respecify your query in a way that can be expressed without a join.
1416
1417 =cut
1418
1419 sub delete {
1420   my ($self) = @_;
1421   $self->throw_exception("Delete should not be passed any arguments")
1422     if $_[1];
1423   carp(   'WARNING! Currently $rs->delete() does not generate proper SQL'
1424         . ' on joined resultsets, and may delete rows well outside of the'
1425         . ' contents of $rs. Use at your own risk' )
1426     if ( $self->{attrs}{seen_join} );
1427   my $cond = $self->_cond_for_update_delete;
1428
1429   $self->result_source->storage->delete($self->result_source, $cond);
1430   return 1;
1431 }
1432
1433 =head2 delete_all
1434
1435 =over 4
1436
1437 =item Arguments: none
1438
1439 =item Return Value: 1
1440
1441 =back
1442
1443 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1444 will run DBIC cascade triggers, while L</delete> will not.
1445
1446 =cut
1447
1448 sub delete_all {
1449   my ($self) = @_;
1450   $_->delete for $self->all;
1451   return 1;
1452 }
1453
1454 =head2 populate
1455
1456 =over 4
1457
1458 =item Arguments: \@data;
1459
1460 =back
1461
1462 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1463 For the arrayref of hashrefs style each hashref should be a structure suitable
1464 forsubmitting to a $resultset->create(...) method.
1465
1466 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1467 to insert the data, as this is a faster method.  
1468
1469 Otherwise, each set of data is inserted into the database using
1470 L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
1471 objects is returned.
1472
1473 Example:  Assuming an Artist Class that has many CDs Classes relating:
1474
1475   my $Artist_rs = $schema->resultset("Artist");
1476   
1477   ## Void Context Example 
1478   $Artist_rs->populate([
1479      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1480         { title => 'My First CD', year => 2006 },
1481         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1482       ],
1483      },
1484      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1485         { title => 'My parents sold me to a record company' ,year => 2005 },
1486         { title => 'Why Am I So Ugly?', year => 2006 },
1487         { title => 'I Got Surgery and am now Popular', year => 2007 }
1488       ],
1489      },
1490   ]);
1491   
1492   ## Array Context Example
1493   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1494     { name => "Artist One"},
1495     { name => "Artist Two"},
1496     { name => "Artist Three", cds=> [
1497     { title => "First CD", year => 2007},
1498     { title => "Second CD", year => 2008},
1499   ]}
1500   ]);
1501   
1502   print $ArtistOne->name; ## response is 'Artist One'
1503   print $ArtistThree->cds->count ## reponse is '2'
1504
1505 For the arrayref of arrayrefs style,  the first element should be a list of the
1506 fieldsnames to which the remaining elements are rows being inserted.  For
1507 example:
1508
1509   $Arstist_rs->populate([
1510     [qw/artistid name/],
1511     [100, 'A Formally Unknown Singer'],
1512     [101, 'A singer that jumped the shark two albums ago'],
1513     [102, 'An actually cool singer.'],
1514   ]);
1515
1516 Please note an important effect on your data when choosing between void and
1517 wantarray context. Since void context goes straight to C<insert_bulk> in 
1518 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1519 c<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to 
1520 create primary keys for you, you will find that your PKs are empty.  In this 
1521 case you will have to use the wantarray context in order to create those 
1522 values.
1523
1524 =cut
1525
1526 sub populate {
1527   my $self = shift @_;
1528   my $data = ref $_[0][0] eq 'HASH'
1529     ? $_[0] : ref $_[0][0] eq 'ARRAY' ? $self->_normalize_populate_args($_[0]) :
1530     $self->throw_exception('Populate expects an arrayref of hashes or arrayref of arrayrefs');
1531   
1532   if(defined wantarray) {
1533     my @created;
1534     foreach my $item (@$data) {
1535       push(@created, $self->create($item));
1536     }
1537     return @created;
1538   } else {
1539     my ($first, @rest) = @$data;
1540
1541     my @names = grep {!ref $first->{$_}} keys %$first;
1542     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1543     my @pks = $self->result_source->primary_columns;  
1544
1545     ## do the belongs_to relationships  
1546     foreach my $index (0..$#$data) {
1547       if( grep { !defined $data->[$index]->{$_} } @pks ) {
1548         my @ret = $self->populate($data);
1549         return;
1550       }
1551     
1552       foreach my $rel (@rels) {
1553         next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
1554         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1555         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1556         my $related = $result->result_source->resolve_condition(
1557           $result->result_source->relationship_info($reverse)->{cond},
1558           $self,        
1559           $result,        
1560         );
1561
1562         delete $data->[$index]->{$rel};
1563         $data->[$index] = {%{$data->[$index]}, %$related};
1564       
1565         push @names, keys %$related if $index == 0;
1566       }
1567     }
1568
1569     ## do bulk insert on current row
1570     my @values = map { [ @$_{@names} ] } @$data;
1571
1572     $self->result_source->storage->insert_bulk(
1573       $self->result_source, 
1574       \@names, 
1575       \@values,
1576     );
1577
1578     ## do the has_many relationships
1579     foreach my $item (@$data) {
1580
1581       foreach my $rel (@rels) {
1582         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1583
1584         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
1585      || $self->throw_exception('Cannot find the relating object.');
1586      
1587         my $child = $parent->$rel;
1588     
1589         my $related = $child->result_source->resolve_condition(
1590           $parent->result_source->relationship_info($rel)->{cond},
1591           $child,
1592           $parent,
1593         );
1594
1595         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1596         my @populate = map { {%$_, %$related} } @rows_to_add;
1597
1598         $child->populate( \@populate );
1599       }
1600     }
1601   }
1602 }
1603
1604 =head2 _normalize_populate_args ($args)
1605
1606 Private method used by L</populate> to normalize its incoming arguments.  Factored
1607 out in case you want to subclass and accept new argument structures to the
1608 L</populate> method.
1609
1610 =cut
1611
1612 sub _normalize_populate_args {
1613   my ($self, $data) = @_;
1614   my @names = @{shift(@$data)};
1615   my @results_to_create;
1616   foreach my $datum (@$data) {
1617     my %result_to_create;
1618     foreach my $index (0..$#names) {
1619       $result_to_create{$names[$index]} = $$datum[$index];
1620     }
1621     push @results_to_create, \%result_to_create;    
1622   }
1623   return \@results_to_create;
1624 }
1625
1626 =head2 pager
1627
1628 =over 4
1629
1630 =item Arguments: none
1631
1632 =item Return Value: $pager
1633
1634 =back
1635
1636 Return Value a L<Data::Page> object for the current resultset. Only makes
1637 sense for queries with a C<page> attribute.
1638
1639 =cut
1640
1641 sub pager {
1642   my ($self) = @_;
1643   my $attrs = $self->{attrs};
1644   $self->throw_exception("Can't create pager for non-paged rs")
1645     unless $self->{attrs}{page};
1646   $attrs->{rows} ||= 10;
1647   return $self->{pager} ||= Data::Page->new(
1648     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1649 }
1650
1651 =head2 page
1652
1653 =over 4
1654
1655 =item Arguments: $page_number
1656
1657 =item Return Value: $rs
1658
1659 =back
1660
1661 Returns a resultset for the $page_number page of the resultset on which page
1662 is called, where each page contains a number of rows equal to the 'rows'
1663 attribute set on the resultset (10 by default).
1664
1665 =cut
1666
1667 sub page {
1668   my ($self, $page) = @_;
1669   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1670 }
1671
1672 =head2 new_result
1673
1674 =over 4
1675
1676 =item Arguments: \%vals
1677
1678 =item Return Value: $rowobject
1679
1680 =back
1681
1682 Creates a new row object in the resultset's result class and returns
1683 it. The row is not inserted into the database at this point, call
1684 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
1685 will tell you whether the row object has been inserted or not.
1686
1687 Passes the hashref of input on to L<DBIx::Class::Row/new>.
1688
1689 =cut
1690
1691 sub new_result {
1692   my ($self, $values) = @_;
1693   $self->throw_exception( "new_result needs a hash" )
1694     unless (ref $values eq 'HASH');
1695
1696   my %new;
1697   my $alias = $self->{attrs}{alias};
1698
1699   if (
1700     defined $self->{cond}
1701     && $self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION
1702   ) {
1703     %new = %{$self->{attrs}{related_objects}};
1704   } else {
1705     $self->throw_exception(
1706       "Can't abstract implicit construct, condition not a hash"
1707     ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1708   
1709     my $collapsed_cond = (
1710       $self->{cond}
1711         ? $self->_collapse_cond($self->{cond})
1712         : {}
1713     );
1714   
1715     # precendence must be given to passed values over values inherited from
1716     # the cond, so the order here is important.
1717     my %implied =  %{$self->_remove_alias($collapsed_cond, $alias)};
1718     while( my($col,$value) = each %implied ){
1719       if(ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '='){
1720         $new{$col} = $value->{'='};
1721         next;
1722       }
1723       $new{$col} = $value if $self->_is_deterministic_value($value);
1724     }
1725   }
1726
1727   %new = (
1728     %new,
1729     %{ $self->_remove_alias($values, $alias) },
1730     -source_handle => $self->_source_handle,
1731     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1732   );
1733
1734   return $self->result_class->new(\%new);
1735 }
1736
1737 # _is_deterministic_value
1738 #
1739 # Make an effor to strip non-deterministic values from the condition, 
1740 # to make sure new_result chokes less
1741
1742 sub _is_deterministic_value {
1743   my $self = shift;
1744   my $value = shift;
1745   my $ref_type = ref $value;
1746   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
1747   return 1 if Scalar::Util::blessed($value);
1748   return 0;
1749 }
1750
1751 # _collapse_cond
1752 #
1753 # Recursively collapse the condition.
1754
1755 sub _collapse_cond {
1756   my ($self, $cond, $collapsed) = @_;
1757
1758   $collapsed ||= {};
1759
1760   if (ref $cond eq 'ARRAY') {
1761     foreach my $subcond (@$cond) {
1762       next unless ref $subcond;  # -or
1763 #      warn "ARRAY: " . Dumper $subcond;
1764       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1765     }
1766   }
1767   elsif (ref $cond eq 'HASH') {
1768     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1769       foreach my $subcond (@{$cond->{-and}}) {
1770 #        warn "HASH: " . Dumper $subcond;
1771         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1772       }
1773     }
1774     else {
1775 #      warn "LEAF: " . Dumper $cond;
1776       foreach my $col (keys %$cond) {
1777         my $value = $cond->{$col};
1778         $collapsed->{$col} = $value;
1779       }
1780     }
1781   }
1782
1783   return $collapsed;
1784 }
1785
1786 # _remove_alias
1787 #
1788 # Remove the specified alias from the specified query hash. A copy is made so
1789 # the original query is not modified.
1790
1791 sub _remove_alias {
1792   my ($self, $query, $alias) = @_;
1793
1794   my %orig = %{ $query || {} };
1795   my %unaliased;
1796
1797   foreach my $key (keys %orig) {
1798     if ($key !~ /\./) {
1799       $unaliased{$key} = $orig{$key};
1800       next;
1801     }
1802     $unaliased{$1} = $orig{$key}
1803       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1804   }
1805
1806   return \%unaliased;
1807 }
1808
1809 =head2 as_query
1810
1811 =over 4
1812
1813 =item Arguments: none
1814
1815 =item Return Value: \[ $sql, @bind ]
1816
1817 =back
1818
1819 Returns the SQL query and bind vars associated with the invocant.
1820
1821 This is generally used as the RHS for a subquery.
1822
1823 =cut
1824
1825 sub as_query { return shift->cursor->as_query(@_) }
1826
1827 =head2 find_or_new
1828
1829 =over 4
1830
1831 =item Arguments: \%vals, \%attrs?
1832
1833 =item Return Value: $rowobject
1834
1835 =back
1836
1837   my $artist = $schema->resultset('Artist')->find_or_new(
1838     { artist => 'fred' }, { key => 'artists' });
1839
1840   $cd->cd_to_producer->find_or_new({ producer => $producer },
1841                                    { key => 'primary });
1842
1843 Find an existing record from this resultset, based on its primary
1844 key, or a unique constraint. If none exists, instantiate a new result
1845 object and return it. The object will not be saved into your storage
1846 until you call L<DBIx::Class::Row/insert> on it.
1847
1848 You most likely want this method when looking for existing rows using
1849 a unique constraint that is not the primary key, or looking for
1850 related rows.
1851
1852 If you want objects to be saved immediately, use L</find_or_create> instead.
1853
1854 B<Note>: C<find_or_new> is probably not what you want when creating a
1855 new row in a table that uses primary keys supplied by the
1856 database. Passing in a primary key column with a value of I<undef>
1857 will cause L</find> to attempt to search for a row with a value of
1858 I<NULL>.
1859
1860 =cut
1861
1862 sub find_or_new {
1863   my $self     = shift;
1864   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1865   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1866   my $exists   = $self->find($hash, $attrs);
1867   return defined $exists ? $exists : $self->new_result($hash);
1868 }
1869
1870 =head2 create
1871
1872 =over 4
1873
1874 =item Arguments: \%vals
1875
1876 =item Return Value: a L<DBIx::Class::Row> $object
1877
1878 =back
1879
1880 Attempt to create a single new row or a row with multiple related rows
1881 in the table represented by the resultset (and related tables). This
1882 will not check for duplicate rows before inserting, use
1883 L</find_or_create> to do that.
1884
1885 To create one row for this resultset, pass a hashref of key/value
1886 pairs representing the columns of the table and the values you wish to
1887 store. If the appropriate relationships are set up, foreign key fields
1888 can also be passed an object representing the foreign row, and the
1889 value will be set to its primary key.
1890
1891 To create related objects, pass a hashref for the value if the related
1892 item is a foreign key relationship (L<DBIx::Class::Relationship/belongs_to>),
1893 and use the name of the relationship as the key. (NOT the name of the field,
1894 necessarily). For C<has_many> and C<has_one> relationships, pass an arrayref
1895 of hashrefs containing the data for each of the rows to create in the foreign
1896 tables, again using the relationship name as the key.
1897
1898 Instead of hashrefs of plain related data (key/value pairs), you may
1899 also pass new or inserted objects. New objects (not inserted yet, see
1900 L</new>), will be inserted into their appropriate tables.
1901
1902 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1903
1904 Example of creating a new row.
1905
1906   $person_rs->create({
1907     name=>"Some Person",
1908     email=>"somebody@someplace.com"
1909   });
1910   
1911 Example of creating a new row and also creating rows in a related C<has_many>
1912 or C<has_one> resultset.  Note Arrayref.
1913
1914   $artist_rs->create(
1915      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1916         { title => 'My First CD', year => 2006 },
1917         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1918       ],
1919      },
1920   );
1921
1922 Example of creating a new row and also creating a row in a related
1923 C<belongs_to>resultset. Note Hashref.
1924
1925   $cd_rs->create({
1926     title=>"Music for Silly Walks",
1927     year=>2000,
1928     artist => {
1929       name=>"Silly Musician",
1930     }
1931   });
1932
1933 =cut
1934
1935 sub create {
1936   my ($self, $attrs) = @_;
1937   $self->throw_exception( "create needs a hashref" )
1938     unless ref $attrs eq 'HASH';
1939   return $self->new_result($attrs)->insert;
1940 }
1941
1942 =head2 find_or_create
1943
1944 =over 4
1945
1946 =item Arguments: \%vals, \%attrs?
1947
1948 =item Return Value: $rowobject
1949
1950 =back
1951
1952   $cd->cd_to_producer->find_or_create({ producer => $producer },
1953                                       { key => 'primary });
1954
1955 Tries to find a record based on its primary key or unique constraints; if none
1956 is found, creates one and returns that instead.
1957
1958   my $cd = $schema->resultset('CD')->find_or_create({
1959     cdid   => 5,
1960     artist => 'Massive Attack',
1961     title  => 'Mezzanine',
1962     year   => 2005,
1963   });
1964
1965 Also takes an optional C<key> attribute, to search by a specific key or unique
1966 constraint. For example:
1967
1968   my $cd = $schema->resultset('CD')->find_or_create(
1969     {
1970       artist => 'Massive Attack',
1971       title  => 'Mezzanine',
1972     },
1973     { key => 'cd_artist_title' }
1974   );
1975
1976 B<Note>: Because find_or_create() reads from the database and then
1977 possibly inserts based on the result, this method is subject to a race
1978 condition. Another process could create a record in the table after
1979 the find has completed and before the create has started. To avoid
1980 this problem, use find_or_create() inside a transaction.
1981
1982 B<Note>: C<find_or_create> is probably not what you want when creating
1983 a new row in a table that uses primary keys supplied by the
1984 database. Passing in a primary key column with a value of I<undef>
1985 will cause L</find> to attempt to search for a row with a value of
1986 I<NULL>.
1987
1988 See also L</find> and L</update_or_create>. For information on how to declare
1989 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1990
1991 =cut
1992
1993 sub find_or_create {
1994   my $self     = shift;
1995   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1996   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1997   my $exists   = $self->find($hash, $attrs);
1998   return defined $exists ? $exists : $self->create($hash);
1999 }
2000
2001 =head2 update_or_create
2002
2003 =over 4
2004
2005 =item Arguments: \%col_values, { key => $unique_constraint }?
2006
2007 =item Return Value: $rowobject
2008
2009 =back
2010
2011   $resultset->update_or_create({ col => $val, ... });
2012
2013 First, searches for an existing row matching one of the unique constraints
2014 (including the primary key) on the source of this resultset. If a row is
2015 found, updates it with the other given column values. Otherwise, creates a new
2016 row.
2017
2018 Takes an optional C<key> attribute to search on a specific unique constraint.
2019 For example:
2020
2021   # In your application
2022   my $cd = $schema->resultset('CD')->update_or_create(
2023     {
2024       artist => 'Massive Attack',
2025       title  => 'Mezzanine',
2026       year   => 1998,
2027     },
2028     { key => 'cd_artist_title' }
2029   );
2030
2031   $cd->cd_to_producer->update_or_create({ 
2032     producer => $producer, 
2033     name => 'harry',
2034   }, { 
2035     key => 'primary,
2036   });
2037
2038
2039 If no C<key> is specified, it searches on all unique constraints defined on the
2040 source, including the primary key.
2041
2042 If the C<key> is specified as C<primary>, it searches only on the primary key.
2043
2044 See also L</find> and L</find_or_create>. For information on how to declare
2045 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2046
2047 B<Note>: C<update_or_create> is probably not what you want when
2048 looking for a row in a table that uses primary keys supplied by the
2049 database, unless you actually have a key value. Passing in a primary
2050 key column with a value of I<undef> will cause L</find> to attempt to
2051 search for a row with a value of I<NULL>.
2052
2053 =cut
2054
2055 sub update_or_create {
2056   my $self = shift;
2057   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2058   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2059
2060   my $row = $self->find($cond, $attrs);
2061   if (defined $row) {
2062     $row->update($cond);
2063     return $row;
2064   }
2065
2066   return $self->create($cond);
2067 }
2068
2069 =head2 get_cache
2070
2071 =over 4
2072
2073 =item Arguments: none
2074
2075 =item Return Value: \@cache_objects?
2076
2077 =back
2078
2079 Gets the contents of the cache for the resultset, if the cache is set.
2080
2081 The cache is populated either by using the L</prefetch> attribute to
2082 L</search> or by calling L</set_cache>.
2083
2084 =cut
2085
2086 sub get_cache {
2087   shift->{all_cache};
2088 }
2089
2090 =head2 set_cache
2091
2092 =over 4
2093
2094 =item Arguments: \@cache_objects
2095
2096 =item Return Value: \@cache_objects
2097
2098 =back
2099
2100 Sets the contents of the cache for the resultset. Expects an arrayref
2101 of objects of the same class as those produced by the resultset. Note that
2102 if the cache is set the resultset will return the cached objects rather
2103 than re-querying the database even if the cache attr is not set.
2104
2105 The contents of the cache can also be populated by using the
2106 L</prefetch> attribute to L</search>.
2107
2108 =cut
2109
2110 sub set_cache {
2111   my ( $self, $data ) = @_;
2112   $self->throw_exception("set_cache requires an arrayref")
2113       if defined($data) && (ref $data ne 'ARRAY');
2114   $self->{all_cache} = $data;
2115 }
2116
2117 =head2 clear_cache
2118
2119 =over 4
2120
2121 =item Arguments: none
2122
2123 =item Return Value: []
2124
2125 =back
2126
2127 Clears the cache for the resultset.
2128
2129 =cut
2130
2131 sub clear_cache {
2132   shift->set_cache(undef);
2133 }
2134
2135 =head2 related_resultset
2136
2137 =over 4
2138
2139 =item Arguments: $relationship_name
2140
2141 =item Return Value: $resultset
2142
2143 =back
2144
2145 Returns a related resultset for the supplied relationship name.
2146
2147   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2148
2149 =cut
2150
2151 sub related_resultset {
2152   my ($self, $rel) = @_;
2153
2154   $self->{related_resultsets} ||= {};
2155   return $self->{related_resultsets}{$rel} ||= do {
2156     my $rel_obj = $self->result_source->relationship_info($rel);
2157
2158     $self->throw_exception(
2159       "search_related: result source '" . $self->result_source->source_name .
2160         "' has no such relationship $rel")
2161       unless $rel_obj;
2162     
2163     my ($from,$seen) = $self->_resolve_from($rel);
2164
2165     my $join_count = $seen->{$rel};
2166     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
2167
2168     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2169     my %attrs = %{$self->{attrs}||{}};
2170     delete @attrs{qw(result_class alias)};
2171
2172     my $new_cache;
2173
2174     if (my $cache = $self->get_cache) {
2175       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2176         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2177                         @$cache ];
2178       }
2179     }
2180
2181     my $rel_source = $self->result_source->related_source($rel);
2182
2183     my $new = do {
2184
2185       # The reason we do this now instead of passing the alias to the
2186       # search_rs below is that if you wrap/overload resultset on the
2187       # source you need to know what alias it's -going- to have for things
2188       # to work sanely (e.g. RestrictWithObject wants to be able to add
2189       # extra query restrictions, and these may need to be $alias.)
2190
2191       my $attrs = $rel_source->resultset_attributes;
2192       local $attrs->{alias} = $alias;
2193
2194       $rel_source->resultset
2195                  ->search_rs(
2196                      undef, {
2197                        %attrs,
2198                        join => undef,
2199                        prefetch => undef,
2200                        select => undef,
2201                        as => undef,
2202                        where => $self->{cond},
2203                        seen_join => $seen,
2204                        from => $from,
2205                    });
2206     };
2207     $new->set_cache($new_cache) if $new_cache;
2208     $new;
2209   };
2210 }
2211
2212 =head2 current_source_alias
2213
2214 =over 4
2215
2216 =item Arguments: none
2217
2218 =item Return Value: $source_alias
2219
2220 =back
2221
2222 Returns the current table alias for the result source this resultset is built
2223 on, that will be used in the SQL query. Usually it is C<me>.
2224
2225 Currently the source alias that refers to the result set returned by a
2226 L</search>/L</find> family method depends on how you got to the resultset: it's
2227 C<me> by default, but eg. L</search_related> aliases it to the related result
2228 source name (and keeps C<me> referring to the original result set). The long
2229 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2230 (and make this method unnecessary).
2231
2232 Thus it's currently necessary to use this method in predefined queries (see
2233 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2234 source alias of the current result set:
2235
2236   # in a result set class
2237   sub modified_by {
2238     my ($self, $user) = @_;
2239
2240     my $me = $self->current_source_alias;
2241
2242     return $self->search(
2243       "$me.modified" => $user->id,
2244     );
2245   }
2246
2247 =cut
2248
2249 sub current_source_alias {
2250   my ($self) = @_;
2251
2252   return ($self->{attrs} || {})->{alias} || 'me';
2253 }
2254
2255 sub _resolve_from {
2256   my ($self, $extra_join) = @_;
2257   my $source = $self->result_source;
2258   my $attrs = $self->{attrs};
2259   
2260   my $from = $attrs->{from}
2261     || [ { $attrs->{alias} => $source->from } ];
2262     
2263   my $seen = { %{$attrs->{seen_join}||{}} };
2264
2265   my $join = ($attrs->{join}
2266                ? [ $attrs->{join}, $extra_join ]
2267                : $extra_join);
2268
2269   # we need to take the prefetch the attrs into account before we 
2270   # ->resolve_join as otherwise they get lost - captainL
2271   my $merged = $self->_merge_attr( $join, $attrs->{prefetch} );
2272
2273   $from = [
2274     @$from,
2275     ($join ? $source->resolve_join($merged, $attrs->{alias}, $seen) : ()),
2276   ];
2277
2278   return ($from,$seen);
2279 }
2280
2281 sub _resolved_attrs {
2282   my $self = shift;
2283   return $self->{_attrs} if $self->{_attrs};
2284
2285   my $attrs = { %{$self->{attrs}||{}} };
2286   my $source = $self->result_source;
2287   my $alias = $attrs->{alias};
2288
2289   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2290   if ($attrs->{columns}) {
2291     delete $attrs->{as};
2292   } elsif (!$attrs->{select}) {
2293     $attrs->{columns} = [ $source->columns ];
2294   }
2295  
2296   $attrs->{select} = 
2297     ($attrs->{select}
2298       ? (ref $attrs->{select} eq 'ARRAY'
2299           ? [ @{$attrs->{select}} ]
2300           : [ $attrs->{select} ])
2301       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
2302     );
2303   $attrs->{as} =
2304     ($attrs->{as}
2305       ? (ref $attrs->{as} eq 'ARRAY'
2306           ? [ @{$attrs->{as}} ]
2307           : [ $attrs->{as} ])
2308       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
2309     );
2310   
2311   my $adds;
2312   if ($adds = delete $attrs->{include_columns}) {
2313     $adds = [$adds] unless ref $adds eq 'ARRAY';
2314     push(@{$attrs->{select}}, @$adds);
2315     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
2316   }
2317   if ($adds = delete $attrs->{'+select'}) {
2318     $adds = [$adds] unless ref $adds eq 'ARRAY';
2319     push(@{$attrs->{select}},
2320            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
2321   }
2322   if (my $adds = delete $attrs->{'+as'}) {
2323     $adds = [$adds] unless ref $adds eq 'ARRAY';
2324     push(@{$attrs->{as}}, @$adds);
2325   }
2326
2327   $attrs->{from} ||= [ { $self->{attrs}{alias} => $source->from } ];
2328
2329   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
2330     my $join = delete $attrs->{join} || {};
2331
2332     if (defined $attrs->{prefetch}) {
2333       $join = $self->_merge_attr(
2334         $join, $attrs->{prefetch}
2335       );
2336       
2337     }
2338
2339     $attrs->{from} =   # have to copy here to avoid corrupting the original
2340       [
2341         @{$attrs->{from}}, 
2342         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
2343       ];
2344
2345   }
2346
2347   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
2348   if ($attrs->{order_by}) {
2349     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
2350                            ? [ @{$attrs->{order_by}} ]
2351                            : [ $attrs->{order_by} ]);
2352   } else {
2353     $attrs->{order_by} = [];    
2354   }
2355
2356   my $collapse = $attrs->{collapse} || {};
2357   if (my $prefetch = delete $attrs->{prefetch}) {
2358     $prefetch = $self->_merge_attr({}, $prefetch);
2359     my @pre_order;
2360     my $seen = { %{ $attrs->{seen_join} || {} } };
2361     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
2362       # bring joins back to level of current class
2363       my @prefetch = $source->resolve_prefetch(
2364         $p, $alias, $seen, \@pre_order, $collapse
2365       );
2366       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
2367       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
2368     }
2369     push(@{$attrs->{order_by}}, @pre_order);
2370   }
2371   $attrs->{collapse} = $collapse;
2372
2373   if ($attrs->{page}) {
2374     $attrs->{offset} ||= 0;
2375     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
2376   }
2377
2378   return $self->{_attrs} = $attrs;
2379 }
2380
2381 sub _rollout_attr {
2382   my ($self, $attr) = @_;
2383   
2384   if (ref $attr eq 'HASH') {
2385     return $self->_rollout_hash($attr);
2386   } elsif (ref $attr eq 'ARRAY') {
2387     return $self->_rollout_array($attr);
2388   } else {
2389     return [$attr];
2390   }
2391 }
2392
2393 sub _rollout_array {
2394   my ($self, $attr) = @_;
2395
2396   my @rolled_array;
2397   foreach my $element (@{$attr}) {
2398     if (ref $element eq 'HASH') {
2399       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
2400     } elsif (ref $element eq 'ARRAY') {
2401       #  XXX - should probably recurse here
2402       push( @rolled_array, @{$self->_rollout_array($element)} );
2403     } else {
2404       push( @rolled_array, $element );
2405     }
2406   }
2407   return \@rolled_array;
2408 }
2409
2410 sub _rollout_hash {
2411   my ($self, $attr) = @_;
2412
2413   my @rolled_array;
2414   foreach my $key (keys %{$attr}) {
2415     push( @rolled_array, { $key => $attr->{$key} } );
2416   }
2417   return \@rolled_array;
2418 }
2419
2420 sub _calculate_score {
2421   my ($self, $a, $b) = @_;
2422
2423   if (ref $b eq 'HASH') {
2424     my ($b_key) = keys %{$b};
2425     if (ref $a eq 'HASH') {
2426       my ($a_key) = keys %{$a};
2427       if ($a_key eq $b_key) {
2428         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
2429       } else {
2430         return 0;
2431       }
2432     } else {
2433       return ($a eq $b_key) ? 1 : 0;
2434     }       
2435   } else {
2436     if (ref $a eq 'HASH') {
2437       my ($a_key) = keys %{$a};
2438       return ($b eq $a_key) ? 1 : 0;
2439     } else {
2440       return ($b eq $a) ? 1 : 0;
2441     }
2442   }
2443 }
2444
2445 sub _merge_attr {
2446   my ($self, $orig, $import) = @_;
2447
2448   return $import unless defined($orig);
2449   return $orig unless defined($import);
2450   
2451   $orig = $self->_rollout_attr($orig);
2452   $import = $self->_rollout_attr($import);
2453
2454   my $seen_keys;
2455   foreach my $import_element ( @{$import} ) {
2456     # find best candidate from $orig to merge $b_element into
2457     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
2458     foreach my $orig_element ( @{$orig} ) {
2459       my $score = $self->_calculate_score( $orig_element, $import_element );
2460       if ($score > $best_candidate->{score}) {
2461         $best_candidate->{position} = $position;
2462         $best_candidate->{score} = $score;
2463       }
2464       $position++;
2465     }
2466     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
2467
2468     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
2469       push( @{$orig}, $import_element );
2470     } else {
2471       my $orig_best = $orig->[$best_candidate->{position}];
2472       # merge orig_best and b_element together and replace original with merged
2473       if (ref $orig_best ne 'HASH') {
2474         $orig->[$best_candidate->{position}] = $import_element;
2475       } elsif (ref $import_element eq 'HASH') {
2476         my ($key) = keys %{$orig_best};
2477         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
2478       }
2479     }
2480     $seen_keys->{$import_key} = 1; # don't merge the same key twice
2481   }
2482
2483   return $orig;
2484 }
2485
2486 sub result_source {
2487     my $self = shift;
2488
2489     if (@_) {
2490         $self->_source_handle($_[0]->handle);
2491     } else {
2492         $self->_source_handle->resolve;
2493     }
2494 }
2495
2496 =head2 throw_exception
2497
2498 See L<DBIx::Class::Schema/throw_exception> for details.
2499
2500 =cut
2501
2502 sub throw_exception {
2503   my $self=shift;
2504   if (ref $self && $self->_source_handle->schema) {
2505     $self->_source_handle->schema->throw_exception(@_)
2506   } else {
2507     croak(@_);
2508   }
2509
2510 }
2511
2512 # XXX: FIXME: Attributes docs need clearing up
2513
2514 =head1 ATTRIBUTES
2515
2516 Attributes are used to refine a ResultSet in various ways when
2517 searching for data. They can be passed to any method which takes an
2518 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
2519 L</count>.
2520
2521 These are in no particular order:
2522
2523 =head2 order_by
2524
2525 =over 4
2526
2527 =item Value: ($order_by | \@order_by)
2528
2529 =back
2530
2531 Which column(s) to order the results by. This is currently passed
2532 through directly to SQL, so you can give e.g. C<year DESC> for a
2533 descending order on the column `year'.
2534
2535 Please note that if you have C<quote_char> enabled (see
2536 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
2537 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
2538 so you will need to manually quote things as appropriate.)
2539
2540 If your L<SQL::Abstract> version supports it (>=1.50), you can also use
2541 C<{-desc => 'year'}>, which takes care of the quoting for you. This is the
2542 recommended syntax.
2543
2544 =head2 columns
2545
2546 =over 4
2547
2548 =item Value: \@columns
2549
2550 =back
2551
2552 Shortcut to request a particular set of columns to be retrieved.  Adds
2553 C<me.> onto the start of any column without a C<.> in it and sets C<select>
2554 from that, then auto-populates C<as> from C<select> as normal. (You may also
2555 use the C<cols> attribute, as in earlier versions of DBIC.)
2556
2557 =head2 include_columns
2558
2559 =over 4
2560
2561 =item Value: \@columns
2562
2563 =back
2564
2565 Shortcut to include additional columns in the returned results - for example
2566
2567   $schema->resultset('CD')->search(undef, {
2568     include_columns => ['artist.name'],
2569     join => ['artist']
2570   });
2571
2572 would return all CDs and include a 'name' column to the information
2573 passed to object inflation. Note that the 'artist' is the name of the
2574 column (or relationship) accessor, and 'name' is the name of the column
2575 accessor in the related table.
2576
2577 =head2 select
2578
2579 =over 4
2580
2581 =item Value: \@select_columns
2582
2583 =back
2584
2585 Indicates which columns should be selected from the storage. You can use
2586 column names, or in the case of RDBMS back ends, function or stored procedure
2587 names:
2588
2589   $rs = $schema->resultset('Employee')->search(undef, {
2590     select => [
2591       'name',
2592       { count => 'employeeid' },
2593       { sum => 'salary' }
2594     ]
2595   });
2596
2597 When you use function/stored procedure names and do not supply an C<as>
2598 attribute, the column names returned are storage-dependent. E.g. MySQL would
2599 return a column named C<count(employeeid)> in the above example.
2600
2601 =head2 +select
2602
2603 =over 4
2604
2605 Indicates additional columns to be selected from storage.  Works the same as
2606 L</select> but adds columns to the selection.
2607
2608 =back
2609
2610 =head2 +as
2611
2612 =over 4
2613
2614 Indicates additional column names for those added via L</+select>. See L</as>.
2615
2616 =back
2617
2618 =head2 as
2619
2620 =over 4
2621
2622 =item Value: \@inflation_names
2623
2624 =back
2625
2626 Indicates column names for object inflation. That is, C<as>
2627 indicates the name that the column can be accessed as via the
2628 C<get_column> method (or via the object accessor, B<if one already
2629 exists>).  It has nothing to do with the SQL code C<SELECT foo AS bar>.
2630
2631 The C<as> attribute is used in conjunction with C<select>,
2632 usually when C<select> contains one or more function or stored
2633 procedure names:
2634
2635   $rs = $schema->resultset('Employee')->search(undef, {
2636     select => [
2637       'name',
2638       { count => 'employeeid' }
2639     ],
2640     as => ['name', 'employee_count'],
2641   });
2642
2643   my $employee = $rs->first(); # get the first Employee
2644
2645 If the object against which the search is performed already has an accessor
2646 matching a column name specified in C<as>, the value can be retrieved using
2647 the accessor as normal:
2648
2649   my $name = $employee->name();
2650
2651 If on the other hand an accessor does not exist in the object, you need to
2652 use C<get_column> instead:
2653
2654   my $employee_count = $employee->get_column('employee_count');
2655
2656 You can create your own accessors if required - see
2657 L<DBIx::Class::Manual::Cookbook> for details.
2658
2659 Please note: This will NOT insert an C<AS employee_count> into the SQL
2660 statement produced, it is used for internal access only. Thus
2661 attempting to use the accessor in an C<order_by> clause or similar
2662 will fail miserably.
2663
2664 To get around this limitation, you can supply literal SQL to your
2665 C<select> attibute that contains the C<AS alias> text, eg:
2666
2667   select => [\'myfield AS alias']
2668
2669 =head2 join
2670
2671 =over 4
2672
2673 =item Value: ($rel_name | \@rel_names | \%rel_names)
2674
2675 =back
2676
2677 Contains a list of relationships that should be joined for this query.  For
2678 example:
2679
2680   # Get CDs by Nine Inch Nails
2681   my $rs = $schema->resultset('CD')->search(
2682     { 'artist.name' => 'Nine Inch Nails' },
2683     { join => 'artist' }
2684   );
2685
2686 Can also contain a hash reference to refer to the other relation's relations.
2687 For example:
2688
2689   package MyApp::Schema::Track;
2690   use base qw/DBIx::Class/;
2691   __PACKAGE__->table('track');
2692   __PACKAGE__->add_columns(qw/trackid cd position title/);
2693   __PACKAGE__->set_primary_key('trackid');
2694   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
2695   1;
2696
2697   # In your application
2698   my $rs = $schema->resultset('Artist')->search(
2699     { 'track.title' => 'Teardrop' },
2700     {
2701       join     => { cd => 'track' },
2702       order_by => 'artist.name',
2703     }
2704   );
2705
2706 You need to use the relationship (not the table) name in  conditions, 
2707 because they are aliased as such. The current table is aliased as "me", so 
2708 you need to use me.column_name in order to avoid ambiguity. For example:
2709
2710   # Get CDs from 1984 with a 'Foo' track 
2711   my $rs = $schema->resultset('CD')->search(
2712     { 
2713       'me.year' => 1984,
2714       'tracks.name' => 'Foo'
2715     },
2716     { join => 'tracks' }
2717   );
2718   
2719 If the same join is supplied twice, it will be aliased to <rel>_2 (and
2720 similarly for a third time). For e.g.
2721
2722   my $rs = $schema->resultset('Artist')->search({
2723     'cds.title'   => 'Down to Earth',
2724     'cds_2.title' => 'Popular',
2725   }, {
2726     join => [ qw/cds cds/ ],
2727   });
2728
2729 will return a set of all artists that have both a cd with title 'Down
2730 to Earth' and a cd with title 'Popular'.
2731
2732 If you want to fetch related objects from other tables as well, see C<prefetch>
2733 below.
2734
2735 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
2736
2737 =head2 prefetch
2738
2739 =over 4
2740
2741 =item Value: ($rel_name | \@rel_names | \%rel_names)
2742
2743 =back
2744
2745 Contains one or more relationships that should be fetched along with
2746 the main query (when they are accessed afterwards the data will
2747 already be available, without extra queries to the database).  This is
2748 useful for when you know you will need the related objects, because it
2749 saves at least one query:
2750
2751   my $rs = $schema->resultset('Tag')->search(
2752     undef,
2753     {
2754       prefetch => {
2755         cd => 'artist'
2756       }
2757     }
2758   );
2759
2760 The initial search results in SQL like the following:
2761
2762   SELECT tag.*, cd.*, artist.* FROM tag
2763   JOIN cd ON tag.cd = cd.cdid
2764   JOIN artist ON cd.artist = artist.artistid
2765
2766 L<DBIx::Class> has no need to go back to the database when we access the
2767 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2768 case.
2769
2770 Simple prefetches will be joined automatically, so there is no need
2771 for a C<join> attribute in the above search. 
2772
2773 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2774 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2775 with an accessor type of 'single' or 'filter'). A more complex example that
2776 prefetches an artists cds, the tracks on those cds, and the tags associted 
2777 with that artist is given below (assuming many-to-many from artists to tags):
2778
2779  my $rs = $schema->resultset('Artist')->search(
2780    undef,
2781    {
2782      prefetch => [
2783        { cds => 'tracks' },
2784        { artist_tags => 'tags' }
2785      ]
2786    }
2787  );
2788  
2789
2790 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
2791 attributes will be ignored.
2792
2793 =head2 page
2794
2795 =over 4
2796
2797 =item Value: $page
2798
2799 =back
2800
2801 Makes the resultset paged and specifies the page to retrieve. Effectively
2802 identical to creating a non-pages resultset and then calling ->page($page)
2803 on it.
2804
2805 If L<rows> attribute is not specified it defualts to 10 rows per page.
2806
2807 =head2 rows
2808
2809 =over 4
2810
2811 =item Value: $rows
2812
2813 =back
2814
2815 Specifes the maximum number of rows for direct retrieval or the number of
2816 rows per page if the page attribute or method is used.
2817
2818 =head2 offset
2819
2820 =over 4
2821
2822 =item Value: $offset
2823
2824 =back
2825
2826 Specifies the (zero-based) row number for the  first row to be returned, or the
2827 of the first row of the first page if paging is used.
2828
2829 =head2 group_by
2830
2831 =over 4
2832
2833 =item Value: \@columns
2834
2835 =back
2836
2837 A arrayref of columns to group by. Can include columns of joined tables.
2838
2839   group_by => [qw/ column1 column2 ... /]
2840
2841 =head2 having
2842
2843 =over 4
2844
2845 =item Value: $condition
2846
2847 =back
2848
2849 HAVING is a select statement attribute that is applied between GROUP BY and
2850 ORDER BY. It is applied to the after the grouping calculations have been
2851 done.
2852
2853   having => { 'count(employee)' => { '>=', 100 } }
2854
2855 =head2 distinct
2856
2857 =over 4
2858
2859 =item Value: (0 | 1)
2860
2861 =back
2862
2863 Set to 1 to group by all columns.
2864
2865 =head2 where
2866
2867 =over 4
2868
2869 Adds to the WHERE clause.
2870
2871   # only return rows WHERE deleted IS NULL for all searches
2872   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2873
2874 Can be overridden by passing C<{ where => undef }> as an attribute
2875 to a resulset.
2876
2877 =back
2878
2879 =head2 cache
2880
2881 Set to 1 to cache search results. This prevents extra SQL queries if you
2882 revisit rows in your ResultSet:
2883
2884   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2885
2886   while( my $artist = $resultset->next ) {
2887     ... do stuff ...
2888   }
2889
2890   $rs->first; # without cache, this would issue a query
2891
2892 By default, searches are not cached.
2893
2894 For more examples of using these attributes, see
2895 L<DBIx::Class::Manual::Cookbook>.
2896
2897 =head2 from
2898
2899 =over 4
2900
2901 =item Value: \@from_clause
2902
2903 =back
2904
2905 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2906 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2907 clauses.
2908
2909 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2910
2911 C<join> will usually do what you need and it is strongly recommended that you
2912 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2913 And we really do mean "cannot", not just tried and failed. Attempting to use
2914 this because you're having problems with C<join> is like trying to use x86
2915 ASM because you've got a syntax error in your C. Trust us on this.
2916
2917 Now, if you're still really, really sure you need to use this (and if you're
2918 not 100% sure, ask the mailing list first), here's an explanation of how this
2919 works.
2920
2921 The syntax is as follows -
2922
2923   [
2924     { <alias1> => <table1> },
2925     [
2926       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2927       [], # nested JOIN (optional)
2928       { <table1.column1> => <table2.column2>, ... (more conditions) },
2929     ],
2930     # More of the above [ ] may follow for additional joins
2931   ]
2932
2933   <table1> <alias1>
2934   JOIN
2935     <table2> <alias2>
2936     [JOIN ...]
2937   ON <table1.column1> = <table2.column2>
2938   <more joins may follow>
2939
2940 An easy way to follow the examples below is to remember the following:
2941
2942     Anything inside "[]" is a JOIN
2943     Anything inside "{}" is a condition for the enclosing JOIN
2944
2945 The following examples utilize a "person" table in a family tree application.
2946 In order to express parent->child relationships, this table is self-joined:
2947
2948     # Person->belongs_to('father' => 'Person');
2949     # Person->belongs_to('mother' => 'Person');
2950
2951 C<from> can be used to nest joins. Here we return all children with a father,
2952 then search against all mothers of those children:
2953
2954   $rs = $schema->resultset('Person')->search(
2955       undef,
2956       {
2957           alias => 'mother', # alias columns in accordance with "from"
2958           from => [
2959               { mother => 'person' },
2960               [
2961                   [
2962                       { child => 'person' },
2963                       [
2964                           { father => 'person' },
2965                           { 'father.person_id' => 'child.father_id' }
2966                       ]
2967                   ],
2968                   { 'mother.person_id' => 'child.mother_id' }
2969               ],
2970           ]
2971       },
2972   );
2973
2974   # Equivalent SQL:
2975   # SELECT mother.* FROM person mother
2976   # JOIN (
2977   #   person child
2978   #   JOIN person father
2979   #   ON ( father.person_id = child.father_id )
2980   # )
2981   # ON ( mother.person_id = child.mother_id )
2982
2983 The type of any join can be controlled manually. To search against only people
2984 with a father in the person table, we could explicitly use C<INNER JOIN>:
2985
2986     $rs = $schema->resultset('Person')->search(
2987         undef,
2988         {
2989             alias => 'child', # alias columns in accordance with "from"
2990             from => [
2991                 { child => 'person' },
2992                 [
2993                     { father => 'person', -join_type => 'inner' },
2994                     { 'father.id' => 'child.father_id' }
2995                 ],
2996             ]
2997         },
2998     );
2999
3000     # Equivalent SQL:
3001     # SELECT child.* FROM person child
3002     # INNER JOIN person father ON child.father_id = father.id
3003
3004 If you need to express really complex joins or you need a subselect, you
3005 can supply literal SQL to C<from> via a scalar reference. In this case
3006 the contents of the scalar will replace the table name asscoiated with the
3007 resultsource.
3008
3009 WARNING: This technique might very well not work as expected on chained
3010 searches - you have been warned.
3011
3012     # Assuming the Event resultsource is defined as:
3013
3014         MySchema::Event->add_columns (
3015             sequence => {
3016                 data_type => 'INT',
3017                 is_auto_increment => 1,
3018             },
3019             location => {
3020                 data_type => 'INT',
3021             },
3022             type => {
3023                 data_type => 'INT',
3024             },
3025         );
3026         MySchema::Event->set_primary_key ('sequence');
3027
3028     # This will get back the latest event for every location. The column
3029     # selector is still provided by DBIC, all we do is add a JOIN/WHERE
3030     # combo to limit the resultset
3031
3032     $rs = $schema->resultset('Event');
3033     $table = $rs->result_source->name;
3034     $latest = $rs->search (
3035         undef,
3036         { from => \ " 
3037             (SELECT e1.* FROM $table e1 
3038                 JOIN $table e2 
3039                     ON e1.location = e2.location 
3040                     AND e1.sequence < e2.sequence 
3041                 WHERE e2.sequence is NULL 
3042             ) me",
3043         },
3044     );
3045
3046     # Equivalent SQL (with the DBIC chunks added):
3047
3048     SELECT me.sequence, me.location, me.type FROM
3049        (SELECT e1.* FROM events e1
3050            JOIN events e2
3051                ON e1.location = e2.location
3052                AND e1.sequence < e2.sequence
3053            WHERE e2.sequence is NULL
3054        ) me;
3055
3056 =head2 for
3057
3058 =over 4
3059
3060 =item Value: ( 'update' | 'shared' )
3061
3062 =back
3063
3064 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
3065 ... FOR SHARED.
3066
3067 =cut
3068
3069 1;