Merge 'sybase' into 'trunk'
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => "count",
7         'bool'   => "_bool",
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use List::Util ();
15 use Scalar::Util ();
16 use base qw/DBIx::Class/;
17
18 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
23
24 =head1 SYNOPSIS
25
26   my $users_rs   = $schema->resultset('User');
27   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
28   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
29
30 =head1 DESCRIPTION
31
32 A ResultSet is an object which stores a set of conditions representing
33 a query. It is the backbone of DBIx::Class (i.e. the really
34 important/useful bit).
35
36 No SQL is executed on the database when a ResultSet is created, it
37 just stores all the conditions needed to create the query.
38
39 A basic ResultSet representing the data of an entire table is returned
40 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
41 L<Source|DBIx::Class::Manual::Glossary/Source> name.
42
43   my $users_rs = $schema->resultset('User');
44
45 A new ResultSet is returned from calling L</search> on an existing
46 ResultSet. The new one will contain all the conditions of the
47 original, plus any new conditions added in the C<search> call.
48
49 A ResultSet is also an iterator. L</next> is used to return all the
50 L<DBIx::Class::Row>s the ResultSet represents.
51
52 The query that the ResultSet represents is B<only> executed against
53 the database when these methods are called:
54
55 =over
56
57 =item L</find>
58
59 =item L</next>
60
61 =item L</all>
62
63 =item L</count>
64
65 =item L</single>
66
67 =item L</first>
68
69 =back
70
71 =head1 EXAMPLES 
72
73 =head2 Chaining resultsets
74
75 Let's say you've got a query that needs to be run to return some data
76 to the user. But, you have an authorization system in place that
77 prevents certain users from seeing certain information. So, you want
78 to construct the basic query in one method, but add constraints to it in
79 another.
80
81   sub get_data {
82     my $self = shift;
83     my $request = $self->get_request; # Get a request object somehow.
84     my $schema = $self->get_schema;   # Get the DBIC schema object somehow.
85
86     my $cd_rs = $schema->resultset('CD')->search({
87       title => $request->param('title'),
88       year => $request->param('year'),
89     });
90
91     $self->apply_security_policy( $cd_rs );
92
93     return $cd_rs->all();
94   }
95
96   sub apply_security_policy {
97     my $self = shift;
98     my ($rs) = @_;
99
100     return $rs->search({
101       subversive => 0,
102     });
103   }
104
105 =head2 Multiple queries
106
107 Since a resultset just defines a query, you can do all sorts of
108 things with it with the same object.
109
110   # Don't hit the DB yet.
111   my $cd_rs = $schema->resultset('CD')->search({
112     title => 'something',
113     year => 2009,
114   });
115
116   # Each of these hits the DB individually.
117   my $count = $cd_rs->count;
118   my $most_recent = $cd_rs->get_column('date_released')->max();
119   my @records = $cd_rs->all;
120
121 And it's not just limited to SELECT statements.
122
123   $cd_rs->delete();
124
125 This is even cooler:
126
127   $cd_rs->create({ artist => 'Fred' });
128
129 Which is the same as:
130
131   $schema->resultset('CD')->create({
132     title => 'something',
133     year => 2009,
134     artist => 'Fred'
135   });
136
137 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
138
139 =head1 OVERLOADING
140
141 If a resultset is used in a numeric context it returns the L</count>.
142 However, if it is used in a booleand context it is always true.  So if
143 you want to check if a resultset has any results use C<if $rs != 0>.
144 C<if $rs> will always be true.
145
146 =head1 METHODS
147
148 =head2 new
149
150 =over 4
151
152 =item Arguments: $source, \%$attrs
153
154 =item Return Value: $rs
155
156 =back
157
158 The resultset constructor. Takes a source object (usually a
159 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
160 L</ATTRIBUTES> below).  Does not perform any queries -- these are
161 executed as needed by the other methods.
162
163 Generally you won't need to construct a resultset manually.  You'll
164 automatically get one from e.g. a L</search> called in scalar context:
165
166   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
167
168 IMPORTANT: If called on an object, proxies to new_result instead so
169
170   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
171
172 will return a CD object, not a ResultSet.
173
174 =cut
175
176 sub new {
177   my $class = shift;
178   return $class->new_result(@_) if ref $class;
179
180   my ($source, $attrs) = @_;
181   $source = $source->handle 
182     unless $source->isa('DBIx::Class::ResultSourceHandle');
183   $attrs = { %{$attrs||{}} };
184
185   if ($attrs->{page}) {
186     $attrs->{rows} ||= 10;
187   }
188
189   $attrs->{alias} ||= 'me';
190
191   # Creation of {} and bless separated to mitigate RH perl bug
192   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
193   my $self = {
194     _source_handle => $source,
195     cond => $attrs->{where},
196     count => undef,
197     pager => undef,
198     attrs => $attrs
199   };
200
201   bless $self, $class;
202
203   $self->result_class(
204     $attrs->{result_class} || $source->resolve->result_class
205   );
206
207   return $self;
208 }
209
210 =head2 search
211
212 =over 4
213
214 =item Arguments: $cond, \%attrs?
215
216 =item Return Value: $resultset (scalar context), @row_objs (list context)
217
218 =back
219
220   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
221   my $new_rs = $cd_rs->search({ year => 2005 });
222
223   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
224                  # year = 2005 OR year = 2004
225
226 If you need to pass in additional attributes but no additional condition,
227 call it as C<search(undef, \%attrs)>.
228
229   # "SELECT name, artistid FROM $artist_table"
230   my @all_artists = $schema->resultset('Artist')->search(undef, {
231     columns => [qw/name artistid/],
232   });
233
234 For a list of attributes that can be passed to C<search>, see
235 L</ATTRIBUTES>. For more examples of using this function, see
236 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
237 documentation for the first argument, see L<SQL::Abstract>.
238
239 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
240
241 =cut
242
243 sub search {
244   my $self = shift;
245   my $rs = $self->search_rs( @_ );
246   return (wantarray ? $rs->all : $rs);
247 }
248
249 =head2 search_rs
250
251 =over 4
252
253 =item Arguments: $cond, \%attrs?
254
255 =item Return Value: $resultset
256
257 =back
258
259 This method does the same exact thing as search() except it will
260 always return a resultset, even in list context.
261
262 =cut
263
264 sub search_rs {
265   my $self = shift;
266
267   my $attrs = {};
268   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
269   my $our_attrs = { %{$self->{attrs}} };
270   my $having = delete $our_attrs->{having};
271   my $where = delete $our_attrs->{where};
272
273   my $rows;
274
275   my %safe = (alias => 1, cache => 1);
276
277   unless (
278     (@_ && defined($_[0])) # @_ == () or (undef)
279     || 
280     (keys %$attrs # empty attrs or only 'safe' attrs
281     && List::Util::first { !$safe{$_} } keys %$attrs)
282   ) {
283     # no search, effectively just a clone
284     $rows = $self->get_cache;
285   }
286
287   my $new_attrs = { %{$our_attrs}, %{$attrs} };
288
289   # merge new attrs into inherited
290   foreach my $key (qw/join prefetch +select +as/) {
291     next unless exists $attrs->{$key};
292     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
293   }
294
295   my $cond = (@_
296     ? (
297         (@_ == 1 || ref $_[0] eq "HASH")
298           ? (
299               (ref $_[0] eq 'HASH')
300                 ? (
301                     (keys %{ $_[0] }  > 0)
302                       ? shift
303                       : undef
304                    )
305                 :  shift
306              )
307           : (
308               (@_ % 2)
309                 ? $self->throw_exception("Odd number of arguments to search")
310                 : {@_}
311              )
312       )
313     : undef
314   );
315
316   if (defined $where) {
317     $new_attrs->{where} = (
318       defined $new_attrs->{where}
319         ? { '-and' => [
320               map {
321                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
322               } $where, $new_attrs->{where}
323             ]
324           }
325         : $where);
326   }
327
328   if (defined $cond) {
329     $new_attrs->{where} = (
330       defined $new_attrs->{where}
331         ? { '-and' => [
332               map {
333                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
334               } $cond, $new_attrs->{where}
335             ]
336           }
337         : $cond);
338   }
339
340   if (defined $having) {
341     $new_attrs->{having} = (
342       defined $new_attrs->{having}
343         ? { '-and' => [
344               map {
345                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
346               } $having, $new_attrs->{having}
347             ]
348           }
349         : $having);
350   }
351
352   my $rs = (ref $self)->new($self->result_source, $new_attrs);
353   if ($rows) {
354     $rs->set_cache($rows);
355   }
356   return $rs;
357 }
358
359 =head2 search_literal
360
361 =over 4
362
363 =item Arguments: $sql_fragment, @bind_values
364
365 =item Return Value: $resultset (scalar context), @row_objs (list context)
366
367 =back
368
369   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
370   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
371
372 Pass a literal chunk of SQL to be added to the conditional part of the
373 resultset query.
374
375 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
376 only be used in that context. There are known problems using C<search_literal>
377 in chained queries; it can result in bind values in the wrong order.  See
378 L<DBIx::Class::Manual::Cookbook/Searching> and
379 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
380 require C<search_literal>.
381
382 =cut
383
384 sub search_literal {
385   my ($self, $cond, @vals) = @_;
386   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
387   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
388   return $self->search(\$cond, $attrs);
389 }
390
391 =head2 find
392
393 =over 4
394
395 =item Arguments: @values | \%cols, \%attrs?
396
397 =item Return Value: $row_object | undef
398
399 =back
400
401 Finds a row based on its primary key or unique constraint. For example, to find
402 a row by its primary key:
403
404   my $cd = $schema->resultset('CD')->find(5);
405
406 You can also find a row by a specific unique constraint using the C<key>
407 attribute. For example:
408
409   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
410     key => 'cd_artist_title'
411   });
412
413 Additionally, you can specify the columns explicitly by name:
414
415   my $cd = $schema->resultset('CD')->find(
416     {
417       artist => 'Massive Attack',
418       title  => 'Mezzanine',
419     },
420     { key => 'cd_artist_title' }
421   );
422
423 If the C<key> is specified as C<primary>, it searches only on the primary key.
424
425 If no C<key> is specified, it searches on all unique constraints defined on the
426 source for which column data is provided, including the primary key.
427
428 If your table does not have a primary key, you B<must> provide a value for the
429 C<key> attribute matching one of the unique constraints on the source.
430
431 In addition to C<key>, L</find> recognizes and applies standard
432 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
433
434 Note: If your query does not return only one row, a warning is generated:
435
436   Query returned more than one row
437
438 See also L</find_or_create> and L</update_or_create>. For information on how to
439 declare unique constraints, see
440 L<DBIx::Class::ResultSource/add_unique_constraint>.
441
442 =cut
443
444 sub find {
445   my $self = shift;
446   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
447
448   # Default to the primary key, but allow a specific key
449   my @cols = exists $attrs->{key}
450     ? $self->result_source->unique_constraint_columns($attrs->{key})
451     : $self->result_source->primary_columns;
452   $self->throw_exception(
453     "Can't find unless a primary key is defined or unique constraint is specified"
454   ) unless @cols;
455
456   # Parse out a hashref from input
457   my $input_query;
458   if (ref $_[0] eq 'HASH') {
459     $input_query = { %{$_[0]} };
460   }
461   elsif (@_ == @cols) {
462     $input_query = {};
463     @{$input_query}{@cols} = @_;
464   }
465   else {
466     # Compatibility: Allow e.g. find(id => $value)
467     carp "Find by key => value deprecated; please use a hashref instead";
468     $input_query = {@_};
469   }
470
471   my (%related, $info);
472
473   KEY: foreach my $key (keys %$input_query) {
474     if (ref($input_query->{$key})
475         && ($info = $self->result_source->relationship_info($key))) {
476       my $val = delete $input_query->{$key};
477       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
478       my $rel_q = $self->result_source->resolve_condition(
479                     $info->{cond}, $val, $key
480                   );
481       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
482       @related{keys %$rel_q} = values %$rel_q;
483     }
484   }
485   if (my @keys = keys %related) {
486     @{$input_query}{@keys} = values %related;
487   }
488
489
490   # Build the final query: Default to the disjunction of the unique queries,
491   # but allow the input query in case the ResultSet defines the query or the
492   # user is abusing find
493   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
494   my $query;
495   if (exists $attrs->{key}) {
496     my @unique_cols = $self->result_source->unique_constraint_columns($attrs->{key});
497     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
498     $query = $self->_add_alias($unique_query, $alias);
499   }
500   else {
501     my @unique_queries = $self->_unique_queries($input_query, $attrs);
502     $query = @unique_queries
503       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
504       : $self->_add_alias($input_query, $alias);
505   }
506
507   # Run the query
508   if (keys %$attrs) {
509     my $rs = $self->search($query, $attrs);
510     if (keys %{$rs->_resolved_attrs->{collapse}}) {
511       my $row = $rs->next;
512       carp "Query returned more than one row" if $rs->next;
513       return $row;
514     }
515     else {
516       return $rs->single;
517     }
518   }
519   else {
520     if (keys %{$self->_resolved_attrs->{collapse}}) {
521       my $rs = $self->search($query);
522       my $row = $rs->next;
523       carp "Query returned more than one row" if $rs->next;
524       return $row;
525     }
526     else {
527       return $self->single($query);
528     }
529   }
530 }
531
532 # _add_alias
533 #
534 # Add the specified alias to the specified query hash. A copy is made so the
535 # original query is not modified.
536
537 sub _add_alias {
538   my ($self, $query, $alias) = @_;
539
540   my %aliased = %$query;
541   foreach my $col (grep { ! m/\./ } keys %aliased) {
542     $aliased{"$alias.$col"} = delete $aliased{$col};
543   }
544
545   return \%aliased;
546 }
547
548 # _unique_queries
549 #
550 # Build a list of queries which satisfy unique constraints.
551
552 sub _unique_queries {
553   my ($self, $query, $attrs) = @_;
554
555   my @constraint_names = exists $attrs->{key}
556     ? ($attrs->{key})
557     : $self->result_source->unique_constraint_names;
558
559   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
560   my $num_where = scalar keys %$where;
561
562   my @unique_queries;
563   foreach my $name (@constraint_names) {
564     my @unique_cols = $self->result_source->unique_constraint_columns($name);
565     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
566
567     my $num_cols = scalar @unique_cols;
568     my $num_query = scalar keys %$unique_query;
569
570     my $total = $num_query + $num_where;
571     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
572       # The query is either unique on its own or is unique in combination with
573       # the existing where clause
574       push @unique_queries, $unique_query;
575     }
576   }
577
578   return @unique_queries;
579 }
580
581 # _build_unique_query
582 #
583 # Constrain the specified query hash based on the specified column names.
584
585 sub _build_unique_query {
586   my ($self, $query, $unique_cols) = @_;
587
588   return {
589     map  { $_ => $query->{$_} }
590     grep { exists $query->{$_} }
591       @$unique_cols
592   };
593 }
594
595 =head2 search_related
596
597 =over 4
598
599 =item Arguments: $rel, $cond, \%attrs?
600
601 =item Return Value: $new_resultset
602
603 =back
604
605   $new_rs = $cd_rs->search_related('artist', {
606     name => 'Emo-R-Us',
607   });
608
609 Searches the specified relationship, optionally specifying a condition and
610 attributes for matching records. See L</ATTRIBUTES> for more information.
611
612 =cut
613
614 sub search_related {
615   return shift->related_resultset(shift)->search(@_);
616 }
617
618 =head2 search_related_rs
619
620 This method works exactly the same as search_related, except that
621 it guarantees a restultset, even in list context.
622
623 =cut
624
625 sub search_related_rs {
626   return shift->related_resultset(shift)->search_rs(@_);
627 }
628
629 =head2 cursor
630
631 =over 4
632
633 =item Arguments: none
634
635 =item Return Value: $cursor
636
637 =back
638
639 Returns a storage-driven cursor to the given resultset. See
640 L<DBIx::Class::Cursor> for more information.
641
642 =cut
643
644 sub cursor {
645   my ($self) = @_;
646
647   my $attrs = { %{$self->_resolved_attrs} };
648   return $self->{cursor}
649     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
650           $attrs->{where},$attrs);
651 }
652
653 =head2 single
654
655 =over 4
656
657 =item Arguments: $cond?
658
659 =item Return Value: $row_object?
660
661 =back
662
663   my $cd = $schema->resultset('CD')->single({ year => 2001 });
664
665 Inflates the first result without creating a cursor if the resultset has
666 any records in it; if not returns nothing. Used by L</find> as a lean version of
667 L</search>.
668
669 While this method can take an optional search condition (just like L</search>)
670 being a fast-code-path it does not recognize search attributes. If you need to
671 add extra joins or similar, call L</search> and then chain-call L</single> on the
672 L<DBIx::Class::ResultSet> returned.
673
674 =over
675
676 =item B<Note>
677
678 As of 0.08100, this method enforces the assumption that the preceeding
679 query returns only one row. If more than one row is returned, you will receive
680 a warning:
681
682   Query returned more than one row
683
684 In this case, you should be using L</first> or L</find> instead, or if you really
685 know what you are doing, use the L</rows> attribute to explicitly limit the size 
686 of the resultset.
687
688 =back
689
690 =cut
691
692 sub single {
693   my ($self, $where) = @_;
694   if(@_ > 2) {
695       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
696   }
697
698   my $attrs = { %{$self->_resolved_attrs} };
699   if ($where) {
700     if (defined $attrs->{where}) {
701       $attrs->{where} = {
702         '-and' =>
703             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
704                $where, delete $attrs->{where} ]
705       };
706     } else {
707       $attrs->{where} = $where;
708     }
709   }
710
711 #  XXX: Disabled since it doesn't infer uniqueness in all cases
712 #  unless ($self->_is_unique_query($attrs->{where})) {
713 #    carp "Query not guaranteed to return a single row"
714 #      . "; please declare your unique constraints or use search instead";
715 #  }
716
717   my @data = $self->result_source->storage->select_single(
718     $attrs->{from}, $attrs->{select},
719     $attrs->{where}, $attrs
720   );
721
722   return (@data ? ($self->_construct_object(@data))[0] : undef);
723 }
724
725 # _is_unique_query
726 #
727 # Try to determine if the specified query is guaranteed to be unique, based on
728 # the declared unique constraints.
729
730 sub _is_unique_query {
731   my ($self, $query) = @_;
732
733   my $collapsed = $self->_collapse_query($query);
734   my $alias = $self->{attrs}{alias};
735
736   foreach my $name ($self->result_source->unique_constraint_names) {
737     my @unique_cols = map {
738       "$alias.$_"
739     } $self->result_source->unique_constraint_columns($name);
740
741     # Count the values for each unique column
742     my %seen = map { $_ => 0 } @unique_cols;
743
744     foreach my $key (keys %$collapsed) {
745       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
746       next unless exists $seen{$aliased};  # Additional constraints are okay
747       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
748     }
749
750     # If we get 0 or more than 1 value for a column, it's not necessarily unique
751     return 1 unless grep { $_ != 1 } values %seen;
752   }
753
754   return 0;
755 }
756
757 # _collapse_query
758 #
759 # Recursively collapse the query, accumulating values for each column.
760
761 sub _collapse_query {
762   my ($self, $query, $collapsed) = @_;
763
764   $collapsed ||= {};
765
766   if (ref $query eq 'ARRAY') {
767     foreach my $subquery (@$query) {
768       next unless ref $subquery;  # -or
769 #      warn "ARRAY: " . Dumper $subquery;
770       $collapsed = $self->_collapse_query($subquery, $collapsed);
771     }
772   }
773   elsif (ref $query eq 'HASH') {
774     if (keys %$query and (keys %$query)[0] eq '-and') {
775       foreach my $subquery (@{$query->{-and}}) {
776 #        warn "HASH: " . Dumper $subquery;
777         $collapsed = $self->_collapse_query($subquery, $collapsed);
778       }
779     }
780     else {
781 #      warn "LEAF: " . Dumper $query;
782       foreach my $col (keys %$query) {
783         my $value = $query->{$col};
784         $collapsed->{$col}{$value}++;
785       }
786     }
787   }
788
789   return $collapsed;
790 }
791
792 =head2 get_column
793
794 =over 4
795
796 =item Arguments: $cond?
797
798 =item Return Value: $resultsetcolumn
799
800 =back
801
802   my $max_length = $rs->get_column('length')->max;
803
804 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
805
806 =cut
807
808 sub get_column {
809   my ($self, $column) = @_;
810   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
811   return $new;
812 }
813
814 =head2 search_like
815
816 =over 4
817
818 =item Arguments: $cond, \%attrs?
819
820 =item Return Value: $resultset (scalar context), @row_objs (list context)
821
822 =back
823
824   # WHERE title LIKE '%blue%'
825   $cd_rs = $rs->search_like({ title => '%blue%'});
826
827 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
828 that this is simply a convenience method retained for ex Class::DBI users.
829 You most likely want to use L</search> with specific operators.
830
831 For more information, see L<DBIx::Class::Manual::Cookbook>.
832
833 =cut
834
835 sub search_like {
836   my $class = shift;
837   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
838   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
839   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
840   return $class->search($query, { %$attrs });
841 }
842
843 =head2 slice
844
845 =over 4
846
847 =item Arguments: $first, $last
848
849 =item Return Value: $resultset (scalar context), @row_objs (list context)
850
851 =back
852
853 Returns a resultset or object list representing a subset of elements from the
854 resultset slice is called on. Indexes are from 0, i.e., to get the first
855 three records, call:
856
857   my ($one, $two, $three) = $rs->slice(0, 2);
858
859 =cut
860
861 sub slice {
862   my ($self, $min, $max) = @_;
863   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
864   $attrs->{offset} = $self->{attrs}{offset} || 0;
865   $attrs->{offset} += $min;
866   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
867   return $self->search(undef(), $attrs);
868   #my $slice = (ref $self)->new($self->result_source, $attrs);
869   #return (wantarray ? $slice->all : $slice);
870 }
871
872 =head2 next
873
874 =over 4
875
876 =item Arguments: none
877
878 =item Return Value: $result?
879
880 =back
881
882 Returns the next element in the resultset (C<undef> is there is none).
883
884 Can be used to efficiently iterate over records in the resultset:
885
886   my $rs = $schema->resultset('CD')->search;
887   while (my $cd = $rs->next) {
888     print $cd->title;
889   }
890
891 Note that you need to store the resultset object, and call C<next> on it.
892 Calling C<< resultset('Table')->next >> repeatedly will always return the
893 first record from the resultset.
894
895 =cut
896
897 sub next {
898   my ($self) = @_;
899   if (my $cache = $self->get_cache) {
900     $self->{all_cache_position} ||= 0;
901     return $cache->[$self->{all_cache_position}++];
902   }
903   if ($self->{attrs}{cache}) {
904     $self->{all_cache_position} = 1;
905     return ($self->all)[0];
906   }
907   if ($self->{stashed_objects}) {
908     my $obj = shift(@{$self->{stashed_objects}});
909     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
910     return $obj;
911   }
912   my @row = (
913     exists $self->{stashed_row}
914       ? @{delete $self->{stashed_row}}
915       : $self->cursor->next
916   );
917   return undef unless (@row);
918   my ($row, @more) = $self->_construct_object(@row);
919   $self->{stashed_objects} = \@more if @more;
920   return $row;
921 }
922
923 sub _construct_object {
924   my ($self, @row) = @_;
925   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
926   my @new = $self->result_class->inflate_result($self->result_source, @$info);
927   @new = $self->{_attrs}{record_filter}->(@new)
928     if exists $self->{_attrs}{record_filter};
929   return @new;
930 }
931
932 sub _collapse_result {
933   my ($self, $as_proto, $row) = @_;
934
935   my @copy = @$row;
936
937   # 'foo'         => [ undef, 'foo' ]
938   # 'foo.bar'     => [ 'foo', 'bar' ]
939   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
940
941   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
942
943   my %collapse = %{$self->{_attrs}{collapse}||{}};
944
945   my @pri_index;
946
947   # if we're doing collapsing (has_many prefetch) we need to grab records
948   # until the PK changes, so fill @pri_index. if not, we leave it empty so
949   # we know we don't have to bother.
950
951   # the reason for not using the collapse stuff directly is because if you
952   # had for e.g. two artists in a row with no cds, the collapse info for
953   # both would be NULL (undef) so you'd lose the second artist
954
955   # store just the index so we can check the array positions from the row
956   # without having to contruct the full hash
957
958   if (keys %collapse) {
959     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
960     foreach my $i (0 .. $#construct_as) {
961       next if defined($construct_as[$i][0]); # only self table
962       if (delete $pri{$construct_as[$i][1]}) {
963         push(@pri_index, $i);
964       }
965       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
966     }
967   }
968
969   # no need to do an if, it'll be empty if @pri_index is empty anyway
970
971   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
972
973   my @const_rows;
974
975   do { # no need to check anything at the front, we always want the first row
976
977     my %const;
978   
979     foreach my $this_as (@construct_as) {
980       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
981     }
982
983     push(@const_rows, \%const);
984
985   } until ( # no pri_index => no collapse => drop straight out
986       !@pri_index
987     or
988       do { # get another row, stash it, drop out if different PK
989
990         @copy = $self->cursor->next;
991         $self->{stashed_row} = \@copy;
992
993         # last thing in do block, counts as true if anything doesn't match
994
995         # check xor defined first for NULL vs. NOT NULL then if one is
996         # defined the other must be so check string equality
997
998         grep {
999           (defined $pri_vals{$_} ^ defined $copy[$_])
1000           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1001         } @pri_index;
1002       }
1003   );
1004
1005   my $alias = $self->{attrs}{alias};
1006   my $info = [];
1007
1008   my %collapse_pos;
1009
1010   my @const_keys;
1011
1012   foreach my $const (@const_rows) {
1013     scalar @const_keys or do {
1014       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1015     };
1016     foreach my $key (@const_keys) {
1017       if (length $key) {
1018         my $target = $info;
1019         my @parts = split(/\./, $key);
1020         my $cur = '';
1021         my $data = $const->{$key};
1022         foreach my $p (@parts) {
1023           $target = $target->[1]->{$p} ||= [];
1024           $cur .= ".${p}";
1025           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
1026             # collapsing at this point and on final part
1027             my $pos = $collapse_pos{$cur};
1028             CK: foreach my $ck (@ckey) {
1029               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1030                 $collapse_pos{$cur} = $data;
1031                 delete @collapse_pos{ # clear all positioning for sub-entries
1032                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1033                 };
1034                 push(@$target, []);
1035                 last CK;
1036               }
1037             }
1038           }
1039           if (exists $collapse{$cur}) {
1040             $target = $target->[-1];
1041           }
1042         }
1043         $target->[0] = $data;
1044       } else {
1045         $info->[0] = $const->{$key};
1046       }
1047     }
1048   }
1049
1050   return $info;
1051 }
1052
1053 =head2 result_source
1054
1055 =over 4
1056
1057 =item Arguments: $result_source?
1058
1059 =item Return Value: $result_source
1060
1061 =back
1062
1063 An accessor for the primary ResultSource object from which this ResultSet
1064 is derived.
1065
1066 =head2 result_class
1067
1068 =over 4
1069
1070 =item Arguments: $result_class?
1071
1072 =item Return Value: $result_class
1073
1074 =back
1075
1076 An accessor for the class to use when creating row objects. Defaults to 
1077 C<< result_source->result_class >> - which in most cases is the name of the 
1078 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1079
1080 =cut
1081
1082 sub result_class {
1083   my ($self, $result_class) = @_;
1084   if ($result_class) {
1085     $self->ensure_class_loaded($result_class);
1086     $self->_result_class($result_class);
1087   }
1088   $self->_result_class;
1089 }
1090
1091 =head2 count
1092
1093 =over 4
1094
1095 =item Arguments: $cond, \%attrs??
1096
1097 =item Return Value: $count
1098
1099 =back
1100
1101 Performs an SQL C<COUNT> with the same query as the resultset was built
1102 with to find the number of elements. If passed arguments, does a search
1103 on the resultset and counts the results of that.
1104
1105 Note: When using C<count> with C<group_by>, L<DBIx::Class> emulates C<GROUP BY>
1106 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
1107 not support C<DISTINCT> with multiple columns. If you are using such a
1108 database, you should only use columns from the main table in your C<group_by>
1109 clause.
1110
1111 =cut
1112
1113 sub count {
1114   my $self = shift;
1115   return $self->search(@_)->count if @_ and defined $_[0];
1116   return scalar @{ $self->get_cache } if $self->get_cache;
1117   my $count = $self->_count;
1118   return 0 unless $count;
1119
1120   # need to take offset from resolved attrs
1121
1122   $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
1123   $count = $self->{attrs}{rows} if
1124     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
1125   $count = 0 if ($count < 0);
1126   return $count;
1127 }
1128
1129 sub _count { # Separated out so pager can get the full count
1130   my $self = shift;
1131   my $select = { count => '*' };
1132
1133   my $attrs = { %{$self->_resolved_attrs} };
1134   if (my $group_by = delete $attrs->{group_by}) {
1135     delete $attrs->{having};
1136     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1137     # todo: try CONCAT for multi-column pk
1138     my @pk = $self->result_source->primary_columns;
1139     if (@pk == 1) {
1140       my $alias = $attrs->{alias};
1141       foreach my $column (@distinct) {
1142         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1143           @distinct = ($column);
1144           last;
1145         }
1146       }
1147     }
1148
1149     $select = { count => { distinct => \@distinct } };
1150   }
1151
1152   $attrs->{select} = $select;
1153   $attrs->{as} = [qw/count/];
1154
1155   # offset, order by and page are not needed to count. record_filter is cdbi
1156   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1157
1158   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1159   my ($count) = $tmp_rs->cursor->next;
1160   return $count;
1161 }
1162
1163 sub _bool {
1164   return 1;
1165 }
1166
1167 =head2 count_literal
1168
1169 =over 4
1170
1171 =item Arguments: $sql_fragment, @bind_values
1172
1173 =item Return Value: $count
1174
1175 =back
1176
1177 Counts the results in a literal query. Equivalent to calling L</search_literal>
1178 with the passed arguments, then L</count>.
1179
1180 =cut
1181
1182 sub count_literal { shift->search_literal(@_)->count; }
1183
1184 =head2 all
1185
1186 =over 4
1187
1188 =item Arguments: none
1189
1190 =item Return Value: @objects
1191
1192 =back
1193
1194 Returns all elements in the resultset. Called implicitly if the resultset
1195 is returned in list context.
1196
1197 =cut
1198
1199 sub all {
1200   my $self = shift;
1201   if(@_) {
1202       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1203   }
1204
1205   return @{ $self->get_cache } if $self->get_cache;
1206
1207   my @obj;
1208
1209   # TODO: don't call resolve here
1210   if (keys %{$self->_resolved_attrs->{collapse}}) {
1211 #  if ($self->{attrs}{prefetch}) {
1212       # Using $self->cursor->all is really just an optimisation.
1213       # If we're collapsing has_many prefetches it probably makes
1214       # very little difference, and this is cleaner than hacking
1215       # _construct_object to survive the approach
1216     my @row = $self->cursor->next;
1217     while (@row) {
1218       push(@obj, $self->_construct_object(@row));
1219       @row = (exists $self->{stashed_row}
1220                ? @{delete $self->{stashed_row}}
1221                : $self->cursor->next);
1222     }
1223   } else {
1224     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1225   }
1226
1227   $self->set_cache(\@obj) if $self->{attrs}{cache};
1228   return @obj;
1229 }
1230
1231 =head2 reset
1232
1233 =over 4
1234
1235 =item Arguments: none
1236
1237 =item Return Value: $self
1238
1239 =back
1240
1241 Resets the resultset's cursor, so you can iterate through the elements again.
1242
1243 =cut
1244
1245 sub reset {
1246   my ($self) = @_;
1247   delete $self->{_attrs} if exists $self->{_attrs};
1248   $self->{all_cache_position} = 0;
1249   $self->cursor->reset;
1250   return $self;
1251 }
1252
1253 =head2 first
1254
1255 =over 4
1256
1257 =item Arguments: none
1258
1259 =item Return Value: $object?
1260
1261 =back
1262
1263 Resets the resultset and returns an object for the first result (if the
1264 resultset returns anything).
1265
1266 =cut
1267
1268 sub first {
1269   return $_[0]->reset->next;
1270 }
1271
1272 # _cond_for_update_delete
1273 #
1274 # update/delete require the condition to be modified to handle
1275 # the differing SQL syntax available.  This transforms the $self->{cond}
1276 # appropriately, returning the new condition.
1277
1278 sub _cond_for_update_delete {
1279   my ($self, $full_cond) = @_;
1280   my $cond = {};
1281
1282   $full_cond ||= $self->{cond};
1283   # No-op. No condition, we're updating/deleting everything
1284   return $cond unless ref $full_cond;
1285
1286   if (ref $full_cond eq 'ARRAY') {
1287     $cond = [
1288       map {
1289         my %hash;
1290         foreach my $key (keys %{$_}) {
1291           $key =~ /([^.]+)$/;
1292           $hash{$1} = $_->{$key};
1293         }
1294         \%hash;
1295       } @{$full_cond}
1296     ];
1297   }
1298   elsif (ref $full_cond eq 'HASH') {
1299     if ((keys %{$full_cond})[0] eq '-and') {
1300       $cond->{-and} = [];
1301
1302       my @cond = @{$full_cond->{-and}};
1303       for (my $i = 0; $i < @cond; $i++) {
1304         my $entry = $cond[$i];
1305
1306         my $hash;
1307         if (ref $entry eq 'HASH') {
1308           $hash = $self->_cond_for_update_delete($entry);
1309         }
1310         else {
1311           $entry =~ /([^.]+)$/;
1312           $hash->{$1} = $cond[++$i];
1313         }
1314
1315         push @{$cond->{-and}}, $hash;
1316       }
1317     }
1318     else {
1319       foreach my $key (keys %{$full_cond}) {
1320         $key =~ /([^.]+)$/;
1321         $cond->{$1} = $full_cond->{$key};
1322       }
1323     }
1324   }
1325   else {
1326     $self->throw_exception(
1327       "Can't update/delete on resultset with condition unless hash or array"
1328     );
1329   }
1330
1331   return $cond;
1332 }
1333
1334
1335 =head2 update
1336
1337 =over 4
1338
1339 =item Arguments: \%values
1340
1341 =item Return Value: $storage_rv
1342
1343 =back
1344
1345 Sets the specified columns in the resultset to the supplied values in a
1346 single query. Return value will be true if the update succeeded or false
1347 if no records were updated; exact type of success value is storage-dependent.
1348
1349 =cut
1350
1351 sub update {
1352   my ($self, $values) = @_;
1353   $self->throw_exception("Values for update must be a hash")
1354     unless ref $values eq 'HASH';
1355
1356   carp(   'WARNING! Currently $rs->update() does not generate proper SQL'
1357         . ' on joined resultsets, and may affect rows well outside of the'
1358         . ' contents of $rs. Use at your own risk' )
1359     if ( $self->{attrs}{seen_join} );
1360
1361   my $cond = $self->_cond_for_update_delete;
1362    
1363   return $self->result_source->storage->update(
1364     $self->result_source, $values, $cond
1365   );
1366 }
1367
1368 =head2 update_all
1369
1370 =over 4
1371
1372 =item Arguments: \%values
1373
1374 =item Return Value: 1
1375
1376 =back
1377
1378 Fetches all objects and updates them one at a time. Note that C<update_all>
1379 will run DBIC cascade triggers, while L</update> will not.
1380
1381 =cut
1382
1383 sub update_all {
1384   my ($self, $values) = @_;
1385   $self->throw_exception("Values for update must be a hash")
1386     unless ref $values eq 'HASH';
1387   foreach my $obj ($self->all) {
1388     $obj->set_columns($values)->update;
1389   }
1390   return 1;
1391 }
1392
1393 =head2 delete
1394
1395 =over 4
1396
1397 =item Arguments: none
1398
1399 =item Return Value: 1
1400
1401 =back
1402
1403 Deletes the contents of the resultset from its result source. Note that this
1404 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1405 to run. See also L<DBIx::Class::Row/delete>.
1406
1407 delete may not generate correct SQL for a query with joins or a resultset
1408 chained from a related resultset.  In this case it will generate a warning:-
1409
1410   WARNING! Currently $rs->delete() does not generate proper SQL on
1411   joined resultsets, and may delete rows well outside of the contents
1412   of $rs. Use at your own risk
1413
1414 In these cases you may find that delete_all is more appropriate, or you
1415 need to respecify your query in a way that can be expressed without a join.
1416
1417 =cut
1418
1419 sub delete {
1420   my ($self) = @_;
1421   $self->throw_exception("Delete should not be passed any arguments")
1422     if $_[1];
1423   carp(   'WARNING! Currently $rs->delete() does not generate proper SQL'
1424         . ' on joined resultsets, and may delete rows well outside of the'
1425         . ' contents of $rs. Use at your own risk' )
1426     if ( $self->{attrs}{seen_join} );
1427   my $cond = $self->_cond_for_update_delete;
1428
1429   $self->result_source->storage->delete($self->result_source, $cond);
1430   return 1;
1431 }
1432
1433 =head2 delete_all
1434
1435 =over 4
1436
1437 =item Arguments: none
1438
1439 =item Return Value: 1
1440
1441 =back
1442
1443 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1444 will run DBIC cascade triggers, while L</delete> will not.
1445
1446 =cut
1447
1448 sub delete_all {
1449   my ($self) = @_;
1450   $_->delete for $self->all;
1451   return 1;
1452 }
1453
1454 =head2 populate
1455
1456 =over 4
1457
1458 =item Arguments: \@data;
1459
1460 =back
1461
1462 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1463 For the arrayref of hashrefs style each hashref should be a structure suitable
1464 forsubmitting to a $resultset->create(...) method.
1465
1466 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1467 to insert the data, as this is a faster method.  
1468
1469 Otherwise, each set of data is inserted into the database using
1470 L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
1471 objects is returned.
1472
1473 Example:  Assuming an Artist Class that has many CDs Classes relating:
1474
1475   my $Artist_rs = $schema->resultset("Artist");
1476   
1477   ## Void Context Example 
1478   $Artist_rs->populate([
1479      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1480         { title => 'My First CD', year => 2006 },
1481         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1482       ],
1483      },
1484      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1485         { title => 'My parents sold me to a record company' ,year => 2005 },
1486         { title => 'Why Am I So Ugly?', year => 2006 },
1487         { title => 'I Got Surgery and am now Popular', year => 2007 }
1488       ],
1489      },
1490   ]);
1491   
1492   ## Array Context Example
1493   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1494     { name => "Artist One"},
1495     { name => "Artist Two"},
1496     { name => "Artist Three", cds=> [
1497     { title => "First CD", year => 2007},
1498     { title => "Second CD", year => 2008},
1499   ]}
1500   ]);
1501   
1502   print $ArtistOne->name; ## response is 'Artist One'
1503   print $ArtistThree->cds->count ## reponse is '2'
1504
1505 For the arrayref of arrayrefs style,  the first element should be a list of the
1506 fieldsnames to which the remaining elements are rows being inserted.  For
1507 example:
1508
1509   $Arstist_rs->populate([
1510     [qw/artistid name/],
1511     [100, 'A Formally Unknown Singer'],
1512     [101, 'A singer that jumped the shark two albums ago'],
1513     [102, 'An actually cool singer.'],
1514   ]);
1515
1516 Please note an important effect on your data when choosing between void and
1517 wantarray context. Since void context goes straight to C<insert_bulk> in 
1518 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1519 c<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to 
1520 create primary keys for you, you will find that your PKs are empty.  In this 
1521 case you will have to use the wantarray context in order to create those 
1522 values.
1523
1524 =cut
1525
1526 sub populate {
1527   my $self = shift @_;
1528   my $data = ref $_[0][0] eq 'HASH'
1529     ? $_[0] : ref $_[0][0] eq 'ARRAY' ? $self->_normalize_populate_args($_[0]) :
1530     $self->throw_exception('Populate expects an arrayref of hashes or arrayref of arrayrefs');
1531   
1532   if(defined wantarray) {
1533     my @created;
1534     foreach my $item (@$data) {
1535       push(@created, $self->create($item));
1536     }
1537     return @created;
1538   } else {
1539     my ($first, @rest) = @$data;
1540
1541     my @names = grep {!ref $first->{$_}} keys %$first;
1542     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1543     my @pks = $self->result_source->primary_columns;  
1544
1545     ## do the belongs_to relationships  
1546     foreach my $index (0..$#$data) {
1547       if( grep { !defined $data->[$index]->{$_} } @pks ) {
1548         my @ret = $self->populate($data);
1549         return;
1550       }
1551     
1552       foreach my $rel (@rels) {
1553         next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
1554         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1555         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1556         my $related = $result->result_source->resolve_condition(
1557           $result->result_source->relationship_info($reverse)->{cond},
1558           $self,        
1559           $result,        
1560         );
1561
1562         delete $data->[$index]->{$rel};
1563         $data->[$index] = {%{$data->[$index]}, %$related};
1564       
1565         push @names, keys %$related if $index == 0;
1566       }
1567     }
1568
1569     ## do bulk insert on current row
1570     my @values = map { [ @$_{@names} ] } @$data;
1571
1572     $self->result_source->storage->insert_bulk(
1573       $self->result_source, 
1574       \@names, 
1575       \@values,
1576     );
1577
1578     ## do the has_many relationships
1579     foreach my $item (@$data) {
1580
1581       foreach my $rel (@rels) {
1582         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1583
1584         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
1585      || $self->throw_exception('Cannot find the relating object.');
1586      
1587         my $child = $parent->$rel;
1588     
1589         my $related = $child->result_source->resolve_condition(
1590           $parent->result_source->relationship_info($rel)->{cond},
1591           $child,
1592           $parent,
1593         );
1594
1595         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1596         my @populate = map { {%$_, %$related} } @rows_to_add;
1597
1598         $child->populate( \@populate );
1599       }
1600     }
1601   }
1602 }
1603
1604 =head2 _normalize_populate_args ($args)
1605
1606 Private method used by L</populate> to normalize its incoming arguments.  Factored
1607 out in case you want to subclass and accept new argument structures to the
1608 L</populate> method.
1609
1610 =cut
1611
1612 sub _normalize_populate_args {
1613   my ($self, $data) = @_;
1614   my @names = @{shift(@$data)};
1615   my @results_to_create;
1616   foreach my $datum (@$data) {
1617     my %result_to_create;
1618     foreach my $index (0..$#names) {
1619       $result_to_create{$names[$index]} = $$datum[$index];
1620     }
1621     push @results_to_create, \%result_to_create;    
1622   }
1623   return \@results_to_create;
1624 }
1625
1626 =head2 pager
1627
1628 =over 4
1629
1630 =item Arguments: none
1631
1632 =item Return Value: $pager
1633
1634 =back
1635
1636 Return Value a L<Data::Page> object for the current resultset. Only makes
1637 sense for queries with a C<page> attribute.
1638
1639 =cut
1640
1641 sub pager {
1642   my ($self) = @_;
1643   my $attrs = $self->{attrs};
1644   $self->throw_exception("Can't create pager for non-paged rs")
1645     unless $self->{attrs}{page};
1646   $attrs->{rows} ||= 10;
1647   return $self->{pager} ||= Data::Page->new(
1648     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1649 }
1650
1651 =head2 page
1652
1653 =over 4
1654
1655 =item Arguments: $page_number
1656
1657 =item Return Value: $rs
1658
1659 =back
1660
1661 Returns a resultset for the $page_number page of the resultset on which page
1662 is called, where each page contains a number of rows equal to the 'rows'
1663 attribute set on the resultset (10 by default).
1664
1665 =cut
1666
1667 sub page {
1668   my ($self, $page) = @_;
1669   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1670 }
1671
1672 =head2 new_result
1673
1674 =over 4
1675
1676 =item Arguments: \%vals
1677
1678 =item Return Value: $rowobject
1679
1680 =back
1681
1682 Creates a new row object in the resultset's result class and returns
1683 it. The row is not inserted into the database at this point, call
1684 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
1685 will tell you whether the row object has been inserted or not.
1686
1687 Passes the hashref of input on to L<DBIx::Class::Row/new>.
1688
1689 =cut
1690
1691 sub new_result {
1692   my ($self, $values) = @_;
1693   $self->throw_exception( "new_result needs a hash" )
1694     unless (ref $values eq 'HASH');
1695
1696   my %new;
1697   my $alias = $self->{attrs}{alias};
1698
1699   if (
1700     defined $self->{cond}
1701     && $self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION
1702   ) {
1703     %new = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
1704     $new{-from_resultset} = [ keys %new ] if keys %new;
1705   } else {
1706     $self->throw_exception(
1707       "Can't abstract implicit construct, condition not a hash"
1708     ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1709   
1710     my $collapsed_cond = (
1711       $self->{cond}
1712         ? $self->_collapse_cond($self->{cond})
1713         : {}
1714     );
1715   
1716     # precendence must be given to passed values over values inherited from
1717     # the cond, so the order here is important.
1718     my %implied =  %{$self->_remove_alias($collapsed_cond, $alias)};
1719     while( my($col,$value) = each %implied ){
1720       if(ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '='){
1721         $new{$col} = $value->{'='};
1722         next;
1723       }
1724       $new{$col} = $value if $self->_is_deterministic_value($value);
1725     }
1726   }
1727
1728   %new = (
1729     %new,
1730     %{ $self->_remove_alias($values, $alias) },
1731     -source_handle => $self->_source_handle,
1732     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1733   );
1734
1735   return $self->result_class->new(\%new);
1736 }
1737
1738 # _is_deterministic_value
1739 #
1740 # Make an effor to strip non-deterministic values from the condition, 
1741 # to make sure new_result chokes less
1742
1743 sub _is_deterministic_value {
1744   my $self = shift;
1745   my $value = shift;
1746   my $ref_type = ref $value;
1747   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
1748   return 1 if Scalar::Util::blessed($value);
1749   return 0;
1750 }
1751
1752 # _collapse_cond
1753 #
1754 # Recursively collapse the condition.
1755
1756 sub _collapse_cond {
1757   my ($self, $cond, $collapsed) = @_;
1758
1759   $collapsed ||= {};
1760
1761   if (ref $cond eq 'ARRAY') {
1762     foreach my $subcond (@$cond) {
1763       next unless ref $subcond;  # -or
1764 #      warn "ARRAY: " . Dumper $subcond;
1765       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1766     }
1767   }
1768   elsif (ref $cond eq 'HASH') {
1769     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1770       foreach my $subcond (@{$cond->{-and}}) {
1771 #        warn "HASH: " . Dumper $subcond;
1772         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1773       }
1774     }
1775     else {
1776 #      warn "LEAF: " . Dumper $cond;
1777       foreach my $col (keys %$cond) {
1778         my $value = $cond->{$col};
1779         $collapsed->{$col} = $value;
1780       }
1781     }
1782   }
1783
1784   return $collapsed;
1785 }
1786
1787 # _remove_alias
1788 #
1789 # Remove the specified alias from the specified query hash. A copy is made so
1790 # the original query is not modified.
1791
1792 sub _remove_alias {
1793   my ($self, $query, $alias) = @_;
1794
1795   my %orig = %{ $query || {} };
1796   my %unaliased;
1797
1798   foreach my $key (keys %orig) {
1799     if ($key !~ /\./) {
1800       $unaliased{$key} = $orig{$key};
1801       next;
1802     }
1803     $unaliased{$1} = $orig{$key}
1804       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1805   }
1806
1807   return \%unaliased;
1808 }
1809
1810 =head2 as_query
1811
1812 =over 4
1813
1814 =item Arguments: none
1815
1816 =item Return Value: \[ $sql, @bind ]
1817
1818 =back
1819
1820 Returns the SQL query and bind vars associated with the invocant.
1821
1822 This is generally used as the RHS for a subquery.
1823
1824 =cut
1825
1826 sub as_query { return shift->cursor->as_query(@_) }
1827
1828 =head2 find_or_new
1829
1830 =over 4
1831
1832 =item Arguments: \%vals, \%attrs?
1833
1834 =item Return Value: $rowobject
1835
1836 =back
1837
1838   my $artist = $schema->resultset('Artist')->find_or_new(
1839     { artist => 'fred' }, { key => 'artists' });
1840
1841   $cd->cd_to_producer->find_or_new({ producer => $producer },
1842                                    { key => 'primary });
1843
1844 Find an existing record from this resultset, based on its primary
1845 key, or a unique constraint. If none exists, instantiate a new result
1846 object and return it. The object will not be saved into your storage
1847 until you call L<DBIx::Class::Row/insert> on it.
1848
1849 You most likely want this method when looking for existing rows using
1850 a unique constraint that is not the primary key, or looking for
1851 related rows.
1852
1853 If you want objects to be saved immediately, use L</find_or_create> instead.
1854
1855 B<Note>: C<find_or_new> is probably not what you want when creating a
1856 new row in a table that uses primary keys supplied by the
1857 database. Passing in a primary key column with a value of I<undef>
1858 will cause L</find> to attempt to search for a row with a value of
1859 I<NULL>.
1860
1861 =cut
1862
1863 sub find_or_new {
1864   my $self     = shift;
1865   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1866   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1867   my $exists   = $self->find($hash, $attrs);
1868   return defined $exists ? $exists : $self->new_result($hash);
1869 }
1870
1871 =head2 create
1872
1873 =over 4
1874
1875 =item Arguments: \%vals
1876
1877 =item Return Value: a L<DBIx::Class::Row> $object
1878
1879 =back
1880
1881 Attempt to create a single new row or a row with multiple related rows
1882 in the table represented by the resultset (and related tables). This
1883 will not check for duplicate rows before inserting, use
1884 L</find_or_create> to do that.
1885
1886 To create one row for this resultset, pass a hashref of key/value
1887 pairs representing the columns of the table and the values you wish to
1888 store. If the appropriate relationships are set up, foreign key fields
1889 can also be passed an object representing the foreign row, and the
1890 value will be set to its primary key.
1891
1892 To create related objects, pass a hashref for the value if the related
1893 item is a foreign key relationship (L<DBIx::Class::Relationship/belongs_to>),
1894 and use the name of the relationship as the key. (NOT the name of the field,
1895 necessarily). For C<has_many> and C<has_one> relationships, pass an arrayref
1896 of hashrefs containing the data for each of the rows to create in the foreign
1897 tables, again using the relationship name as the key.
1898
1899 Instead of hashrefs of plain related data (key/value pairs), you may
1900 also pass new or inserted objects. New objects (not inserted yet, see
1901 L</new>), will be inserted into their appropriate tables.
1902
1903 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1904
1905 Example of creating a new row.
1906
1907   $person_rs->create({
1908     name=>"Some Person",
1909     email=>"somebody@someplace.com"
1910   });
1911   
1912 Example of creating a new row and also creating rows in a related C<has_many>
1913 or C<has_one> resultset.  Note Arrayref.
1914
1915   $artist_rs->create(
1916      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1917         { title => 'My First CD', year => 2006 },
1918         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1919       ],
1920      },
1921   );
1922
1923 Example of creating a new row and also creating a row in a related
1924 C<belongs_to>resultset. Note Hashref.
1925
1926   $cd_rs->create({
1927     title=>"Music for Silly Walks",
1928     year=>2000,
1929     artist => {
1930       name=>"Silly Musician",
1931     }
1932   });
1933
1934 =cut
1935
1936 sub create {
1937   my ($self, $attrs) = @_;
1938   $self->throw_exception( "create needs a hashref" )
1939     unless ref $attrs eq 'HASH';
1940   return $self->new_result($attrs)->insert;
1941 }
1942
1943 =head2 find_or_create
1944
1945 =over 4
1946
1947 =item Arguments: \%vals, \%attrs?
1948
1949 =item Return Value: $rowobject
1950
1951 =back
1952
1953   $cd->cd_to_producer->find_or_create({ producer => $producer },
1954                                       { key => 'primary });
1955
1956 Tries to find a record based on its primary key or unique constraints; if none
1957 is found, creates one and returns that instead.
1958
1959   my $cd = $schema->resultset('CD')->find_or_create({
1960     cdid   => 5,
1961     artist => 'Massive Attack',
1962     title  => 'Mezzanine',
1963     year   => 2005,
1964   });
1965
1966 Also takes an optional C<key> attribute, to search by a specific key or unique
1967 constraint. For example:
1968
1969   my $cd = $schema->resultset('CD')->find_or_create(
1970     {
1971       artist => 'Massive Attack',
1972       title  => 'Mezzanine',
1973     },
1974     { key => 'cd_artist_title' }
1975   );
1976
1977 B<Note>: Because find_or_create() reads from the database and then
1978 possibly inserts based on the result, this method is subject to a race
1979 condition. Another process could create a record in the table after
1980 the find has completed and before the create has started. To avoid
1981 this problem, use find_or_create() inside a transaction.
1982
1983 B<Note>: C<find_or_create> is probably not what you want when creating
1984 a new row in a table that uses primary keys supplied by the
1985 database. Passing in a primary key column with a value of I<undef>
1986 will cause L</find> to attempt to search for a row with a value of
1987 I<NULL>.
1988
1989 See also L</find> and L</update_or_create>. For information on how to declare
1990 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1991
1992 =cut
1993
1994 sub find_or_create {
1995   my $self     = shift;
1996   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1997   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1998   my $exists   = $self->find($hash, $attrs);
1999   return defined $exists ? $exists : $self->create($hash);
2000 }
2001
2002 =head2 update_or_create
2003
2004 =over 4
2005
2006 =item Arguments: \%col_values, { key => $unique_constraint }?
2007
2008 =item Return Value: $rowobject
2009
2010 =back
2011
2012   $resultset->update_or_create({ col => $val, ... });
2013
2014 First, searches for an existing row matching one of the unique constraints
2015 (including the primary key) on the source of this resultset. If a row is
2016 found, updates it with the other given column values. Otherwise, creates a new
2017 row.
2018
2019 Takes an optional C<key> attribute to search on a specific unique constraint.
2020 For example:
2021
2022   # In your application
2023   my $cd = $schema->resultset('CD')->update_or_create(
2024     {
2025       artist => 'Massive Attack',
2026       title  => 'Mezzanine',
2027       year   => 1998,
2028     },
2029     { key => 'cd_artist_title' }
2030   );
2031
2032   $cd->cd_to_producer->update_or_create({ 
2033     producer => $producer, 
2034     name => 'harry',
2035   }, { 
2036     key => 'primary,
2037   });
2038
2039
2040 If no C<key> is specified, it searches on all unique constraints defined on the
2041 source, including the primary key.
2042
2043 If the C<key> is specified as C<primary>, it searches only on the primary key.
2044
2045 See also L</find> and L</find_or_create>. For information on how to declare
2046 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2047
2048 B<Note>: C<update_or_create> is probably not what you want when
2049 looking for a row in a table that uses primary keys supplied by the
2050 database, unless you actually have a key value. Passing in a primary
2051 key column with a value of I<undef> will cause L</find> to attempt to
2052 search for a row with a value of I<NULL>.
2053
2054 =cut
2055
2056 sub update_or_create {
2057   my $self = shift;
2058   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2059   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2060
2061   my $row = $self->find($cond, $attrs);
2062   if (defined $row) {
2063     $row->update($cond);
2064     return $row;
2065   }
2066
2067   return $self->create($cond);
2068 }
2069
2070 =head2 get_cache
2071
2072 =over 4
2073
2074 =item Arguments: none
2075
2076 =item Return Value: \@cache_objects?
2077
2078 =back
2079
2080 Gets the contents of the cache for the resultset, if the cache is set.
2081
2082 The cache is populated either by using the L</prefetch> attribute to
2083 L</search> or by calling L</set_cache>.
2084
2085 =cut
2086
2087 sub get_cache {
2088   shift->{all_cache};
2089 }
2090
2091 =head2 set_cache
2092
2093 =over 4
2094
2095 =item Arguments: \@cache_objects
2096
2097 =item Return Value: \@cache_objects
2098
2099 =back
2100
2101 Sets the contents of the cache for the resultset. Expects an arrayref
2102 of objects of the same class as those produced by the resultset. Note that
2103 if the cache is set the resultset will return the cached objects rather
2104 than re-querying the database even if the cache attr is not set.
2105
2106 The contents of the cache can also be populated by using the
2107 L</prefetch> attribute to L</search>.
2108
2109 =cut
2110
2111 sub set_cache {
2112   my ( $self, $data ) = @_;
2113   $self->throw_exception("set_cache requires an arrayref")
2114       if defined($data) && (ref $data ne 'ARRAY');
2115   $self->{all_cache} = $data;
2116 }
2117
2118 =head2 clear_cache
2119
2120 =over 4
2121
2122 =item Arguments: none
2123
2124 =item Return Value: []
2125
2126 =back
2127
2128 Clears the cache for the resultset.
2129
2130 =cut
2131
2132 sub clear_cache {
2133   shift->set_cache(undef);
2134 }
2135
2136 =head2 related_resultset
2137
2138 =over 4
2139
2140 =item Arguments: $relationship_name
2141
2142 =item Return Value: $resultset
2143
2144 =back
2145
2146 Returns a related resultset for the supplied relationship name.
2147
2148   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2149
2150 =cut
2151
2152 sub related_resultset {
2153   my ($self, $rel) = @_;
2154
2155   $self->{related_resultsets} ||= {};
2156   return $self->{related_resultsets}{$rel} ||= do {
2157     my $rel_obj = $self->result_source->relationship_info($rel);
2158
2159     $self->throw_exception(
2160       "search_related: result source '" . $self->result_source->source_name .
2161         "' has no such relationship $rel")
2162       unless $rel_obj;
2163     
2164     my ($from,$seen) = $self->_resolve_from($rel);
2165
2166     my $join_count = $seen->{$rel};
2167     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
2168
2169     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2170     my %attrs = %{$self->{attrs}||{}};
2171     delete @attrs{qw(result_class alias)};
2172
2173     my $new_cache;
2174
2175     if (my $cache = $self->get_cache) {
2176       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2177         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2178                         @$cache ];
2179       }
2180     }
2181
2182     my $rel_source = $self->result_source->related_source($rel);
2183
2184     my $new = do {
2185
2186       # The reason we do this now instead of passing the alias to the
2187       # search_rs below is that if you wrap/overload resultset on the
2188       # source you need to know what alias it's -going- to have for things
2189       # to work sanely (e.g. RestrictWithObject wants to be able to add
2190       # extra query restrictions, and these may need to be $alias.)
2191
2192       my $attrs = $rel_source->resultset_attributes;
2193       local $attrs->{alias} = $alias;
2194
2195       $rel_source->resultset
2196                  ->search_rs(
2197                      undef, {
2198                        %attrs,
2199                        join => undef,
2200                        prefetch => undef,
2201                        select => undef,
2202                        as => undef,
2203                        where => $self->{cond},
2204                        seen_join => $seen,
2205                        from => $from,
2206                    });
2207     };
2208     $new->set_cache($new_cache) if $new_cache;
2209     $new;
2210   };
2211 }
2212
2213 =head2 current_source_alias
2214
2215 =over 4
2216
2217 =item Arguments: none
2218
2219 =item Return Value: $source_alias
2220
2221 =back
2222
2223 Returns the current table alias for the result source this resultset is built
2224 on, that will be used in the SQL query. Usually it is C<me>.
2225
2226 Currently the source alias that refers to the result set returned by a
2227 L</search>/L</find> family method depends on how you got to the resultset: it's
2228 C<me> by default, but eg. L</search_related> aliases it to the related result
2229 source name (and keeps C<me> referring to the original result set). The long
2230 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2231 (and make this method unnecessary).
2232
2233 Thus it's currently necessary to use this method in predefined queries (see
2234 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2235 source alias of the current result set:
2236
2237   # in a result set class
2238   sub modified_by {
2239     my ($self, $user) = @_;
2240
2241     my $me = $self->current_source_alias;
2242
2243     return $self->search(
2244       "$me.modified" => $user->id,
2245     );
2246   }
2247
2248 =cut
2249
2250 sub current_source_alias {
2251   my ($self) = @_;
2252
2253   return ($self->{attrs} || {})->{alias} || 'me';
2254 }
2255
2256 sub _resolve_from {
2257   my ($self, $extra_join) = @_;
2258   my $source = $self->result_source;
2259   my $attrs = $self->{attrs};
2260   
2261   my $from = $attrs->{from}
2262     || [ { $attrs->{alias} => $source->from } ];
2263     
2264   my $seen = { %{$attrs->{seen_join}||{}} };
2265
2266   my $join = ($attrs->{join}
2267                ? [ $attrs->{join}, $extra_join ]
2268                : $extra_join);
2269
2270   # we need to take the prefetch the attrs into account before we 
2271   # ->resolve_join as otherwise they get lost - captainL
2272   my $merged = $self->_merge_attr( $join, $attrs->{prefetch} );
2273
2274   $from = [
2275     @$from,
2276     ($join ? $source->resolve_join($merged, $attrs->{alias}, $seen) : ()),
2277   ];
2278
2279   return ($from,$seen);
2280 }
2281
2282 sub _resolved_attrs {
2283   my $self = shift;
2284   return $self->{_attrs} if $self->{_attrs};
2285
2286   my $attrs  = { %{ $self->{attrs} || {} } };
2287   my $source = $self->result_source;
2288   my $alias  = $attrs->{alias};
2289
2290   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2291   my @colbits;
2292
2293   # build columns (as long as select isn't set) into a set of as/select hashes
2294   unless ( $attrs->{select} ) {
2295       @colbits = map {
2296           ( ref($_) eq 'HASH' ) ? $_
2297             : {
2298               (
2299                   /^\Q${alias}.\E(.+)$/ ? $1
2300                   : $_
2301                 ) => ( /\./ ? $_ : "${alias}.$_" )
2302             }
2303       } ( ref($attrs->{columns}) eq 'ARRAY' ) ? @{ delete $attrs->{columns}} : (delete $attrs->{columns} || $source->columns );
2304   }
2305   # add the additional columns on
2306   foreach ( 'include_columns', '+columns' ) {
2307       push @colbits, map {
2308           ( ref($_) eq 'HASH' )
2309             ? $_
2310             : { ( split( /\./, $_ ) )[-1] => ( /\./ ? $_ : "${alias}.$_" ) }
2311       } ( ref($attrs->{$_}) eq 'ARRAY' ) ? @{ delete $attrs->{$_} } : delete $attrs->{$_} if ( $attrs->{$_} );
2312   }
2313
2314   # start with initial select items
2315   if ( $attrs->{select} ) {
2316     $attrs->{select} =
2317         ( ref $attrs->{select} eq 'ARRAY' )
2318       ? [ @{ $attrs->{select} } ]
2319       : [ $attrs->{select} ];
2320     $attrs->{as} = (
2321       $attrs->{as}
2322       ? (
2323         ref $attrs->{as} eq 'ARRAY'
2324         ? [ @{ $attrs->{as} } ]
2325         : [ $attrs->{as} ]
2326         )
2327       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{ $attrs->{select} } ]
2328     );
2329   }
2330   else {
2331
2332     # otherwise we intialise select & as to empty
2333     $attrs->{select} = [];
2334     $attrs->{as}     = [];
2335   }
2336
2337   # now add colbits to select/as
2338   push( @{ $attrs->{select} }, map { values( %{$_} ) } @colbits );
2339   push( @{ $attrs->{as} },     map { keys( %{$_} ) } @colbits );
2340
2341   my $adds;
2342   if ( $adds = delete $attrs->{'+select'} ) {
2343     $adds = [$adds] unless ref $adds eq 'ARRAY';
2344     push(
2345       @{ $attrs->{select} },
2346       map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds
2347     );
2348   }
2349   if ( $adds = delete $attrs->{'+as'} ) {
2350     $adds = [$adds] unless ref $adds eq 'ARRAY';
2351     push( @{ $attrs->{as} }, @$adds );
2352   }
2353
2354   $attrs->{from} ||= [ { $self->{attrs}{alias} => $source->from } ];
2355
2356   if ( exists $attrs->{join} || exists $attrs->{prefetch} ) {
2357     my $join = delete $attrs->{join} || {};
2358
2359     if ( defined $attrs->{prefetch} ) {
2360       $join = $self->_merge_attr( $join, $attrs->{prefetch} );
2361
2362     }
2363
2364     $attrs->{from} =    # have to copy here to avoid corrupting the original
2365       [
2366       @{ $attrs->{from} },
2367       $source->resolve_join(
2368         $join, $alias, { %{ $attrs->{seen_join} || {} } }
2369       )
2370       ];
2371
2372   }
2373
2374   $attrs->{group_by} ||= $attrs->{select}
2375     if delete $attrs->{distinct};
2376   if ( $attrs->{order_by} ) {
2377     $attrs->{order_by} = (
2378       ref( $attrs->{order_by} ) eq 'ARRAY'
2379       ? [ @{ $attrs->{order_by} } ]
2380       : [ $attrs->{order_by} ]
2381     );
2382   }
2383   else {
2384     $attrs->{order_by} = [];
2385   }
2386
2387   my $collapse = $attrs->{collapse} || {};
2388   if ( my $prefetch = delete $attrs->{prefetch} ) {
2389     $prefetch = $self->_merge_attr( {}, $prefetch );
2390     my @pre_order;
2391     my $seen = { %{ $attrs->{seen_join} || {} } };
2392     foreach my $p ( ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch) ) {
2393
2394       # bring joins back to level of current class
2395       my @prefetch =
2396         $source->resolve_prefetch( $p, $alias, $seen, \@pre_order, $collapse );
2397       push( @{ $attrs->{select} }, map { $_->[0] } @prefetch );
2398       push( @{ $attrs->{as} },     map { $_->[1] } @prefetch );
2399     }
2400     push( @{ $attrs->{order_by} }, @pre_order );
2401   }
2402   $attrs->{collapse} = $collapse;
2403
2404   if ( $attrs->{page} ) {
2405     $attrs->{offset} ||= 0;
2406     $attrs->{offset} += ( $attrs->{rows} * ( $attrs->{page} - 1 ) );
2407   }
2408
2409   return $self->{_attrs} = $attrs;
2410 }
2411
2412 sub _rollout_attr {
2413   my ($self, $attr) = @_;
2414   
2415   if (ref $attr eq 'HASH') {
2416     return $self->_rollout_hash($attr);
2417   } elsif (ref $attr eq 'ARRAY') {
2418     return $self->_rollout_array($attr);
2419   } else {
2420     return [$attr];
2421   }
2422 }
2423
2424 sub _rollout_array {
2425   my ($self, $attr) = @_;
2426
2427   my @rolled_array;
2428   foreach my $element (@{$attr}) {
2429     if (ref $element eq 'HASH') {
2430       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
2431     } elsif (ref $element eq 'ARRAY') {
2432       #  XXX - should probably recurse here
2433       push( @rolled_array, @{$self->_rollout_array($element)} );
2434     } else {
2435       push( @rolled_array, $element );
2436     }
2437   }
2438   return \@rolled_array;
2439 }
2440
2441 sub _rollout_hash {
2442   my ($self, $attr) = @_;
2443
2444   my @rolled_array;
2445   foreach my $key (keys %{$attr}) {
2446     push( @rolled_array, { $key => $attr->{$key} } );
2447   }
2448   return \@rolled_array;
2449 }
2450
2451 sub _calculate_score {
2452   my ($self, $a, $b) = @_;
2453
2454   if (ref $b eq 'HASH') {
2455     my ($b_key) = keys %{$b};
2456     if (ref $a eq 'HASH') {
2457       my ($a_key) = keys %{$a};
2458       if ($a_key eq $b_key) {
2459         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
2460       } else {
2461         return 0;
2462       }
2463     } else {
2464       return ($a eq $b_key) ? 1 : 0;
2465     }       
2466   } else {
2467     if (ref $a eq 'HASH') {
2468       my ($a_key) = keys %{$a};
2469       return ($b eq $a_key) ? 1 : 0;
2470     } else {
2471       return ($b eq $a) ? 1 : 0;
2472     }
2473   }
2474 }
2475
2476 sub _merge_attr {
2477   my ($self, $orig, $import) = @_;
2478
2479   return $import unless defined($orig);
2480   return $orig unless defined($import);
2481   
2482   $orig = $self->_rollout_attr($orig);
2483   $import = $self->_rollout_attr($import);
2484
2485   my $seen_keys;
2486   foreach my $import_element ( @{$import} ) {
2487     # find best candidate from $orig to merge $b_element into
2488     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
2489     foreach my $orig_element ( @{$orig} ) {
2490       my $score = $self->_calculate_score( $orig_element, $import_element );
2491       if ($score > $best_candidate->{score}) {
2492         $best_candidate->{position} = $position;
2493         $best_candidate->{score} = $score;
2494       }
2495       $position++;
2496     }
2497     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
2498
2499     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
2500       push( @{$orig}, $import_element );
2501     } else {
2502       my $orig_best = $orig->[$best_candidate->{position}];
2503       # merge orig_best and b_element together and replace original with merged
2504       if (ref $orig_best ne 'HASH') {
2505         $orig->[$best_candidate->{position}] = $import_element;
2506       } elsif (ref $import_element eq 'HASH') {
2507         my ($key) = keys %{$orig_best};
2508         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
2509       }
2510     }
2511     $seen_keys->{$import_key} = 1; # don't merge the same key twice
2512   }
2513
2514   return $orig;
2515 }
2516
2517 sub result_source {
2518     my $self = shift;
2519
2520     if (@_) {
2521         $self->_source_handle($_[0]->handle);
2522     } else {
2523         $self->_source_handle->resolve;
2524     }
2525 }
2526
2527 =head2 throw_exception
2528
2529 See L<DBIx::Class::Schema/throw_exception> for details.
2530
2531 =cut
2532
2533 sub throw_exception {
2534   my $self=shift;
2535   if (ref $self && $self->_source_handle->schema) {
2536     $self->_source_handle->schema->throw_exception(@_)
2537   } else {
2538     croak(@_);
2539   }
2540
2541 }
2542
2543 # XXX: FIXME: Attributes docs need clearing up
2544
2545 =head1 ATTRIBUTES
2546
2547 Attributes are used to refine a ResultSet in various ways when
2548 searching for data. They can be passed to any method which takes an
2549 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
2550 L</count>.
2551
2552 These are in no particular order:
2553
2554 =head2 order_by
2555
2556 =over 4
2557
2558 =item Value: ($order_by | \@order_by)
2559
2560 =back
2561
2562 Which column(s) to order the results by. This is currently passed
2563 through directly to SQL, so you can give e.g. C<year DESC> for a
2564 descending order on the column `year'.
2565
2566 Please note that if you have C<quote_char> enabled (see
2567 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
2568 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
2569 so you will need to manually quote things as appropriate.)
2570
2571 If your L<SQL::Abstract> version supports it (>=1.50), you can also use
2572 C<{-desc => 'year'}>, which takes care of the quoting for you. This is the
2573 recommended syntax.
2574
2575 =head2 columns
2576
2577 =over 4
2578
2579 =item Value: \@columns
2580
2581 =back
2582
2583 Shortcut to request a particular set of columns to be retrieved. Each
2584 column spec may be a string (a table column name), or a hash (in which
2585 case the key is the C<as> value, and the value is used as the C<select>
2586 expression). Adds C<me.> onto the start of any column without a C<.> in
2587 it and sets C<select> from that, then auto-populates C<as> from
2588 C<select> as normal. (You may also use the C<cols> attribute, as in
2589 earlier versions of DBIC.)
2590
2591 =head2 +columns
2592
2593 =over 4
2594
2595 =item Value: \@columns
2596
2597 =back
2598
2599 Indicates additional columns to be selected from storage. Works the same
2600 as L</columns> but adds columns to the selection. (You may also use the
2601 C<include_columns> attribute, as in earlier versions of DBIC). For
2602 example:-
2603
2604   $schema->resultset('CD')->search(undef, {
2605     '+columns' => ['artist.name'],
2606     join => ['artist']
2607   });
2608
2609 would return all CDs and include a 'name' column to the information
2610 passed to object inflation. Note that the 'artist' is the name of the
2611 column (or relationship) accessor, and 'name' is the name of the column
2612 accessor in the related table.
2613
2614 =head2 include_columns
2615
2616 =over 4
2617
2618 =item Value: \@columns
2619
2620 =back
2621
2622 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
2623
2624 =head2 select
2625
2626 =over 4
2627
2628 =item Value: \@select_columns
2629
2630 =back
2631
2632 Indicates which columns should be selected from the storage. You can use
2633 column names, or in the case of RDBMS back ends, function or stored procedure
2634 names:
2635
2636   $rs = $schema->resultset('Employee')->search(undef, {
2637     select => [
2638       'name',
2639       { count => 'employeeid' },
2640       { sum => 'salary' }
2641     ]
2642   });
2643
2644 When you use function/stored procedure names and do not supply an C<as>
2645 attribute, the column names returned are storage-dependent. E.g. MySQL would
2646 return a column named C<count(employeeid)> in the above example.
2647
2648 =head2 +select
2649
2650 =over 4
2651
2652 Indicates additional columns to be selected from storage.  Works the same as
2653 L</select> but adds columns to the selection.
2654
2655 =back
2656
2657 =head2 +as
2658
2659 =over 4
2660
2661 Indicates additional column names for those added via L</+select>. See L</as>.
2662
2663 =back
2664
2665 =head2 as
2666
2667 =over 4
2668
2669 =item Value: \@inflation_names
2670
2671 =back
2672
2673 Indicates column names for object inflation. That is, C<as>
2674 indicates the name that the column can be accessed as via the
2675 C<get_column> method (or via the object accessor, B<if one already
2676 exists>).  It has nothing to do with the SQL code C<SELECT foo AS bar>.
2677
2678 The C<as> attribute is used in conjunction with C<select>,
2679 usually when C<select> contains one or more function or stored
2680 procedure names:
2681
2682   $rs = $schema->resultset('Employee')->search(undef, {
2683     select => [
2684       'name',
2685       { count => 'employeeid' }
2686     ],
2687     as => ['name', 'employee_count'],
2688   });
2689
2690   my $employee = $rs->first(); # get the first Employee
2691
2692 If the object against which the search is performed already has an accessor
2693 matching a column name specified in C<as>, the value can be retrieved using
2694 the accessor as normal:
2695
2696   my $name = $employee->name();
2697
2698 If on the other hand an accessor does not exist in the object, you need to
2699 use C<get_column> instead:
2700
2701   my $employee_count = $employee->get_column('employee_count');
2702
2703 You can create your own accessors if required - see
2704 L<DBIx::Class::Manual::Cookbook> for details.
2705
2706 Please note: This will NOT insert an C<AS employee_count> into the SQL
2707 statement produced, it is used for internal access only. Thus
2708 attempting to use the accessor in an C<order_by> clause or similar
2709 will fail miserably.
2710
2711 To get around this limitation, you can supply literal SQL to your
2712 C<select> attibute that contains the C<AS alias> text, eg:
2713
2714   select => [\'myfield AS alias']
2715
2716 =head2 join
2717
2718 =over 4
2719
2720 =item Value: ($rel_name | \@rel_names | \%rel_names)
2721
2722 =back
2723
2724 Contains a list of relationships that should be joined for this query.  For
2725 example:
2726
2727   # Get CDs by Nine Inch Nails
2728   my $rs = $schema->resultset('CD')->search(
2729     { 'artist.name' => 'Nine Inch Nails' },
2730     { join => 'artist' }
2731   );
2732
2733 Can also contain a hash reference to refer to the other relation's relations.
2734 For example:
2735
2736   package MyApp::Schema::Track;
2737   use base qw/DBIx::Class/;
2738   __PACKAGE__->table('track');
2739   __PACKAGE__->add_columns(qw/trackid cd position title/);
2740   __PACKAGE__->set_primary_key('trackid');
2741   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
2742   1;
2743
2744   # In your application
2745   my $rs = $schema->resultset('Artist')->search(
2746     { 'track.title' => 'Teardrop' },
2747     {
2748       join     => { cd => 'track' },
2749       order_by => 'artist.name',
2750     }
2751   );
2752
2753 You need to use the relationship (not the table) name in  conditions, 
2754 because they are aliased as such. The current table is aliased as "me", so 
2755 you need to use me.column_name in order to avoid ambiguity. For example:
2756
2757   # Get CDs from 1984 with a 'Foo' track 
2758   my $rs = $schema->resultset('CD')->search(
2759     { 
2760       'me.year' => 1984,
2761       'tracks.name' => 'Foo'
2762     },
2763     { join => 'tracks' }
2764   );
2765   
2766 If the same join is supplied twice, it will be aliased to <rel>_2 (and
2767 similarly for a third time). For e.g.
2768
2769   my $rs = $schema->resultset('Artist')->search({
2770     'cds.title'   => 'Down to Earth',
2771     'cds_2.title' => 'Popular',
2772   }, {
2773     join => [ qw/cds cds/ ],
2774   });
2775
2776 will return a set of all artists that have both a cd with title 'Down
2777 to Earth' and a cd with title 'Popular'.
2778
2779 If you want to fetch related objects from other tables as well, see C<prefetch>
2780 below.
2781
2782 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
2783
2784 =head2 prefetch
2785
2786 =over 4
2787
2788 =item Value: ($rel_name | \@rel_names | \%rel_names)
2789
2790 =back
2791
2792 Contains one or more relationships that should be fetched along with
2793 the main query (when they are accessed afterwards the data will
2794 already be available, without extra queries to the database).  This is
2795 useful for when you know you will need the related objects, because it
2796 saves at least one query:
2797
2798   my $rs = $schema->resultset('Tag')->search(
2799     undef,
2800     {
2801       prefetch => {
2802         cd => 'artist'
2803       }
2804     }
2805   );
2806
2807 The initial search results in SQL like the following:
2808
2809   SELECT tag.*, cd.*, artist.* FROM tag
2810   JOIN cd ON tag.cd = cd.cdid
2811   JOIN artist ON cd.artist = artist.artistid
2812
2813 L<DBIx::Class> has no need to go back to the database when we access the
2814 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2815 case.
2816
2817 Simple prefetches will be joined automatically, so there is no need
2818 for a C<join> attribute in the above search. 
2819
2820 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2821 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2822 with an accessor type of 'single' or 'filter'). A more complex example that
2823 prefetches an artists cds, the tracks on those cds, and the tags associted 
2824 with that artist is given below (assuming many-to-many from artists to tags):
2825
2826  my $rs = $schema->resultset('Artist')->search(
2827    undef,
2828    {
2829      prefetch => [
2830        { cds => 'tracks' },
2831        { artist_tags => 'tags' }
2832      ]
2833    }
2834  );
2835  
2836
2837 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
2838 attributes will be ignored.
2839
2840 =head2 page
2841
2842 =over 4
2843
2844 =item Value: $page
2845
2846 =back
2847
2848 Makes the resultset paged and specifies the page to retrieve. Effectively
2849 identical to creating a non-pages resultset and then calling ->page($page)
2850 on it.
2851
2852 If L<rows> attribute is not specified it defualts to 10 rows per page.
2853
2854 =head2 rows
2855
2856 =over 4
2857
2858 =item Value: $rows
2859
2860 =back
2861
2862 Specifes the maximum number of rows for direct retrieval or the number of
2863 rows per page if the page attribute or method is used.
2864
2865 =head2 offset
2866
2867 =over 4
2868
2869 =item Value: $offset
2870
2871 =back
2872
2873 Specifies the (zero-based) row number for the  first row to be returned, or the
2874 of the first row of the first page if paging is used.
2875
2876 =head2 group_by
2877
2878 =over 4
2879
2880 =item Value: \@columns
2881
2882 =back
2883
2884 A arrayref of columns to group by. Can include columns of joined tables.
2885
2886   group_by => [qw/ column1 column2 ... /]
2887
2888 =head2 having
2889
2890 =over 4
2891
2892 =item Value: $condition
2893
2894 =back
2895
2896 HAVING is a select statement attribute that is applied between GROUP BY and
2897 ORDER BY. It is applied to the after the grouping calculations have been
2898 done.
2899
2900   having => { 'count(employee)' => { '>=', 100 } }
2901
2902 =head2 distinct
2903
2904 =over 4
2905
2906 =item Value: (0 | 1)
2907
2908 =back
2909
2910 Set to 1 to group by all columns.
2911
2912 =head2 where
2913
2914 =over 4
2915
2916 Adds to the WHERE clause.
2917
2918   # only return rows WHERE deleted IS NULL for all searches
2919   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2920
2921 Can be overridden by passing C<{ where => undef }> as an attribute
2922 to a resulset.
2923
2924 =back
2925
2926 =head2 cache
2927
2928 Set to 1 to cache search results. This prevents extra SQL queries if you
2929 revisit rows in your ResultSet:
2930
2931   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2932
2933   while( my $artist = $resultset->next ) {
2934     ... do stuff ...
2935   }
2936
2937   $rs->first; # without cache, this would issue a query
2938
2939 By default, searches are not cached.
2940
2941 For more examples of using these attributes, see
2942 L<DBIx::Class::Manual::Cookbook>.
2943
2944 =head2 from
2945
2946 =over 4
2947
2948 =item Value: \@from_clause
2949
2950 =back
2951
2952 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2953 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2954 clauses.
2955
2956 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2957
2958 C<join> will usually do what you need and it is strongly recommended that you
2959 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2960 And we really do mean "cannot", not just tried and failed. Attempting to use
2961 this because you're having problems with C<join> is like trying to use x86
2962 ASM because you've got a syntax error in your C. Trust us on this.
2963
2964 Now, if you're still really, really sure you need to use this (and if you're
2965 not 100% sure, ask the mailing list first), here's an explanation of how this
2966 works.
2967
2968 The syntax is as follows -
2969
2970   [
2971     { <alias1> => <table1> },
2972     [
2973       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2974       [], # nested JOIN (optional)
2975       { <table1.column1> => <table2.column2>, ... (more conditions) },
2976     ],
2977     # More of the above [ ] may follow for additional joins
2978   ]
2979
2980   <table1> <alias1>
2981   JOIN
2982     <table2> <alias2>
2983     [JOIN ...]
2984   ON <table1.column1> = <table2.column2>
2985   <more joins may follow>
2986
2987 An easy way to follow the examples below is to remember the following:
2988
2989     Anything inside "[]" is a JOIN
2990     Anything inside "{}" is a condition for the enclosing JOIN
2991
2992 The following examples utilize a "person" table in a family tree application.
2993 In order to express parent->child relationships, this table is self-joined:
2994
2995     # Person->belongs_to('father' => 'Person');
2996     # Person->belongs_to('mother' => 'Person');
2997
2998 C<from> can be used to nest joins. Here we return all children with a father,
2999 then search against all mothers of those children:
3000
3001   $rs = $schema->resultset('Person')->search(
3002       undef,
3003       {
3004           alias => 'mother', # alias columns in accordance with "from"
3005           from => [
3006               { mother => 'person' },
3007               [
3008                   [
3009                       { child => 'person' },
3010                       [
3011                           { father => 'person' },
3012                           { 'father.person_id' => 'child.father_id' }
3013                       ]
3014                   ],
3015                   { 'mother.person_id' => 'child.mother_id' }
3016               ],
3017           ]
3018       },
3019   );
3020
3021   # Equivalent SQL:
3022   # SELECT mother.* FROM person mother
3023   # JOIN (
3024   #   person child
3025   #   JOIN person father
3026   #   ON ( father.person_id = child.father_id )
3027   # )
3028   # ON ( mother.person_id = child.mother_id )
3029
3030 The type of any join can be controlled manually. To search against only people
3031 with a father in the person table, we could explicitly use C<INNER JOIN>:
3032
3033     $rs = $schema->resultset('Person')->search(
3034         undef,
3035         {
3036             alias => 'child', # alias columns in accordance with "from"
3037             from => [
3038                 { child => 'person' },
3039                 [
3040                     { father => 'person', -join_type => 'inner' },
3041                     { 'father.id' => 'child.father_id' }
3042                 ],
3043             ]
3044         },
3045     );
3046
3047     # Equivalent SQL:
3048     # SELECT child.* FROM person child
3049     # INNER JOIN person father ON child.father_id = father.id
3050
3051 If you need to express really complex joins or you need a subselect, you
3052 can supply literal SQL to C<from> via a scalar reference. In this case
3053 the contents of the scalar will replace the table name asscoiated with the
3054 resultsource.
3055
3056 WARNING: This technique might very well not work as expected on chained
3057 searches - you have been warned.
3058
3059     # Assuming the Event resultsource is defined as:
3060
3061         MySchema::Event->add_columns (
3062             sequence => {
3063                 data_type => 'INT',
3064                 is_auto_increment => 1,
3065             },
3066             location => {
3067                 data_type => 'INT',
3068             },
3069             type => {
3070                 data_type => 'INT',
3071             },
3072         );
3073         MySchema::Event->set_primary_key ('sequence');
3074
3075     # This will get back the latest event for every location. The column
3076     # selector is still provided by DBIC, all we do is add a JOIN/WHERE
3077     # combo to limit the resultset
3078
3079     $rs = $schema->resultset('Event');
3080     $table = $rs->result_source->name;
3081     $latest = $rs->search (
3082         undef,
3083         { from => \ " 
3084             (SELECT e1.* FROM $table e1 
3085                 JOIN $table e2 
3086                     ON e1.location = e2.location 
3087                     AND e1.sequence < e2.sequence 
3088                 WHERE e2.sequence is NULL 
3089             ) me",
3090         },
3091     );
3092
3093     # Equivalent SQL (with the DBIC chunks added):
3094
3095     SELECT me.sequence, me.location, me.type FROM
3096        (SELECT e1.* FROM events e1
3097            JOIN events e2
3098                ON e1.location = e2.location
3099                AND e1.sequence < e2.sequence
3100            WHERE e2.sequence is NULL
3101        ) me;
3102
3103 =head2 for
3104
3105 =over 4
3106
3107 =item Value: ( 'update' | 'shared' )
3108
3109 =back
3110
3111 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
3112 ... FOR SHARED.
3113
3114 =cut
3115
3116 1;