99ce6ce55c190e4c95f22fe32358cfabd20a7739
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => "count",
7         'bool'   => "_bool",
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use List::Util ();
15 use Scalar::Util ();
16 use base qw/DBIx::Class/;
17
18 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
23
24 =head1 SYNOPSIS
25
26   my $users_rs   = $schema->resultset('User');
27   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
28   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
29
30 =head1 DESCRIPTION
31
32 A ResultSet is an object which stores a set of conditions representing
33 a query. It is the backbone of DBIx::Class (i.e. the really
34 important/useful bit).
35
36 No SQL is executed on the database when a ResultSet is created, it
37 just stores all the conditions needed to create the query.
38
39 A basic ResultSet representing the data of an entire table is returned
40 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
41 L<Source|DBIx::Class::Manual::Glossary/Source> name.
42
43   my $users_rs = $schema->resultset('User');
44
45 A new ResultSet is returned from calling L</search> on an existing
46 ResultSet. The new one will contain all the conditions of the
47 original, plus any new conditions added in the C<search> call.
48
49 A ResultSet is also an iterator. L</next> is used to return all the
50 L<DBIx::Class::Row>s the ResultSet represents.
51
52 The query that the ResultSet represents is B<only> executed against
53 the database when these methods are called:
54
55 =over
56
57 =item L</find>
58
59 =item L</next>
60
61 =item L</all>
62
63 =item L</count>
64
65 =item L</single>
66
67 =item L</first>
68
69 =back
70
71 =head1 EXAMPLES 
72
73 =head2 Chaining resultsets
74
75 Let's say you've got a query that needs to be run to return some data
76 to the user. But, you have an authorization system in place that
77 prevents certain users from seeing certain information. So, you want
78 to construct the basic query in one method, but add constraints to it in
79 another.
80
81   sub get_data {
82     my $self = shift;
83     my $request = $self->get_request; # Get a request object somehow.
84     my $schema = $self->get_schema;   # Get the DBIC schema object somehow.
85
86     my $cd_rs = $schema->resultset('CD')->search({
87       title => $request->param('title'),
88       year => $request->param('year'),
89     });
90
91     $self->apply_security_policy( $cd_rs );
92
93     return $cd_rs->all();
94   }
95
96   sub apply_security_policy {
97     my $self = shift;
98     my ($rs) = @_;
99
100     return $rs->search({
101       subversive => 0,
102     });
103   }
104
105 =head2 Multiple queries
106
107 Since a resultset just defines a query, you can do all sorts of
108 things with it with the same object.
109
110   # Don't hit the DB yet.
111   my $cd_rs = $schema->resultset('CD')->search({
112     title => 'something',
113     year => 2009,
114   });
115
116   # Each of these hits the DB individually.
117   my $count = $cd_rs->count;
118   my $most_recent = $cd_rs->get_column('date_released')->max();
119   my @records = $cd_rs->all;
120
121 And it's not just limited to SELECT statements.
122
123   $cd_rs->delete();
124
125 This is even cooler:
126
127   $cd_rs->create({ artist => 'Fred' });
128
129 Which is the same as:
130
131   $schema->resultset('CD')->create({
132     title => 'something',
133     year => 2009,
134     artist => 'Fred'
135   });
136
137 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
138
139 =head1 OVERLOADING
140
141 If a resultset is used in a numeric context it returns the L</count>.
142 However, if it is used in a booleand context it is always true.  So if
143 you want to check if a resultset has any results use C<if $rs != 0>.
144 C<if $rs> will always be true.
145
146 =head1 METHODS
147
148 =head2 new
149
150 =over 4
151
152 =item Arguments: $source, \%$attrs
153
154 =item Return Value: $rs
155
156 =back
157
158 The resultset constructor. Takes a source object (usually a
159 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
160 L</ATTRIBUTES> below).  Does not perform any queries -- these are
161 executed as needed by the other methods.
162
163 Generally you won't need to construct a resultset manually.  You'll
164 automatically get one from e.g. a L</search> called in scalar context:
165
166   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
167
168 IMPORTANT: If called on an object, proxies to new_result instead so
169
170   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
171
172 will return a CD object, not a ResultSet.
173
174 =cut
175
176 sub new {
177   my $class = shift;
178   return $class->new_result(@_) if ref $class;
179
180   my ($source, $attrs) = @_;
181   $source = $source->handle 
182     unless $source->isa('DBIx::Class::ResultSourceHandle');
183   $attrs = { %{$attrs||{}} };
184
185   if ($attrs->{page}) {
186     $attrs->{rows} ||= 10;
187   }
188
189   $attrs->{alias} ||= 'me';
190
191   # Creation of {} and bless separated to mitigate RH perl bug
192   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
193   my $self = {
194     _source_handle => $source,
195     cond => $attrs->{where},
196     count => undef,
197     pager => undef,
198     attrs => $attrs
199   };
200
201   bless $self, $class;
202
203   $self->result_class(
204     $attrs->{result_class} || $source->resolve->result_class
205   );
206
207   return $self;
208 }
209
210 =head2 search
211
212 =over 4
213
214 =item Arguments: $cond, \%attrs?
215
216 =item Return Value: $resultset (scalar context), @row_objs (list context)
217
218 =back
219
220   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
221   my $new_rs = $cd_rs->search({ year => 2005 });
222
223   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
224                  # year = 2005 OR year = 2004
225
226 If you need to pass in additional attributes but no additional condition,
227 call it as C<search(undef, \%attrs)>.
228
229   # "SELECT name, artistid FROM $artist_table"
230   my @all_artists = $schema->resultset('Artist')->search(undef, {
231     columns => [qw/name artistid/],
232   });
233
234 For a list of attributes that can be passed to C<search>, see
235 L</ATTRIBUTES>. For more examples of using this function, see
236 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
237 documentation for the first argument, see L<SQL::Abstract>.
238
239 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
240
241 =cut
242
243 sub search {
244   my $self = shift;
245   my $rs = $self->search_rs( @_ );
246   return (wantarray ? $rs->all : $rs);
247 }
248
249 =head2 search_rs
250
251 =over 4
252
253 =item Arguments: $cond, \%attrs?
254
255 =item Return Value: $resultset
256
257 =back
258
259 This method does the same exact thing as search() except it will
260 always return a resultset, even in list context.
261
262 =cut
263
264 sub search_rs {
265   my $self = shift;
266
267   my $attrs = {};
268   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
269   my $our_attrs = { %{$self->{attrs}} };
270   my $having = delete $our_attrs->{having};
271   my $where = delete $our_attrs->{where};
272
273   my $rows;
274
275   my %safe = (alias => 1, cache => 1);
276
277   unless (
278     (@_ && defined($_[0])) # @_ == () or (undef)
279     || 
280     (keys %$attrs # empty attrs or only 'safe' attrs
281     && List::Util::first { !$safe{$_} } keys %$attrs)
282   ) {
283     # no search, effectively just a clone
284     $rows = $self->get_cache;
285   }
286
287   my $new_attrs = { %{$our_attrs}, %{$attrs} };
288
289   # merge new attrs into inherited
290   foreach my $key (qw/join prefetch +select +as/) {
291     next unless exists $attrs->{$key};
292     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
293   }
294
295   my $cond = (@_
296     ? (
297         (@_ == 1 || ref $_[0] eq "HASH")
298           ? (
299               (ref $_[0] eq 'HASH')
300                 ? (
301                     (keys %{ $_[0] }  > 0)
302                       ? shift
303                       : undef
304                    )
305                 :  shift
306              )
307           : (
308               (@_ % 2)
309                 ? $self->throw_exception("Odd number of arguments to search")
310                 : {@_}
311              )
312       )
313     : undef
314   );
315
316   if (defined $where) {
317     $new_attrs->{where} = (
318       defined $new_attrs->{where}
319         ? { '-and' => [
320               map {
321                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
322               } $where, $new_attrs->{where}
323             ]
324           }
325         : $where);
326   }
327
328   if (defined $cond) {
329     $new_attrs->{where} = (
330       defined $new_attrs->{where}
331         ? { '-and' => [
332               map {
333                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
334               } $cond, $new_attrs->{where}
335             ]
336           }
337         : $cond);
338   }
339
340   if (defined $having) {
341     $new_attrs->{having} = (
342       defined $new_attrs->{having}
343         ? { '-and' => [
344               map {
345                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
346               } $having, $new_attrs->{having}
347             ]
348           }
349         : $having);
350   }
351
352   my $rs = (ref $self)->new($self->result_source, $new_attrs);
353   if ($rows) {
354     $rs->set_cache($rows);
355   }
356   return $rs;
357 }
358
359 =head2 search_literal
360
361 =over 4
362
363 =item Arguments: $sql_fragment, @bind_values
364
365 =item Return Value: $resultset (scalar context), @row_objs (list context)
366
367 =back
368
369   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
370   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
371
372 Pass a literal chunk of SQL to be added to the conditional part of the
373 resultset query.
374
375 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
376 only be used in that context. There are known problems using C<search_literal>
377 in chained queries; it can result in bind values in the wrong order.  See
378 L<DBIx::Class::Manual::Cookbook/Searching> and
379 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
380 require C<search_literal>.
381
382 =cut
383
384 sub search_literal {
385   my ($self, $cond, @vals) = @_;
386   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
387   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
388   return $self->search(\$cond, $attrs);
389 }
390
391 =head2 find
392
393 =over 4
394
395 =item Arguments: @values | \%cols, \%attrs?
396
397 =item Return Value: $row_object | undef
398
399 =back
400
401 Finds a row based on its primary key or unique constraint. For example, to find
402 a row by its primary key:
403
404   my $cd = $schema->resultset('CD')->find(5);
405
406 You can also find a row by a specific unique constraint using the C<key>
407 attribute. For example:
408
409   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
410     key => 'cd_artist_title'
411   });
412
413 Additionally, you can specify the columns explicitly by name:
414
415   my $cd = $schema->resultset('CD')->find(
416     {
417       artist => 'Massive Attack',
418       title  => 'Mezzanine',
419     },
420     { key => 'cd_artist_title' }
421   );
422
423 If the C<key> is specified as C<primary>, it searches only on the primary key.
424
425 If no C<key> is specified, it searches on all unique constraints defined on the
426 source for which column data is provided, including the primary key.
427
428 If your table does not have a primary key, you B<must> provide a value for the
429 C<key> attribute matching one of the unique constraints on the source.
430
431 In addition to C<key>, L</find> recognizes and applies standard
432 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
433
434 Note: If your query does not return only one row, a warning is generated:
435
436   Query returned more than one row
437
438 See also L</find_or_create> and L</update_or_create>. For information on how to
439 declare unique constraints, see
440 L<DBIx::Class::ResultSource/add_unique_constraint>.
441
442 =cut
443
444 sub find {
445   my $self = shift;
446   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
447
448   # Default to the primary key, but allow a specific key
449   my @cols = exists $attrs->{key}
450     ? $self->result_source->unique_constraint_columns($attrs->{key})
451     : $self->result_source->primary_columns;
452   $self->throw_exception(
453     "Can't find unless a primary key is defined or unique constraint is specified"
454   ) unless @cols;
455
456   # Parse out a hashref from input
457   my $input_query;
458   if (ref $_[0] eq 'HASH') {
459     $input_query = { %{$_[0]} };
460   }
461   elsif (@_ == @cols) {
462     $input_query = {};
463     @{$input_query}{@cols} = @_;
464   }
465   else {
466     # Compatibility: Allow e.g. find(id => $value)
467     carp "Find by key => value deprecated; please use a hashref instead";
468     $input_query = {@_};
469   }
470
471   my (%related, $info);
472
473   KEY: foreach my $key (keys %$input_query) {
474     if (ref($input_query->{$key})
475         && ($info = $self->result_source->relationship_info($key))) {
476       my $val = delete $input_query->{$key};
477       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
478       my $rel_q = $self->result_source->resolve_condition(
479                     $info->{cond}, $val, $key
480                   );
481       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
482       @related{keys %$rel_q} = values %$rel_q;
483     }
484   }
485   if (my @keys = keys %related) {
486     @{$input_query}{@keys} = values %related;
487   }
488
489
490   # Build the final query: Default to the disjunction of the unique queries,
491   # but allow the input query in case the ResultSet defines the query or the
492   # user is abusing find
493   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
494   my $query;
495   if (exists $attrs->{key}) {
496     my @unique_cols = $self->result_source->unique_constraint_columns($attrs->{key});
497     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
498     $query = $self->_add_alias($unique_query, $alias);
499   }
500   else {
501     my @unique_queries = $self->_unique_queries($input_query, $attrs);
502     $query = @unique_queries
503       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
504       : $self->_add_alias($input_query, $alias);
505   }
506
507   # Run the query
508   if (keys %$attrs) {
509     my $rs = $self->search($query, $attrs);
510     if (keys %{$rs->_resolved_attrs->{collapse}}) {
511       my $row = $rs->next;
512       carp "Query returned more than one row" if $rs->next;
513       return $row;
514     }
515     else {
516       return $rs->single;
517     }
518   }
519   else {
520     if (keys %{$self->_resolved_attrs->{collapse}}) {
521       my $rs = $self->search($query);
522       my $row = $rs->next;
523       carp "Query returned more than one row" if $rs->next;
524       return $row;
525     }
526     else {
527       return $self->single($query);
528     }
529   }
530 }
531
532 # _add_alias
533 #
534 # Add the specified alias to the specified query hash. A copy is made so the
535 # original query is not modified.
536
537 sub _add_alias {
538   my ($self, $query, $alias) = @_;
539
540   my %aliased = %$query;
541   foreach my $col (grep { ! m/\./ } keys %aliased) {
542     $aliased{"$alias.$col"} = delete $aliased{$col};
543   }
544
545   return \%aliased;
546 }
547
548 # _unique_queries
549 #
550 # Build a list of queries which satisfy unique constraints.
551
552 sub _unique_queries {
553   my ($self, $query, $attrs) = @_;
554
555   my @constraint_names = exists $attrs->{key}
556     ? ($attrs->{key})
557     : $self->result_source->unique_constraint_names;
558
559   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
560   my $num_where = scalar keys %$where;
561
562   my @unique_queries;
563   foreach my $name (@constraint_names) {
564     my @unique_cols = $self->result_source->unique_constraint_columns($name);
565     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
566
567     my $num_cols = scalar @unique_cols;
568     my $num_query = scalar keys %$unique_query;
569
570     my $total = $num_query + $num_where;
571     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
572       # The query is either unique on its own or is unique in combination with
573       # the existing where clause
574       push @unique_queries, $unique_query;
575     }
576   }
577
578   return @unique_queries;
579 }
580
581 # _build_unique_query
582 #
583 # Constrain the specified query hash based on the specified column names.
584
585 sub _build_unique_query {
586   my ($self, $query, $unique_cols) = @_;
587
588   return {
589     map  { $_ => $query->{$_} }
590     grep { exists $query->{$_} }
591       @$unique_cols
592   };
593 }
594
595 =head2 search_related
596
597 =over 4
598
599 =item Arguments: $rel, $cond, \%attrs?
600
601 =item Return Value: $new_resultset
602
603 =back
604
605   $new_rs = $cd_rs->search_related('artist', {
606     name => 'Emo-R-Us',
607   });
608
609 Searches the specified relationship, optionally specifying a condition and
610 attributes for matching records. See L</ATTRIBUTES> for more information.
611
612 =cut
613
614 sub search_related {
615   return shift->related_resultset(shift)->search(@_);
616 }
617
618 =head2 search_related_rs
619
620 This method works exactly the same as search_related, except that
621 it guarantees a restultset, even in list context.
622
623 =cut
624
625 sub search_related_rs {
626   return shift->related_resultset(shift)->search_rs(@_);
627 }
628
629 =head2 cursor
630
631 =over 4
632
633 =item Arguments: none
634
635 =item Return Value: $cursor
636
637 =back
638
639 Returns a storage-driven cursor to the given resultset. See
640 L<DBIx::Class::Cursor> for more information.
641
642 =cut
643
644 sub cursor {
645   my ($self) = @_;
646
647   my $attrs = { %{$self->_resolved_attrs} };
648   return $self->{cursor}
649     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
650           $attrs->{where},$attrs);
651 }
652
653 =head2 single
654
655 =over 4
656
657 =item Arguments: $cond?
658
659 =item Return Value: $row_object?
660
661 =back
662
663   my $cd = $schema->resultset('CD')->single({ year => 2001 });
664
665 Inflates the first result without creating a cursor if the resultset has
666 any records in it; if not returns nothing. Used by L</find> as a lean version of
667 L</search>.
668
669 While this method can take an optional search condition (just like L</search>)
670 being a fast-code-path it does not recognize search attributes. If you need to
671 add extra joins or similar, call L</search> and then chain-call L</single> on the
672 L<DBIx::Class::ResultSet> returned.
673
674 =over
675
676 =item B<Note>
677
678 As of 0.08100, this method enforces the assumption that the preceeding
679 query returns only one row. If more than one row is returned, you will receive
680 a warning:
681
682   Query returned more than one row
683
684 In this case, you should be using L</first> or L</find> instead, or if you really
685 know what you are doing, use the L</rows> attribute to explicitly limit the size 
686 of the resultset.
687
688 =back
689
690 =cut
691
692 sub single {
693   my ($self, $where) = @_;
694   if(@_ > 2) {
695       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
696   }
697
698   my $attrs = { %{$self->_resolved_attrs} };
699   if ($where) {
700     if (defined $attrs->{where}) {
701       $attrs->{where} = {
702         '-and' =>
703             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
704                $where, delete $attrs->{where} ]
705       };
706     } else {
707       $attrs->{where} = $where;
708     }
709   }
710
711 #  XXX: Disabled since it doesn't infer uniqueness in all cases
712 #  unless ($self->_is_unique_query($attrs->{where})) {
713 #    carp "Query not guaranteed to return a single row"
714 #      . "; please declare your unique constraints or use search instead";
715 #  }
716
717   my @data = $self->result_source->storage->select_single(
718     $attrs->{from}, $attrs->{select},
719     $attrs->{where}, $attrs
720   );
721
722   return (@data ? ($self->_construct_object(@data))[0] : undef);
723 }
724
725 # _is_unique_query
726 #
727 # Try to determine if the specified query is guaranteed to be unique, based on
728 # the declared unique constraints.
729
730 sub _is_unique_query {
731   my ($self, $query) = @_;
732
733   my $collapsed = $self->_collapse_query($query);
734   my $alias = $self->{attrs}{alias};
735
736   foreach my $name ($self->result_source->unique_constraint_names) {
737     my @unique_cols = map {
738       "$alias.$_"
739     } $self->result_source->unique_constraint_columns($name);
740
741     # Count the values for each unique column
742     my %seen = map { $_ => 0 } @unique_cols;
743
744     foreach my $key (keys %$collapsed) {
745       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
746       next unless exists $seen{$aliased};  # Additional constraints are okay
747       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
748     }
749
750     # If we get 0 or more than 1 value for a column, it's not necessarily unique
751     return 1 unless grep { $_ != 1 } values %seen;
752   }
753
754   return 0;
755 }
756
757 # _collapse_query
758 #
759 # Recursively collapse the query, accumulating values for each column.
760
761 sub _collapse_query {
762   my ($self, $query, $collapsed) = @_;
763
764   $collapsed ||= {};
765
766   if (ref $query eq 'ARRAY') {
767     foreach my $subquery (@$query) {
768       next unless ref $subquery;  # -or
769 #      warn "ARRAY: " . Dumper $subquery;
770       $collapsed = $self->_collapse_query($subquery, $collapsed);
771     }
772   }
773   elsif (ref $query eq 'HASH') {
774     if (keys %$query and (keys %$query)[0] eq '-and') {
775       foreach my $subquery (@{$query->{-and}}) {
776 #        warn "HASH: " . Dumper $subquery;
777         $collapsed = $self->_collapse_query($subquery, $collapsed);
778       }
779     }
780     else {
781 #      warn "LEAF: " . Dumper $query;
782       foreach my $col (keys %$query) {
783         my $value = $query->{$col};
784         $collapsed->{$col}{$value}++;
785       }
786     }
787   }
788
789   return $collapsed;
790 }
791
792 =head2 get_column
793
794 =over 4
795
796 =item Arguments: $cond?
797
798 =item Return Value: $resultsetcolumn
799
800 =back
801
802   my $max_length = $rs->get_column('length')->max;
803
804 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
805
806 =cut
807
808 sub get_column {
809   my ($self, $column) = @_;
810   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
811   return $new;
812 }
813
814 =head2 search_like
815
816 =over 4
817
818 =item Arguments: $cond, \%attrs?
819
820 =item Return Value: $resultset (scalar context), @row_objs (list context)
821
822 =back
823
824   # WHERE title LIKE '%blue%'
825   $cd_rs = $rs->search_like({ title => '%blue%'});
826
827 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
828 that this is simply a convenience method retained for ex Class::DBI users.
829 You most likely want to use L</search> with specific operators.
830
831 For more information, see L<DBIx::Class::Manual::Cookbook>.
832
833 =cut
834
835 sub search_like {
836   my $class = shift;
837   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
838   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
839   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
840   return $class->search($query, { %$attrs });
841 }
842
843 =head2 slice
844
845 =over 4
846
847 =item Arguments: $first, $last
848
849 =item Return Value: $resultset (scalar context), @row_objs (list context)
850
851 =back
852
853 Returns a resultset or object list representing a subset of elements from the
854 resultset slice is called on. Indexes are from 0, i.e., to get the first
855 three records, call:
856
857   my ($one, $two, $three) = $rs->slice(0, 2);
858
859 =cut
860
861 sub slice {
862   my ($self, $min, $max) = @_;
863   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
864   $attrs->{offset} = $self->{attrs}{offset} || 0;
865   $attrs->{offset} += $min;
866   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
867   return $self->search(undef(), $attrs);
868   #my $slice = (ref $self)->new($self->result_source, $attrs);
869   #return (wantarray ? $slice->all : $slice);
870 }
871
872 =head2 next
873
874 =over 4
875
876 =item Arguments: none
877
878 =item Return Value: $result?
879
880 =back
881
882 Returns the next element in the resultset (C<undef> is there is none).
883
884 Can be used to efficiently iterate over records in the resultset:
885
886   my $rs = $schema->resultset('CD')->search;
887   while (my $cd = $rs->next) {
888     print $cd->title;
889   }
890
891 Note that you need to store the resultset object, and call C<next> on it.
892 Calling C<< resultset('Table')->next >> repeatedly will always return the
893 first record from the resultset.
894
895 =cut
896
897 sub next {
898   my ($self) = @_;
899   if (my $cache = $self->get_cache) {
900     $self->{all_cache_position} ||= 0;
901     return $cache->[$self->{all_cache_position}++];
902   }
903   if ($self->{attrs}{cache}) {
904     $self->{all_cache_position} = 1;
905     return ($self->all)[0];
906   }
907   if ($self->{stashed_objects}) {
908     my $obj = shift(@{$self->{stashed_objects}});
909     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
910     return $obj;
911   }
912   my @row = (
913     exists $self->{stashed_row}
914       ? @{delete $self->{stashed_row}}
915       : $self->cursor->next
916   );
917   return undef unless (@row);
918   my ($row, @more) = $self->_construct_object(@row);
919   $self->{stashed_objects} = \@more if @more;
920   return $row;
921 }
922
923 sub _construct_object {
924   my ($self, @row) = @_;
925   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
926   my @new = $self->result_class->inflate_result($self->result_source, @$info);
927   @new = $self->{_attrs}{record_filter}->(@new)
928     if exists $self->{_attrs}{record_filter};
929   return @new;
930 }
931
932 sub _collapse_result {
933   my ($self, $as_proto, $row) = @_;
934
935   my @copy = @$row;
936
937   # 'foo'         => [ undef, 'foo' ]
938   # 'foo.bar'     => [ 'foo', 'bar' ]
939   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
940
941   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
942
943   my %collapse = %{$self->{_attrs}{collapse}||{}};
944
945   my @pri_index;
946
947   # if we're doing collapsing (has_many prefetch) we need to grab records
948   # until the PK changes, so fill @pri_index. if not, we leave it empty so
949   # we know we don't have to bother.
950
951   # the reason for not using the collapse stuff directly is because if you
952   # had for e.g. two artists in a row with no cds, the collapse info for
953   # both would be NULL (undef) so you'd lose the second artist
954
955   # store just the index so we can check the array positions from the row
956   # without having to contruct the full hash
957
958   if (keys %collapse) {
959     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
960     foreach my $i (0 .. $#construct_as) {
961       next if defined($construct_as[$i][0]); # only self table
962       if (delete $pri{$construct_as[$i][1]}) {
963         push(@pri_index, $i);
964       }
965       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
966     }
967   }
968
969   # no need to do an if, it'll be empty if @pri_index is empty anyway
970
971   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
972
973   my @const_rows;
974
975   do { # no need to check anything at the front, we always want the first row
976
977     my %const;
978   
979     foreach my $this_as (@construct_as) {
980       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
981     }
982
983     push(@const_rows, \%const);
984
985   } until ( # no pri_index => no collapse => drop straight out
986       !@pri_index
987     or
988       do { # get another row, stash it, drop out if different PK
989
990         @copy = $self->cursor->next;
991         $self->{stashed_row} = \@copy;
992
993         # last thing in do block, counts as true if anything doesn't match
994
995         # check xor defined first for NULL vs. NOT NULL then if one is
996         # defined the other must be so check string equality
997
998         grep {
999           (defined $pri_vals{$_} ^ defined $copy[$_])
1000           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1001         } @pri_index;
1002       }
1003   );
1004
1005   my $alias = $self->{attrs}{alias};
1006   my $info = [];
1007
1008   my %collapse_pos;
1009
1010   my @const_keys;
1011
1012   foreach my $const (@const_rows) {
1013     scalar @const_keys or do {
1014       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1015     };
1016     foreach my $key (@const_keys) {
1017       if (length $key) {
1018         my $target = $info;
1019         my @parts = split(/\./, $key);
1020         my $cur = '';
1021         my $data = $const->{$key};
1022         foreach my $p (@parts) {
1023           $target = $target->[1]->{$p} ||= [];
1024           $cur .= ".${p}";
1025           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
1026             # collapsing at this point and on final part
1027             my $pos = $collapse_pos{$cur};
1028             CK: foreach my $ck (@ckey) {
1029               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1030                 $collapse_pos{$cur} = $data;
1031                 delete @collapse_pos{ # clear all positioning for sub-entries
1032                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1033                 };
1034                 push(@$target, []);
1035                 last CK;
1036               }
1037             }
1038           }
1039           if (exists $collapse{$cur}) {
1040             $target = $target->[-1];
1041           }
1042         }
1043         $target->[0] = $data;
1044       } else {
1045         $info->[0] = $const->{$key};
1046       }
1047     }
1048   }
1049
1050   return $info;
1051 }
1052
1053 =head2 result_source
1054
1055 =over 4
1056
1057 =item Arguments: $result_source?
1058
1059 =item Return Value: $result_source
1060
1061 =back
1062
1063 An accessor for the primary ResultSource object from which this ResultSet
1064 is derived.
1065
1066 =head2 result_class
1067
1068 =over 4
1069
1070 =item Arguments: $result_class?
1071
1072 =item Return Value: $result_class
1073
1074 =back
1075
1076 An accessor for the class to use when creating row objects. Defaults to 
1077 C<< result_source->result_class >> - which in most cases is the name of the 
1078 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1079
1080 =cut
1081
1082 sub result_class {
1083   my ($self, $result_class) = @_;
1084   if ($result_class) {
1085     $self->ensure_class_loaded($result_class);
1086     $self->_result_class($result_class);
1087   }
1088   $self->_result_class;
1089 }
1090
1091 =head2 count
1092
1093 =over 4
1094
1095 =item Arguments: $cond, \%attrs??
1096
1097 =item Return Value: $count
1098
1099 =back
1100
1101 Performs an SQL C<COUNT> with the same query as the resultset was built
1102 with to find the number of elements. If passed arguments, does a search
1103 on the resultset and counts the results of that.
1104
1105 Note: When using C<count> with C<group_by>, L<DBIx::Class> emulates C<GROUP BY>
1106 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
1107 not support C<DISTINCT> with multiple columns. If you are using such a
1108 database, you should only use columns from the main table in your C<group_by>
1109 clause.
1110
1111 =cut
1112
1113 sub count {
1114   my $self = shift;
1115   return $self->search(@_)->count if @_ and defined $_[0];
1116   return scalar @{ $self->get_cache } if $self->get_cache;
1117   my $count = $self->_count;
1118   return 0 unless $count;
1119
1120   # need to take offset from resolved attrs
1121
1122   $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
1123   $count = $self->{attrs}{rows} if
1124     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
1125   $count = 0 if ($count < 0);
1126   return $count;
1127 }
1128
1129 sub _count { # Separated out so pager can get the full count
1130   my $self = shift;
1131   my $attrs = { %{$self->_resolved_attrs} };
1132
1133   if (my $group_by = $attrs->{group_by}) {
1134     delete $attrs->{having};
1135     delete $attrs->{order_by};
1136     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1137     # todo: try CONCAT for multi-column pk
1138     my @pk = $self->result_source->primary_columns;
1139     if (@pk == 1) {
1140       my $alias = $attrs->{alias};
1141       foreach my $column (@distinct) {
1142         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1143           @distinct = ($column);
1144           last;
1145         }
1146       }
1147     }
1148
1149     $attrs->{select} = $group_by; 
1150     $attrs->{from} = [ { 'mesub' => (ref $self)->new($self->result_source, $attrs)->cursor->as_query } ];
1151   }
1152
1153   $attrs->{select} = { count => '*' };
1154   $attrs->{as} = [qw/count/];
1155
1156   # offset, order by, group by, where and page are not needed to count. record_filter is cdbi
1157   delete $attrs->{$_} for qw/rows offset order_by group_by where page pager record_filter/;
1158
1159   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1160   my ($count) = $tmp_rs->cursor->next;
1161   return $count;
1162 }
1163
1164 sub _bool {
1165   return 1;
1166 }
1167
1168 =head2 count_literal
1169
1170 =over 4
1171
1172 =item Arguments: $sql_fragment, @bind_values
1173
1174 =item Return Value: $count
1175
1176 =back
1177
1178 Counts the results in a literal query. Equivalent to calling L</search_literal>
1179 with the passed arguments, then L</count>.
1180
1181 =cut
1182
1183 sub count_literal { shift->search_literal(@_)->count; }
1184
1185 =head2 all
1186
1187 =over 4
1188
1189 =item Arguments: none
1190
1191 =item Return Value: @objects
1192
1193 =back
1194
1195 Returns all elements in the resultset. Called implicitly if the resultset
1196 is returned in list context.
1197
1198 =cut
1199
1200 sub all {
1201   my $self = shift;
1202   if(@_) {
1203       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1204   }
1205
1206   return @{ $self->get_cache } if $self->get_cache;
1207
1208   my @obj;
1209
1210   # TODO: don't call resolve here
1211   if (keys %{$self->_resolved_attrs->{collapse}}) {
1212 #  if ($self->{attrs}{prefetch}) {
1213       # Using $self->cursor->all is really just an optimisation.
1214       # If we're collapsing has_many prefetches it probably makes
1215       # very little difference, and this is cleaner than hacking
1216       # _construct_object to survive the approach
1217     my @row = $self->cursor->next;
1218     while (@row) {
1219       push(@obj, $self->_construct_object(@row));
1220       @row = (exists $self->{stashed_row}
1221                ? @{delete $self->{stashed_row}}
1222                : $self->cursor->next);
1223     }
1224   } else {
1225     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1226   }
1227
1228   $self->set_cache(\@obj) if $self->{attrs}{cache};
1229   return @obj;
1230 }
1231
1232 =head2 reset
1233
1234 =over 4
1235
1236 =item Arguments: none
1237
1238 =item Return Value: $self
1239
1240 =back
1241
1242 Resets the resultset's cursor, so you can iterate through the elements again.
1243
1244 =cut
1245
1246 sub reset {
1247   my ($self) = @_;
1248   delete $self->{_attrs} if exists $self->{_attrs};
1249   $self->{all_cache_position} = 0;
1250   $self->cursor->reset;
1251   return $self;
1252 }
1253
1254 =head2 first
1255
1256 =over 4
1257
1258 =item Arguments: none
1259
1260 =item Return Value: $object?
1261
1262 =back
1263
1264 Resets the resultset and returns an object for the first result (if the
1265 resultset returns anything).
1266
1267 =cut
1268
1269 sub first {
1270   return $_[0]->reset->next;
1271 }
1272
1273 # _cond_for_update_delete
1274 #
1275 # update/delete require the condition to be modified to handle
1276 # the differing SQL syntax available.  This transforms the $self->{cond}
1277 # appropriately, returning the new condition.
1278
1279 sub _cond_for_update_delete {
1280   my ($self, $full_cond) = @_;
1281   my $cond = {};
1282
1283   $full_cond ||= $self->{cond};
1284   # No-op. No condition, we're updating/deleting everything
1285   return $cond unless ref $full_cond;
1286
1287   if (ref $full_cond eq 'ARRAY') {
1288     $cond = [
1289       map {
1290         my %hash;
1291         foreach my $key (keys %{$_}) {
1292           $key =~ /([^.]+)$/;
1293           $hash{$1} = $_->{$key};
1294         }
1295         \%hash;
1296       } @{$full_cond}
1297     ];
1298   }
1299   elsif (ref $full_cond eq 'HASH') {
1300     if ((keys %{$full_cond})[0] eq '-and') {
1301       $cond->{-and} = [];
1302
1303       my @cond = @{$full_cond->{-and}};
1304       for (my $i = 0; $i < @cond; $i++) {
1305         my $entry = $cond[$i];
1306
1307         my $hash;
1308         if (ref $entry eq 'HASH') {
1309           $hash = $self->_cond_for_update_delete($entry);
1310         }
1311         else {
1312           $entry =~ /([^.]+)$/;
1313           $hash->{$1} = $cond[++$i];
1314         }
1315
1316         push @{$cond->{-and}}, $hash;
1317       }
1318     }
1319     else {
1320       foreach my $key (keys %{$full_cond}) {
1321         $key =~ /([^.]+)$/;
1322         $cond->{$1} = $full_cond->{$key};
1323       }
1324     }
1325   }
1326   else {
1327     $self->throw_exception(
1328       "Can't update/delete on resultset with condition unless hash or array"
1329     );
1330   }
1331
1332   return $cond;
1333 }
1334
1335
1336 =head2 update
1337
1338 =over 4
1339
1340 =item Arguments: \%values
1341
1342 =item Return Value: $storage_rv
1343
1344 =back
1345
1346 Sets the specified columns in the resultset to the supplied values in a
1347 single query. Return value will be true if the update succeeded or false
1348 if no records were updated; exact type of success value is storage-dependent.
1349
1350 =cut
1351
1352 sub update {
1353   my ($self, $values) = @_;
1354   $self->throw_exception("Values for update must be a hash")
1355     unless ref $values eq 'HASH';
1356
1357   carp(   'WARNING! Currently $rs->update() does not generate proper SQL'
1358         . ' on joined resultsets, and may affect rows well outside of the'
1359         . ' contents of $rs. Use at your own risk' )
1360     if ( $self->{attrs}{seen_join} );
1361
1362   my $cond = $self->_cond_for_update_delete;
1363    
1364   return $self->result_source->storage->update(
1365     $self->result_source, $values, $cond
1366   );
1367 }
1368
1369 =head2 update_all
1370
1371 =over 4
1372
1373 =item Arguments: \%values
1374
1375 =item Return Value: 1
1376
1377 =back
1378
1379 Fetches all objects and updates them one at a time. Note that C<update_all>
1380 will run DBIC cascade triggers, while L</update> will not.
1381
1382 =cut
1383
1384 sub update_all {
1385   my ($self, $values) = @_;
1386   $self->throw_exception("Values for update must be a hash")
1387     unless ref $values eq 'HASH';
1388   foreach my $obj ($self->all) {
1389     $obj->set_columns($values)->update;
1390   }
1391   return 1;
1392 }
1393
1394 =head2 delete
1395
1396 =over 4
1397
1398 =item Arguments: none
1399
1400 =item Return Value: 1
1401
1402 =back
1403
1404 Deletes the contents of the resultset from its result source. Note that this
1405 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1406 to run. See also L<DBIx::Class::Row/delete>.
1407
1408 delete may not generate correct SQL for a query with joins or a resultset
1409 chained from a related resultset.  In this case it will generate a warning:-
1410
1411   WARNING! Currently $rs->delete() does not generate proper SQL on
1412   joined resultsets, and may delete rows well outside of the contents
1413   of $rs. Use at your own risk
1414
1415 In these cases you may find that delete_all is more appropriate, or you
1416 need to respecify your query in a way that can be expressed without a join.
1417
1418 =cut
1419
1420 sub delete {
1421   my ($self) = @_;
1422   $self->throw_exception("Delete should not be passed any arguments")
1423     if $_[1];
1424   carp(   'WARNING! Currently $rs->delete() does not generate proper SQL'
1425         . ' on joined resultsets, and may delete rows well outside of the'
1426         . ' contents of $rs. Use at your own risk' )
1427     if ( $self->{attrs}{seen_join} );
1428   my $cond = $self->_cond_for_update_delete;
1429
1430   $self->result_source->storage->delete($self->result_source, $cond);
1431   return 1;
1432 }
1433
1434 =head2 delete_all
1435
1436 =over 4
1437
1438 =item Arguments: none
1439
1440 =item Return Value: 1
1441
1442 =back
1443
1444 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1445 will run DBIC cascade triggers, while L</delete> will not.
1446
1447 =cut
1448
1449 sub delete_all {
1450   my ($self) = @_;
1451   $_->delete for $self->all;
1452   return 1;
1453 }
1454
1455 =head2 populate
1456
1457 =over 4
1458
1459 =item Arguments: \@data;
1460
1461 =back
1462
1463 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1464 For the arrayref of hashrefs style each hashref should be a structure suitable
1465 forsubmitting to a $resultset->create(...) method.
1466
1467 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1468 to insert the data, as this is a faster method.  
1469
1470 Otherwise, each set of data is inserted into the database using
1471 L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
1472 objects is returned.
1473
1474 Example:  Assuming an Artist Class that has many CDs Classes relating:
1475
1476   my $Artist_rs = $schema->resultset("Artist");
1477   
1478   ## Void Context Example 
1479   $Artist_rs->populate([
1480      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1481         { title => 'My First CD', year => 2006 },
1482         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1483       ],
1484      },
1485      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1486         { title => 'My parents sold me to a record company' ,year => 2005 },
1487         { title => 'Why Am I So Ugly?', year => 2006 },
1488         { title => 'I Got Surgery and am now Popular', year => 2007 }
1489       ],
1490      },
1491   ]);
1492   
1493   ## Array Context Example
1494   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1495     { name => "Artist One"},
1496     { name => "Artist Two"},
1497     { name => "Artist Three", cds=> [
1498     { title => "First CD", year => 2007},
1499     { title => "Second CD", year => 2008},
1500   ]}
1501   ]);
1502   
1503   print $ArtistOne->name; ## response is 'Artist One'
1504   print $ArtistThree->cds->count ## reponse is '2'
1505
1506 For the arrayref of arrayrefs style,  the first element should be a list of the
1507 fieldsnames to which the remaining elements are rows being inserted.  For
1508 example:
1509
1510   $Arstist_rs->populate([
1511     [qw/artistid name/],
1512     [100, 'A Formally Unknown Singer'],
1513     [101, 'A singer that jumped the shark two albums ago'],
1514     [102, 'An actually cool singer.'],
1515   ]);
1516
1517 Please note an important effect on your data when choosing between void and
1518 wantarray context. Since void context goes straight to C<insert_bulk> in 
1519 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1520 c<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to 
1521 create primary keys for you, you will find that your PKs are empty.  In this 
1522 case you will have to use the wantarray context in order to create those 
1523 values.
1524
1525 =cut
1526
1527 sub populate {
1528   my $self = shift @_;
1529   my $data = ref $_[0][0] eq 'HASH'
1530     ? $_[0] : ref $_[0][0] eq 'ARRAY' ? $self->_normalize_populate_args($_[0]) :
1531     $self->throw_exception('Populate expects an arrayref of hashes or arrayref of arrayrefs');
1532   
1533   if(defined wantarray) {
1534     my @created;
1535     foreach my $item (@$data) {
1536       push(@created, $self->create($item));
1537     }
1538     return @created;
1539   } else {
1540     my ($first, @rest) = @$data;
1541
1542     my @names = grep {!ref $first->{$_}} keys %$first;
1543     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1544     my @pks = $self->result_source->primary_columns;  
1545
1546     ## do the belongs_to relationships  
1547     foreach my $index (0..$#$data) {
1548       if( grep { !defined $data->[$index]->{$_} } @pks ) {
1549         my @ret = $self->populate($data);
1550         return;
1551       }
1552     
1553       foreach my $rel (@rels) {
1554         next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
1555         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1556         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1557         my $related = $result->result_source->resolve_condition(
1558           $result->result_source->relationship_info($reverse)->{cond},
1559           $self,        
1560           $result,        
1561         );
1562
1563         delete $data->[$index]->{$rel};
1564         $data->[$index] = {%{$data->[$index]}, %$related};
1565       
1566         push @names, keys %$related if $index == 0;
1567       }
1568     }
1569
1570     ## do bulk insert on current row
1571     my @values = map { [ @$_{@names} ] } @$data;
1572
1573     $self->result_source->storage->insert_bulk(
1574       $self->result_source, 
1575       \@names, 
1576       \@values,
1577     );
1578
1579     ## do the has_many relationships
1580     foreach my $item (@$data) {
1581
1582       foreach my $rel (@rels) {
1583         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1584
1585         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
1586      || $self->throw_exception('Cannot find the relating object.');
1587      
1588         my $child = $parent->$rel;
1589     
1590         my $related = $child->result_source->resolve_condition(
1591           $parent->result_source->relationship_info($rel)->{cond},
1592           $child,
1593           $parent,
1594         );
1595
1596         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1597         my @populate = map { {%$_, %$related} } @rows_to_add;
1598
1599         $child->populate( \@populate );
1600       }
1601     }
1602   }
1603 }
1604
1605 =head2 _normalize_populate_args ($args)
1606
1607 Private method used by L</populate> to normalize its incoming arguments.  Factored
1608 out in case you want to subclass and accept new argument structures to the
1609 L</populate> method.
1610
1611 =cut
1612
1613 sub _normalize_populate_args {
1614   my ($self, $data) = @_;
1615   my @names = @{shift(@$data)};
1616   my @results_to_create;
1617   foreach my $datum (@$data) {
1618     my %result_to_create;
1619     foreach my $index (0..$#names) {
1620       $result_to_create{$names[$index]} = $$datum[$index];
1621     }
1622     push @results_to_create, \%result_to_create;    
1623   }
1624   return \@results_to_create;
1625 }
1626
1627 =head2 pager
1628
1629 =over 4
1630
1631 =item Arguments: none
1632
1633 =item Return Value: $pager
1634
1635 =back
1636
1637 Return Value a L<Data::Page> object for the current resultset. Only makes
1638 sense for queries with a C<page> attribute.
1639
1640 =cut
1641
1642 sub pager {
1643   my ($self) = @_;
1644   my $attrs = $self->{attrs};
1645   $self->throw_exception("Can't create pager for non-paged rs")
1646     unless $self->{attrs}{page};
1647   $attrs->{rows} ||= 10;
1648   return $self->{pager} ||= Data::Page->new(
1649     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1650 }
1651
1652 =head2 page
1653
1654 =over 4
1655
1656 =item Arguments: $page_number
1657
1658 =item Return Value: $rs
1659
1660 =back
1661
1662 Returns a resultset for the $page_number page of the resultset on which page
1663 is called, where each page contains a number of rows equal to the 'rows'
1664 attribute set on the resultset (10 by default).
1665
1666 =cut
1667
1668 sub page {
1669   my ($self, $page) = @_;
1670   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1671 }
1672
1673 =head2 new_result
1674
1675 =over 4
1676
1677 =item Arguments: \%vals
1678
1679 =item Return Value: $rowobject
1680
1681 =back
1682
1683 Creates a new row object in the resultset's result class and returns
1684 it. The row is not inserted into the database at this point, call
1685 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
1686 will tell you whether the row object has been inserted or not.
1687
1688 Passes the hashref of input on to L<DBIx::Class::Row/new>.
1689
1690 =cut
1691
1692 sub new_result {
1693   my ($self, $values) = @_;
1694   $self->throw_exception( "new_result needs a hash" )
1695     unless (ref $values eq 'HASH');
1696
1697   my %new;
1698   my $alias = $self->{attrs}{alias};
1699
1700   if (
1701     defined $self->{cond}
1702     && $self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION
1703   ) {
1704     %new = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
1705     $new{-from_resultset} = [ keys %new ] if keys %new;
1706   } else {
1707     $self->throw_exception(
1708       "Can't abstract implicit construct, condition not a hash"
1709     ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1710   
1711     my $collapsed_cond = (
1712       $self->{cond}
1713         ? $self->_collapse_cond($self->{cond})
1714         : {}
1715     );
1716   
1717     # precendence must be given to passed values over values inherited from
1718     # the cond, so the order here is important.
1719     my %implied =  %{$self->_remove_alias($collapsed_cond, $alias)};
1720     while( my($col,$value) = each %implied ){
1721       if(ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '='){
1722         $new{$col} = $value->{'='};
1723         next;
1724       }
1725       $new{$col} = $value if $self->_is_deterministic_value($value);
1726     }
1727   }
1728
1729   %new = (
1730     %new,
1731     %{ $self->_remove_alias($values, $alias) },
1732     -source_handle => $self->_source_handle,
1733     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1734   );
1735
1736   return $self->result_class->new(\%new);
1737 }
1738
1739 # _is_deterministic_value
1740 #
1741 # Make an effor to strip non-deterministic values from the condition, 
1742 # to make sure new_result chokes less
1743
1744 sub _is_deterministic_value {
1745   my $self = shift;
1746   my $value = shift;
1747   my $ref_type = ref $value;
1748   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
1749   return 1 if Scalar::Util::blessed($value);
1750   return 0;
1751 }
1752
1753 # _collapse_cond
1754 #
1755 # Recursively collapse the condition.
1756
1757 sub _collapse_cond {
1758   my ($self, $cond, $collapsed) = @_;
1759
1760   $collapsed ||= {};
1761
1762   if (ref $cond eq 'ARRAY') {
1763     foreach my $subcond (@$cond) {
1764       next unless ref $subcond;  # -or
1765 #      warn "ARRAY: " . Dumper $subcond;
1766       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1767     }
1768   }
1769   elsif (ref $cond eq 'HASH') {
1770     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1771       foreach my $subcond (@{$cond->{-and}}) {
1772 #        warn "HASH: " . Dumper $subcond;
1773         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1774       }
1775     }
1776     else {
1777 #      warn "LEAF: " . Dumper $cond;
1778       foreach my $col (keys %$cond) {
1779         my $value = $cond->{$col};
1780         $collapsed->{$col} = $value;
1781       }
1782     }
1783   }
1784
1785   return $collapsed;
1786 }
1787
1788 # _remove_alias
1789 #
1790 # Remove the specified alias from the specified query hash. A copy is made so
1791 # the original query is not modified.
1792
1793 sub _remove_alias {
1794   my ($self, $query, $alias) = @_;
1795
1796   my %orig = %{ $query || {} };
1797   my %unaliased;
1798
1799   foreach my $key (keys %orig) {
1800     if ($key !~ /\./) {
1801       $unaliased{$key} = $orig{$key};
1802       next;
1803     }
1804     $unaliased{$1} = $orig{$key}
1805       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1806   }
1807
1808   return \%unaliased;
1809 }
1810
1811 =head2 as_query (EXPERIMENTAL)
1812
1813 =over 4
1814
1815 =item Arguments: none
1816
1817 =item Return Value: \[ $sql, @bind ]
1818
1819 =back
1820
1821 Returns the SQL query and bind vars associated with the invocant.
1822
1823 This is generally used as the RHS for a subquery.
1824
1825 B<NOTE>: This feature is still experimental.
1826
1827 =cut
1828
1829 sub as_query { return shift->cursor->as_query(@_) }
1830
1831 =head2 find_or_new
1832
1833 =over 4
1834
1835 =item Arguments: \%vals, \%attrs?
1836
1837 =item Return Value: $rowobject
1838
1839 =back
1840
1841   my $artist = $schema->resultset('Artist')->find_or_new(
1842     { artist => 'fred' }, { key => 'artists' });
1843
1844   $cd->cd_to_producer->find_or_new({ producer => $producer },
1845                                    { key => 'primary });
1846
1847 Find an existing record from this resultset, based on its primary
1848 key, or a unique constraint. If none exists, instantiate a new result
1849 object and return it. The object will not be saved into your storage
1850 until you call L<DBIx::Class::Row/insert> on it.
1851
1852 You most likely want this method when looking for existing rows using
1853 a unique constraint that is not the primary key, or looking for
1854 related rows.
1855
1856 If you want objects to be saved immediately, use L</find_or_create> instead.
1857
1858 B<Note>: C<find_or_new> is probably not what you want when creating a
1859 new row in a table that uses primary keys supplied by the
1860 database. Passing in a primary key column with a value of I<undef>
1861 will cause L</find> to attempt to search for a row with a value of
1862 I<NULL>.
1863
1864 =cut
1865
1866 sub find_or_new {
1867   my $self     = shift;
1868   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1869   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1870   my $exists   = $self->find($hash, $attrs);
1871   return defined $exists ? $exists : $self->new_result($hash);
1872 }
1873
1874 =head2 create
1875
1876 =over 4
1877
1878 =item Arguments: \%vals
1879
1880 =item Return Value: a L<DBIx::Class::Row> $object
1881
1882 =back
1883
1884 Attempt to create a single new row or a row with multiple related rows
1885 in the table represented by the resultset (and related tables). This
1886 will not check for duplicate rows before inserting, use
1887 L</find_or_create> to do that.
1888
1889 To create one row for this resultset, pass a hashref of key/value
1890 pairs representing the columns of the table and the values you wish to
1891 store. If the appropriate relationships are set up, foreign key fields
1892 can also be passed an object representing the foreign row, and the
1893 value will be set to its primary key.
1894
1895 To create related objects, pass a hashref for the value if the related
1896 item is a foreign key relationship (L<DBIx::Class::Relationship/belongs_to>),
1897 and use the name of the relationship as the key. (NOT the name of the field,
1898 necessarily). For C<has_many> and C<has_one> relationships, pass an arrayref
1899 of hashrefs containing the data for each of the rows to create in the foreign
1900 tables, again using the relationship name as the key.
1901
1902 Instead of hashrefs of plain related data (key/value pairs), you may
1903 also pass new or inserted objects. New objects (not inserted yet, see
1904 L</new>), will be inserted into their appropriate tables.
1905
1906 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1907
1908 Example of creating a new row.
1909
1910   $person_rs->create({
1911     name=>"Some Person",
1912     email=>"somebody@someplace.com"
1913   });
1914   
1915 Example of creating a new row and also creating rows in a related C<has_many>
1916 or C<has_one> resultset.  Note Arrayref.
1917
1918   $artist_rs->create(
1919      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1920         { title => 'My First CD', year => 2006 },
1921         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1922       ],
1923      },
1924   );
1925
1926 Example of creating a new row and also creating a row in a related
1927 C<belongs_to>resultset. Note Hashref.
1928
1929   $cd_rs->create({
1930     title=>"Music for Silly Walks",
1931     year=>2000,
1932     artist => {
1933       name=>"Silly Musician",
1934     }
1935   });
1936
1937 =cut
1938
1939 sub create {
1940   my ($self, $attrs) = @_;
1941   $self->throw_exception( "create needs a hashref" )
1942     unless ref $attrs eq 'HASH';
1943   return $self->new_result($attrs)->insert;
1944 }
1945
1946 =head2 find_or_create
1947
1948 =over 4
1949
1950 =item Arguments: \%vals, \%attrs?
1951
1952 =item Return Value: $rowobject
1953
1954 =back
1955
1956   $cd->cd_to_producer->find_or_create({ producer => $producer },
1957                                       { key => 'primary });
1958
1959 Tries to find a record based on its primary key or unique constraints; if none
1960 is found, creates one and returns that instead.
1961
1962   my $cd = $schema->resultset('CD')->find_or_create({
1963     cdid   => 5,
1964     artist => 'Massive Attack',
1965     title  => 'Mezzanine',
1966     year   => 2005,
1967   });
1968
1969 Also takes an optional C<key> attribute, to search by a specific key or unique
1970 constraint. For example:
1971
1972   my $cd = $schema->resultset('CD')->find_or_create(
1973     {
1974       artist => 'Massive Attack',
1975       title  => 'Mezzanine',
1976     },
1977     { key => 'cd_artist_title' }
1978   );
1979
1980 B<Note>: Because find_or_create() reads from the database and then
1981 possibly inserts based on the result, this method is subject to a race
1982 condition. Another process could create a record in the table after
1983 the find has completed and before the create has started. To avoid
1984 this problem, use find_or_create() inside a transaction.
1985
1986 B<Note>: C<find_or_create> is probably not what you want when creating
1987 a new row in a table that uses primary keys supplied by the
1988 database. Passing in a primary key column with a value of I<undef>
1989 will cause L</find> to attempt to search for a row with a value of
1990 I<NULL>.
1991
1992 See also L</find> and L</update_or_create>. For information on how to declare
1993 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1994
1995 =cut
1996
1997 sub find_or_create {
1998   my $self     = shift;
1999   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2000   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2001   my $exists   = $self->find($hash, $attrs);
2002   return defined $exists ? $exists : $self->create($hash);
2003 }
2004
2005 =head2 update_or_create
2006
2007 =over 4
2008
2009 =item Arguments: \%col_values, { key => $unique_constraint }?
2010
2011 =item Return Value: $rowobject
2012
2013 =back
2014
2015   $resultset->update_or_create({ col => $val, ... });
2016
2017 First, searches for an existing row matching one of the unique constraints
2018 (including the primary key) on the source of this resultset. If a row is
2019 found, updates it with the other given column values. Otherwise, creates a new
2020 row.
2021
2022 Takes an optional C<key> attribute to search on a specific unique constraint.
2023 For example:
2024
2025   # In your application
2026   my $cd = $schema->resultset('CD')->update_or_create(
2027     {
2028       artist => 'Massive Attack',
2029       title  => 'Mezzanine',
2030       year   => 1998,
2031     },
2032     { key => 'cd_artist_title' }
2033   );
2034
2035   $cd->cd_to_producer->update_or_create({ 
2036     producer => $producer, 
2037     name => 'harry',
2038   }, { 
2039     key => 'primary,
2040   });
2041
2042
2043 If no C<key> is specified, it searches on all unique constraints defined on the
2044 source, including the primary key.
2045
2046 If the C<key> is specified as C<primary>, it searches only on the primary key.
2047
2048 See also L</find> and L</find_or_create>. For information on how to declare
2049 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2050
2051 B<Note>: C<update_or_create> is probably not what you want when
2052 looking for a row in a table that uses primary keys supplied by the
2053 database, unless you actually have a key value. Passing in a primary
2054 key column with a value of I<undef> will cause L</find> to attempt to
2055 search for a row with a value of I<NULL>.
2056
2057 =cut
2058
2059 sub update_or_create {
2060   my $self = shift;
2061   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2062   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2063
2064   my $row = $self->find($cond, $attrs);
2065   if (defined $row) {
2066     $row->update($cond);
2067     return $row;
2068   }
2069
2070   return $self->create($cond);
2071 }
2072
2073 =head2 get_cache
2074
2075 =over 4
2076
2077 =item Arguments: none
2078
2079 =item Return Value: \@cache_objects?
2080
2081 =back
2082
2083 Gets the contents of the cache for the resultset, if the cache is set.
2084
2085 The cache is populated either by using the L</prefetch> attribute to
2086 L</search> or by calling L</set_cache>.
2087
2088 =cut
2089
2090 sub get_cache {
2091   shift->{all_cache};
2092 }
2093
2094 =head2 set_cache
2095
2096 =over 4
2097
2098 =item Arguments: \@cache_objects
2099
2100 =item Return Value: \@cache_objects
2101
2102 =back
2103
2104 Sets the contents of the cache for the resultset. Expects an arrayref
2105 of objects of the same class as those produced by the resultset. Note that
2106 if the cache is set the resultset will return the cached objects rather
2107 than re-querying the database even if the cache attr is not set.
2108
2109 The contents of the cache can also be populated by using the
2110 L</prefetch> attribute to L</search>.
2111
2112 =cut
2113
2114 sub set_cache {
2115   my ( $self, $data ) = @_;
2116   $self->throw_exception("set_cache requires an arrayref")
2117       if defined($data) && (ref $data ne 'ARRAY');
2118   $self->{all_cache} = $data;
2119 }
2120
2121 =head2 clear_cache
2122
2123 =over 4
2124
2125 =item Arguments: none
2126
2127 =item Return Value: []
2128
2129 =back
2130
2131 Clears the cache for the resultset.
2132
2133 =cut
2134
2135 sub clear_cache {
2136   shift->set_cache(undef);
2137 }
2138
2139 =head2 related_resultset
2140
2141 =over 4
2142
2143 =item Arguments: $relationship_name
2144
2145 =item Return Value: $resultset
2146
2147 =back
2148
2149 Returns a related resultset for the supplied relationship name.
2150
2151   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2152
2153 =cut
2154
2155 sub related_resultset {
2156   my ($self, $rel) = @_;
2157
2158   $self->{related_resultsets} ||= {};
2159   return $self->{related_resultsets}{$rel} ||= do {
2160     my $rel_obj = $self->result_source->relationship_info($rel);
2161
2162     $self->throw_exception(
2163       "search_related: result source '" . $self->result_source->source_name .
2164         "' has no such relationship $rel")
2165       unless $rel_obj;
2166     
2167     my ($from,$seen) = $self->_resolve_from($rel);
2168
2169     my $join_count = $seen->{$rel};
2170     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
2171
2172     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2173     my %attrs = %{$self->{attrs}||{}};
2174     delete @attrs{qw(result_class alias)};
2175
2176     my $new_cache;
2177
2178     if (my $cache = $self->get_cache) {
2179       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2180         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2181                         @$cache ];
2182       }
2183     }
2184
2185     my $rel_source = $self->result_source->related_source($rel);
2186
2187     my $new = do {
2188
2189       # The reason we do this now instead of passing the alias to the
2190       # search_rs below is that if you wrap/overload resultset on the
2191       # source you need to know what alias it's -going- to have for things
2192       # to work sanely (e.g. RestrictWithObject wants to be able to add
2193       # extra query restrictions, and these may need to be $alias.)
2194
2195       my $attrs = $rel_source->resultset_attributes;
2196       local $attrs->{alias} = $alias;
2197
2198       $rel_source->resultset
2199                  ->search_rs(
2200                      undef, {
2201                        %attrs,
2202                        join => undef,
2203                        prefetch => undef,
2204                        select => undef,
2205                        as => undef,
2206                        where => $self->{cond},
2207                        seen_join => $seen,
2208                        from => $from,
2209                    });
2210     };
2211     $new->set_cache($new_cache) if $new_cache;
2212     $new;
2213   };
2214 }
2215
2216 =head2 current_source_alias
2217
2218 =over 4
2219
2220 =item Arguments: none
2221
2222 =item Return Value: $source_alias
2223
2224 =back
2225
2226 Returns the current table alias for the result source this resultset is built
2227 on, that will be used in the SQL query. Usually it is C<me>.
2228
2229 Currently the source alias that refers to the result set returned by a
2230 L</search>/L</find> family method depends on how you got to the resultset: it's
2231 C<me> by default, but eg. L</search_related> aliases it to the related result
2232 source name (and keeps C<me> referring to the original result set). The long
2233 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2234 (and make this method unnecessary).
2235
2236 Thus it's currently necessary to use this method in predefined queries (see
2237 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2238 source alias of the current result set:
2239
2240   # in a result set class
2241   sub modified_by {
2242     my ($self, $user) = @_;
2243
2244     my $me = $self->current_source_alias;
2245
2246     return $self->search(
2247       "$me.modified" => $user->id,
2248     );
2249   }
2250
2251 =cut
2252
2253 sub current_source_alias {
2254   my ($self) = @_;
2255
2256   return ($self->{attrs} || {})->{alias} || 'me';
2257 }
2258
2259 sub _resolve_from {
2260   my ($self, $extra_join) = @_;
2261   my $source = $self->result_source;
2262   my $attrs = $self->{attrs};
2263   
2264   my $from = $attrs->{from}
2265     || [ { $attrs->{alias} => $source->from } ];
2266     
2267   my $seen = { %{$attrs->{seen_join}||{}} };
2268
2269   my $join = ($attrs->{join}
2270                ? [ $attrs->{join}, $extra_join ]
2271                : $extra_join);
2272
2273   # we need to take the prefetch the attrs into account before we 
2274   # ->resolve_join as otherwise they get lost - captainL
2275   my $merged = $self->_merge_attr( $join, $attrs->{prefetch} );
2276
2277   $from = [
2278     @$from,
2279     ($join ? $source->resolve_join($merged, $attrs->{alias}, $seen) : ()),
2280   ];
2281
2282   return ($from,$seen);
2283 }
2284
2285 sub _resolved_attrs {
2286   my $self = shift;
2287   return $self->{_attrs} if $self->{_attrs};
2288
2289   my $attrs  = { %{ $self->{attrs} || {} } };
2290   my $source = $self->result_source;
2291   my $alias  = $attrs->{alias};
2292
2293   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2294   my @colbits;
2295
2296   # build columns (as long as select isn't set) into a set of as/select hashes
2297   unless ( $attrs->{select} ) {
2298       @colbits = map {
2299           ( ref($_) eq 'HASH' ) ? $_
2300             : {
2301               (
2302                   /^\Q${alias}.\E(.+)$/ ? $1
2303                   : $_
2304                 ) => ( /\./ ? $_ : "${alias}.$_" )
2305             }
2306       } ( ref($attrs->{columns}) eq 'ARRAY' ) ? @{ delete $attrs->{columns}} : (delete $attrs->{columns} || $source->columns );
2307   }
2308   # add the additional columns on
2309   foreach ( 'include_columns', '+columns' ) {
2310       push @colbits, map {
2311           ( ref($_) eq 'HASH' )
2312             ? $_
2313             : { ( split( /\./, $_ ) )[-1] => ( /\./ ? $_ : "${alias}.$_" ) }
2314       } ( ref($attrs->{$_}) eq 'ARRAY' ) ? @{ delete $attrs->{$_} } : delete $attrs->{$_} if ( $attrs->{$_} );
2315   }
2316
2317   # start with initial select items
2318   if ( $attrs->{select} ) {
2319     $attrs->{select} =
2320         ( ref $attrs->{select} eq 'ARRAY' )
2321       ? [ @{ $attrs->{select} } ]
2322       : [ $attrs->{select} ];
2323     $attrs->{as} = (
2324       $attrs->{as}
2325       ? (
2326         ref $attrs->{as} eq 'ARRAY'
2327         ? [ @{ $attrs->{as} } ]
2328         : [ $attrs->{as} ]
2329         )
2330       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{ $attrs->{select} } ]
2331     );
2332   }
2333   else {
2334
2335     # otherwise we intialise select & as to empty
2336     $attrs->{select} = [];
2337     $attrs->{as}     = [];
2338   }
2339
2340   # now add colbits to select/as
2341   push( @{ $attrs->{select} }, map { values( %{$_} ) } @colbits );
2342   push( @{ $attrs->{as} },     map { keys( %{$_} ) } @colbits );
2343
2344   my $adds;
2345   if ( $adds = delete $attrs->{'+select'} ) {
2346     $adds = [$adds] unless ref $adds eq 'ARRAY';
2347     push(
2348       @{ $attrs->{select} },
2349       map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds
2350     );
2351   }
2352   if ( $adds = delete $attrs->{'+as'} ) {
2353     $adds = [$adds] unless ref $adds eq 'ARRAY';
2354     push( @{ $attrs->{as} }, @$adds );
2355   }
2356
2357   $attrs->{from} ||= [ { $self->{attrs}{alias} => $source->from } ];
2358
2359   if ( exists $attrs->{join} || exists $attrs->{prefetch} ) {
2360     my $join = delete $attrs->{join} || {};
2361
2362     if ( defined $attrs->{prefetch} ) {
2363       $join = $self->_merge_attr( $join, $attrs->{prefetch} );
2364
2365     }
2366
2367     $attrs->{from} =    # have to copy here to avoid corrupting the original
2368       [
2369       @{ $attrs->{from} },
2370       $source->resolve_join(
2371         $join, $alias, { %{ $attrs->{seen_join} || {} } }
2372       )
2373       ];
2374
2375   }
2376
2377   $attrs->{group_by} ||= $attrs->{select}
2378     if delete $attrs->{distinct};
2379   if ( $attrs->{order_by} ) {
2380     $attrs->{order_by} = (
2381       ref( $attrs->{order_by} ) eq 'ARRAY'
2382       ? [ @{ $attrs->{order_by} } ]
2383       : [ $attrs->{order_by} ]
2384     );
2385   }
2386   else {
2387     $attrs->{order_by} = [];
2388   }
2389
2390   my $collapse = $attrs->{collapse} || {};
2391   if ( my $prefetch = delete $attrs->{prefetch} ) {
2392     $prefetch = $self->_merge_attr( {}, $prefetch );
2393     my @pre_order;
2394     my $seen = { %{ $attrs->{seen_join} || {} } };
2395     foreach my $p ( ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch) ) {
2396
2397       # bring joins back to level of current class
2398       my @prefetch =
2399         $source->resolve_prefetch( $p, $alias, $seen, \@pre_order, $collapse );
2400       push( @{ $attrs->{select} }, map { $_->[0] } @prefetch );
2401       push( @{ $attrs->{as} },     map { $_->[1] } @prefetch );
2402     }
2403     push( @{ $attrs->{order_by} }, @pre_order );
2404   }
2405   $attrs->{collapse} = $collapse;
2406
2407   if ( $attrs->{page} ) {
2408     $attrs->{offset} ||= 0;
2409     $attrs->{offset} += ( $attrs->{rows} * ( $attrs->{page} - 1 ) );
2410   }
2411
2412   return $self->{_attrs} = $attrs;
2413 }
2414
2415 sub _rollout_attr {
2416   my ($self, $attr) = @_;
2417   
2418   if (ref $attr eq 'HASH') {
2419     return $self->_rollout_hash($attr);
2420   } elsif (ref $attr eq 'ARRAY') {
2421     return $self->_rollout_array($attr);
2422   } else {
2423     return [$attr];
2424   }
2425 }
2426
2427 sub _rollout_array {
2428   my ($self, $attr) = @_;
2429
2430   my @rolled_array;
2431   foreach my $element (@{$attr}) {
2432     if (ref $element eq 'HASH') {
2433       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
2434     } elsif (ref $element eq 'ARRAY') {
2435       #  XXX - should probably recurse here
2436       push( @rolled_array, @{$self->_rollout_array($element)} );
2437     } else {
2438       push( @rolled_array, $element );
2439     }
2440   }
2441   return \@rolled_array;
2442 }
2443
2444 sub _rollout_hash {
2445   my ($self, $attr) = @_;
2446
2447   my @rolled_array;
2448   foreach my $key (keys %{$attr}) {
2449     push( @rolled_array, { $key => $attr->{$key} } );
2450   }
2451   return \@rolled_array;
2452 }
2453
2454 sub _calculate_score {
2455   my ($self, $a, $b) = @_;
2456
2457   if (ref $b eq 'HASH') {
2458     my ($b_key) = keys %{$b};
2459     if (ref $a eq 'HASH') {
2460       my ($a_key) = keys %{$a};
2461       if ($a_key eq $b_key) {
2462         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
2463       } else {
2464         return 0;
2465       }
2466     } else {
2467       return ($a eq $b_key) ? 1 : 0;
2468     }       
2469   } else {
2470     if (ref $a eq 'HASH') {
2471       my ($a_key) = keys %{$a};
2472       return ($b eq $a_key) ? 1 : 0;
2473     } else {
2474       return ($b eq $a) ? 1 : 0;
2475     }
2476   }
2477 }
2478
2479 sub _merge_attr {
2480   my ($self, $orig, $import) = @_;
2481
2482   return $import unless defined($orig);
2483   return $orig unless defined($import);
2484   
2485   $orig = $self->_rollout_attr($orig);
2486   $import = $self->_rollout_attr($import);
2487
2488   my $seen_keys;
2489   foreach my $import_element ( @{$import} ) {
2490     # find best candidate from $orig to merge $b_element into
2491     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
2492     foreach my $orig_element ( @{$orig} ) {
2493       my $score = $self->_calculate_score( $orig_element, $import_element );
2494       if ($score > $best_candidate->{score}) {
2495         $best_candidate->{position} = $position;
2496         $best_candidate->{score} = $score;
2497       }
2498       $position++;
2499     }
2500     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
2501
2502     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
2503       push( @{$orig}, $import_element );
2504     } else {
2505       my $orig_best = $orig->[$best_candidate->{position}];
2506       # merge orig_best and b_element together and replace original with merged
2507       if (ref $orig_best ne 'HASH') {
2508         $orig->[$best_candidate->{position}] = $import_element;
2509       } elsif (ref $import_element eq 'HASH') {
2510         my ($key) = keys %{$orig_best};
2511         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
2512       }
2513     }
2514     $seen_keys->{$import_key} = 1; # don't merge the same key twice
2515   }
2516
2517   return $orig;
2518 }
2519
2520 sub result_source {
2521     my $self = shift;
2522
2523     if (@_) {
2524         $self->_source_handle($_[0]->handle);
2525     } else {
2526         $self->_source_handle->resolve;
2527     }
2528 }
2529
2530 =head2 throw_exception
2531
2532 See L<DBIx::Class::Schema/throw_exception> for details.
2533
2534 =cut
2535
2536 sub throw_exception {
2537   my $self=shift;
2538   if (ref $self && $self->_source_handle->schema) {
2539     $self->_source_handle->schema->throw_exception(@_)
2540   } else {
2541     croak(@_);
2542   }
2543
2544 }
2545
2546 # XXX: FIXME: Attributes docs need clearing up
2547
2548 =head1 ATTRIBUTES
2549
2550 Attributes are used to refine a ResultSet in various ways when
2551 searching for data. They can be passed to any method which takes an
2552 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
2553 L</count>.
2554
2555 These are in no particular order:
2556
2557 =head2 order_by
2558
2559 =over 4
2560
2561 =item Value: ( $order_by | \@order_by | \%order_by )
2562
2563 =back
2564
2565 Which column(s) to order the results by. If a single column name, or
2566 an arrayref of names is supplied, the argument is passed through
2567 directly to SQL. The hashref syntax allows for connection-agnostic
2568 specification of ordering direction:
2569
2570  For descending order:
2571
2572   order_by => { -desc => [qw/col1 col2 col3/] }
2573
2574  For explicit ascending order:
2575
2576   order_by => { -asc => 'col' }
2577
2578 The old scalarref syntax (i.e. order_by => \'year DESC') is still
2579 supported, although you are strongly encouraged to use the hashref
2580 syntax as outlined above.
2581
2582 =head2 columns
2583
2584 =over 4
2585
2586 =item Value: \@columns
2587
2588 =back
2589
2590 Shortcut to request a particular set of columns to be retrieved. Each
2591 column spec may be a string (a table column name), or a hash (in which
2592 case the key is the C<as> value, and the value is used as the C<select>
2593 expression). Adds C<me.> onto the start of any column without a C<.> in
2594 it and sets C<select> from that, then auto-populates C<as> from
2595 C<select> as normal. (You may also use the C<cols> attribute, as in
2596 earlier versions of DBIC.)
2597
2598 =head2 +columns
2599
2600 =over 4
2601
2602 =item Value: \@columns
2603
2604 =back
2605
2606 Indicates additional columns to be selected from storage. Works the same
2607 as L</columns> but adds columns to the selection. (You may also use the
2608 C<include_columns> attribute, as in earlier versions of DBIC). For
2609 example:-
2610
2611   $schema->resultset('CD')->search(undef, {
2612     '+columns' => ['artist.name'],
2613     join => ['artist']
2614   });
2615
2616 would return all CDs and include a 'name' column to the information
2617 passed to object inflation. Note that the 'artist' is the name of the
2618 column (or relationship) accessor, and 'name' is the name of the column
2619 accessor in the related table.
2620
2621 =head2 include_columns
2622
2623 =over 4
2624
2625 =item Value: \@columns
2626
2627 =back
2628
2629 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
2630
2631 =head2 select
2632
2633 =over 4
2634
2635 =item Value: \@select_columns
2636
2637 =back
2638
2639 Indicates which columns should be selected from the storage. You can use
2640 column names, or in the case of RDBMS back ends, function or stored procedure
2641 names:
2642
2643   $rs = $schema->resultset('Employee')->search(undef, {
2644     select => [
2645       'name',
2646       { count => 'employeeid' },
2647       { sum => 'salary' }
2648     ]
2649   });
2650
2651 When you use function/stored procedure names and do not supply an C<as>
2652 attribute, the column names returned are storage-dependent. E.g. MySQL would
2653 return a column named C<count(employeeid)> in the above example.
2654
2655 =head2 +select
2656
2657 =over 4
2658
2659 Indicates additional columns to be selected from storage.  Works the same as
2660 L</select> but adds columns to the selection.
2661
2662 =back
2663
2664 =head2 +as
2665
2666 =over 4
2667
2668 Indicates additional column names for those added via L</+select>. See L</as>.
2669
2670 =back
2671
2672 =head2 as
2673
2674 =over 4
2675
2676 =item Value: \@inflation_names
2677
2678 =back
2679
2680 Indicates column names for object inflation. That is, C<as>
2681 indicates the name that the column can be accessed as via the
2682 C<get_column> method (or via the object accessor, B<if one already
2683 exists>).  It has nothing to do with the SQL code C<SELECT foo AS bar>.
2684
2685 The C<as> attribute is used in conjunction with C<select>,
2686 usually when C<select> contains one or more function or stored
2687 procedure names:
2688
2689   $rs = $schema->resultset('Employee')->search(undef, {
2690     select => [
2691       'name',
2692       { count => 'employeeid' }
2693     ],
2694     as => ['name', 'employee_count'],
2695   });
2696
2697   my $employee = $rs->first(); # get the first Employee
2698
2699 If the object against which the search is performed already has an accessor
2700 matching a column name specified in C<as>, the value can be retrieved using
2701 the accessor as normal:
2702
2703   my $name = $employee->name();
2704
2705 If on the other hand an accessor does not exist in the object, you need to
2706 use C<get_column> instead:
2707
2708   my $employee_count = $employee->get_column('employee_count');
2709
2710 You can create your own accessors if required - see
2711 L<DBIx::Class::Manual::Cookbook> for details.
2712
2713 Please note: This will NOT insert an C<AS employee_count> into the SQL
2714 statement produced, it is used for internal access only. Thus
2715 attempting to use the accessor in an C<order_by> clause or similar
2716 will fail miserably.
2717
2718 To get around this limitation, you can supply literal SQL to your
2719 C<select> attibute that contains the C<AS alias> text, eg:
2720
2721   select => [\'myfield AS alias']
2722
2723 =head2 join
2724
2725 =over 4
2726
2727 =item Value: ($rel_name | \@rel_names | \%rel_names)
2728
2729 =back
2730
2731 Contains a list of relationships that should be joined for this query.  For
2732 example:
2733
2734   # Get CDs by Nine Inch Nails
2735   my $rs = $schema->resultset('CD')->search(
2736     { 'artist.name' => 'Nine Inch Nails' },
2737     { join => 'artist' }
2738   );
2739
2740 Can also contain a hash reference to refer to the other relation's relations.
2741 For example:
2742
2743   package MyApp::Schema::Track;
2744   use base qw/DBIx::Class/;
2745   __PACKAGE__->table('track');
2746   __PACKAGE__->add_columns(qw/trackid cd position title/);
2747   __PACKAGE__->set_primary_key('trackid');
2748   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
2749   1;
2750
2751   # In your application
2752   my $rs = $schema->resultset('Artist')->search(
2753     { 'track.title' => 'Teardrop' },
2754     {
2755       join     => { cd => 'track' },
2756       order_by => 'artist.name',
2757     }
2758   );
2759
2760 You need to use the relationship (not the table) name in  conditions, 
2761 because they are aliased as such. The current table is aliased as "me", so 
2762 you need to use me.column_name in order to avoid ambiguity. For example:
2763
2764   # Get CDs from 1984 with a 'Foo' track 
2765   my $rs = $schema->resultset('CD')->search(
2766     { 
2767       'me.year' => 1984,
2768       'tracks.name' => 'Foo'
2769     },
2770     { join => 'tracks' }
2771   );
2772   
2773 If the same join is supplied twice, it will be aliased to <rel>_2 (and
2774 similarly for a third time). For e.g.
2775
2776   my $rs = $schema->resultset('Artist')->search({
2777     'cds.title'   => 'Down to Earth',
2778     'cds_2.title' => 'Popular',
2779   }, {
2780     join => [ qw/cds cds/ ],
2781   });
2782
2783 will return a set of all artists that have both a cd with title 'Down
2784 to Earth' and a cd with title 'Popular'.
2785
2786 If you want to fetch related objects from other tables as well, see C<prefetch>
2787 below.
2788
2789 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
2790
2791 =head2 prefetch
2792
2793 =over 4
2794
2795 =item Value: ($rel_name | \@rel_names | \%rel_names)
2796
2797 =back
2798
2799 Contains one or more relationships that should be fetched along with
2800 the main query (when they are accessed afterwards the data will
2801 already be available, without extra queries to the database).  This is
2802 useful for when you know you will need the related objects, because it
2803 saves at least one query:
2804
2805   my $rs = $schema->resultset('Tag')->search(
2806     undef,
2807     {
2808       prefetch => {
2809         cd => 'artist'
2810       }
2811     }
2812   );
2813
2814 The initial search results in SQL like the following:
2815
2816   SELECT tag.*, cd.*, artist.* FROM tag
2817   JOIN cd ON tag.cd = cd.cdid
2818   JOIN artist ON cd.artist = artist.artistid
2819
2820 L<DBIx::Class> has no need to go back to the database when we access the
2821 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2822 case.
2823
2824 Simple prefetches will be joined automatically, so there is no need
2825 for a C<join> attribute in the above search. 
2826
2827 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2828 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2829 with an accessor type of 'single' or 'filter'). A more complex example that
2830 prefetches an artists cds, the tracks on those cds, and the tags associted 
2831 with that artist is given below (assuming many-to-many from artists to tags):
2832
2833  my $rs = $schema->resultset('Artist')->search(
2834    undef,
2835    {
2836      prefetch => [
2837        { cds => 'tracks' },
2838        { artist_tags => 'tags' }
2839      ]
2840    }
2841  );
2842  
2843
2844 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
2845 attributes will be ignored.
2846
2847 =head2 page
2848
2849 =over 4
2850
2851 =item Value: $page
2852
2853 =back
2854
2855 Makes the resultset paged and specifies the page to retrieve. Effectively
2856 identical to creating a non-pages resultset and then calling ->page($page)
2857 on it.
2858
2859 If L<rows> attribute is not specified it defualts to 10 rows per page.
2860
2861 =head2 rows
2862
2863 =over 4
2864
2865 =item Value: $rows
2866
2867 =back
2868
2869 Specifes the maximum number of rows for direct retrieval or the number of
2870 rows per page if the page attribute or method is used.
2871
2872 =head2 offset
2873
2874 =over 4
2875
2876 =item Value: $offset
2877
2878 =back
2879
2880 Specifies the (zero-based) row number for the  first row to be returned, or the
2881 of the first row of the first page if paging is used.
2882
2883 =head2 group_by
2884
2885 =over 4
2886
2887 =item Value: \@columns
2888
2889 =back
2890
2891 A arrayref of columns to group by. Can include columns of joined tables.
2892
2893   group_by => [qw/ column1 column2 ... /]
2894
2895 =head2 having
2896
2897 =over 4
2898
2899 =item Value: $condition
2900
2901 =back
2902
2903 HAVING is a select statement attribute that is applied between GROUP BY and
2904 ORDER BY. It is applied to the after the grouping calculations have been
2905 done.
2906
2907   having => { 'count(employee)' => { '>=', 100 } }
2908
2909 =head2 distinct
2910
2911 =over 4
2912
2913 =item Value: (0 | 1)
2914
2915 =back
2916
2917 Set to 1 to group by all columns.
2918
2919 =head2 where
2920
2921 =over 4
2922
2923 Adds to the WHERE clause.
2924
2925   # only return rows WHERE deleted IS NULL for all searches
2926   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2927
2928 Can be overridden by passing C<{ where => undef }> as an attribute
2929 to a resulset.
2930
2931 =back
2932
2933 =head2 cache
2934
2935 Set to 1 to cache search results. This prevents extra SQL queries if you
2936 revisit rows in your ResultSet:
2937
2938   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2939
2940   while( my $artist = $resultset->next ) {
2941     ... do stuff ...
2942   }
2943
2944   $rs->first; # without cache, this would issue a query
2945
2946 By default, searches are not cached.
2947
2948 For more examples of using these attributes, see
2949 L<DBIx::Class::Manual::Cookbook>.
2950
2951 =head2 from
2952
2953 =over 4
2954
2955 =item Value: \@from_clause
2956
2957 =back
2958
2959 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2960 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2961 clauses.
2962
2963 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2964
2965 C<join> will usually do what you need and it is strongly recommended that you
2966 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2967 And we really do mean "cannot", not just tried and failed. Attempting to use
2968 this because you're having problems with C<join> is like trying to use x86
2969 ASM because you've got a syntax error in your C. Trust us on this.
2970
2971 Now, if you're still really, really sure you need to use this (and if you're
2972 not 100% sure, ask the mailing list first), here's an explanation of how this
2973 works.
2974
2975 The syntax is as follows -
2976
2977   [
2978     { <alias1> => <table1> },
2979     [
2980       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2981       [], # nested JOIN (optional)
2982       { <table1.column1> => <table2.column2>, ... (more conditions) },
2983     ],
2984     # More of the above [ ] may follow for additional joins
2985   ]
2986
2987   <table1> <alias1>
2988   JOIN
2989     <table2> <alias2>
2990     [JOIN ...]
2991   ON <table1.column1> = <table2.column2>
2992   <more joins may follow>
2993
2994 An easy way to follow the examples below is to remember the following:
2995
2996     Anything inside "[]" is a JOIN
2997     Anything inside "{}" is a condition for the enclosing JOIN
2998
2999 The following examples utilize a "person" table in a family tree application.
3000 In order to express parent->child relationships, this table is self-joined:
3001
3002     # Person->belongs_to('father' => 'Person');
3003     # Person->belongs_to('mother' => 'Person');
3004
3005 C<from> can be used to nest joins. Here we return all children with a father,
3006 then search against all mothers of those children:
3007
3008   $rs = $schema->resultset('Person')->search(
3009       undef,
3010       {
3011           alias => 'mother', # alias columns in accordance with "from"
3012           from => [
3013               { mother => 'person' },
3014               [
3015                   [
3016                       { child => 'person' },
3017                       [
3018                           { father => 'person' },
3019                           { 'father.person_id' => 'child.father_id' }
3020                       ]
3021                   ],
3022                   { 'mother.person_id' => 'child.mother_id' }
3023               ],
3024           ]
3025       },
3026   );
3027
3028   # Equivalent SQL:
3029   # SELECT mother.* FROM person mother
3030   # JOIN (
3031   #   person child
3032   #   JOIN person father
3033   #   ON ( father.person_id = child.father_id )
3034   # )
3035   # ON ( mother.person_id = child.mother_id )
3036
3037 The type of any join can be controlled manually. To search against only people
3038 with a father in the person table, we could explicitly use C<INNER JOIN>:
3039
3040     $rs = $schema->resultset('Person')->search(
3041         undef,
3042         {
3043             alias => 'child', # alias columns in accordance with "from"
3044             from => [
3045                 { child => 'person' },
3046                 [
3047                     { father => 'person', -join_type => 'inner' },
3048                     { 'father.id' => 'child.father_id' }
3049                 ],
3050             ]
3051         },
3052     );
3053
3054     # Equivalent SQL:
3055     # SELECT child.* FROM person child
3056     # INNER JOIN person father ON child.father_id = father.id
3057
3058 If you need to express really complex joins or you need a subselect, you
3059 can supply literal SQL to C<from> via a scalar reference. In this case
3060 the contents of the scalar will replace the table name asscoiated with the
3061 resultsource.
3062
3063 WARNING: This technique might very well not work as expected on chained
3064 searches - you have been warned.
3065
3066     # Assuming the Event resultsource is defined as:
3067
3068         MySchema::Event->add_columns (
3069             sequence => {
3070                 data_type => 'INT',
3071                 is_auto_increment => 1,
3072             },
3073             location => {
3074                 data_type => 'INT',
3075             },
3076             type => {
3077                 data_type => 'INT',
3078             },
3079         );
3080         MySchema::Event->set_primary_key ('sequence');
3081
3082     # This will get back the latest event for every location. The column
3083     # selector is still provided by DBIC, all we do is add a JOIN/WHERE
3084     # combo to limit the resultset
3085
3086     $rs = $schema->resultset('Event');
3087     $table = $rs->result_source->name;
3088     $latest = $rs->search (
3089         undef,
3090         { from => \ " 
3091             (SELECT e1.* FROM $table e1 
3092                 JOIN $table e2 
3093                     ON e1.location = e2.location 
3094                     AND e1.sequence < e2.sequence 
3095                 WHERE e2.sequence is NULL 
3096             ) me",
3097         },
3098     );
3099
3100     # Equivalent SQL (with the DBIC chunks added):
3101
3102     SELECT me.sequence, me.location, me.type FROM
3103        (SELECT e1.* FROM events e1
3104            JOIN events e2
3105                ON e1.location = e2.location
3106                AND e1.sequence < e2.sequence
3107            WHERE e2.sequence is NULL
3108        ) me;
3109
3110 =head2 for
3111
3112 =over 4
3113
3114 =item Value: ( 'update' | 'shared' )
3115
3116 =back
3117
3118 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
3119 ... FOR SHARED.
3120
3121 =cut
3122
3123 1;