Merge 'trunk' into 'prefetch'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => "count",
7         'bool'   => "_bool",
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use List::Util ();
15 use Scalar::Util ();
16 use base qw/DBIx::Class/;
17
18 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
23
24 =head1 SYNOPSIS
25
26   my $users_rs   = $schema->resultset('User');
27   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
28   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
29
30 =head1 DESCRIPTION
31
32 A ResultSet is an object which stores a set of conditions representing
33 a query. It is the backbone of DBIx::Class (i.e. the really
34 important/useful bit).
35
36 No SQL is executed on the database when a ResultSet is created, it
37 just stores all the conditions needed to create the query.
38
39 A basic ResultSet representing the data of an entire table is returned
40 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
41 L<Source|DBIx::Class::Manual::Glossary/Source> name.
42
43   my $users_rs = $schema->resultset('User');
44
45 A new ResultSet is returned from calling L</search> on an existing
46 ResultSet. The new one will contain all the conditions of the
47 original, plus any new conditions added in the C<search> call.
48
49 A ResultSet is also an iterator. L</next> is used to return all the
50 L<DBIx::Class::Row>s the ResultSet represents.
51
52 The query that the ResultSet represents is B<only> executed against
53 the database when these methods are called:
54
55 =over
56
57 =item L</find>
58
59 =item L</next>
60
61 =item L</all>
62
63 =item L</count>
64
65 =item L</single>
66
67 =item L</first>
68
69 =back
70
71 =head1 EXAMPLES 
72
73 =head2 Chaining resultsets
74
75 Let's say you've got a query that needs to be run to return some data
76 to the user. But, you have an authorization system in place that
77 prevents certain users from seeing certain information. So, you want
78 to construct the basic query in one method, but add constraints to it in
79 another.
80
81   sub get_data {
82     my $self = shift;
83     my $request = $self->get_request; # Get a request object somehow.
84     my $schema = $self->get_schema;   # Get the DBIC schema object somehow.
85
86     my $cd_rs = $schema->resultset('CD')->search({
87       title => $request->param('title'),
88       year => $request->param('year'),
89     });
90
91     $self->apply_security_policy( $cd_rs );
92
93     return $cd_rs->all();
94   }
95
96   sub apply_security_policy {
97     my $self = shift;
98     my ($rs) = @_;
99
100     return $rs->search({
101       subversive => 0,
102     });
103   }
104
105 =head2 Multiple queries
106
107 Since a resultset just defines a query, you can do all sorts of
108 things with it with the same object.
109
110   # Don't hit the DB yet.
111   my $cd_rs = $schema->resultset('CD')->search({
112     title => 'something',
113     year => 2009,
114   });
115
116   # Each of these hits the DB individually.
117   my $count = $cd_rs->count;
118   my $most_recent = $cd_rs->get_column('date_released')->max();
119   my @records = $cd_rs->all;
120
121 And it's not just limited to SELECT statements.
122
123   $cd_rs->delete();
124
125 This is even cooler:
126
127   $cd_rs->create({ artist => 'Fred' });
128
129 Which is the same as:
130
131   $schema->resultset('CD')->create({
132     title => 'something',
133     year => 2009,
134     artist => 'Fred'
135   });
136
137 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
138
139 =head1 OVERLOADING
140
141 If a resultset is used in a numeric context it returns the L</count>.
142 However, if it is used in a booleand context it is always true.  So if
143 you want to check if a resultset has any results use C<if $rs != 0>.
144 C<if $rs> will always be true.
145
146 =head1 METHODS
147
148 =head2 new
149
150 =over 4
151
152 =item Arguments: $source, \%$attrs
153
154 =item Return Value: $rs
155
156 =back
157
158 The resultset constructor. Takes a source object (usually a
159 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
160 L</ATTRIBUTES> below).  Does not perform any queries -- these are
161 executed as needed by the other methods.
162
163 Generally you won't need to construct a resultset manually.  You'll
164 automatically get one from e.g. a L</search> called in scalar context:
165
166   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
167
168 IMPORTANT: If called on an object, proxies to new_result instead so
169
170   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
171
172 will return a CD object, not a ResultSet.
173
174 =cut
175
176 sub new {
177   my $class = shift;
178   return $class->new_result(@_) if ref $class;
179
180   my ($source, $attrs) = @_;
181   $source = $source->handle 
182     unless $source->isa('DBIx::Class::ResultSourceHandle');
183   $attrs = { %{$attrs||{}} };
184
185   if ($attrs->{page}) {
186     $attrs->{rows} ||= 10;
187   }
188
189   $attrs->{alias} ||= 'me';
190
191   # Creation of {} and bless separated to mitigate RH perl bug
192   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
193   my $self = {
194     _source_handle => $source,
195     cond => $attrs->{where},
196     count => undef,
197     pager => undef,
198     attrs => $attrs
199   };
200
201   bless $self, $class;
202
203   $self->result_class(
204     $attrs->{result_class} || $source->resolve->result_class
205   );
206
207   return $self;
208 }
209
210 =head2 search
211
212 =over 4
213
214 =item Arguments: $cond, \%attrs?
215
216 =item Return Value: $resultset (scalar context), @row_objs (list context)
217
218 =back
219
220   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
221   my $new_rs = $cd_rs->search({ year => 2005 });
222
223   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
224                  # year = 2005 OR year = 2004
225
226 If you need to pass in additional attributes but no additional condition,
227 call it as C<search(undef, \%attrs)>.
228
229   # "SELECT name, artistid FROM $artist_table"
230   my @all_artists = $schema->resultset('Artist')->search(undef, {
231     columns => [qw/name artistid/],
232   });
233
234 For a list of attributes that can be passed to C<search>, see
235 L</ATTRIBUTES>. For more examples of using this function, see
236 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
237 documentation for the first argument, see L<SQL::Abstract>.
238
239 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
240
241 =cut
242
243 sub search {
244   my $self = shift;
245   my $rs = $self->search_rs( @_ );
246   return (wantarray ? $rs->all : $rs);
247 }
248
249 =head2 search_rs
250
251 =over 4
252
253 =item Arguments: $cond, \%attrs?
254
255 =item Return Value: $resultset
256
257 =back
258
259 This method does the same exact thing as search() except it will
260 always return a resultset, even in list context.
261
262 =cut
263
264 sub search_rs {
265   my $self = shift;
266
267   my $attrs = {};
268   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
269   my $our_attrs = { %{$self->{attrs}} };
270   my $having = delete $our_attrs->{having};
271   my $where = delete $our_attrs->{where};
272
273   my $rows;
274
275   my %safe = (alias => 1, cache => 1);
276
277   unless (
278     (@_ && defined($_[0])) # @_ == () or (undef)
279     || 
280     (keys %$attrs # empty attrs or only 'safe' attrs
281     && List::Util::first { !$safe{$_} } keys %$attrs)
282   ) {
283     # no search, effectively just a clone
284     $rows = $self->get_cache;
285   }
286
287   my $new_attrs = { %{$our_attrs}, %{$attrs} };
288
289   # merge new attrs into inherited
290   foreach my $key (qw/join prefetch +select +as/) {
291     next unless exists $attrs->{$key};
292     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
293   }
294
295   my $cond = (@_
296     ? (
297         (@_ == 1 || ref $_[0] eq "HASH")
298           ? (
299               (ref $_[0] eq 'HASH')
300                 ? (
301                     (keys %{ $_[0] }  > 0)
302                       ? shift
303                       : undef
304                    )
305                 :  shift
306              )
307           : (
308               (@_ % 2)
309                 ? $self->throw_exception("Odd number of arguments to search")
310                 : {@_}
311              )
312       )
313     : undef
314   );
315
316   if (defined $where) {
317     $new_attrs->{where} = (
318       defined $new_attrs->{where}
319         ? { '-and' => [
320               map {
321                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
322               } $where, $new_attrs->{where}
323             ]
324           }
325         : $where);
326   }
327
328   if (defined $cond) {
329     $new_attrs->{where} = (
330       defined $new_attrs->{where}
331         ? { '-and' => [
332               map {
333                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
334               } $cond, $new_attrs->{where}
335             ]
336           }
337         : $cond);
338   }
339
340   if (defined $having) {
341     $new_attrs->{having} = (
342       defined $new_attrs->{having}
343         ? { '-and' => [
344               map {
345                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
346               } $having, $new_attrs->{having}
347             ]
348           }
349         : $having);
350   }
351
352   my $rs = (ref $self)->new($self->result_source, $new_attrs);
353   if ($rows) {
354     $rs->set_cache($rows);
355   }
356   return $rs;
357 }
358
359 =head2 search_literal
360
361 =over 4
362
363 =item Arguments: $sql_fragment, @bind_values
364
365 =item Return Value: $resultset (scalar context), @row_objs (list context)
366
367 =back
368
369   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
370   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
371
372 Pass a literal chunk of SQL to be added to the conditional part of the
373 resultset query.
374
375 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
376 only be used in that context. There are known problems using C<search_literal>
377 in chained queries; it can result in bind values in the wrong order.  See
378 L<DBIx::Class::Manual::Cookbook/Searching> and
379 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
380 require C<search_literal>.
381
382 =cut
383
384 sub search_literal {
385   my ($self, $cond, @vals) = @_;
386   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
387   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
388   return $self->search(\$cond, $attrs);
389 }
390
391 =head2 find
392
393 =over 4
394
395 =item Arguments: @values | \%cols, \%attrs?
396
397 =item Return Value: $row_object | undef
398
399 =back
400
401 Finds a row based on its primary key or unique constraint. For example, to find
402 a row by its primary key:
403
404   my $cd = $schema->resultset('CD')->find(5);
405
406 You can also find a row by a specific unique constraint using the C<key>
407 attribute. For example:
408
409   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
410     key => 'cd_artist_title'
411   });
412
413 Additionally, you can specify the columns explicitly by name:
414
415   my $cd = $schema->resultset('CD')->find(
416     {
417       artist => 'Massive Attack',
418       title  => 'Mezzanine',
419     },
420     { key => 'cd_artist_title' }
421   );
422
423 If the C<key> is specified as C<primary>, it searches only on the primary key.
424
425 If no C<key> is specified, it searches on all unique constraints defined on the
426 source for which column data is provided, including the primary key.
427
428 If your table does not have a primary key, you B<must> provide a value for the
429 C<key> attribute matching one of the unique constraints on the source.
430
431 In addition to C<key>, L</find> recognizes and applies standard
432 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
433
434 Note: If your query does not return only one row, a warning is generated:
435
436   Query returned more than one row
437
438 See also L</find_or_create> and L</update_or_create>. For information on how to
439 declare unique constraints, see
440 L<DBIx::Class::ResultSource/add_unique_constraint>.
441
442 =cut
443
444 sub find {
445   my $self = shift;
446   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
447
448   # Default to the primary key, but allow a specific key
449   my @cols = exists $attrs->{key}
450     ? $self->result_source->unique_constraint_columns($attrs->{key})
451     : $self->result_source->primary_columns;
452   $self->throw_exception(
453     "Can't find unless a primary key is defined or unique constraint is specified"
454   ) unless @cols;
455
456   # Parse out a hashref from input
457   my $input_query;
458   if (ref $_[0] eq 'HASH') {
459     $input_query = { %{$_[0]} };
460   }
461   elsif (@_ == @cols) {
462     $input_query = {};
463     @{$input_query}{@cols} = @_;
464   }
465   else {
466     # Compatibility: Allow e.g. find(id => $value)
467     carp "Find by key => value deprecated; please use a hashref instead";
468     $input_query = {@_};
469   }
470
471   my (%related, $info);
472
473   KEY: foreach my $key (keys %$input_query) {
474     if (ref($input_query->{$key})
475         && ($info = $self->result_source->relationship_info($key))) {
476       my $val = delete $input_query->{$key};
477       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
478       my $rel_q = $self->result_source->resolve_condition(
479                     $info->{cond}, $val, $key
480                   );
481       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
482       @related{keys %$rel_q} = values %$rel_q;
483     }
484   }
485   if (my @keys = keys %related) {
486     @{$input_query}{@keys} = values %related;
487   }
488
489
490   # Build the final query: Default to the disjunction of the unique queries,
491   # but allow the input query in case the ResultSet defines the query or the
492   # user is abusing find
493   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
494   my $query;
495   if (exists $attrs->{key}) {
496     my @unique_cols = $self->result_source->unique_constraint_columns($attrs->{key});
497     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
498     $query = $self->_add_alias($unique_query, $alias);
499   }
500   else {
501     my @unique_queries = $self->_unique_queries($input_query, $attrs);
502     $query = @unique_queries
503       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
504       : $self->_add_alias($input_query, $alias);
505   }
506
507   # Run the query
508   if (keys %$attrs) {
509     my $rs = $self->search($query, $attrs);
510     if (keys %{$rs->_resolved_attrs->{collapse}}) {
511       my $row = $rs->next;
512       carp "Query returned more than one row" if $rs->next;
513       return $row;
514     }
515     else {
516       return $rs->single;
517     }
518   }
519   else {
520     if (keys %{$self->_resolved_attrs->{collapse}}) {
521       my $rs = $self->search($query);
522       my $row = $rs->next;
523       carp "Query returned more than one row" if $rs->next;
524       return $row;
525     }
526     else {
527       return $self->single($query);
528     }
529   }
530 }
531
532 # _add_alias
533 #
534 # Add the specified alias to the specified query hash. A copy is made so the
535 # original query is not modified.
536
537 sub _add_alias {
538   my ($self, $query, $alias) = @_;
539
540   my %aliased = %$query;
541   foreach my $col (grep { ! m/\./ } keys %aliased) {
542     $aliased{"$alias.$col"} = delete $aliased{$col};
543   }
544
545   return \%aliased;
546 }
547
548 # _unique_queries
549 #
550 # Build a list of queries which satisfy unique constraints.
551
552 sub _unique_queries {
553   my ($self, $query, $attrs) = @_;
554
555   my @constraint_names = exists $attrs->{key}
556     ? ($attrs->{key})
557     : $self->result_source->unique_constraint_names;
558
559   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
560   my $num_where = scalar keys %$where;
561
562   my @unique_queries;
563   foreach my $name (@constraint_names) {
564     my @unique_cols = $self->result_source->unique_constraint_columns($name);
565     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
566
567     my $num_cols = scalar @unique_cols;
568     my $num_query = scalar keys %$unique_query;
569
570     my $total = $num_query + $num_where;
571     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
572       # The query is either unique on its own or is unique in combination with
573       # the existing where clause
574       push @unique_queries, $unique_query;
575     }
576   }
577
578   return @unique_queries;
579 }
580
581 # _build_unique_query
582 #
583 # Constrain the specified query hash based on the specified column names.
584
585 sub _build_unique_query {
586   my ($self, $query, $unique_cols) = @_;
587
588   return {
589     map  { $_ => $query->{$_} }
590     grep { exists $query->{$_} }
591       @$unique_cols
592   };
593 }
594
595 =head2 search_related
596
597 =over 4
598
599 =item Arguments: $rel, $cond, \%attrs?
600
601 =item Return Value: $new_resultset
602
603 =back
604
605   $new_rs = $cd_rs->search_related('artist', {
606     name => 'Emo-R-Us',
607   });
608
609 Searches the specified relationship, optionally specifying a condition and
610 attributes for matching records. See L</ATTRIBUTES> for more information.
611
612 =cut
613
614 sub search_related {
615   return shift->related_resultset(shift)->search(@_);
616 }
617
618 =head2 search_related_rs
619
620 This method works exactly the same as search_related, except that
621 it guarantees a restultset, even in list context.
622
623 =cut
624
625 sub search_related_rs {
626   return shift->related_resultset(shift)->search_rs(@_);
627 }
628
629 =head2 cursor
630
631 =over 4
632
633 =item Arguments: none
634
635 =item Return Value: $cursor
636
637 =back
638
639 Returns a storage-driven cursor to the given resultset. See
640 L<DBIx::Class::Cursor> for more information.
641
642 =cut
643
644 sub cursor {
645   my ($self) = @_;
646
647   my $attrs = { %{$self->_resolved_attrs} };
648   return $self->{cursor}
649     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
650           $attrs->{where},$attrs);
651 }
652
653 =head2 single
654
655 =over 4
656
657 =item Arguments: $cond?
658
659 =item Return Value: $row_object?
660
661 =back
662
663   my $cd = $schema->resultset('CD')->single({ year => 2001 });
664
665 Inflates the first result without creating a cursor if the resultset has
666 any records in it; if not returns nothing. Used by L</find> as a lean version of
667 L</search>.
668
669 While this method can take an optional search condition (just like L</search>)
670 being a fast-code-path it does not recognize search attributes. If you need to
671 add extra joins or similar, call L</search> and then chain-call L</single> on the
672 L<DBIx::Class::ResultSet> returned.
673
674 =over
675
676 =item B<Note>
677
678 As of 0.08100, this method enforces the assumption that the preceeding
679 query returns only one row. If more than one row is returned, you will receive
680 a warning:
681
682   Query returned more than one row
683
684 In this case, you should be using L</first> or L</find> instead, or if you really
685 know what you are doing, use the L</rows> attribute to explicitly limit the size 
686 of the resultset.
687
688 =back
689
690 =cut
691
692 sub single {
693   my ($self, $where) = @_;
694   if(@_ > 2) {
695       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
696   }
697
698   my $attrs = { %{$self->_resolved_attrs} };
699   if ($where) {
700     if (defined $attrs->{where}) {
701       $attrs->{where} = {
702         '-and' =>
703             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
704                $where, delete $attrs->{where} ]
705       };
706     } else {
707       $attrs->{where} = $where;
708     }
709   }
710
711 #  XXX: Disabled since it doesn't infer uniqueness in all cases
712 #  unless ($self->_is_unique_query($attrs->{where})) {
713 #    carp "Query not guaranteed to return a single row"
714 #      . "; please declare your unique constraints or use search instead";
715 #  }
716
717   my @data = $self->result_source->storage->select_single(
718     $attrs->{from}, $attrs->{select},
719     $attrs->{where}, $attrs
720   );
721
722   return (@data ? ($self->_construct_object(@data))[0] : undef);
723 }
724
725 # _is_unique_query
726 #
727 # Try to determine if the specified query is guaranteed to be unique, based on
728 # the declared unique constraints.
729
730 sub _is_unique_query {
731   my ($self, $query) = @_;
732
733   my $collapsed = $self->_collapse_query($query);
734   my $alias = $self->{attrs}{alias};
735
736   foreach my $name ($self->result_source->unique_constraint_names) {
737     my @unique_cols = map {
738       "$alias.$_"
739     } $self->result_source->unique_constraint_columns($name);
740
741     # Count the values for each unique column
742     my %seen = map { $_ => 0 } @unique_cols;
743
744     foreach my $key (keys %$collapsed) {
745       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
746       next unless exists $seen{$aliased};  # Additional constraints are okay
747       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
748     }
749
750     # If we get 0 or more than 1 value for a column, it's not necessarily unique
751     return 1 unless grep { $_ != 1 } values %seen;
752   }
753
754   return 0;
755 }
756
757 # _collapse_query
758 #
759 # Recursively collapse the query, accumulating values for each column.
760
761 sub _collapse_query {
762   my ($self, $query, $collapsed) = @_;
763
764   $collapsed ||= {};
765
766   if (ref $query eq 'ARRAY') {
767     foreach my $subquery (@$query) {
768       next unless ref $subquery;  # -or
769 #      warn "ARRAY: " . Dumper $subquery;
770       $collapsed = $self->_collapse_query($subquery, $collapsed);
771     }
772   }
773   elsif (ref $query eq 'HASH') {
774     if (keys %$query and (keys %$query)[0] eq '-and') {
775       foreach my $subquery (@{$query->{-and}}) {
776 #        warn "HASH: " . Dumper $subquery;
777         $collapsed = $self->_collapse_query($subquery, $collapsed);
778       }
779     }
780     else {
781 #      warn "LEAF: " . Dumper $query;
782       foreach my $col (keys %$query) {
783         my $value = $query->{$col};
784         $collapsed->{$col}{$value}++;
785       }
786     }
787   }
788
789   return $collapsed;
790 }
791
792 =head2 get_column
793
794 =over 4
795
796 =item Arguments: $cond?
797
798 =item Return Value: $resultsetcolumn
799
800 =back
801
802   my $max_length = $rs->get_column('length')->max;
803
804 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
805
806 =cut
807
808 sub get_column {
809   my ($self, $column) = @_;
810   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
811   return $new;
812 }
813
814 =head2 search_like
815
816 =over 4
817
818 =item Arguments: $cond, \%attrs?
819
820 =item Return Value: $resultset (scalar context), @row_objs (list context)
821
822 =back
823
824   # WHERE title LIKE '%blue%'
825   $cd_rs = $rs->search_like({ title => '%blue%'});
826
827 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
828 that this is simply a convenience method retained for ex Class::DBI users.
829 You most likely want to use L</search> with specific operators.
830
831 For more information, see L<DBIx::Class::Manual::Cookbook>.
832
833 =cut
834
835 sub search_like {
836   my $class = shift;
837   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
838   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
839   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
840   return $class->search($query, { %$attrs });
841 }
842
843 =head2 slice
844
845 =over 4
846
847 =item Arguments: $first, $last
848
849 =item Return Value: $resultset (scalar context), @row_objs (list context)
850
851 =back
852
853 Returns a resultset or object list representing a subset of elements from the
854 resultset slice is called on. Indexes are from 0, i.e., to get the first
855 three records, call:
856
857   my ($one, $two, $three) = $rs->slice(0, 2);
858
859 =cut
860
861 sub slice {
862   my ($self, $min, $max) = @_;
863   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
864   $attrs->{offset} = $self->{attrs}{offset} || 0;
865   $attrs->{offset} += $min;
866   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
867   return $self->search(undef(), $attrs);
868   #my $slice = (ref $self)->new($self->result_source, $attrs);
869   #return (wantarray ? $slice->all : $slice);
870 }
871
872 =head2 next
873
874 =over 4
875
876 =item Arguments: none
877
878 =item Return Value: $result?
879
880 =back
881
882 Returns the next element in the resultset (C<undef> is there is none).
883
884 Can be used to efficiently iterate over records in the resultset:
885
886   my $rs = $schema->resultset('CD')->search;
887   while (my $cd = $rs->next) {
888     print $cd->title;
889   }
890
891 Note that you need to store the resultset object, and call C<next> on it.
892 Calling C<< resultset('Table')->next >> repeatedly will always return the
893 first record from the resultset.
894
895 =cut
896
897 sub next {
898   my ($self) = @_;
899   if (my $cache = $self->get_cache) {
900     $self->{all_cache_position} ||= 0;
901     return $cache->[$self->{all_cache_position}++];
902   }
903   if ($self->{attrs}{cache}) {
904     $self->{all_cache_position} = 1;
905     return ($self->all)[0];
906   }
907   if ($self->{stashed_objects}) {
908     my $obj = shift(@{$self->{stashed_objects}});
909     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
910     return $obj;
911   }
912   my @row = (
913     exists $self->{stashed_row}
914       ? @{delete $self->{stashed_row}}
915       : $self->cursor->next
916   );
917   return undef unless (@row);
918   my ($row, @more) = $self->_construct_object(@row);
919   $self->{stashed_objects} = \@more if @more;
920   return $row;
921 }
922
923 sub _construct_object {
924   my ($self, @row) = @_;
925   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
926   my @new = $self->result_class->inflate_result($self->result_source, @$info);
927   @new = $self->{_attrs}{record_filter}->(@new)
928     if exists $self->{_attrs}{record_filter};
929   return @new;
930 }
931
932 sub _collapse_result {
933   my ($self, $as_proto, $row) = @_;
934
935   my @copy = @$row;
936
937   # 'foo'         => [ undef, 'foo' ]
938   # 'foo.bar'     => [ 'foo', 'bar' ]
939   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
940
941   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
942
943   my %collapse = %{$self->{_attrs}{collapse}||{}};
944
945   my @pri_index;
946
947   # if we're doing collapsing (has_many prefetch) we need to grab records
948   # until the PK changes, so fill @pri_index. if not, we leave it empty so
949   # we know we don't have to bother.
950
951   # the reason for not using the collapse stuff directly is because if you
952   # had for e.g. two artists in a row with no cds, the collapse info for
953   # both would be NULL (undef) so you'd lose the second artist
954
955   # store just the index so we can check the array positions from the row
956   # without having to contruct the full hash
957
958   if (keys %collapse) {
959     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
960     foreach my $i (0 .. $#construct_as) {
961       next if defined($construct_as[$i][0]); # only self table
962       if (delete $pri{$construct_as[$i][1]}) {
963         push(@pri_index, $i);
964       }
965       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
966     }
967   }
968
969   # no need to do an if, it'll be empty if @pri_index is empty anyway
970
971   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
972
973   my @const_rows;
974
975   do { # no need to check anything at the front, we always want the first row
976
977     my %const;
978   
979     foreach my $this_as (@construct_as) {
980       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
981     }
982
983     push(@const_rows, \%const);
984
985   } until ( # no pri_index => no collapse => drop straight out
986       !@pri_index
987     or
988       do { # get another row, stash it, drop out if different PK
989
990         @copy = $self->cursor->next;
991         $self->{stashed_row} = \@copy;
992
993         # last thing in do block, counts as true if anything doesn't match
994
995         # check xor defined first for NULL vs. NOT NULL then if one is
996         # defined the other must be so check string equality
997
998         grep {
999           (defined $pri_vals{$_} ^ defined $copy[$_])
1000           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1001         } @pri_index;
1002       }
1003   );
1004
1005   my $alias = $self->{attrs}{alias};
1006   my $info = [];
1007
1008   my %collapse_pos;
1009
1010   my @const_keys;
1011
1012   foreach my $const (@const_rows) {
1013     scalar @const_keys or do {
1014       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1015     };
1016     foreach my $key (@const_keys) {
1017       if (length $key) {
1018         my $target = $info;
1019         my @parts = split(/\./, $key);
1020         my $cur = '';
1021         my $data = $const->{$key};
1022         foreach my $p (@parts) {
1023           $target = $target->[1]->{$p} ||= [];
1024           $cur .= ".${p}";
1025           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
1026             # collapsing at this point and on final part
1027             my $pos = $collapse_pos{$cur};
1028             CK: foreach my $ck (@ckey) {
1029               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1030                 $collapse_pos{$cur} = $data;
1031                 delete @collapse_pos{ # clear all positioning for sub-entries
1032                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1033                 };
1034                 push(@$target, []);
1035                 last CK;
1036               }
1037             }
1038           }
1039           if (exists $collapse{$cur}) {
1040             $target = $target->[-1];
1041           }
1042         }
1043         $target->[0] = $data;
1044       } else {
1045         $info->[0] = $const->{$key};
1046       }
1047     }
1048   }
1049
1050   return $info;
1051 }
1052
1053 =head2 result_source
1054
1055 =over 4
1056
1057 =item Arguments: $result_source?
1058
1059 =item Return Value: $result_source
1060
1061 =back
1062
1063 An accessor for the primary ResultSource object from which this ResultSet
1064 is derived.
1065
1066 =head2 result_class
1067
1068 =over 4
1069
1070 =item Arguments: $result_class?
1071
1072 =item Return Value: $result_class
1073
1074 =back
1075
1076 An accessor for the class to use when creating row objects. Defaults to 
1077 C<< result_source->result_class >> - which in most cases is the name of the 
1078 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1079
1080 =cut
1081
1082 sub result_class {
1083   my ($self, $result_class) = @_;
1084   if ($result_class) {
1085     $self->ensure_class_loaded($result_class);
1086     $self->_result_class($result_class);
1087   }
1088   $self->_result_class;
1089 }
1090
1091 =head2 count
1092
1093 =over 4
1094
1095 =item Arguments: $cond, \%attrs??
1096
1097 =item Return Value: $count
1098
1099 =back
1100
1101 Performs an SQL C<COUNT> with the same query as the resultset was built
1102 with to find the number of elements. If passed arguments, does a search
1103 on the resultset and counts the results of that.
1104
1105 Note: When using C<count> with C<group_by>, L<DBIx::Class> emulates C<GROUP BY>
1106 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
1107 not support C<DISTINCT> with multiple columns. If you are using such a
1108 database, you should only use columns from the main table in your C<group_by>
1109 clause.
1110
1111 =cut
1112
1113 sub count {
1114   my $self = shift;
1115   return $self->search(@_)->count if @_ and defined $_[0];
1116   return scalar @{ $self->get_cache } if $self->get_cache;
1117   my $count = $self->_count;
1118   return 0 unless $count;
1119
1120   # need to take offset from resolved attrs
1121
1122   $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
1123   $count = $self->{attrs}{rows} if
1124     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
1125   $count = 0 if ($count < 0);
1126   return $count;
1127 }
1128
1129 sub _count { # Separated out so pager can get the full count
1130   my $self = shift;
1131   my $select = { count => '*' };
1132
1133   my $attrs = { %{$self->_resolved_attrs} };
1134   if (my $group_by = delete $attrs->{group_by}) {
1135     delete $attrs->{having};
1136     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1137     # todo: try CONCAT for multi-column pk
1138     my @pk = $self->result_source->primary_columns;
1139     if (@pk == 1) {
1140       my $alias = $attrs->{alias};
1141       foreach my $column (@distinct) {
1142         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1143           @distinct = ($column);
1144           last;
1145         }
1146       }
1147     }
1148
1149     $select = { count => { distinct => \@distinct } };
1150   }
1151
1152   $attrs->{select} = $select;
1153   $attrs->{as} = [qw/count/];
1154
1155   # offset, order by and page are not needed to count. record_filter is cdbi
1156   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1157
1158   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1159   my ($count) = $tmp_rs->cursor->next;
1160   return $count;
1161 }
1162
1163 sub _bool {
1164   return 1;
1165 }
1166
1167 =head2 count_literal
1168
1169 =over 4
1170
1171 =item Arguments: $sql_fragment, @bind_values
1172
1173 =item Return Value: $count
1174
1175 =back
1176
1177 Counts the results in a literal query. Equivalent to calling L</search_literal>
1178 with the passed arguments, then L</count>.
1179
1180 =cut
1181
1182 sub count_literal { shift->search_literal(@_)->count; }
1183
1184 =head2 all
1185
1186 =over 4
1187
1188 =item Arguments: none
1189
1190 =item Return Value: @objects
1191
1192 =back
1193
1194 Returns all elements in the resultset. Called implicitly if the resultset
1195 is returned in list context.
1196
1197 =cut
1198
1199 sub all {
1200   my $self = shift;
1201   if(@_) {
1202       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1203   }
1204
1205   return @{ $self->get_cache } if $self->get_cache;
1206
1207   my @obj;
1208
1209   # TODO: don't call resolve here
1210   if (keys %{$self->_resolved_attrs->{collapse}}) {
1211 #  if ($self->{attrs}{prefetch}) {
1212       # Using $self->cursor->all is really just an optimisation.
1213       # If we're collapsing has_many prefetches it probably makes
1214       # very little difference, and this is cleaner than hacking
1215       # _construct_object to survive the approach
1216     my @row = $self->cursor->next;
1217     while (@row) {
1218       push(@obj, $self->_construct_object(@row));
1219       @row = (exists $self->{stashed_row}
1220                ? @{delete $self->{stashed_row}}
1221                : $self->cursor->next);
1222     }
1223   } else {
1224     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1225   }
1226
1227   $self->set_cache(\@obj) if $self->{attrs}{cache};
1228   return @obj;
1229 }
1230
1231 =head2 reset
1232
1233 =over 4
1234
1235 =item Arguments: none
1236
1237 =item Return Value: $self
1238
1239 =back
1240
1241 Resets the resultset's cursor, so you can iterate through the elements again.
1242
1243 =cut
1244
1245 sub reset {
1246   my ($self) = @_;
1247   delete $self->{_attrs} if exists $self->{_attrs};
1248   $self->{all_cache_position} = 0;
1249   $self->cursor->reset;
1250   return $self;
1251 }
1252
1253 =head2 first
1254
1255 =over 4
1256
1257 =item Arguments: none
1258
1259 =item Return Value: $object?
1260
1261 =back
1262
1263 Resets the resultset and returns an object for the first result (if the
1264 resultset returns anything).
1265
1266 =cut
1267
1268 sub first {
1269   return $_[0]->reset->next;
1270 }
1271
1272 # _cond_for_update_delete
1273 #
1274 # update/delete require the condition to be modified to handle
1275 # the differing SQL syntax available.  This transforms the $self->{cond}
1276 # appropriately, returning the new condition.
1277
1278 sub _cond_for_update_delete {
1279   my ($self, $full_cond) = @_;
1280   my $cond = {};
1281
1282   $full_cond ||= $self->{cond};
1283   # No-op. No condition, we're updating/deleting everything
1284   return $cond unless ref $full_cond;
1285
1286   if (ref $full_cond eq 'ARRAY') {
1287     $cond = [
1288       map {
1289         my %hash;
1290         foreach my $key (keys %{$_}) {
1291           $key =~ /([^.]+)$/;
1292           $hash{$1} = $_->{$key};
1293         }
1294         \%hash;
1295       } @{$full_cond}
1296     ];
1297   }
1298   elsif (ref $full_cond eq 'HASH') {
1299     if ((keys %{$full_cond})[0] eq '-and') {
1300       $cond->{-and} = [];
1301
1302       my @cond = @{$full_cond->{-and}};
1303       for (my $i = 0; $i < @cond; $i++) {
1304         my $entry = $cond[$i];
1305
1306         my $hash;
1307         if (ref $entry eq 'HASH') {
1308           $hash = $self->_cond_for_update_delete($entry);
1309         }
1310         else {
1311           $entry =~ /([^.]+)$/;
1312           $hash->{$1} = $cond[++$i];
1313         }
1314
1315         push @{$cond->{-and}}, $hash;
1316       }
1317     }
1318     else {
1319       foreach my $key (keys %{$full_cond}) {
1320         $key =~ /([^.]+)$/;
1321         $cond->{$1} = $full_cond->{$key};
1322       }
1323     }
1324   }
1325   else {
1326     $self->throw_exception(
1327       "Can't update/delete on resultset with condition unless hash or array"
1328     );
1329   }
1330
1331   return $cond;
1332 }
1333
1334
1335 =head2 update
1336
1337 =over 4
1338
1339 =item Arguments: \%values
1340
1341 =item Return Value: $storage_rv
1342
1343 =back
1344
1345 Sets the specified columns in the resultset to the supplied values in a
1346 single query. Return value will be true if the update succeeded or false
1347 if no records were updated; exact type of success value is storage-dependent.
1348
1349 =cut
1350
1351 sub update {
1352   my ($self, $values) = @_;
1353   $self->throw_exception("Values for update must be a hash")
1354     unless ref $values eq 'HASH';
1355
1356   carp(   'WARNING! Currently $rs->update() does not generate proper SQL'
1357         . ' on joined resultsets, and may affect rows well outside of the'
1358         . ' contents of $rs. Use at your own risk' )
1359     if ( $self->{attrs}{seen_join} );
1360
1361   my $cond = $self->_cond_for_update_delete;
1362    
1363   return $self->result_source->storage->update(
1364     $self->result_source, $values, $cond
1365   );
1366 }
1367
1368 =head2 update_all
1369
1370 =over 4
1371
1372 =item Arguments: \%values
1373
1374 =item Return Value: 1
1375
1376 =back
1377
1378 Fetches all objects and updates them one at a time. Note that C<update_all>
1379 will run DBIC cascade triggers, while L</update> will not.
1380
1381 =cut
1382
1383 sub update_all {
1384   my ($self, $values) = @_;
1385   $self->throw_exception("Values for update must be a hash")
1386     unless ref $values eq 'HASH';
1387   foreach my $obj ($self->all) {
1388     $obj->set_columns($values)->update;
1389   }
1390   return 1;
1391 }
1392
1393 =head2 delete
1394
1395 =over 4
1396
1397 =item Arguments: none
1398
1399 =item Return Value: 1
1400
1401 =back
1402
1403 Deletes the contents of the resultset from its result source. Note that this
1404 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1405 to run. See also L<DBIx::Class::Row/delete>.
1406
1407 delete may not generate correct SQL for a query with joins or a resultset
1408 chained from a related resultset.  In this case it will generate a warning:-
1409
1410   WARNING! Currently $rs->delete() does not generate proper SQL on
1411   joined resultsets, and may delete rows well outside of the contents
1412   of $rs. Use at your own risk
1413
1414 In these cases you may find that delete_all is more appropriate, or you
1415 need to respecify your query in a way that can be expressed without a join.
1416
1417 =cut
1418
1419 sub delete {
1420   my ($self) = @_;
1421   $self->throw_exception("Delete should not be passed any arguments")
1422     if $_[1];
1423   carp(   'WARNING! Currently $rs->delete() does not generate proper SQL'
1424         . ' on joined resultsets, and may delete rows well outside of the'
1425         . ' contents of $rs. Use at your own risk' )
1426     if ( $self->{attrs}{seen_join} );
1427   my $cond = $self->_cond_for_update_delete;
1428
1429   $self->result_source->storage->delete($self->result_source, $cond);
1430   return 1;
1431 }
1432
1433 =head2 delete_all
1434
1435 =over 4
1436
1437 =item Arguments: none
1438
1439 =item Return Value: 1
1440
1441 =back
1442
1443 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1444 will run DBIC cascade triggers, while L</delete> will not.
1445
1446 =cut
1447
1448 sub delete_all {
1449   my ($self) = @_;
1450   $_->delete for $self->all;
1451   return 1;
1452 }
1453
1454 =head2 populate
1455
1456 =over 4
1457
1458 =item Arguments: \@data;
1459
1460 =back
1461
1462 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1463 For the arrayref of hashrefs style each hashref should be a structure suitable
1464 forsubmitting to a $resultset->create(...) method.
1465
1466 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1467 to insert the data, as this is a faster method.  
1468
1469 Otherwise, each set of data is inserted into the database using
1470 L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
1471 objects is returned.
1472
1473 Example:  Assuming an Artist Class that has many CDs Classes relating:
1474
1475   my $Artist_rs = $schema->resultset("Artist");
1476   
1477   ## Void Context Example 
1478   $Artist_rs->populate([
1479      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1480         { title => 'My First CD', year => 2006 },
1481         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1482       ],
1483      },
1484      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1485         { title => 'My parents sold me to a record company' ,year => 2005 },
1486         { title => 'Why Am I So Ugly?', year => 2006 },
1487         { title => 'I Got Surgery and am now Popular', year => 2007 }
1488       ],
1489      },
1490   ]);
1491   
1492   ## Array Context Example
1493   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1494     { name => "Artist One"},
1495     { name => "Artist Two"},
1496     { name => "Artist Three", cds=> [
1497     { title => "First CD", year => 2007},
1498     { title => "Second CD", year => 2008},
1499   ]}
1500   ]);
1501   
1502   print $ArtistOne->name; ## response is 'Artist One'
1503   print $ArtistThree->cds->count ## reponse is '2'
1504
1505 For the arrayref of arrayrefs style,  the first element should be a list of the
1506 fieldsnames to which the remaining elements are rows being inserted.  For
1507 example:
1508
1509   $Arstist_rs->populate([
1510     [qw/artistid name/],
1511     [100, 'A Formally Unknown Singer'],
1512     [101, 'A singer that jumped the shark two albums ago'],
1513     [102, 'An actually cool singer.'],
1514   ]);
1515
1516 Please note an important effect on your data when choosing between void and
1517 wantarray context. Since void context goes straight to C<insert_bulk> in 
1518 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1519 c<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to 
1520 create primary keys for you, you will find that your PKs are empty.  In this 
1521 case you will have to use the wantarray context in order to create those 
1522 values.
1523
1524 =cut
1525
1526 sub populate {
1527   my $self = shift @_;
1528   my $data = ref $_[0][0] eq 'HASH'
1529     ? $_[0] : ref $_[0][0] eq 'ARRAY' ? $self->_normalize_populate_args($_[0]) :
1530     $self->throw_exception('Populate expects an arrayref of hashes or arrayref of arrayrefs');
1531   
1532   if(defined wantarray) {
1533     my @created;
1534     foreach my $item (@$data) {
1535       push(@created, $self->create($item));
1536     }
1537     return @created;
1538   } else {
1539     my ($first, @rest) = @$data;
1540
1541     my @names = grep {!ref $first->{$_}} keys %$first;
1542     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1543     my @pks = $self->result_source->primary_columns;  
1544
1545     ## do the belongs_to relationships  
1546     foreach my $index (0..$#$data) {
1547       if( grep { !defined $data->[$index]->{$_} } @pks ) {
1548         my @ret = $self->populate($data);
1549         return;
1550       }
1551     
1552       foreach my $rel (@rels) {
1553         next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
1554         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1555         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1556         my $related = $result->result_source->resolve_condition(
1557           $result->result_source->relationship_info($reverse)->{cond},
1558           $self,        
1559           $result,        
1560         );
1561
1562         delete $data->[$index]->{$rel};
1563         $data->[$index] = {%{$data->[$index]}, %$related};
1564       
1565         push @names, keys %$related if $index == 0;
1566       }
1567     }
1568
1569     ## do bulk insert on current row
1570     my @values = map { [ @$_{@names} ] } @$data;
1571
1572     $self->result_source->storage->insert_bulk(
1573       $self->result_source, 
1574       \@names, 
1575       \@values,
1576     );
1577
1578     ## do the has_many relationships
1579     foreach my $item (@$data) {
1580
1581       foreach my $rel (@rels) {
1582         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1583
1584         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
1585      || $self->throw_exception('Cannot find the relating object.');
1586      
1587         my $child = $parent->$rel;
1588     
1589         my $related = $child->result_source->resolve_condition(
1590           $parent->result_source->relationship_info($rel)->{cond},
1591           $child,
1592           $parent,
1593         );
1594
1595         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1596         my @populate = map { {%$_, %$related} } @rows_to_add;
1597
1598         $child->populate( \@populate );
1599       }
1600     }
1601   }
1602 }
1603
1604 =head2 _normalize_populate_args ($args)
1605
1606 Private method used by L</populate> to normalize its incoming arguments.  Factored
1607 out in case you want to subclass and accept new argument structures to the
1608 L</populate> method.
1609
1610 =cut
1611
1612 sub _normalize_populate_args {
1613   my ($self, $data) = @_;
1614   my @names = @{shift(@$data)};
1615   my @results_to_create;
1616   foreach my $datum (@$data) {
1617     my %result_to_create;
1618     foreach my $index (0..$#names) {
1619       $result_to_create{$names[$index]} = $$datum[$index];
1620     }
1621     push @results_to_create, \%result_to_create;    
1622   }
1623   return \@results_to_create;
1624 }
1625
1626 =head2 pager
1627
1628 =over 4
1629
1630 =item Arguments: none
1631
1632 =item Return Value: $pager
1633
1634 =back
1635
1636 Return Value a L<Data::Page> object for the current resultset. Only makes
1637 sense for queries with a C<page> attribute.
1638
1639 =cut
1640
1641 sub pager {
1642   my ($self) = @_;
1643   my $attrs = $self->{attrs};
1644   $self->throw_exception("Can't create pager for non-paged rs")
1645     unless $self->{attrs}{page};
1646   $attrs->{rows} ||= 10;
1647   return $self->{pager} ||= Data::Page->new(
1648     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1649 }
1650
1651 =head2 page
1652
1653 =over 4
1654
1655 =item Arguments: $page_number
1656
1657 =item Return Value: $rs
1658
1659 =back
1660
1661 Returns a resultset for the $page_number page of the resultset on which page
1662 is called, where each page contains a number of rows equal to the 'rows'
1663 attribute set on the resultset (10 by default).
1664
1665 =cut
1666
1667 sub page {
1668   my ($self, $page) = @_;
1669   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1670 }
1671
1672 =head2 new_result
1673
1674 =over 4
1675
1676 =item Arguments: \%vals
1677
1678 =item Return Value: $rowobject
1679
1680 =back
1681
1682 Creates a new row object in the resultset's result class and returns
1683 it. The row is not inserted into the database at this point, call
1684 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
1685 will tell you whether the row object has been inserted or not.
1686
1687 Passes the hashref of input on to L<DBIx::Class::Row/new>.
1688
1689 =cut
1690
1691 sub new_result {
1692   my ($self, $values) = @_;
1693   $self->throw_exception( "new_result needs a hash" )
1694     unless (ref $values eq 'HASH');
1695
1696   my %new;
1697   my $alias = $self->{attrs}{alias};
1698
1699   if (
1700     defined $self->{cond}
1701     && $self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION
1702   ) {
1703     %new = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
1704     $new{-from_resultset} = [ keys %new ] if keys %new;
1705   } else {
1706     $self->throw_exception(
1707       "Can't abstract implicit construct, condition not a hash"
1708     ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1709   
1710     my $collapsed_cond = (
1711       $self->{cond}
1712         ? $self->_collapse_cond($self->{cond})
1713         : {}
1714     );
1715   
1716     # precendence must be given to passed values over values inherited from
1717     # the cond, so the order here is important.
1718     my %implied =  %{$self->_remove_alias($collapsed_cond, $alias)};
1719     while( my($col,$value) = each %implied ){
1720       if(ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '='){
1721         $new{$col} = $value->{'='};
1722         next;
1723       }
1724       $new{$col} = $value if $self->_is_deterministic_value($value);
1725     }
1726   }
1727
1728   %new = (
1729     %new,
1730     %{ $self->_remove_alias($values, $alias) },
1731     -source_handle => $self->_source_handle,
1732     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1733   );
1734
1735   return $self->result_class->new(\%new);
1736 }
1737
1738 # _is_deterministic_value
1739 #
1740 # Make an effor to strip non-deterministic values from the condition, 
1741 # to make sure new_result chokes less
1742
1743 sub _is_deterministic_value {
1744   my $self = shift;
1745   my $value = shift;
1746   my $ref_type = ref $value;
1747   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
1748   return 1 if Scalar::Util::blessed($value);
1749   return 0;
1750 }
1751
1752 # _collapse_cond
1753 #
1754 # Recursively collapse the condition.
1755
1756 sub _collapse_cond {
1757   my ($self, $cond, $collapsed) = @_;
1758
1759   $collapsed ||= {};
1760
1761   if (ref $cond eq 'ARRAY') {
1762     foreach my $subcond (@$cond) {
1763       next unless ref $subcond;  # -or
1764 #      warn "ARRAY: " . Dumper $subcond;
1765       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1766     }
1767   }
1768   elsif (ref $cond eq 'HASH') {
1769     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1770       foreach my $subcond (@{$cond->{-and}}) {
1771 #        warn "HASH: " . Dumper $subcond;
1772         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1773       }
1774     }
1775     else {
1776 #      warn "LEAF: " . Dumper $cond;
1777       foreach my $col (keys %$cond) {
1778         my $value = $cond->{$col};
1779         $collapsed->{$col} = $value;
1780       }
1781     }
1782   }
1783
1784   return $collapsed;
1785 }
1786
1787 # _remove_alias
1788 #
1789 # Remove the specified alias from the specified query hash. A copy is made so
1790 # the original query is not modified.
1791
1792 sub _remove_alias {
1793   my ($self, $query, $alias) = @_;
1794
1795   my %orig = %{ $query || {} };
1796   my %unaliased;
1797
1798   foreach my $key (keys %orig) {
1799     if ($key !~ /\./) {
1800       $unaliased{$key} = $orig{$key};
1801       next;
1802     }
1803     $unaliased{$1} = $orig{$key}
1804       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1805   }
1806
1807   return \%unaliased;
1808 }
1809
1810 =head2 as_query (EXPERIMENTAL)
1811
1812 =over 4
1813
1814 =item Arguments: none
1815
1816 =item Return Value: \[ $sql, @bind ]
1817
1818 =back
1819
1820 Returns the SQL query and bind vars associated with the invocant.
1821
1822 This is generally used as the RHS for a subquery.
1823
1824 B<NOTE>: This feature is still experimental.
1825
1826 =cut
1827
1828 sub as_query { return shift->cursor->as_query(@_) }
1829
1830 =head2 find_or_new
1831
1832 =over 4
1833
1834 =item Arguments: \%vals, \%attrs?
1835
1836 =item Return Value: $rowobject
1837
1838 =back
1839
1840   my $artist = $schema->resultset('Artist')->find_or_new(
1841     { artist => 'fred' }, { key => 'artists' });
1842
1843   $cd->cd_to_producer->find_or_new({ producer => $producer },
1844                                    { key => 'primary });
1845
1846 Find an existing record from this resultset, based on its primary
1847 key, or a unique constraint. If none exists, instantiate a new result
1848 object and return it. The object will not be saved into your storage
1849 until you call L<DBIx::Class::Row/insert> on it.
1850
1851 You most likely want this method when looking for existing rows using
1852 a unique constraint that is not the primary key, or looking for
1853 related rows.
1854
1855 If you want objects to be saved immediately, use L</find_or_create> instead.
1856
1857 B<Note>: C<find_or_new> is probably not what you want when creating a
1858 new row in a table that uses primary keys supplied by the
1859 database. Passing in a primary key column with a value of I<undef>
1860 will cause L</find> to attempt to search for a row with a value of
1861 I<NULL>.
1862
1863 =cut
1864
1865 sub find_or_new {
1866   my $self     = shift;
1867   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1868   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1869   my $exists   = $self->find($hash, $attrs);
1870   return defined $exists ? $exists : $self->new_result($hash);
1871 }
1872
1873 =head2 create
1874
1875 =over 4
1876
1877 =item Arguments: \%vals
1878
1879 =item Return Value: a L<DBIx::Class::Row> $object
1880
1881 =back
1882
1883 Attempt to create a single new row or a row with multiple related rows
1884 in the table represented by the resultset (and related tables). This
1885 will not check for duplicate rows before inserting, use
1886 L</find_or_create> to do that.
1887
1888 To create one row for this resultset, pass a hashref of key/value
1889 pairs representing the columns of the table and the values you wish to
1890 store. If the appropriate relationships are set up, foreign key fields
1891 can also be passed an object representing the foreign row, and the
1892 value will be set to its primary key.
1893
1894 To create related objects, pass a hashref for the value if the related
1895 item is a foreign key relationship (L<DBIx::Class::Relationship/belongs_to>),
1896 and use the name of the relationship as the key. (NOT the name of the field,
1897 necessarily). For C<has_many> and C<has_one> relationships, pass an arrayref
1898 of hashrefs containing the data for each of the rows to create in the foreign
1899 tables, again using the relationship name as the key.
1900
1901 Instead of hashrefs of plain related data (key/value pairs), you may
1902 also pass new or inserted objects. New objects (not inserted yet, see
1903 L</new>), will be inserted into their appropriate tables.
1904
1905 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1906
1907 Example of creating a new row.
1908
1909   $person_rs->create({
1910     name=>"Some Person",
1911     email=>"somebody@someplace.com"
1912   });
1913   
1914 Example of creating a new row and also creating rows in a related C<has_many>
1915 or C<has_one> resultset.  Note Arrayref.
1916
1917   $artist_rs->create(
1918      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1919         { title => 'My First CD', year => 2006 },
1920         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1921       ],
1922      },
1923   );
1924
1925 Example of creating a new row and also creating a row in a related
1926 C<belongs_to>resultset. Note Hashref.
1927
1928   $cd_rs->create({
1929     title=>"Music for Silly Walks",
1930     year=>2000,
1931     artist => {
1932       name=>"Silly Musician",
1933     }
1934   });
1935
1936 =cut
1937
1938 sub create {
1939   my ($self, $attrs) = @_;
1940   $self->throw_exception( "create needs a hashref" )
1941     unless ref $attrs eq 'HASH';
1942   return $self->new_result($attrs)->insert;
1943 }
1944
1945 =head2 find_or_create
1946
1947 =over 4
1948
1949 =item Arguments: \%vals, \%attrs?
1950
1951 =item Return Value: $rowobject
1952
1953 =back
1954
1955   $cd->cd_to_producer->find_or_create({ producer => $producer },
1956                                       { key => 'primary });
1957
1958 Tries to find a record based on its primary key or unique constraints; if none
1959 is found, creates one and returns that instead.
1960
1961   my $cd = $schema->resultset('CD')->find_or_create({
1962     cdid   => 5,
1963     artist => 'Massive Attack',
1964     title  => 'Mezzanine',
1965     year   => 2005,
1966   });
1967
1968 Also takes an optional C<key> attribute, to search by a specific key or unique
1969 constraint. For example:
1970
1971   my $cd = $schema->resultset('CD')->find_or_create(
1972     {
1973       artist => 'Massive Attack',
1974       title  => 'Mezzanine',
1975     },
1976     { key => 'cd_artist_title' }
1977   );
1978
1979 B<Note>: Because find_or_create() reads from the database and then
1980 possibly inserts based on the result, this method is subject to a race
1981 condition. Another process could create a record in the table after
1982 the find has completed and before the create has started. To avoid
1983 this problem, use find_or_create() inside a transaction.
1984
1985 B<Note>: C<find_or_create> is probably not what you want when creating
1986 a new row in a table that uses primary keys supplied by the
1987 database. Passing in a primary key column with a value of I<undef>
1988 will cause L</find> to attempt to search for a row with a value of
1989 I<NULL>.
1990
1991 See also L</find> and L</update_or_create>. For information on how to declare
1992 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1993
1994 =cut
1995
1996 sub find_or_create {
1997   my $self     = shift;
1998   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1999   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2000   my $exists   = $self->find($hash, $attrs);
2001   return defined $exists ? $exists : $self->create($hash);
2002 }
2003
2004 =head2 update_or_create
2005
2006 =over 4
2007
2008 =item Arguments: \%col_values, { key => $unique_constraint }?
2009
2010 =item Return Value: $rowobject
2011
2012 =back
2013
2014   $resultset->update_or_create({ col => $val, ... });
2015
2016 First, searches for an existing row matching one of the unique constraints
2017 (including the primary key) on the source of this resultset. If a row is
2018 found, updates it with the other given column values. Otherwise, creates a new
2019 row.
2020
2021 Takes an optional C<key> attribute to search on a specific unique constraint.
2022 For example:
2023
2024   # In your application
2025   my $cd = $schema->resultset('CD')->update_or_create(
2026     {
2027       artist => 'Massive Attack',
2028       title  => 'Mezzanine',
2029       year   => 1998,
2030     },
2031     { key => 'cd_artist_title' }
2032   );
2033
2034   $cd->cd_to_producer->update_or_create({ 
2035     producer => $producer, 
2036     name => 'harry',
2037   }, { 
2038     key => 'primary,
2039   });
2040
2041
2042 If no C<key> is specified, it searches on all unique constraints defined on the
2043 source, including the primary key.
2044
2045 If the C<key> is specified as C<primary>, it searches only on the primary key.
2046
2047 See also L</find> and L</find_or_create>. For information on how to declare
2048 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2049
2050 B<Note>: C<update_or_create> is probably not what you want when
2051 looking for a row in a table that uses primary keys supplied by the
2052 database, unless you actually have a key value. Passing in a primary
2053 key column with a value of I<undef> will cause L</find> to attempt to
2054 search for a row with a value of I<NULL>.
2055
2056 =cut
2057
2058 sub update_or_create {
2059   my $self = shift;
2060   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2061   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2062
2063   my $row = $self->find($cond, $attrs);
2064   if (defined $row) {
2065     $row->update($cond);
2066     return $row;
2067   }
2068
2069   return $self->create($cond);
2070 }
2071
2072 =head2 get_cache
2073
2074 =over 4
2075
2076 =item Arguments: none
2077
2078 =item Return Value: \@cache_objects?
2079
2080 =back
2081
2082 Gets the contents of the cache for the resultset, if the cache is set.
2083
2084 The cache is populated either by using the L</prefetch> attribute to
2085 L</search> or by calling L</set_cache>.
2086
2087 =cut
2088
2089 sub get_cache {
2090   shift->{all_cache};
2091 }
2092
2093 =head2 set_cache
2094
2095 =over 4
2096
2097 =item Arguments: \@cache_objects
2098
2099 =item Return Value: \@cache_objects
2100
2101 =back
2102
2103 Sets the contents of the cache for the resultset. Expects an arrayref
2104 of objects of the same class as those produced by the resultset. Note that
2105 if the cache is set the resultset will return the cached objects rather
2106 than re-querying the database even if the cache attr is not set.
2107
2108 The contents of the cache can also be populated by using the
2109 L</prefetch> attribute to L</search>.
2110
2111 =cut
2112
2113 sub set_cache {
2114   my ( $self, $data ) = @_;
2115   $self->throw_exception("set_cache requires an arrayref")
2116       if defined($data) && (ref $data ne 'ARRAY');
2117   $self->{all_cache} = $data;
2118 }
2119
2120 =head2 clear_cache
2121
2122 =over 4
2123
2124 =item Arguments: none
2125
2126 =item Return Value: []
2127
2128 =back
2129
2130 Clears the cache for the resultset.
2131
2132 =cut
2133
2134 sub clear_cache {
2135   shift->set_cache(undef);
2136 }
2137
2138 =head2 related_resultset
2139
2140 =over 4
2141
2142 =item Arguments: $relationship_name
2143
2144 =item Return Value: $resultset
2145
2146 =back
2147
2148 Returns a related resultset for the supplied relationship name.
2149
2150   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2151
2152 =cut
2153
2154 sub related_resultset {
2155   my ($self, $rel) = @_;
2156
2157   $self->{related_resultsets} ||= {};
2158   return $self->{related_resultsets}{$rel} ||= do {
2159     my $rel_obj = $self->result_source->relationship_info($rel);
2160
2161     $self->throw_exception(
2162       "search_related: result source '" . $self->result_source->source_name .
2163         "' has no such relationship $rel")
2164       unless $rel_obj;
2165     
2166     my ($from,$seen) = $self->_resolve_from($rel);
2167
2168     my $join_count = $seen->{$rel};
2169     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
2170
2171     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2172     my %attrs = %{$self->{attrs}||{}};
2173     delete @attrs{qw(result_class alias)};
2174
2175     my $new_cache;
2176
2177     if (my $cache = $self->get_cache) {
2178       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2179         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2180                         @$cache ];
2181       }
2182     }
2183
2184     my $rel_source = $self->result_source->related_source($rel);
2185
2186     my $new = do {
2187
2188       # The reason we do this now instead of passing the alias to the
2189       # search_rs below is that if you wrap/overload resultset on the
2190       # source you need to know what alias it's -going- to have for things
2191       # to work sanely (e.g. RestrictWithObject wants to be able to add
2192       # extra query restrictions, and these may need to be $alias.)
2193
2194       my $attrs = $rel_source->resultset_attributes;
2195       local $attrs->{alias} = $alias;
2196
2197       $rel_source->resultset
2198                  ->search_rs(
2199                      undef, {
2200                        %attrs,
2201                        join => undef,
2202                        prefetch => undef,
2203                        select => undef,
2204                        as => undef,
2205                        where => $self->{cond},
2206                        seen_join => $seen,
2207                        from => $from,
2208                    });
2209     };
2210     $new->set_cache($new_cache) if $new_cache;
2211     $new;
2212   };
2213 }
2214
2215 =head2 current_source_alias
2216
2217 =over 4
2218
2219 =item Arguments: none
2220
2221 =item Return Value: $source_alias
2222
2223 =back
2224
2225 Returns the current table alias for the result source this resultset is built
2226 on, that will be used in the SQL query. Usually it is C<me>.
2227
2228 Currently the source alias that refers to the result set returned by a
2229 L</search>/L</find> family method depends on how you got to the resultset: it's
2230 C<me> by default, but eg. L</search_related> aliases it to the related result
2231 source name (and keeps C<me> referring to the original result set). The long
2232 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2233 (and make this method unnecessary).
2234
2235 Thus it's currently necessary to use this method in predefined queries (see
2236 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2237 source alias of the current result set:
2238
2239   # in a result set class
2240   sub modified_by {
2241     my ($self, $user) = @_;
2242
2243     my $me = $self->current_source_alias;
2244
2245     return $self->search(
2246       "$me.modified" => $user->id,
2247     );
2248   }
2249
2250 =cut
2251
2252 sub current_source_alias {
2253   my ($self) = @_;
2254
2255   return ($self->{attrs} || {})->{alias} || 'me';
2256 }
2257
2258 sub _resolve_from {
2259   my ($self, $extra_join) = @_;
2260   my $source = $self->result_source;
2261   my $attrs = $self->{attrs};
2262   
2263   my $from = $attrs->{from}
2264     || [ { $attrs->{alias} => $source->from } ];
2265     
2266   my $seen = { %{$attrs->{seen_join}||{}} };
2267
2268   my $join = ($attrs->{join}
2269                ? [ $attrs->{join}, $extra_join ]
2270                : $extra_join);
2271
2272   # we need to take the prefetch the attrs into account before we 
2273   # ->resolve_join as otherwise they get lost - captainL
2274   my $merged = $self->_merge_attr( $join, $attrs->{prefetch} );
2275
2276   $from = [
2277     @$from,
2278     ($join ? $source->resolve_join($merged, $attrs->{alias}, $seen) : ()),
2279   ];
2280
2281   return ($from,$seen);
2282 }
2283
2284 sub _resolved_attrs {
2285   my $self = shift;
2286   return $self->{_attrs} if $self->{_attrs};
2287
2288   my $attrs  = { %{ $self->{attrs} || {} } };
2289   my $source = $self->result_source;
2290   my $alias  = $attrs->{alias};
2291
2292   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2293   my @colbits;
2294
2295   # build columns (as long as select isn't set) into a set of as/select hashes
2296   unless ( $attrs->{select} ) {
2297       @colbits = map {
2298           ( ref($_) eq 'HASH' ) ? $_
2299             : {
2300               (
2301                   /^\Q${alias}.\E(.+)$/ ? $1
2302                   : $_
2303                 ) => ( /\./ ? $_ : "${alias}.$_" )
2304             }
2305       } ( ref($attrs->{columns}) eq 'ARRAY' ) ? @{ delete $attrs->{columns}} : (delete $attrs->{columns} || $source->columns );
2306   }
2307   # add the additional columns on
2308   foreach ( 'include_columns', '+columns' ) {
2309       push @colbits, map {
2310           ( ref($_) eq 'HASH' )
2311             ? $_
2312             : { ( split( /\./, $_ ) )[-1] => ( /\./ ? $_ : "${alias}.$_" ) }
2313       } ( ref($attrs->{$_}) eq 'ARRAY' ) ? @{ delete $attrs->{$_} } : delete $attrs->{$_} if ( $attrs->{$_} );
2314   }
2315
2316   # start with initial select items
2317   if ( $attrs->{select} ) {
2318     $attrs->{select} =
2319         ( ref $attrs->{select} eq 'ARRAY' )
2320       ? [ @{ $attrs->{select} } ]
2321       : [ $attrs->{select} ];
2322     $attrs->{as} = (
2323       $attrs->{as}
2324       ? (
2325         ref $attrs->{as} eq 'ARRAY'
2326         ? [ @{ $attrs->{as} } ]
2327         : [ $attrs->{as} ]
2328         )
2329       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{ $attrs->{select} } ]
2330     );
2331   }
2332   else {
2333
2334     # otherwise we intialise select & as to empty
2335     $attrs->{select} = [];
2336     $attrs->{as}     = [];
2337   }
2338
2339   # now add colbits to select/as
2340   push( @{ $attrs->{select} }, map { values( %{$_} ) } @colbits );
2341   push( @{ $attrs->{as} },     map { keys( %{$_} ) } @colbits );
2342
2343   my $adds;
2344   if ( $adds = delete $attrs->{'+select'} ) {
2345     $adds = [$adds] unless ref $adds eq 'ARRAY';
2346     push(
2347       @{ $attrs->{select} },
2348       map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds
2349     );
2350   }
2351   if ( $adds = delete $attrs->{'+as'} ) {
2352     $adds = [$adds] unless ref $adds eq 'ARRAY';
2353     push( @{ $attrs->{as} }, @$adds );
2354   }
2355
2356   $attrs->{from} ||= [ { $self->{attrs}{alias} => $source->from } ];
2357
2358   if ( exists $attrs->{join} || exists $attrs->{prefetch} ) {
2359     my $join = delete $attrs->{join} || {};
2360
2361     if ( defined $attrs->{prefetch} ) {
2362       $join = $self->_merge_attr( $join, $attrs->{prefetch} );
2363
2364     }
2365
2366     $attrs->{from} =    # have to copy here to avoid corrupting the original
2367       [
2368       @{ $attrs->{from} },
2369       $source->resolve_join(
2370         $join, $alias, { %{ $attrs->{seen_join} || {} } }
2371       )
2372       ];
2373
2374   }
2375
2376   $attrs->{group_by} ||= $attrs->{select}
2377     if delete $attrs->{distinct};
2378   if ( $attrs->{order_by} ) {
2379     $attrs->{order_by} = (
2380       ref( $attrs->{order_by} ) eq 'ARRAY'
2381       ? [ @{ $attrs->{order_by} } ]
2382       : [ $attrs->{order_by} ]
2383     );
2384   }
2385   else {
2386     $attrs->{order_by} = [];
2387   }
2388
2389   my $collapse = $attrs->{collapse} || {};
2390   if ( my $prefetch = delete $attrs->{prefetch} ) {
2391     $prefetch = $self->_merge_attr( {}, $prefetch );
2392     my @pre_order;
2393     my $seen = { %{ $attrs->{seen_join} || {} } };
2394     foreach my $p ( ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch) ) {
2395
2396       # bring joins back to level of current class
2397       my @prefetch =
2398         $source->resolve_prefetch( $p, $alias, $seen, \@pre_order, $collapse );
2399       push( @{ $attrs->{select} }, map { $_->[0] } @prefetch );
2400       push( @{ $attrs->{as} },     map { $_->[1] } @prefetch );
2401     }
2402     push( @{ $attrs->{order_by} }, @pre_order );
2403   }
2404   $attrs->{collapse} = $collapse;
2405
2406   if ( $attrs->{page} ) {
2407     $attrs->{offset} ||= 0;
2408     $attrs->{offset} += ( $attrs->{rows} * ( $attrs->{page} - 1 ) );
2409   }
2410
2411   return $self->{_attrs} = $attrs;
2412 }
2413
2414 sub _rollout_attr {
2415   my ($self, $attr) = @_;
2416   
2417   if (ref $attr eq 'HASH') {
2418     return $self->_rollout_hash($attr);
2419   } elsif (ref $attr eq 'ARRAY') {
2420     return $self->_rollout_array($attr);
2421   } else {
2422     return [$attr];
2423   }
2424 }
2425
2426 sub _rollout_array {
2427   my ($self, $attr) = @_;
2428
2429   my @rolled_array;
2430   foreach my $element (@{$attr}) {
2431     if (ref $element eq 'HASH') {
2432       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
2433     } elsif (ref $element eq 'ARRAY') {
2434       #  XXX - should probably recurse here
2435       push( @rolled_array, @{$self->_rollout_array($element)} );
2436     } else {
2437       push( @rolled_array, $element );
2438     }
2439   }
2440   return \@rolled_array;
2441 }
2442
2443 sub _rollout_hash {
2444   my ($self, $attr) = @_;
2445
2446   my @rolled_array;
2447   foreach my $key (keys %{$attr}) {
2448     push( @rolled_array, { $key => $attr->{$key} } );
2449   }
2450   return \@rolled_array;
2451 }
2452
2453 sub _calculate_score {
2454   my ($self, $a, $b) = @_;
2455
2456   if (ref $b eq 'HASH') {
2457     my ($b_key) = keys %{$b};
2458     if (ref $a eq 'HASH') {
2459       my ($a_key) = keys %{$a};
2460       if ($a_key eq $b_key) {
2461         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
2462       } else {
2463         return 0;
2464       }
2465     } else {
2466       return ($a eq $b_key) ? 1 : 0;
2467     }       
2468   } else {
2469     if (ref $a eq 'HASH') {
2470       my ($a_key) = keys %{$a};
2471       return ($b eq $a_key) ? 1 : 0;
2472     } else {
2473       return ($b eq $a) ? 1 : 0;
2474     }
2475   }
2476 }
2477
2478 sub _merge_attr {
2479   my ($self, $orig, $import) = @_;
2480
2481   return $import unless defined($orig);
2482   return $orig unless defined($import);
2483   
2484   $orig = $self->_rollout_attr($orig);
2485   $import = $self->_rollout_attr($import);
2486
2487   my $seen_keys;
2488   foreach my $import_element ( @{$import} ) {
2489     # find best candidate from $orig to merge $b_element into
2490     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
2491     foreach my $orig_element ( @{$orig} ) {
2492       my $score = $self->_calculate_score( $orig_element, $import_element );
2493       if ($score > $best_candidate->{score}) {
2494         $best_candidate->{position} = $position;
2495         $best_candidate->{score} = $score;
2496       }
2497       $position++;
2498     }
2499     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
2500
2501     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
2502       push( @{$orig}, $import_element );
2503     } else {
2504       my $orig_best = $orig->[$best_candidate->{position}];
2505       # merge orig_best and b_element together and replace original with merged
2506       if (ref $orig_best ne 'HASH') {
2507         $orig->[$best_candidate->{position}] = $import_element;
2508       } elsif (ref $import_element eq 'HASH') {
2509         my ($key) = keys %{$orig_best};
2510         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
2511       }
2512     }
2513     $seen_keys->{$import_key} = 1; # don't merge the same key twice
2514   }
2515
2516   return $orig;
2517 }
2518
2519 sub result_source {
2520     my $self = shift;
2521
2522     if (@_) {
2523         $self->_source_handle($_[0]->handle);
2524     } else {
2525         $self->_source_handle->resolve;
2526     }
2527 }
2528
2529 =head2 throw_exception
2530
2531 See L<DBIx::Class::Schema/throw_exception> for details.
2532
2533 =cut
2534
2535 sub throw_exception {
2536   my $self=shift;
2537   if (ref $self && $self->_source_handle->schema) {
2538     $self->_source_handle->schema->throw_exception(@_)
2539   } else {
2540     croak(@_);
2541   }
2542
2543 }
2544
2545 # XXX: FIXME: Attributes docs need clearing up
2546
2547 =head1 ATTRIBUTES
2548
2549 Attributes are used to refine a ResultSet in various ways when
2550 searching for data. They can be passed to any method which takes an
2551 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
2552 L</count>.
2553
2554 These are in no particular order:
2555
2556 =head2 order_by
2557
2558 =over 4
2559
2560 =item Value: ( $order_by | \@order_by | \%order_by )
2561
2562 =back
2563
2564 Which column(s) to order the results by. If a single column name, or
2565 an arrayref of names is supplied, the argument is passed through
2566 directly to SQL. The hashref syntax allows for connection-agnostic
2567 specification of ordering direction:
2568
2569  For descending order:
2570
2571   order_by => { -desc => [qw/col1 col2 col3/] }
2572
2573  For explicit ascending order:
2574
2575   order_by => { -asc => 'col' }
2576
2577 The old scalarref syntax (i.e. order_by => \'year DESC') is still
2578 supported, although you are strongly encouraged to use the hashref
2579 syntax as outlined above.
2580
2581 =head2 columns
2582
2583 =over 4
2584
2585 =item Value: \@columns
2586
2587 =back
2588
2589 Shortcut to request a particular set of columns to be retrieved. Each
2590 column spec may be a string (a table column name), or a hash (in which
2591 case the key is the C<as> value, and the value is used as the C<select>
2592 expression). Adds C<me.> onto the start of any column without a C<.> in
2593 it and sets C<select> from that, then auto-populates C<as> from
2594 C<select> as normal. (You may also use the C<cols> attribute, as in
2595 earlier versions of DBIC.)
2596
2597 =head2 +columns
2598
2599 =over 4
2600
2601 =item Value: \@columns
2602
2603 =back
2604
2605 Indicates additional columns to be selected from storage. Works the same
2606 as L</columns> but adds columns to the selection. (You may also use the
2607 C<include_columns> attribute, as in earlier versions of DBIC). For
2608 example:-
2609
2610   $schema->resultset('CD')->search(undef, {
2611     '+columns' => ['artist.name'],
2612     join => ['artist']
2613   });
2614
2615 would return all CDs and include a 'name' column to the information
2616 passed to object inflation. Note that the 'artist' is the name of the
2617 column (or relationship) accessor, and 'name' is the name of the column
2618 accessor in the related table.
2619
2620 =head2 include_columns
2621
2622 =over 4
2623
2624 =item Value: \@columns
2625
2626 =back
2627
2628 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
2629
2630 =head2 select
2631
2632 =over 4
2633
2634 =item Value: \@select_columns
2635
2636 =back
2637
2638 Indicates which columns should be selected from the storage. You can use
2639 column names, or in the case of RDBMS back ends, function or stored procedure
2640 names:
2641
2642   $rs = $schema->resultset('Employee')->search(undef, {
2643     select => [
2644       'name',
2645       { count => 'employeeid' },
2646       { sum => 'salary' }
2647     ]
2648   });
2649
2650 When you use function/stored procedure names and do not supply an C<as>
2651 attribute, the column names returned are storage-dependent. E.g. MySQL would
2652 return a column named C<count(employeeid)> in the above example.
2653
2654 =head2 +select
2655
2656 =over 4
2657
2658 Indicates additional columns to be selected from storage.  Works the same as
2659 L</select> but adds columns to the selection.
2660
2661 =back
2662
2663 =head2 +as
2664
2665 =over 4
2666
2667 Indicates additional column names for those added via L</+select>. See L</as>.
2668
2669 =back
2670
2671 =head2 as
2672
2673 =over 4
2674
2675 =item Value: \@inflation_names
2676
2677 =back
2678
2679 Indicates column names for object inflation. That is, C<as>
2680 indicates the name that the column can be accessed as via the
2681 C<get_column> method (or via the object accessor, B<if one already
2682 exists>).  It has nothing to do with the SQL code C<SELECT foo AS bar>.
2683
2684 The C<as> attribute is used in conjunction with C<select>,
2685 usually when C<select> contains one or more function or stored
2686 procedure names:
2687
2688   $rs = $schema->resultset('Employee')->search(undef, {
2689     select => [
2690       'name',
2691       { count => 'employeeid' }
2692     ],
2693     as => ['name', 'employee_count'],
2694   });
2695
2696   my $employee = $rs->first(); # get the first Employee
2697
2698 If the object against which the search is performed already has an accessor
2699 matching a column name specified in C<as>, the value can be retrieved using
2700 the accessor as normal:
2701
2702   my $name = $employee->name();
2703
2704 If on the other hand an accessor does not exist in the object, you need to
2705 use C<get_column> instead:
2706
2707   my $employee_count = $employee->get_column('employee_count');
2708
2709 You can create your own accessors if required - see
2710 L<DBIx::Class::Manual::Cookbook> for details.
2711
2712 Please note: This will NOT insert an C<AS employee_count> into the SQL
2713 statement produced, it is used for internal access only. Thus
2714 attempting to use the accessor in an C<order_by> clause or similar
2715 will fail miserably.
2716
2717 To get around this limitation, you can supply literal SQL to your
2718 C<select> attibute that contains the C<AS alias> text, eg:
2719
2720   select => [\'myfield AS alias']
2721
2722 =head2 join
2723
2724 =over 4
2725
2726 =item Value: ($rel_name | \@rel_names | \%rel_names)
2727
2728 =back
2729
2730 Contains a list of relationships that should be joined for this query.  For
2731 example:
2732
2733   # Get CDs by Nine Inch Nails
2734   my $rs = $schema->resultset('CD')->search(
2735     { 'artist.name' => 'Nine Inch Nails' },
2736     { join => 'artist' }
2737   );
2738
2739 Can also contain a hash reference to refer to the other relation's relations.
2740 For example:
2741
2742   package MyApp::Schema::Track;
2743   use base qw/DBIx::Class/;
2744   __PACKAGE__->table('track');
2745   __PACKAGE__->add_columns(qw/trackid cd position title/);
2746   __PACKAGE__->set_primary_key('trackid');
2747   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
2748   1;
2749
2750   # In your application
2751   my $rs = $schema->resultset('Artist')->search(
2752     { 'track.title' => 'Teardrop' },
2753     {
2754       join     => { cd => 'track' },
2755       order_by => 'artist.name',
2756     }
2757   );
2758
2759 You need to use the relationship (not the table) name in  conditions, 
2760 because they are aliased as such. The current table is aliased as "me", so 
2761 you need to use me.column_name in order to avoid ambiguity. For example:
2762
2763   # Get CDs from 1984 with a 'Foo' track 
2764   my $rs = $schema->resultset('CD')->search(
2765     { 
2766       'me.year' => 1984,
2767       'tracks.name' => 'Foo'
2768     },
2769     { join => 'tracks' }
2770   );
2771   
2772 If the same join is supplied twice, it will be aliased to <rel>_2 (and
2773 similarly for a third time). For e.g.
2774
2775   my $rs = $schema->resultset('Artist')->search({
2776     'cds.title'   => 'Down to Earth',
2777     'cds_2.title' => 'Popular',
2778   }, {
2779     join => [ qw/cds cds/ ],
2780   });
2781
2782 will return a set of all artists that have both a cd with title 'Down
2783 to Earth' and a cd with title 'Popular'.
2784
2785 If you want to fetch related objects from other tables as well, see C<prefetch>
2786 below.
2787
2788 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
2789
2790 =head2 prefetch
2791
2792 =over 4
2793
2794 =item Value: ($rel_name | \@rel_names | \%rel_names)
2795
2796 =back
2797
2798 Contains one or more relationships that should be fetched along with
2799 the main query (when they are accessed afterwards the data will
2800 already be available, without extra queries to the database).  This is
2801 useful for when you know you will need the related objects, because it
2802 saves at least one query:
2803
2804   my $rs = $schema->resultset('Tag')->search(
2805     undef,
2806     {
2807       prefetch => {
2808         cd => 'artist'
2809       }
2810     }
2811   );
2812
2813 The initial search results in SQL like the following:
2814
2815   SELECT tag.*, cd.*, artist.* FROM tag
2816   JOIN cd ON tag.cd = cd.cdid
2817   JOIN artist ON cd.artist = artist.artistid
2818
2819 L<DBIx::Class> has no need to go back to the database when we access the
2820 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2821 case.
2822
2823 Simple prefetches will be joined automatically, so there is no need
2824 for a C<join> attribute in the above search. 
2825
2826 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2827 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2828 with an accessor type of 'single' or 'filter'). A more complex example that
2829 prefetches an artists cds, the tracks on those cds, and the tags associted 
2830 with that artist is given below (assuming many-to-many from artists to tags):
2831
2832  my $rs = $schema->resultset('Artist')->search(
2833    undef,
2834    {
2835      prefetch => [
2836        { cds => 'tracks' },
2837        { artist_tags => 'tags' }
2838      ]
2839    }
2840  );
2841  
2842
2843 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
2844 attributes will be ignored.
2845
2846 =head2 page
2847
2848 =over 4
2849
2850 =item Value: $page
2851
2852 =back
2853
2854 Makes the resultset paged and specifies the page to retrieve. Effectively
2855 identical to creating a non-pages resultset and then calling ->page($page)
2856 on it.
2857
2858 If L<rows> attribute is not specified it defualts to 10 rows per page.
2859
2860 =head2 rows
2861
2862 =over 4
2863
2864 =item Value: $rows
2865
2866 =back
2867
2868 Specifes the maximum number of rows for direct retrieval or the number of
2869 rows per page if the page attribute or method is used.
2870
2871 =head2 offset
2872
2873 =over 4
2874
2875 =item Value: $offset
2876
2877 =back
2878
2879 Specifies the (zero-based) row number for the  first row to be returned, or the
2880 of the first row of the first page if paging is used.
2881
2882 =head2 group_by
2883
2884 =over 4
2885
2886 =item Value: \@columns
2887
2888 =back
2889
2890 A arrayref of columns to group by. Can include columns of joined tables.
2891
2892   group_by => [qw/ column1 column2 ... /]
2893
2894 =head2 having
2895
2896 =over 4
2897
2898 =item Value: $condition
2899
2900 =back
2901
2902 HAVING is a select statement attribute that is applied between GROUP BY and
2903 ORDER BY. It is applied to the after the grouping calculations have been
2904 done.
2905
2906   having => { 'count(employee)' => { '>=', 100 } }
2907
2908 =head2 distinct
2909
2910 =over 4
2911
2912 =item Value: (0 | 1)
2913
2914 =back
2915
2916 Set to 1 to group by all columns.
2917
2918 =head2 where
2919
2920 =over 4
2921
2922 Adds to the WHERE clause.
2923
2924   # only return rows WHERE deleted IS NULL for all searches
2925   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2926
2927 Can be overridden by passing C<{ where => undef }> as an attribute
2928 to a resulset.
2929
2930 =back
2931
2932 =head2 cache
2933
2934 Set to 1 to cache search results. This prevents extra SQL queries if you
2935 revisit rows in your ResultSet:
2936
2937   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2938
2939   while( my $artist = $resultset->next ) {
2940     ... do stuff ...
2941   }
2942
2943   $rs->first; # without cache, this would issue a query
2944
2945 By default, searches are not cached.
2946
2947 For more examples of using these attributes, see
2948 L<DBIx::Class::Manual::Cookbook>.
2949
2950 =head2 from
2951
2952 =over 4
2953
2954 =item Value: \@from_clause
2955
2956 =back
2957
2958 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2959 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2960 clauses.
2961
2962 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2963
2964 C<join> will usually do what you need and it is strongly recommended that you
2965 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2966 And we really do mean "cannot", not just tried and failed. Attempting to use
2967 this because you're having problems with C<join> is like trying to use x86
2968 ASM because you've got a syntax error in your C. Trust us on this.
2969
2970 Now, if you're still really, really sure you need to use this (and if you're
2971 not 100% sure, ask the mailing list first), here's an explanation of how this
2972 works.
2973
2974 The syntax is as follows -
2975
2976   [
2977     { <alias1> => <table1> },
2978     [
2979       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2980       [], # nested JOIN (optional)
2981       { <table1.column1> => <table2.column2>, ... (more conditions) },
2982     ],
2983     # More of the above [ ] may follow for additional joins
2984   ]
2985
2986   <table1> <alias1>
2987   JOIN
2988     <table2> <alias2>
2989     [JOIN ...]
2990   ON <table1.column1> = <table2.column2>
2991   <more joins may follow>
2992
2993 An easy way to follow the examples below is to remember the following:
2994
2995     Anything inside "[]" is a JOIN
2996     Anything inside "{}" is a condition for the enclosing JOIN
2997
2998 The following examples utilize a "person" table in a family tree application.
2999 In order to express parent->child relationships, this table is self-joined:
3000
3001     # Person->belongs_to('father' => 'Person');
3002     # Person->belongs_to('mother' => 'Person');
3003
3004 C<from> can be used to nest joins. Here we return all children with a father,
3005 then search against all mothers of those children:
3006
3007   $rs = $schema->resultset('Person')->search(
3008       undef,
3009       {
3010           alias => 'mother', # alias columns in accordance with "from"
3011           from => [
3012               { mother => 'person' },
3013               [
3014                   [
3015                       { child => 'person' },
3016                       [
3017                           { father => 'person' },
3018                           { 'father.person_id' => 'child.father_id' }
3019                       ]
3020                   ],
3021                   { 'mother.person_id' => 'child.mother_id' }
3022               ],
3023           ]
3024       },
3025   );
3026
3027   # Equivalent SQL:
3028   # SELECT mother.* FROM person mother
3029   # JOIN (
3030   #   person child
3031   #   JOIN person father
3032   #   ON ( father.person_id = child.father_id )
3033   # )
3034   # ON ( mother.person_id = child.mother_id )
3035
3036 The type of any join can be controlled manually. To search against only people
3037 with a father in the person table, we could explicitly use C<INNER JOIN>:
3038
3039     $rs = $schema->resultset('Person')->search(
3040         undef,
3041         {
3042             alias => 'child', # alias columns in accordance with "from"
3043             from => [
3044                 { child => 'person' },
3045                 [
3046                     { father => 'person', -join_type => 'inner' },
3047                     { 'father.id' => 'child.father_id' }
3048                 ],
3049             ]
3050         },
3051     );
3052
3053     # Equivalent SQL:
3054     # SELECT child.* FROM person child
3055     # INNER JOIN person father ON child.father_id = father.id
3056
3057 If you need to express really complex joins or you need a subselect, you
3058 can supply literal SQL to C<from> via a scalar reference. In this case
3059 the contents of the scalar will replace the table name asscoiated with the
3060 resultsource.
3061
3062 WARNING: This technique might very well not work as expected on chained
3063 searches - you have been warned.
3064
3065     # Assuming the Event resultsource is defined as:
3066
3067         MySchema::Event->add_columns (
3068             sequence => {
3069                 data_type => 'INT',
3070                 is_auto_increment => 1,
3071             },
3072             location => {
3073                 data_type => 'INT',
3074             },
3075             type => {
3076                 data_type => 'INT',
3077             },
3078         );
3079         MySchema::Event->set_primary_key ('sequence');
3080
3081     # This will get back the latest event for every location. The column
3082     # selector is still provided by DBIC, all we do is add a JOIN/WHERE
3083     # combo to limit the resultset
3084
3085     $rs = $schema->resultset('Event');
3086     $table = $rs->result_source->name;
3087     $latest = $rs->search (
3088         undef,
3089         { from => \ " 
3090             (SELECT e1.* FROM $table e1 
3091                 JOIN $table e2 
3092                     ON e1.location = e2.location 
3093                     AND e1.sequence < e2.sequence 
3094                 WHERE e2.sequence is NULL 
3095             ) me",
3096         },
3097     );
3098
3099     # Equivalent SQL (with the DBIC chunks added):
3100
3101     SELECT me.sequence, me.location, me.type FROM
3102        (SELECT e1.* FROM events e1
3103            JOIN events e2
3104                ON e1.location = e2.location
3105                AND e1.sequence < e2.sequence
3106            WHERE e2.sequence is NULL
3107        ) me;
3108
3109 =head2 for
3110
3111 =over 4
3112
3113 =item Value: ( 'update' | 'shared' )
3114
3115 =back
3116
3117 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
3118 ... FOR SHARED.
3119
3120 =cut
3121
3122 1;