rewrite of _collapse_result to support prefetch of multiple has_many
[dbsrgits/DBIx-Class-Historic.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => "count",
7         'bool'   => "_bool",
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use List::Util ();
15 use Scalar::Util ();
16 use base qw/DBIx::Class/;
17
18 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
23
24 =head1 SYNOPSIS
25
26   my $users_rs   = $schema->resultset('User');
27   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
28   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
29
30 =head1 DESCRIPTION
31
32 A ResultSet is an object which stores a set of conditions representing
33 a query. It is the backbone of DBIx::Class (i.e. the really
34 important/useful bit).
35
36 No SQL is executed on the database when a ResultSet is created, it
37 just stores all the conditions needed to create the query.
38
39 A basic ResultSet representing the data of an entire table is returned
40 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
41 L<Source|DBIx::Class::Manual::Glossary/Source> name.
42
43   my $users_rs = $schema->resultset('User');
44
45 A new ResultSet is returned from calling L</search> on an existing
46 ResultSet. The new one will contain all the conditions of the
47 original, plus any new conditions added in the C<search> call.
48
49 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
50 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
51 represents.
52
53 The query that the ResultSet represents is B<only> executed against
54 the database when these methods are called:
55 L</find> L</next> L</all> L</first> L</single> L</count>
56
57 =head1 EXAMPLES
58
59 =head2 Chaining resultsets
60
61 Let's say you've got a query that needs to be run to return some data
62 to the user. But, you have an authorization system in place that
63 prevents certain users from seeing certain information. So, you want
64 to construct the basic query in one method, but add constraints to it in
65 another.
66
67   sub get_data {
68     my $self = shift;
69     my $request = $self->get_request; # Get a request object somehow.
70     my $schema = $self->get_schema;   # Get the DBIC schema object somehow.
71
72     my $cd_rs = $schema->resultset('CD')->search({
73       title => $request->param('title'),
74       year => $request->param('year'),
75     });
76
77     $self->apply_security_policy( $cd_rs );
78
79     return $cd_rs->all();
80   }
81
82   sub apply_security_policy {
83     my $self = shift;
84     my ($rs) = @_;
85
86     return $rs->search({
87       subversive => 0,
88     });
89   }
90
91 =head3 Resolving conditions and attributes
92
93 When a resultset is chained from another resultset, conditions and
94 attributes with the same keys need resolving.
95
96 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
97 into the existing ones from the original resultset.
98
99 The L</where>, L</having> attribute, and any search conditions are
100 merged with an SQL C<AND> to the existing condition from the original
101 resultset.
102
103 All other attributes are overridden by any new ones supplied in the
104 search attributes.
105
106 =head2 Multiple queries
107
108 Since a resultset just defines a query, you can do all sorts of
109 things with it with the same object.
110
111   # Don't hit the DB yet.
112   my $cd_rs = $schema->resultset('CD')->search({
113     title => 'something',
114     year => 2009,
115   });
116
117   # Each of these hits the DB individually.
118   my $count = $cd_rs->count;
119   my $most_recent = $cd_rs->get_column('date_released')->max();
120   my @records = $cd_rs->all;
121
122 And it's not just limited to SELECT statements.
123
124   $cd_rs->delete();
125
126 This is even cooler:
127
128   $cd_rs->create({ artist => 'Fred' });
129
130 Which is the same as:
131
132   $schema->resultset('CD')->create({
133     title => 'something',
134     year => 2009,
135     artist => 'Fred'
136   });
137
138 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
139
140 =head1 OVERLOADING
141
142 If a resultset is used in a numeric context it returns the L</count>.
143 However, if it is used in a booleand context it is always true.  So if
144 you want to check if a resultset has any results use C<if $rs != 0>.
145 C<if $rs> will always be true.
146
147 =head1 METHODS
148
149 =head2 new
150
151 =over 4
152
153 =item Arguments: $source, \%$attrs
154
155 =item Return Value: $rs
156
157 =back
158
159 The resultset constructor. Takes a source object (usually a
160 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
161 L</ATTRIBUTES> below).  Does not perform any queries -- these are
162 executed as needed by the other methods.
163
164 Generally you won't need to construct a resultset manually.  You'll
165 automatically get one from e.g. a L</search> called in scalar context:
166
167   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
168
169 IMPORTANT: If called on an object, proxies to new_result instead so
170
171   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
172
173 will return a CD object, not a ResultSet.
174
175 =cut
176
177 sub new {
178   my $class = shift;
179   return $class->new_result(@_) if ref $class;
180
181   my ($source, $attrs) = @_;
182   $source = $source->handle
183     unless $source->isa('DBIx::Class::ResultSourceHandle');
184   $attrs = { %{$attrs||{}} };
185
186   if ($attrs->{page}) {
187     $attrs->{rows} ||= 10;
188   }
189
190   $attrs->{alias} ||= 'me';
191
192   # Creation of {} and bless separated to mitigate RH perl bug
193   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
194   my $self = {
195     _source_handle => $source,
196     cond => $attrs->{where},
197     count => undef,
198     pager => undef,
199     attrs => $attrs
200   };
201
202   bless $self, $class;
203
204   $self->result_class(
205     $attrs->{result_class} || $source->resolve->result_class
206   );
207
208   return $self;
209 }
210
211 =head2 search
212
213 =over 4
214
215 =item Arguments: $cond, \%attrs?
216
217 =item Return Value: $resultset (scalar context), @row_objs (list context)
218
219 =back
220
221   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
222   my $new_rs = $cd_rs->search({ year => 2005 });
223
224   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
225                  # year = 2005 OR year = 2004
226
227 If you need to pass in additional attributes but no additional condition,
228 call it as C<search(undef, \%attrs)>.
229
230   # "SELECT name, artistid FROM $artist_table"
231   my @all_artists = $schema->resultset('Artist')->search(undef, {
232     columns => [qw/name artistid/],
233   });
234
235 For a list of attributes that can be passed to C<search>, see
236 L</ATTRIBUTES>. For more examples of using this function, see
237 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
238 documentation for the first argument, see L<SQL::Abstract>.
239
240 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
241
242 =cut
243
244 sub search {
245   my $self = shift;
246   my $rs = $self->search_rs( @_ );
247   return (wantarray ? $rs->all : $rs);
248 }
249
250 =head2 search_rs
251
252 =over 4
253
254 =item Arguments: $cond, \%attrs?
255
256 =item Return Value: $resultset
257
258 =back
259
260 This method does the same exact thing as search() except it will
261 always return a resultset, even in list context.
262
263 =cut
264
265 sub search_rs {
266   my $self = shift;
267
268   # Special-case handling for (undef, undef).
269   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
270     pop(@_); pop(@_);
271   }
272
273   my $attrs = {};
274   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
275   my $our_attrs = { %{$self->{attrs}} };
276   my $having = delete $our_attrs->{having};
277   my $where = delete $our_attrs->{where};
278
279   my $rows;
280
281   my %safe = (alias => 1, cache => 1);
282
283   unless (
284     (@_ && defined($_[0])) # @_ == () or (undef)
285     ||
286     (keys %$attrs # empty attrs or only 'safe' attrs
287     && List::Util::first { !$safe{$_} } keys %$attrs)
288   ) {
289     # no search, effectively just a clone
290     $rows = $self->get_cache;
291   }
292
293   my $new_attrs = { %{$our_attrs}, %{$attrs} };
294
295   # merge new attrs into inherited
296   foreach my $key (qw/join prefetch +select +as bind/) {
297     next unless exists $attrs->{$key};
298     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
299   }
300
301   my $cond = (@_
302     ? (
303         (@_ == 1 || ref $_[0] eq "HASH")
304           ? (
305               (ref $_[0] eq 'HASH')
306                 ? (
307                     (keys %{ $_[0] }  > 0)
308                       ? shift
309                       : undef
310                    )
311                 :  shift
312              )
313           : (
314               (@_ % 2)
315                 ? $self->throw_exception("Odd number of arguments to search")
316                 : {@_}
317              )
318       )
319     : undef
320   );
321
322   if (defined $where) {
323     $new_attrs->{where} = (
324       defined $new_attrs->{where}
325         ? { '-and' => [
326               map {
327                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
328               } $where, $new_attrs->{where}
329             ]
330           }
331         : $where);
332   }
333
334   if (defined $cond) {
335     $new_attrs->{where} = (
336       defined $new_attrs->{where}
337         ? { '-and' => [
338               map {
339                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
340               } $cond, $new_attrs->{where}
341             ]
342           }
343         : $cond);
344   }
345
346   if (defined $having) {
347     $new_attrs->{having} = (
348       defined $new_attrs->{having}
349         ? { '-and' => [
350               map {
351                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
352               } $having, $new_attrs->{having}
353             ]
354           }
355         : $having);
356   }
357
358   my $rs = (ref $self)->new($self->result_source, $new_attrs);
359   if ($rows) {
360     $rs->set_cache($rows);
361   }
362   return $rs;
363 }
364
365 =head2 search_literal
366
367 =over 4
368
369 =item Arguments: $sql_fragment, @bind_values
370
371 =item Return Value: $resultset (scalar context), @row_objs (list context)
372
373 =back
374
375   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
376   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
377
378 Pass a literal chunk of SQL to be added to the conditional part of the
379 resultset query.
380
381 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
382 only be used in that context. C<search_literal> is a convenience method.
383 It is equivalent to calling $schema->search(\[]), but if you want to ensure
384 columns are bound correctly, use C<search>.
385
386 Example of how to use C<search> instead of C<search_literal>
387
388   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
389   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
390
391
392 See L<DBIx::Class::Manual::Cookbook/Searching> and
393 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
394 require C<search_literal>.
395
396 =cut
397
398 sub search_literal {
399   my ($self, $sql, @bind) = @_;
400   my $attr;
401   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
402     $attr = pop @bind;
403   }
404   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
405 }
406
407 =head2 find
408
409 =over 4
410
411 =item Arguments: @values | \%cols, \%attrs?
412
413 =item Return Value: $row_object | undef
414
415 =back
416
417 Finds a row based on its primary key or unique constraint. For example, to find
418 a row by its primary key:
419
420   my $cd = $schema->resultset('CD')->find(5);
421
422 You can also find a row by a specific unique constraint using the C<key>
423 attribute. For example:
424
425   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
426     key => 'cd_artist_title'
427   });
428
429 Additionally, you can specify the columns explicitly by name:
430
431   my $cd = $schema->resultset('CD')->find(
432     {
433       artist => 'Massive Attack',
434       title  => 'Mezzanine',
435     },
436     { key => 'cd_artist_title' }
437   );
438
439 If the C<key> is specified as C<primary>, it searches only on the primary key.
440
441 If no C<key> is specified, it searches on all unique constraints defined on the
442 source for which column data is provided, including the primary key.
443
444 If your table does not have a primary key, you B<must> provide a value for the
445 C<key> attribute matching one of the unique constraints on the source.
446
447 In addition to C<key>, L</find> recognizes and applies standard
448 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
449
450 Note: If your query does not return only one row, a warning is generated:
451
452   Query returned more than one row
453
454 See also L</find_or_create> and L</update_or_create>. For information on how to
455 declare unique constraints, see
456 L<DBIx::Class::ResultSource/add_unique_constraint>.
457
458 =cut
459
460 sub find {
461   my $self = shift;
462   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
463
464   # Default to the primary key, but allow a specific key
465   my @cols = exists $attrs->{key}
466     ? $self->result_source->unique_constraint_columns($attrs->{key})
467     : $self->result_source->primary_columns;
468   $self->throw_exception(
469     "Can't find unless a primary key is defined or unique constraint is specified"
470   ) unless @cols;
471
472   # Parse out a hashref from input
473   my $input_query;
474   if (ref $_[0] eq 'HASH') {
475     $input_query = { %{$_[0]} };
476   }
477   elsif (@_ == @cols) {
478     $input_query = {};
479     @{$input_query}{@cols} = @_;
480   }
481   else {
482     # Compatibility: Allow e.g. find(id => $value)
483     carp "Find by key => value deprecated; please use a hashref instead";
484     $input_query = {@_};
485   }
486
487   my (%related, $info);
488
489   KEY: foreach my $key (keys %$input_query) {
490     if (ref($input_query->{$key})
491         && ($info = $self->result_source->relationship_info($key))) {
492       my $val = delete $input_query->{$key};
493       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
494       my $rel_q = $self->result_source->_resolve_condition(
495                     $info->{cond}, $val, $key
496                   );
497       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
498       @related{keys %$rel_q} = values %$rel_q;
499     }
500   }
501   if (my @keys = keys %related) {
502     @{$input_query}{@keys} = values %related;
503   }
504
505
506   # Build the final query: Default to the disjunction of the unique queries,
507   # but allow the input query in case the ResultSet defines the query or the
508   # user is abusing find
509   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
510   my $query;
511   if (exists $attrs->{key}) {
512     my @unique_cols = $self->result_source->unique_constraint_columns($attrs->{key});
513     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
514     $query = $self->_add_alias($unique_query, $alias);
515   }
516   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
517     # This means that we got here after a merger of relationship conditions
518     # in ::Relationship::Base::search_related (the row method), and furthermore
519     # the relationship is of the 'single' type. This means that the condition
520     # provided by the relationship (already attached to $self) is sufficient,
521     # as there can be only one row in the databse that would satisfy the 
522     # relationship
523   }
524   else {
525     my @unique_queries = $self->_unique_queries($input_query, $attrs);
526     $query = @unique_queries
527       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
528       : $self->_add_alias($input_query, $alias);
529   }
530
531   # Run the query
532   my $rs = $self->search ($query, $attrs);
533   if (keys %{$rs->_resolved_attrs->{collapse}}) {
534     my $row = $rs->next;
535     carp "Query returned more than one row" if $rs->next;
536     return $row;
537   }
538   else {
539     return $rs->single;
540   }
541 }
542
543 # _add_alias
544 #
545 # Add the specified alias to the specified query hash. A copy is made so the
546 # original query is not modified.
547
548 sub _add_alias {
549   my ($self, $query, $alias) = @_;
550
551   my %aliased = %$query;
552   foreach my $col (grep { ! m/\./ } keys %aliased) {
553     $aliased{"$alias.$col"} = delete $aliased{$col};
554   }
555
556   return \%aliased;
557 }
558
559 # _unique_queries
560 #
561 # Build a list of queries which satisfy unique constraints.
562
563 sub _unique_queries {
564   my ($self, $query, $attrs) = @_;
565
566   my @constraint_names = exists $attrs->{key}
567     ? ($attrs->{key})
568     : $self->result_source->unique_constraint_names;
569
570   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
571   my $num_where = scalar keys %$where;
572
573   my @unique_queries;
574   foreach my $name (@constraint_names) {
575     my @unique_cols = $self->result_source->unique_constraint_columns($name);
576     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
577
578     my $num_cols = scalar @unique_cols;
579     my $num_query = scalar keys %$unique_query;
580
581     my $total = $num_query + $num_where;
582     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
583       # The query is either unique on its own or is unique in combination with
584       # the existing where clause
585       push @unique_queries, $unique_query;
586     }
587   }
588
589   return @unique_queries;
590 }
591
592 # _build_unique_query
593 #
594 # Constrain the specified query hash based on the specified column names.
595
596 sub _build_unique_query {
597   my ($self, $query, $unique_cols) = @_;
598
599   return {
600     map  { $_ => $query->{$_} }
601     grep { exists $query->{$_} }
602       @$unique_cols
603   };
604 }
605
606 =head2 search_related
607
608 =over 4
609
610 =item Arguments: $rel, $cond, \%attrs?
611
612 =item Return Value: $new_resultset
613
614 =back
615
616   $new_rs = $cd_rs->search_related('artist', {
617     name => 'Emo-R-Us',
618   });
619
620 Searches the specified relationship, optionally specifying a condition and
621 attributes for matching records. See L</ATTRIBUTES> for more information.
622
623 =cut
624
625 sub search_related {
626   return shift->related_resultset(shift)->search(@_);
627 }
628
629 =head2 search_related_rs
630
631 This method works exactly the same as search_related, except that
632 it guarantees a restultset, even in list context.
633
634 =cut
635
636 sub search_related_rs {
637   return shift->related_resultset(shift)->search_rs(@_);
638 }
639
640 =head2 cursor
641
642 =over 4
643
644 =item Arguments: none
645
646 =item Return Value: $cursor
647
648 =back
649
650 Returns a storage-driven cursor to the given resultset. See
651 L<DBIx::Class::Cursor> for more information.
652
653 =cut
654
655 sub cursor {
656   my ($self) = @_;
657
658   my $attrs = $self->_resolved_attrs_copy;
659
660   return $self->{cursor}
661     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
662           $attrs->{where},$attrs);
663 }
664
665 =head2 single
666
667 =over 4
668
669 =item Arguments: $cond?
670
671 =item Return Value: $row_object?
672
673 =back
674
675   my $cd = $schema->resultset('CD')->single({ year => 2001 });
676
677 Inflates the first result without creating a cursor if the resultset has
678 any records in it; if not returns nothing. Used by L</find> as a lean version of
679 L</search>.
680
681 While this method can take an optional search condition (just like L</search>)
682 being a fast-code-path it does not recognize search attributes. If you need to
683 add extra joins or similar, call L</search> and then chain-call L</single> on the
684 L<DBIx::Class::ResultSet> returned.
685
686 =over
687
688 =item B<Note>
689
690 As of 0.08100, this method enforces the assumption that the preceeding
691 query returns only one row. If more than one row is returned, you will receive
692 a warning:
693
694   Query returned more than one row
695
696 In this case, you should be using L</next> or L</find> instead, or if you really
697 know what you are doing, use the L</rows> attribute to explicitly limit the size
698 of the resultset.
699
700 This method will also throw an exception if it is called on a resultset prefetching
701 has_many, as such a prefetch implies fetching multiple rows from the database in
702 order to assemble the resulting object.
703
704 =back
705
706 =cut
707
708 sub single {
709   my ($self, $where) = @_;
710   if(@_ > 2) {
711       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
712   }
713
714   my $attrs = $self->_resolved_attrs_copy;
715
716   if (keys %{$attrs->{collapse}}) {
717     $self->throw_exception(
718       'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
719     );
720   }
721
722   if ($where) {
723     if (defined $attrs->{where}) {
724       $attrs->{where} = {
725         '-and' =>
726             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
727                $where, delete $attrs->{where} ]
728       };
729     } else {
730       $attrs->{where} = $where;
731     }
732   }
733
734 #  XXX: Disabled since it doesn't infer uniqueness in all cases
735 #  unless ($self->_is_unique_query($attrs->{where})) {
736 #    carp "Query not guaranteed to return a single row"
737 #      . "; please declare your unique constraints or use search instead";
738 #  }
739
740   my @data = $self->result_source->storage->select_single(
741     $attrs->{from}, $attrs->{select},
742     $attrs->{where}, $attrs
743   );
744
745   return (@data ? ($self->_construct_object(@data))[0] : undef);
746 }
747
748
749 # _is_unique_query
750 #
751 # Try to determine if the specified query is guaranteed to be unique, based on
752 # the declared unique constraints.
753
754 sub _is_unique_query {
755   my ($self, $query) = @_;
756
757   my $collapsed = $self->_collapse_query($query);
758   my $alias = $self->{attrs}{alias};
759
760   foreach my $name ($self->result_source->unique_constraint_names) {
761     my @unique_cols = map {
762       "$alias.$_"
763     } $self->result_source->unique_constraint_columns($name);
764
765     # Count the values for each unique column
766     my %seen = map { $_ => 0 } @unique_cols;
767
768     foreach my $key (keys %$collapsed) {
769       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
770       next unless exists $seen{$aliased};  # Additional constraints are okay
771       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
772     }
773
774     # If we get 0 or more than 1 value for a column, it's not necessarily unique
775     return 1 unless grep { $_ != 1 } values %seen;
776   }
777
778   return 0;
779 }
780
781 # _collapse_query
782 #
783 # Recursively collapse the query, accumulating values for each column.
784
785 sub _collapse_query {
786   my ($self, $query, $collapsed) = @_;
787
788   $collapsed ||= {};
789
790   if (ref $query eq 'ARRAY') {
791     foreach my $subquery (@$query) {
792       next unless ref $subquery;  # -or
793       $collapsed = $self->_collapse_query($subquery, $collapsed);
794     }
795   }
796   elsif (ref $query eq 'HASH') {
797     if (keys %$query and (keys %$query)[0] eq '-and') {
798       foreach my $subquery (@{$query->{-and}}) {
799         $collapsed = $self->_collapse_query($subquery, $collapsed);
800       }
801     }
802     else {
803       foreach my $col (keys %$query) {
804         my $value = $query->{$col};
805         $collapsed->{$col}{$value}++;
806       }
807     }
808   }
809
810   return $collapsed;
811 }
812
813 =head2 get_column
814
815 =over 4
816
817 =item Arguments: $cond?
818
819 =item Return Value: $resultsetcolumn
820
821 =back
822
823   my $max_length = $rs->get_column('length')->max;
824
825 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
826
827 =cut
828
829 sub get_column {
830   my ($self, $column) = @_;
831   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
832   return $new;
833 }
834
835 =head2 search_like
836
837 =over 4
838
839 =item Arguments: $cond, \%attrs?
840
841 =item Return Value: $resultset (scalar context), @row_objs (list context)
842
843 =back
844
845   # WHERE title LIKE '%blue%'
846   $cd_rs = $rs->search_like({ title => '%blue%'});
847
848 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
849 that this is simply a convenience method retained for ex Class::DBI users.
850 You most likely want to use L</search> with specific operators.
851
852 For more information, see L<DBIx::Class::Manual::Cookbook>.
853
854 This method is deprecated and will be removed in 0.09. Use L</search()>
855 instead. An example conversion is:
856
857   ->search_like({ foo => 'bar' });
858
859   # Becomes
860
861   ->search({ foo => { like => 'bar' } });
862
863 =cut
864
865 sub search_like {
866   my $class = shift;
867   carp (
868     'search_like() is deprecated and will be removed in DBIC version 0.09.'
869    .' Instead use ->search({ x => { -like => "y%" } })'
870    .' (note the outer pair of {}s - they are important!)'
871   );
872   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
873   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
874   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
875   return $class->search($query, { %$attrs });
876 }
877
878 =head2 slice
879
880 =over 4
881
882 =item Arguments: $first, $last
883
884 =item Return Value: $resultset (scalar context), @row_objs (list context)
885
886 =back
887
888 Returns a resultset or object list representing a subset of elements from the
889 resultset slice is called on. Indexes are from 0, i.e., to get the first
890 three records, call:
891
892   my ($one, $two, $three) = $rs->slice(0, 2);
893
894 =cut
895
896 sub slice {
897   my ($self, $min, $max) = @_;
898   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
899   $attrs->{offset} = $self->{attrs}{offset} || 0;
900   $attrs->{offset} += $min;
901   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
902   return $self->search(undef(), $attrs);
903   #my $slice = (ref $self)->new($self->result_source, $attrs);
904   #return (wantarray ? $slice->all : $slice);
905 }
906
907 =head2 next
908
909 =over 4
910
911 =item Arguments: none
912
913 =item Return Value: $result?
914
915 =back
916
917 Returns the next element in the resultset (C<undef> is there is none).
918
919 Can be used to efficiently iterate over records in the resultset:
920
921   my $rs = $schema->resultset('CD')->search;
922   while (my $cd = $rs->next) {
923     print $cd->title;
924   }
925
926 Note that you need to store the resultset object, and call C<next> on it.
927 Calling C<< resultset('Table')->next >> repeatedly will always return the
928 first record from the resultset.
929
930 =cut
931
932 sub next {
933   my ($self) = @_;
934   if (my $cache = $self->get_cache) {
935     $self->{all_cache_position} ||= 0;
936     return $cache->[$self->{all_cache_position}++];
937   }
938   if ($self->{attrs}{cache}) {
939     $self->{all_cache_position} = 1;
940     return ($self->all)[0];
941   }
942   if ($self->{stashed_objects}) {
943     my $obj = shift(@{$self->{stashed_objects}});
944     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
945     return $obj;
946   }
947   my @row = (
948     exists $self->{stashed_row}
949       ? @{delete $self->{stashed_row}}
950       : $self->cursor->next
951   );
952   return undef unless (@row);
953   my ($row, @more) = $self->_construct_object(@row);
954   $self->{stashed_objects} = \@more if @more;
955   return $row;
956 }
957
958 sub _construct_object {
959   my ($self, @row) = @_;
960
961   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row)
962     or return ();
963   my @new = $self->result_class->inflate_result($self->result_source, @$info);
964   @new = $self->{_attrs}{record_filter}->(@new)
965     if exists $self->{_attrs}{record_filter};
966   return @new;
967 }
968
969 # _unflatten_result takes a row hashref which looks like this:
970 # $VAR1 = {
971 #   'cd.artist.artistid' => '1',
972 #   'cd.artist' => '1',
973 #   'cd_id' => '1',
974 #   'cd.genreid' => undef,
975 #   'cd.year' => '1999',
976 #   'cd.title' => 'Spoonful of bees',
977 #   'cd.single_track' => undef,
978 #   'cd.artist.name' => 'Caterwauler McCrae',
979 #   'cd.artist.rank' => '13',
980 #   'cd.artist.charfield' => undef,
981 #   'cd.cdid' => '1'
982 # };
983
984 # and generates the following structure:
985
986 # $VAR1 = [
987 #   {
988 #     'cd_id' => '1'
989 #   },
990 #   {
991 #     'cd' => [
992 #       {
993 #         'single_track' => undef,
994 #         'cdid' => '1',
995 #         'artist' => '1',
996 #         'title' => 'Spoonful of bees',
997 #         'year' => '1999',
998 #         'genreid' => undef
999 #       },
1000 #       {
1001 #         'artist' => [
1002 #           {
1003 #             'artistid' => '1',
1004 #             'charfield' => undef,
1005 #             'name' => 'Caterwauler McCrae',
1006 #             'rank' => '13'
1007 #           }
1008 #         ]
1009 #       }
1010 #     ]
1011 #   }
1012 # ];  
1013
1014 # It returns one row object which consists of an arrayref with two
1015 # elements. The first contains the plain column data, the second 
1016 # contains the data of relationships. Those are row arrayrefs, themselves.
1017
1018 # it's a recursive function. It needs to request the relationship_info
1019 # to decide whether to put the data of a relationship in a hashref
1020 # (i.e. belongs_to) or an arrayref (i.e. has_many).
1021
1022 sub _unflatten_result {
1023     my ( $self, $row ) = @_;
1024
1025     my $columns = {};
1026     my $rels    = {};
1027
1028     foreach my $column ( sort keys %$row ) {
1029         if ( $column =~ /^(.*?)\.(.*)$/ ) {
1030             $rels->{$1} ||= {};
1031             $rels->{$1}->{$2} = $row->{$column};
1032         }
1033         else {
1034             $columns->{$column} = $row->{$column};
1035         }
1036     }
1037
1038     foreach my $rel ( sort keys %$rels ) {
1039         my $rel_info = $self->result_source->relationship_info($rel);
1040         $rels->{$rel} =
1041           $self->related_resultset($rel)->_unflatten_result( $rels->{$rel} );
1042         $rels->{$rel} = [ $rels->{$rel} ]
1043           if ( $rel_info->{attrs}->{accessor} eq 'multi' );
1044     }
1045
1046     return keys %$rels ? [ $columns, $rels ] : [$columns];
1047 }
1048
1049 # two arguments: $as_proto is an arrayref of column names,
1050 # $row_ref is an arrayref of the data. If none of the row data
1051 # is defined we return undef (that's copied from the old
1052 # _collapse_result). Next we decide whether we need to collapse
1053 # the resultset (i.e. we prefetch something) or not. $collapse
1054 # indicates that. The do-while loop will run once if we do not need
1055 # to collapse the result and will run as long as _merge_result returns
1056 # a true value. It will return undef if the current added row does not
1057 # match the previous row. A bit of stashing and cursor magic is
1058 # required so that the cursor is not mixed up.
1059
1060 # "$rows" is a bit misleading. In the end, there should only be one
1061 # element in this arrayref. 
1062
1063 sub _collapse_result {
1064     my ( $self, $as_proto, $row_ref ) = @_;
1065     my $has_def;
1066     for (@$row_ref) {
1067         if ( defined $_ ) {
1068             $has_def++;
1069             last;
1070         }
1071     }
1072     return undef unless $has_def;
1073
1074     my $collapse = keys %{ $self->{_attrs}{collapse} || {} };
1075     my $rows     = [];
1076     my @row      = @$row_ref;
1077     do {
1078         my $i = 0;
1079         my $row = { map { $_ => $row[ $i++ ] } @$as_proto };
1080         $row = $self->_unflatten_result($row);
1081         unless ( scalar @$rows ) {
1082             push( @$rows, $row );
1083         }
1084         $collapse = undef unless ( $self->_merge_result( $rows, $row ) );
1085       } while (
1086         $collapse
1087         && do { @row = $self->cursor->next; $self->{stashed_row} = \@row if @row; }
1088       );
1089
1090     return $rows->[0];
1091
1092 }
1093
1094 # _merge_result accepts an arrayref of rows objects (again, an arrayref of two elements)
1095 # and a row object which should be merged into the first object.
1096 # First we try to find out whether $row is already in $rows. If this is the case
1097 # we try to merge them by iteration through their relationship data. We call
1098 # _merge_result again on them, so they get merged.
1099
1100 # If we don't find the $row in $rows, we append it to $rows and return undef.
1101 # _merge_result returns 1 otherwise (i.e. $row has been found in $rows).
1102
1103 sub _merge_result {
1104     my ( $self, $rows, $row ) = @_;
1105     my ( $columns, $rels ) = @$row;
1106     my $found = undef;
1107     foreach my $seen (@$rows) {
1108         my $match = 1;
1109         foreach my $column ( keys %$columns ) {
1110             if (   defined $seen->[0]->{$column} ^ defined $columns->{$column}
1111                 or defined $columns->{$column}
1112                 && $seen->[0]->{$column} ne $columns->{$column} )
1113             {
1114
1115                 $match = 0;
1116                 last;
1117             }
1118         }
1119         if ($match) {
1120             $found = $seen;
1121             last;
1122         }
1123     }
1124     if ($found) {
1125         foreach my $rel ( keys %$rels ) {
1126             my $old_rows = $found->[1]->{$rel};
1127             $self->_merge_result(
1128                 ref $found->[1]->{$rel}->[0] eq 'HASH' ? [ $found->[1]->{$rel} ]
1129                 : $found->[1]->{$rel},
1130                 ref $rels->{$rel}->[0] eq 'HASH' ? [ $rels->{$rel}->[0], $rels->{$rel}->[1] ]
1131                 : $rels->{$rel}->[0]
1132             );
1133
1134         }
1135
1136     }
1137     else {
1138         push( @$rows, $row );
1139         return undef;
1140     }
1141
1142     return 1;
1143 }
1144
1145
1146 =head2 result_source
1147
1148 =over 4
1149
1150 =item Arguments: $result_source?
1151
1152 =item Return Value: $result_source
1153
1154 =back
1155
1156 An accessor for the primary ResultSource object from which this ResultSet
1157 is derived.
1158
1159 =head2 result_class
1160
1161 =over 4
1162
1163 =item Arguments: $result_class?
1164
1165 =item Return Value: $result_class
1166
1167 =back
1168
1169 An accessor for the class to use when creating row objects. Defaults to
1170 C<< result_source->result_class >> - which in most cases is the name of the
1171 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1172
1173 Note that changing the result_class will also remove any components
1174 that were originally loaded in the source class via
1175 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1176 in the original source class will not run.
1177
1178 =cut
1179
1180 sub result_class {
1181   my ($self, $result_class) = @_;
1182   if ($result_class) {
1183     $self->ensure_class_loaded($result_class);
1184     $self->_result_class($result_class);
1185   }
1186   $self->_result_class;
1187 }
1188
1189 =head2 count
1190
1191 =over 4
1192
1193 =item Arguments: $cond, \%attrs??
1194
1195 =item Return Value: $count
1196
1197 =back
1198
1199 Performs an SQL C<COUNT> with the same query as the resultset was built
1200 with to find the number of elements. Passing arguments is equivalent to
1201 C<< $rs->search ($cond, \%attrs)->count >>
1202
1203 =cut
1204
1205 sub count {
1206   my $self = shift;
1207   return $self->search(@_)->count if @_ and defined $_[0];
1208   return scalar @{ $self->get_cache } if $self->get_cache;
1209
1210   my $attrs = $self->_resolved_attrs_copy;
1211
1212   # this is a little optimization - it is faster to do the limit
1213   # adjustments in software, instead of a subquery
1214   my $rows = delete $attrs->{rows};
1215   my $offset = delete $attrs->{offset};
1216
1217   my $crs;
1218   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1219     $crs = $self->_count_subq_rs ($attrs);
1220   }
1221   else {
1222     $crs = $self->_count_rs ($attrs);
1223   }
1224   my $count = $crs->next;
1225
1226   $count -= $offset if $offset;
1227   $count = $rows if $rows and $rows < $count;
1228   $count = 0 if ($count < 0);
1229
1230   return $count;
1231 }
1232
1233 =head2 count_rs
1234
1235 =over 4
1236
1237 =item Arguments: $cond, \%attrs??
1238
1239 =item Return Value: $count_rs
1240
1241 =back
1242
1243 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1244 This can be very handy for subqueries:
1245
1246   ->search( { amount => $some_rs->count_rs->as_query } )
1247
1248 As with regular resultsets the SQL query will be executed only after
1249 the resultset is accessed via L</next> or L</all>. That would return
1250 the same single value obtainable via L</count>.
1251
1252 =cut
1253
1254 sub count_rs {
1255   my $self = shift;
1256   return $self->search(@_)->count_rs if @_;
1257
1258   # this may look like a lack of abstraction (count() does about the same)
1259   # but in fact an _rs *must* use a subquery for the limits, as the
1260   # software based limiting can not be ported if this $rs is to be used
1261   # in a subquery itself (i.e. ->as_query)
1262   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1263     return $self->_count_subq_rs;
1264   }
1265   else {
1266     return $self->_count_rs;
1267   }
1268 }
1269
1270 #
1271 # returns a ResultSetColumn object tied to the count query
1272 #
1273 sub _count_rs {
1274   my ($self, $attrs) = @_;
1275
1276   my $rsrc = $self->result_source;
1277   $attrs ||= $self->_resolved_attrs;
1278
1279   my $tmp_attrs = { %$attrs };
1280
1281   # take off any limits, record_filter is cdbi, and no point of ordering a count 
1282   delete $tmp_attrs->{$_} for (qw/select as rows offset order_by record_filter/);
1283
1284   # overwrite the selector (supplied by the storage)
1285   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $tmp_attrs);
1286   $tmp_attrs->{as} = 'count';
1287
1288   # read the comment on top of the actual function to see what this does
1289   $tmp_attrs->{from} = $self->_switch_to_inner_join_if_needed (
1290     $tmp_attrs->{from}, $tmp_attrs->{alias}
1291   );
1292
1293   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1294
1295   return $tmp_rs;
1296 }
1297
1298 #
1299 # same as above but uses a subquery
1300 #
1301 sub _count_subq_rs {
1302   my ($self, $attrs) = @_;
1303
1304   my $rsrc = $self->result_source;
1305   $attrs ||= $self->_resolved_attrs_copy;
1306
1307   my $sub_attrs = { %$attrs };
1308
1309   # extra selectors do not go in the subquery and there is no point of ordering it
1310   delete $sub_attrs->{$_} for qw/collapse select _prefetch_select as order_by/;
1311
1312   # if we prefetch, we group_by primary keys only as this is what we would get out
1313   # of the rs via ->next/->all. We DO WANT to clobber old group_by regardless
1314   if ( keys %{$attrs->{collapse}} ) {
1315     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } ($rsrc->primary_columns) ]
1316   }
1317
1318   $sub_attrs->{select} = $rsrc->storage->_subq_count_select ($rsrc, $sub_attrs);
1319
1320   # read the comment on top of the actual function to see what this does
1321   $sub_attrs->{from} = $self->_switch_to_inner_join_if_needed (
1322     $sub_attrs->{from}, $sub_attrs->{alias}
1323   );
1324
1325   # this is so that ordering can be thrown away in things like Top limit
1326   $sub_attrs->{-for_count_only} = 1;
1327
1328   my $sub_rs = $rsrc->resultset_class->new ($rsrc, $sub_attrs);
1329
1330   $attrs->{from} = [{
1331     -alias => 'count_subq',
1332     -source_handle => $rsrc->handle,
1333     count_subq => $sub_rs->as_query,
1334   }];
1335
1336   # the subquery replaces this
1337   delete $attrs->{$_} for qw/where bind collapse group_by having having_bind rows offset/;
1338
1339   return $self->_count_rs ($attrs);
1340 }
1341
1342
1343 # The DBIC relationship chaining implementation is pretty simple - every
1344 # new related_relationship is pushed onto the {from} stack, and the {select}
1345 # window simply slides further in. This means that when we count somewhere
1346 # in the middle, we got to make sure that everything in the join chain is an
1347 # actual inner join, otherwise the count will come back with unpredictable
1348 # results (a resultset may be generated with _some_ rows regardless of if
1349 # the relation which the $rs currently selects has rows or not). E.g.
1350 # $artist_rs->cds->count - normally generates:
1351 # SELECT COUNT( * ) FROM artist me LEFT JOIN cd cds ON cds.artist = me.artistid
1352 # which actually returns the number of artists * (number of cds || 1)
1353 #
1354 # So what we do here is crawl {from}, determine if the current alias is at
1355 # the top of the stack, and if not - make sure the chain is inner-joined down
1356 # to the root.
1357 #
1358 sub _switch_to_inner_join_if_needed {
1359   my ($self, $from, $alias) = @_;
1360
1361   # subqueries and other oddness is naturally not supported
1362   return $from if (
1363     ref $from ne 'ARRAY'
1364       ||
1365     @$from <= 1
1366       ||
1367     ref $from->[0] ne 'HASH'
1368       ||
1369     ! $from->[0]{-alias}
1370       ||
1371     $from->[0]{-alias} eq $alias
1372   );
1373
1374   my $switch_branch;
1375   JOINSCAN:
1376   for my $j (@{$from}[1 .. $#$from]) {
1377     if ($j->[0]{-alias} eq $alias) {
1378       $switch_branch = $j->[0]{-join_path};
1379       last JOINSCAN;
1380     }
1381   }
1382
1383   # something else went wrong
1384   return $from unless $switch_branch;
1385
1386   # So it looks like we will have to switch some stuff around.
1387   # local() is useless here as we will be leaving the scope
1388   # anyway, and deep cloning is just too fucking expensive
1389   # So replace the inner hashref manually
1390   my @new_from = ($from->[0]);
1391   my $sw_idx = { map { $_ => 1 } @$switch_branch };
1392
1393   for my $j (@{$from}[1 .. $#$from]) {
1394     my $jalias = $j->[0]{-alias};
1395
1396     if ($sw_idx->{$jalias}) {
1397       my %attrs = %{$j->[0]};
1398       delete $attrs{-join_type};
1399       push @new_from, [
1400         \%attrs,
1401         @{$j}[ 1 .. $#$j ],
1402       ];
1403     }
1404     else {
1405       push @new_from, $j;
1406     }
1407   }
1408
1409   return \@new_from;
1410 }
1411
1412
1413 sub _bool {
1414   return 1;
1415 }
1416
1417 =head2 count_literal
1418
1419 =over 4
1420
1421 =item Arguments: $sql_fragment, @bind_values
1422
1423 =item Return Value: $count
1424
1425 =back
1426
1427 Counts the results in a literal query. Equivalent to calling L</search_literal>
1428 with the passed arguments, then L</count>.
1429
1430 =cut
1431
1432 sub count_literal { shift->search_literal(@_)->count; }
1433
1434 =head2 all
1435
1436 =over 4
1437
1438 =item Arguments: none
1439
1440 =item Return Value: @objects
1441
1442 =back
1443
1444 Returns all elements in the resultset. Called implicitly if the resultset
1445 is returned in list context.
1446
1447 =cut
1448
1449 sub all {
1450   my $self = shift;
1451   if(@_) {
1452       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1453   }
1454
1455   return @{ $self->get_cache } if $self->get_cache;
1456
1457   my @obj;
1458
1459   if (keys %{$self->_resolved_attrs->{collapse}}) {
1460     # Using $self->cursor->all is really just an optimisation.
1461     # If we're collapsing has_many prefetches it probably makes
1462     # very little difference, and this is cleaner than hacking
1463     # _construct_object to survive the approach
1464     $self->cursor->reset;
1465     my @row = $self->cursor->next;
1466     while (@row) {
1467       push(@obj, $self->_construct_object(@row));
1468       @row = (exists $self->{stashed_row}
1469                ? @{delete $self->{stashed_row}}
1470                : $self->cursor->next);
1471     }
1472   } else {
1473     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1474   }
1475
1476   $self->set_cache(\@obj) if $self->{attrs}{cache};
1477
1478   return @obj;
1479 }
1480
1481 =head2 reset
1482
1483 =over 4
1484
1485 =item Arguments: none
1486
1487 =item Return Value: $self
1488
1489 =back
1490
1491 Resets the resultset's cursor, so you can iterate through the elements again.
1492 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1493 another query.
1494
1495 =cut
1496
1497 sub reset {
1498   my ($self) = @_;
1499   delete $self->{_attrs} if exists $self->{_attrs};
1500   $self->{all_cache_position} = 0;
1501   $self->cursor->reset;
1502   return $self;
1503 }
1504
1505 =head2 first
1506
1507 =over 4
1508
1509 =item Arguments: none
1510
1511 =item Return Value: $object?
1512
1513 =back
1514
1515 Resets the resultset and returns an object for the first result (if the
1516 resultset returns anything).
1517
1518 =cut
1519
1520 sub first {
1521   return $_[0]->reset->next;
1522 }
1523
1524
1525 # _rs_update_delete
1526 #
1527 # Determines whether and what type of subquery is required for the $rs operation.
1528 # If grouping is necessary either supplies its own, or verifies the current one
1529 # After all is done delegates to the proper storage method.
1530
1531 sub _rs_update_delete {
1532   my ($self, $op, $values) = @_;
1533
1534   my $rsrc = $self->result_source;
1535
1536   my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
1537   my $needs_subq = $self->_has_resolved_attr (qw/row offset/);
1538
1539   if ($needs_group_by_subq or $needs_subq) {
1540
1541     # make a new $rs selecting only the PKs (that's all we really need)
1542     my $attrs = $self->_resolved_attrs_copy;
1543
1544     delete $attrs->{$_} for qw/collapse select as/;
1545     $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->primary_columns) ];
1546
1547     if ($needs_group_by_subq) {
1548       # make sure no group_by was supplied, or if there is one - make sure it matches
1549       # the columns compiled above perfectly. Anything else can not be sanely executed
1550       # on most databases so croak right then and there
1551
1552       if (my $g = $attrs->{group_by}) {
1553         my @current_group_by = map
1554           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1555           @$g
1556         ;
1557
1558         if (
1559           join ("\x00", sort @current_group_by)
1560             ne
1561           join ("\x00", sort @{$attrs->{columns}} )
1562         ) {
1563           $self->throw_exception (
1564             "You have just attempted a $op operation on a resultset which does group_by"
1565             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1566             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1567             . ' kind of queries. Please retry the operation with a modified group_by or'
1568             . ' without using one at all.'
1569           );
1570         }
1571       }
1572       else {
1573         $attrs->{group_by} = $attrs->{columns};
1574       }
1575     }
1576
1577     my $subrs = (ref $self)->new($rsrc, $attrs);
1578
1579     return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
1580   }
1581   else {
1582     return $rsrc->storage->$op(
1583       $rsrc,
1584       $op eq 'update' ? $values : (),
1585       $self->_cond_for_update_delete,
1586     );
1587   }
1588 }
1589
1590
1591 # _cond_for_update_delete
1592 #
1593 # update/delete require the condition to be modified to handle
1594 # the differing SQL syntax available.  This transforms the $self->{cond}
1595 # appropriately, returning the new condition.
1596
1597 sub _cond_for_update_delete {
1598   my ($self, $full_cond) = @_;
1599   my $cond = {};
1600
1601   $full_cond ||= $self->{cond};
1602   # No-op. No condition, we're updating/deleting everything
1603   return $cond unless ref $full_cond;
1604
1605   if (ref $full_cond eq 'ARRAY') {
1606     $cond = [
1607       map {
1608         my %hash;
1609         foreach my $key (keys %{$_}) {
1610           $key =~ /([^.]+)$/;
1611           $hash{$1} = $_->{$key};
1612         }
1613         \%hash;
1614       } @{$full_cond}
1615     ];
1616   }
1617   elsif (ref $full_cond eq 'HASH') {
1618     if ((keys %{$full_cond})[0] eq '-and') {
1619       $cond->{-and} = [];
1620       my @cond = @{$full_cond->{-and}};
1621        for (my $i = 0; $i < @cond; $i++) {
1622         my $entry = $cond[$i];
1623         my $hash;
1624         if (ref $entry eq 'HASH') {
1625           $hash = $self->_cond_for_update_delete($entry);
1626         }
1627         else {
1628           $entry =~ /([^.]+)$/;
1629           $hash->{$1} = $cond[++$i];
1630         }
1631         push @{$cond->{-and}}, $hash;
1632       }
1633     }
1634     else {
1635       foreach my $key (keys %{$full_cond}) {
1636         $key =~ /([^.]+)$/;
1637         $cond->{$1} = $full_cond->{$key};
1638       }
1639     }
1640   }
1641   else {
1642     $self->throw_exception("Can't update/delete on resultset with condition unless hash or array");
1643   }
1644
1645   return $cond;
1646 }
1647
1648
1649 =head2 update
1650
1651 =over 4
1652
1653 =item Arguments: \%values
1654
1655 =item Return Value: $storage_rv
1656
1657 =back
1658
1659 Sets the specified columns in the resultset to the supplied values in a
1660 single query. Return value will be true if the update succeeded or false
1661 if no records were updated; exact type of success value is storage-dependent.
1662
1663 =cut
1664
1665 sub update {
1666   my ($self, $values) = @_;
1667   $self->throw_exception('Values for update must be a hash')
1668     unless ref $values eq 'HASH';
1669
1670   return $self->_rs_update_delete ('update', $values);
1671 }
1672
1673 =head2 update_all
1674
1675 =over 4
1676
1677 =item Arguments: \%values
1678
1679 =item Return Value: 1
1680
1681 =back
1682
1683 Fetches all objects and updates them one at a time. Note that C<update_all>
1684 will run DBIC cascade triggers, while L</update> will not.
1685
1686 =cut
1687
1688 sub update_all {
1689   my ($self, $values) = @_;
1690   $self->throw_exception('Values for update_all must be a hash')
1691     unless ref $values eq 'HASH';
1692   foreach my $obj ($self->all) {
1693     $obj->set_columns($values)->update;
1694   }
1695   return 1;
1696 }
1697
1698 =head2 delete
1699
1700 =over 4
1701
1702 =item Arguments: none
1703
1704 =item Return Value: $storage_rv
1705
1706 =back
1707
1708 Deletes the contents of the resultset from its result source. Note that this
1709 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1710 to run. See also L<DBIx::Class::Row/delete>.
1711
1712 Return value will be the amount of rows deleted; exact type of return value
1713 is storage-dependent.
1714
1715 =cut
1716
1717 sub delete {
1718   my $self = shift;
1719   $self->throw_exception('delete does not accept any arguments')
1720     if @_;
1721
1722   return $self->_rs_update_delete ('delete');
1723 }
1724
1725 =head2 delete_all
1726
1727 =over 4
1728
1729 =item Arguments: none
1730
1731 =item Return Value: 1
1732
1733 =back
1734
1735 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1736 will run DBIC cascade triggers, while L</delete> will not.
1737
1738 =cut
1739
1740 sub delete_all {
1741   my $self = shift;
1742   $self->throw_exception('delete_all does not accept any arguments')
1743     if @_;
1744
1745   $_->delete for $self->all;
1746   return 1;
1747 }
1748
1749 =head2 populate
1750
1751 =over 4
1752
1753 =item Arguments: \@data;
1754
1755 =back
1756
1757 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1758 For the arrayref of hashrefs style each hashref should be a structure suitable
1759 forsubmitting to a $resultset->create(...) method.
1760
1761 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1762 to insert the data, as this is a faster method.
1763
1764 Otherwise, each set of data is inserted into the database using
1765 L<DBIx::Class::ResultSet/create>, and the resulting objects are
1766 accumulated into an array. The array itself, or an array reference
1767 is returned depending on scalar or list context.
1768
1769 Example:  Assuming an Artist Class that has many CDs Classes relating:
1770
1771   my $Artist_rs = $schema->resultset("Artist");
1772
1773   ## Void Context Example
1774   $Artist_rs->populate([
1775      { artistid => 4, name => 'Manufactured Crap', cds => [
1776         { title => 'My First CD', year => 2006 },
1777         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1778       ],
1779      },
1780      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1781         { title => 'My parents sold me to a record company' ,year => 2005 },
1782         { title => 'Why Am I So Ugly?', year => 2006 },
1783         { title => 'I Got Surgery and am now Popular', year => 2007 }
1784       ],
1785      },
1786   ]);
1787
1788   ## Array Context Example
1789   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1790     { name => "Artist One"},
1791     { name => "Artist Two"},
1792     { name => "Artist Three", cds=> [
1793     { title => "First CD", year => 2007},
1794     { title => "Second CD", year => 2008},
1795   ]}
1796   ]);
1797
1798   print $ArtistOne->name; ## response is 'Artist One'
1799   print $ArtistThree->cds->count ## reponse is '2'
1800
1801 For the arrayref of arrayrefs style,  the first element should be a list of the
1802 fieldsnames to which the remaining elements are rows being inserted.  For
1803 example:
1804
1805   $Arstist_rs->populate([
1806     [qw/artistid name/],
1807     [100, 'A Formally Unknown Singer'],
1808     [101, 'A singer that jumped the shark two albums ago'],
1809     [102, 'An actually cool singer.'],
1810   ]);
1811
1812 Please note an important effect on your data when choosing between void and
1813 wantarray context. Since void context goes straight to C<insert_bulk> in
1814 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1815 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
1816 create primary keys for you, you will find that your PKs are empty.  In this
1817 case you will have to use the wantarray context in order to create those
1818 values.
1819
1820 =cut
1821
1822 sub populate {
1823   my $self = shift @_;
1824   my $data = ref $_[0][0] eq 'HASH'
1825     ? $_[0] : ref $_[0][0] eq 'ARRAY' ? $self->_normalize_populate_args($_[0]) :
1826     $self->throw_exception('Populate expects an arrayref of hashes or arrayref of arrayrefs');
1827
1828   if(defined wantarray) {
1829     my @created;
1830     foreach my $item (@$data) {
1831       push(@created, $self->create($item));
1832     }
1833     return wantarray ? @created : \@created;
1834   } else {
1835     my ($first, @rest) = @$data;
1836
1837     my @names = grep {!ref $first->{$_}} keys %$first;
1838     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1839     my @pks = $self->result_source->primary_columns;
1840
1841     ## do the belongs_to relationships
1842     foreach my $index (0..$#$data) {
1843
1844       # delegate to create() for any dataset without primary keys with specified relationships
1845       if (grep { !defined $data->[$index]->{$_} } @pks ) {
1846         for my $r (@rels) {
1847           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
1848             my @ret = $self->populate($data);
1849             return;
1850           }
1851         }
1852       }
1853
1854       foreach my $rel (@rels) {
1855         next unless ref $data->[$index]->{$rel} eq "HASH";
1856         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1857         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1858         my $related = $result->result_source->_resolve_condition(
1859           $result->result_source->relationship_info($reverse)->{cond},
1860           $self,
1861           $result,
1862         );
1863
1864         delete $data->[$index]->{$rel};
1865         $data->[$index] = {%{$data->[$index]}, %$related};
1866
1867         push @names, keys %$related if $index == 0;
1868       }
1869     }
1870
1871     ## do bulk insert on current row
1872     my @values = map { [ @$_{@names} ] } @$data;
1873
1874     $self->result_source->storage->insert_bulk(
1875       $self->result_source,
1876       \@names,
1877       \@values,
1878     );
1879
1880     ## do the has_many relationships
1881     foreach my $item (@$data) {
1882
1883       foreach my $rel (@rels) {
1884         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1885
1886         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks)
1887      || $self->throw_exception('Cannot find the relating object.');
1888
1889         my $child = $parent->$rel;
1890
1891         my $related = $child->result_source->_resolve_condition(
1892           $parent->result_source->relationship_info($rel)->{cond},
1893           $child,
1894           $parent,
1895         );
1896
1897         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1898         my @populate = map { {%$_, %$related} } @rows_to_add;
1899
1900         $child->populate( \@populate );
1901       }
1902     }
1903   }
1904 }
1905
1906 =head2 _normalize_populate_args ($args)
1907
1908 Private method used by L</populate> to normalize its incoming arguments.  Factored
1909 out in case you want to subclass and accept new argument structures to the
1910 L</populate> method.
1911
1912 =cut
1913
1914 sub _normalize_populate_args {
1915   my ($self, $data) = @_;
1916   my @names = @{shift(@$data)};
1917   my @results_to_create;
1918   foreach my $datum (@$data) {
1919     my %result_to_create;
1920     foreach my $index (0..$#names) {
1921       $result_to_create{$names[$index]} = $$datum[$index];
1922     }
1923     push @results_to_create, \%result_to_create;
1924   }
1925   return \@results_to_create;
1926 }
1927
1928 =head2 pager
1929
1930 =over 4
1931
1932 =item Arguments: none
1933
1934 =item Return Value: $pager
1935
1936 =back
1937
1938 Return Value a L<Data::Page> object for the current resultset. Only makes
1939 sense for queries with a C<page> attribute.
1940
1941 To get the full count of entries for a paged resultset, call
1942 C<total_entries> on the L<Data::Page> object.
1943
1944 =cut
1945
1946 sub pager {
1947   my ($self) = @_;
1948
1949   return $self->{pager} if $self->{pager};
1950
1951   my $attrs = $self->{attrs};
1952   $self->throw_exception("Can't create pager for non-paged rs")
1953     unless $self->{attrs}{page};
1954   $attrs->{rows} ||= 10;
1955
1956   # throw away the paging flags and re-run the count (possibly
1957   # with a subselect) to get the real total count
1958   my $count_attrs = { %$attrs };
1959   delete $count_attrs->{$_} for qw/rows offset page pager/;
1960   my $total_count = (ref $self)->new($self->result_source, $count_attrs)->count;
1961
1962   return $self->{pager} = Data::Page->new(
1963     $total_count,
1964     $attrs->{rows},
1965     $self->{attrs}{page}
1966   );
1967 }
1968
1969 =head2 page
1970
1971 =over 4
1972
1973 =item Arguments: $page_number
1974
1975 =item Return Value: $rs
1976
1977 =back
1978
1979 Returns a resultset for the $page_number page of the resultset on which page
1980 is called, where each page contains a number of rows equal to the 'rows'
1981 attribute set on the resultset (10 by default).
1982
1983 =cut
1984
1985 sub page {
1986   my ($self, $page) = @_;
1987   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1988 }
1989
1990 =head2 new_result
1991
1992 =over 4
1993
1994 =item Arguments: \%vals
1995
1996 =item Return Value: $rowobject
1997
1998 =back
1999
2000 Creates a new row object in the resultset's result class and returns
2001 it. The row is not inserted into the database at this point, call
2002 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
2003 will tell you whether the row object has been inserted or not.
2004
2005 Passes the hashref of input on to L<DBIx::Class::Row/new>.
2006
2007 =cut
2008
2009 sub new_result {
2010   my ($self, $values) = @_;
2011   $self->throw_exception( "new_result needs a hash" )
2012     unless (ref $values eq 'HASH');
2013
2014   my %new;
2015   my $alias = $self->{attrs}{alias};
2016
2017   if (
2018     defined $self->{cond}
2019     && $self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION
2020   ) {
2021     %new = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
2022     $new{-from_resultset} = [ keys %new ] if keys %new;
2023   } else {
2024     $self->throw_exception(
2025       "Can't abstract implicit construct, condition not a hash"
2026     ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
2027
2028     my $collapsed_cond = (
2029       $self->{cond}
2030         ? $self->_collapse_cond($self->{cond})
2031         : {}
2032     );
2033
2034     # precendence must be given to passed values over values inherited from
2035     # the cond, so the order here is important.
2036     my %implied =  %{$self->_remove_alias($collapsed_cond, $alias)};
2037     while( my($col,$value) = each %implied ){
2038       if(ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '='){
2039         $new{$col} = $value->{'='};
2040         next;
2041       }
2042       $new{$col} = $value if $self->_is_deterministic_value($value);
2043     }
2044   }
2045
2046   %new = (
2047     %new,
2048     %{ $self->_remove_alias($values, $alias) },
2049     -source_handle => $self->_source_handle,
2050     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
2051   );
2052
2053   return $self->result_class->new(\%new);
2054 }
2055
2056 # _is_deterministic_value
2057 #
2058 # Make an effor to strip non-deterministic values from the condition,
2059 # to make sure new_result chokes less
2060
2061 sub _is_deterministic_value {
2062   my $self = shift;
2063   my $value = shift;
2064   my $ref_type = ref $value;
2065   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
2066   return 1 if Scalar::Util::blessed($value);
2067   return 0;
2068 }
2069
2070 # _has_resolved_attr
2071 #
2072 # determines if the resultset defines at least one
2073 # of the attributes supplied
2074 #
2075 # used to determine if a subquery is neccessary
2076 #
2077 # supports some virtual attributes:
2078 #   -join
2079 #     This will scan for any joins being present on the resultset.
2080 #     It is not a mere key-search but a deep inspection of {from}
2081 #
2082
2083 sub _has_resolved_attr {
2084   my ($self, @attr_names) = @_;
2085
2086   my $attrs = $self->_resolved_attrs;
2087
2088   my %extra_checks;
2089
2090   for my $n (@attr_names) {
2091     if (grep { $n eq $_ } (qw/-join/) ) {
2092       $extra_checks{$n}++;
2093       next;
2094     }
2095
2096     my $attr =  $attrs->{$n};
2097
2098     next if not defined $attr;
2099
2100     if (ref $attr eq 'HASH') {
2101       return 1 if keys %$attr;
2102     }
2103     elsif (ref $attr eq 'ARRAY') {
2104       return 1 if @$attr;
2105     }
2106     else {
2107       return 1 if $attr;
2108     }
2109   }
2110
2111   # a resolved join is expressed as a multi-level from
2112   return 1 if (
2113     $extra_checks{-join}
2114       and
2115     ref $attrs->{from} eq 'ARRAY'
2116       and
2117     @{$attrs->{from}} > 1
2118   );
2119
2120   return 0;
2121 }
2122
2123 # _collapse_cond
2124 #
2125 # Recursively collapse the condition.
2126
2127 sub _collapse_cond {
2128   my ($self, $cond, $collapsed) = @_;
2129
2130   $collapsed ||= {};
2131
2132   if (ref $cond eq 'ARRAY') {
2133     foreach my $subcond (@$cond) {
2134       next unless ref $subcond;  # -or
2135       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2136     }
2137   }
2138   elsif (ref $cond eq 'HASH') {
2139     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2140       foreach my $subcond (@{$cond->{-and}}) {
2141         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2142       }
2143     }
2144     else {
2145       foreach my $col (keys %$cond) {
2146         my $value = $cond->{$col};
2147         $collapsed->{$col} = $value;
2148       }
2149     }
2150   }
2151
2152   return $collapsed;
2153 }
2154
2155 # _remove_alias
2156 #
2157 # Remove the specified alias from the specified query hash. A copy is made so
2158 # the original query is not modified.
2159
2160 sub _remove_alias {
2161   my ($self, $query, $alias) = @_;
2162
2163   my %orig = %{ $query || {} };
2164   my %unaliased;
2165
2166   foreach my $key (keys %orig) {
2167     if ($key !~ /\./) {
2168       $unaliased{$key} = $orig{$key};
2169       next;
2170     }
2171     $unaliased{$1} = $orig{$key}
2172       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2173   }
2174
2175   return \%unaliased;
2176 }
2177
2178 =head2 as_query (EXPERIMENTAL)
2179
2180 =over 4
2181
2182 =item Arguments: none
2183
2184 =item Return Value: \[ $sql, @bind ]
2185
2186 =back
2187
2188 Returns the SQL query and bind vars associated with the invocant.
2189
2190 This is generally used as the RHS for a subquery.
2191
2192 B<NOTE>: This feature is still experimental.
2193
2194 =cut
2195
2196 sub as_query {
2197   my $self = shift;
2198
2199   my $attrs = $self->_resolved_attrs_copy;
2200
2201   # For future use:
2202   #
2203   # in list ctx:
2204   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2205   # $sql also has no wrapping parenthesis in list ctx
2206   #
2207   my $sqlbind = $self->result_source->storage
2208     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2209
2210   return $sqlbind;
2211 }
2212
2213 =head2 find_or_new
2214
2215 =over 4
2216
2217 =item Arguments: \%vals, \%attrs?
2218
2219 =item Return Value: $rowobject
2220
2221 =back
2222
2223   my $artist = $schema->resultset('Artist')->find_or_new(
2224     { artist => 'fred' }, { key => 'artists' });
2225
2226   $cd->cd_to_producer->find_or_new({ producer => $producer },
2227                                    { key => 'primary });
2228
2229 Find an existing record from this resultset, based on its primary
2230 key, or a unique constraint. If none exists, instantiate a new result
2231 object and return it. The object will not be saved into your storage
2232 until you call L<DBIx::Class::Row/insert> on it.
2233
2234 You most likely want this method when looking for existing rows using
2235 a unique constraint that is not the primary key, or looking for
2236 related rows.
2237
2238 If you want objects to be saved immediately, use L</find_or_create> instead.
2239
2240 B<Note>: C<find_or_new> is probably not what you want when creating a
2241 new row in a table that uses primary keys supplied by the
2242 database. Passing in a primary key column with a value of I<undef>
2243 will cause L</find> to attempt to search for a row with a value of
2244 I<NULL>.
2245
2246 =cut
2247
2248 sub find_or_new {
2249   my $self     = shift;
2250   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2251   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2252   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2253     return $row;
2254   }
2255   return $self->new_result($hash);
2256 }
2257
2258 =head2 create
2259
2260 =over 4
2261
2262 =item Arguments: \%vals
2263
2264 =item Return Value: a L<DBIx::Class::Row> $object
2265
2266 =back
2267
2268 Attempt to create a single new row or a row with multiple related rows
2269 in the table represented by the resultset (and related tables). This
2270 will not check for duplicate rows before inserting, use
2271 L</find_or_create> to do that.
2272
2273 To create one row for this resultset, pass a hashref of key/value
2274 pairs representing the columns of the table and the values you wish to
2275 store. If the appropriate relationships are set up, foreign key fields
2276 can also be passed an object representing the foreign row, and the
2277 value will be set to its primary key.
2278
2279 To create related objects, pass a hashref of related-object column values
2280 B<keyed on the relationship name>. If the relationship is of type C<multi>
2281 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2282 The process will correctly identify columns holding foreign keys, and will
2283 transparrently populate them from the keys of the corresponding relation.
2284 This can be applied recursively, and will work correctly for a structure
2285 with an arbitrary depth and width, as long as the relationships actually
2286 exists and the correct column data has been supplied.
2287
2288
2289 Instead of hashrefs of plain related data (key/value pairs), you may
2290 also pass new or inserted objects. New objects (not inserted yet, see
2291 L</new>), will be inserted into their appropriate tables.
2292
2293 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2294
2295 Example of creating a new row.
2296
2297   $person_rs->create({
2298     name=>"Some Person",
2299     email=>"somebody@someplace.com"
2300   });
2301
2302 Example of creating a new row and also creating rows in a related C<has_many>
2303 or C<has_one> resultset.  Note Arrayref.
2304
2305   $artist_rs->create(
2306      { artistid => 4, name => 'Manufactured Crap', cds => [
2307         { title => 'My First CD', year => 2006 },
2308         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2309       ],
2310      },
2311   );
2312
2313 Example of creating a new row and also creating a row in a related
2314 C<belongs_to>resultset. Note Hashref.
2315
2316   $cd_rs->create({
2317     title=>"Music for Silly Walks",
2318     year=>2000,
2319     artist => {
2320       name=>"Silly Musician",
2321     }
2322   });
2323
2324 =cut
2325
2326 sub create {
2327   my ($self, $attrs) = @_;
2328   $self->throw_exception( "create needs a hashref" )
2329     unless ref $attrs eq 'HASH';
2330   return $self->new_result($attrs)->insert;
2331 }
2332
2333 =head2 find_or_create
2334
2335 =over 4
2336
2337 =item Arguments: \%vals, \%attrs?
2338
2339 =item Return Value: $rowobject
2340
2341 =back
2342
2343   $cd->cd_to_producer->find_or_create({ producer => $producer },
2344                                       { key => 'primary' });
2345
2346 Tries to find a record based on its primary key or unique constraints; if none
2347 is found, creates one and returns that instead.
2348
2349   my $cd = $schema->resultset('CD')->find_or_create({
2350     cdid   => 5,
2351     artist => 'Massive Attack',
2352     title  => 'Mezzanine',
2353     year   => 2005,
2354   });
2355
2356 Also takes an optional C<key> attribute, to search by a specific key or unique
2357 constraint. For example:
2358
2359   my $cd = $schema->resultset('CD')->find_or_create(
2360     {
2361       artist => 'Massive Attack',
2362       title  => 'Mezzanine',
2363     },
2364     { key => 'cd_artist_title' }
2365   );
2366
2367 B<Note>: Because find_or_create() reads from the database and then
2368 possibly inserts based on the result, this method is subject to a race
2369 condition. Another process could create a record in the table after
2370 the find has completed and before the create has started. To avoid
2371 this problem, use find_or_create() inside a transaction.
2372
2373 B<Note>: C<find_or_create> is probably not what you want when creating
2374 a new row in a table that uses primary keys supplied by the
2375 database. Passing in a primary key column with a value of I<undef>
2376 will cause L</find> to attempt to search for a row with a value of
2377 I<NULL>.
2378
2379 See also L</find> and L</update_or_create>. For information on how to declare
2380 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2381
2382 =cut
2383
2384 sub find_or_create {
2385   my $self     = shift;
2386   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2387   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2388   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2389     return $row;
2390   }
2391   return $self->create($hash);
2392 }
2393
2394 =head2 update_or_create
2395
2396 =over 4
2397
2398 =item Arguments: \%col_values, { key => $unique_constraint }?
2399
2400 =item Return Value: $rowobject
2401
2402 =back
2403
2404   $resultset->update_or_create({ col => $val, ... });
2405
2406 First, searches for an existing row matching one of the unique constraints
2407 (including the primary key) on the source of this resultset. If a row is
2408 found, updates it with the other given column values. Otherwise, creates a new
2409 row.
2410
2411 Takes an optional C<key> attribute to search on a specific unique constraint.
2412 For example:
2413
2414   # In your application
2415   my $cd = $schema->resultset('CD')->update_or_create(
2416     {
2417       artist => 'Massive Attack',
2418       title  => 'Mezzanine',
2419       year   => 1998,
2420     },
2421     { key => 'cd_artist_title' }
2422   );
2423
2424   $cd->cd_to_producer->update_or_create({
2425     producer => $producer,
2426     name => 'harry',
2427   }, {
2428     key => 'primary,
2429   });
2430
2431
2432 If no C<key> is specified, it searches on all unique constraints defined on the
2433 source, including the primary key.
2434
2435 If the C<key> is specified as C<primary>, it searches only on the primary key.
2436
2437 See also L</find> and L</find_or_create>. For information on how to declare
2438 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2439
2440 B<Note>: C<update_or_create> is probably not what you want when
2441 looking for a row in a table that uses primary keys supplied by the
2442 database, unless you actually have a key value. Passing in a primary
2443 key column with a value of I<undef> will cause L</find> to attempt to
2444 search for a row with a value of I<NULL>.
2445
2446 =cut
2447
2448 sub update_or_create {
2449   my $self = shift;
2450   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2451   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2452
2453   my $row = $self->find($cond, $attrs);
2454   if (defined $row) {
2455     $row->update($cond);
2456     return $row;
2457   }
2458
2459   return $self->create($cond);
2460 }
2461
2462 =head2 update_or_new
2463
2464 =over 4
2465
2466 =item Arguments: \%col_values, { key => $unique_constraint }?
2467
2468 =item Return Value: $rowobject
2469
2470 =back
2471
2472   $resultset->update_or_new({ col => $val, ... });
2473
2474 First, searches for an existing row matching one of the unique constraints
2475 (including the primary key) on the source of this resultset. If a row is
2476 found, updates it with the other given column values. Otherwise, instantiate
2477 a new result object and return it. The object will not be saved into your storage
2478 until you call L<DBIx::Class::Row/insert> on it.
2479
2480 Takes an optional C<key> attribute to search on a specific unique constraint.
2481 For example:
2482
2483   # In your application
2484   my $cd = $schema->resultset('CD')->update_or_new(
2485     {
2486       artist => 'Massive Attack',
2487       title  => 'Mezzanine',
2488       year   => 1998,
2489     },
2490     { key => 'cd_artist_title' }
2491   );
2492
2493   if ($cd->in_storage) {
2494       # the cd was updated
2495   }
2496   else {
2497       # the cd is not yet in the database, let's insert it
2498       $cd->insert;
2499   }
2500
2501 See also L</find>, L</find_or_create> and L<find_or_new>.
2502
2503 =cut
2504
2505 sub update_or_new {
2506     my $self  = shift;
2507     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2508     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2509
2510     my $row = $self->find( $cond, $attrs );
2511     if ( defined $row ) {
2512         $row->update($cond);
2513         return $row;
2514     }
2515
2516     return $self->new_result($cond);
2517 }
2518
2519 =head2 get_cache
2520
2521 =over 4
2522
2523 =item Arguments: none
2524
2525 =item Return Value: \@cache_objects?
2526
2527 =back
2528
2529 Gets the contents of the cache for the resultset, if the cache is set.
2530
2531 The cache is populated either by using the L</prefetch> attribute to
2532 L</search> or by calling L</set_cache>.
2533
2534 =cut
2535
2536 sub get_cache {
2537   shift->{all_cache};
2538 }
2539
2540 =head2 set_cache
2541
2542 =over 4
2543
2544 =item Arguments: \@cache_objects
2545
2546 =item Return Value: \@cache_objects
2547
2548 =back
2549
2550 Sets the contents of the cache for the resultset. Expects an arrayref
2551 of objects of the same class as those produced by the resultset. Note that
2552 if the cache is set the resultset will return the cached objects rather
2553 than re-querying the database even if the cache attr is not set.
2554
2555 The contents of the cache can also be populated by using the
2556 L</prefetch> attribute to L</search>.
2557
2558 =cut
2559
2560 sub set_cache {
2561   my ( $self, $data ) = @_;
2562   $self->throw_exception("set_cache requires an arrayref")
2563       if defined($data) && (ref $data ne 'ARRAY');
2564   $self->{all_cache} = $data;
2565 }
2566
2567 =head2 clear_cache
2568
2569 =over 4
2570
2571 =item Arguments: none
2572
2573 =item Return Value: []
2574
2575 =back
2576
2577 Clears the cache for the resultset.
2578
2579 =cut
2580
2581 sub clear_cache {
2582   shift->set_cache(undef);
2583 }
2584
2585 =head2 related_resultset
2586
2587 =over 4
2588
2589 =item Arguments: $relationship_name
2590
2591 =item Return Value: $resultset
2592
2593 =back
2594
2595 Returns a related resultset for the supplied relationship name.
2596
2597   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2598
2599 =cut
2600
2601 sub related_resultset {
2602   my ($self, $rel) = @_;
2603
2604   $self->{related_resultsets} ||= {};
2605   return $self->{related_resultsets}{$rel} ||= do {
2606     my $rel_info = $self->result_source->relationship_info($rel);
2607
2608     $self->throw_exception(
2609       "search_related: result source '" . $self->result_source->source_name .
2610         "' has no such relationship $rel")
2611       unless $rel_info;
2612
2613     my ($from,$seen) = $self->_chain_relationship($rel);
2614
2615     my $join_count = $seen->{$rel};
2616     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
2617
2618     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2619     my %attrs = %{$self->{attrs}||{}};
2620     delete @attrs{qw(result_class alias)};
2621
2622     my $new_cache;
2623
2624     if (my $cache = $self->get_cache) {
2625       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2626         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2627                         @$cache ];
2628       }
2629     }
2630
2631     my $rel_source = $self->result_source->related_source($rel);
2632
2633     my $new = do {
2634
2635       # The reason we do this now instead of passing the alias to the
2636       # search_rs below is that if you wrap/overload resultset on the
2637       # source you need to know what alias it's -going- to have for things
2638       # to work sanely (e.g. RestrictWithObject wants to be able to add
2639       # extra query restrictions, and these may need to be $alias.)
2640
2641       my $attrs = $rel_source->resultset_attributes;
2642       local $attrs->{alias} = $alias;
2643
2644       $rel_source->resultset
2645                  ->search_rs(
2646                      undef, {
2647                        %attrs,
2648                        join => undef,
2649                        prefetch => undef,
2650                        select => undef,
2651                        as => undef,
2652                        where => $self->{cond},
2653                        seen_join => $seen,
2654                        from => $from,
2655                    });
2656     };
2657     $new->set_cache($new_cache) if $new_cache;
2658     $new;
2659   };
2660 }
2661
2662 =head2 current_source_alias
2663
2664 =over 4
2665
2666 =item Arguments: none
2667
2668 =item Return Value: $source_alias
2669
2670 =back
2671
2672 Returns the current table alias for the result source this resultset is built
2673 on, that will be used in the SQL query. Usually it is C<me>.
2674
2675 Currently the source alias that refers to the result set returned by a
2676 L</search>/L</find> family method depends on how you got to the resultset: it's
2677 C<me> by default, but eg. L</search_related> aliases it to the related result
2678 source name (and keeps C<me> referring to the original result set). The long
2679 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2680 (and make this method unnecessary).
2681
2682 Thus it's currently necessary to use this method in predefined queries (see
2683 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2684 source alias of the current result set:
2685
2686   # in a result set class
2687   sub modified_by {
2688     my ($self, $user) = @_;
2689
2690     my $me = $self->current_source_alias;
2691
2692     return $self->search(
2693       "$me.modified" => $user->id,
2694     );
2695   }
2696
2697 =cut
2698
2699 sub current_source_alias {
2700   my ($self) = @_;
2701
2702   return ($self->{attrs} || {})->{alias} || 'me';
2703 }
2704
2705 # This code is called by search_related, and makes sure there
2706 # is clear separation between the joins before, during, and
2707 # after the relationship. This information is needed later
2708 # in order to properly resolve prefetch aliases (any alias
2709 # with a relation_chain_depth less than the depth of the
2710 # current prefetch is not considered)
2711 #
2712 # The increments happen in 1/2s to make it easier to correlate the
2713 # join depth with the join path. An integer means a relationship
2714 # specified via a search_related, whereas a fraction means an added
2715 # join/prefetch via attributes
2716 sub _chain_relationship {
2717   my ($self, $rel) = @_;
2718   my $source = $self->result_source;
2719   my $attrs = $self->{attrs};
2720
2721   my $from = [ @{
2722       $attrs->{from}
2723         ||
2724       [{
2725         -source_handle => $source->handle,
2726         -alias => $attrs->{alias},
2727         $attrs->{alias} => $source->from,
2728       }]
2729   }];
2730
2731   my $seen = { %{$attrs->{seen_join} || {} } };
2732   my $jpath = ($attrs->{seen_join} && keys %{$attrs->{seen_join}}) 
2733     ? $from->[-1][0]{-join_path} 
2734     : [];
2735
2736
2737   # we need to take the prefetch the attrs into account before we
2738   # ->_resolve_join as otherwise they get lost - captainL
2739   my $merged = $self->_merge_attr( $attrs->{join}, $attrs->{prefetch} );
2740
2741   my @requested_joins = $source->_resolve_join(
2742     $merged,
2743     $attrs->{alias},
2744     $seen,
2745     $jpath,
2746   );
2747
2748   push @$from, @requested_joins;
2749
2750   $seen->{-relation_chain_depth} += 0.5;
2751
2752   # if $self already had a join/prefetch specified on it, the requested
2753   # $rel might very well be already included. What we do in this case
2754   # is effectively a no-op (except that we bump up the chain_depth on
2755   # the join in question so we could tell it *is* the search_related)
2756   my $already_joined;
2757
2758
2759   # we consider the last one thus reverse
2760   for my $j (reverse @requested_joins) {
2761     if ($rel eq $j->[0]{-join_path}[-1]) {
2762       $j->[0]{-relation_chain_depth} += 0.5;
2763       $already_joined++;
2764       last;
2765     }
2766   }
2767
2768 # alternative way to scan the entire chain - not backwards compatible
2769 #  for my $j (reverse @$from) {
2770 #    next unless ref $j eq 'ARRAY';
2771 #    if ($j->[0]{-join_path} && $j->[0]{-join_path}[-1] eq $rel) {
2772 #      $j->[0]{-relation_chain_depth} += 0.5;
2773 #      $already_joined++;
2774 #      last;
2775 #    }
2776 #  }
2777
2778   unless ($already_joined) {
2779     push @$from, $source->_resolve_join(
2780       $rel,
2781       $attrs->{alias},
2782       $seen,
2783       $jpath,
2784     );
2785   }
2786
2787   $seen->{-relation_chain_depth} += 0.5;
2788
2789   return ($from,$seen);
2790 }
2791
2792 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
2793 sub _resolved_attrs_copy {
2794   my $self = shift;
2795   return { %{$self->_resolved_attrs (@_)} };
2796 }
2797
2798 sub _resolved_attrs {
2799   my $self = shift;
2800   return $self->{_attrs} if $self->{_attrs};
2801
2802   my $attrs  = { %{ $self->{attrs} || {} } };
2803   my $source = $self->result_source;
2804   my $alias  = $attrs->{alias};
2805
2806   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2807   my @colbits;
2808
2809   # build columns (as long as select isn't set) into a set of as/select hashes
2810   unless ( $attrs->{select} ) {
2811       @colbits = map {
2812           ( ref($_) eq 'HASH' )
2813               ? $_
2814               : {
2815                   (
2816                     /^\Q${alias}.\E(.+)$/
2817                       ? "$1"
2818                       : "$_"
2819                   )
2820                 =>
2821                   (
2822                     /\./
2823                       ? "$_"
2824                       : "${alias}.$_"
2825                   )
2826             }
2827       } ( ref($attrs->{columns}) eq 'ARRAY' ) ? @{ delete $attrs->{columns}} : (delete $attrs->{columns} || $source->columns );
2828   }
2829   # add the additional columns on
2830   foreach ( 'include_columns', '+columns' ) {
2831       push @colbits, map {
2832           ( ref($_) eq 'HASH' )
2833             ? $_
2834             : { ( split( /\./, $_ ) )[-1] => ( /\./ ? $_ : "${alias}.$_" ) }
2835       } ( ref($attrs->{$_}) eq 'ARRAY' ) ? @{ delete $attrs->{$_} } : delete $attrs->{$_} if ( $attrs->{$_} );
2836   }
2837
2838   # start with initial select items
2839   if ( $attrs->{select} ) {
2840     $attrs->{select} =
2841         ( ref $attrs->{select} eq 'ARRAY' )
2842       ? [ @{ $attrs->{select} } ]
2843       : [ $attrs->{select} ];
2844     $attrs->{as} = (
2845       $attrs->{as}
2846       ? (
2847         ref $attrs->{as} eq 'ARRAY'
2848         ? [ @{ $attrs->{as} } ]
2849         : [ $attrs->{as} ]
2850         )
2851       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{ $attrs->{select} } ]
2852     );
2853   }
2854   else {
2855
2856     # otherwise we intialise select & as to empty
2857     $attrs->{select} = [];
2858     $attrs->{as}     = [];
2859   }
2860
2861   # now add colbits to select/as
2862   push( @{ $attrs->{select} }, map { values( %{$_} ) } @colbits );
2863   push( @{ $attrs->{as} },     map { keys( %{$_} ) } @colbits );
2864
2865   my $adds;
2866   if ( $adds = delete $attrs->{'+select'} ) {
2867     $adds = [$adds] unless ref $adds eq 'ARRAY';
2868     push(
2869       @{ $attrs->{select} },
2870       map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds
2871     );
2872   }
2873   if ( $adds = delete $attrs->{'+as'} ) {
2874     $adds = [$adds] unless ref $adds eq 'ARRAY';
2875     push( @{ $attrs->{as} }, @$adds );
2876   }
2877
2878   $attrs->{from} ||= [ {
2879     -source_handle => $source->handle,
2880     -alias => $self->{attrs}{alias},
2881     $self->{attrs}{alias} => $source->from,
2882   } ];
2883
2884   if ( $attrs->{join} || $attrs->{prefetch} ) {
2885
2886     $self->throw_exception ('join/prefetch can not be used with a literal scalarref {from}')
2887       if ref $attrs->{from} ne 'ARRAY';
2888
2889     my $join = delete $attrs->{join} || {};
2890
2891     if ( defined $attrs->{prefetch} ) {
2892       $join = $self->_merge_attr( $join, $attrs->{prefetch} );
2893     }
2894
2895     $attrs->{from} =    # have to copy here to avoid corrupting the original
2896       [
2897         @{ $attrs->{from} },
2898         $source->_resolve_join(
2899           $join,
2900           $alias,
2901           { %{ $attrs->{seen_join} || {} } },
2902           ($attrs->{seen_join} && keys %{$attrs->{seen_join}})
2903             ? $attrs->{from}[-1][0]{-join_path}
2904             : []
2905           ,
2906         )
2907       ];
2908   }
2909
2910   if ( defined $attrs->{order_by} ) {
2911     $attrs->{order_by} = (
2912       ref( $attrs->{order_by} ) eq 'ARRAY'
2913       ? [ @{ $attrs->{order_by} } ]
2914       : [ $attrs->{order_by} || () ]
2915     );
2916   }
2917
2918   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
2919     $attrs->{group_by} = [ $attrs->{group_by} ];
2920   }
2921
2922   # generate the distinct induced group_by early, as prefetch will be carried via a
2923   # subquery (since a group_by is present)
2924   if (delete $attrs->{distinct}) {
2925     $attrs->{group_by} ||= [ grep { !ref($_) || (ref($_) ne 'HASH') } @{$attrs->{select}} ];
2926   }
2927
2928   $attrs->{collapse} ||= {};
2929   if ( my $prefetch = delete $attrs->{prefetch} ) {
2930     $prefetch = $self->_merge_attr( {}, $prefetch );
2931
2932     my $prefetch_ordering = [];
2933
2934     my $join_map = $self->_joinpath_aliases ($attrs->{from}, $attrs->{seen_join});
2935
2936     my @prefetch =
2937       $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
2938
2939     # we need to somehow mark which columns came from prefetch
2940     $attrs->{_prefetch_select} = [ map { $_->[0] } @prefetch ];
2941
2942     push @{ $attrs->{select} }, @{$attrs->{_prefetch_select}};
2943     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
2944
2945     push( @{$attrs->{order_by}}, @$prefetch_ordering );
2946     $attrs->{_collapse_order_by} = \@$prefetch_ordering;
2947   }
2948
2949   # if both page and offset are specified, produce a combined offset
2950   # even though it doesn't make much sense, this is what pre 081xx has
2951   # been doing
2952   if (my $page = delete $attrs->{page}) {
2953     $attrs->{offset} = 
2954       ($attrs->{rows} * ($page - 1))
2955             +
2956       ($attrs->{offset} || 0)
2957     ;
2958   }
2959
2960   return $self->{_attrs} = $attrs;
2961 }
2962
2963 sub _joinpath_aliases {
2964   my ($self, $fromspec, $seen) = @_;
2965
2966   my $paths = {};
2967   return $paths unless ref $fromspec eq 'ARRAY';
2968
2969   my $cur_depth = $seen->{-relation_chain_depth} || 0;
2970
2971   if (int ($cur_depth) != $cur_depth) {
2972     $self->throw_exception ("-relation_chain_depth is not an integer, something went horribly wrong ($cur_depth)");
2973   }
2974
2975   for my $j (@$fromspec) {
2976
2977     next if ref $j ne 'ARRAY';
2978     next if ($j->[0]{-relation_chain_depth} || 0) < $cur_depth;
2979
2980     my $jpath = $j->[0]{-join_path};
2981
2982     my $p = $paths;
2983     $p = $p->{$_} ||= {} for @{$jpath}[$cur_depth .. $#$jpath];
2984     push @{$p->{-join_aliases} }, $j->[0]{-alias};
2985   }
2986
2987   return $paths;
2988 }
2989
2990 sub _rollout_attr {
2991   my ($self, $attr) = @_;
2992
2993   if (ref $attr eq 'HASH') {
2994     return $self->_rollout_hash($attr);
2995   } elsif (ref $attr eq 'ARRAY') {
2996     return $self->_rollout_array($attr);
2997   } else {
2998     return [$attr];
2999   }
3000 }
3001
3002 sub _rollout_array {
3003   my ($self, $attr) = @_;
3004
3005   my @rolled_array;
3006   foreach my $element (@{$attr}) {
3007     if (ref $element eq 'HASH') {
3008       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3009     } elsif (ref $element eq 'ARRAY') {
3010       #  XXX - should probably recurse here
3011       push( @rolled_array, @{$self->_rollout_array($element)} );
3012     } else {
3013       push( @rolled_array, $element );
3014     }
3015   }
3016   return \@rolled_array;
3017 }
3018
3019 sub _rollout_hash {
3020   my ($self, $attr) = @_;
3021
3022   my @rolled_array;
3023   foreach my $key (keys %{$attr}) {
3024     push( @rolled_array, { $key => $attr->{$key} } );
3025   }
3026   return \@rolled_array;
3027 }
3028
3029 sub _calculate_score {
3030   my ($self, $a, $b) = @_;
3031
3032   if (ref $b eq 'HASH') {
3033     my ($b_key) = keys %{$b};
3034     if (ref $a eq 'HASH') {
3035       my ($a_key) = keys %{$a};
3036       if ($a_key eq $b_key) {
3037         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3038       } else {
3039         return 0;
3040       }
3041     } else {
3042       return ($a eq $b_key) ? 1 : 0;
3043     }
3044   } else {
3045     if (ref $a eq 'HASH') {
3046       my ($a_key) = keys %{$a};
3047       return ($b eq $a_key) ? 1 : 0;
3048     } else {
3049       return ($b eq $a) ? 1 : 0;
3050     }
3051   }
3052 }
3053
3054 sub _merge_attr {
3055   my ($self, $orig, $import) = @_;
3056
3057   return $import unless defined($orig);
3058   return $orig unless defined($import);
3059
3060   $orig = $self->_rollout_attr($orig);
3061   $import = $self->_rollout_attr($import);
3062
3063   my $seen_keys;
3064   foreach my $import_element ( @{$import} ) {
3065     # find best candidate from $orig to merge $b_element into
3066     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3067     foreach my $orig_element ( @{$orig} ) {
3068       my $score = $self->_calculate_score( $orig_element, $import_element );
3069       if ($score > $best_candidate->{score}) {
3070         $best_candidate->{position} = $position;
3071         $best_candidate->{score} = $score;
3072       }
3073       $position++;
3074     }
3075     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3076
3077     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3078       push( @{$orig}, $import_element );
3079     } else {
3080       my $orig_best = $orig->[$best_candidate->{position}];
3081       # merge orig_best and b_element together and replace original with merged
3082       if (ref $orig_best ne 'HASH') {
3083         $orig->[$best_candidate->{position}] = $import_element;
3084       } elsif (ref $import_element eq 'HASH') {
3085         my ($key) = keys %{$orig_best};
3086         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
3087       }
3088     }
3089     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3090   }
3091
3092   return $orig;
3093 }
3094
3095 sub result_source {
3096     my $self = shift;
3097
3098     if (@_) {
3099         $self->_source_handle($_[0]->handle);
3100     } else {
3101         $self->_source_handle->resolve;
3102     }
3103 }
3104
3105 =head2 throw_exception
3106
3107 See L<DBIx::Class::Schema/throw_exception> for details.
3108
3109 =cut
3110
3111 sub throw_exception {
3112   my $self=shift;
3113   if (ref $self && $self->_source_handle->schema) {
3114     $self->_source_handle->schema->throw_exception(@_)
3115   } else {
3116     croak(@_);
3117   }
3118
3119 }
3120
3121 # XXX: FIXME: Attributes docs need clearing up
3122
3123 =head1 ATTRIBUTES
3124
3125 Attributes are used to refine a ResultSet in various ways when
3126 searching for data. They can be passed to any method which takes an
3127 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3128 L</count>.
3129
3130 These are in no particular order:
3131
3132 =head2 order_by
3133
3134 =over 4
3135
3136 =item Value: ( $order_by | \@order_by | \%order_by )
3137
3138 =back
3139
3140 Which column(s) to order the results by. 
3141
3142 [The full list of suitable values is documented in
3143 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3144 common options.]
3145
3146 If a single column name, or an arrayref of names is supplied, the
3147 argument is passed through directly to SQL. The hashref syntax allows
3148 for connection-agnostic specification of ordering direction:
3149
3150  For descending order:
3151
3152   order_by => { -desc => [qw/col1 col2 col3/] }
3153
3154  For explicit ascending order:
3155
3156   order_by => { -asc => 'col' }
3157
3158 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3159 supported, although you are strongly encouraged to use the hashref
3160 syntax as outlined above.
3161
3162 =head2 columns
3163
3164 =over 4
3165
3166 =item Value: \@columns
3167
3168 =back
3169
3170 Shortcut to request a particular set of columns to be retrieved. Each
3171 column spec may be a string (a table column name), or a hash (in which
3172 case the key is the C<as> value, and the value is used as the C<select>
3173 expression). Adds C<me.> onto the start of any column without a C<.> in
3174 it and sets C<select> from that, then auto-populates C<as> from
3175 C<select> as normal. (You may also use the C<cols> attribute, as in
3176 earlier versions of DBIC.)
3177
3178 =head2 +columns
3179
3180 =over 4
3181
3182 =item Value: \@columns
3183
3184 =back
3185
3186 Indicates additional columns to be selected from storage. Works the same
3187 as L</columns> but adds columns to the selection. (You may also use the
3188 C<include_columns> attribute, as in earlier versions of DBIC). For
3189 example:-
3190
3191   $schema->resultset('CD')->search(undef, {
3192     '+columns' => ['artist.name'],
3193     join => ['artist']
3194   });
3195
3196 would return all CDs and include a 'name' column to the information
3197 passed to object inflation. Note that the 'artist' is the name of the
3198 column (or relationship) accessor, and 'name' is the name of the column
3199 accessor in the related table.
3200
3201 =head2 include_columns
3202
3203 =over 4
3204
3205 =item Value: \@columns
3206
3207 =back
3208
3209 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3210
3211 =head2 select
3212
3213 =over 4
3214
3215 =item Value: \@select_columns
3216
3217 =back
3218
3219 Indicates which columns should be selected from the storage. You can use
3220 column names, or in the case of RDBMS back ends, function or stored procedure
3221 names:
3222
3223   $rs = $schema->resultset('Employee')->search(undef, {
3224     select => [
3225       'name',
3226       { count => 'employeeid' },
3227       { sum => 'salary' }
3228     ]
3229   });
3230
3231 When you use function/stored procedure names and do not supply an C<as>
3232 attribute, the column names returned are storage-dependent. E.g. MySQL would
3233 return a column named C<count(employeeid)> in the above example.
3234
3235 =head2 +select
3236
3237 =over 4
3238
3239 Indicates additional columns to be selected from storage.  Works the same as
3240 L</select> but adds columns to the selection.
3241
3242 =back
3243
3244 =head2 +as
3245
3246 =over 4
3247
3248 Indicates additional column names for those added via L</+select>. See L</as>.
3249
3250 =back
3251
3252 =head2 as
3253
3254 =over 4
3255
3256 =item Value: \@inflation_names
3257
3258 =back
3259
3260 Indicates column names for object inflation. That is, C<as>
3261 indicates the name that the column can be accessed as via the
3262 C<get_column> method (or via the object accessor, B<if one already
3263 exists>).  It has nothing to do with the SQL code C<SELECT foo AS bar>.
3264
3265 The C<as> attribute is used in conjunction with C<select>,
3266 usually when C<select> contains one or more function or stored
3267 procedure names:
3268
3269   $rs = $schema->resultset('Employee')->search(undef, {
3270     select => [
3271       'name',
3272       { count => 'employeeid' }
3273     ],
3274     as => ['name', 'employee_count'],
3275   });
3276
3277   my $employee = $rs->first(); # get the first Employee
3278
3279 If the object against which the search is performed already has an accessor
3280 matching a column name specified in C<as>, the value can be retrieved using
3281 the accessor as normal:
3282
3283   my $name = $employee->name();
3284
3285 If on the other hand an accessor does not exist in the object, you need to
3286 use C<get_column> instead:
3287
3288   my $employee_count = $employee->get_column('employee_count');
3289
3290 You can create your own accessors if required - see
3291 L<DBIx::Class::Manual::Cookbook> for details.
3292
3293 Please note: This will NOT insert an C<AS employee_count> into the SQL
3294 statement produced, it is used for internal access only. Thus
3295 attempting to use the accessor in an C<order_by> clause or similar
3296 will fail miserably.
3297
3298 To get around this limitation, you can supply literal SQL to your
3299 C<select> attibute that contains the C<AS alias> text, eg:
3300
3301   select => [\'myfield AS alias']
3302
3303 =head2 join
3304
3305 =over 4
3306
3307 =item Value: ($rel_name | \@rel_names | \%rel_names)
3308
3309 =back
3310
3311 Contains a list of relationships that should be joined for this query.  For
3312 example:
3313
3314   # Get CDs by Nine Inch Nails
3315   my $rs = $schema->resultset('CD')->search(
3316     { 'artist.name' => 'Nine Inch Nails' },
3317     { join => 'artist' }
3318   );
3319
3320 Can also contain a hash reference to refer to the other relation's relations.
3321 For example:
3322
3323   package MyApp::Schema::Track;
3324   use base qw/DBIx::Class/;
3325   __PACKAGE__->table('track');
3326   __PACKAGE__->add_columns(qw/trackid cd position title/);
3327   __PACKAGE__->set_primary_key('trackid');
3328   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3329   1;
3330
3331   # In your application
3332   my $rs = $schema->resultset('Artist')->search(
3333     { 'track.title' => 'Teardrop' },
3334     {
3335       join     => { cd => 'track' },
3336       order_by => 'artist.name',
3337     }
3338   );
3339
3340 You need to use the relationship (not the table) name in  conditions,
3341 because they are aliased as such. The current table is aliased as "me", so
3342 you need to use me.column_name in order to avoid ambiguity. For example:
3343
3344   # Get CDs from 1984 with a 'Foo' track
3345   my $rs = $schema->resultset('CD')->search(
3346     {
3347       'me.year' => 1984,
3348       'tracks.name' => 'Foo'
3349     },
3350     { join => 'tracks' }
3351   );
3352
3353 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3354 similarly for a third time). For e.g.
3355
3356   my $rs = $schema->resultset('Artist')->search({
3357     'cds.title'   => 'Down to Earth',
3358     'cds_2.title' => 'Popular',
3359   }, {
3360     join => [ qw/cds cds/ ],
3361   });
3362
3363 will return a set of all artists that have both a cd with title 'Down
3364 to Earth' and a cd with title 'Popular'.
3365
3366 If you want to fetch related objects from other tables as well, see C<prefetch>
3367 below.
3368
3369 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
3370
3371 =head2 prefetch
3372
3373 =over 4
3374
3375 =item Value: ($rel_name | \@rel_names | \%rel_names)
3376
3377 =back
3378
3379 Contains one or more relationships that should be fetched along with
3380 the main query (when they are accessed afterwards the data will
3381 already be available, without extra queries to the database).  This is
3382 useful for when you know you will need the related objects, because it
3383 saves at least one query:
3384
3385   my $rs = $schema->resultset('Tag')->search(
3386     undef,
3387     {
3388       prefetch => {
3389         cd => 'artist'
3390       }
3391     }
3392   );
3393
3394 The initial search results in SQL like the following:
3395
3396   SELECT tag.*, cd.*, artist.* FROM tag
3397   JOIN cd ON tag.cd = cd.cdid
3398   JOIN artist ON cd.artist = artist.artistid
3399
3400 L<DBIx::Class> has no need to go back to the database when we access the
3401 C<cd> or C<artist> relationships, which saves us two SQL statements in this
3402 case.
3403
3404 Simple prefetches will be joined automatically, so there is no need
3405 for a C<join> attribute in the above search.
3406
3407 C<prefetch> can be used with the following relationship types: C<belongs_to>,
3408 C<has_one> (or if you're using C<add_relationship>, any relationship declared
3409 with an accessor type of 'single' or 'filter'). A more complex example that
3410 prefetches an artists cds, the tracks on those cds, and the tags associted
3411 with that artist is given below (assuming many-to-many from artists to tags):
3412
3413  my $rs = $schema->resultset('Artist')->search(
3414    undef,
3415    {
3416      prefetch => [
3417        { cds => 'tracks' },
3418        { artist_tags => 'tags' }
3419      ]
3420    }
3421  );
3422
3423
3424 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
3425 attributes will be ignored.
3426
3427 B<CAVEATs>: Prefetch does a lot of deep magic. As such, it may not behave
3428 exactly as you might expect.
3429
3430 =over 4
3431
3432 =item * 
3433
3434 Prefetch uses the L</cache> to populate the prefetched relationships. This
3435 may or may not be what you want.
3436
3437 =item * 
3438
3439 If you specify a condition on a prefetched relationship, ONLY those
3440 rows that match the prefetched condition will be fetched into that relationship.
3441 This means that adding prefetch to a search() B<may alter> what is returned by
3442 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
3443
3444   my $artist_rs = $schema->resultset('Artist')->search({
3445       'cds.year' => 2008,
3446   }, {
3447       join => 'cds',
3448   });
3449
3450   my $count = $artist_rs->first->cds->count;
3451
3452   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
3453
3454   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
3455
3456   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
3457
3458 that cmp_ok() may or may not pass depending on the datasets involved. This
3459 behavior may or may not survive the 0.09 transition.
3460
3461 =back
3462
3463 =head2 page
3464
3465 =over 4
3466
3467 =item Value: $page
3468
3469 =back
3470
3471 Makes the resultset paged and specifies the page to retrieve. Effectively
3472 identical to creating a non-pages resultset and then calling ->page($page)
3473 on it.
3474
3475 If L<rows> attribute is not specified it defaults to 10 rows per page.
3476
3477 When you have a paged resultset, L</count> will only return the number
3478 of rows in the page. To get the total, use the L</pager> and call
3479 C<total_entries> on it.
3480
3481 =head2 rows
3482
3483 =over 4
3484
3485 =item Value: $rows
3486
3487 =back
3488
3489 Specifes the maximum number of rows for direct retrieval or the number of
3490 rows per page if the page attribute or method is used.
3491
3492 =head2 offset
3493
3494 =over 4
3495
3496 =item Value: $offset
3497
3498 =back
3499
3500 Specifies the (zero-based) row number for the  first row to be returned, or the
3501 of the first row of the first page if paging is used.
3502
3503 =head2 group_by
3504
3505 =over 4
3506
3507 =item Value: \@columns
3508
3509 =back
3510
3511 A arrayref of columns to group by. Can include columns of joined tables.
3512
3513   group_by => [qw/ column1 column2 ... /]
3514
3515 =head2 having
3516
3517 =over 4
3518
3519 =item Value: $condition
3520
3521 =back
3522
3523 HAVING is a select statement attribute that is applied between GROUP BY and
3524 ORDER BY. It is applied to the after the grouping calculations have been
3525 done.
3526
3527   having => { 'count(employee)' => { '>=', 100 } }
3528
3529 =head2 distinct
3530
3531 =over 4
3532
3533 =item Value: (0 | 1)
3534
3535 =back
3536
3537 Set to 1 to group by all columns.
3538
3539 =head2 where
3540
3541 =over 4
3542
3543 Adds to the WHERE clause.
3544
3545   # only return rows WHERE deleted IS NULL for all searches
3546   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
3547
3548 Can be overridden by passing C<{ where => undef }> as an attribute
3549 to a resulset.
3550
3551 =back
3552
3553 =head2 cache
3554
3555 Set to 1 to cache search results. This prevents extra SQL queries if you
3556 revisit rows in your ResultSet:
3557
3558   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
3559
3560   while( my $artist = $resultset->next ) {
3561     ... do stuff ...
3562   }
3563
3564   $rs->first; # without cache, this would issue a query
3565
3566 By default, searches are not cached.
3567
3568 For more examples of using these attributes, see
3569 L<DBIx::Class::Manual::Cookbook>.
3570
3571 =head2 from
3572
3573 =over 4
3574
3575 =item Value: \@from_clause
3576
3577 =back
3578
3579 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
3580 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
3581 clauses.
3582
3583 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
3584
3585 C<join> will usually do what you need and it is strongly recommended that you
3586 avoid using C<from> unless you cannot achieve the desired result using C<join>.
3587 And we really do mean "cannot", not just tried and failed. Attempting to use
3588 this because you're having problems with C<join> is like trying to use x86
3589 ASM because you've got a syntax error in your C. Trust us on this.
3590
3591 Now, if you're still really, really sure you need to use this (and if you're
3592 not 100% sure, ask the mailing list first), here's an explanation of how this
3593 works.
3594
3595 The syntax is as follows -
3596
3597   [
3598     { <alias1> => <table1> },
3599     [
3600       { <alias2> => <table2>, -join_type => 'inner|left|right' },
3601       [], # nested JOIN (optional)
3602       { <table1.column1> => <table2.column2>, ... (more conditions) },
3603     ],
3604     # More of the above [ ] may follow for additional joins
3605   ]
3606
3607   <table1> <alias1>
3608   JOIN
3609     <table2> <alias2>
3610     [JOIN ...]
3611   ON <table1.column1> = <table2.column2>
3612   <more joins may follow>
3613
3614 An easy way to follow the examples below is to remember the following:
3615
3616     Anything inside "[]" is a JOIN
3617     Anything inside "{}" is a condition for the enclosing JOIN
3618
3619 The following examples utilize a "person" table in a family tree application.
3620 In order to express parent->child relationships, this table is self-joined:
3621
3622     # Person->belongs_to('father' => 'Person');
3623     # Person->belongs_to('mother' => 'Person');
3624
3625 C<from> can be used to nest joins. Here we return all children with a father,
3626 then search against all mothers of those children:
3627
3628   $rs = $schema->resultset('Person')->search(
3629       undef,
3630       {
3631           alias => 'mother', # alias columns in accordance with "from"
3632           from => [
3633               { mother => 'person' },
3634               [
3635                   [
3636                       { child => 'person' },
3637                       [
3638                           { father => 'person' },
3639                           { 'father.person_id' => 'child.father_id' }
3640                       ]
3641                   ],
3642                   { 'mother.person_id' => 'child.mother_id' }
3643               ],
3644           ]
3645       },
3646   );
3647
3648   # Equivalent SQL:
3649   # SELECT mother.* FROM person mother
3650   # JOIN (
3651   #   person child
3652   #   JOIN person father
3653   #   ON ( father.person_id = child.father_id )
3654   # )
3655   # ON ( mother.person_id = child.mother_id )
3656
3657 The type of any join can be controlled manually. To search against only people
3658 with a father in the person table, we could explicitly use C<INNER JOIN>:
3659
3660     $rs = $schema->resultset('Person')->search(
3661         undef,
3662         {
3663             alias => 'child', # alias columns in accordance with "from"
3664             from => [
3665                 { child => 'person' },
3666                 [
3667                     { father => 'person', -join_type => 'inner' },
3668                     { 'father.id' => 'child.father_id' }
3669                 ],
3670             ]
3671         },
3672     );
3673
3674     # Equivalent SQL:
3675     # SELECT child.* FROM person child
3676     # INNER JOIN person father ON child.father_id = father.id
3677
3678 You can select from a subquery by passing a resultset to from as follows.
3679
3680     $schema->resultset('Artist')->search( 
3681         undef, 
3682         {   alias => 'artist2',
3683             from  => [ { artist2 => $artist_rs->as_query } ],
3684         } );
3685
3686     # and you'll get sql like this..
3687     # SELECT artist2.artistid, artist2.name, artist2.rank, artist2.charfield FROM 
3688     #   ( SELECT me.artistid, me.name, me.rank, me.charfield FROM artists me ) artist2
3689
3690 If you need to express really complex joins, you
3691 can supply literal SQL to C<from> via a scalar reference. In this case
3692 the contents of the scalar will replace the table name associated with the
3693 resultsource.
3694
3695 WARNING: This technique might very well not work as expected on chained
3696 searches - you have been warned.
3697
3698     # Assuming the Event resultsource is defined as:
3699
3700         MySchema::Event->add_columns (
3701             sequence => {
3702                 data_type => 'INT',
3703                 is_auto_increment => 1,
3704             },
3705             location => {
3706                 data_type => 'INT',
3707             },
3708             type => {
3709                 data_type => 'INT',
3710             },
3711         );
3712         MySchema::Event->set_primary_key ('sequence');
3713
3714     # This will get back the latest event for every location. The column
3715     # selector is still provided by DBIC, all we do is add a JOIN/WHERE
3716     # combo to limit the resultset
3717
3718     $rs = $schema->resultset('Event');
3719     $table = $rs->result_source->name;
3720     $latest = $rs->search (
3721         undef,
3722         { from => \ "
3723             (SELECT e1.* FROM $table e1
3724                 JOIN $table e2
3725                     ON e1.location = e2.location
3726                     AND e1.sequence < e2.sequence
3727                 WHERE e2.sequence is NULL
3728             ) me",
3729         },
3730     );
3731
3732     # Equivalent SQL (with the DBIC chunks added):
3733
3734     SELECT me.sequence, me.location, me.type FROM
3735        (SELECT e1.* FROM events e1
3736            JOIN events e2
3737                ON e1.location = e2.location
3738                AND e1.sequence < e2.sequence
3739            WHERE e2.sequence is NULL
3740        ) me;
3741
3742 =head2 for
3743
3744 =over 4
3745
3746 =item Value: ( 'update' | 'shared' )
3747
3748 =back
3749
3750 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
3751 ... FOR SHARED.
3752
3753 =cut
3754
3755 1;