created count_distinct branch
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => "count",
7         'bool'   => "_bool",
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use List::Util ();
15 use Scalar::Util ();
16 use base qw/DBIx::Class/;
17
18 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
23
24 =head1 SYNOPSIS
25
26   my $users_rs   = $schema->resultset('User');
27   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
28   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
29
30 =head1 DESCRIPTION
31
32 A ResultSet is an object which stores a set of conditions representing
33 a query. It is the backbone of DBIx::Class (i.e. the really
34 important/useful bit).
35
36 No SQL is executed on the database when a ResultSet is created, it
37 just stores all the conditions needed to create the query.
38
39 A basic ResultSet representing the data of an entire table is returned
40 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
41 L<Source|DBIx::Class::Manual::Glossary/Source> name.
42
43   my $users_rs = $schema->resultset('User');
44
45 A new ResultSet is returned from calling L</search> on an existing
46 ResultSet. The new one will contain all the conditions of the
47 original, plus any new conditions added in the C<search> call.
48
49 A ResultSet is also an iterator. L</next> is used to return all the
50 L<DBIx::Class::Row>s the ResultSet represents.
51
52 The query that the ResultSet represents is B<only> executed against
53 the database when these methods are called:
54
55 =over
56
57 =item L</find>
58
59 =item L</next>
60
61 =item L</all>
62
63 =item L</count>
64
65 =item L</single>
66
67 =item L</first>
68
69 =back
70
71 =head1 EXAMPLES 
72
73 =head2 Chaining resultsets
74
75 Let's say you've got a query that needs to be run to return some data
76 to the user. But, you have an authorization system in place that
77 prevents certain users from seeing certain information. So, you want
78 to construct the basic query in one method, but add constraints to it in
79 another.
80
81   sub get_data {
82     my $self = shift;
83     my $request = $self->get_request; # Get a request object somehow.
84     my $schema = $self->get_schema;   # Get the DBIC schema object somehow.
85
86     my $cd_rs = $schema->resultset('CD')->search({
87       title => $request->param('title'),
88       year => $request->param('year'),
89     });
90
91     $self->apply_security_policy( $cd_rs );
92
93     return $cd_rs->all();
94   }
95
96   sub apply_security_policy {
97     my $self = shift;
98     my ($rs) = @_;
99
100     return $rs->search({
101       subversive => 0,
102     });
103   }
104
105 =head2 Multiple queries
106
107 Since a resultset just defines a query, you can do all sorts of
108 things with it with the same object.
109
110   # Don't hit the DB yet.
111   my $cd_rs = $schema->resultset('CD')->search({
112     title => 'something',
113     year => 2009,
114   });
115
116   # Each of these hits the DB individually.
117   my $count = $cd_rs->count;
118   my $most_recent = $cd_rs->get_column('date_released')->max();
119   my @records = $cd_rs->all;
120
121 And it's not just limited to SELECT statements.
122
123   $cd_rs->delete();
124
125 This is even cooler:
126
127   $cd_rs->create({ artist => 'Fred' });
128
129 Which is the same as:
130
131   $schema->resultset('CD')->create({
132     title => 'something',
133     year => 2009,
134     artist => 'Fred'
135   });
136
137 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
138
139 =head1 OVERLOADING
140
141 If a resultset is used in a numeric context it returns the L</count>.
142 However, if it is used in a booleand context it is always true.  So if
143 you want to check if a resultset has any results use C<if $rs != 0>.
144 C<if $rs> will always be true.
145
146 =head1 METHODS
147
148 =head2 new
149
150 =over 4
151
152 =item Arguments: $source, \%$attrs
153
154 =item Return Value: $rs
155
156 =back
157
158 The resultset constructor. Takes a source object (usually a
159 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
160 L</ATTRIBUTES> below).  Does not perform any queries -- these are
161 executed as needed by the other methods.
162
163 Generally you won't need to construct a resultset manually.  You'll
164 automatically get one from e.g. a L</search> called in scalar context:
165
166   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
167
168 IMPORTANT: If called on an object, proxies to new_result instead so
169
170   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
171
172 will return a CD object, not a ResultSet.
173
174 =cut
175
176 sub new {
177   my $class = shift;
178   return $class->new_result(@_) if ref $class;
179
180   my ($source, $attrs) = @_;
181   $source = $source->handle 
182     unless $source->isa('DBIx::Class::ResultSourceHandle');
183   $attrs = { %{$attrs||{}} };
184
185   if ($attrs->{page}) {
186     $attrs->{rows} ||= 10;
187   }
188
189   $attrs->{alias} ||= 'me';
190
191   # Creation of {} and bless separated to mitigate RH perl bug
192   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
193   my $self = {
194     _source_handle => $source,
195     cond => $attrs->{where},
196     count => undef,
197     pager => undef,
198     attrs => $attrs
199   };
200
201   bless $self, $class;
202
203   $self->result_class(
204     $attrs->{result_class} || $source->resolve->result_class
205   );
206
207   return $self;
208 }
209
210 =head2 search
211
212 =over 4
213
214 =item Arguments: $cond, \%attrs?
215
216 =item Return Value: $resultset (scalar context), @row_objs (list context)
217
218 =back
219
220   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
221   my $new_rs = $cd_rs->search({ year => 2005 });
222
223   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
224                  # year = 2005 OR year = 2004
225
226 If you need to pass in additional attributes but no additional condition,
227 call it as C<search(undef, \%attrs)>.
228
229   # "SELECT name, artistid FROM $artist_table"
230   my @all_artists = $schema->resultset('Artist')->search(undef, {
231     columns => [qw/name artistid/],
232   });
233
234 For a list of attributes that can be passed to C<search>, see
235 L</ATTRIBUTES>. For more examples of using this function, see
236 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
237 documentation for the first argument, see L<SQL::Abstract>.
238
239 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
240
241 =cut
242
243 sub search {
244   my $self = shift;
245   my $rs = $self->search_rs( @_ );
246   return (wantarray ? $rs->all : $rs);
247 }
248
249 =head2 search_rs
250
251 =over 4
252
253 =item Arguments: $cond, \%attrs?
254
255 =item Return Value: $resultset
256
257 =back
258
259 This method does the same exact thing as search() except it will
260 always return a resultset, even in list context.
261
262 =cut
263
264 sub search_rs {
265   my $self = shift;
266
267   my $attrs = {};
268   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
269   my $our_attrs = { %{$self->{attrs}} };
270   my $having = delete $our_attrs->{having};
271   my $where = delete $our_attrs->{where};
272
273   my $rows;
274
275   my %safe = (alias => 1, cache => 1);
276
277   unless (
278     (@_ && defined($_[0])) # @_ == () or (undef)
279     || 
280     (keys %$attrs # empty attrs or only 'safe' attrs
281     && List::Util::first { !$safe{$_} } keys %$attrs)
282   ) {
283     # no search, effectively just a clone
284     $rows = $self->get_cache;
285   }
286
287   my $new_attrs = { %{$our_attrs}, %{$attrs} };
288
289   # merge new attrs into inherited
290   foreach my $key (qw/join prefetch +select +as/) {
291     next unless exists $attrs->{$key};
292     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
293   }
294
295   my $cond = (@_
296     ? (
297         (@_ == 1 || ref $_[0] eq "HASH")
298           ? (
299               (ref $_[0] eq 'HASH')
300                 ? (
301                     (keys %{ $_[0] }  > 0)
302                       ? shift
303                       : undef
304                    )
305                 :  shift
306              )
307           : (
308               (@_ % 2)
309                 ? $self->throw_exception("Odd number of arguments to search")
310                 : {@_}
311              )
312       )
313     : undef
314   );
315
316   if (defined $where) {
317     $new_attrs->{where} = (
318       defined $new_attrs->{where}
319         ? { '-and' => [
320               map {
321                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
322               } $where, $new_attrs->{where}
323             ]
324           }
325         : $where);
326   }
327
328   if (defined $cond) {
329     $new_attrs->{where} = (
330       defined $new_attrs->{where}
331         ? { '-and' => [
332               map {
333                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
334               } $cond, $new_attrs->{where}
335             ]
336           }
337         : $cond);
338   }
339
340   if (defined $having) {
341     $new_attrs->{having} = (
342       defined $new_attrs->{having}
343         ? { '-and' => [
344               map {
345                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
346               } $having, $new_attrs->{having}
347             ]
348           }
349         : $having);
350   }
351
352   my $rs = (ref $self)->new($self->result_source, $new_attrs);
353   if ($rows) {
354     $rs->set_cache($rows);
355   }
356   return $rs;
357 }
358
359 =head2 search_literal
360
361 =over 4
362
363 =item Arguments: $sql_fragment, @bind_values
364
365 =item Return Value: $resultset (scalar context), @row_objs (list context)
366
367 =back
368
369   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
370   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
371
372 Pass a literal chunk of SQL to be added to the conditional part of the
373 resultset query.
374
375 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
376 only be used in that context. There are known problems using C<search_literal>
377 in chained queries; it can result in bind values in the wrong order.  See
378 L<DBIx::Class::Manual::Cookbook/Searching> and
379 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
380 require C<search_literal>.
381
382 =cut
383
384 sub search_literal {
385   my ($self, $cond, @vals) = @_;
386   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
387   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
388   return $self->search(\$cond, $attrs);
389 }
390
391 =head2 find
392
393 =over 4
394
395 =item Arguments: @values | \%cols, \%attrs?
396
397 =item Return Value: $row_object | undef
398
399 =back
400
401 Finds a row based on its primary key or unique constraint. For example, to find
402 a row by its primary key:
403
404   my $cd = $schema->resultset('CD')->find(5);
405
406 You can also find a row by a specific unique constraint using the C<key>
407 attribute. For example:
408
409   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
410     key => 'cd_artist_title'
411   });
412
413 Additionally, you can specify the columns explicitly by name:
414
415   my $cd = $schema->resultset('CD')->find(
416     {
417       artist => 'Massive Attack',
418       title  => 'Mezzanine',
419     },
420     { key => 'cd_artist_title' }
421   );
422
423 If the C<key> is specified as C<primary>, it searches only on the primary key.
424
425 If no C<key> is specified, it searches on all unique constraints defined on the
426 source for which column data is provided, including the primary key.
427
428 If your table does not have a primary key, you B<must> provide a value for the
429 C<key> attribute matching one of the unique constraints on the source.
430
431 In addition to C<key>, L</find> recognizes and applies standard
432 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
433
434 Note: If your query does not return only one row, a warning is generated:
435
436   Query returned more than one row
437
438 See also L</find_or_create> and L</update_or_create>. For information on how to
439 declare unique constraints, see
440 L<DBIx::Class::ResultSource/add_unique_constraint>.
441
442 =cut
443
444 sub find {
445   my $self = shift;
446   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
447
448   # Default to the primary key, but allow a specific key
449   my @cols = exists $attrs->{key}
450     ? $self->result_source->unique_constraint_columns($attrs->{key})
451     : $self->result_source->primary_columns;
452   $self->throw_exception(
453     "Can't find unless a primary key is defined or unique constraint is specified"
454   ) unless @cols;
455
456   # Parse out a hashref from input
457   my $input_query;
458   if (ref $_[0] eq 'HASH') {
459     $input_query = { %{$_[0]} };
460   }
461   elsif (@_ == @cols) {
462     $input_query = {};
463     @{$input_query}{@cols} = @_;
464   }
465   else {
466     # Compatibility: Allow e.g. find(id => $value)
467     carp "Find by key => value deprecated; please use a hashref instead";
468     $input_query = {@_};
469   }
470
471   my (%related, $info);
472
473   KEY: foreach my $key (keys %$input_query) {
474     if (ref($input_query->{$key})
475         && ($info = $self->result_source->relationship_info($key))) {
476       my $val = delete $input_query->{$key};
477       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
478       my $rel_q = $self->result_source->resolve_condition(
479                     $info->{cond}, $val, $key
480                   );
481       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
482       @related{keys %$rel_q} = values %$rel_q;
483     }
484   }
485   if (my @keys = keys %related) {
486     @{$input_query}{@keys} = values %related;
487   }
488
489
490   # Build the final query: Default to the disjunction of the unique queries,
491   # but allow the input query in case the ResultSet defines the query or the
492   # user is abusing find
493   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
494   my $query;
495   if (exists $attrs->{key}) {
496     my @unique_cols = $self->result_source->unique_constraint_columns($attrs->{key});
497     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
498     $query = $self->_add_alias($unique_query, $alias);
499   }
500   else {
501     my @unique_queries = $self->_unique_queries($input_query, $attrs);
502     $query = @unique_queries
503       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
504       : $self->_add_alias($input_query, $alias);
505   }
506
507   # Run the query
508   if (keys %$attrs) {
509     my $rs = $self->search($query, $attrs);
510     if (keys %{$rs->_resolved_attrs->{collapse}}) {
511       my $row = $rs->next;
512       carp "Query returned more than one row" if $rs->next;
513       return $row;
514     }
515     else {
516       return $rs->single;
517     }
518   }
519   else {
520     if (keys %{$self->_resolved_attrs->{collapse}}) {
521       my $rs = $self->search($query);
522       my $row = $rs->next;
523       carp "Query returned more than one row" if $rs->next;
524       return $row;
525     }
526     else {
527       return $self->single($query);
528     }
529   }
530 }
531
532 # _add_alias
533 #
534 # Add the specified alias to the specified query hash. A copy is made so the
535 # original query is not modified.
536
537 sub _add_alias {
538   my ($self, $query, $alias) = @_;
539
540   my %aliased = %$query;
541   foreach my $col (grep { ! m/\./ } keys %aliased) {
542     $aliased{"$alias.$col"} = delete $aliased{$col};
543   }
544
545   return \%aliased;
546 }
547
548 # _unique_queries
549 #
550 # Build a list of queries which satisfy unique constraints.
551
552 sub _unique_queries {
553   my ($self, $query, $attrs) = @_;
554
555   my @constraint_names = exists $attrs->{key}
556     ? ($attrs->{key})
557     : $self->result_source->unique_constraint_names;
558
559   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
560   my $num_where = scalar keys %$where;
561
562   my @unique_queries;
563   foreach my $name (@constraint_names) {
564     my @unique_cols = $self->result_source->unique_constraint_columns($name);
565     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
566
567     my $num_cols = scalar @unique_cols;
568     my $num_query = scalar keys %$unique_query;
569
570     my $total = $num_query + $num_where;
571     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
572       # The query is either unique on its own or is unique in combination with
573       # the existing where clause
574       push @unique_queries, $unique_query;
575     }
576   }
577
578   return @unique_queries;
579 }
580
581 # _build_unique_query
582 #
583 # Constrain the specified query hash based on the specified column names.
584
585 sub _build_unique_query {
586   my ($self, $query, $unique_cols) = @_;
587
588   return {
589     map  { $_ => $query->{$_} }
590     grep { exists $query->{$_} }
591       @$unique_cols
592   };
593 }
594
595 =head2 search_related
596
597 =over 4
598
599 =item Arguments: $rel, $cond, \%attrs?
600
601 =item Return Value: $new_resultset
602
603 =back
604
605   $new_rs = $cd_rs->search_related('artist', {
606     name => 'Emo-R-Us',
607   });
608
609 Searches the specified relationship, optionally specifying a condition and
610 attributes for matching records. See L</ATTRIBUTES> for more information.
611
612 =cut
613
614 sub search_related {
615   return shift->related_resultset(shift)->search(@_);
616 }
617
618 =head2 search_related_rs
619
620 This method works exactly the same as search_related, except that
621 it guarantees a restultset, even in list context.
622
623 =cut
624
625 sub search_related_rs {
626   return shift->related_resultset(shift)->search_rs(@_);
627 }
628
629 =head2 cursor
630
631 =over 4
632
633 =item Arguments: none
634
635 =item Return Value: $cursor
636
637 =back
638
639 Returns a storage-driven cursor to the given resultset. See
640 L<DBIx::Class::Cursor> for more information.
641
642 =cut
643
644 sub cursor {
645   my ($self) = @_;
646
647   my $attrs = { %{$self->_resolved_attrs} };
648   return $self->{cursor}
649     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
650           $attrs->{where},$attrs);
651 }
652
653 =head2 single
654
655 =over 4
656
657 =item Arguments: $cond?
658
659 =item Return Value: $row_object?
660
661 =back
662
663   my $cd = $schema->resultset('CD')->single({ year => 2001 });
664
665 Inflates the first result without creating a cursor if the resultset has
666 any records in it; if not returns nothing. Used by L</find> as a lean version of
667 L</search>.
668
669 While this method can take an optional search condition (just like L</search>)
670 being a fast-code-path it does not recognize search attributes. If you need to
671 add extra joins or similar, call L</search> and then chain-call L</single> on the
672 L<DBIx::Class::ResultSet> returned.
673
674 =over
675
676 =item B<Note>
677
678 As of 0.08100, this method enforces the assumption that the preceeding
679 query returns only one row. If more than one row is returned, you will receive
680 a warning:
681
682   Query returned more than one row
683
684 In this case, you should be using L</first> or L</find> instead, or if you really
685 know what you are doing, use the L</rows> attribute to explicitly limit the size 
686 of the resultset.
687
688 =back
689
690 =cut
691
692 sub single {
693   my ($self, $where) = @_;
694   if(@_ > 2) {
695       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
696   }
697
698   my $attrs = { %{$self->_resolved_attrs} };
699   if ($where) {
700     if (defined $attrs->{where}) {
701       $attrs->{where} = {
702         '-and' =>
703             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
704                $where, delete $attrs->{where} ]
705       };
706     } else {
707       $attrs->{where} = $where;
708     }
709   }
710
711 #  XXX: Disabled since it doesn't infer uniqueness in all cases
712 #  unless ($self->_is_unique_query($attrs->{where})) {
713 #    carp "Query not guaranteed to return a single row"
714 #      . "; please declare your unique constraints or use search instead";
715 #  }
716
717   my @data = $self->result_source->storage->select_single(
718     $attrs->{from}, $attrs->{select},
719     $attrs->{where}, $attrs
720   );
721
722   return (@data ? ($self->_construct_object(@data))[0] : undef);
723 }
724
725 # _is_unique_query
726 #
727 # Try to determine if the specified query is guaranteed to be unique, based on
728 # the declared unique constraints.
729
730 sub _is_unique_query {
731   my ($self, $query) = @_;
732
733   my $collapsed = $self->_collapse_query($query);
734   my $alias = $self->{attrs}{alias};
735
736   foreach my $name ($self->result_source->unique_constraint_names) {
737     my @unique_cols = map {
738       "$alias.$_"
739     } $self->result_source->unique_constraint_columns($name);
740
741     # Count the values for each unique column
742     my %seen = map { $_ => 0 } @unique_cols;
743
744     foreach my $key (keys %$collapsed) {
745       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
746       next unless exists $seen{$aliased};  # Additional constraints are okay
747       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
748     }
749
750     # If we get 0 or more than 1 value for a column, it's not necessarily unique
751     return 1 unless grep { $_ != 1 } values %seen;
752   }
753
754   return 0;
755 }
756
757 # _collapse_query
758 #
759 # Recursively collapse the query, accumulating values for each column.
760
761 sub _collapse_query {
762   my ($self, $query, $collapsed) = @_;
763
764   $collapsed ||= {};
765
766   if (ref $query eq 'ARRAY') {
767     foreach my $subquery (@$query) {
768       next unless ref $subquery;  # -or
769 #      warn "ARRAY: " . Dumper $subquery;
770       $collapsed = $self->_collapse_query($subquery, $collapsed);
771     }
772   }
773   elsif (ref $query eq 'HASH') {
774     if (keys %$query and (keys %$query)[0] eq '-and') {
775       foreach my $subquery (@{$query->{-and}}) {
776 #        warn "HASH: " . Dumper $subquery;
777         $collapsed = $self->_collapse_query($subquery, $collapsed);
778       }
779     }
780     else {
781 #      warn "LEAF: " . Dumper $query;
782       foreach my $col (keys %$query) {
783         my $value = $query->{$col};
784         $collapsed->{$col}{$value}++;
785       }
786     }
787   }
788
789   return $collapsed;
790 }
791
792 =head2 get_column
793
794 =over 4
795
796 =item Arguments: $cond?
797
798 =item Return Value: $resultsetcolumn
799
800 =back
801
802   my $max_length = $rs->get_column('length')->max;
803
804 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
805
806 =cut
807
808 sub get_column {
809   my ($self, $column) = @_;
810   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
811   return $new;
812 }
813
814 =head2 search_like
815
816 =over 4
817
818 =item Arguments: $cond, \%attrs?
819
820 =item Return Value: $resultset (scalar context), @row_objs (list context)
821
822 =back
823
824   # WHERE title LIKE '%blue%'
825   $cd_rs = $rs->search_like({ title => '%blue%'});
826
827 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
828 that this is simply a convenience method retained for ex Class::DBI users.
829 You most likely want to use L</search> with specific operators.
830
831 For more information, see L<DBIx::Class::Manual::Cookbook>.
832
833 =cut
834
835 sub search_like {
836   my $class = shift;
837   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
838   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
839   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
840   return $class->search($query, { %$attrs });
841 }
842
843 =head2 slice
844
845 =over 4
846
847 =item Arguments: $first, $last
848
849 =item Return Value: $resultset (scalar context), @row_objs (list context)
850
851 =back
852
853 Returns a resultset or object list representing a subset of elements from the
854 resultset slice is called on. Indexes are from 0, i.e., to get the first
855 three records, call:
856
857   my ($one, $two, $three) = $rs->slice(0, 2);
858
859 =cut
860
861 sub slice {
862   my ($self, $min, $max) = @_;
863   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
864   $attrs->{offset} = $self->{attrs}{offset} || 0;
865   $attrs->{offset} += $min;
866   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
867   return $self->search(undef(), $attrs);
868   #my $slice = (ref $self)->new($self->result_source, $attrs);
869   #return (wantarray ? $slice->all : $slice);
870 }
871
872 =head2 next
873
874 =over 4
875
876 =item Arguments: none
877
878 =item Return Value: $result?
879
880 =back
881
882 Returns the next element in the resultset (C<undef> is there is none).
883
884 Can be used to efficiently iterate over records in the resultset:
885
886   my $rs = $schema->resultset('CD')->search;
887   while (my $cd = $rs->next) {
888     print $cd->title;
889   }
890
891 Note that you need to store the resultset object, and call C<next> on it.
892 Calling C<< resultset('Table')->next >> repeatedly will always return the
893 first record from the resultset.
894
895 =cut
896
897 sub next {
898   my ($self) = @_;
899   if (my $cache = $self->get_cache) {
900     $self->{all_cache_position} ||= 0;
901     return $cache->[$self->{all_cache_position}++];
902   }
903   if ($self->{attrs}{cache}) {
904     $self->{all_cache_position} = 1;
905     return ($self->all)[0];
906   }
907   if ($self->{stashed_objects}) {
908     my $obj = shift(@{$self->{stashed_objects}});
909     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
910     return $obj;
911   }
912   my @row = (
913     exists $self->{stashed_row}
914       ? @{delete $self->{stashed_row}}
915       : $self->cursor->next
916   );
917   return undef unless (@row);
918   my ($row, @more) = $self->_construct_object(@row);
919   $self->{stashed_objects} = \@more if @more;
920   return $row;
921 }
922
923 sub _construct_object {
924   my ($self, @row) = @_;
925   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
926   my @new = $self->result_class->inflate_result($self->result_source, @$info);
927   @new = $self->{_attrs}{record_filter}->(@new)
928     if exists $self->{_attrs}{record_filter};
929   return @new;
930 }
931
932 sub _collapse_result {
933   my ($self, $as_proto, $row) = @_;
934
935   my @copy = @$row;
936
937   # 'foo'         => [ undef, 'foo' ]
938   # 'foo.bar'     => [ 'foo', 'bar' ]
939   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
940
941   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
942
943   my %collapse = %{$self->{_attrs}{collapse}||{}};
944
945   my @pri_index;
946
947   # if we're doing collapsing (has_many prefetch) we need to grab records
948   # until the PK changes, so fill @pri_index. if not, we leave it empty so
949   # we know we don't have to bother.
950
951   # the reason for not using the collapse stuff directly is because if you
952   # had for e.g. two artists in a row with no cds, the collapse info for
953   # both would be NULL (undef) so you'd lose the second artist
954
955   # store just the index so we can check the array positions from the row
956   # without having to contruct the full hash
957
958   if (keys %collapse) {
959     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
960     foreach my $i (0 .. $#construct_as) {
961       next if defined($construct_as[$i][0]); # only self table
962       if (delete $pri{$construct_as[$i][1]}) {
963         push(@pri_index, $i);
964       }
965       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
966     }
967   }
968
969   # no need to do an if, it'll be empty if @pri_index is empty anyway
970
971   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
972
973   my @const_rows;
974
975   do { # no need to check anything at the front, we always want the first row
976
977     my %const;
978   
979     foreach my $this_as (@construct_as) {
980       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
981     }
982
983     push(@const_rows, \%const);
984
985   } until ( # no pri_index => no collapse => drop straight out
986       !@pri_index
987     or
988       do { # get another row, stash it, drop out if different PK
989
990         @copy = $self->cursor->next;
991         $self->{stashed_row} = \@copy;
992
993         # last thing in do block, counts as true if anything doesn't match
994
995         # check xor defined first for NULL vs. NOT NULL then if one is
996         # defined the other must be so check string equality
997
998         grep {
999           (defined $pri_vals{$_} ^ defined $copy[$_])
1000           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1001         } @pri_index;
1002       }
1003   );
1004
1005   my $alias = $self->{attrs}{alias};
1006   my $info = [];
1007
1008   my %collapse_pos;
1009
1010   my @const_keys;
1011
1012   foreach my $const (@const_rows) {
1013     scalar @const_keys or do {
1014       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1015     };
1016     foreach my $key (@const_keys) {
1017       if (length $key) {
1018         my $target = $info;
1019         my @parts = split(/\./, $key);
1020         my $cur = '';
1021         my $data = $const->{$key};
1022         foreach my $p (@parts) {
1023           $target = $target->[1]->{$p} ||= [];
1024           $cur .= ".${p}";
1025           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
1026             # collapsing at this point and on final part
1027             my $pos = $collapse_pos{$cur};
1028             CK: foreach my $ck (@ckey) {
1029               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1030                 $collapse_pos{$cur} = $data;
1031                 delete @collapse_pos{ # clear all positioning for sub-entries
1032                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1033                 };
1034                 push(@$target, []);
1035                 last CK;
1036               }
1037             }
1038           }
1039           if (exists $collapse{$cur}) {
1040             $target = $target->[-1];
1041           }
1042         }
1043         $target->[0] = $data;
1044       } else {
1045         $info->[0] = $const->{$key};
1046       }
1047     }
1048   }
1049
1050   return $info;
1051 }
1052
1053 =head2 result_source
1054
1055 =over 4
1056
1057 =item Arguments: $result_source?
1058
1059 =item Return Value: $result_source
1060
1061 =back
1062
1063 An accessor for the primary ResultSource object from which this ResultSet
1064 is derived.
1065
1066 =head2 result_class
1067
1068 =over 4
1069
1070 =item Arguments: $result_class?
1071
1072 =item Return Value: $result_class
1073
1074 =back
1075
1076 An accessor for the class to use when creating row objects. Defaults to 
1077 C<< result_source->result_class >> - which in most cases is the name of the 
1078 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1079
1080 =cut
1081
1082 sub result_class {
1083   my ($self, $result_class) = @_;
1084   if ($result_class) {
1085     $self->ensure_class_loaded($result_class);
1086     $self->_result_class($result_class);
1087   }
1088   $self->_result_class;
1089 }
1090
1091 =head2 count
1092
1093 =over 4
1094
1095 =item Arguments: $cond, \%attrs??
1096
1097 =item Return Value: $count
1098
1099 =back
1100
1101 Performs an SQL C<COUNT> with the same query as the resultset was built
1102 with to find the number of elements. If passed arguments, does a search
1103 on the resultset and counts the results of that.
1104
1105 Note: When using C<count> with C<group_by>, L<DBIx::Class> emulates C<GROUP BY>
1106 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
1107 not support C<DISTINCT> with multiple columns. If you are using such a
1108 database, you should only use columns from the main table in your C<group_by>
1109 clause.
1110
1111 =cut
1112
1113 sub count {
1114   my $self = shift;
1115   return $self->search(@_)->count if @_ and defined $_[0];
1116   return scalar @{ $self->get_cache } if $self->get_cache;
1117   my $count = $self->_count;
1118   return 0 unless $count;
1119
1120   # need to take offset from resolved attrs
1121
1122   $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
1123   $count = $self->{attrs}{rows} if
1124     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
1125   $count = 0 if ($count < 0);
1126   return $count;
1127 }
1128
1129 sub _count { # Separated out so pager can get the full count
1130   my $self = shift;
1131   my $attrs = { %{$self->_resolved_attrs} };
1132
1133   if (my $group_by = $attrs->{group_by}) {
1134     delete $attrs->{having};
1135     delete $attrs->{order_by};
1136     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1137     # todo: try CONCAT for multi-column pk
1138     my @pk = $self->result_source->primary_columns;
1139     if (@pk == 1) {
1140       my $alias = $attrs->{alias};
1141       foreach my $column (@distinct) {
1142         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1143           @distinct = ($column);
1144           last;
1145         }
1146       }
1147     }
1148
1149     $attrs->{select} = $group_by; 
1150     $attrs->{from} = (ref $self)->new($self->result_source, $attrs)->cursor->as_query;
1151   }
1152
1153   $attrs->{select} = { count => '*' };
1154   $attrs->{as} = [qw/count/];
1155
1156   # offset, order by, group by, where and page are not needed to count. record_filter is cdbi
1157   delete $attrs->{$_} for qw/rows offset order_by group_by where page pager record_filter/;
1158
1159   $self->result_source->resultset;
1160   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1161   my ($count) = $tmp_rs->cursor->next;
1162   return $count;
1163 }
1164
1165 sub _bool {
1166   return 1;
1167 }
1168
1169 =head2 count_literal
1170
1171 =over 4
1172
1173 =item Arguments: $sql_fragment, @bind_values
1174
1175 =item Return Value: $count
1176
1177 =back
1178
1179 Counts the results in a literal query. Equivalent to calling L</search_literal>
1180 with the passed arguments, then L</count>.
1181
1182 =cut
1183
1184 sub count_literal { shift->search_literal(@_)->count; }
1185
1186 =head2 all
1187
1188 =over 4
1189
1190 =item Arguments: none
1191
1192 =item Return Value: @objects
1193
1194 =back
1195
1196 Returns all elements in the resultset. Called implicitly if the resultset
1197 is returned in list context.
1198
1199 =cut
1200
1201 sub all {
1202   my $self = shift;
1203   if(@_) {
1204       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1205   }
1206
1207   return @{ $self->get_cache } if $self->get_cache;
1208
1209   my @obj;
1210
1211   # TODO: don't call resolve here
1212   if (keys %{$self->_resolved_attrs->{collapse}}) {
1213 #  if ($self->{attrs}{prefetch}) {
1214       # Using $self->cursor->all is really just an optimisation.
1215       # If we're collapsing has_many prefetches it probably makes
1216       # very little difference, and this is cleaner than hacking
1217       # _construct_object to survive the approach
1218     my @row = $self->cursor->next;
1219     while (@row) {
1220       push(@obj, $self->_construct_object(@row));
1221       @row = (exists $self->{stashed_row}
1222                ? @{delete $self->{stashed_row}}
1223                : $self->cursor->next);
1224     }
1225   } else {
1226     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1227   }
1228
1229   $self->set_cache(\@obj) if $self->{attrs}{cache};
1230   return @obj;
1231 }
1232
1233 =head2 reset
1234
1235 =over 4
1236
1237 =item Arguments: none
1238
1239 =item Return Value: $self
1240
1241 =back
1242
1243 Resets the resultset's cursor, so you can iterate through the elements again.
1244
1245 =cut
1246
1247 sub reset {
1248   my ($self) = @_;
1249   delete $self->{_attrs} if exists $self->{_attrs};
1250   $self->{all_cache_position} = 0;
1251   $self->cursor->reset;
1252   return $self;
1253 }
1254
1255 =head2 first
1256
1257 =over 4
1258
1259 =item Arguments: none
1260
1261 =item Return Value: $object?
1262
1263 =back
1264
1265 Resets the resultset and returns an object for the first result (if the
1266 resultset returns anything).
1267
1268 =cut
1269
1270 sub first {
1271   return $_[0]->reset->next;
1272 }
1273
1274 # _cond_for_update_delete
1275 #
1276 # update/delete require the condition to be modified to handle
1277 # the differing SQL syntax available.  This transforms the $self->{cond}
1278 # appropriately, returning the new condition.
1279
1280 sub _cond_for_update_delete {
1281   my ($self, $full_cond) = @_;
1282   my $cond = {};
1283
1284   $full_cond ||= $self->{cond};
1285   # No-op. No condition, we're updating/deleting everything
1286   return $cond unless ref $full_cond;
1287
1288   if (ref $full_cond eq 'ARRAY') {
1289     $cond = [
1290       map {
1291         my %hash;
1292         foreach my $key (keys %{$_}) {
1293           $key =~ /([^.]+)$/;
1294           $hash{$1} = $_->{$key};
1295         }
1296         \%hash;
1297       } @{$full_cond}
1298     ];
1299   }
1300   elsif (ref $full_cond eq 'HASH') {
1301     if ((keys %{$full_cond})[0] eq '-and') {
1302       $cond->{-and} = [];
1303
1304       my @cond = @{$full_cond->{-and}};
1305       for (my $i = 0; $i < @cond; $i++) {
1306         my $entry = $cond[$i];
1307
1308         my $hash;
1309         if (ref $entry eq 'HASH') {
1310           $hash = $self->_cond_for_update_delete($entry);
1311         }
1312         else {
1313           $entry =~ /([^.]+)$/;
1314           $hash->{$1} = $cond[++$i];
1315         }
1316
1317         push @{$cond->{-and}}, $hash;
1318       }
1319     }
1320     else {
1321       foreach my $key (keys %{$full_cond}) {
1322         $key =~ /([^.]+)$/;
1323         $cond->{$1} = $full_cond->{$key};
1324       }
1325     }
1326   }
1327   else {
1328     $self->throw_exception(
1329       "Can't update/delete on resultset with condition unless hash or array"
1330     );
1331   }
1332
1333   return $cond;
1334 }
1335
1336
1337 =head2 update
1338
1339 =over 4
1340
1341 =item Arguments: \%values
1342
1343 =item Return Value: $storage_rv
1344
1345 =back
1346
1347 Sets the specified columns in the resultset to the supplied values in a
1348 single query. Return value will be true if the update succeeded or false
1349 if no records were updated; exact type of success value is storage-dependent.
1350
1351 =cut
1352
1353 sub update {
1354   my ($self, $values) = @_;
1355   $self->throw_exception("Values for update must be a hash")
1356     unless ref $values eq 'HASH';
1357
1358   carp(   'WARNING! Currently $rs->update() does not generate proper SQL'
1359         . ' on joined resultsets, and may affect rows well outside of the'
1360         . ' contents of $rs. Use at your own risk' )
1361     if ( $self->{attrs}{seen_join} );
1362
1363   my $cond = $self->_cond_for_update_delete;
1364    
1365   return $self->result_source->storage->update(
1366     $self->result_source, $values, $cond
1367   );
1368 }
1369
1370 =head2 update_all
1371
1372 =over 4
1373
1374 =item Arguments: \%values
1375
1376 =item Return Value: 1
1377
1378 =back
1379
1380 Fetches all objects and updates them one at a time. Note that C<update_all>
1381 will run DBIC cascade triggers, while L</update> will not.
1382
1383 =cut
1384
1385 sub update_all {
1386   my ($self, $values) = @_;
1387   $self->throw_exception("Values for update must be a hash")
1388     unless ref $values eq 'HASH';
1389   foreach my $obj ($self->all) {
1390     $obj->set_columns($values)->update;
1391   }
1392   return 1;
1393 }
1394
1395 =head2 delete
1396
1397 =over 4
1398
1399 =item Arguments: none
1400
1401 =item Return Value: 1
1402
1403 =back
1404
1405 Deletes the contents of the resultset from its result source. Note that this
1406 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1407 to run. See also L<DBIx::Class::Row/delete>.
1408
1409 delete may not generate correct SQL for a query with joins or a resultset
1410 chained from a related resultset.  In this case it will generate a warning:-
1411
1412   WARNING! Currently $rs->delete() does not generate proper SQL on
1413   joined resultsets, and may delete rows well outside of the contents
1414   of $rs. Use at your own risk
1415
1416 In these cases you may find that delete_all is more appropriate, or you
1417 need to respecify your query in a way that can be expressed without a join.
1418
1419 =cut
1420
1421 sub delete {
1422   my ($self) = @_;
1423   $self->throw_exception("Delete should not be passed any arguments")
1424     if $_[1];
1425   carp(   'WARNING! Currently $rs->delete() does not generate proper SQL'
1426         . ' on joined resultsets, and may delete rows well outside of the'
1427         . ' contents of $rs. Use at your own risk' )
1428     if ( $self->{attrs}{seen_join} );
1429   my $cond = $self->_cond_for_update_delete;
1430
1431   $self->result_source->storage->delete($self->result_source, $cond);
1432   return 1;
1433 }
1434
1435 =head2 delete_all
1436
1437 =over 4
1438
1439 =item Arguments: none
1440
1441 =item Return Value: 1
1442
1443 =back
1444
1445 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1446 will run DBIC cascade triggers, while L</delete> will not.
1447
1448 =cut
1449
1450 sub delete_all {
1451   my ($self) = @_;
1452   $_->delete for $self->all;
1453   return 1;
1454 }
1455
1456 =head2 populate
1457
1458 =over 4
1459
1460 =item Arguments: \@data;
1461
1462 =back
1463
1464 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1465 For the arrayref of hashrefs style each hashref should be a structure suitable
1466 forsubmitting to a $resultset->create(...) method.
1467
1468 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1469 to insert the data, as this is a faster method.  
1470
1471 Otherwise, each set of data is inserted into the database using
1472 L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
1473 objects is returned.
1474
1475 Example:  Assuming an Artist Class that has many CDs Classes relating:
1476
1477   my $Artist_rs = $schema->resultset("Artist");
1478   
1479   ## Void Context Example 
1480   $Artist_rs->populate([
1481      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1482         { title => 'My First CD', year => 2006 },
1483         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1484       ],
1485      },
1486      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1487         { title => 'My parents sold me to a record company' ,year => 2005 },
1488         { title => 'Why Am I So Ugly?', year => 2006 },
1489         { title => 'I Got Surgery and am now Popular', year => 2007 }
1490       ],
1491      },
1492   ]);
1493   
1494   ## Array Context Example
1495   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1496     { name => "Artist One"},
1497     { name => "Artist Two"},
1498     { name => "Artist Three", cds=> [
1499     { title => "First CD", year => 2007},
1500     { title => "Second CD", year => 2008},
1501   ]}
1502   ]);
1503   
1504   print $ArtistOne->name; ## response is 'Artist One'
1505   print $ArtistThree->cds->count ## reponse is '2'
1506
1507 For the arrayref of arrayrefs style,  the first element should be a list of the
1508 fieldsnames to which the remaining elements are rows being inserted.  For
1509 example:
1510
1511   $Arstist_rs->populate([
1512     [qw/artistid name/],
1513     [100, 'A Formally Unknown Singer'],
1514     [101, 'A singer that jumped the shark two albums ago'],
1515     [102, 'An actually cool singer.'],
1516   ]);
1517
1518 Please note an important effect on your data when choosing between void and
1519 wantarray context. Since void context goes straight to C<insert_bulk> in 
1520 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1521 c<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to 
1522 create primary keys for you, you will find that your PKs are empty.  In this 
1523 case you will have to use the wantarray context in order to create those 
1524 values.
1525
1526 =cut
1527
1528 sub populate {
1529   my $self = shift @_;
1530   my $data = ref $_[0][0] eq 'HASH'
1531     ? $_[0] : ref $_[0][0] eq 'ARRAY' ? $self->_normalize_populate_args($_[0]) :
1532     $self->throw_exception('Populate expects an arrayref of hashes or arrayref of arrayrefs');
1533   
1534   if(defined wantarray) {
1535     my @created;
1536     foreach my $item (@$data) {
1537       push(@created, $self->create($item));
1538     }
1539     return @created;
1540   } else {
1541     my ($first, @rest) = @$data;
1542
1543     my @names = grep {!ref $first->{$_}} keys %$first;
1544     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1545     my @pks = $self->result_source->primary_columns;  
1546
1547     ## do the belongs_to relationships  
1548     foreach my $index (0..$#$data) {
1549       if( grep { !defined $data->[$index]->{$_} } @pks ) {
1550         my @ret = $self->populate($data);
1551         return;
1552       }
1553     
1554       foreach my $rel (@rels) {
1555         next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
1556         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1557         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1558         my $related = $result->result_source->resolve_condition(
1559           $result->result_source->relationship_info($reverse)->{cond},
1560           $self,        
1561           $result,        
1562         );
1563
1564         delete $data->[$index]->{$rel};
1565         $data->[$index] = {%{$data->[$index]}, %$related};
1566       
1567         push @names, keys %$related if $index == 0;
1568       }
1569     }
1570
1571     ## do bulk insert on current row
1572     my @values = map { [ @$_{@names} ] } @$data;
1573
1574     $self->result_source->storage->insert_bulk(
1575       $self->result_source, 
1576       \@names, 
1577       \@values,
1578     );
1579
1580     ## do the has_many relationships
1581     foreach my $item (@$data) {
1582
1583       foreach my $rel (@rels) {
1584         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1585
1586         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
1587      || $self->throw_exception('Cannot find the relating object.');
1588      
1589         my $child = $parent->$rel;
1590     
1591         my $related = $child->result_source->resolve_condition(
1592           $parent->result_source->relationship_info($rel)->{cond},
1593           $child,
1594           $parent,
1595         );
1596
1597         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1598         my @populate = map { {%$_, %$related} } @rows_to_add;
1599
1600         $child->populate( \@populate );
1601       }
1602     }
1603   }
1604 }
1605
1606 =head2 _normalize_populate_args ($args)
1607
1608 Private method used by L</populate> to normalize its incoming arguments.  Factored
1609 out in case you want to subclass and accept new argument structures to the
1610 L</populate> method.
1611
1612 =cut
1613
1614 sub _normalize_populate_args {
1615   my ($self, $data) = @_;
1616   my @names = @{shift(@$data)};
1617   my @results_to_create;
1618   foreach my $datum (@$data) {
1619     my %result_to_create;
1620     foreach my $index (0..$#names) {
1621       $result_to_create{$names[$index]} = $$datum[$index];
1622     }
1623     push @results_to_create, \%result_to_create;    
1624   }
1625   return \@results_to_create;
1626 }
1627
1628 =head2 pager
1629
1630 =over 4
1631
1632 =item Arguments: none
1633
1634 =item Return Value: $pager
1635
1636 =back
1637
1638 Return Value a L<Data::Page> object for the current resultset. Only makes
1639 sense for queries with a C<page> attribute.
1640
1641 =cut
1642
1643 sub pager {
1644   my ($self) = @_;
1645   my $attrs = $self->{attrs};
1646   $self->throw_exception("Can't create pager for non-paged rs")
1647     unless $self->{attrs}{page};
1648   $attrs->{rows} ||= 10;
1649   return $self->{pager} ||= Data::Page->new(
1650     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1651 }
1652
1653 =head2 page
1654
1655 =over 4
1656
1657 =item Arguments: $page_number
1658
1659 =item Return Value: $rs
1660
1661 =back
1662
1663 Returns a resultset for the $page_number page of the resultset on which page
1664 is called, where each page contains a number of rows equal to the 'rows'
1665 attribute set on the resultset (10 by default).
1666
1667 =cut
1668
1669 sub page {
1670   my ($self, $page) = @_;
1671   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1672 }
1673
1674 =head2 new_result
1675
1676 =over 4
1677
1678 =item Arguments: \%vals
1679
1680 =item Return Value: $rowobject
1681
1682 =back
1683
1684 Creates a new row object in the resultset's result class and returns
1685 it. The row is not inserted into the database at this point, call
1686 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
1687 will tell you whether the row object has been inserted or not.
1688
1689 Passes the hashref of input on to L<DBIx::Class::Row/new>.
1690
1691 =cut
1692
1693 sub new_result {
1694   my ($self, $values) = @_;
1695   $self->throw_exception( "new_result needs a hash" )
1696     unless (ref $values eq 'HASH');
1697
1698   my %new;
1699   my $alias = $self->{attrs}{alias};
1700
1701   if (
1702     defined $self->{cond}
1703     && $self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION
1704   ) {
1705     %new = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
1706     $new{-from_resultset} = [ keys %new ] if keys %new;
1707   } else {
1708     $self->throw_exception(
1709       "Can't abstract implicit construct, condition not a hash"
1710     ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1711   
1712     my $collapsed_cond = (
1713       $self->{cond}
1714         ? $self->_collapse_cond($self->{cond})
1715         : {}
1716     );
1717   
1718     # precendence must be given to passed values over values inherited from
1719     # the cond, so the order here is important.
1720     my %implied =  %{$self->_remove_alias($collapsed_cond, $alias)};
1721     while( my($col,$value) = each %implied ){
1722       if(ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '='){
1723         $new{$col} = $value->{'='};
1724         next;
1725       }
1726       $new{$col} = $value if $self->_is_deterministic_value($value);
1727     }
1728   }
1729
1730   %new = (
1731     %new,
1732     %{ $self->_remove_alias($values, $alias) },
1733     -source_handle => $self->_source_handle,
1734     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1735   );
1736
1737   return $self->result_class->new(\%new);
1738 }
1739
1740 # _is_deterministic_value
1741 #
1742 # Make an effor to strip non-deterministic values from the condition, 
1743 # to make sure new_result chokes less
1744
1745 sub _is_deterministic_value {
1746   my $self = shift;
1747   my $value = shift;
1748   my $ref_type = ref $value;
1749   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
1750   return 1 if Scalar::Util::blessed($value);
1751   return 0;
1752 }
1753
1754 # _collapse_cond
1755 #
1756 # Recursively collapse the condition.
1757
1758 sub _collapse_cond {
1759   my ($self, $cond, $collapsed) = @_;
1760
1761   $collapsed ||= {};
1762
1763   if (ref $cond eq 'ARRAY') {
1764     foreach my $subcond (@$cond) {
1765       next unless ref $subcond;  # -or
1766 #      warn "ARRAY: " . Dumper $subcond;
1767       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1768     }
1769   }
1770   elsif (ref $cond eq 'HASH') {
1771     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1772       foreach my $subcond (@{$cond->{-and}}) {
1773 #        warn "HASH: " . Dumper $subcond;
1774         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1775       }
1776     }
1777     else {
1778 #      warn "LEAF: " . Dumper $cond;
1779       foreach my $col (keys %$cond) {
1780         my $value = $cond->{$col};
1781         $collapsed->{$col} = $value;
1782       }
1783     }
1784   }
1785
1786   return $collapsed;
1787 }
1788
1789 # _remove_alias
1790 #
1791 # Remove the specified alias from the specified query hash. A copy is made so
1792 # the original query is not modified.
1793
1794 sub _remove_alias {
1795   my ($self, $query, $alias) = @_;
1796
1797   my %orig = %{ $query || {} };
1798   my %unaliased;
1799
1800   foreach my $key (keys %orig) {
1801     if ($key !~ /\./) {
1802       $unaliased{$key} = $orig{$key};
1803       next;
1804     }
1805     $unaliased{$1} = $orig{$key}
1806       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1807   }
1808
1809   return \%unaliased;
1810 }
1811
1812 =head2 as_query (EXPERIMENTAL)
1813
1814 =over 4
1815
1816 =item Arguments: none
1817
1818 =item Return Value: \[ $sql, @bind ]
1819
1820 =back
1821
1822 Returns the SQL query and bind vars associated with the invocant.
1823
1824 This is generally used as the RHS for a subquery.
1825
1826 B<NOTE>: This feature is still experimental.
1827
1828 =cut
1829
1830 sub as_query { return shift->cursor->as_query(@_) }
1831
1832 =head2 find_or_new
1833
1834 =over 4
1835
1836 =item Arguments: \%vals, \%attrs?
1837
1838 =item Return Value: $rowobject
1839
1840 =back
1841
1842   my $artist = $schema->resultset('Artist')->find_or_new(
1843     { artist => 'fred' }, { key => 'artists' });
1844
1845   $cd->cd_to_producer->find_or_new({ producer => $producer },
1846                                    { key => 'primary });
1847
1848 Find an existing record from this resultset, based on its primary
1849 key, or a unique constraint. If none exists, instantiate a new result
1850 object and return it. The object will not be saved into your storage
1851 until you call L<DBIx::Class::Row/insert> on it.
1852
1853 You most likely want this method when looking for existing rows using
1854 a unique constraint that is not the primary key, or looking for
1855 related rows.
1856
1857 If you want objects to be saved immediately, use L</find_or_create> instead.
1858
1859 B<Note>: C<find_or_new> is probably not what you want when creating a
1860 new row in a table that uses primary keys supplied by the
1861 database. Passing in a primary key column with a value of I<undef>
1862 will cause L</find> to attempt to search for a row with a value of
1863 I<NULL>.
1864
1865 =cut
1866
1867 sub find_or_new {
1868   my $self     = shift;
1869   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1870   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1871   my $exists   = $self->find($hash, $attrs);
1872   return defined $exists ? $exists : $self->new_result($hash);
1873 }
1874
1875 =head2 create
1876
1877 =over 4
1878
1879 =item Arguments: \%vals
1880
1881 =item Return Value: a L<DBIx::Class::Row> $object
1882
1883 =back
1884
1885 Attempt to create a single new row or a row with multiple related rows
1886 in the table represented by the resultset (and related tables). This
1887 will not check for duplicate rows before inserting, use
1888 L</find_or_create> to do that.
1889
1890 To create one row for this resultset, pass a hashref of key/value
1891 pairs representing the columns of the table and the values you wish to
1892 store. If the appropriate relationships are set up, foreign key fields
1893 can also be passed an object representing the foreign row, and the
1894 value will be set to its primary key.
1895
1896 To create related objects, pass a hashref for the value if the related
1897 item is a foreign key relationship (L<DBIx::Class::Relationship/belongs_to>),
1898 and use the name of the relationship as the key. (NOT the name of the field,
1899 necessarily). For C<has_many> and C<has_one> relationships, pass an arrayref
1900 of hashrefs containing the data for each of the rows to create in the foreign
1901 tables, again using the relationship name as the key.
1902
1903 Instead of hashrefs of plain related data (key/value pairs), you may
1904 also pass new or inserted objects. New objects (not inserted yet, see
1905 L</new>), will be inserted into their appropriate tables.
1906
1907 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1908
1909 Example of creating a new row.
1910
1911   $person_rs->create({
1912     name=>"Some Person",
1913     email=>"somebody@someplace.com"
1914   });
1915   
1916 Example of creating a new row and also creating rows in a related C<has_many>
1917 or C<has_one> resultset.  Note Arrayref.
1918
1919   $artist_rs->create(
1920      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1921         { title => 'My First CD', year => 2006 },
1922         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1923       ],
1924      },
1925   );
1926
1927 Example of creating a new row and also creating a row in a related
1928 C<belongs_to>resultset. Note Hashref.
1929
1930   $cd_rs->create({
1931     title=>"Music for Silly Walks",
1932     year=>2000,
1933     artist => {
1934       name=>"Silly Musician",
1935     }
1936   });
1937
1938 =cut
1939
1940 sub create {
1941   my ($self, $attrs) = @_;
1942   $self->throw_exception( "create needs a hashref" )
1943     unless ref $attrs eq 'HASH';
1944   return $self->new_result($attrs)->insert;
1945 }
1946
1947 =head2 find_or_create
1948
1949 =over 4
1950
1951 =item Arguments: \%vals, \%attrs?
1952
1953 =item Return Value: $rowobject
1954
1955 =back
1956
1957   $cd->cd_to_producer->find_or_create({ producer => $producer },
1958                                       { key => 'primary });
1959
1960 Tries to find a record based on its primary key or unique constraints; if none
1961 is found, creates one and returns that instead.
1962
1963   my $cd = $schema->resultset('CD')->find_or_create({
1964     cdid   => 5,
1965     artist => 'Massive Attack',
1966     title  => 'Mezzanine',
1967     year   => 2005,
1968   });
1969
1970 Also takes an optional C<key> attribute, to search by a specific key or unique
1971 constraint. For example:
1972
1973   my $cd = $schema->resultset('CD')->find_or_create(
1974     {
1975       artist => 'Massive Attack',
1976       title  => 'Mezzanine',
1977     },
1978     { key => 'cd_artist_title' }
1979   );
1980
1981 B<Note>: Because find_or_create() reads from the database and then
1982 possibly inserts based on the result, this method is subject to a race
1983 condition. Another process could create a record in the table after
1984 the find has completed and before the create has started. To avoid
1985 this problem, use find_or_create() inside a transaction.
1986
1987 B<Note>: C<find_or_create> is probably not what you want when creating
1988 a new row in a table that uses primary keys supplied by the
1989 database. Passing in a primary key column with a value of I<undef>
1990 will cause L</find> to attempt to search for a row with a value of
1991 I<NULL>.
1992
1993 See also L</find> and L</update_or_create>. For information on how to declare
1994 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1995
1996 =cut
1997
1998 sub find_or_create {
1999   my $self     = shift;
2000   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2001   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2002   my $exists   = $self->find($hash, $attrs);
2003   return defined $exists ? $exists : $self->create($hash);
2004 }
2005
2006 =head2 update_or_create
2007
2008 =over 4
2009
2010 =item Arguments: \%col_values, { key => $unique_constraint }?
2011
2012 =item Return Value: $rowobject
2013
2014 =back
2015
2016   $resultset->update_or_create({ col => $val, ... });
2017
2018 First, searches for an existing row matching one of the unique constraints
2019 (including the primary key) on the source of this resultset. If a row is
2020 found, updates it with the other given column values. Otherwise, creates a new
2021 row.
2022
2023 Takes an optional C<key> attribute to search on a specific unique constraint.
2024 For example:
2025
2026   # In your application
2027   my $cd = $schema->resultset('CD')->update_or_create(
2028     {
2029       artist => 'Massive Attack',
2030       title  => 'Mezzanine',
2031       year   => 1998,
2032     },
2033     { key => 'cd_artist_title' }
2034   );
2035
2036   $cd->cd_to_producer->update_or_create({ 
2037     producer => $producer, 
2038     name => 'harry',
2039   }, { 
2040     key => 'primary,
2041   });
2042
2043
2044 If no C<key> is specified, it searches on all unique constraints defined on the
2045 source, including the primary key.
2046
2047 If the C<key> is specified as C<primary>, it searches only on the primary key.
2048
2049 See also L</find> and L</find_or_create>. For information on how to declare
2050 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2051
2052 B<Note>: C<update_or_create> is probably not what you want when
2053 looking for a row in a table that uses primary keys supplied by the
2054 database, unless you actually have a key value. Passing in a primary
2055 key column with a value of I<undef> will cause L</find> to attempt to
2056 search for a row with a value of I<NULL>.
2057
2058 =cut
2059
2060 sub update_or_create {
2061   my $self = shift;
2062   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2063   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2064
2065   my $row = $self->find($cond, $attrs);
2066   if (defined $row) {
2067     $row->update($cond);
2068     return $row;
2069   }
2070
2071   return $self->create($cond);
2072 }
2073
2074 =head2 get_cache
2075
2076 =over 4
2077
2078 =item Arguments: none
2079
2080 =item Return Value: \@cache_objects?
2081
2082 =back
2083
2084 Gets the contents of the cache for the resultset, if the cache is set.
2085
2086 The cache is populated either by using the L</prefetch> attribute to
2087 L</search> or by calling L</set_cache>.
2088
2089 =cut
2090
2091 sub get_cache {
2092   shift->{all_cache};
2093 }
2094
2095 =head2 set_cache
2096
2097 =over 4
2098
2099 =item Arguments: \@cache_objects
2100
2101 =item Return Value: \@cache_objects
2102
2103 =back
2104
2105 Sets the contents of the cache for the resultset. Expects an arrayref
2106 of objects of the same class as those produced by the resultset. Note that
2107 if the cache is set the resultset will return the cached objects rather
2108 than re-querying the database even if the cache attr is not set.
2109
2110 The contents of the cache can also be populated by using the
2111 L</prefetch> attribute to L</search>.
2112
2113 =cut
2114
2115 sub set_cache {
2116   my ( $self, $data ) = @_;
2117   $self->throw_exception("set_cache requires an arrayref")
2118       if defined($data) && (ref $data ne 'ARRAY');
2119   $self->{all_cache} = $data;
2120 }
2121
2122 =head2 clear_cache
2123
2124 =over 4
2125
2126 =item Arguments: none
2127
2128 =item Return Value: []
2129
2130 =back
2131
2132 Clears the cache for the resultset.
2133
2134 =cut
2135
2136 sub clear_cache {
2137   shift->set_cache(undef);
2138 }
2139
2140 =head2 related_resultset
2141
2142 =over 4
2143
2144 =item Arguments: $relationship_name
2145
2146 =item Return Value: $resultset
2147
2148 =back
2149
2150 Returns a related resultset for the supplied relationship name.
2151
2152   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2153
2154 =cut
2155
2156 sub related_resultset {
2157   my ($self, $rel) = @_;
2158
2159   $self->{related_resultsets} ||= {};
2160   return $self->{related_resultsets}{$rel} ||= do {
2161     my $rel_obj = $self->result_source->relationship_info($rel);
2162
2163     $self->throw_exception(
2164       "search_related: result source '" . $self->result_source->source_name .
2165         "' has no such relationship $rel")
2166       unless $rel_obj;
2167     
2168     my ($from,$seen) = $self->_resolve_from($rel);
2169
2170     my $join_count = $seen->{$rel};
2171     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
2172
2173     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2174     my %attrs = %{$self->{attrs}||{}};
2175     delete @attrs{qw(result_class alias)};
2176
2177     my $new_cache;
2178
2179     if (my $cache = $self->get_cache) {
2180       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2181         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2182                         @$cache ];
2183       }
2184     }
2185
2186     my $rel_source = $self->result_source->related_source($rel);
2187
2188     my $new = do {
2189
2190       # The reason we do this now instead of passing the alias to the
2191       # search_rs below is that if you wrap/overload resultset on the
2192       # source you need to know what alias it's -going- to have for things
2193       # to work sanely (e.g. RestrictWithObject wants to be able to add
2194       # extra query restrictions, and these may need to be $alias.)
2195
2196       my $attrs = $rel_source->resultset_attributes;
2197       local $attrs->{alias} = $alias;
2198
2199       $rel_source->resultset
2200                  ->search_rs(
2201                      undef, {
2202                        %attrs,
2203                        join => undef,
2204                        prefetch => undef,
2205                        select => undef,
2206                        as => undef,
2207                        where => $self->{cond},
2208                        seen_join => $seen,
2209                        from => $from,
2210                    });
2211     };
2212     $new->set_cache($new_cache) if $new_cache;
2213     $new;
2214   };
2215 }
2216
2217 =head2 current_source_alias
2218
2219 =over 4
2220
2221 =item Arguments: none
2222
2223 =item Return Value: $source_alias
2224
2225 =back
2226
2227 Returns the current table alias for the result source this resultset is built
2228 on, that will be used in the SQL query. Usually it is C<me>.
2229
2230 Currently the source alias that refers to the result set returned by a
2231 L</search>/L</find> family method depends on how you got to the resultset: it's
2232 C<me> by default, but eg. L</search_related> aliases it to the related result
2233 source name (and keeps C<me> referring to the original result set). The long
2234 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2235 (and make this method unnecessary).
2236
2237 Thus it's currently necessary to use this method in predefined queries (see
2238 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2239 source alias of the current result set:
2240
2241   # in a result set class
2242   sub modified_by {
2243     my ($self, $user) = @_;
2244
2245     my $me = $self->current_source_alias;
2246
2247     return $self->search(
2248       "$me.modified" => $user->id,
2249     );
2250   }
2251
2252 =cut
2253
2254 sub current_source_alias {
2255   my ($self) = @_;
2256
2257   return ($self->{attrs} || {})->{alias} || 'me';
2258 }
2259
2260 sub _resolve_from {
2261   my ($self, $extra_join) = @_;
2262   my $source = $self->result_source;
2263   my $attrs = $self->{attrs};
2264   
2265   my $from = $attrs->{from}
2266     || [ { $attrs->{alias} => $source->from } ];
2267     
2268   my $seen = { %{$attrs->{seen_join}||{}} };
2269
2270   my $join = ($attrs->{join}
2271                ? [ $attrs->{join}, $extra_join ]
2272                : $extra_join);
2273
2274   # we need to take the prefetch the attrs into account before we 
2275   # ->resolve_join as otherwise they get lost - captainL
2276   my $merged = $self->_merge_attr( $join, $attrs->{prefetch} );
2277
2278   $from = [
2279     @$from,
2280     ($join ? $source->resolve_join($merged, $attrs->{alias}, $seen) : ()),
2281   ];
2282
2283   return ($from,$seen);
2284 }
2285
2286 sub _resolved_attrs {
2287   my $self = shift;
2288   return $self->{_attrs} if $self->{_attrs};
2289
2290   my $attrs  = { %{ $self->{attrs} || {} } };
2291   my $source = $self->result_source;
2292   my $alias  = $attrs->{alias};
2293
2294   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2295   my @colbits;
2296
2297   # build columns (as long as select isn't set) into a set of as/select hashes
2298   unless ( $attrs->{select} ) {
2299       @colbits = map {
2300           ( ref($_) eq 'HASH' ) ? $_
2301             : {
2302               (
2303                   /^\Q${alias}.\E(.+)$/ ? $1
2304                   : $_
2305                 ) => ( /\./ ? $_ : "${alias}.$_" )
2306             }
2307       } ( ref($attrs->{columns}) eq 'ARRAY' ) ? @{ delete $attrs->{columns}} : (delete $attrs->{columns} || $source->columns );
2308   }
2309   # add the additional columns on
2310   foreach ( 'include_columns', '+columns' ) {
2311       push @colbits, map {
2312           ( ref($_) eq 'HASH' )
2313             ? $_
2314             : { ( split( /\./, $_ ) )[-1] => ( /\./ ? $_ : "${alias}.$_" ) }
2315       } ( ref($attrs->{$_}) eq 'ARRAY' ) ? @{ delete $attrs->{$_} } : delete $attrs->{$_} if ( $attrs->{$_} );
2316   }
2317
2318   # start with initial select items
2319   if ( $attrs->{select} ) {
2320     $attrs->{select} =
2321         ( ref $attrs->{select} eq 'ARRAY' )
2322       ? [ @{ $attrs->{select} } ]
2323       : [ $attrs->{select} ];
2324     $attrs->{as} = (
2325       $attrs->{as}
2326       ? (
2327         ref $attrs->{as} eq 'ARRAY'
2328         ? [ @{ $attrs->{as} } ]
2329         : [ $attrs->{as} ]
2330         )
2331       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{ $attrs->{select} } ]
2332     );
2333   }
2334   else {
2335
2336     # otherwise we intialise select & as to empty
2337     $attrs->{select} = [];
2338     $attrs->{as}     = [];
2339   }
2340
2341   # now add colbits to select/as
2342   push( @{ $attrs->{select} }, map { values( %{$_} ) } @colbits );
2343   push( @{ $attrs->{as} },     map { keys( %{$_} ) } @colbits );
2344
2345   my $adds;
2346   if ( $adds = delete $attrs->{'+select'} ) {
2347     $adds = [$adds] unless ref $adds eq 'ARRAY';
2348     push(
2349       @{ $attrs->{select} },
2350       map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds
2351     );
2352   }
2353   if ( $adds = delete $attrs->{'+as'} ) {
2354     $adds = [$adds] unless ref $adds eq 'ARRAY';
2355     push( @{ $attrs->{as} }, @$adds );
2356   }
2357
2358   $attrs->{from} ||= [ { $self->{attrs}{alias} => $source->from } ];
2359
2360   if ( exists $attrs->{join} || exists $attrs->{prefetch} ) {
2361     my $join = delete $attrs->{join} || {};
2362
2363     if ( defined $attrs->{prefetch} ) {
2364       $join = $self->_merge_attr( $join, $attrs->{prefetch} );
2365
2366     }
2367
2368     $attrs->{from} =    # have to copy here to avoid corrupting the original
2369       [
2370       @{ $attrs->{from} },
2371       $source->resolve_join(
2372         $join, $alias, { %{ $attrs->{seen_join} || {} } }
2373       )
2374       ];
2375
2376   }
2377
2378   $attrs->{group_by} ||= $attrs->{select}
2379     if delete $attrs->{distinct};
2380   if ( $attrs->{order_by} ) {
2381     $attrs->{order_by} = (
2382       ref( $attrs->{order_by} ) eq 'ARRAY'
2383       ? [ @{ $attrs->{order_by} } ]
2384       : [ $attrs->{order_by} ]
2385     );
2386   }
2387   else {
2388     $attrs->{order_by} = [];
2389   }
2390
2391   my $collapse = $attrs->{collapse} || {};
2392   if ( my $prefetch = delete $attrs->{prefetch} ) {
2393     $prefetch = $self->_merge_attr( {}, $prefetch );
2394     my @pre_order;
2395     my $seen = { %{ $attrs->{seen_join} || {} } };
2396     foreach my $p ( ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch) ) {
2397
2398       # bring joins back to level of current class
2399       my @prefetch =
2400         $source->resolve_prefetch( $p, $alias, $seen, \@pre_order, $collapse );
2401       push( @{ $attrs->{select} }, map { $_->[0] } @prefetch );
2402       push( @{ $attrs->{as} },     map { $_->[1] } @prefetch );
2403     }
2404     push( @{ $attrs->{order_by} }, @pre_order );
2405   }
2406   $attrs->{collapse} = $collapse;
2407
2408   if ( $attrs->{page} ) {
2409     $attrs->{offset} ||= 0;
2410     $attrs->{offset} += ( $attrs->{rows} * ( $attrs->{page} - 1 ) );
2411   }
2412
2413   return $self->{_attrs} = $attrs;
2414 }
2415
2416 sub _rollout_attr {
2417   my ($self, $attr) = @_;
2418   
2419   if (ref $attr eq 'HASH') {
2420     return $self->_rollout_hash($attr);
2421   } elsif (ref $attr eq 'ARRAY') {
2422     return $self->_rollout_array($attr);
2423   } else {
2424     return [$attr];
2425   }
2426 }
2427
2428 sub _rollout_array {
2429   my ($self, $attr) = @_;
2430
2431   my @rolled_array;
2432   foreach my $element (@{$attr}) {
2433     if (ref $element eq 'HASH') {
2434       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
2435     } elsif (ref $element eq 'ARRAY') {
2436       #  XXX - should probably recurse here
2437       push( @rolled_array, @{$self->_rollout_array($element)} );
2438     } else {
2439       push( @rolled_array, $element );
2440     }
2441   }
2442   return \@rolled_array;
2443 }
2444
2445 sub _rollout_hash {
2446   my ($self, $attr) = @_;
2447
2448   my @rolled_array;
2449   foreach my $key (keys %{$attr}) {
2450     push( @rolled_array, { $key => $attr->{$key} } );
2451   }
2452   return \@rolled_array;
2453 }
2454
2455 sub _calculate_score {
2456   my ($self, $a, $b) = @_;
2457
2458   if (ref $b eq 'HASH') {
2459     my ($b_key) = keys %{$b};
2460     if (ref $a eq 'HASH') {
2461       my ($a_key) = keys %{$a};
2462       if ($a_key eq $b_key) {
2463         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
2464       } else {
2465         return 0;
2466       }
2467     } else {
2468       return ($a eq $b_key) ? 1 : 0;
2469     }       
2470   } else {
2471     if (ref $a eq 'HASH') {
2472       my ($a_key) = keys %{$a};
2473       return ($b eq $a_key) ? 1 : 0;
2474     } else {
2475       return ($b eq $a) ? 1 : 0;
2476     }
2477   }
2478 }
2479
2480 sub _merge_attr {
2481   my ($self, $orig, $import) = @_;
2482
2483   return $import unless defined($orig);
2484   return $orig unless defined($import);
2485   
2486   $orig = $self->_rollout_attr($orig);
2487   $import = $self->_rollout_attr($import);
2488
2489   my $seen_keys;
2490   foreach my $import_element ( @{$import} ) {
2491     # find best candidate from $orig to merge $b_element into
2492     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
2493     foreach my $orig_element ( @{$orig} ) {
2494       my $score = $self->_calculate_score( $orig_element, $import_element );
2495       if ($score > $best_candidate->{score}) {
2496         $best_candidate->{position} = $position;
2497         $best_candidate->{score} = $score;
2498       }
2499       $position++;
2500     }
2501     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
2502
2503     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
2504       push( @{$orig}, $import_element );
2505     } else {
2506       my $orig_best = $orig->[$best_candidate->{position}];
2507       # merge orig_best and b_element together and replace original with merged
2508       if (ref $orig_best ne 'HASH') {
2509         $orig->[$best_candidate->{position}] = $import_element;
2510       } elsif (ref $import_element eq 'HASH') {
2511         my ($key) = keys %{$orig_best};
2512         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
2513       }
2514     }
2515     $seen_keys->{$import_key} = 1; # don't merge the same key twice
2516   }
2517
2518   return $orig;
2519 }
2520
2521 sub result_source {
2522     my $self = shift;
2523
2524     if (@_) {
2525         $self->_source_handle($_[0]->handle);
2526     } else {
2527         $self->_source_handle->resolve;
2528     }
2529 }
2530
2531 =head2 throw_exception
2532
2533 See L<DBIx::Class::Schema/throw_exception> for details.
2534
2535 =cut
2536
2537 sub throw_exception {
2538   my $self=shift;
2539   if (ref $self && $self->_source_handle->schema) {
2540     $self->_source_handle->schema->throw_exception(@_)
2541   } else {
2542     croak(@_);
2543   }
2544
2545 }
2546
2547 # XXX: FIXME: Attributes docs need clearing up
2548
2549 =head1 ATTRIBUTES
2550
2551 Attributes are used to refine a ResultSet in various ways when
2552 searching for data. They can be passed to any method which takes an
2553 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
2554 L</count>.
2555
2556 These are in no particular order:
2557
2558 =head2 order_by
2559
2560 =over 4
2561
2562 =item Value: ( $order_by | \@order_by | \%order_by )
2563
2564 =back
2565
2566 Which column(s) to order the results by. If a single column name, or
2567 an arrayref of names is supplied, the argument is passed through
2568 directly to SQL. The hashref syntax allows for connection-agnostic
2569 specification of ordering direction:
2570
2571  For descending order:
2572
2573   order_by => { -desc => [qw/col1 col2 col3/] }
2574
2575  For explicit ascending order:
2576
2577   order_by => { -asc => 'col' }
2578
2579 The old scalarref syntax (i.e. order_by => \'year DESC') is still
2580 supported, although you are strongly encouraged to use the hashref
2581 syntax as outlined above.
2582
2583 =head2 columns
2584
2585 =over 4
2586
2587 =item Value: \@columns
2588
2589 =back
2590
2591 Shortcut to request a particular set of columns to be retrieved. Each
2592 column spec may be a string (a table column name), or a hash (in which
2593 case the key is the C<as> value, and the value is used as the C<select>
2594 expression). Adds C<me.> onto the start of any column without a C<.> in
2595 it and sets C<select> from that, then auto-populates C<as> from
2596 C<select> as normal. (You may also use the C<cols> attribute, as in
2597 earlier versions of DBIC.)
2598
2599 =head2 +columns
2600
2601 =over 4
2602
2603 =item Value: \@columns
2604
2605 =back
2606
2607 Indicates additional columns to be selected from storage. Works the same
2608 as L</columns> but adds columns to the selection. (You may also use the
2609 C<include_columns> attribute, as in earlier versions of DBIC). For
2610 example:-
2611
2612   $schema->resultset('CD')->search(undef, {
2613     '+columns' => ['artist.name'],
2614     join => ['artist']
2615   });
2616
2617 would return all CDs and include a 'name' column to the information
2618 passed to object inflation. Note that the 'artist' is the name of the
2619 column (or relationship) accessor, and 'name' is the name of the column
2620 accessor in the related table.
2621
2622 =head2 include_columns
2623
2624 =over 4
2625
2626 =item Value: \@columns
2627
2628 =back
2629
2630 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
2631
2632 =head2 select
2633
2634 =over 4
2635
2636 =item Value: \@select_columns
2637
2638 =back
2639
2640 Indicates which columns should be selected from the storage. You can use
2641 column names, or in the case of RDBMS back ends, function or stored procedure
2642 names:
2643
2644   $rs = $schema->resultset('Employee')->search(undef, {
2645     select => [
2646       'name',
2647       { count => 'employeeid' },
2648       { sum => 'salary' }
2649     ]
2650   });
2651
2652 When you use function/stored procedure names and do not supply an C<as>
2653 attribute, the column names returned are storage-dependent. E.g. MySQL would
2654 return a column named C<count(employeeid)> in the above example.
2655
2656 =head2 +select
2657
2658 =over 4
2659
2660 Indicates additional columns to be selected from storage.  Works the same as
2661 L</select> but adds columns to the selection.
2662
2663 =back
2664
2665 =head2 +as
2666
2667 =over 4
2668
2669 Indicates additional column names for those added via L</+select>. See L</as>.
2670
2671 =back
2672
2673 =head2 as
2674
2675 =over 4
2676
2677 =item Value: \@inflation_names
2678
2679 =back
2680
2681 Indicates column names for object inflation. That is, C<as>
2682 indicates the name that the column can be accessed as via the
2683 C<get_column> method (or via the object accessor, B<if one already
2684 exists>).  It has nothing to do with the SQL code C<SELECT foo AS bar>.
2685
2686 The C<as> attribute is used in conjunction with C<select>,
2687 usually when C<select> contains one or more function or stored
2688 procedure names:
2689
2690   $rs = $schema->resultset('Employee')->search(undef, {
2691     select => [
2692       'name',
2693       { count => 'employeeid' }
2694     ],
2695     as => ['name', 'employee_count'],
2696   });
2697
2698   my $employee = $rs->first(); # get the first Employee
2699
2700 If the object against which the search is performed already has an accessor
2701 matching a column name specified in C<as>, the value can be retrieved using
2702 the accessor as normal:
2703
2704   my $name = $employee->name();
2705
2706 If on the other hand an accessor does not exist in the object, you need to
2707 use C<get_column> instead:
2708
2709   my $employee_count = $employee->get_column('employee_count');
2710
2711 You can create your own accessors if required - see
2712 L<DBIx::Class::Manual::Cookbook> for details.
2713
2714 Please note: This will NOT insert an C<AS employee_count> into the SQL
2715 statement produced, it is used for internal access only. Thus
2716 attempting to use the accessor in an C<order_by> clause or similar
2717 will fail miserably.
2718
2719 To get around this limitation, you can supply literal SQL to your
2720 C<select> attibute that contains the C<AS alias> text, eg:
2721
2722   select => [\'myfield AS alias']
2723
2724 =head2 join
2725
2726 =over 4
2727
2728 =item Value: ($rel_name | \@rel_names | \%rel_names)
2729
2730 =back
2731
2732 Contains a list of relationships that should be joined for this query.  For
2733 example:
2734
2735   # Get CDs by Nine Inch Nails
2736   my $rs = $schema->resultset('CD')->search(
2737     { 'artist.name' => 'Nine Inch Nails' },
2738     { join => 'artist' }
2739   );
2740
2741 Can also contain a hash reference to refer to the other relation's relations.
2742 For example:
2743
2744   package MyApp::Schema::Track;
2745   use base qw/DBIx::Class/;
2746   __PACKAGE__->table('track');
2747   __PACKAGE__->add_columns(qw/trackid cd position title/);
2748   __PACKAGE__->set_primary_key('trackid');
2749   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
2750   1;
2751
2752   # In your application
2753   my $rs = $schema->resultset('Artist')->search(
2754     { 'track.title' => 'Teardrop' },
2755     {
2756       join     => { cd => 'track' },
2757       order_by => 'artist.name',
2758     }
2759   );
2760
2761 You need to use the relationship (not the table) name in  conditions, 
2762 because they are aliased as such. The current table is aliased as "me", so 
2763 you need to use me.column_name in order to avoid ambiguity. For example:
2764
2765   # Get CDs from 1984 with a 'Foo' track 
2766   my $rs = $schema->resultset('CD')->search(
2767     { 
2768       'me.year' => 1984,
2769       'tracks.name' => 'Foo'
2770     },
2771     { join => 'tracks' }
2772   );
2773   
2774 If the same join is supplied twice, it will be aliased to <rel>_2 (and
2775 similarly for a third time). For e.g.
2776
2777   my $rs = $schema->resultset('Artist')->search({
2778     'cds.title'   => 'Down to Earth',
2779     'cds_2.title' => 'Popular',
2780   }, {
2781     join => [ qw/cds cds/ ],
2782   });
2783
2784 will return a set of all artists that have both a cd with title 'Down
2785 to Earth' and a cd with title 'Popular'.
2786
2787 If you want to fetch related objects from other tables as well, see C<prefetch>
2788 below.
2789
2790 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
2791
2792 =head2 prefetch
2793
2794 =over 4
2795
2796 =item Value: ($rel_name | \@rel_names | \%rel_names)
2797
2798 =back
2799
2800 Contains one or more relationships that should be fetched along with
2801 the main query (when they are accessed afterwards the data will
2802 already be available, without extra queries to the database).  This is
2803 useful for when you know you will need the related objects, because it
2804 saves at least one query:
2805
2806   my $rs = $schema->resultset('Tag')->search(
2807     undef,
2808     {
2809       prefetch => {
2810         cd => 'artist'
2811       }
2812     }
2813   );
2814
2815 The initial search results in SQL like the following:
2816
2817   SELECT tag.*, cd.*, artist.* FROM tag
2818   JOIN cd ON tag.cd = cd.cdid
2819   JOIN artist ON cd.artist = artist.artistid
2820
2821 L<DBIx::Class> has no need to go back to the database when we access the
2822 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2823 case.
2824
2825 Simple prefetches will be joined automatically, so there is no need
2826 for a C<join> attribute in the above search. 
2827
2828 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2829 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2830 with an accessor type of 'single' or 'filter'). A more complex example that
2831 prefetches an artists cds, the tracks on those cds, and the tags associted 
2832 with that artist is given below (assuming many-to-many from artists to tags):
2833
2834  my $rs = $schema->resultset('Artist')->search(
2835    undef,
2836    {
2837      prefetch => [
2838        { cds => 'tracks' },
2839        { artist_tags => 'tags' }
2840      ]
2841    }
2842  );
2843  
2844
2845 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
2846 attributes will be ignored.
2847
2848 =head2 page
2849
2850 =over 4
2851
2852 =item Value: $page
2853
2854 =back
2855
2856 Makes the resultset paged and specifies the page to retrieve. Effectively
2857 identical to creating a non-pages resultset and then calling ->page($page)
2858 on it.
2859
2860 If L<rows> attribute is not specified it defualts to 10 rows per page.
2861
2862 =head2 rows
2863
2864 =over 4
2865
2866 =item Value: $rows
2867
2868 =back
2869
2870 Specifes the maximum number of rows for direct retrieval or the number of
2871 rows per page if the page attribute or method is used.
2872
2873 =head2 offset
2874
2875 =over 4
2876
2877 =item Value: $offset
2878
2879 =back
2880
2881 Specifies the (zero-based) row number for the  first row to be returned, or the
2882 of the first row of the first page if paging is used.
2883
2884 =head2 group_by
2885
2886 =over 4
2887
2888 =item Value: \@columns
2889
2890 =back
2891
2892 A arrayref of columns to group by. Can include columns of joined tables.
2893
2894   group_by => [qw/ column1 column2 ... /]
2895
2896 =head2 having
2897
2898 =over 4
2899
2900 =item Value: $condition
2901
2902 =back
2903
2904 HAVING is a select statement attribute that is applied between GROUP BY and
2905 ORDER BY. It is applied to the after the grouping calculations have been
2906 done.
2907
2908   having => { 'count(employee)' => { '>=', 100 } }
2909
2910 =head2 distinct
2911
2912 =over 4
2913
2914 =item Value: (0 | 1)
2915
2916 =back
2917
2918 Set to 1 to group by all columns.
2919
2920 =head2 where
2921
2922 =over 4
2923
2924 Adds to the WHERE clause.
2925
2926   # only return rows WHERE deleted IS NULL for all searches
2927   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2928
2929 Can be overridden by passing C<{ where => undef }> as an attribute
2930 to a resulset.
2931
2932 =back
2933
2934 =head2 cache
2935
2936 Set to 1 to cache search results. This prevents extra SQL queries if you
2937 revisit rows in your ResultSet:
2938
2939   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2940
2941   while( my $artist = $resultset->next ) {
2942     ... do stuff ...
2943   }
2944
2945   $rs->first; # without cache, this would issue a query
2946
2947 By default, searches are not cached.
2948
2949 For more examples of using these attributes, see
2950 L<DBIx::Class::Manual::Cookbook>.
2951
2952 =head2 from
2953
2954 =over 4
2955
2956 =item Value: \@from_clause
2957
2958 =back
2959
2960 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2961 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2962 clauses.
2963
2964 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2965
2966 C<join> will usually do what you need and it is strongly recommended that you
2967 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2968 And we really do mean "cannot", not just tried and failed. Attempting to use
2969 this because you're having problems with C<join> is like trying to use x86
2970 ASM because you've got a syntax error in your C. Trust us on this.
2971
2972 Now, if you're still really, really sure you need to use this (and if you're
2973 not 100% sure, ask the mailing list first), here's an explanation of how this
2974 works.
2975
2976 The syntax is as follows -
2977
2978   [
2979     { <alias1> => <table1> },
2980     [
2981       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2982       [], # nested JOIN (optional)
2983       { <table1.column1> => <table2.column2>, ... (more conditions) },
2984     ],
2985     # More of the above [ ] may follow for additional joins
2986   ]
2987
2988   <table1> <alias1>
2989   JOIN
2990     <table2> <alias2>
2991     [JOIN ...]
2992   ON <table1.column1> = <table2.column2>
2993   <more joins may follow>
2994
2995 An easy way to follow the examples below is to remember the following:
2996
2997     Anything inside "[]" is a JOIN
2998     Anything inside "{}" is a condition for the enclosing JOIN
2999
3000 The following examples utilize a "person" table in a family tree application.
3001 In order to express parent->child relationships, this table is self-joined:
3002
3003     # Person->belongs_to('father' => 'Person');
3004     # Person->belongs_to('mother' => 'Person');
3005
3006 C<from> can be used to nest joins. Here we return all children with a father,
3007 then search against all mothers of those children:
3008
3009   $rs = $schema->resultset('Person')->search(
3010       undef,
3011       {
3012           alias => 'mother', # alias columns in accordance with "from"
3013           from => [
3014               { mother => 'person' },
3015               [
3016                   [
3017                       { child => 'person' },
3018                       [
3019                           { father => 'person' },
3020                           { 'father.person_id' => 'child.father_id' }
3021                       ]
3022                   ],
3023                   { 'mother.person_id' => 'child.mother_id' }
3024               ],
3025           ]
3026       },
3027   );
3028
3029   # Equivalent SQL:
3030   # SELECT mother.* FROM person mother
3031   # JOIN (
3032   #   person child
3033   #   JOIN person father
3034   #   ON ( father.person_id = child.father_id )
3035   # )
3036   # ON ( mother.person_id = child.mother_id )
3037
3038 The type of any join can be controlled manually. To search against only people
3039 with a father in the person table, we could explicitly use C<INNER JOIN>:
3040
3041     $rs = $schema->resultset('Person')->search(
3042         undef,
3043         {
3044             alias => 'child', # alias columns in accordance with "from"
3045             from => [
3046                 { child => 'person' },
3047                 [
3048                     { father => 'person', -join_type => 'inner' },
3049                     { 'father.id' => 'child.father_id' }
3050                 ],
3051             ]
3052         },
3053     );
3054
3055     # Equivalent SQL:
3056     # SELECT child.* FROM person child
3057     # INNER JOIN person father ON child.father_id = father.id
3058
3059 If you need to express really complex joins or you need a subselect, you
3060 can supply literal SQL to C<from> via a scalar reference. In this case
3061 the contents of the scalar will replace the table name asscoiated with the
3062 resultsource.
3063
3064 WARNING: This technique might very well not work as expected on chained
3065 searches - you have been warned.
3066
3067     # Assuming the Event resultsource is defined as:
3068
3069         MySchema::Event->add_columns (
3070             sequence => {
3071                 data_type => 'INT',
3072                 is_auto_increment => 1,
3073             },
3074             location => {
3075                 data_type => 'INT',
3076             },
3077             type => {
3078                 data_type => 'INT',
3079             },
3080         );
3081         MySchema::Event->set_primary_key ('sequence');
3082
3083     # This will get back the latest event for every location. The column
3084     # selector is still provided by DBIC, all we do is add a JOIN/WHERE
3085     # combo to limit the resultset
3086
3087     $rs = $schema->resultset('Event');
3088     $table = $rs->result_source->name;
3089     $latest = $rs->search (
3090         undef,
3091         { from => \ " 
3092             (SELECT e1.* FROM $table e1 
3093                 JOIN $table e2 
3094                     ON e1.location = e2.location 
3095                     AND e1.sequence < e2.sequence 
3096                 WHERE e2.sequence is NULL 
3097             ) me",
3098         },
3099     );
3100
3101     # Equivalent SQL (with the DBIC chunks added):
3102
3103     SELECT me.sequence, me.location, me.type FROM
3104        (SELECT e1.* FROM events e1
3105            JOIN events e2
3106                ON e1.location = e2.location
3107                AND e1.sequence < e2.sequence
3108            WHERE e2.sequence is NULL
3109        ) me;
3110
3111 =head2 for
3112
3113 =over 4
3114
3115 =item Value: ( 'update' | 'shared' )
3116
3117 =back
3118
3119 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
3120 ... FOR SHARED.
3121
3122 =cut
3123
3124 1;