a27f8a2fb18f4e52f71f2df5c6a3522f99d6ce15
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => "count",
7         'bool'   => "_bool",
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use DBIx::Class::Exception;
11 use Data::Page;
12 use Storable;
13 use DBIx::Class::ResultSetColumn;
14 use DBIx::Class::ResultSourceHandle;
15 use List::Util ();
16 use Scalar::Util ();
17 use base qw/DBIx::Class/;
18
19 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
20
21 =head1 NAME
22
23 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
24
25 =head1 SYNOPSIS
26
27   my $users_rs   = $schema->resultset('User');
28   while( $user = $users_rs->next) {
29     print $user->username;
30   }
31
32   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
33   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
34
35 =head1 DESCRIPTION
36
37 A ResultSet is an object which stores a set of conditions representing
38 a query. It is the backbone of DBIx::Class (i.e. the really
39 important/useful bit).
40
41 No SQL is executed on the database when a ResultSet is created, it
42 just stores all the conditions needed to create the query.
43
44 A basic ResultSet representing the data of an entire table is returned
45 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
46 L<Source|DBIx::Class::Manual::Glossary/Source> name.
47
48   my $users_rs = $schema->resultset('User');
49
50 A new ResultSet is returned from calling L</search> on an existing
51 ResultSet. The new one will contain all the conditions of the
52 original, plus any new conditions added in the C<search> call.
53
54 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
55 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
56 represents.
57
58 The query that the ResultSet represents is B<only> executed against
59 the database when these methods are called:
60 L</find>, L</next>, L</all>, L</first>, L</single>, L</count>.
61
62 If a resultset is used in a numeric context it returns the L</count>.
63 However, if it is used in a boolean context it is B<always> true.  So if
64 you want to check if a resultset has any results, you must use C<if $rs
65 != 0>.
66
67 =head1 EXAMPLES
68
69 =head2 Chaining resultsets
70
71 Let's say you've got a query that needs to be run to return some data
72 to the user. But, you have an authorization system in place that
73 prevents certain users from seeing certain information. So, you want
74 to construct the basic query in one method, but add constraints to it in
75 another.
76
77   sub get_data {
78     my $self = shift;
79     my $request = $self->get_request; # Get a request object somehow.
80     my $schema = $self->get_schema;   # Get the DBIC schema object somehow.
81
82     my $cd_rs = $schema->resultset('CD')->search({
83       title => $request->param('title'),
84       year => $request->param('year'),
85     });
86
87     $self->apply_security_policy( $cd_rs );
88
89     return $cd_rs->all();
90   }
91
92   sub apply_security_policy {
93     my $self = shift;
94     my ($rs) = @_;
95
96     return $rs->search({
97       subversive => 0,
98     });
99   }
100
101 =head3 Resolving conditions and attributes
102
103 When a resultset is chained from another resultset, conditions and
104 attributes with the same keys need resolving.
105
106 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
107 into the existing ones from the original resultset.
108
109 The L</where> and L</having> attributes, and any search conditions, are
110 merged with an SQL C<AND> to the existing condition from the original
111 resultset.
112
113 All other attributes are overridden by any new ones supplied in the
114 search attributes.
115
116 =head2 Multiple queries
117
118 Since a resultset just defines a query, you can do all sorts of
119 things with it with the same object.
120
121   # Don't hit the DB yet.
122   my $cd_rs = $schema->resultset('CD')->search({
123     title => 'something',
124     year => 2009,
125   });
126
127   # Each of these hits the DB individually.
128   my $count = $cd_rs->count;
129   my $most_recent = $cd_rs->get_column('date_released')->max();
130   my @records = $cd_rs->all;
131
132 And it's not just limited to SELECT statements.
133
134   $cd_rs->delete();
135
136 This is even cooler:
137
138   $cd_rs->create({ artist => 'Fred' });
139
140 Which is the same as:
141
142   $schema->resultset('CD')->create({
143     title => 'something',
144     year => 2009,
145     artist => 'Fred'
146   });
147
148 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
149
150 =head1 METHODS
151
152 =head2 new
153
154 =over 4
155
156 =item Arguments: $source, \%$attrs
157
158 =item Return Value: $rs
159
160 =back
161
162 The resultset constructor. Takes a source object (usually a
163 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
164 L</ATTRIBUTES> below).  Does not perform any queries -- these are
165 executed as needed by the other methods.
166
167 Generally you won't need to construct a resultset manually.  You'll
168 automatically get one from e.g. a L</search> called in scalar context:
169
170   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
171
172 IMPORTANT: If called on an object, proxies to new_result instead so
173
174   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
175
176 will return a CD object, not a ResultSet.
177
178 =cut
179
180 sub new {
181   my $class = shift;
182   return $class->new_result(@_) if ref $class;
183
184   my ($source, $attrs) = @_;
185   $source = $source->handle
186     unless $source->isa('DBIx::Class::ResultSourceHandle');
187   $attrs = { %{$attrs||{}} };
188
189   if ($attrs->{page}) {
190     $attrs->{rows} ||= 10;
191   }
192
193   $attrs->{alias} ||= 'me';
194
195   # Creation of {} and bless separated to mitigate RH perl bug
196   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
197   my $self = {
198     _source_handle => $source,
199     cond => $attrs->{where},
200     count => undef,
201     pager => undef,
202     attrs => $attrs
203   };
204
205   bless $self, $class;
206
207   $self->result_class(
208     $attrs->{result_class} || $source->resolve->result_class
209   );
210
211   return $self;
212 }
213
214 =head2 search
215
216 =over 4
217
218 =item Arguments: $cond, \%attrs?
219
220 =item Return Value: $resultset (scalar context), @row_objs (list context)
221
222 =back
223
224   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
225   my $new_rs = $cd_rs->search({ year => 2005 });
226
227   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
228                  # year = 2005 OR year = 2004
229
230 If you need to pass in additional attributes but no additional condition,
231 call it as C<search(undef, \%attrs)>.
232
233   # "SELECT name, artistid FROM $artist_table"
234   my @all_artists = $schema->resultset('Artist')->search(undef, {
235     columns => [qw/name artistid/],
236   });
237
238 For a list of attributes that can be passed to C<search>, see
239 L</ATTRIBUTES>. For more examples of using this function, see
240 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
241 documentation for the first argument, see L<SQL::Abstract>.
242
243 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
244
245 =cut
246
247 sub search {
248   my $self = shift;
249   my $rs = $self->search_rs( @_ );
250   return (wantarray ? $rs->all : $rs);
251 }
252
253 =head2 search_rs
254
255 =over 4
256
257 =item Arguments: $cond, \%attrs?
258
259 =item Return Value: $resultset
260
261 =back
262
263 This method does the same exact thing as search() except it will
264 always return a resultset, even in list context.
265
266 =cut
267
268 sub search_rs {
269   my $self = shift;
270
271   # Special-case handling for (undef, undef).
272   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
273     pop(@_); pop(@_);
274   }
275
276   my $attrs = {};
277   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
278   my $our_attrs = { %{$self->{attrs}} };
279   my $having = delete $our_attrs->{having};
280   my $where = delete $our_attrs->{where};
281
282   my $rows;
283
284   my %safe = (alias => 1, cache => 1);
285
286   unless (
287     (@_ && defined($_[0])) # @_ == () or (undef)
288     ||
289     (keys %$attrs # empty attrs or only 'safe' attrs
290     && List::Util::first { !$safe{$_} } keys %$attrs)
291   ) {
292     # no search, effectively just a clone
293     $rows = $self->get_cache;
294   }
295
296   # reset the selector list
297   if (List::Util::first { exists $attrs->{$_} } qw{columns select as}) {
298      delete @{$our_attrs}{qw{select as columns +select +as +columns include_columns}};
299   }
300
301   my $new_attrs = { %{$our_attrs}, %{$attrs} };
302
303   # merge new attrs into inherited
304   foreach my $key (qw/join prefetch +select +as +columns include_columns bind/) {
305     next unless exists $attrs->{$key};
306     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
307   }
308
309   my $cond = (@_
310     ? (
311         (@_ == 1 || ref $_[0] eq "HASH")
312           ? (
313               (ref $_[0] eq 'HASH')
314                 ? (
315                     (keys %{ $_[0] }  > 0)
316                       ? shift
317                       : undef
318                    )
319                 :  shift
320              )
321           : (
322               (@_ % 2)
323                 ? $self->throw_exception("Odd number of arguments to search")
324                 : {@_}
325              )
326       )
327     : undef
328   );
329
330   if (defined $where) {
331     $new_attrs->{where} = (
332       defined $new_attrs->{where}
333         ? { '-and' => [
334               map {
335                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
336               } $where, $new_attrs->{where}
337             ]
338           }
339         : $where);
340   }
341
342   if (defined $cond) {
343     $new_attrs->{where} = (
344       defined $new_attrs->{where}
345         ? { '-and' => [
346               map {
347                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
348               } $cond, $new_attrs->{where}
349             ]
350           }
351         : $cond);
352   }
353
354   if (defined $having) {
355     $new_attrs->{having} = (
356       defined $new_attrs->{having}
357         ? { '-and' => [
358               map {
359                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
360               } $having, $new_attrs->{having}
361             ]
362           }
363         : $having);
364   }
365
366   my $rs = (ref $self)->new($self->result_source, $new_attrs);
367
368   $rs->set_cache($rows) if ($rows);
369
370   return $rs;
371 }
372
373 =head2 search_literal
374
375 =over 4
376
377 =item Arguments: $sql_fragment, @bind_values
378
379 =item Return Value: $resultset (scalar context), @row_objs (list context)
380
381 =back
382
383   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
384   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
385
386 Pass a literal chunk of SQL to be added to the conditional part of the
387 resultset query.
388
389 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
390 only be used in that context. C<search_literal> is a convenience method.
391 It is equivalent to calling $schema->search(\[]), but if you want to ensure
392 columns are bound correctly, use C<search>.
393
394 Example of how to use C<search> instead of C<search_literal>
395
396   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
397   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
398
399
400 See L<DBIx::Class::Manual::Cookbook/Searching> and
401 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
402 require C<search_literal>.
403
404 =cut
405
406 sub search_literal {
407   my ($self, $sql, @bind) = @_;
408   my $attr;
409   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
410     $attr = pop @bind;
411   }
412   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
413 }
414
415 =head2 find
416
417 =over 4
418
419 =item Arguments: @values | \%cols, \%attrs?
420
421 =item Return Value: $row_object | undef
422
423 =back
424
425 Finds a row based on its primary key or unique constraint. For example, to find
426 a row by its primary key:
427
428   my $cd = $schema->resultset('CD')->find(5);
429
430 You can also find a row by a specific unique constraint using the C<key>
431 attribute. For example:
432
433   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
434     key => 'cd_artist_title'
435   });
436
437 Additionally, you can specify the columns explicitly by name:
438
439   my $cd = $schema->resultset('CD')->find(
440     {
441       artist => 'Massive Attack',
442       title  => 'Mezzanine',
443     },
444     { key => 'cd_artist_title' }
445   );
446
447 If the C<key> is specified as C<primary>, it searches only on the primary key.
448
449 If no C<key> is specified, it searches on all unique constraints defined on the
450 source for which column data is provided, including the primary key.
451
452 If your table does not have a primary key, you B<must> provide a value for the
453 C<key> attribute matching one of the unique constraints on the source.
454
455 In addition to C<key>, L</find> recognizes and applies standard
456 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
457
458 Note: If your query does not return only one row, a warning is generated:
459
460   Query returned more than one row
461
462 See also L</find_or_create> and L</update_or_create>. For information on how to
463 declare unique constraints, see
464 L<DBIx::Class::ResultSource/add_unique_constraint>.
465
466 =cut
467
468 sub find {
469   my $self = shift;
470   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
471
472   # Default to the primary key, but allow a specific key
473   my @cols = exists $attrs->{key}
474     ? $self->result_source->unique_constraint_columns($attrs->{key})
475     : $self->result_source->primary_columns;
476   $self->throw_exception(
477     "Can't find unless a primary key is defined or unique constraint is specified"
478   ) unless @cols;
479
480   # Parse out a hashref from input
481   my $input_query;
482   if (ref $_[0] eq 'HASH') {
483     $input_query = { %{$_[0]} };
484   }
485   elsif (@_ == @cols) {
486     $input_query = {};
487     @{$input_query}{@cols} = @_;
488   }
489   else {
490     # Compatibility: Allow e.g. find(id => $value)
491     carp "Find by key => value deprecated; please use a hashref instead";
492     $input_query = {@_};
493   }
494
495   my (%related, $info);
496
497   KEY: foreach my $key (keys %$input_query) {
498     if (ref($input_query->{$key})
499         && ($info = $self->result_source->relationship_info($key))) {
500       my $val = delete $input_query->{$key};
501       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
502       my $rel_q = $self->result_source->_resolve_condition(
503                     $info->{cond}, $val, $key
504                   );
505       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
506       @related{keys %$rel_q} = values %$rel_q;
507     }
508   }
509   if (my @keys = keys %related) {
510     @{$input_query}{@keys} = values %related;
511   }
512
513
514   # Build the final query: Default to the disjunction of the unique queries,
515   # but allow the input query in case the ResultSet defines the query or the
516   # user is abusing find
517   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
518   my $query;
519   if (exists $attrs->{key}) {
520     my @unique_cols = $self->result_source->unique_constraint_columns($attrs->{key});
521     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
522     $query = $self->_add_alias($unique_query, $alias);
523   }
524   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
525     # This means that we got here after a merger of relationship conditions
526     # in ::Relationship::Base::search_related (the row method), and furthermore
527     # the relationship is of the 'single' type. This means that the condition
528     # provided by the relationship (already attached to $self) is sufficient,
529     # as there can be only one row in the database that would satisfy the
530     # relationship
531   }
532   else {
533     my @unique_queries = $self->_unique_queries($input_query, $attrs);
534     $query = @unique_queries
535       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
536       : $self->_add_alias($input_query, $alias);
537   }
538
539   # Run the query, passing the result_class since it should propagate for find
540   my $rs = $self->search ($query, {result_class => $self->result_class, %$attrs});
541   if (keys %{$rs->_resolved_attrs->{collapse}}) {
542     my $row = $rs->next;
543     carp "Query returned more than one row" if $rs->next;
544     return $row;
545   }
546   else {
547     return $rs->single;
548   }
549 }
550
551 # _add_alias
552 #
553 # Add the specified alias to the specified query hash. A copy is made so the
554 # original query is not modified.
555
556 sub _add_alias {
557   my ($self, $query, $alias) = @_;
558
559   my %aliased = %$query;
560   foreach my $col (grep { ! m/\./ } keys %aliased) {
561     $aliased{"$alias.$col"} = delete $aliased{$col};
562   }
563
564   return \%aliased;
565 }
566
567 # _unique_queries
568 #
569 # Build a list of queries which satisfy unique constraints.
570
571 sub _unique_queries {
572   my ($self, $query, $attrs) = @_;
573
574   my @constraint_names = exists $attrs->{key}
575     ? ($attrs->{key})
576     : $self->result_source->unique_constraint_names;
577
578   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
579   my $num_where = scalar keys %$where;
580
581   my (@unique_queries, %seen_column_combinations);
582   foreach my $name (@constraint_names) {
583     my @constraint_cols = $self->result_source->unique_constraint_columns($name);
584
585     my $constraint_sig = join "\x00", sort @constraint_cols;
586     next if $seen_column_combinations{$constraint_sig}++;
587
588     my $unique_query = $self->_build_unique_query($query, \@constraint_cols);
589
590     my $num_cols = scalar @constraint_cols;
591     my $num_query = scalar keys %$unique_query;
592
593     my $total = $num_query + $num_where;
594     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
595       # The query is either unique on its own or is unique in combination with
596       # the existing where clause
597       push @unique_queries, $unique_query;
598     }
599   }
600
601   return @unique_queries;
602 }
603
604 # _build_unique_query
605 #
606 # Constrain the specified query hash based on the specified column names.
607
608 sub _build_unique_query {
609   my ($self, $query, $unique_cols) = @_;
610
611   return {
612     map  { $_ => $query->{$_} }
613     grep { exists $query->{$_} }
614       @$unique_cols
615   };
616 }
617
618 =head2 search_related
619
620 =over 4
621
622 =item Arguments: $rel, $cond, \%attrs?
623
624 =item Return Value: $new_resultset
625
626 =back
627
628   $new_rs = $cd_rs->search_related('artist', {
629     name => 'Emo-R-Us',
630   });
631
632 Searches the specified relationship, optionally specifying a condition and
633 attributes for matching records. See L</ATTRIBUTES> for more information.
634
635 =cut
636
637 sub search_related {
638   return shift->related_resultset(shift)->search(@_);
639 }
640
641 =head2 search_related_rs
642
643 This method works exactly the same as search_related, except that
644 it guarantees a resultset, even in list context.
645
646 =cut
647
648 sub search_related_rs {
649   return shift->related_resultset(shift)->search_rs(@_);
650 }
651
652 =head2 cursor
653
654 =over 4
655
656 =item Arguments: none
657
658 =item Return Value: $cursor
659
660 =back
661
662 Returns a storage-driven cursor to the given resultset. See
663 L<DBIx::Class::Cursor> for more information.
664
665 =cut
666
667 sub cursor {
668   my ($self) = @_;
669
670   my $attrs = $self->_resolved_attrs_copy;
671
672   return $self->{cursor}
673     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
674           $attrs->{where},$attrs);
675 }
676
677 =head2 single
678
679 =over 4
680
681 =item Arguments: $cond?
682
683 =item Return Value: $row_object?
684
685 =back
686
687   my $cd = $schema->resultset('CD')->single({ year => 2001 });
688
689 Inflates the first result without creating a cursor if the resultset has
690 any records in it; if not returns nothing. Used by L</find> as a lean version of
691 L</search>.
692
693 While this method can take an optional search condition (just like L</search>)
694 being a fast-code-path it does not recognize search attributes. If you need to
695 add extra joins or similar, call L</search> and then chain-call L</single> on the
696 L<DBIx::Class::ResultSet> returned.
697
698 =over
699
700 =item B<Note>
701
702 As of 0.08100, this method enforces the assumption that the preceding
703 query returns only one row. If more than one row is returned, you will receive
704 a warning:
705
706   Query returned more than one row
707
708 In this case, you should be using L</next> or L</find> instead, or if you really
709 know what you are doing, use the L</rows> attribute to explicitly limit the size
710 of the resultset.
711
712 This method will also throw an exception if it is called on a resultset prefetching
713 has_many, as such a prefetch implies fetching multiple rows from the database in
714 order to assemble the resulting object.
715
716 =back
717
718 =cut
719
720 sub single {
721   my ($self, $where) = @_;
722   if(@_ > 2) {
723       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
724   }
725
726   my $attrs = $self->_resolved_attrs_copy;
727
728   if (keys %{$attrs->{collapse}}) {
729     $self->throw_exception(
730       'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
731     );
732   }
733
734   if ($where) {
735     if (defined $attrs->{where}) {
736       $attrs->{where} = {
737         '-and' =>
738             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
739                $where, delete $attrs->{where} ]
740       };
741     } else {
742       $attrs->{where} = $where;
743     }
744   }
745
746 #  XXX: Disabled since it doesn't infer uniqueness in all cases
747 #  unless ($self->_is_unique_query($attrs->{where})) {
748 #    carp "Query not guaranteed to return a single row"
749 #      . "; please declare your unique constraints or use search instead";
750 #  }
751
752   my @data = $self->result_source->storage->select_single(
753     $attrs->{from}, $attrs->{select},
754     $attrs->{where}, $attrs
755   );
756
757   return (@data ? ($self->_construct_object(@data))[0] : undef);
758 }
759
760
761 # _is_unique_query
762 #
763 # Try to determine if the specified query is guaranteed to be unique, based on
764 # the declared unique constraints.
765
766 sub _is_unique_query {
767   my ($self, $query) = @_;
768
769   my $collapsed = $self->_collapse_query($query);
770   my $alias = $self->{attrs}{alias};
771
772   foreach my $name ($self->result_source->unique_constraint_names) {
773     my @unique_cols = map {
774       "$alias.$_"
775     } $self->result_source->unique_constraint_columns($name);
776
777     # Count the values for each unique column
778     my %seen = map { $_ => 0 } @unique_cols;
779
780     foreach my $key (keys %$collapsed) {
781       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
782       next unless exists $seen{$aliased};  # Additional constraints are okay
783       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
784     }
785
786     # If we get 0 or more than 1 value for a column, it's not necessarily unique
787     return 1 unless grep { $_ != 1 } values %seen;
788   }
789
790   return 0;
791 }
792
793 # _collapse_query
794 #
795 # Recursively collapse the query, accumulating values for each column.
796
797 sub _collapse_query {
798   my ($self, $query, $collapsed) = @_;
799
800   $collapsed ||= {};
801
802   if (ref $query eq 'ARRAY') {
803     foreach my $subquery (@$query) {
804       next unless ref $subquery;  # -or
805       $collapsed = $self->_collapse_query($subquery, $collapsed);
806     }
807   }
808   elsif (ref $query eq 'HASH') {
809     if (keys %$query and (keys %$query)[0] eq '-and') {
810       foreach my $subquery (@{$query->{-and}}) {
811         $collapsed = $self->_collapse_query($subquery, $collapsed);
812       }
813     }
814     else {
815       foreach my $col (keys %$query) {
816         my $value = $query->{$col};
817         $collapsed->{$col}{$value}++;
818       }
819     }
820   }
821
822   return $collapsed;
823 }
824
825 =head2 get_column
826
827 =over 4
828
829 =item Arguments: $cond?
830
831 =item Return Value: $resultsetcolumn
832
833 =back
834
835   my $max_length = $rs->get_column('length')->max;
836
837 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
838
839 =cut
840
841 sub get_column {
842   my ($self, $column) = @_;
843   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
844   return $new;
845 }
846
847 =head2 search_like
848
849 =over 4
850
851 =item Arguments: $cond, \%attrs?
852
853 =item Return Value: $resultset (scalar context), @row_objs (list context)
854
855 =back
856
857   # WHERE title LIKE '%blue%'
858   $cd_rs = $rs->search_like({ title => '%blue%'});
859
860 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
861 that this is simply a convenience method retained for ex Class::DBI users.
862 You most likely want to use L</search> with specific operators.
863
864 For more information, see L<DBIx::Class::Manual::Cookbook>.
865
866 This method is deprecated and will be removed in 0.09. Use L</search()>
867 instead. An example conversion is:
868
869   ->search_like({ foo => 'bar' });
870
871   # Becomes
872
873   ->search({ foo => { like => 'bar' } });
874
875 =cut
876
877 sub search_like {
878   my $class = shift;
879   carp (
880     'search_like() is deprecated and will be removed in DBIC version 0.09.'
881    .' Instead use ->search({ x => { -like => "y%" } })'
882    .' (note the outer pair of {}s - they are important!)'
883   );
884   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
885   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
886   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
887   return $class->search($query, { %$attrs });
888 }
889
890 =head2 slice
891
892 =over 4
893
894 =item Arguments: $first, $last
895
896 =item Return Value: $resultset (scalar context), @row_objs (list context)
897
898 =back
899
900 Returns a resultset or object list representing a subset of elements from the
901 resultset slice is called on. Indexes are from 0, i.e., to get the first
902 three records, call:
903
904   my ($one, $two, $three) = $rs->slice(0, 2);
905
906 =cut
907
908 sub slice {
909   my ($self, $min, $max) = @_;
910   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
911   $attrs->{offset} = $self->{attrs}{offset} || 0;
912   $attrs->{offset} += $min;
913   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
914   return $self->search(undef(), $attrs);
915   #my $slice = (ref $self)->new($self->result_source, $attrs);
916   #return (wantarray ? $slice->all : $slice);
917 }
918
919 =head2 next
920
921 =over 4
922
923 =item Arguments: none
924
925 =item Return Value: $result?
926
927 =back
928
929 Returns the next element in the resultset (C<undef> is there is none).
930
931 Can be used to efficiently iterate over records in the resultset:
932
933   my $rs = $schema->resultset('CD')->search;
934   while (my $cd = $rs->next) {
935     print $cd->title;
936   }
937
938 Note that you need to store the resultset object, and call C<next> on it.
939 Calling C<< resultset('Table')->next >> repeatedly will always return the
940 first record from the resultset.
941
942 =cut
943
944 sub next {
945   my ($self) = @_;
946   if (my $cache = $self->get_cache) {
947     $self->{all_cache_position} ||= 0;
948     return $cache->[$self->{all_cache_position}++];
949   }
950   if ($self->{attrs}{cache}) {
951     $self->{all_cache_position} = 1;
952     return ($self->all)[0];
953   }
954   if ($self->{stashed_objects}) {
955     my $obj = shift(@{$self->{stashed_objects}});
956     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
957     return $obj;
958   }
959   my @row = (
960     exists $self->{stashed_row}
961       ? @{delete $self->{stashed_row}}
962       : $self->cursor->next
963   );
964   return undef unless (@row);
965   my ($row, @more) = $self->_construct_object(@row);
966   $self->{stashed_objects} = \@more if @more;
967   return $row;
968 }
969
970 sub _construct_object {
971   my ($self, @row) = @_;
972
973   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row)
974     or return ();
975   my @new = $self->result_class->inflate_result($self->result_source, @$info);
976   @new = $self->{_attrs}{record_filter}->(@new)
977     if exists $self->{_attrs}{record_filter};
978   return @new;
979 }
980
981 sub _collapse_result {
982   my ($self, $as_proto, $row) = @_;
983
984   my @copy = @$row;
985
986   # 'foo'         => [ undef, 'foo' ]
987   # 'foo.bar'     => [ 'foo', 'bar' ]
988   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
989
990   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
991
992   my %collapse = %{$self->{_attrs}{collapse}||{}};
993
994   my @pri_index;
995
996   # if we're doing collapsing (has_many prefetch) we need to grab records
997   # until the PK changes, so fill @pri_index. if not, we leave it empty so
998   # we know we don't have to bother.
999
1000   # the reason for not using the collapse stuff directly is because if you
1001   # had for e.g. two artists in a row with no cds, the collapse info for
1002   # both would be NULL (undef) so you'd lose the second artist
1003
1004   # store just the index so we can check the array positions from the row
1005   # without having to contruct the full hash
1006
1007   if (keys %collapse) {
1008     my %pri = map { ($_ => 1) } $self->result_source->_pri_cols;
1009     foreach my $i (0 .. $#construct_as) {
1010       next if defined($construct_as[$i][0]); # only self table
1011       if (delete $pri{$construct_as[$i][1]}) {
1012         push(@pri_index, $i);
1013       }
1014       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
1015     }
1016   }
1017
1018   # no need to do an if, it'll be empty if @pri_index is empty anyway
1019
1020   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
1021
1022   my @const_rows;
1023
1024   do { # no need to check anything at the front, we always want the first row
1025
1026     my %const;
1027
1028     foreach my $this_as (@construct_as) {
1029       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
1030     }
1031
1032     push(@const_rows, \%const);
1033
1034   } until ( # no pri_index => no collapse => drop straight out
1035       !@pri_index
1036     or
1037       do { # get another row, stash it, drop out if different PK
1038
1039         @copy = $self->cursor->next;
1040         $self->{stashed_row} = \@copy;
1041
1042         # last thing in do block, counts as true if anything doesn't match
1043
1044         # check xor defined first for NULL vs. NOT NULL then if one is
1045         # defined the other must be so check string equality
1046
1047         grep {
1048           (defined $pri_vals{$_} ^ defined $copy[$_])
1049           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1050         } @pri_index;
1051       }
1052   );
1053
1054   my $alias = $self->{attrs}{alias};
1055   my $info = [];
1056
1057   my %collapse_pos;
1058
1059   my @const_keys;
1060
1061   foreach my $const (@const_rows) {
1062     scalar @const_keys or do {
1063       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1064     };
1065     foreach my $key (@const_keys) {
1066       if (length $key) {
1067         my $target = $info;
1068         my @parts = split(/\./, $key);
1069         my $cur = '';
1070         my $data = $const->{$key};
1071         foreach my $p (@parts) {
1072           $target = $target->[1]->{$p} ||= [];
1073           $cur .= ".${p}";
1074           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) {
1075             # collapsing at this point and on final part
1076             my $pos = $collapse_pos{$cur};
1077             CK: foreach my $ck (@ckey) {
1078               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1079                 $collapse_pos{$cur} = $data;
1080                 delete @collapse_pos{ # clear all positioning for sub-entries
1081                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1082                 };
1083                 push(@$target, []);
1084                 last CK;
1085               }
1086             }
1087           }
1088           if (exists $collapse{$cur}) {
1089             $target = $target->[-1];
1090           }
1091         }
1092         $target->[0] = $data;
1093       } else {
1094         $info->[0] = $const->{$key};
1095       }
1096     }
1097   }
1098
1099   return $info;
1100 }
1101
1102 =head2 result_source
1103
1104 =over 4
1105
1106 =item Arguments: $result_source?
1107
1108 =item Return Value: $result_source
1109
1110 =back
1111
1112 An accessor for the primary ResultSource object from which this ResultSet
1113 is derived.
1114
1115 =head2 result_class
1116
1117 =over 4
1118
1119 =item Arguments: $result_class?
1120
1121 =item Return Value: $result_class
1122
1123 =back
1124
1125 An accessor for the class to use when creating row objects. Defaults to
1126 C<< result_source->result_class >> - which in most cases is the name of the
1127 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1128
1129 Note that changing the result_class will also remove any components
1130 that were originally loaded in the source class via
1131 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1132 in the original source class will not run.
1133
1134 =cut
1135
1136 sub result_class {
1137   my ($self, $result_class) = @_;
1138   if ($result_class) {
1139     unless (ref $result_class) { # don't fire this for an object
1140       $self->ensure_class_loaded($result_class);
1141     }
1142     $self->_result_class($result_class);
1143     # THIS LINE WOULD BE A BUG - this accessor specifically exists to
1144     # permit the user to set result class on one result set only; it only
1145     # chains if provided to search()
1146     #$self->{attrs}{result_class} = $result_class if ref $self;
1147   }
1148   $self->_result_class;
1149 }
1150
1151 =head2 count
1152
1153 =over 4
1154
1155 =item Arguments: $cond, \%attrs??
1156
1157 =item Return Value: $count
1158
1159 =back
1160
1161 Performs an SQL C<COUNT> with the same query as the resultset was built
1162 with to find the number of elements. Passing arguments is equivalent to
1163 C<< $rs->search ($cond, \%attrs)->count >>
1164
1165 =cut
1166
1167 sub count {
1168   my $self = shift;
1169   return $self->search(@_)->count if @_ and defined $_[0];
1170   return scalar @{ $self->get_cache } if $self->get_cache;
1171
1172   my $attrs = $self->_resolved_attrs_copy;
1173
1174   # this is a little optimization - it is faster to do the limit
1175   # adjustments in software, instead of a subquery
1176   my $rows = delete $attrs->{rows};
1177   my $offset = delete $attrs->{offset};
1178
1179   my $crs;
1180   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1181     $crs = $self->_count_subq_rs ($attrs);
1182   }
1183   else {
1184     $crs = $self->_count_rs ($attrs);
1185   }
1186   my $count = $crs->next;
1187
1188   $count -= $offset if $offset;
1189   $count = $rows if $rows and $rows < $count;
1190   $count = 0 if ($count < 0);
1191
1192   return $count;
1193 }
1194
1195 =head2 count_rs
1196
1197 =over 4
1198
1199 =item Arguments: $cond, \%attrs??
1200
1201 =item Return Value: $count_rs
1202
1203 =back
1204
1205 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1206 This can be very handy for subqueries:
1207
1208   ->search( { amount => $some_rs->count_rs->as_query } )
1209
1210 As with regular resultsets the SQL query will be executed only after
1211 the resultset is accessed via L</next> or L</all>. That would return
1212 the same single value obtainable via L</count>.
1213
1214 =cut
1215
1216 sub count_rs {
1217   my $self = shift;
1218   return $self->search(@_)->count_rs if @_;
1219
1220   # this may look like a lack of abstraction (count() does about the same)
1221   # but in fact an _rs *must* use a subquery for the limits, as the
1222   # software based limiting can not be ported if this $rs is to be used
1223   # in a subquery itself (i.e. ->as_query)
1224   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1225     return $self->_count_subq_rs;
1226   }
1227   else {
1228     return $self->_count_rs;
1229   }
1230 }
1231
1232 #
1233 # returns a ResultSetColumn object tied to the count query
1234 #
1235 sub _count_rs {
1236   my ($self, $attrs) = @_;
1237
1238   my $rsrc = $self->result_source;
1239   $attrs ||= $self->_resolved_attrs;
1240
1241   # only take pieces we need for a simple count
1242   my $tmp_attrs = { map
1243     { $_ => $attrs->{$_} }
1244     qw/ alias from where bind join /
1245   };
1246
1247   # overwrite the selector (supplied by the storage)
1248   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $tmp_attrs);
1249   $tmp_attrs->{as} = 'count';
1250
1251   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1252
1253   return $tmp_rs;
1254 }
1255
1256 #
1257 # same as above but uses a subquery
1258 #
1259 sub _count_subq_rs {
1260   my ($self, $attrs) = @_;
1261
1262   my $rsrc = $self->result_source;
1263   $attrs ||= $self->_resolved_attrs;
1264
1265   my $sub_attrs = { map
1266     { $_ => $attrs->{$_} }
1267     qw/ alias from where bind join group_by having rows offset /
1268   };
1269
1270   # if we multi-prefetch we group_by primary keys only as this is what we would
1271   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
1272   if ( keys %{$attrs->{collapse}}  ) {
1273     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } ($rsrc->_pri_cols) ]
1274   }
1275
1276   # Calculate subquery selector
1277   if (my $g = $sub_attrs->{group_by}) {
1278
1279     # necessary as the group_by may refer to aliased functions
1280     my $sel_index;
1281     for my $sel (@{$attrs->{select}}) {
1282       $sel_index->{$sel->{-as}} = $sel
1283         if (ref $sel eq 'HASH' and $sel->{-as});
1284     }
1285
1286     for my $g_part (@$g) {
1287       push @{$sub_attrs->{select}}, $sel_index->{$g_part} || $g_part;
1288     }
1289   }
1290   else {
1291     my @pcols = map { "$attrs->{alias}.$_" } ($rsrc->primary_columns);
1292     $sub_attrs->{select} = @pcols ? \@pcols : [ 1 ];
1293   }
1294
1295   return $rsrc->resultset_class
1296                ->new ($rsrc, $sub_attrs)
1297                 ->as_subselect_rs
1298                  ->search ({}, { columns => { count => $rsrc->storage->_count_select ($rsrc, $attrs) } })
1299                   -> get_column ('count');
1300 }
1301
1302 sub _bool {
1303   return 1;
1304 }
1305
1306 =head2 count_literal
1307
1308 =over 4
1309
1310 =item Arguments: $sql_fragment, @bind_values
1311
1312 =item Return Value: $count
1313
1314 =back
1315
1316 Counts the results in a literal query. Equivalent to calling L</search_literal>
1317 with the passed arguments, then L</count>.
1318
1319 =cut
1320
1321 sub count_literal { shift->search_literal(@_)->count; }
1322
1323 =head2 all
1324
1325 =over 4
1326
1327 =item Arguments: none
1328
1329 =item Return Value: @objects
1330
1331 =back
1332
1333 Returns all elements in the resultset. Called implicitly if the resultset
1334 is returned in list context.
1335
1336 =cut
1337
1338 sub all {
1339   my $self = shift;
1340   if(@_) {
1341       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1342   }
1343
1344   return @{ $self->get_cache } if $self->get_cache;
1345
1346   my @obj;
1347
1348   if (keys %{$self->_resolved_attrs->{collapse}}) {
1349     # Using $self->cursor->all is really just an optimisation.
1350     # If we're collapsing has_many prefetches it probably makes
1351     # very little difference, and this is cleaner than hacking
1352     # _construct_object to survive the approach
1353     $self->cursor->reset;
1354     my @row = $self->cursor->next;
1355     while (@row) {
1356       push(@obj, $self->_construct_object(@row));
1357       @row = (exists $self->{stashed_row}
1358                ? @{delete $self->{stashed_row}}
1359                : $self->cursor->next);
1360     }
1361   } else {
1362     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1363   }
1364
1365   $self->set_cache(\@obj) if $self->{attrs}{cache};
1366
1367   return @obj;
1368 }
1369
1370 =head2 reset
1371
1372 =over 4
1373
1374 =item Arguments: none
1375
1376 =item Return Value: $self
1377
1378 =back
1379
1380 Resets the resultset's cursor, so you can iterate through the elements again.
1381 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1382 another query.
1383
1384 =cut
1385
1386 sub reset {
1387   my ($self) = @_;
1388   delete $self->{_attrs} if exists $self->{_attrs};
1389   $self->{all_cache_position} = 0;
1390   $self->cursor->reset;
1391   return $self;
1392 }
1393
1394 =head2 first
1395
1396 =over 4
1397
1398 =item Arguments: none
1399
1400 =item Return Value: $object?
1401
1402 =back
1403
1404 Resets the resultset and returns an object for the first result (if the
1405 resultset returns anything).
1406
1407 =cut
1408
1409 sub first {
1410   return $_[0]->reset->next;
1411 }
1412
1413
1414 # _rs_update_delete
1415 #
1416 # Determines whether and what type of subquery is required for the $rs operation.
1417 # If grouping is necessary either supplies its own, or verifies the current one
1418 # After all is done delegates to the proper storage method.
1419
1420 sub _rs_update_delete {
1421   my ($self, $op, $values) = @_;
1422
1423   my $rsrc = $self->result_source;
1424
1425   # if a condition exists we need to strip all table qualifiers
1426   # if this is not possible we'll force a subquery below
1427   my $cond = $rsrc->schema->storage->_strip_cond_qualifiers ($self->{cond});
1428
1429   my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
1430   my $needs_subq = $needs_group_by_subq || (not defined $cond) || $self->_has_resolved_attr(qw/rows offset/);
1431
1432   if ($needs_group_by_subq or $needs_subq) {
1433
1434     # make a new $rs selecting only the PKs (that's all we really need)
1435     my $attrs = $self->_resolved_attrs_copy;
1436
1437     delete $attrs->{$_} for qw/collapse select as/;
1438     $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->_pri_cols) ];
1439
1440     if ($needs_group_by_subq) {
1441       # make sure no group_by was supplied, or if there is one - make sure it matches
1442       # the columns compiled above perfectly. Anything else can not be sanely executed
1443       # on most databases so croak right then and there
1444
1445       if (my $g = $attrs->{group_by}) {
1446         my @current_group_by = map
1447           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1448           @$g
1449         ;
1450
1451         if (
1452           join ("\x00", sort @current_group_by)
1453             ne
1454           join ("\x00", sort @{$attrs->{columns}} )
1455         ) {
1456           $self->throw_exception (
1457             "You have just attempted a $op operation on a resultset which does group_by"
1458             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1459             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1460             . ' kind of queries. Please retry the operation with a modified group_by or'
1461             . ' without using one at all.'
1462           );
1463         }
1464       }
1465       else {
1466         $attrs->{group_by} = $attrs->{columns};
1467       }
1468     }
1469
1470     my $subrs = (ref $self)->new($rsrc, $attrs);
1471
1472     return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
1473   }
1474   else {
1475     return $rsrc->storage->$op(
1476       $rsrc,
1477       $op eq 'update' ? $values : (),
1478       $cond,
1479     );
1480   }
1481 }
1482
1483 =head2 update
1484
1485 =over 4
1486
1487 =item Arguments: \%values
1488
1489 =item Return Value: $storage_rv
1490
1491 =back
1492
1493 Sets the specified columns in the resultset to the supplied values in a
1494 single query. Return value will be true if the update succeeded or false
1495 if no records were updated; exact type of success value is storage-dependent.
1496
1497 =cut
1498
1499 sub update {
1500   my ($self, $values) = @_;
1501   $self->throw_exception('Values for update must be a hash')
1502     unless ref $values eq 'HASH';
1503
1504   return $self->_rs_update_delete ('update', $values);
1505 }
1506
1507 =head2 update_all
1508
1509 =over 4
1510
1511 =item Arguments: \%values
1512
1513 =item Return Value: 1
1514
1515 =back
1516
1517 Fetches all objects and updates them one at a time. Note that C<update_all>
1518 will run DBIC cascade triggers, while L</update> will not.
1519
1520 =cut
1521
1522 sub update_all {
1523   my ($self, $values) = @_;
1524   $self->throw_exception('Values for update_all must be a hash')
1525     unless ref $values eq 'HASH';
1526
1527   my $guard = $self->result_source->schema->txn_scope_guard;
1528   $_->update($values) for $self->all;
1529   $guard->commit;
1530   return 1;
1531 }
1532
1533 =head2 delete
1534
1535 =over 4
1536
1537 =item Arguments: none
1538
1539 =item Return Value: $storage_rv
1540
1541 =back
1542
1543 Deletes the contents of the resultset from its result source. Note that this
1544 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1545 to run. See also L<DBIx::Class::Row/delete>.
1546
1547 Return value will be the number of rows deleted; exact type of return value
1548 is storage-dependent.
1549
1550 =cut
1551
1552 sub delete {
1553   my $self = shift;
1554   $self->throw_exception('delete does not accept any arguments')
1555     if @_;
1556
1557   return $self->_rs_update_delete ('delete');
1558 }
1559
1560 =head2 delete_all
1561
1562 =over 4
1563
1564 =item Arguments: none
1565
1566 =item Return Value: 1
1567
1568 =back
1569
1570 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1571 will run DBIC cascade triggers, while L</delete> will not.
1572
1573 =cut
1574
1575 sub delete_all {
1576   my $self = shift;
1577   $self->throw_exception('delete_all does not accept any arguments')
1578     if @_;
1579
1580   my $guard = $self->result_source->schema->txn_scope_guard;
1581   $_->delete for $self->all;
1582   $guard->commit;
1583   return 1;
1584 }
1585
1586 =head2 populate
1587
1588 =over 4
1589
1590 =item Arguments: \@data;
1591
1592 =back
1593
1594 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1595 For the arrayref of hashrefs style each hashref should be a structure suitable
1596 forsubmitting to a $resultset->create(...) method.
1597
1598 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1599 to insert the data, as this is a faster method.
1600
1601 Otherwise, each set of data is inserted into the database using
1602 L<DBIx::Class::ResultSet/create>, and the resulting objects are
1603 accumulated into an array. The array itself, or an array reference
1604 is returned depending on scalar or list context.
1605
1606 Example:  Assuming an Artist Class that has many CDs Classes relating:
1607
1608   my $Artist_rs = $schema->resultset("Artist");
1609
1610   ## Void Context Example
1611   $Artist_rs->populate([
1612      { artistid => 4, name => 'Manufactured Crap', cds => [
1613         { title => 'My First CD', year => 2006 },
1614         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1615       ],
1616      },
1617      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1618         { title => 'My parents sold me to a record company', year => 2005 },
1619         { title => 'Why Am I So Ugly?', year => 2006 },
1620         { title => 'I Got Surgery and am now Popular', year => 2007 }
1621       ],
1622      },
1623   ]);
1624
1625   ## Array Context Example
1626   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1627     { name => "Artist One"},
1628     { name => "Artist Two"},
1629     { name => "Artist Three", cds=> [
1630     { title => "First CD", year => 2007},
1631     { title => "Second CD", year => 2008},
1632   ]}
1633   ]);
1634
1635   print $ArtistOne->name; ## response is 'Artist One'
1636   print $ArtistThree->cds->count ## reponse is '2'
1637
1638 For the arrayref of arrayrefs style,  the first element should be a list of the
1639 fieldsnames to which the remaining elements are rows being inserted.  For
1640 example:
1641
1642   $Arstist_rs->populate([
1643     [qw/artistid name/],
1644     [100, 'A Formally Unknown Singer'],
1645     [101, 'A singer that jumped the shark two albums ago'],
1646     [102, 'An actually cool singer'],
1647   ]);
1648
1649 Please note an important effect on your data when choosing between void and
1650 wantarray context. Since void context goes straight to C<insert_bulk> in
1651 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1652 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
1653 create primary keys for you, you will find that your PKs are empty.  In this
1654 case you will have to use the wantarray context in order to create those
1655 values.
1656
1657 =cut
1658
1659 sub populate {
1660   my $self = shift;
1661
1662   # cruft placed in standalone method
1663   my $data = $self->_normalize_populate_args(@_);
1664
1665   if(defined wantarray) {
1666     my @created;
1667     foreach my $item (@$data) {
1668       push(@created, $self->create($item));
1669     }
1670     return wantarray ? @created : \@created;
1671   } else {
1672     my $first = $data->[0];
1673
1674     # if a column is a registered relationship, and is a non-blessed hash/array, consider
1675     # it relationship data
1676     my (@rels, @columns);
1677     for (keys %$first) {
1678       my $ref = ref $first->{$_};
1679       $self->result_source->has_relationship($_) && ($ref eq 'ARRAY' or $ref eq 'HASH')
1680         ? push @rels, $_
1681         : push @columns, $_
1682       ;
1683     }
1684
1685     my @pks = $self->result_source->primary_columns;
1686
1687     ## do the belongs_to relationships
1688     foreach my $index (0..$#$data) {
1689
1690       # delegate to create() for any dataset without primary keys with specified relationships
1691       if (grep { !defined $data->[$index]->{$_} } @pks ) {
1692         for my $r (@rels) {
1693           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
1694             my @ret = $self->populate($data);
1695             return;
1696           }
1697         }
1698       }
1699
1700       foreach my $rel (@rels) {
1701         next unless ref $data->[$index]->{$rel} eq "HASH";
1702         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1703         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1704         my $related = $result->result_source->_resolve_condition(
1705           $result->result_source->relationship_info($reverse)->{cond},
1706           $self,
1707           $result,
1708         );
1709
1710         delete $data->[$index]->{$rel};
1711         $data->[$index] = {%{$data->[$index]}, %$related};
1712
1713         push @columns, keys %$related if $index == 0;
1714       }
1715     }
1716
1717     ## inherit the data locked in the conditions of the resultset
1718     my ($rs_data) = $self->_merge_cond_with_data({});
1719     delete @{$rs_data}{@columns};
1720     my @inherit_cols = keys %$rs_data;
1721     my @inherit_data = values %$rs_data;
1722
1723     ## do bulk insert on current row
1724     $self->result_source->storage->insert_bulk(
1725       $self->result_source,
1726       [@columns, @inherit_cols],
1727       [ map { [ @$_{@columns}, @inherit_data ] } @$data ],
1728     );
1729
1730     ## do the has_many relationships
1731     foreach my $item (@$data) {
1732
1733       foreach my $rel (@rels) {
1734         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1735
1736         my $parent = $self->find({map { $_ => $item->{$_} } @pks})
1737      || $self->throw_exception('Cannot find the relating object.');
1738
1739         my $child = $parent->$rel;
1740
1741         my $related = $child->result_source->_resolve_condition(
1742           $parent->result_source->relationship_info($rel)->{cond},
1743           $child,
1744           $parent,
1745         );
1746
1747         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1748         my @populate = map { {%$_, %$related} } @rows_to_add;
1749
1750         $child->populate( \@populate );
1751       }
1752     }
1753   }
1754 }
1755
1756
1757 # populate() argumnets went over several incarnations
1758 # What we ultimately support is AoH
1759 sub _normalize_populate_args {
1760   my ($self, $arg) = @_;
1761
1762   if (ref $arg eq 'ARRAY') {
1763     if (ref $arg->[0] eq 'HASH') {
1764       return $arg;
1765     }
1766     elsif (ref $arg->[0] eq 'ARRAY') {
1767       my @ret;
1768       my @colnames = @{$arg->[0]};
1769       foreach my $values (@{$arg}[1 .. $#$arg]) {
1770         push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
1771       }
1772       return \@ret;
1773     }
1774   }
1775
1776   $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
1777 }
1778
1779 =head2 pager
1780
1781 =over 4
1782
1783 =item Arguments: none
1784
1785 =item Return Value: $pager
1786
1787 =back
1788
1789 Return Value a L<Data::Page> object for the current resultset. Only makes
1790 sense for queries with a C<page> attribute.
1791
1792 To get the full count of entries for a paged resultset, call
1793 C<total_entries> on the L<Data::Page> object.
1794
1795 =cut
1796
1797 sub pager {
1798   my ($self) = @_;
1799
1800   return $self->{pager} if $self->{pager};
1801
1802   my $attrs = $self->{attrs};
1803   $self->throw_exception("Can't create pager for non-paged rs")
1804     unless $self->{attrs}{page};
1805   $attrs->{rows} ||= 10;
1806
1807   # throw away the paging flags and re-run the count (possibly
1808   # with a subselect) to get the real total count
1809   my $count_attrs = { %$attrs };
1810   delete $count_attrs->{$_} for qw/rows offset page pager/;
1811   my $total_count = (ref $self)->new($self->result_source, $count_attrs)->count;
1812
1813   return $self->{pager} = Data::Page->new(
1814     $total_count,
1815     $attrs->{rows},
1816     $self->{attrs}{page}
1817   );
1818 }
1819
1820 =head2 page
1821
1822 =over 4
1823
1824 =item Arguments: $page_number
1825
1826 =item Return Value: $rs
1827
1828 =back
1829
1830 Returns a resultset for the $page_number page of the resultset on which page
1831 is called, where each page contains a number of rows equal to the 'rows'
1832 attribute set on the resultset (10 by default).
1833
1834 =cut
1835
1836 sub page {
1837   my ($self, $page) = @_;
1838   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1839 }
1840
1841 =head2 new_result
1842
1843 =over 4
1844
1845 =item Arguments: \%vals
1846
1847 =item Return Value: $rowobject
1848
1849 =back
1850
1851 Creates a new row object in the resultset's result class and returns
1852 it. The row is not inserted into the database at this point, call
1853 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
1854 will tell you whether the row object has been inserted or not.
1855
1856 Passes the hashref of input on to L<DBIx::Class::Row/new>.
1857
1858 =cut
1859
1860 sub new_result {
1861   my ($self, $values) = @_;
1862   $self->throw_exception( "new_result needs a hash" )
1863     unless (ref $values eq 'HASH');
1864
1865   my ($merged_cond, $cols_from_relations) = $self->_merge_cond_with_data($values);
1866
1867   my %new = (
1868     %$merged_cond,
1869     @$cols_from_relations
1870       ? (-cols_from_relations => $cols_from_relations)
1871       : (),
1872     -source_handle => $self->_source_handle,
1873     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1874   );
1875
1876   return $self->result_class->new(\%new);
1877 }
1878
1879 # _merge_cond_with_data
1880 #
1881 # Takes a simple hash of K/V data and returns its copy merged with the
1882 # condition already present on the resultset. Additionally returns an
1883 # arrayref of value/condition names, which were inferred from related
1884 # objects (this is needed for in-memory related objects)
1885 sub _merge_cond_with_data {
1886   my ($self, $data) = @_;
1887
1888   my (%new_data, @cols_from_relations);
1889
1890   my $alias = $self->{attrs}{alias};
1891
1892   if (! defined $self->{cond}) {
1893     # just massage $data below
1894   }
1895   elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
1896     %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
1897     @cols_from_relations = keys %new_data;
1898   }
1899   elsif (ref $self->{cond} ne 'HASH') {
1900     $self->throw_exception(
1901       "Can't abstract implicit construct, resultset condition not a hash"
1902     );
1903   }
1904   else {
1905     # precendence must be given to passed values over values inherited from
1906     # the cond, so the order here is important.
1907     my $collapsed_cond = $self->_collapse_cond($self->{cond});
1908     my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
1909
1910     while ( my($col, $value) = each %implied ) {
1911       if (ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '=') {
1912         $new_data{$col} = $value->{'='};
1913         next;
1914       }
1915       $new_data{$col} = $value if $self->_is_deterministic_value($value);
1916     }
1917   }
1918
1919   %new_data = (
1920     %new_data,
1921     %{ $self->_remove_alias($data, $alias) },
1922   );
1923
1924   return (\%new_data, \@cols_from_relations);
1925 }
1926
1927 # _is_deterministic_value
1928 #
1929 # Make an effor to strip non-deterministic values from the condition,
1930 # to make sure new_result chokes less
1931
1932 sub _is_deterministic_value {
1933   my $self = shift;
1934   my $value = shift;
1935   my $ref_type = ref $value;
1936   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
1937   return 1 if Scalar::Util::blessed($value);
1938   return 0;
1939 }
1940
1941 # _has_resolved_attr
1942 #
1943 # determines if the resultset defines at least one
1944 # of the attributes supplied
1945 #
1946 # used to determine if a subquery is neccessary
1947 #
1948 # supports some virtual attributes:
1949 #   -join
1950 #     This will scan for any joins being present on the resultset.
1951 #     It is not a mere key-search but a deep inspection of {from}
1952 #
1953
1954 sub _has_resolved_attr {
1955   my ($self, @attr_names) = @_;
1956
1957   my $attrs = $self->_resolved_attrs;
1958
1959   my %extra_checks;
1960
1961   for my $n (@attr_names) {
1962     if (grep { $n eq $_ } (qw/-join/) ) {
1963       $extra_checks{$n}++;
1964       next;
1965     }
1966
1967     my $attr =  $attrs->{$n};
1968
1969     next if not defined $attr;
1970
1971     if (ref $attr eq 'HASH') {
1972       return 1 if keys %$attr;
1973     }
1974     elsif (ref $attr eq 'ARRAY') {
1975       return 1 if @$attr;
1976     }
1977     else {
1978       return 1 if $attr;
1979     }
1980   }
1981
1982   # a resolved join is expressed as a multi-level from
1983   return 1 if (
1984     $extra_checks{-join}
1985       and
1986     ref $attrs->{from} eq 'ARRAY'
1987       and
1988     @{$attrs->{from}} > 1
1989   );
1990
1991   return 0;
1992 }
1993
1994 # _collapse_cond
1995 #
1996 # Recursively collapse the condition.
1997
1998 sub _collapse_cond {
1999   my ($self, $cond, $collapsed) = @_;
2000
2001   $collapsed ||= {};
2002
2003   if (ref $cond eq 'ARRAY') {
2004     foreach my $subcond (@$cond) {
2005       next unless ref $subcond;  # -or
2006       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2007     }
2008   }
2009   elsif (ref $cond eq 'HASH') {
2010     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2011       foreach my $subcond (@{$cond->{-and}}) {
2012         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2013       }
2014     }
2015     else {
2016       foreach my $col (keys %$cond) {
2017         my $value = $cond->{$col};
2018         $collapsed->{$col} = $value;
2019       }
2020     }
2021   }
2022
2023   return $collapsed;
2024 }
2025
2026 # _remove_alias
2027 #
2028 # Remove the specified alias from the specified query hash. A copy is made so
2029 # the original query is not modified.
2030
2031 sub _remove_alias {
2032   my ($self, $query, $alias) = @_;
2033
2034   my %orig = %{ $query || {} };
2035   my %unaliased;
2036
2037   foreach my $key (keys %orig) {
2038     if ($key !~ /\./) {
2039       $unaliased{$key} = $orig{$key};
2040       next;
2041     }
2042     $unaliased{$1} = $orig{$key}
2043       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2044   }
2045
2046   return \%unaliased;
2047 }
2048
2049 =head2 as_query
2050
2051 =over 4
2052
2053 =item Arguments: none
2054
2055 =item Return Value: \[ $sql, @bind ]
2056
2057 =back
2058
2059 Returns the SQL query and bind vars associated with the invocant.
2060
2061 This is generally used as the RHS for a subquery.
2062
2063 =cut
2064
2065 sub as_query {
2066   my $self = shift;
2067
2068   my $attrs = $self->_resolved_attrs_copy;
2069
2070   # For future use:
2071   #
2072   # in list ctx:
2073   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2074   # $sql also has no wrapping parenthesis in list ctx
2075   #
2076   my $sqlbind = $self->result_source->storage
2077     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2078
2079   return $sqlbind;
2080 }
2081
2082 =head2 find_or_new
2083
2084 =over 4
2085
2086 =item Arguments: \%vals, \%attrs?
2087
2088 =item Return Value: $rowobject
2089
2090 =back
2091
2092   my $artist = $schema->resultset('Artist')->find_or_new(
2093     { artist => 'fred' }, { key => 'artists' });
2094
2095   $cd->cd_to_producer->find_or_new({ producer => $producer },
2096                                    { key => 'primary });
2097
2098 Find an existing record from this resultset, based on its primary
2099 key, or a unique constraint. If none exists, instantiate a new result
2100 object and return it. The object will not be saved into your storage
2101 until you call L<DBIx::Class::Row/insert> on it.
2102
2103 You most likely want this method when looking for existing rows using
2104 a unique constraint that is not the primary key, or looking for
2105 related rows.
2106
2107 If you want objects to be saved immediately, use L</find_or_create>
2108 instead.
2109
2110 B<Note>: Take care when using C<find_or_new> with a table having
2111 columns with default values that you intend to be automatically
2112 supplied by the database (e.g. an auto_increment primary key column).
2113 In normal usage, the value of such columns should NOT be included at
2114 all in the call to C<find_or_new>, even when set to C<undef>.
2115
2116 =cut
2117
2118 sub find_or_new {
2119   my $self     = shift;
2120   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2121   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2122   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2123     return $row;
2124   }
2125   return $self->new_result($hash);
2126 }
2127
2128 =head2 create
2129
2130 =over 4
2131
2132 =item Arguments: \%vals
2133
2134 =item Return Value: a L<DBIx::Class::Row> $object
2135
2136 =back
2137
2138 Attempt to create a single new row or a row with multiple related rows
2139 in the table represented by the resultset (and related tables). This
2140 will not check for duplicate rows before inserting, use
2141 L</find_or_create> to do that.
2142
2143 To create one row for this resultset, pass a hashref of key/value
2144 pairs representing the columns of the table and the values you wish to
2145 store. If the appropriate relationships are set up, foreign key fields
2146 can also be passed an object representing the foreign row, and the
2147 value will be set to its primary key.
2148
2149 To create related objects, pass a hashref of related-object column values
2150 B<keyed on the relationship name>. If the relationship is of type C<multi>
2151 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2152 The process will correctly identify columns holding foreign keys, and will
2153 transparently populate them from the keys of the corresponding relation.
2154 This can be applied recursively, and will work correctly for a structure
2155 with an arbitrary depth and width, as long as the relationships actually
2156 exists and the correct column data has been supplied.
2157
2158
2159 Instead of hashrefs of plain related data (key/value pairs), you may
2160 also pass new or inserted objects. New objects (not inserted yet, see
2161 L</new>), will be inserted into their appropriate tables.
2162
2163 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2164
2165 Example of creating a new row.
2166
2167   $person_rs->create({
2168     name=>"Some Person",
2169     email=>"somebody@someplace.com"
2170   });
2171
2172 Example of creating a new row and also creating rows in a related C<has_many>
2173 or C<has_one> resultset.  Note Arrayref.
2174
2175   $artist_rs->create(
2176      { artistid => 4, name => 'Manufactured Crap', cds => [
2177         { title => 'My First CD', year => 2006 },
2178         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2179       ],
2180      },
2181   );
2182
2183 Example of creating a new row and also creating a row in a related
2184 C<belongs_to>resultset. Note Hashref.
2185
2186   $cd_rs->create({
2187     title=>"Music for Silly Walks",
2188     year=>2000,
2189     artist => {
2190       name=>"Silly Musician",
2191     }
2192   });
2193
2194 =over
2195
2196 =item WARNING
2197
2198 When subclassing ResultSet never attempt to override this method. Since
2199 it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
2200 lot of the internals simply never call it, so your override will be
2201 bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
2202 or L<insert|DBIx::Class::Row/insert> depending on how early in the
2203 L</create> process you need to intervene.
2204
2205 =back
2206
2207 =cut
2208
2209 sub create {
2210   my ($self, $attrs) = @_;
2211   $self->throw_exception( "create needs a hashref" )
2212     unless ref $attrs eq 'HASH';
2213   return $self->new_result($attrs)->insert;
2214 }
2215
2216 =head2 find_or_create
2217
2218 =over 4
2219
2220 =item Arguments: \%vals, \%attrs?
2221
2222 =item Return Value: $rowobject
2223
2224 =back
2225
2226   $cd->cd_to_producer->find_or_create({ producer => $producer },
2227                                       { key => 'primary' });
2228
2229 Tries to find a record based on its primary key or unique constraints; if none
2230 is found, creates one and returns that instead.
2231
2232   my $cd = $schema->resultset('CD')->find_or_create({
2233     cdid   => 5,
2234     artist => 'Massive Attack',
2235     title  => 'Mezzanine',
2236     year   => 2005,
2237   });
2238
2239 Also takes an optional C<key> attribute, to search by a specific key or unique
2240 constraint. For example:
2241
2242   my $cd = $schema->resultset('CD')->find_or_create(
2243     {
2244       artist => 'Massive Attack',
2245       title  => 'Mezzanine',
2246     },
2247     { key => 'cd_artist_title' }
2248   );
2249
2250 B<Note>: Because find_or_create() reads from the database and then
2251 possibly inserts based on the result, this method is subject to a race
2252 condition. Another process could create a record in the table after
2253 the find has completed and before the create has started. To avoid
2254 this problem, use find_or_create() inside a transaction.
2255
2256 B<Note>: Take care when using C<find_or_create> with a table having
2257 columns with default values that you intend to be automatically
2258 supplied by the database (e.g. an auto_increment primary key column).
2259 In normal usage, the value of such columns should NOT be included at
2260 all in the call to C<find_or_create>, even when set to C<undef>.
2261
2262 See also L</find> and L</update_or_create>. For information on how to declare
2263 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2264
2265 =cut
2266
2267 sub find_or_create {
2268   my $self     = shift;
2269   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2270   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2271   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2272     return $row;
2273   }
2274   return $self->create($hash);
2275 }
2276
2277 =head2 update_or_create
2278
2279 =over 4
2280
2281 =item Arguments: \%col_values, { key => $unique_constraint }?
2282
2283 =item Return Value: $rowobject
2284
2285 =back
2286
2287   $resultset->update_or_create({ col => $val, ... });
2288
2289 First, searches for an existing row matching one of the unique constraints
2290 (including the primary key) on the source of this resultset. If a row is
2291 found, updates it with the other given column values. Otherwise, creates a new
2292 row.
2293
2294 Takes an optional C<key> attribute to search on a specific unique constraint.
2295 For example:
2296
2297   # In your application
2298   my $cd = $schema->resultset('CD')->update_or_create(
2299     {
2300       artist => 'Massive Attack',
2301       title  => 'Mezzanine',
2302       year   => 1998,
2303     },
2304     { key => 'cd_artist_title' }
2305   );
2306
2307   $cd->cd_to_producer->update_or_create({
2308     producer => $producer,
2309     name => 'harry',
2310   }, {
2311     key => 'primary',
2312   });
2313
2314
2315 If no C<key> is specified, it searches on all unique constraints defined on the
2316 source, including the primary key.
2317
2318 If the C<key> is specified as C<primary>, it searches only on the primary key.
2319
2320 See also L</find> and L</find_or_create>. For information on how to declare
2321 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2322
2323 B<Note>: Take care when using C<update_or_create> with a table having
2324 columns with default values that you intend to be automatically
2325 supplied by the database (e.g. an auto_increment primary key column).
2326 In normal usage, the value of such columns should NOT be included at
2327 all in the call to C<update_or_create>, even when set to C<undef>.
2328
2329 =cut
2330
2331 sub update_or_create {
2332   my $self = shift;
2333   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2334   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2335
2336   my $row = $self->find($cond, $attrs);
2337   if (defined $row) {
2338     $row->update($cond);
2339     return $row;
2340   }
2341
2342   return $self->create($cond);
2343 }
2344
2345 =head2 update_or_new
2346
2347 =over 4
2348
2349 =item Arguments: \%col_values, { key => $unique_constraint }?
2350
2351 =item Return Value: $rowobject
2352
2353 =back
2354
2355   $resultset->update_or_new({ col => $val, ... });
2356
2357 First, searches for an existing row matching one of the unique constraints
2358 (including the primary key) on the source of this resultset. If a row is
2359 found, updates it with the other given column values. Otherwise, instantiate
2360 a new result object and return it. The object will not be saved into your storage
2361 until you call L<DBIx::Class::Row/insert> on it.
2362
2363 Takes an optional C<key> attribute to search on a specific unique constraint.
2364 For example:
2365
2366   # In your application
2367   my $cd = $schema->resultset('CD')->update_or_new(
2368     {
2369       artist => 'Massive Attack',
2370       title  => 'Mezzanine',
2371       year   => 1998,
2372     },
2373     { key => 'cd_artist_title' }
2374   );
2375
2376   if ($cd->in_storage) {
2377       # the cd was updated
2378   }
2379   else {
2380       # the cd is not yet in the database, let's insert it
2381       $cd->insert;
2382   }
2383
2384 B<Note>: Take care when using C<update_or_new> with a table having
2385 columns with default values that you intend to be automatically
2386 supplied by the database (e.g. an auto_increment primary key column).
2387 In normal usage, the value of such columns should NOT be included at
2388 all in the call to C<update_or_new>, even when set to C<undef>.
2389
2390 See also L</find>, L</find_or_create> and L</find_or_new>.
2391
2392 =cut
2393
2394 sub update_or_new {
2395     my $self  = shift;
2396     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2397     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2398
2399     my $row = $self->find( $cond, $attrs );
2400     if ( defined $row ) {
2401         $row->update($cond);
2402         return $row;
2403     }
2404
2405     return $self->new_result($cond);
2406 }
2407
2408 =head2 get_cache
2409
2410 =over 4
2411
2412 =item Arguments: none
2413
2414 =item Return Value: \@cache_objects?
2415
2416 =back
2417
2418 Gets the contents of the cache for the resultset, if the cache is set.
2419
2420 The cache is populated either by using the L</prefetch> attribute to
2421 L</search> or by calling L</set_cache>.
2422
2423 =cut
2424
2425 sub get_cache {
2426   shift->{all_cache};
2427 }
2428
2429 =head2 set_cache
2430
2431 =over 4
2432
2433 =item Arguments: \@cache_objects
2434
2435 =item Return Value: \@cache_objects
2436
2437 =back
2438
2439 Sets the contents of the cache for the resultset. Expects an arrayref
2440 of objects of the same class as those produced by the resultset. Note that
2441 if the cache is set the resultset will return the cached objects rather
2442 than re-querying the database even if the cache attr is not set.
2443
2444 The contents of the cache can also be populated by using the
2445 L</prefetch> attribute to L</search>.
2446
2447 =cut
2448
2449 sub set_cache {
2450   my ( $self, $data ) = @_;
2451   $self->throw_exception("set_cache requires an arrayref")
2452       if defined($data) && (ref $data ne 'ARRAY');
2453   $self->{all_cache} = $data;
2454 }
2455
2456 =head2 clear_cache
2457
2458 =over 4
2459
2460 =item Arguments: none
2461
2462 =item Return Value: []
2463
2464 =back
2465
2466 Clears the cache for the resultset.
2467
2468 =cut
2469
2470 sub clear_cache {
2471   shift->set_cache(undef);
2472 }
2473
2474 =head2 is_paged
2475
2476 =over 4
2477
2478 =item Arguments: none
2479
2480 =item Return Value: true, if the resultset has been paginated
2481
2482 =back
2483
2484 =cut
2485
2486 sub is_paged {
2487   my ($self) = @_;
2488   return !!$self->{attrs}{page};
2489 }
2490
2491 =head2 is_ordered
2492
2493 =over 4
2494
2495 =item Arguments: none
2496
2497 =item Return Value: true, if the resultset has been ordered with C<order_by>.
2498
2499 =back
2500
2501 =cut
2502
2503 sub is_ordered {
2504   my ($self) = @_;
2505   return scalar $self->result_source->storage->_parse_order_by($self->{attrs}{order_by});
2506 }
2507
2508 =head2 related_resultset
2509
2510 =over 4
2511
2512 =item Arguments: $relationship_name
2513
2514 =item Return Value: $resultset
2515
2516 =back
2517
2518 Returns a related resultset for the supplied relationship name.
2519
2520   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2521
2522 =cut
2523
2524 sub related_resultset {
2525   my ($self, $rel) = @_;
2526
2527   $self->{related_resultsets} ||= {};
2528   return $self->{related_resultsets}{$rel} ||= do {
2529     my $rsrc = $self->result_source;
2530     my $rel_info = $rsrc->relationship_info($rel);
2531
2532     $self->throw_exception(
2533       "search_related: result source '" . $rsrc->source_name .
2534         "' has no such relationship $rel")
2535       unless $rel_info;
2536
2537     my $attrs = $self->_chain_relationship($rel);
2538
2539     my $join_count = $attrs->{seen_join}{$rel};
2540
2541     my $alias = $self->result_source->storage
2542         ->relname_to_table_alias($rel, $join_count);
2543
2544     # since this is search_related, and we already slid the select window inwards
2545     # (the select/as attrs were deleted in the beginning), we need to flip all
2546     # left joins to inner, so we get the expected results
2547     # read the comment on top of the actual function to see what this does
2548     $attrs->{from} = $rsrc->schema->storage->_straight_join_to_node ($attrs->{from}, $alias);
2549
2550
2551     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2552     delete @{$attrs}{qw(result_class alias)};
2553
2554     my $new_cache;
2555
2556     if (my $cache = $self->get_cache) {
2557       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2558         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2559                         @$cache ];
2560       }
2561     }
2562
2563     my $rel_source = $rsrc->related_source($rel);
2564
2565     my $new = do {
2566
2567       # The reason we do this now instead of passing the alias to the
2568       # search_rs below is that if you wrap/overload resultset on the
2569       # source you need to know what alias it's -going- to have for things
2570       # to work sanely (e.g. RestrictWithObject wants to be able to add
2571       # extra query restrictions, and these may need to be $alias.)
2572
2573       my $rel_attrs = $rel_source->resultset_attributes;
2574       local $rel_attrs->{alias} = $alias;
2575
2576       $rel_source->resultset
2577                  ->search_rs(
2578                      undef, {
2579                        %$attrs,
2580                        where => $attrs->{where},
2581                    });
2582     };
2583     $new->set_cache($new_cache) if $new_cache;
2584     $new;
2585   };
2586 }
2587
2588 =head2 current_source_alias
2589
2590 =over 4
2591
2592 =item Arguments: none
2593
2594 =item Return Value: $source_alias
2595
2596 =back
2597
2598 Returns the current table alias for the result source this resultset is built
2599 on, that will be used in the SQL query. Usually it is C<me>.
2600
2601 Currently the source alias that refers to the result set returned by a
2602 L</search>/L</find> family method depends on how you got to the resultset: it's
2603 C<me> by default, but eg. L</search_related> aliases it to the related result
2604 source name (and keeps C<me> referring to the original result set). The long
2605 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2606 (and make this method unnecessary).
2607
2608 Thus it's currently necessary to use this method in predefined queries (see
2609 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2610 source alias of the current result set:
2611
2612   # in a result set class
2613   sub modified_by {
2614     my ($self, $user) = @_;
2615
2616     my $me = $self->current_source_alias;
2617
2618     return $self->search(
2619       "$me.modified" => $user->id,
2620     );
2621   }
2622
2623 =cut
2624
2625 sub current_source_alias {
2626   my ($self) = @_;
2627
2628   return ($self->{attrs} || {})->{alias} || 'me';
2629 }
2630
2631 =head2 as_subselect_rs
2632
2633 =over 4
2634
2635 =item Arguments: none
2636
2637 =item Return Value: $resultset
2638
2639 =back
2640
2641 Act as a barrier to SQL symbols.  The resultset provided will be made into a
2642 "virtual view" by including it as a subquery within the from clause.  From this
2643 point on, any joined tables are inaccessible to ->search on the resultset (as if
2644 it were simply where-filtered without joins).  For example:
2645
2646  my $rs = $schema->resultset('Bar')->search({'x.name' => 'abc'},{ join => 'x' });
2647
2648  # 'x' now pollutes the query namespace
2649
2650  # So the following works as expected
2651  my $ok_rs = $rs->search({'x.other' => 1});
2652
2653  # But this doesn't: instead of finding a 'Bar' related to two x rows (abc and
2654  # def) we look for one row with contradictory terms and join in another table
2655  # (aliased 'x_2') which we never use
2656  my $broken_rs = $rs->search({'x.name' => 'def'});
2657
2658  my $rs2 = $rs->as_subselect_rs;
2659
2660  # doesn't work - 'x' is no longer accessible in $rs2, having been sealed away
2661  my $not_joined_rs = $rs2->search({'x.other' => 1});
2662
2663  # works as expected: finds a 'table' row related to two x rows (abc and def)
2664  my $correctly_joined_rs = $rs2->search({'x.name' => 'def'});
2665
2666 Another example of when one might use this would be to select a subset of
2667 columns in a group by clause:
2668
2669  my $rs = $schema->resultset('Bar')->search(undef, {
2670    group_by => [qw{ id foo_id baz_id }],
2671  })->as_subselect_rs->search(undef, {
2672    columns => [qw{ id foo_id }]
2673  });
2674
2675 In the above example normally columns would have to be equal to the group by,
2676 but because we isolated the group by into a subselect the above works.
2677
2678 =cut
2679
2680 sub as_subselect_rs {
2681   my $self = shift;
2682
2683   my $attrs = $self->_resolved_attrs;
2684
2685   my $fresh_rs = (ref $self)->new (
2686     $self->result_source
2687   );
2688
2689   # these pieces will be locked in the subquery
2690   delete $fresh_rs->{cond};
2691   delete @{$fresh_rs->{attrs}}{qw/where bind/};
2692
2693   return $fresh_rs->search( {}, {
2694     from => [{
2695       $attrs->{alias} => $self->as_query,
2696       -alias         => $attrs->{alias},
2697       -source_handle => $self->result_source->handle,
2698     }],
2699     alias => $attrs->{alias},
2700   });
2701 }
2702
2703 # This code is called by search_related, and makes sure there
2704 # is clear separation between the joins before, during, and
2705 # after the relationship. This information is needed later
2706 # in order to properly resolve prefetch aliases (any alias
2707 # with a relation_chain_depth less than the depth of the
2708 # current prefetch is not considered)
2709 #
2710 # The increments happen twice per join. An even number means a
2711 # relationship specified via a search_related, whereas an odd
2712 # number indicates a join/prefetch added via attributes
2713 #
2714 # Also this code will wrap the current resultset (the one we
2715 # chain to) in a subselect IFF it contains limiting attributes
2716 sub _chain_relationship {
2717   my ($self, $rel) = @_;
2718   my $source = $self->result_source;
2719   my $attrs = { %{$self->{attrs}||{}} };
2720
2721   # we need to take the prefetch the attrs into account before we
2722   # ->_resolve_join as otherwise they get lost - captainL
2723   my $join = $self->_merge_attr( $attrs->{join}, $attrs->{prefetch} );
2724
2725   delete @{$attrs}{qw/join prefetch collapse group_by distinct select as columns +select +as +columns/};
2726
2727   my $seen = { %{ (delete $attrs->{seen_join}) || {} } };
2728
2729   my $from;
2730   my @force_subq_attrs = qw/offset rows group_by having/;
2731
2732   if (
2733     ($attrs->{from} && ref $attrs->{from} ne 'ARRAY')
2734       ||
2735     $self->_has_resolved_attr (@force_subq_attrs)
2736   ) {
2737     # Nuke the prefetch (if any) before the new $rs attrs
2738     # are resolved (prefetch is useless - we are wrapping
2739     # a subquery anyway).
2740     my $rs_copy = $self->search;
2741     $rs_copy->{attrs}{join} = $self->_merge_attr (
2742       $rs_copy->{attrs}{join},
2743       delete $rs_copy->{attrs}{prefetch},
2744     );
2745
2746     $from = [{
2747       -source_handle => $source->handle,
2748       -alias => $attrs->{alias},
2749       $attrs->{alias} => $rs_copy->as_query,
2750     }];
2751     delete @{$attrs}{@force_subq_attrs, qw/where bind/};
2752     $seen->{-relation_chain_depth} = 0;
2753   }
2754   elsif ($attrs->{from}) {  #shallow copy suffices
2755     $from = [ @{$attrs->{from}} ];
2756   }
2757   else {
2758     $from = [{
2759       -source_handle => $source->handle,
2760       -alias => $attrs->{alias},
2761       $attrs->{alias} => $source->from,
2762     }];
2763   }
2764
2765   my $jpath = ($seen->{-relation_chain_depth})
2766     ? $from->[-1][0]{-join_path}
2767     : [];
2768
2769   my @requested_joins = $source->_resolve_join(
2770     $join,
2771     $attrs->{alias},
2772     $seen,
2773     $jpath,
2774   );
2775
2776   push @$from, @requested_joins;
2777
2778   $seen->{-relation_chain_depth}++;
2779
2780   # if $self already had a join/prefetch specified on it, the requested
2781   # $rel might very well be already included. What we do in this case
2782   # is effectively a no-op (except that we bump up the chain_depth on
2783   # the join in question so we could tell it *is* the search_related)
2784   my $already_joined;
2785
2786   # we consider the last one thus reverse
2787   for my $j (reverse @requested_joins) {
2788     my ($last_j) = keys %{$j->[0]{-join_path}[-1]};
2789     if ($rel eq $last_j) {
2790       $j->[0]{-relation_chain_depth}++;
2791       $already_joined++;
2792       last;
2793     }
2794   }
2795
2796   unless ($already_joined) {
2797     push @$from, $source->_resolve_join(
2798       $rel,
2799       $attrs->{alias},
2800       $seen,
2801       $jpath,
2802     );
2803   }
2804
2805   $seen->{-relation_chain_depth}++;
2806
2807   return {%$attrs, from => $from, seen_join => $seen};
2808 }
2809
2810 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
2811 sub _resolved_attrs_copy {
2812   my $self = shift;
2813   return { %{$self->_resolved_attrs (@_)} };
2814 }
2815
2816 sub _resolved_attrs {
2817   my $self = shift;
2818   return $self->{_attrs} if $self->{_attrs};
2819
2820   my $attrs  = { %{ $self->{attrs} || {} } };
2821   my $source = $self->result_source;
2822   my $alias  = $attrs->{alias};
2823
2824   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2825   my @colbits;
2826
2827   # build columns (as long as select isn't set) into a set of as/select hashes
2828   unless ( $attrs->{select} ) {
2829
2830     my @cols;
2831     if ( ref $attrs->{columns} eq 'ARRAY' ) {
2832       @cols = @{ delete $attrs->{columns}}
2833     } elsif ( defined $attrs->{columns} ) {
2834       @cols = delete $attrs->{columns}
2835     } else {
2836       @cols = $source->columns
2837     }
2838
2839     for (@cols) {
2840       if ( ref $_ eq 'HASH' ) {
2841         push @colbits, $_
2842       } else {
2843         my $key = /^\Q${alias}.\E(.+)$/
2844           ? "$1"
2845           : "$_";
2846         my $value = /\./
2847           ? "$_"
2848           : "${alias}.$_";
2849         push @colbits, { $key => $value };
2850       }
2851     }
2852   }
2853
2854   # add the additional columns on
2855   foreach (qw{include_columns +columns}) {
2856     if ( $attrs->{$_} ) {
2857       my @list = ( ref($attrs->{$_}) eq 'ARRAY' )
2858         ? @{ delete $attrs->{$_} }
2859         : delete $attrs->{$_};
2860       for (@list) {
2861         if ( ref($_) eq 'HASH' ) {
2862           push @colbits, $_
2863         } else {
2864           my $key = ( split /\./, $_ )[-1];
2865           my $value = ( /\./ ? $_ : "$alias.$_" );
2866           push @colbits, { $key => $value };
2867         }
2868       }
2869     }
2870   }
2871
2872   # start with initial select items
2873   if ( $attrs->{select} ) {
2874     $attrs->{select} =
2875         ( ref $attrs->{select} eq 'ARRAY' )
2876       ? [ @{ $attrs->{select} } ]
2877       : [ $attrs->{select} ];
2878
2879     if ( $attrs->{as} ) {
2880       $attrs->{as} =
2881         (
2882           ref $attrs->{as} eq 'ARRAY'
2883             ? [ @{ $attrs->{as} } ]
2884             : [ $attrs->{as} ]
2885         )
2886     } else {
2887       $attrs->{as} = [ map {
2888          m/^\Q${alias}.\E(.+)$/
2889            ? $1
2890            : $_
2891          } @{ $attrs->{select} }
2892       ]
2893     }
2894   }
2895   else {
2896
2897     # otherwise we intialise select & as to empty
2898     $attrs->{select} = [];
2899     $attrs->{as}     = [];
2900   }
2901
2902   # now add colbits to select/as
2903   push @{ $attrs->{select} }, map values %{$_}, @colbits;
2904   push @{ $attrs->{as}     }, map keys   %{$_}, @colbits;
2905
2906   if ( my $adds = delete $attrs->{'+select'} ) {
2907     $adds = [$adds] unless ref $adds eq 'ARRAY';
2908     push @{ $attrs->{select} },
2909       map { /\./ || ref $_ ? $_ : "$alias.$_" } @$adds;
2910   }
2911   if ( my $adds = delete $attrs->{'+as'} ) {
2912     $adds = [$adds] unless ref $adds eq 'ARRAY';
2913     push @{ $attrs->{as} }, @$adds;
2914   }
2915
2916   $attrs->{from} ||= [{
2917     -source_handle => $source->handle,
2918     -alias => $self->{attrs}{alias},
2919     $self->{attrs}{alias} => $source->from,
2920   }];
2921
2922   if ( $attrs->{join} || $attrs->{prefetch} ) {
2923
2924     $self->throw_exception ('join/prefetch can not be used with a custom {from}')
2925       if ref $attrs->{from} ne 'ARRAY';
2926
2927     my $join = delete $attrs->{join} || {};
2928
2929     if ( defined $attrs->{prefetch} ) {
2930       $join = $self->_merge_attr( $join, $attrs->{prefetch} );
2931     }
2932
2933     $attrs->{from} =    # have to copy here to avoid corrupting the original
2934       [
2935         @{ $attrs->{from} },
2936         $source->_resolve_join(
2937           $join,
2938           $alias,
2939           { %{ $attrs->{seen_join} || {} } },
2940           ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
2941             ? $attrs->{from}[-1][0]{-join_path}
2942             : []
2943           ,
2944         )
2945       ];
2946   }
2947
2948   if ( defined $attrs->{order_by} ) {
2949     $attrs->{order_by} = (
2950       ref( $attrs->{order_by} ) eq 'ARRAY'
2951       ? [ @{ $attrs->{order_by} } ]
2952       : [ $attrs->{order_by} || () ]
2953     );
2954   }
2955
2956   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
2957     $attrs->{group_by} = [ $attrs->{group_by} ];
2958   }
2959
2960   # generate the distinct induced group_by early, as prefetch will be carried via a
2961   # subquery (since a group_by is present)
2962   if (delete $attrs->{distinct}) {
2963     if ($attrs->{group_by}) {
2964       carp ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
2965     }
2966     else {
2967       $attrs->{group_by} = [ grep { !ref($_) || (ref($_) ne 'HASH') } @{$attrs->{select}} ];
2968
2969       # add any order_by parts that are not already present in the group_by
2970       # we need to be careful not to add any named functions/aggregates
2971       # i.e. select => [ ... { count => 'foo', -as 'foocount' } ... ]
2972       my %already_grouped = map { $_ => 1 } (@{$attrs->{group_by}});
2973
2974       my $storage = $self->result_source->schema->storage;
2975
2976       my $rs_column_list = $storage->_resolve_column_info ($attrs->{from});
2977
2978       for my $chunk ($storage->_parse_order_by($attrs->{order_by})) {
2979         if ($rs_column_list->{$chunk} && not $already_grouped{$chunk}++) {
2980           push @{$attrs->{group_by}}, $chunk;
2981         }
2982       }
2983     }
2984   }
2985
2986   $attrs->{collapse} ||= {};
2987   if ( my $prefetch = delete $attrs->{prefetch} ) {
2988     $prefetch = $self->_merge_attr( {}, $prefetch );
2989
2990     my $prefetch_ordering = [];
2991
2992     # this is a separate structure (we don't look in {from} directly)
2993     # as the resolver needs to shift things off the lists to work
2994     # properly (identical-prefetches on different branches)
2995     my $join_map = {};
2996     if (ref $attrs->{from} eq 'ARRAY') {
2997
2998       my $start_depth = $attrs->{seen_join}{-relation_chain_depth} || 0;
2999
3000       for my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
3001         next unless $j->[0]{-alias};
3002         next unless $j->[0]{-join_path};
3003         next if ($j->[0]{-relation_chain_depth} || 0) < $start_depth;
3004
3005         my @jpath = map { keys %$_ } @{$j->[0]{-join_path}};
3006
3007         my $p = $join_map;
3008         $p = $p->{$_} ||= {} for @jpath[ ($start_depth/2) .. $#jpath]; #only even depths are actual jpath boundaries
3009         push @{$p->{-join_aliases} }, $j->[0]{-alias};
3010       }
3011     }
3012
3013     my @prefetch =
3014       $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
3015
3016     # we need to somehow mark which columns came from prefetch
3017     $attrs->{_prefetch_select} = [ map { $_->[0] } @prefetch ];
3018
3019     push @{ $attrs->{select} }, @{$attrs->{_prefetch_select}};
3020     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
3021
3022     push( @{$attrs->{order_by}}, @$prefetch_ordering );
3023     $attrs->{_collapse_order_by} = \@$prefetch_ordering;
3024   }
3025
3026   # if both page and offset are specified, produce a combined offset
3027   # even though it doesn't make much sense, this is what pre 081xx has
3028   # been doing
3029   if (my $page = delete $attrs->{page}) {
3030     $attrs->{offset} =
3031       ($attrs->{rows} * ($page - 1))
3032             +
3033       ($attrs->{offset} || 0)
3034     ;
3035   }
3036
3037   return $self->{_attrs} = $attrs;
3038 }
3039
3040 sub _rollout_attr {
3041   my ($self, $attr) = @_;
3042
3043   if (ref $attr eq 'HASH') {
3044     return $self->_rollout_hash($attr);
3045   } elsif (ref $attr eq 'ARRAY') {
3046     return $self->_rollout_array($attr);
3047   } else {
3048     return [$attr];
3049   }
3050 }
3051
3052 sub _rollout_array {
3053   my ($self, $attr) = @_;
3054
3055   my @rolled_array;
3056   foreach my $element (@{$attr}) {
3057     if (ref $element eq 'HASH') {
3058       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3059     } elsif (ref $element eq 'ARRAY') {
3060       #  XXX - should probably recurse here
3061       push( @rolled_array, @{$self->_rollout_array($element)} );
3062     } else {
3063       push( @rolled_array, $element );
3064     }
3065   }
3066   return \@rolled_array;
3067 }
3068
3069 sub _rollout_hash {
3070   my ($self, $attr) = @_;
3071
3072   my @rolled_array;
3073   foreach my $key (keys %{$attr}) {
3074     push( @rolled_array, { $key => $attr->{$key} } );
3075   }
3076   return \@rolled_array;
3077 }
3078
3079 sub _calculate_score {
3080   my ($self, $a, $b) = @_;
3081
3082   if (defined $a xor defined $b) {
3083     return 0;
3084   }
3085   elsif (not defined $a) {
3086     return 1;
3087   }
3088
3089   if (ref $b eq 'HASH') {
3090     my ($b_key) = keys %{$b};
3091     if (ref $a eq 'HASH') {
3092       my ($a_key) = keys %{$a};
3093       if ($a_key eq $b_key) {
3094         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3095       } else {
3096         return 0;
3097       }
3098     } else {
3099       return ($a eq $b_key) ? 1 : 0;
3100     }
3101   } else {
3102     if (ref $a eq 'HASH') {
3103       my ($a_key) = keys %{$a};
3104       return ($b eq $a_key) ? 1 : 0;
3105     } else {
3106       return ($b eq $a) ? 1 : 0;
3107     }
3108   }
3109 }
3110
3111 sub _merge_attr {
3112   my ($self, $orig, $import) = @_;
3113
3114   return $import unless defined($orig);
3115   return $orig unless defined($import);
3116
3117   $orig = $self->_rollout_attr($orig);
3118   $import = $self->_rollout_attr($import);
3119
3120   my $seen_keys;
3121   foreach my $import_element ( @{$import} ) {
3122     # find best candidate from $orig to merge $b_element into
3123     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3124     foreach my $orig_element ( @{$orig} ) {
3125       my $score = $self->_calculate_score( $orig_element, $import_element );
3126       if ($score > $best_candidate->{score}) {
3127         $best_candidate->{position} = $position;
3128         $best_candidate->{score} = $score;
3129       }
3130       $position++;
3131     }
3132     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3133
3134     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3135       push( @{$orig}, $import_element );
3136     } else {
3137       my $orig_best = $orig->[$best_candidate->{position}];
3138       # merge orig_best and b_element together and replace original with merged
3139       if (ref $orig_best ne 'HASH') {
3140         $orig->[$best_candidate->{position}] = $import_element;
3141       } elsif (ref $import_element eq 'HASH') {
3142         my ($key) = keys %{$orig_best};
3143         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
3144       }
3145     }
3146     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3147   }
3148
3149   return $orig;
3150 }
3151
3152 sub result_source {
3153     my $self = shift;
3154
3155     if (@_) {
3156         $self->_source_handle($_[0]->handle);
3157     } else {
3158         $self->_source_handle->resolve;
3159     }
3160 }
3161
3162 =head2 throw_exception
3163
3164 See L<DBIx::Class::Schema/throw_exception> for details.
3165
3166 =cut
3167
3168 sub throw_exception {
3169   my $self=shift;
3170
3171   if (ref $self && $self->_source_handle->schema) {
3172     $self->_source_handle->schema->throw_exception(@_)
3173   }
3174   else {
3175     DBIx::Class::Exception->throw(@_);
3176   }
3177 }
3178
3179 # XXX: FIXME: Attributes docs need clearing up
3180
3181 =head1 ATTRIBUTES
3182
3183 Attributes are used to refine a ResultSet in various ways when
3184 searching for data. They can be passed to any method which takes an
3185 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3186 L</count>.
3187
3188 These are in no particular order:
3189
3190 =head2 order_by
3191
3192 =over 4
3193
3194 =item Value: ( $order_by | \@order_by | \%order_by )
3195
3196 =back
3197
3198 Which column(s) to order the results by.
3199
3200 [The full list of suitable values is documented in
3201 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3202 common options.]
3203
3204 If a single column name, or an arrayref of names is supplied, the
3205 argument is passed through directly to SQL. The hashref syntax allows
3206 for connection-agnostic specification of ordering direction:
3207
3208  For descending order:
3209
3210   order_by => { -desc => [qw/col1 col2 col3/] }
3211
3212  For explicit ascending order:
3213
3214   order_by => { -asc => 'col' }
3215
3216 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3217 supported, although you are strongly encouraged to use the hashref
3218 syntax as outlined above.
3219
3220 =head2 columns
3221
3222 =over 4
3223
3224 =item Value: \@columns
3225
3226 =back
3227
3228 Shortcut to request a particular set of columns to be retrieved. Each
3229 column spec may be a string (a table column name), or a hash (in which
3230 case the key is the C<as> value, and the value is used as the C<select>
3231 expression). Adds C<me.> onto the start of any column without a C<.> in
3232 it and sets C<select> from that, then auto-populates C<as> from
3233 C<select> as normal. (You may also use the C<cols> attribute, as in
3234 earlier versions of DBIC.)
3235
3236 =head2 +columns
3237
3238 =over 4
3239
3240 =item Value: \@columns
3241
3242 =back
3243
3244 Indicates additional columns to be selected from storage. Works the same
3245 as L</columns> but adds columns to the selection. (You may also use the
3246 C<include_columns> attribute, as in earlier versions of DBIC). For
3247 example:-
3248
3249   $schema->resultset('CD')->search(undef, {
3250     '+columns' => ['artist.name'],
3251     join => ['artist']
3252   });
3253
3254 would return all CDs and include a 'name' column to the information
3255 passed to object inflation. Note that the 'artist' is the name of the
3256 column (or relationship) accessor, and 'name' is the name of the column
3257 accessor in the related table.
3258
3259 =head2 include_columns
3260
3261 =over 4
3262
3263 =item Value: \@columns
3264
3265 =back
3266
3267 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3268
3269 =head2 select
3270
3271 =over 4
3272
3273 =item Value: \@select_columns
3274
3275 =back
3276
3277 Indicates which columns should be selected from the storage. You can use
3278 column names, or in the case of RDBMS back ends, function or stored procedure
3279 names:
3280
3281   $rs = $schema->resultset('Employee')->search(undef, {
3282     select => [
3283       'name',
3284       { count => 'employeeid' },
3285       { max => { length => 'name' }, -as => 'longest_name' }
3286     ]
3287   });
3288
3289   # Equivalent SQL
3290   SELECT name, COUNT( employeeid ), MAX( LENGTH( name ) ) AS longest_name FROM employee
3291
3292 B<NOTE:> You will almost always need a corresponding L</as> attribute when you
3293 use L</select>, to instruct DBIx::Class how to store the result of the column.
3294 Also note that the L</as> attribute has nothing to do with the SQL-side 'AS'
3295 identifier aliasing. You can however alias a function, so you can use it in
3296 e.g. an C<ORDER BY> clause. This is done via the C<-as> B<select function
3297 attribute> supplied as shown in the example above.
3298
3299 =head2 +select
3300
3301 =over 4
3302
3303 Indicates additional columns to be selected from storage.  Works the same as
3304 L</select> but adds columns to the default selection, instead of specifying
3305 an explicit list.
3306
3307 =back
3308
3309 =head2 +as
3310
3311 =over 4
3312
3313 Indicates additional column names for those added via L</+select>. See L</as>.
3314
3315 =back
3316
3317 =head2 as
3318
3319 =over 4
3320
3321 =item Value: \@inflation_names
3322
3323 =back
3324
3325 Indicates column names for object inflation. That is L</as> indicates the
3326 slot name in which the column value will be stored within the
3327 L<Row|DBIx::Class::Row> object. The value will then be accessible via this
3328 identifier by the C<get_column> method (or via the object accessor B<if one
3329 with the same name already exists>) as shown below. The L</as> attribute has
3330 B<nothing to do> with the SQL-side C<AS>. See L</select> for details.
3331
3332   $rs = $schema->resultset('Employee')->search(undef, {
3333     select => [
3334       'name',
3335       { count => 'employeeid' },
3336       { max => { length => 'name' }, -as => 'longest_name' }
3337     ],
3338     as => [qw/
3339       name
3340       employee_count
3341       max_name_length
3342     /],
3343   });
3344
3345 If the object against which the search is performed already has an accessor
3346 matching a column name specified in C<as>, the value can be retrieved using
3347 the accessor as normal:
3348
3349   my $name = $employee->name();
3350
3351 If on the other hand an accessor does not exist in the object, you need to
3352 use C<get_column> instead:
3353
3354   my $employee_count = $employee->get_column('employee_count');
3355
3356 You can create your own accessors if required - see
3357 L<DBIx::Class::Manual::Cookbook> for details.
3358
3359 =head2 join
3360
3361 =over 4
3362
3363 =item Value: ($rel_name | \@rel_names | \%rel_names)
3364
3365 =back
3366
3367 Contains a list of relationships that should be joined for this query.  For
3368 example:
3369
3370   # Get CDs by Nine Inch Nails
3371   my $rs = $schema->resultset('CD')->search(
3372     { 'artist.name' => 'Nine Inch Nails' },
3373     { join => 'artist' }
3374   );
3375
3376 Can also contain a hash reference to refer to the other relation's relations.
3377 For example:
3378
3379   package MyApp::Schema::Track;
3380   use base qw/DBIx::Class/;
3381   __PACKAGE__->table('track');
3382   __PACKAGE__->add_columns(qw/trackid cd position title/);
3383   __PACKAGE__->set_primary_key('trackid');
3384   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3385   1;
3386
3387   # In your application
3388   my $rs = $schema->resultset('Artist')->search(
3389     { 'track.title' => 'Teardrop' },
3390     {
3391       join     => { cd => 'track' },
3392       order_by => 'artist.name',
3393     }
3394   );
3395
3396 You need to use the relationship (not the table) name in  conditions,
3397 because they are aliased as such. The current table is aliased as "me", so
3398 you need to use me.column_name in order to avoid ambiguity. For example:
3399
3400   # Get CDs from 1984 with a 'Foo' track
3401   my $rs = $schema->resultset('CD')->search(
3402     {
3403       'me.year' => 1984,
3404       'tracks.name' => 'Foo'
3405     },
3406     { join => 'tracks' }
3407   );
3408
3409 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3410 similarly for a third time). For e.g.
3411
3412   my $rs = $schema->resultset('Artist')->search({
3413     'cds.title'   => 'Down to Earth',
3414     'cds_2.title' => 'Popular',
3415   }, {
3416     join => [ qw/cds cds/ ],
3417   });
3418
3419 will return a set of all artists that have both a cd with title 'Down
3420 to Earth' and a cd with title 'Popular'.
3421
3422 If you want to fetch related objects from other tables as well, see C<prefetch>
3423 below.
3424
3425 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
3426
3427 =head2 prefetch
3428
3429 =over 4
3430
3431 =item Value: ($rel_name | \@rel_names | \%rel_names)
3432
3433 =back
3434
3435 Contains one or more relationships that should be fetched along with
3436 the main query (when they are accessed afterwards the data will
3437 already be available, without extra queries to the database).  This is
3438 useful for when you know you will need the related objects, because it
3439 saves at least one query:
3440
3441   my $rs = $schema->resultset('Tag')->search(
3442     undef,
3443     {
3444       prefetch => {
3445         cd => 'artist'
3446       }
3447     }
3448   );
3449
3450 The initial search results in SQL like the following:
3451
3452   SELECT tag.*, cd.*, artist.* FROM tag
3453   JOIN cd ON tag.cd = cd.cdid
3454   JOIN artist ON cd.artist = artist.artistid
3455
3456 L<DBIx::Class> has no need to go back to the database when we access the
3457 C<cd> or C<artist> relationships, which saves us two SQL statements in this
3458 case.
3459
3460 Simple prefetches will be joined automatically, so there is no need
3461 for a C<join> attribute in the above search.
3462
3463 C<prefetch> can be used with the following relationship types: C<belongs_to>,
3464 C<has_one> (or if you're using C<add_relationship>, any relationship declared
3465 with an accessor type of 'single' or 'filter'). A more complex example that
3466 prefetches an artists cds, the tracks on those cds, and the tags associated
3467 with that artist is given below (assuming many-to-many from artists to tags):
3468
3469  my $rs = $schema->resultset('Artist')->search(
3470    undef,
3471    {
3472      prefetch => [
3473        { cds => 'tracks' },
3474        { artist_tags => 'tags' }
3475      ]
3476    }
3477  );
3478
3479
3480 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
3481 attributes will be ignored.
3482
3483 B<CAVEATs>: Prefetch does a lot of deep magic. As such, it may not behave
3484 exactly as you might expect.
3485
3486 =over 4
3487
3488 =item *
3489
3490 Prefetch uses the L</cache> to populate the prefetched relationships. This
3491 may or may not be what you want.
3492
3493 =item *
3494
3495 If you specify a condition on a prefetched relationship, ONLY those
3496 rows that match the prefetched condition will be fetched into that relationship.
3497 This means that adding prefetch to a search() B<may alter> what is returned by
3498 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
3499
3500   my $artist_rs = $schema->resultset('Artist')->search({
3501       'cds.year' => 2008,
3502   }, {
3503       join => 'cds',
3504   });
3505
3506   my $count = $artist_rs->first->cds->count;
3507
3508   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
3509
3510   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
3511
3512   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
3513
3514 that cmp_ok() may or may not pass depending on the datasets involved. This
3515 behavior may or may not survive the 0.09 transition.
3516
3517 =back
3518
3519 =head2 page
3520
3521 =over 4
3522
3523 =item Value: $page
3524
3525 =back
3526
3527 Makes the resultset paged and specifies the page to retrieve. Effectively
3528 identical to creating a non-pages resultset and then calling ->page($page)
3529 on it.
3530
3531 If L<rows> attribute is not specified it defaults to 10 rows per page.
3532
3533 When you have a paged resultset, L</count> will only return the number
3534 of rows in the page. To get the total, use the L</pager> and call
3535 C<total_entries> on it.
3536
3537 =head2 rows
3538
3539 =over 4
3540
3541 =item Value: $rows
3542
3543 =back
3544
3545 Specifies the maximum number of rows for direct retrieval or the number of
3546 rows per page if the page attribute or method is used.
3547
3548 =head2 offset
3549
3550 =over 4
3551
3552 =item Value: $offset
3553
3554 =back
3555
3556 Specifies the (zero-based) row number for the  first row to be returned, or the
3557 of the first row of the first page if paging is used.
3558
3559 =head2 group_by
3560
3561 =over 4
3562
3563 =item Value: \@columns
3564
3565 =back
3566
3567 A arrayref of columns to group by. Can include columns of joined tables.
3568
3569   group_by => [qw/ column1 column2 ... /]
3570
3571 =head2 having
3572
3573 =over 4
3574
3575 =item Value: $condition
3576
3577 =back
3578
3579 HAVING is a select statement attribute that is applied between GROUP BY and
3580 ORDER BY. It is applied to the after the grouping calculations have been
3581 done.
3582
3583   having => { 'count(employee)' => { '>=', 100 } }
3584
3585 =head2 distinct
3586
3587 =over 4
3588
3589 =item Value: (0 | 1)
3590
3591 =back
3592
3593 Set to 1 to group by all columns. If the resultset already has a group_by
3594 attribute, this setting is ignored and an appropriate warning is issued.
3595
3596 =head2 where
3597
3598 =over 4
3599
3600 Adds to the WHERE clause.
3601
3602   # only return rows WHERE deleted IS NULL for all searches
3603   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
3604
3605 Can be overridden by passing C<< { where => undef } >> as an attribute
3606 to a resultset.
3607
3608 =back
3609
3610 =head2 cache
3611
3612 Set to 1 to cache search results. This prevents extra SQL queries if you
3613 revisit rows in your ResultSet:
3614
3615   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
3616
3617   while( my $artist = $resultset->next ) {
3618     ... do stuff ...
3619   }
3620
3621   $rs->first; # without cache, this would issue a query
3622
3623 By default, searches are not cached.
3624
3625 For more examples of using these attributes, see
3626 L<DBIx::Class::Manual::Cookbook>.
3627
3628 =head2 for
3629
3630 =over 4
3631
3632 =item Value: ( 'update' | 'shared' )
3633
3634 =back
3635
3636 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
3637 ... FOR SHARED.
3638
3639 =cut
3640
3641 1;