Fixed a prefetch bug (o2m->prefetch_o2m+order_by+rows)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use base qw/DBIx::Class/;
6 use Carp::Clan qw/^DBIx::Class/;
7 use DBIx::Class::Exception;
8 use Data::Page;
9 use Storable;
10 use DBIx::Class::ResultSetColumn;
11 use DBIx::Class::ResultSourceHandle;
12 use List::Util ();
13 use Scalar::Util qw/blessed weaken/;
14 use Try::Tiny;
15 use namespace::clean;
16
17 use overload
18         '0+'     => "count",
19         'bool'   => "_bool",
20         fallback => 1;
21
22 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
23
24 =head1 NAME
25
26 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
27
28 =head1 SYNOPSIS
29
30   my $users_rs   = $schema->resultset('User');
31   while( $user = $users_rs->next) {
32     print $user->username;
33   }
34
35   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
36   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
37
38 =head1 DESCRIPTION
39
40 A ResultSet is an object which stores a set of conditions representing
41 a query. It is the backbone of DBIx::Class (i.e. the really
42 important/useful bit).
43
44 No SQL is executed on the database when a ResultSet is created, it
45 just stores all the conditions needed to create the query.
46
47 A basic ResultSet representing the data of an entire table is returned
48 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
49 L<Source|DBIx::Class::Manual::Glossary/Source> name.
50
51   my $users_rs = $schema->resultset('User');
52
53 A new ResultSet is returned from calling L</search> on an existing
54 ResultSet. The new one will contain all the conditions of the
55 original, plus any new conditions added in the C<search> call.
56
57 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
58 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
59 represents.
60
61 The query that the ResultSet represents is B<only> executed against
62 the database when these methods are called:
63 L</find>, L</next>, L</all>, L</first>, L</single>, L</count>.
64
65 If a resultset is used in a numeric context it returns the L</count>.
66 However, if it is used in a boolean context it is B<always> true.  So if
67 you want to check if a resultset has any results, you must use C<if $rs
68 != 0>.
69
70 =head1 EXAMPLES
71
72 =head2 Chaining resultsets
73
74 Let's say you've got a query that needs to be run to return some data
75 to the user. But, you have an authorization system in place that
76 prevents certain users from seeing certain information. So, you want
77 to construct the basic query in one method, but add constraints to it in
78 another.
79
80   sub get_data {
81     my $self = shift;
82     my $request = $self->get_request; # Get a request object somehow.
83     my $schema = $self->get_schema;   # Get the DBIC schema object somehow.
84
85     my $cd_rs = $schema->resultset('CD')->search({
86       title => $request->param('title'),
87       year => $request->param('year'),
88     });
89
90     $self->apply_security_policy( $cd_rs );
91
92     return $cd_rs->all();
93   }
94
95   sub apply_security_policy {
96     my $self = shift;
97     my ($rs) = @_;
98
99     return $rs->search({
100       subversive => 0,
101     });
102   }
103
104 =head3 Resolving conditions and attributes
105
106 When a resultset is chained from another resultset, conditions and
107 attributes with the same keys need resolving.
108
109 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
110 into the existing ones from the original resultset.
111
112 The L</where> and L</having> attributes, and any search conditions, are
113 merged with an SQL C<AND> to the existing condition from the original
114 resultset.
115
116 All other attributes are overridden by any new ones supplied in the
117 search attributes.
118
119 =head2 Multiple queries
120
121 Since a resultset just defines a query, you can do all sorts of
122 things with it with the same object.
123
124   # Don't hit the DB yet.
125   my $cd_rs = $schema->resultset('CD')->search({
126     title => 'something',
127     year => 2009,
128   });
129
130   # Each of these hits the DB individually.
131   my $count = $cd_rs->count;
132   my $most_recent = $cd_rs->get_column('date_released')->max();
133   my @records = $cd_rs->all;
134
135 And it's not just limited to SELECT statements.
136
137   $cd_rs->delete();
138
139 This is even cooler:
140
141   $cd_rs->create({ artist => 'Fred' });
142
143 Which is the same as:
144
145   $schema->resultset('CD')->create({
146     title => 'something',
147     year => 2009,
148     artist => 'Fred'
149   });
150
151 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
152
153 =head1 METHODS
154
155 =head2 new
156
157 =over 4
158
159 =item Arguments: $source, \%$attrs
160
161 =item Return Value: $rs
162
163 =back
164
165 The resultset constructor. Takes a source object (usually a
166 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
167 L</ATTRIBUTES> below).  Does not perform any queries -- these are
168 executed as needed by the other methods.
169
170 Generally you won't need to construct a resultset manually.  You'll
171 automatically get one from e.g. a L</search> called in scalar context:
172
173   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
174
175 IMPORTANT: If called on an object, proxies to new_result instead so
176
177   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
178
179 will return a CD object, not a ResultSet.
180
181 =cut
182
183 sub new {
184   my $class = shift;
185   return $class->new_result(@_) if ref $class;
186
187   my ($source, $attrs) = @_;
188   $source = $source->handle
189     unless $source->isa('DBIx::Class::ResultSourceHandle');
190   $attrs = { %{$attrs||{}} };
191
192   if ($attrs->{page}) {
193     $attrs->{rows} ||= 10;
194   }
195
196   $attrs->{alias} ||= 'me';
197
198   # Creation of {} and bless separated to mitigate RH perl bug
199   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
200   my $self = {
201     _source_handle => $source,
202     cond => $attrs->{where},
203     pager => undef,
204     attrs => $attrs
205   };
206
207   bless $self, $class;
208
209   $self->result_class(
210     $attrs->{result_class} || $source->resolve->result_class
211   );
212
213   return $self;
214 }
215
216 =head2 search
217
218 =over 4
219
220 =item Arguments: $cond, \%attrs?
221
222 =item Return Value: $resultset (scalar context), @row_objs (list context)
223
224 =back
225
226   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
227   my $new_rs = $cd_rs->search({ year => 2005 });
228
229   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
230                  # year = 2005 OR year = 2004
231
232 If you need to pass in additional attributes but no additional condition,
233 call it as C<search(undef, \%attrs)>.
234
235   # "SELECT name, artistid FROM $artist_table"
236   my @all_artists = $schema->resultset('Artist')->search(undef, {
237     columns => [qw/name artistid/],
238   });
239
240 For a list of attributes that can be passed to C<search>, see
241 L</ATTRIBUTES>. For more examples of using this function, see
242 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
243 documentation for the first argument, see L<SQL::Abstract>.
244
245 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
246
247 =cut
248
249 sub search {
250   my $self = shift;
251   my $rs = $self->search_rs( @_ );
252   return (wantarray ? $rs->all : $rs);
253 }
254
255 =head2 search_rs
256
257 =over 4
258
259 =item Arguments: $cond, \%attrs?
260
261 =item Return Value: $resultset
262
263 =back
264
265 This method does the same exact thing as search() except it will
266 always return a resultset, even in list context.
267
268 =cut
269
270 sub search_rs {
271   my $self = shift;
272
273   # Special-case handling for (undef, undef).
274   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
275     @_ = ();
276   }
277
278   my $call_attrs = {};
279   $call_attrs = pop(@_) if @_ > 1 and ref $_[-1] eq 'HASH';
280
281   # see if we can keep the cache (no $rs changes)
282   my $cache;
283   my %safe = (alias => 1, cache => 1);
284   if ( ! List::Util::first { !$safe{$_} } keys %$call_attrs and (
285     ! defined $_[0]
286       or
287     ref $_[0] eq 'HASH' && ! keys %{$_[0]}
288       or
289     ref $_[0] eq 'ARRAY' && ! @{$_[0]}
290   )) {
291     $cache = $self->get_cache;
292   }
293
294   my $old_attrs = { %{$self->{attrs}} };
295   my $old_having = delete $old_attrs->{having};
296   my $old_where = delete $old_attrs->{where};
297
298   # reset the selector list
299   if (List::Util::first { exists $call_attrs->{$_} } qw{columns select as}) {
300      delete @{$old_attrs}{qw{select as columns +select +as +columns include_columns}};
301   }
302
303   my $new_attrs = { %{$old_attrs}, %{$call_attrs} };
304
305   # merge new attrs into inherited
306   foreach my $key (qw/join prefetch +select +as +columns include_columns bind/) {
307     next unless exists $call_attrs->{$key};
308     $new_attrs->{$key} = $self->_merge_attr($old_attrs->{$key}, $call_attrs->{$key});
309   }
310
311   # rip apart the rest of @_, parse a condition
312   my $call_cond = do {
313
314     if (ref $_[0] eq 'HASH') {
315       (keys %{$_[0]}) ? $_[0] : undef
316     }
317     elsif (@_ == 1) {
318       $_[0]
319     }
320     elsif (@_ % 2) {
321       $self->throw_exception('Odd number of arguments to search')
322     }
323     else {
324       +{ @_ }
325     }
326
327   } if @_;
328
329   for ($old_where, $call_cond) {
330     if (defined $_) {
331       $new_attrs->{where} = $self->_stack_cond (
332         $_, $new_attrs->{where}
333       );
334     }
335   }
336
337   if (defined $old_having) {
338     $new_attrs->{having} = $self->_stack_cond (
339       $old_having, $new_attrs->{having}
340     )
341   }
342
343   my $rs = (ref $self)->new($self->result_source, $new_attrs);
344
345   $rs->set_cache($cache) if ($cache);
346
347   return $rs;
348 }
349
350 sub _stack_cond {
351   my ($self, $left, $right) = @_;
352   if (defined $left xor defined $right) {
353     return defined $left ? $left : $right;
354   }
355   elsif (defined $left) {
356     return { -and => [ map
357       { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
358       ($left, $right)
359     ]};
360   }
361
362   return undef;
363 }
364
365 =head2 search_literal
366
367 =over 4
368
369 =item Arguments: $sql_fragment, @bind_values
370
371 =item Return Value: $resultset (scalar context), @row_objs (list context)
372
373 =back
374
375   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
376   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
377
378 Pass a literal chunk of SQL to be added to the conditional part of the
379 resultset query.
380
381 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
382 only be used in that context. C<search_literal> is a convenience method.
383 It is equivalent to calling $schema->search(\[]), but if you want to ensure
384 columns are bound correctly, use C<search>.
385
386 Example of how to use C<search> instead of C<search_literal>
387
388   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
389   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
390
391
392 See L<DBIx::Class::Manual::Cookbook/Searching> and
393 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
394 require C<search_literal>.
395
396 =cut
397
398 sub search_literal {
399   my ($self, $sql, @bind) = @_;
400   my $attr;
401   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
402     $attr = pop @bind;
403   }
404   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
405 }
406
407 =head2 find
408
409 =over 4
410
411 =item Arguments: \%columns_values | @pk_values, \%attrs?
412
413 =item Return Value: $row_object | undef
414
415 =back
416
417 Finds and returns a single row based on supplied criteria. Takes either a
418 hashref with the same format as L</create> (including inference of foreign
419 keys from related objects), or a list of primary key values in the same
420 order as the L<primary columns|DBIx::Class::ResultSource/primary_columns>
421 declaration on the L</result_source>.
422
423 In either case an attempt is made to combine conditions already existing on
424 the resultset with the condition passed to this method.
425
426 To aid with preparing the correct query for the storage you may supply the
427 C<key> attribute, which is the name of a
428 L<unique constraint|DBIx::Class::ResultSource/add_unique_constraint> (the
429 unique constraint corresponding to the
430 L<primary columns|DBIx::Class::ResultSource/primary_columns> is always named
431 C<primary>). If the C<key> attribute has been supplied, and DBIC is unable
432 to construct a query that satisfies the named unique constraint fully (
433 non-NULL values for each column member of the constraint) an exception is
434 thrown.
435
436 If no C<key> is specified, the search is carried over all unique constraints
437 which are fully defined by the available condition.
438
439 If no such constraint is found, C<find> currently defaults to a simple
440 C<< search->(\%column_values) >> which may or may not do what you expect.
441 Note that this fallback behavior may be deprecated in further versions. If
442 you need to search with arbitrary conditions - use L</search>. If the query
443 resulting from this fallback produces more than one row, a warning to the
444 effect is issued, though only the first row is constructed and returned as
445 C<$row_object>.
446
447 In addition to C<key>, L</find> recognizes and applies standard
448 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
449
450 Note that if you have extra concerns about the correctness of the resulting
451 query you need to specify the C<key> attribute and supply the entire condition
452 as an argument to find (since it is not always possible to perform the
453 combination of the resultset condition with the supplied one, especially if
454 the resultset condition contains literal sql).
455
456 For example, to find a row by its primary key:
457
458   my $cd = $schema->resultset('CD')->find(5);
459
460 You can also find a row by a specific unique constraint:
461
462   my $cd = $schema->resultset('CD')->find(
463     {
464       artist => 'Massive Attack',
465       title  => 'Mezzanine',
466     },
467     { key => 'cd_artist_title' }
468   );
469
470 See also L</find_or_create> and L</update_or_create>.
471
472 =cut
473
474 sub find {
475   my $self = shift;
476   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
477
478   my $rsrc = $self->result_source;
479
480   # Parse out the condition from input
481   my $call_cond;
482   if (ref $_[0] eq 'HASH') {
483     $call_cond = { %{$_[0]} };
484   }
485   else {
486     my $constraint = exists $attrs->{key} ? $attrs->{key} : 'primary';
487     my @c_cols = $rsrc->unique_constraint_columns($constraint);
488
489     $self->throw_exception(
490       "No constraint columns, maybe a malformed '$constraint' constraint?"
491     ) unless @c_cols;
492
493     $self->throw_exception (
494       'find() expects either a column/value hashref, or a list of values '
495     . "corresponding to the columns of the specified unique constraint '$constraint'"
496     ) unless @c_cols == @_;
497
498     $call_cond = {};
499     @{$call_cond}{@c_cols} = @_;
500   }
501
502   my %related;
503   for my $key (keys %$call_cond) {
504     if (
505       my $keyref = ref($call_cond->{$key})
506         and
507       my $relinfo = $rsrc->relationship_info($key)
508     ) {
509       my $val = delete $call_cond->{$key};
510
511       next if $keyref eq 'ARRAY'; # has_many for multi_create
512
513       my $rel_q = $rsrc->_resolve_condition(
514         $relinfo->{cond}, $val, $key
515       );
516       die "Can't handle complex relationship conditions in find" if ref($rel_q) ne 'HASH';
517       @related{keys %$rel_q} = values %$rel_q;
518     }
519   }
520
521   # relationship conditions take precedence (?)
522   @{$call_cond}{keys %related} = values %related;
523
524   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
525   my $final_cond;
526   if (exists $attrs->{key}) {
527     $final_cond = $self->_qualify_cond_columns (
528
529       $self->_build_unique_cond (
530         $attrs->{key},
531         $call_cond,
532       ),
533
534       $alias,
535     );
536   }
537   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
538     # This means that we got here after a merger of relationship conditions
539     # in ::Relationship::Base::search_related (the row method), and furthermore
540     # the relationship is of the 'single' type. This means that the condition
541     # provided by the relationship (already attached to $self) is sufficient,
542     # as there can be only one row in the database that would satisfy the
543     # relationship
544   }
545   else {
546     # no key was specified - fall down to heuristics mode:
547     # run through all unique queries registered on the resultset, and
548     # 'OR' all qualifying queries together
549     my (@unique_queries, %seen_column_combinations);
550     for my $c_name ($rsrc->unique_constraint_names) {
551       next if $seen_column_combinations{
552         join "\x00", sort $rsrc->unique_constraint_columns($c_name)
553       }++;
554
555       push @unique_queries, try {
556         $self->_build_unique_cond ($c_name, $call_cond)
557       } || ();
558     }
559
560     $final_cond = @unique_queries
561       ? [ map { $self->_qualify_cond_columns($_, $alias) } @unique_queries ]
562       : $self->_non_unique_find_fallback ($call_cond, $attrs)
563     ;
564   }
565
566   # Run the query, passing the result_class since it should propagate for find
567   my $rs = $self->search ($final_cond, {result_class => $self->result_class, %$attrs});
568   if (keys %{$rs->_resolved_attrs->{collapse}}) {
569     my $row = $rs->next;
570     carp "Query returned more than one row" if $rs->next;
571     return $row;
572   }
573   else {
574     return $rs->single;
575   }
576 }
577
578 # This is a stop-gap method as agreed during the discussion on find() cleanup:
579 # http://lists.scsys.co.uk/pipermail/dbix-class/2010-October/009535.html
580 #
581 # It is invoked when find() is called in legacy-mode with insufficiently-unique
582 # condition. It is provided for overrides until a saner way forward is devised
583 #
584 # *NOTE* This is not a public method, and it's *GUARANTEED* to disappear down
585 # the road. Please adjust your tests accordingly to catch this situation early
586 # DBIx::Class::ResultSet->can('_non_unique_find_fallback') is reasonable
587 #
588 # The method will not be removed without an adequately complete replacement
589 # for strict-mode enforcement
590 sub _non_unique_find_fallback {
591   my ($self, $cond, $attrs) = @_;
592
593   return $self->_qualify_cond_columns(
594     $cond,
595     exists $attrs->{alias}
596       ? $attrs->{alias}
597       : $self->{attrs}{alias}
598   );
599 }
600
601
602 sub _qualify_cond_columns {
603   my ($self, $cond, $alias) = @_;
604
605   my %aliased = %$cond;
606   for (keys %aliased) {
607     $aliased{"$alias.$_"} = delete $aliased{$_}
608       if $_ !~ /\./;
609   }
610
611   return \%aliased;
612 }
613
614 sub _build_unique_cond {
615   my ($self, $constraint_name, $extra_cond) = @_;
616
617   my @c_cols = $self->result_source->unique_constraint_columns($constraint_name);
618
619   # combination may fail if $self->{cond} is non-trivial
620   my ($final_cond) = try {
621     $self->_merge_with_rscond ($extra_cond)
622   } catch {
623     +{ %$extra_cond }
624   };
625
626   # trim out everything not in $columns
627   $final_cond = { map { $_ => $final_cond->{$_} } @c_cols };
628
629   if (my @missing = grep { ! defined $final_cond->{$_} } (@c_cols) ) {
630     $self->throw_exception( sprintf ( "Unable to satisfy requested constraint '%s', no values for column(s): %s",
631       $constraint_name,
632       join (', ', map { "'$_'" } @missing),
633     ) );
634   }
635
636   return $final_cond;
637 }
638
639 =head2 search_related
640
641 =over 4
642
643 =item Arguments: $rel, $cond, \%attrs?
644
645 =item Return Value: $new_resultset
646
647 =back
648
649   $new_rs = $cd_rs->search_related('artist', {
650     name => 'Emo-R-Us',
651   });
652
653 Searches the specified relationship, optionally specifying a condition and
654 attributes for matching records. See L</ATTRIBUTES> for more information.
655
656 =cut
657
658 sub search_related {
659   return shift->related_resultset(shift)->search(@_);
660 }
661
662 =head2 search_related_rs
663
664 This method works exactly the same as search_related, except that
665 it guarantees a resultset, even in list context.
666
667 =cut
668
669 sub search_related_rs {
670   return shift->related_resultset(shift)->search_rs(@_);
671 }
672
673 =head2 cursor
674
675 =over 4
676
677 =item Arguments: none
678
679 =item Return Value: $cursor
680
681 =back
682
683 Returns a storage-driven cursor to the given resultset. See
684 L<DBIx::Class::Cursor> for more information.
685
686 =cut
687
688 sub cursor {
689   my ($self) = @_;
690
691   my $attrs = $self->_resolved_attrs_copy;
692
693   return $self->{cursor}
694     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
695           $attrs->{where},$attrs);
696 }
697
698 =head2 single
699
700 =over 4
701
702 =item Arguments: $cond?
703
704 =item Return Value: $row_object | undef
705
706 =back
707
708   my $cd = $schema->resultset('CD')->single({ year => 2001 });
709
710 Inflates the first result without creating a cursor if the resultset has
711 any records in it; if not returns C<undef>. Used by L</find> as a lean version
712 of L</search>.
713
714 While this method can take an optional search condition (just like L</search>)
715 being a fast-code-path it does not recognize search attributes. If you need to
716 add extra joins or similar, call L</search> and then chain-call L</single> on the
717 L<DBIx::Class::ResultSet> returned.
718
719 =over
720
721 =item B<Note>
722
723 As of 0.08100, this method enforces the assumption that the preceding
724 query returns only one row. If more than one row is returned, you will receive
725 a warning:
726
727   Query returned more than one row
728
729 In this case, you should be using L</next> or L</find> instead, or if you really
730 know what you are doing, use the L</rows> attribute to explicitly limit the size
731 of the resultset.
732
733 This method will also throw an exception if it is called on a resultset prefetching
734 has_many, as such a prefetch implies fetching multiple rows from the database in
735 order to assemble the resulting object.
736
737 =back
738
739 =cut
740
741 sub single {
742   my ($self, $where) = @_;
743   if(@_ > 2) {
744       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
745   }
746
747   my $attrs = $self->_resolved_attrs_copy;
748
749   if (keys %{$attrs->{collapse}}) {
750     $self->throw_exception(
751       'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
752     );
753   }
754
755   if ($where) {
756     if (defined $attrs->{where}) {
757       $attrs->{where} = {
758         '-and' =>
759             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
760                $where, delete $attrs->{where} ]
761       };
762     } else {
763       $attrs->{where} = $where;
764     }
765   }
766
767   my @data = $self->result_source->storage->select_single(
768     $attrs->{from}, $attrs->{select},
769     $attrs->{where}, $attrs
770   );
771
772   return (@data ? ($self->_construct_object(@data))[0] : undef);
773 }
774
775
776 # _collapse_query
777 #
778 # Recursively collapse the query, accumulating values for each column.
779
780 sub _collapse_query {
781   my ($self, $query, $collapsed) = @_;
782
783   $collapsed ||= {};
784
785   if (ref $query eq 'ARRAY') {
786     foreach my $subquery (@$query) {
787       next unless ref $subquery;  # -or
788       $collapsed = $self->_collapse_query($subquery, $collapsed);
789     }
790   }
791   elsif (ref $query eq 'HASH') {
792     if (keys %$query and (keys %$query)[0] eq '-and') {
793       foreach my $subquery (@{$query->{-and}}) {
794         $collapsed = $self->_collapse_query($subquery, $collapsed);
795       }
796     }
797     else {
798       foreach my $col (keys %$query) {
799         my $value = $query->{$col};
800         $collapsed->{$col}{$value}++;
801       }
802     }
803   }
804
805   return $collapsed;
806 }
807
808 =head2 get_column
809
810 =over 4
811
812 =item Arguments: $cond?
813
814 =item Return Value: $resultsetcolumn
815
816 =back
817
818   my $max_length = $rs->get_column('length')->max;
819
820 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
821
822 =cut
823
824 sub get_column {
825   my ($self, $column) = @_;
826   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
827   return $new;
828 }
829
830 =head2 search_like
831
832 =over 4
833
834 =item Arguments: $cond, \%attrs?
835
836 =item Return Value: $resultset (scalar context), @row_objs (list context)
837
838 =back
839
840   # WHERE title LIKE '%blue%'
841   $cd_rs = $rs->search_like({ title => '%blue%'});
842
843 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
844 that this is simply a convenience method retained for ex Class::DBI users.
845 You most likely want to use L</search> with specific operators.
846
847 For more information, see L<DBIx::Class::Manual::Cookbook>.
848
849 This method is deprecated and will be removed in 0.09. Use L</search()>
850 instead. An example conversion is:
851
852   ->search_like({ foo => 'bar' });
853
854   # Becomes
855
856   ->search({ foo => { like => 'bar' } });
857
858 =cut
859
860 sub search_like {
861   my $class = shift;
862   carp (
863     'search_like() is deprecated and will be removed in DBIC version 0.09.'
864    .' Instead use ->search({ x => { -like => "y%" } })'
865    .' (note the outer pair of {}s - they are important!)'
866   );
867   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
868   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
869   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
870   return $class->search($query, { %$attrs });
871 }
872
873 =head2 slice
874
875 =over 4
876
877 =item Arguments: $first, $last
878
879 =item Return Value: $resultset (scalar context), @row_objs (list context)
880
881 =back
882
883 Returns a resultset or object list representing a subset of elements from the
884 resultset slice is called on. Indexes are from 0, i.e., to get the first
885 three records, call:
886
887   my ($one, $two, $three) = $rs->slice(0, 2);
888
889 =cut
890
891 sub slice {
892   my ($self, $min, $max) = @_;
893   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
894   $attrs->{offset} = $self->{attrs}{offset} || 0;
895   $attrs->{offset} += $min;
896   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
897   return $self->search(undef, $attrs);
898   #my $slice = (ref $self)->new($self->result_source, $attrs);
899   #return (wantarray ? $slice->all : $slice);
900 }
901
902 =head2 next
903
904 =over 4
905
906 =item Arguments: none
907
908 =item Return Value: $result | undef
909
910 =back
911
912 Returns the next element in the resultset (C<undef> is there is none).
913
914 Can be used to efficiently iterate over records in the resultset:
915
916   my $rs = $schema->resultset('CD')->search;
917   while (my $cd = $rs->next) {
918     print $cd->title;
919   }
920
921 Note that you need to store the resultset object, and call C<next> on it.
922 Calling C<< resultset('Table')->next >> repeatedly will always return the
923 first record from the resultset.
924
925 =cut
926
927 sub next {
928   my ($self) = @_;
929   if (my $cache = $self->get_cache) {
930     $self->{all_cache_position} ||= 0;
931     return $cache->[$self->{all_cache_position}++];
932   }
933   if ($self->{attrs}{cache}) {
934     delete $self->{pager};
935     $self->{all_cache_position} = 1;
936     return ($self->all)[0];
937   }
938   if ($self->{stashed_objects}) {
939     my $obj = shift(@{$self->{stashed_objects}});
940     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
941     return $obj;
942   }
943   my @row = (
944     exists $self->{stashed_row}
945       ? @{delete $self->{stashed_row}}
946       : $self->cursor->next
947   );
948   return undef unless (@row);
949   my ($row, @more) = $self->_construct_object(@row);
950   $self->{stashed_objects} = \@more if @more;
951   return $row;
952 }
953
954 sub _construct_object {
955   my ($self, @row) = @_;
956
957   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row)
958     or return ();
959   my @new = $self->result_class->inflate_result($self->result_source, @$info);
960   @new = $self->{_attrs}{record_filter}->(@new)
961     if exists $self->{_attrs}{record_filter};
962   return @new;
963 }
964
965 sub _collapse_result {
966   my ($self, $as_proto, $row) = @_;
967
968   my @copy = @$row;
969
970   # 'foo'         => [ undef, 'foo' ]
971   # 'foo.bar'     => [ 'foo', 'bar' ]
972   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
973
974   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
975
976   my %collapse = %{$self->{_attrs}{collapse}||{}};
977
978   my @pri_index;
979
980   # if we're doing collapsing (has_many prefetch) we need to grab records
981   # until the PK changes, so fill @pri_index. if not, we leave it empty so
982   # we know we don't have to bother.
983
984   # the reason for not using the collapse stuff directly is because if you
985   # had for e.g. two artists in a row with no cds, the collapse info for
986   # both would be NULL (undef) so you'd lose the second artist
987
988   # store just the index so we can check the array positions from the row
989   # without having to contruct the full hash
990
991   if (keys %collapse) {
992     my %pri = map { ($_ => 1) } $self->result_source->_pri_cols;
993     foreach my $i (0 .. $#construct_as) {
994       next if defined($construct_as[$i][0]); # only self table
995       if (delete $pri{$construct_as[$i][1]}) {
996         push(@pri_index, $i);
997       }
998       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
999     }
1000   }
1001
1002   # no need to do an if, it'll be empty if @pri_index is empty anyway
1003
1004   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
1005
1006   my @const_rows;
1007
1008   do { # no need to check anything at the front, we always want the first row
1009
1010     my %const;
1011
1012     foreach my $this_as (@construct_as) {
1013       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
1014     }
1015
1016     push(@const_rows, \%const);
1017
1018   } until ( # no pri_index => no collapse => drop straight out
1019       !@pri_index
1020     or
1021       do { # get another row, stash it, drop out if different PK
1022
1023         @copy = $self->cursor->next;
1024         $self->{stashed_row} = \@copy;
1025
1026         # last thing in do block, counts as true if anything doesn't match
1027
1028         # check xor defined first for NULL vs. NOT NULL then if one is
1029         # defined the other must be so check string equality
1030
1031         grep {
1032           (defined $pri_vals{$_} ^ defined $copy[$_])
1033           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
1034         } @pri_index;
1035       }
1036   );
1037
1038   my $alias = $self->{attrs}{alias};
1039   my $info = [];
1040
1041   my %collapse_pos;
1042
1043   my @const_keys;
1044
1045   foreach my $const (@const_rows) {
1046     scalar @const_keys or do {
1047       @const_keys = sort { length($a) <=> length($b) } keys %$const;
1048     };
1049     foreach my $key (@const_keys) {
1050       if (length $key) {
1051         my $target = $info;
1052         my @parts = split(/\./, $key);
1053         my $cur = '';
1054         my $data = $const->{$key};
1055         foreach my $p (@parts) {
1056           $target = $target->[1]->{$p} ||= [];
1057           $cur .= ".${p}";
1058           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) {
1059             # collapsing at this point and on final part
1060             my $pos = $collapse_pos{$cur};
1061             CK: foreach my $ck (@ckey) {
1062               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
1063                 $collapse_pos{$cur} = $data;
1064                 delete @collapse_pos{ # clear all positioning for sub-entries
1065                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
1066                 };
1067                 push(@$target, []);
1068                 last CK;
1069               }
1070             }
1071           }
1072           if (exists $collapse{$cur}) {
1073             $target = $target->[-1];
1074           }
1075         }
1076         $target->[0] = $data;
1077       } else {
1078         $info->[0] = $const->{$key};
1079       }
1080     }
1081   }
1082
1083   return $info;
1084 }
1085
1086 =head2 result_source
1087
1088 =over 4
1089
1090 =item Arguments: $result_source?
1091
1092 =item Return Value: $result_source
1093
1094 =back
1095
1096 An accessor for the primary ResultSource object from which this ResultSet
1097 is derived.
1098
1099 =head2 result_class
1100
1101 =over 4
1102
1103 =item Arguments: $result_class?
1104
1105 =item Return Value: $result_class
1106
1107 =back
1108
1109 An accessor for the class to use when creating row objects. Defaults to
1110 C<< result_source->result_class >> - which in most cases is the name of the
1111 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1112
1113 Note that changing the result_class will also remove any components
1114 that were originally loaded in the source class via
1115 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1116 in the original source class will not run.
1117
1118 =cut
1119
1120 sub result_class {
1121   my ($self, $result_class) = @_;
1122   if ($result_class) {
1123     unless (ref $result_class) { # don't fire this for an object
1124       $self->ensure_class_loaded($result_class);
1125     }
1126     $self->_result_class($result_class);
1127     # THIS LINE WOULD BE A BUG - this accessor specifically exists to
1128     # permit the user to set result class on one result set only; it only
1129     # chains if provided to search()
1130     #$self->{attrs}{result_class} = $result_class if ref $self;
1131   }
1132   $self->_result_class;
1133 }
1134
1135 =head2 count
1136
1137 =over 4
1138
1139 =item Arguments: $cond, \%attrs??
1140
1141 =item Return Value: $count
1142
1143 =back
1144
1145 Performs an SQL C<COUNT> with the same query as the resultset was built
1146 with to find the number of elements. Passing arguments is equivalent to
1147 C<< $rs->search ($cond, \%attrs)->count >>
1148
1149 =cut
1150
1151 sub count {
1152   my $self = shift;
1153   return $self->search(@_)->count if @_ and defined $_[0];
1154   return scalar @{ $self->get_cache } if $self->get_cache;
1155
1156   my $attrs = $self->_resolved_attrs_copy;
1157
1158   # this is a little optimization - it is faster to do the limit
1159   # adjustments in software, instead of a subquery
1160   my $rows = delete $attrs->{rows};
1161   my $offset = delete $attrs->{offset};
1162
1163   my $crs;
1164   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1165     $crs = $self->_count_subq_rs ($attrs);
1166   }
1167   else {
1168     $crs = $self->_count_rs ($attrs);
1169   }
1170   my $count = $crs->next;
1171
1172   $count -= $offset if $offset;
1173   $count = $rows if $rows and $rows < $count;
1174   $count = 0 if ($count < 0);
1175
1176   return $count;
1177 }
1178
1179 =head2 count_rs
1180
1181 =over 4
1182
1183 =item Arguments: $cond, \%attrs??
1184
1185 =item Return Value: $count_rs
1186
1187 =back
1188
1189 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1190 This can be very handy for subqueries:
1191
1192   ->search( { amount => $some_rs->count_rs->as_query } )
1193
1194 As with regular resultsets the SQL query will be executed only after
1195 the resultset is accessed via L</next> or L</all>. That would return
1196 the same single value obtainable via L</count>.
1197
1198 =cut
1199
1200 sub count_rs {
1201   my $self = shift;
1202   return $self->search(@_)->count_rs if @_;
1203
1204   # this may look like a lack of abstraction (count() does about the same)
1205   # but in fact an _rs *must* use a subquery for the limits, as the
1206   # software based limiting can not be ported if this $rs is to be used
1207   # in a subquery itself (i.e. ->as_query)
1208   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1209     return $self->_count_subq_rs;
1210   }
1211   else {
1212     return $self->_count_rs;
1213   }
1214 }
1215
1216 #
1217 # returns a ResultSetColumn object tied to the count query
1218 #
1219 sub _count_rs {
1220   my ($self, $attrs) = @_;
1221
1222   my $rsrc = $self->result_source;
1223   $attrs ||= $self->_resolved_attrs;
1224
1225   my $tmp_attrs = { %$attrs };
1226   # take off any limits, record_filter is cdbi, and no point of ordering nor locking a count
1227   delete @{$tmp_attrs}{qw/rows offset order_by record_filter for/};
1228
1229   # overwrite the selector (supplied by the storage)
1230   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $attrs);
1231   $tmp_attrs->{as} = 'count';
1232
1233   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1234
1235   return $tmp_rs;
1236 }
1237
1238 #
1239 # same as above but uses a subquery
1240 #
1241 sub _count_subq_rs {
1242   my ($self, $attrs) = @_;
1243
1244   my $rsrc = $self->result_source;
1245   $attrs ||= $self->_resolved_attrs;
1246
1247   my $sub_attrs = { %$attrs };
1248   # extra selectors do not go in the subquery and there is no point of ordering it, nor locking it
1249   delete @{$sub_attrs}{qw/collapse select _prefetch_select as order_by for/};
1250
1251   # if we multi-prefetch we group_by primary keys only as this is what we would
1252   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
1253   if ( keys %{$attrs->{collapse}}  ) {
1254     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } ($rsrc->_pri_cols) ]
1255   }
1256
1257   # Calculate subquery selector
1258   if (my $g = $sub_attrs->{group_by}) {
1259
1260     my $sql_maker = $rsrc->storage->sql_maker;
1261
1262     # necessary as the group_by may refer to aliased functions
1263     my $sel_index;
1264     for my $sel (@{$attrs->{select}}) {
1265       $sel_index->{$sel->{-as}} = $sel
1266         if (ref $sel eq 'HASH' and $sel->{-as});
1267     }
1268
1269     for my $g_part (@$g) {
1270       my $colpiece = $sel_index->{$g_part} || $g_part;
1271
1272       # disqualify join-based group_by's. Arcane but possible query
1273       # also horrible horrible hack to alias a column (not a func.)
1274       # (probably need to introduce SQLA syntax)
1275       if ($colpiece =~ /\./ && $colpiece !~ /^$attrs->{alias}\./) {
1276         my $as = $colpiece;
1277         $as =~ s/\./__/;
1278         $colpiece = \ sprintf ('%s AS %s', map { $sql_maker->_quote ($_) } ($colpiece, $as) );
1279       }
1280       push @{$sub_attrs->{select}}, $colpiece;
1281     }
1282   }
1283   else {
1284     my @pcols = map { "$attrs->{alias}.$_" } ($rsrc->primary_columns);
1285     $sub_attrs->{select} = @pcols ? \@pcols : [ 1 ];
1286   }
1287
1288   return $rsrc->resultset_class
1289                ->new ($rsrc, $sub_attrs)
1290                 ->as_subselect_rs
1291                  ->search ({}, { columns => { count => $rsrc->storage->_count_select ($rsrc, $attrs) } })
1292                   ->get_column ('count');
1293 }
1294
1295 sub _bool {
1296   return 1;
1297 }
1298
1299 =head2 count_literal
1300
1301 =over 4
1302
1303 =item Arguments: $sql_fragment, @bind_values
1304
1305 =item Return Value: $count
1306
1307 =back
1308
1309 Counts the results in a literal query. Equivalent to calling L</search_literal>
1310 with the passed arguments, then L</count>.
1311
1312 =cut
1313
1314 sub count_literal { shift->search_literal(@_)->count; }
1315
1316 =head2 all
1317
1318 =over 4
1319
1320 =item Arguments: none
1321
1322 =item Return Value: @objects
1323
1324 =back
1325
1326 Returns all elements in the resultset. Called implicitly if the resultset
1327 is returned in list context.
1328
1329 =cut
1330
1331 sub all {
1332   my $self = shift;
1333   if(@_) {
1334       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1335   }
1336
1337   return @{ $self->get_cache } if $self->get_cache;
1338
1339   my @obj;
1340
1341   if (keys %{$self->_resolved_attrs->{collapse}}) {
1342     # Using $self->cursor->all is really just an optimisation.
1343     # If we're collapsing has_many prefetches it probably makes
1344     # very little difference, and this is cleaner than hacking
1345     # _construct_object to survive the approach
1346     $self->cursor->reset;
1347     my @row = $self->cursor->next;
1348     while (@row) {
1349       push(@obj, $self->_construct_object(@row));
1350       @row = (exists $self->{stashed_row}
1351                ? @{delete $self->{stashed_row}}
1352                : $self->cursor->next);
1353     }
1354   } else {
1355     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1356   }
1357
1358   $self->set_cache(\@obj) if $self->{attrs}{cache};
1359
1360   return @obj;
1361 }
1362
1363 =head2 reset
1364
1365 =over 4
1366
1367 =item Arguments: none
1368
1369 =item Return Value: $self
1370
1371 =back
1372
1373 Resets the resultset's cursor, so you can iterate through the elements again.
1374 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1375 another query.
1376
1377 =cut
1378
1379 sub reset {
1380   my ($self) = @_;
1381   delete $self->{_attrs} if exists $self->{_attrs};
1382   $self->{all_cache_position} = 0;
1383   $self->cursor->reset;
1384   return $self;
1385 }
1386
1387 =head2 first
1388
1389 =over 4
1390
1391 =item Arguments: none
1392
1393 =item Return Value: $object | undef
1394
1395 =back
1396
1397 Resets the resultset and returns an object for the first result (or C<undef>
1398 if the resultset is empty).
1399
1400 =cut
1401
1402 sub first {
1403   return $_[0]->reset->next;
1404 }
1405
1406
1407 # _rs_update_delete
1408 #
1409 # Determines whether and what type of subquery is required for the $rs operation.
1410 # If grouping is necessary either supplies its own, or verifies the current one
1411 # After all is done delegates to the proper storage method.
1412
1413 sub _rs_update_delete {
1414   my ($self, $op, $values) = @_;
1415
1416   my $rsrc = $self->result_source;
1417
1418   # if a condition exists we need to strip all table qualifiers
1419   # if this is not possible we'll force a subquery below
1420   my $cond = $rsrc->schema->storage->_strip_cond_qualifiers ($self->{cond});
1421
1422   my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
1423   my $needs_subq = $needs_group_by_subq || (not defined $cond) || $self->_has_resolved_attr(qw/rows offset/);
1424
1425   if ($needs_group_by_subq or $needs_subq) {
1426
1427     # make a new $rs selecting only the PKs (that's all we really need)
1428     my $attrs = $self->_resolved_attrs_copy;
1429
1430
1431     delete $attrs->{$_} for qw/collapse _collapse_order_by select _prefetch_select as/;
1432     $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->_pri_cols) ];
1433
1434     if ($needs_group_by_subq) {
1435       # make sure no group_by was supplied, or if there is one - make sure it matches
1436       # the columns compiled above perfectly. Anything else can not be sanely executed
1437       # on most databases so croak right then and there
1438
1439       if (my $g = $attrs->{group_by}) {
1440         my @current_group_by = map
1441           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1442           @$g
1443         ;
1444
1445         if (
1446           join ("\x00", sort @current_group_by)
1447             ne
1448           join ("\x00", sort @{$attrs->{columns}} )
1449         ) {
1450           $self->throw_exception (
1451             "You have just attempted a $op operation on a resultset which does group_by"
1452             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1453             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1454             . ' kind of queries. Please retry the operation with a modified group_by or'
1455             . ' without using one at all.'
1456           );
1457         }
1458       }
1459       else {
1460         $attrs->{group_by} = $attrs->{columns};
1461       }
1462     }
1463
1464     my $subrs = (ref $self)->new($rsrc, $attrs);
1465     return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
1466   }
1467   else {
1468     return $rsrc->storage->$op(
1469       $rsrc,
1470       $op eq 'update' ? $values : (),
1471       $cond,
1472     );
1473   }
1474 }
1475
1476 =head2 update
1477
1478 =over 4
1479
1480 =item Arguments: \%values
1481
1482 =item Return Value: $storage_rv
1483
1484 =back
1485
1486 Sets the specified columns in the resultset to the supplied values in a
1487 single query. Note that this will not run any accessor/set_column/update
1488 triggers, nor will it update any row object instances derived from this
1489 resultset (this includes the contents of the L<resultset cache|/set_cache>
1490 if any). See L</update_all> if you need to execute any on-update
1491 triggers or cascades defined either by you or a
1492 L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
1493
1494 The return value is a pass through of what the underlying
1495 storage backend returned, and may vary. See L<DBI/execute> for the most
1496 common case.
1497
1498 =cut
1499
1500 sub update {
1501   my ($self, $values) = @_;
1502   $self->throw_exception('Values for update must be a hash')
1503     unless ref $values eq 'HASH';
1504
1505   return $self->_rs_update_delete ('update', $values);
1506 }
1507
1508 =head2 update_all
1509
1510 =over 4
1511
1512 =item Arguments: \%values
1513
1514 =item Return Value: 1
1515
1516 =back
1517
1518 Fetches all objects and updates them one at a time via
1519 L<DBIx::Class::Row/update>. Note that C<update_all> will run DBIC defined
1520 triggers, while L</update> will not.
1521
1522 =cut
1523
1524 sub update_all {
1525   my ($self, $values) = @_;
1526   $self->throw_exception('Values for update_all must be a hash')
1527     unless ref $values eq 'HASH';
1528
1529   my $guard = $self->result_source->schema->txn_scope_guard;
1530   $_->update($values) for $self->all;
1531   $guard->commit;
1532   return 1;
1533 }
1534
1535 =head2 delete
1536
1537 =over 4
1538
1539 =item Arguments: none
1540
1541 =item Return Value: $storage_rv
1542
1543 =back
1544
1545 Deletes the rows matching this resultset in a single query. Note that this
1546 will not run any delete triggers, nor will it alter the
1547 L<in_storage|DBIx::Class::Row/in_storage> status of any row object instances
1548 derived from this resultset (this includes the contents of the
1549 L<resultset cache|/set_cache> if any). See L</delete_all> if you need to
1550 execute any on-delete triggers or cascades defined either by you or a
1551 L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
1552
1553 The return value is a pass through of what the underlying storage backend
1554 returned, and may vary. See L<DBI/execute> for the most common case.
1555
1556 =cut
1557
1558 sub delete {
1559   my $self = shift;
1560   $self->throw_exception('delete does not accept any arguments')
1561     if @_;
1562
1563   return $self->_rs_update_delete ('delete');
1564 }
1565
1566 =head2 delete_all
1567
1568 =over 4
1569
1570 =item Arguments: none
1571
1572 =item Return Value: 1
1573
1574 =back
1575
1576 Fetches all objects and deletes them one at a time via
1577 L<DBIx::Class::Row/delete>. Note that C<delete_all> will run DBIC defined
1578 triggers, while L</delete> will not.
1579
1580 =cut
1581
1582 sub delete_all {
1583   my $self = shift;
1584   $self->throw_exception('delete_all does not accept any arguments')
1585     if @_;
1586
1587   my $guard = $self->result_source->schema->txn_scope_guard;
1588   $_->delete for $self->all;
1589   $guard->commit;
1590   return 1;
1591 }
1592
1593 =head2 populate
1594
1595 =over 4
1596
1597 =item Arguments: \@data;
1598
1599 =back
1600
1601 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1602 For the arrayref of hashrefs style each hashref should be a structure suitable
1603 forsubmitting to a $resultset->create(...) method.
1604
1605 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1606 to insert the data, as this is a faster method.
1607
1608 Otherwise, each set of data is inserted into the database using
1609 L<DBIx::Class::ResultSet/create>, and the resulting objects are
1610 accumulated into an array. The array itself, or an array reference
1611 is returned depending on scalar or list context.
1612
1613 Example:  Assuming an Artist Class that has many CDs Classes relating:
1614
1615   my $Artist_rs = $schema->resultset("Artist");
1616
1617   ## Void Context Example
1618   $Artist_rs->populate([
1619      { artistid => 4, name => 'Manufactured Crap', cds => [
1620         { title => 'My First CD', year => 2006 },
1621         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1622       ],
1623      },
1624      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1625         { title => 'My parents sold me to a record company', year => 2005 },
1626         { title => 'Why Am I So Ugly?', year => 2006 },
1627         { title => 'I Got Surgery and am now Popular', year => 2007 }
1628       ],
1629      },
1630   ]);
1631
1632   ## Array Context Example
1633   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1634     { name => "Artist One"},
1635     { name => "Artist Two"},
1636     { name => "Artist Three", cds=> [
1637     { title => "First CD", year => 2007},
1638     { title => "Second CD", year => 2008},
1639   ]}
1640   ]);
1641
1642   print $ArtistOne->name; ## response is 'Artist One'
1643   print $ArtistThree->cds->count ## reponse is '2'
1644
1645 For the arrayref of arrayrefs style,  the first element should be a list of the
1646 fieldsnames to which the remaining elements are rows being inserted.  For
1647 example:
1648
1649   $Arstist_rs->populate([
1650     [qw/artistid name/],
1651     [100, 'A Formally Unknown Singer'],
1652     [101, 'A singer that jumped the shark two albums ago'],
1653     [102, 'An actually cool singer'],
1654   ]);
1655
1656 Please note an important effect on your data when choosing between void and
1657 wantarray context. Since void context goes straight to C<insert_bulk> in
1658 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1659 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
1660 create primary keys for you, you will find that your PKs are empty.  In this
1661 case you will have to use the wantarray context in order to create those
1662 values.
1663
1664 =cut
1665
1666 sub populate {
1667   my $self = shift;
1668
1669   # cruft placed in standalone method
1670   my $data = $self->_normalize_populate_args(@_);
1671
1672   if(defined wantarray) {
1673     my @created;
1674     foreach my $item (@$data) {
1675       push(@created, $self->create($item));
1676     }
1677     return wantarray ? @created : \@created;
1678   } else {
1679     my $first = $data->[0];
1680
1681     # if a column is a registered relationship, and is a non-blessed hash/array, consider
1682     # it relationship data
1683     my (@rels, @columns);
1684     for (keys %$first) {
1685       my $ref = ref $first->{$_};
1686       $self->result_source->has_relationship($_) && ($ref eq 'ARRAY' or $ref eq 'HASH')
1687         ? push @rels, $_
1688         : push @columns, $_
1689       ;
1690     }
1691
1692     my @pks = $self->result_source->primary_columns;
1693
1694     ## do the belongs_to relationships
1695     foreach my $index (0..$#$data) {
1696
1697       # delegate to create() for any dataset without primary keys with specified relationships
1698       if (grep { !defined $data->[$index]->{$_} } @pks ) {
1699         for my $r (@rels) {
1700           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
1701             my @ret = $self->populate($data);
1702             return;
1703           }
1704         }
1705       }
1706
1707       foreach my $rel (@rels) {
1708         next unless ref $data->[$index]->{$rel} eq "HASH";
1709         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1710         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1711         my $related = $result->result_source->_resolve_condition(
1712           $result->result_source->relationship_info($reverse)->{cond},
1713           $self,
1714           $result,
1715         );
1716
1717         delete $data->[$index]->{$rel};
1718         $data->[$index] = {%{$data->[$index]}, %$related};
1719
1720         push @columns, keys %$related if $index == 0;
1721       }
1722     }
1723
1724     ## inherit the data locked in the conditions of the resultset
1725     my ($rs_data) = $self->_merge_with_rscond({});
1726     delete @{$rs_data}{@columns};
1727     my @inherit_cols = keys %$rs_data;
1728     my @inherit_data = values %$rs_data;
1729
1730     ## do bulk insert on current row
1731     $self->result_source->storage->insert_bulk(
1732       $self->result_source,
1733       [@columns, @inherit_cols],
1734       [ map { [ @$_{@columns}, @inherit_data ] } @$data ],
1735     );
1736
1737     ## do the has_many relationships
1738     foreach my $item (@$data) {
1739
1740       foreach my $rel (@rels) {
1741         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1742
1743         my $parent = $self->find({map { $_ => $item->{$_} } @pks})
1744      || $self->throw_exception('Cannot find the relating object.');
1745
1746         my $child = $parent->$rel;
1747
1748         my $related = $child->result_source->_resolve_condition(
1749           $parent->result_source->relationship_info($rel)->{cond},
1750           $child,
1751           $parent,
1752         );
1753
1754         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1755         my @populate = map { {%$_, %$related} } @rows_to_add;
1756
1757         $child->populate( \@populate );
1758       }
1759     }
1760   }
1761 }
1762
1763
1764 # populate() argumnets went over several incarnations
1765 # What we ultimately support is AoH
1766 sub _normalize_populate_args {
1767   my ($self, $arg) = @_;
1768
1769   if (ref $arg eq 'ARRAY') {
1770     if (ref $arg->[0] eq 'HASH') {
1771       return $arg;
1772     }
1773     elsif (ref $arg->[0] eq 'ARRAY') {
1774       my @ret;
1775       my @colnames = @{$arg->[0]};
1776       foreach my $values (@{$arg}[1 .. $#$arg]) {
1777         push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
1778       }
1779       return \@ret;
1780     }
1781   }
1782
1783   $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
1784 }
1785
1786 =head2 pager
1787
1788 =over 4
1789
1790 =item Arguments: none
1791
1792 =item Return Value: $pager
1793
1794 =back
1795
1796 Return Value a L<Data::Page> object for the current resultset. Only makes
1797 sense for queries with a C<page> attribute.
1798
1799 To get the full count of entries for a paged resultset, call
1800 C<total_entries> on the L<Data::Page> object.
1801
1802 =cut
1803
1804 # make a wizard good for both a scalar and a hashref
1805 my $mk_lazy_count_wizard = sub {
1806   require Variable::Magic;
1807
1808   my $stash = { total_rs => shift };
1809   my $slot = shift; # only used by the hashref magic
1810
1811   my $magic = Variable::Magic::wizard (
1812     data => sub { $stash },
1813
1814     (!$slot)
1815     ? (
1816       # the scalar magic
1817       get => sub {
1818         # set value lazily, and dispell for good
1819         ${$_[0]} = $_[1]{total_rs}->count;
1820         Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref});
1821         return 1;
1822       },
1823       set => sub {
1824         # an explicit set implies dispell as well
1825         # the unless() is to work around "fun and giggles" below
1826         Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref})
1827           unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
1828         return 1;
1829       },
1830     )
1831     : (
1832       # the uvar magic
1833       fetch => sub {
1834         if ($_[2] eq $slot and !$_[1]{inactive}) {
1835           my $cnt = $_[1]{total_rs}->count;
1836           $_[0]->{$slot} = $cnt;
1837
1838           # attempting to dispell in a fetch handle (works in store), seems
1839           # to invariable segfault on 5.10, 5.12, 5.13 :(
1840           # so use an inactivator instead
1841           #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
1842           $_[1]{inactive}++;
1843         }
1844         return 1;
1845       },
1846       store => sub {
1847         if (! $_[1]{inactive} and $_[2] eq $slot) {
1848           #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
1849           $_[1]{inactive}++
1850             unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
1851         }
1852         return 1;
1853       },
1854     ),
1855   );
1856
1857   $stash->{magic_selfref} = $magic;
1858   weaken ($stash->{magic_selfref}); # this fails on 5.8.1
1859
1860   return $magic;
1861 };
1862
1863 # the tie class for 5.8.1
1864 {
1865   package DBIx::Class::__DBIC_LAZY_RS_COUNT__;
1866   use base qw/Tie::Hash/;
1867
1868   sub FIRSTKEY { my $dummy = scalar keys %{$_[0]{data}}; each %{$_[0]{data}} }
1869   sub NEXTKEY  { each %{$_[0]{data}} }
1870   sub EXISTS   { exists $_[0]{data}{$_[1]} }
1871   sub DELETE   { delete $_[0]{data}{$_[1]} }
1872   sub CLEAR    { %{$_[0]{data}} = () }
1873   sub SCALAR   { scalar %{$_[0]{data}} }
1874
1875   sub TIEHASH {
1876     $_[1]{data} = {%{$_[1]{selfref}}};
1877     %{$_[1]{selfref}} = ();
1878     Scalar::Util::weaken ($_[1]{selfref});
1879     return bless ($_[1], $_[0]);
1880   };
1881
1882   sub FETCH {
1883     if ($_[1] eq $_[0]{slot}) {
1884       my $cnt = $_[0]{data}{$_[1]} = $_[0]{total_rs}->count;
1885       untie %{$_[0]{selfref}};
1886       %{$_[0]{selfref}} = %{$_[0]{data}};
1887       return $cnt;
1888     }
1889     else {
1890       $_[0]{data}{$_[1]};
1891     }
1892   }
1893
1894   sub STORE {
1895     $_[0]{data}{$_[1]} = $_[2];
1896     if ($_[1] eq $_[0]{slot}) {
1897       untie %{$_[0]{selfref}};
1898       %{$_[0]{selfref}} = %{$_[0]{data}};
1899     }
1900     $_[2];
1901   }
1902 }
1903
1904 sub pager {
1905   my ($self) = @_;
1906
1907   return $self->{pager} if $self->{pager};
1908
1909   if ($self->get_cache) {
1910     $self->throw_exception ('Pagers on cached resultsets are not supported');
1911   }
1912
1913   my $attrs = $self->{attrs};
1914   $self->throw_exception("Can't create pager for non-paged rs")
1915     unless $self->{attrs}{page};
1916   $attrs->{rows} ||= 10;
1917
1918   # throw away the paging flags and re-run the count (possibly
1919   # with a subselect) to get the real total count
1920   my $count_attrs = { %$attrs };
1921   delete $count_attrs->{$_} for qw/rows offset page pager/;
1922   my $total_rs = (ref $self)->new($self->result_source, $count_attrs);
1923
1924
1925 ### the following may seem awkward and dirty, but it's a thought-experiment
1926 ### necessary for future development of DBIx::DS. Do *NOT* change this code
1927 ### before talking to ribasushi/mst
1928
1929   my $pager = Data::Page->new(
1930     0,  #start with an empty set
1931     $attrs->{rows},
1932     $self->{attrs}{page},
1933   );
1934
1935   my $data_slot = 'total_entries';
1936
1937   # Since we are interested in a cached value (once it's set - it's set), every
1938   # technique will detach from the magic-host once the time comes to fire the
1939   # ->count (or in the segfaulting case of >= 5.10 it will deactivate itself)
1940
1941   if ($] < 5.008003) {
1942     # 5.8.1 throws 'Modification of a read-only value attempted' when one tries
1943     # to weakref the magic container :(
1944     # tested on 5.8.1
1945     tie (%$pager, 'DBIx::Class::__DBIC_LAZY_RS_COUNT__',
1946       { slot => $data_slot, total_rs => $total_rs, selfref => $pager }
1947     );
1948   }
1949   elsif ($] < 5.010) {
1950     # We can use magic on the hash value slot. It's interesting that the magic is
1951     # attached to the hash-slot, and does *not* stop working once I do the dummy
1952     # assignments after the cast()
1953     # tested on 5.8.3 and 5.8.9
1954     my $magic = $mk_lazy_count_wizard->($total_rs);
1955     Variable::Magic::cast ( $pager->{$data_slot}, $magic );
1956
1957     # this is for fun and giggles
1958     $pager->{$data_slot} = -1;
1959     $pager->{$data_slot} = 0;
1960
1961     # this does not work for scalars, but works with
1962     # uvar magic below
1963     #my %vals = %$pager;
1964     #%$pager = ();
1965     #%{$pager} = %vals;
1966   }
1967   else {
1968     # And the uvar magic
1969     # works on 5.10.1, 5.12.1 and 5.13.4 in its current form,
1970     # however see the wizard maker for more notes
1971     my $magic = $mk_lazy_count_wizard->($total_rs, $data_slot);
1972     Variable::Magic::cast ( %$pager, $magic );
1973
1974     # still works
1975     $pager->{$data_slot} = -1;
1976     $pager->{$data_slot} = 0;
1977
1978     # this now works
1979     my %vals = %$pager;
1980     %$pager = ();
1981     %{$pager} = %vals;
1982   }
1983
1984   return $self->{pager} = $pager;
1985 }
1986
1987 =head2 page
1988
1989 =over 4
1990
1991 =item Arguments: $page_number
1992
1993 =item Return Value: $rs
1994
1995 =back
1996
1997 Returns a resultset for the $page_number page of the resultset on which page
1998 is called, where each page contains a number of rows equal to the 'rows'
1999 attribute set on the resultset (10 by default).
2000
2001 =cut
2002
2003 sub page {
2004   my ($self, $page) = @_;
2005   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
2006 }
2007
2008 =head2 new_result
2009
2010 =over 4
2011
2012 =item Arguments: \%vals
2013
2014 =item Return Value: $rowobject
2015
2016 =back
2017
2018 Creates a new row object in the resultset's result class and returns
2019 it. The row is not inserted into the database at this point, call
2020 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
2021 will tell you whether the row object has been inserted or not.
2022
2023 Passes the hashref of input on to L<DBIx::Class::Row/new>.
2024
2025 =cut
2026
2027 sub new_result {
2028   my ($self, $values) = @_;
2029   $self->throw_exception( "new_result needs a hash" )
2030     unless (ref $values eq 'HASH');
2031
2032   my ($merged_cond, $cols_from_relations) = $self->_merge_with_rscond($values);
2033
2034   my %new = (
2035     %$merged_cond,
2036     @$cols_from_relations
2037       ? (-cols_from_relations => $cols_from_relations)
2038       : (),
2039     -source_handle => $self->_source_handle,
2040     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
2041   );
2042
2043   return $self->result_class->new(\%new);
2044 }
2045
2046 # _merge_with_rscond
2047 #
2048 # Takes a simple hash of K/V data and returns its copy merged with the
2049 # condition already present on the resultset. Additionally returns an
2050 # arrayref of value/condition names, which were inferred from related
2051 # objects (this is needed for in-memory related objects)
2052 sub _merge_with_rscond {
2053   my ($self, $data) = @_;
2054
2055   my (%new_data, @cols_from_relations);
2056
2057   my $alias = $self->{attrs}{alias};
2058
2059   if (! defined $self->{cond}) {
2060     # just massage $data below
2061   }
2062   elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
2063     %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
2064     @cols_from_relations = keys %new_data;
2065   }
2066   elsif (ref $self->{cond} ne 'HASH') {
2067     $self->throw_exception(
2068       "Can't abstract implicit construct, resultset condition not a hash"
2069     );
2070   }
2071   else {
2072     # precendence must be given to passed values over values inherited from
2073     # the cond, so the order here is important.
2074     my $collapsed_cond = $self->_collapse_cond($self->{cond});
2075     my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
2076
2077     while ( my($col, $value) = each %implied ) {
2078       my $vref = ref $value;
2079       if ($vref eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '=') {
2080         $new_data{$col} = $value->{'='};
2081       }
2082       elsif( !$vref or $vref eq 'SCALAR' or blessed($value) ) {
2083         $new_data{$col} = $value;
2084       }
2085     }
2086   }
2087
2088   %new_data = (
2089     %new_data,
2090     %{ $self->_remove_alias($data, $alias) },
2091   );
2092
2093   return (\%new_data, \@cols_from_relations);
2094 }
2095
2096 # _has_resolved_attr
2097 #
2098 # determines if the resultset defines at least one
2099 # of the attributes supplied
2100 #
2101 # used to determine if a subquery is neccessary
2102 #
2103 # supports some virtual attributes:
2104 #   -join
2105 #     This will scan for any joins being present on the resultset.
2106 #     It is not a mere key-search but a deep inspection of {from}
2107 #
2108
2109 sub _has_resolved_attr {
2110   my ($self, @attr_names) = @_;
2111
2112   my $attrs = $self->_resolved_attrs;
2113
2114   my %extra_checks;
2115
2116   for my $n (@attr_names) {
2117     if (grep { $n eq $_ } (qw/-join/) ) {
2118       $extra_checks{$n}++;
2119       next;
2120     }
2121
2122     my $attr =  $attrs->{$n};
2123
2124     next if not defined $attr;
2125
2126     if (ref $attr eq 'HASH') {
2127       return 1 if keys %$attr;
2128     }
2129     elsif (ref $attr eq 'ARRAY') {
2130       return 1 if @$attr;
2131     }
2132     else {
2133       return 1 if $attr;
2134     }
2135   }
2136
2137   # a resolved join is expressed as a multi-level from
2138   return 1 if (
2139     $extra_checks{-join}
2140       and
2141     ref $attrs->{from} eq 'ARRAY'
2142       and
2143     @{$attrs->{from}} > 1
2144   );
2145
2146   return 0;
2147 }
2148
2149 # _collapse_cond
2150 #
2151 # Recursively collapse the condition.
2152
2153 sub _collapse_cond {
2154   my ($self, $cond, $collapsed) = @_;
2155
2156   $collapsed ||= {};
2157
2158   if (ref $cond eq 'ARRAY') {
2159     foreach my $subcond (@$cond) {
2160       next unless ref $subcond;  # -or
2161       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2162     }
2163   }
2164   elsif (ref $cond eq 'HASH') {
2165     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2166       foreach my $subcond (@{$cond->{-and}}) {
2167         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2168       }
2169     }
2170     else {
2171       foreach my $col (keys %$cond) {
2172         my $value = $cond->{$col};
2173         $collapsed->{$col} = $value;
2174       }
2175     }
2176   }
2177
2178   return $collapsed;
2179 }
2180
2181 # _remove_alias
2182 #
2183 # Remove the specified alias from the specified query hash. A copy is made so
2184 # the original query is not modified.
2185
2186 sub _remove_alias {
2187   my ($self, $query, $alias) = @_;
2188
2189   my %orig = %{ $query || {} };
2190   my %unaliased;
2191
2192   foreach my $key (keys %orig) {
2193     if ($key !~ /\./) {
2194       $unaliased{$key} = $orig{$key};
2195       next;
2196     }
2197     $unaliased{$1} = $orig{$key}
2198       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2199   }
2200
2201   return \%unaliased;
2202 }
2203
2204 =head2 as_query
2205
2206 =over 4
2207
2208 =item Arguments: none
2209
2210 =item Return Value: \[ $sql, @bind ]
2211
2212 =back
2213
2214 Returns the SQL query and bind vars associated with the invocant.
2215
2216 This is generally used as the RHS for a subquery.
2217
2218 =cut
2219
2220 sub as_query {
2221   my $self = shift;
2222
2223   my $attrs = $self->_resolved_attrs_copy;
2224
2225   # For future use:
2226   #
2227   # in list ctx:
2228   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2229   # $sql also has no wrapping parenthesis in list ctx
2230   #
2231   my $sqlbind = $self->result_source->storage
2232     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2233
2234   return $sqlbind;
2235 }
2236
2237 =head2 find_or_new
2238
2239 =over 4
2240
2241 =item Arguments: \%vals, \%attrs?
2242
2243 =item Return Value: $rowobject
2244
2245 =back
2246
2247   my $artist = $schema->resultset('Artist')->find_or_new(
2248     { artist => 'fred' }, { key => 'artists' });
2249
2250   $cd->cd_to_producer->find_or_new({ producer => $producer },
2251                                    { key => 'primary });
2252
2253 Find an existing record from this resultset using L</find>. if none exists,
2254 instantiate a new result object and return it. The object will not be saved
2255 into your storage until you call L<DBIx::Class::Row/insert> on it.
2256
2257 You most likely want this method when looking for existing rows using a unique
2258 constraint that is not the primary key, or looking for related rows.
2259
2260 If you want objects to be saved immediately, use L</find_or_create> instead.
2261
2262 B<Note>: Make sure to read the documentation of L</find> and understand the
2263 significance of the C<key> attribute, as its lack may skew your search, and
2264 subsequently result in spurious new objects.
2265
2266 B<Note>: Take care when using C<find_or_new> with a table having
2267 columns with default values that you intend to be automatically
2268 supplied by the database (e.g. an auto_increment primary key column).
2269 In normal usage, the value of such columns should NOT be included at
2270 all in the call to C<find_or_new>, even when set to C<undef>.
2271
2272 =cut
2273
2274 sub find_or_new {
2275   my $self     = shift;
2276   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2277   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2278   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2279     return $row;
2280   }
2281   return $self->new_result($hash);
2282 }
2283
2284 =head2 create
2285
2286 =over 4
2287
2288 =item Arguments: \%vals
2289
2290 =item Return Value: a L<DBIx::Class::Row> $object
2291
2292 =back
2293
2294 Attempt to create a single new row or a row with multiple related rows
2295 in the table represented by the resultset (and related tables). This
2296 will not check for duplicate rows before inserting, use
2297 L</find_or_create> to do that.
2298
2299 To create one row for this resultset, pass a hashref of key/value
2300 pairs representing the columns of the table and the values you wish to
2301 store. If the appropriate relationships are set up, foreign key fields
2302 can also be passed an object representing the foreign row, and the
2303 value will be set to its primary key.
2304
2305 To create related objects, pass a hashref of related-object column values
2306 B<keyed on the relationship name>. If the relationship is of type C<multi>
2307 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2308 The process will correctly identify columns holding foreign keys, and will
2309 transparently populate them from the keys of the corresponding relation.
2310 This can be applied recursively, and will work correctly for a structure
2311 with an arbitrary depth and width, as long as the relationships actually
2312 exists and the correct column data has been supplied.
2313
2314
2315 Instead of hashrefs of plain related data (key/value pairs), you may
2316 also pass new or inserted objects. New objects (not inserted yet, see
2317 L</new>), will be inserted into their appropriate tables.
2318
2319 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2320
2321 Example of creating a new row.
2322
2323   $person_rs->create({
2324     name=>"Some Person",
2325     email=>"somebody@someplace.com"
2326   });
2327
2328 Example of creating a new row and also creating rows in a related C<has_many>
2329 or C<has_one> resultset.  Note Arrayref.
2330
2331   $artist_rs->create(
2332      { artistid => 4, name => 'Manufactured Crap', cds => [
2333         { title => 'My First CD', year => 2006 },
2334         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2335       ],
2336      },
2337   );
2338
2339 Example of creating a new row and also creating a row in a related
2340 C<belongs_to> resultset. Note Hashref.
2341
2342   $cd_rs->create({
2343     title=>"Music for Silly Walks",
2344     year=>2000,
2345     artist => {
2346       name=>"Silly Musician",
2347     }
2348   });
2349
2350 =over
2351
2352 =item WARNING
2353
2354 When subclassing ResultSet never attempt to override this method. Since
2355 it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
2356 lot of the internals simply never call it, so your override will be
2357 bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
2358 or L<insert|DBIx::Class::Row/insert> depending on how early in the
2359 L</create> process you need to intervene.
2360
2361 =back
2362
2363 =cut
2364
2365 sub create {
2366   my ($self, $attrs) = @_;
2367   $self->throw_exception( "create needs a hashref" )
2368     unless ref $attrs eq 'HASH';
2369   return $self->new_result($attrs)->insert;
2370 }
2371
2372 =head2 find_or_create
2373
2374 =over 4
2375
2376 =item Arguments: \%vals, \%attrs?
2377
2378 =item Return Value: $rowobject
2379
2380 =back
2381
2382   $cd->cd_to_producer->find_or_create({ producer => $producer },
2383                                       { key => 'primary' });
2384
2385 Tries to find a record based on its primary key or unique constraints; if none
2386 is found, creates one and returns that instead.
2387
2388   my $cd = $schema->resultset('CD')->find_or_create({
2389     cdid   => 5,
2390     artist => 'Massive Attack',
2391     title  => 'Mezzanine',
2392     year   => 2005,
2393   });
2394
2395 Also takes an optional C<key> attribute, to search by a specific key or unique
2396 constraint. For example:
2397
2398   my $cd = $schema->resultset('CD')->find_or_create(
2399     {
2400       artist => 'Massive Attack',
2401       title  => 'Mezzanine',
2402     },
2403     { key => 'cd_artist_title' }
2404   );
2405
2406 B<Note>: Make sure to read the documentation of L</find> and understand the
2407 significance of the C<key> attribute, as its lack may skew your search, and
2408 subsequently result in spurious row creation.
2409
2410 B<Note>: Because find_or_create() reads from the database and then
2411 possibly inserts based on the result, this method is subject to a race
2412 condition. Another process could create a record in the table after
2413 the find has completed and before the create has started. To avoid
2414 this problem, use find_or_create() inside a transaction.
2415
2416 B<Note>: Take care when using C<find_or_create> with a table having
2417 columns with default values that you intend to be automatically
2418 supplied by the database (e.g. an auto_increment primary key column).
2419 In normal usage, the value of such columns should NOT be included at
2420 all in the call to C<find_or_create>, even when set to C<undef>.
2421
2422 See also L</find> and L</update_or_create>. For information on how to declare
2423 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2424
2425 =cut
2426
2427 sub find_or_create {
2428   my $self     = shift;
2429   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2430   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2431   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2432     return $row;
2433   }
2434   return $self->create($hash);
2435 }
2436
2437 =head2 update_or_create
2438
2439 =over 4
2440
2441 =item Arguments: \%col_values, { key => $unique_constraint }?
2442
2443 =item Return Value: $row_object
2444
2445 =back
2446
2447   $resultset->update_or_create({ col => $val, ... });
2448
2449 Like L</find_or_create>, but if a row is found it is immediately updated via
2450 C<< $found_row->update (\%col_values) >>.
2451
2452
2453 Takes an optional C<key> attribute to search on a specific unique constraint.
2454 For example:
2455
2456   # In your application
2457   my $cd = $schema->resultset('CD')->update_or_create(
2458     {
2459       artist => 'Massive Attack',
2460       title  => 'Mezzanine',
2461       year   => 1998,
2462     },
2463     { key => 'cd_artist_title' }
2464   );
2465
2466   $cd->cd_to_producer->update_or_create({
2467     producer => $producer,
2468     name => 'harry',
2469   }, {
2470     key => 'primary',
2471   });
2472
2473 B<Note>: Make sure to read the documentation of L</find> and understand the
2474 significance of the C<key> attribute, as its lack may skew your search, and
2475 subsequently result in spurious row creation.
2476
2477 B<Note>: Take care when using C<update_or_create> with a table having
2478 columns with default values that you intend to be automatically
2479 supplied by the database (e.g. an auto_increment primary key column).
2480 In normal usage, the value of such columns should NOT be included at
2481 all in the call to C<update_or_create>, even when set to C<undef>.
2482
2483 See also L</find> and L</find_or_create>. For information on how to declare
2484 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2485
2486 =cut
2487
2488 sub update_or_create {
2489   my $self = shift;
2490   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2491   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2492
2493   my $row = $self->find($cond, $attrs);
2494   if (defined $row) {
2495     $row->update($cond);
2496     return $row;
2497   }
2498
2499   return $self->create($cond);
2500 }
2501
2502 =head2 update_or_new
2503
2504 =over 4
2505
2506 =item Arguments: \%col_values, { key => $unique_constraint }?
2507
2508 =item Return Value: $rowobject
2509
2510 =back
2511
2512   $resultset->update_or_new({ col => $val, ... });
2513
2514 Like L</find_or_new> but if a row is found it is immediately updated via
2515 C<< $found_row->update (\%col_values) >>.
2516
2517 For example:
2518
2519   # In your application
2520   my $cd = $schema->resultset('CD')->update_or_new(
2521     {
2522       artist => 'Massive Attack',
2523       title  => 'Mezzanine',
2524       year   => 1998,
2525     },
2526     { key => 'cd_artist_title' }
2527   );
2528
2529   if ($cd->in_storage) {
2530       # the cd was updated
2531   }
2532   else {
2533       # the cd is not yet in the database, let's insert it
2534       $cd->insert;
2535   }
2536
2537 B<Note>: Make sure to read the documentation of L</find> and understand the
2538 significance of the C<key> attribute, as its lack may skew your search, and
2539 subsequently result in spurious new objects.
2540
2541 B<Note>: Take care when using C<update_or_new> with a table having
2542 columns with default values that you intend to be automatically
2543 supplied by the database (e.g. an auto_increment primary key column).
2544 In normal usage, the value of such columns should NOT be included at
2545 all in the call to C<update_or_new>, even when set to C<undef>.
2546
2547 See also L</find>, L</find_or_create> and L</find_or_new>. 
2548
2549 =cut
2550
2551 sub update_or_new {
2552     my $self  = shift;
2553     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2554     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2555
2556     my $row = $self->find( $cond, $attrs );
2557     if ( defined $row ) {
2558         $row->update($cond);
2559         return $row;
2560     }
2561
2562     return $self->new_result($cond);
2563 }
2564
2565 =head2 get_cache
2566
2567 =over 4
2568
2569 =item Arguments: none
2570
2571 =item Return Value: \@cache_objects | undef
2572
2573 =back
2574
2575 Gets the contents of the cache for the resultset, if the cache is set.
2576
2577 The cache is populated either by using the L</prefetch> attribute to
2578 L</search> or by calling L</set_cache>.
2579
2580 =cut
2581
2582 sub get_cache {
2583   shift->{all_cache};
2584 }
2585
2586 =head2 set_cache
2587
2588 =over 4
2589
2590 =item Arguments: \@cache_objects
2591
2592 =item Return Value: \@cache_objects
2593
2594 =back
2595
2596 Sets the contents of the cache for the resultset. Expects an arrayref
2597 of objects of the same class as those produced by the resultset. Note that
2598 if the cache is set the resultset will return the cached objects rather
2599 than re-querying the database even if the cache attr is not set.
2600
2601 The contents of the cache can also be populated by using the
2602 L</prefetch> attribute to L</search>.
2603
2604 =cut
2605
2606 sub set_cache {
2607   my ( $self, $data ) = @_;
2608   $self->throw_exception("set_cache requires an arrayref")
2609       if defined($data) && (ref $data ne 'ARRAY');
2610   $self->{all_cache} = $data;
2611 }
2612
2613 =head2 clear_cache
2614
2615 =over 4
2616
2617 =item Arguments: none
2618
2619 =item Return Value: undef
2620
2621 =back
2622
2623 Clears the cache for the resultset.
2624
2625 =cut
2626
2627 sub clear_cache {
2628   shift->set_cache(undef);
2629 }
2630
2631 =head2 is_paged
2632
2633 =over 4
2634
2635 =item Arguments: none
2636
2637 =item Return Value: true, if the resultset has been paginated
2638
2639 =back
2640
2641 =cut
2642
2643 sub is_paged {
2644   my ($self) = @_;
2645   return !!$self->{attrs}{page};
2646 }
2647
2648 =head2 is_ordered
2649
2650 =over 4
2651
2652 =item Arguments: none
2653
2654 =item Return Value: true, if the resultset has been ordered with C<order_by>.
2655
2656 =back
2657
2658 =cut
2659
2660 sub is_ordered {
2661   my ($self) = @_;
2662   return scalar $self->result_source->storage->_extract_order_columns($self->{attrs}{order_by});
2663 }
2664
2665 =head2 related_resultset
2666
2667 =over 4
2668
2669 =item Arguments: $relationship_name
2670
2671 =item Return Value: $resultset
2672
2673 =back
2674
2675 Returns a related resultset for the supplied relationship name.
2676
2677   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2678
2679 =cut
2680
2681 sub related_resultset {
2682   my ($self, $rel) = @_;
2683
2684   $self->{related_resultsets} ||= {};
2685   return $self->{related_resultsets}{$rel} ||= do {
2686     my $rsrc = $self->result_source;
2687     my $rel_info = $rsrc->relationship_info($rel);
2688
2689     $self->throw_exception(
2690       "search_related: result source '" . $rsrc->source_name .
2691         "' has no such relationship $rel")
2692       unless $rel_info;
2693
2694     my $attrs = $self->_chain_relationship($rel);
2695
2696     my $join_count = $attrs->{seen_join}{$rel};
2697
2698     my $alias = $self->result_source->storage
2699         ->relname_to_table_alias($rel, $join_count);
2700
2701     # since this is search_related, and we already slid the select window inwards
2702     # (the select/as attrs were deleted in the beginning), we need to flip all
2703     # left joins to inner, so we get the expected results
2704     # read the comment on top of the actual function to see what this does
2705     $attrs->{from} = $rsrc->schema->storage->_inner_join_to_node ($attrs->{from}, $alias);
2706
2707
2708     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2709     delete @{$attrs}{qw(result_class alias)};
2710
2711     my $new_cache;
2712
2713     if (my $cache = $self->get_cache) {
2714       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2715         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2716                         @$cache ];
2717       }
2718     }
2719
2720     my $rel_source = $rsrc->related_source($rel);
2721
2722     my $new = do {
2723
2724       # The reason we do this now instead of passing the alias to the
2725       # search_rs below is that if you wrap/overload resultset on the
2726       # source you need to know what alias it's -going- to have for things
2727       # to work sanely (e.g. RestrictWithObject wants to be able to add
2728       # extra query restrictions, and these may need to be $alias.)
2729
2730       my $rel_attrs = $rel_source->resultset_attributes;
2731       local $rel_attrs->{alias} = $alias;
2732
2733       $rel_source->resultset
2734                  ->search_rs(
2735                      undef, {
2736                        %$attrs,
2737                        where => $attrs->{where},
2738                    });
2739     };
2740     $new->set_cache($new_cache) if $new_cache;
2741     $new;
2742   };
2743 }
2744
2745 =head2 current_source_alias
2746
2747 =over 4
2748
2749 =item Arguments: none
2750
2751 =item Return Value: $source_alias
2752
2753 =back
2754
2755 Returns the current table alias for the result source this resultset is built
2756 on, that will be used in the SQL query. Usually it is C<me>.
2757
2758 Currently the source alias that refers to the result set returned by a
2759 L</search>/L</find> family method depends on how you got to the resultset: it's
2760 C<me> by default, but eg. L</search_related> aliases it to the related result
2761 source name (and keeps C<me> referring to the original result set). The long
2762 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2763 (and make this method unnecessary).
2764
2765 Thus it's currently necessary to use this method in predefined queries (see
2766 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2767 source alias of the current result set:
2768
2769   # in a result set class
2770   sub modified_by {
2771     my ($self, $user) = @_;
2772
2773     my $me = $self->current_source_alias;
2774
2775     return $self->search(
2776       "$me.modified" => $user->id,
2777     );
2778   }
2779
2780 =cut
2781
2782 sub current_source_alias {
2783   my ($self) = @_;
2784
2785   return ($self->{attrs} || {})->{alias} || 'me';
2786 }
2787
2788 =head2 as_subselect_rs
2789
2790 =over 4
2791
2792 =item Arguments: none
2793
2794 =item Return Value: $resultset
2795
2796 =back
2797
2798 Act as a barrier to SQL symbols.  The resultset provided will be made into a
2799 "virtual view" by including it as a subquery within the from clause.  From this
2800 point on, any joined tables are inaccessible to ->search on the resultset (as if
2801 it were simply where-filtered without joins).  For example:
2802
2803  my $rs = $schema->resultset('Bar')->search({'x.name' => 'abc'},{ join => 'x' });
2804
2805  # 'x' now pollutes the query namespace
2806
2807  # So the following works as expected
2808  my $ok_rs = $rs->search({'x.other' => 1});
2809
2810  # But this doesn't: instead of finding a 'Bar' related to two x rows (abc and
2811  # def) we look for one row with contradictory terms and join in another table
2812  # (aliased 'x_2') which we never use
2813  my $broken_rs = $rs->search({'x.name' => 'def'});
2814
2815  my $rs2 = $rs->as_subselect_rs;
2816
2817  # doesn't work - 'x' is no longer accessible in $rs2, having been sealed away
2818  my $not_joined_rs = $rs2->search({'x.other' => 1});
2819
2820  # works as expected: finds a 'table' row related to two x rows (abc and def)
2821  my $correctly_joined_rs = $rs2->search({'x.name' => 'def'});
2822
2823 Another example of when one might use this would be to select a subset of
2824 columns in a group by clause:
2825
2826  my $rs = $schema->resultset('Bar')->search(undef, {
2827    group_by => [qw{ id foo_id baz_id }],
2828  })->as_subselect_rs->search(undef, {
2829    columns => [qw{ id foo_id }]
2830  });
2831
2832 In the above example normally columns would have to be equal to the group by,
2833 but because we isolated the group by into a subselect the above works.
2834
2835 =cut
2836
2837 sub as_subselect_rs {
2838   my $self = shift;
2839
2840   my $attrs = $self->_resolved_attrs;
2841
2842   my $fresh_rs = (ref $self)->new (
2843     $self->result_source
2844   );
2845
2846   # these pieces will be locked in the subquery
2847   delete $fresh_rs->{cond};
2848   delete @{$fresh_rs->{attrs}}{qw/where bind/};
2849
2850   return $fresh_rs->search( {}, {
2851     from => [{
2852       $attrs->{alias} => $self->as_query,
2853       -alias         => $attrs->{alias},
2854       -source_handle => $self->result_source->handle,
2855     }],
2856     alias => $attrs->{alias},
2857   });
2858 }
2859
2860 # This code is called by search_related, and makes sure there
2861 # is clear separation between the joins before, during, and
2862 # after the relationship. This information is needed later
2863 # in order to properly resolve prefetch aliases (any alias
2864 # with a relation_chain_depth less than the depth of the
2865 # current prefetch is not considered)
2866 #
2867 # The increments happen twice per join. An even number means a
2868 # relationship specified via a search_related, whereas an odd
2869 # number indicates a join/prefetch added via attributes
2870 #
2871 # Also this code will wrap the current resultset (the one we
2872 # chain to) in a subselect IFF it contains limiting attributes
2873 sub _chain_relationship {
2874   my ($self, $rel) = @_;
2875   my $source = $self->result_source;
2876   my $attrs = { %{$self->{attrs}||{}} };
2877
2878   # we need to take the prefetch the attrs into account before we
2879   # ->_resolve_join as otherwise they get lost - captainL
2880   my $join = $self->_merge_attr( $attrs->{join}, $attrs->{prefetch} );
2881
2882   delete @{$attrs}{qw/join prefetch collapse group_by distinct select as columns +select +as +columns/};
2883
2884   my $seen = { %{ (delete $attrs->{seen_join}) || {} } };
2885
2886   my $from;
2887   my @force_subq_attrs = qw/offset rows group_by having/;
2888
2889   if (
2890     ($attrs->{from} && ref $attrs->{from} ne 'ARRAY')
2891       ||
2892     $self->_has_resolved_attr (@force_subq_attrs)
2893   ) {
2894     # Nuke the prefetch (if any) before the new $rs attrs
2895     # are resolved (prefetch is useless - we are wrapping
2896     # a subquery anyway).
2897     my $rs_copy = $self->search;
2898     $rs_copy->{attrs}{join} = $self->_merge_attr (
2899       $rs_copy->{attrs}{join},
2900       delete $rs_copy->{attrs}{prefetch},
2901     );
2902
2903     $from = [{
2904       -source_handle => $source->handle,
2905       -alias => $attrs->{alias},
2906       $attrs->{alias} => $rs_copy->as_query,
2907     }];
2908     delete @{$attrs}{@force_subq_attrs, qw/where bind/};
2909     $seen->{-relation_chain_depth} = 0;
2910   }
2911   elsif ($attrs->{from}) {  #shallow copy suffices
2912     $from = [ @{$attrs->{from}} ];
2913   }
2914   else {
2915     $from = [{
2916       -source_handle => $source->handle,
2917       -alias => $attrs->{alias},
2918       $attrs->{alias} => $source->from,
2919     }];
2920   }
2921
2922   my $jpath = ($seen->{-relation_chain_depth})
2923     ? $from->[-1][0]{-join_path}
2924     : [];
2925
2926   my @requested_joins = $source->_resolve_join(
2927     $join,
2928     $attrs->{alias},
2929     $seen,
2930     $jpath,
2931   );
2932
2933   push @$from, @requested_joins;
2934
2935   $seen->{-relation_chain_depth}++;
2936
2937   # if $self already had a join/prefetch specified on it, the requested
2938   # $rel might very well be already included. What we do in this case
2939   # is effectively a no-op (except that we bump up the chain_depth on
2940   # the join in question so we could tell it *is* the search_related)
2941   my $already_joined;
2942
2943   # we consider the last one thus reverse
2944   for my $j (reverse @requested_joins) {
2945     my ($last_j) = keys %{$j->[0]{-join_path}[-1]};
2946     if ($rel eq $last_j) {
2947       $j->[0]{-relation_chain_depth}++;
2948       $already_joined++;
2949       last;
2950     }
2951   }
2952
2953   unless ($already_joined) {
2954     push @$from, $source->_resolve_join(
2955       $rel,
2956       $attrs->{alias},
2957       $seen,
2958       $jpath,
2959     );
2960   }
2961
2962   $seen->{-relation_chain_depth}++;
2963
2964   return {%$attrs, from => $from, seen_join => $seen};
2965 }
2966
2967 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
2968 sub _resolved_attrs_copy {
2969   my $self = shift;
2970   return { %{$self->_resolved_attrs (@_)} };
2971 }
2972
2973 sub _resolved_attrs {
2974   my $self = shift;
2975   return $self->{_attrs} if $self->{_attrs};
2976
2977   my $attrs  = { %{ $self->{attrs} || {} } };
2978   my $source = $self->result_source;
2979   my $alias  = $attrs->{alias};
2980
2981   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2982   my @colbits;
2983
2984   # build columns (as long as select isn't set) into a set of as/select hashes
2985   unless ( $attrs->{select} ) {
2986
2987     my @cols;
2988     if ( ref $attrs->{columns} eq 'ARRAY' ) {
2989       @cols = @{ delete $attrs->{columns}}
2990     } elsif ( defined $attrs->{columns} ) {
2991       @cols = delete $attrs->{columns}
2992     } else {
2993       @cols = $source->columns
2994     }
2995
2996     for (@cols) {
2997       if ( ref $_ eq 'HASH' ) {
2998         push @colbits, $_
2999       } else {
3000         my $key = /^\Q${alias}.\E(.+)$/
3001           ? "$1"
3002           : "$_";
3003         my $value = /\./
3004           ? "$_"
3005           : "${alias}.$_";
3006         push @colbits, { $key => $value };
3007       }
3008     }
3009   }
3010
3011   # add the additional columns on
3012   foreach (qw{include_columns +columns}) {
3013     if ( $attrs->{$_} ) {
3014       my @list = ( ref($attrs->{$_}) eq 'ARRAY' )
3015         ? @{ delete $attrs->{$_} }
3016         : delete $attrs->{$_};
3017       for (@list) {
3018         if ( ref($_) eq 'HASH' ) {
3019           push @colbits, $_
3020         } else {
3021           my $key = ( split /\./, $_ )[-1];
3022           my $value = ( /\./ ? $_ : "$alias.$_" );
3023           push @colbits, { $key => $value };
3024         }
3025       }
3026     }
3027   }
3028
3029   # start with initial select items
3030   if ( $attrs->{select} ) {
3031     $attrs->{select} =
3032         ( ref $attrs->{select} eq 'ARRAY' )
3033       ? [ @{ $attrs->{select} } ]
3034       : [ $attrs->{select} ];
3035
3036     if ( $attrs->{as} ) {
3037       $attrs->{as} =
3038         (
3039           ref $attrs->{as} eq 'ARRAY'
3040             ? [ @{ $attrs->{as} } ]
3041             : [ $attrs->{as} ]
3042         )
3043     } else {
3044       $attrs->{as} = [ map {
3045          m/^\Q${alias}.\E(.+)$/
3046            ? $1
3047            : $_
3048          } @{ $attrs->{select} }
3049       ]
3050     }
3051   }
3052   else {
3053
3054     # otherwise we intialise select & as to empty
3055     $attrs->{select} = [];
3056     $attrs->{as}     = [];
3057   }
3058
3059   # now add colbits to select/as
3060   push @{ $attrs->{select} }, map values %{$_}, @colbits;
3061   push @{ $attrs->{as}     }, map keys   %{$_}, @colbits;
3062
3063   if ( my $adds = delete $attrs->{'+select'} ) {
3064     $adds = [$adds] unless ref $adds eq 'ARRAY';
3065     push @{ $attrs->{select} },
3066       map { /\./ || ref $_ ? $_ : "$alias.$_" } @$adds;
3067   }
3068   if ( my $adds = delete $attrs->{'+as'} ) {
3069     $adds = [$adds] unless ref $adds eq 'ARRAY';
3070     push @{ $attrs->{as} }, @$adds;
3071   }
3072
3073   $attrs->{from} ||= [{
3074     -source_handle => $source->handle,
3075     -alias => $self->{attrs}{alias},
3076     $self->{attrs}{alias} => $source->from,
3077   }];
3078
3079   if ( $attrs->{join} || $attrs->{prefetch} ) {
3080
3081     $self->throw_exception ('join/prefetch can not be used with a custom {from}')
3082       if ref $attrs->{from} ne 'ARRAY';
3083
3084     my $join = delete $attrs->{join} || {};
3085
3086     if ( defined $attrs->{prefetch} ) {
3087       $join = $self->_merge_attr( $join, $attrs->{prefetch} );
3088     }
3089
3090     $attrs->{from} =    # have to copy here to avoid corrupting the original
3091       [
3092         @{ $attrs->{from} },
3093         $source->_resolve_join(
3094           $join,
3095           $alias,
3096           { %{ $attrs->{seen_join} || {} } },
3097           ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
3098             ? $attrs->{from}[-1][0]{-join_path}
3099             : []
3100           ,
3101         )
3102       ];
3103   }
3104
3105   if ( defined $attrs->{order_by} ) {
3106     $attrs->{order_by} = (
3107       ref( $attrs->{order_by} ) eq 'ARRAY'
3108       ? [ @{ $attrs->{order_by} } ]
3109       : [ $attrs->{order_by} || () ]
3110     );
3111   }
3112
3113   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
3114     $attrs->{group_by} = [ $attrs->{group_by} ];
3115   }
3116
3117   # generate the distinct induced group_by early, as prefetch will be carried via a
3118   # subquery (since a group_by is present)
3119   if (delete $attrs->{distinct}) {
3120     if ($attrs->{group_by}) {
3121       carp ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
3122     }
3123     else {
3124       $attrs->{group_by} = $source->storage->_group_over_selection (
3125         @{$attrs}{qw/from select order_by/}
3126       );
3127     }
3128   }
3129
3130   $attrs->{collapse} ||= {};
3131   if ( my $prefetch = delete $attrs->{prefetch} ) {
3132     $prefetch = $self->_merge_attr( {}, $prefetch );
3133
3134     my $prefetch_ordering = [];
3135
3136     # this is a separate structure (we don't look in {from} directly)
3137     # as the resolver needs to shift things off the lists to work
3138     # properly (identical-prefetches on different branches)
3139     my $join_map = {};
3140     if (ref $attrs->{from} eq 'ARRAY') {
3141
3142       my $start_depth = $attrs->{seen_join}{-relation_chain_depth} || 0;
3143
3144       for my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
3145         next unless $j->[0]{-alias};
3146         next unless $j->[0]{-join_path};
3147         next if ($j->[0]{-relation_chain_depth} || 0) < $start_depth;
3148
3149         my @jpath = map { keys %$_ } @{$j->[0]{-join_path}};
3150
3151         my $p = $join_map;
3152         $p = $p->{$_} ||= {} for @jpath[ ($start_depth/2) .. $#jpath]; #only even depths are actual jpath boundaries
3153         push @{$p->{-join_aliases} }, $j->[0]{-alias};
3154       }
3155     }
3156
3157     my @prefetch =
3158       $source->_resolve_prefetch( $prefetch, $alias, $join_map, $prefetch_ordering, $attrs->{collapse} );
3159
3160     # we need to somehow mark which columns came from prefetch
3161     $attrs->{_prefetch_select} = [ map { $_->[0] } @prefetch ];
3162
3163     push @{ $attrs->{select} }, @{$attrs->{_prefetch_select}};
3164     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
3165
3166     push( @{$attrs->{order_by}}, @$prefetch_ordering );
3167     $attrs->{_collapse_order_by} = \@$prefetch_ordering;
3168   }
3169
3170   # if both page and offset are specified, produce a combined offset
3171   # even though it doesn't make much sense, this is what pre 081xx has
3172   # been doing
3173   if (my $page = delete $attrs->{page}) {
3174     $attrs->{offset} =
3175       ($attrs->{rows} * ($page - 1))
3176             +
3177       ($attrs->{offset} || 0)
3178     ;
3179   }
3180
3181   return $self->{_attrs} = $attrs;
3182 }
3183
3184 sub _rollout_attr {
3185   my ($self, $attr) = @_;
3186
3187   if (ref $attr eq 'HASH') {
3188     return $self->_rollout_hash($attr);
3189   } elsif (ref $attr eq 'ARRAY') {
3190     return $self->_rollout_array($attr);
3191   } else {
3192     return [$attr];
3193   }
3194 }
3195
3196 sub _rollout_array {
3197   my ($self, $attr) = @_;
3198
3199   my @rolled_array;
3200   foreach my $element (@{$attr}) {
3201     if (ref $element eq 'HASH') {
3202       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3203     } elsif (ref $element eq 'ARRAY') {
3204       #  XXX - should probably recurse here
3205       push( @rolled_array, @{$self->_rollout_array($element)} );
3206     } else {
3207       push( @rolled_array, $element );
3208     }
3209   }
3210   return \@rolled_array;
3211 }
3212
3213 sub _rollout_hash {
3214   my ($self, $attr) = @_;
3215
3216   my @rolled_array;
3217   foreach my $key (keys %{$attr}) {
3218     push( @rolled_array, { $key => $attr->{$key} } );
3219   }
3220   return \@rolled_array;
3221 }
3222
3223 sub _calculate_score {
3224   my ($self, $a, $b) = @_;
3225
3226   if (defined $a xor defined $b) {
3227     return 0;
3228   }
3229   elsif (not defined $a) {
3230     return 1;
3231   }
3232
3233   if (ref $b eq 'HASH') {
3234     my ($b_key) = keys %{$b};
3235     if (ref $a eq 'HASH') {
3236       my ($a_key) = keys %{$a};
3237       if ($a_key eq $b_key) {
3238         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3239       } else {
3240         return 0;
3241       }
3242     } else {
3243       return ($a eq $b_key) ? 1 : 0;
3244     }
3245   } else {
3246     if (ref $a eq 'HASH') {
3247       my ($a_key) = keys %{$a};
3248       return ($b eq $a_key) ? 1 : 0;
3249     } else {
3250       return ($b eq $a) ? 1 : 0;
3251     }
3252   }
3253 }
3254
3255 sub _merge_attr {
3256   my ($self, $orig, $import) = @_;
3257
3258   return $import unless defined($orig);
3259   return $orig unless defined($import);
3260
3261   $orig = $self->_rollout_attr($orig);
3262   $import = $self->_rollout_attr($import);
3263
3264   my $seen_keys;
3265   foreach my $import_element ( @{$import} ) {
3266     # find best candidate from $orig to merge $b_element into
3267     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3268     foreach my $orig_element ( @{$orig} ) {
3269       my $score = $self->_calculate_score( $orig_element, $import_element );
3270       if ($score > $best_candidate->{score}) {
3271         $best_candidate->{position} = $position;
3272         $best_candidate->{score} = $score;
3273       }
3274       $position++;
3275     }
3276     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3277
3278     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3279       push( @{$orig}, $import_element );
3280     } else {
3281       my $orig_best = $orig->[$best_candidate->{position}];
3282       # merge orig_best and b_element together and replace original with merged
3283       if (ref $orig_best ne 'HASH') {
3284         $orig->[$best_candidate->{position}] = $import_element;
3285       } elsif (ref $import_element eq 'HASH') {
3286         my ($key) = keys %{$orig_best};
3287         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
3288       }
3289     }
3290     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3291   }
3292
3293   return $orig;
3294 }
3295
3296 sub result_source {
3297     my $self = shift;
3298
3299     if (@_) {
3300         $self->_source_handle($_[0]->handle);
3301     } else {
3302         $self->_source_handle->resolve;
3303     }
3304 }
3305
3306 =head2 throw_exception
3307
3308 See L<DBIx::Class::Schema/throw_exception> for details.
3309
3310 =cut
3311
3312 sub throw_exception {
3313   my $self=shift;
3314
3315   if (ref $self && $self->_source_handle->schema) {
3316     $self->_source_handle->schema->throw_exception(@_)
3317   }
3318   else {
3319     DBIx::Class::Exception->throw(@_);
3320   }
3321 }
3322
3323 # XXX: FIXME: Attributes docs need clearing up
3324
3325 =head1 ATTRIBUTES
3326
3327 Attributes are used to refine a ResultSet in various ways when
3328 searching for data. They can be passed to any method which takes an
3329 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3330 L</count>.
3331
3332 These are in no particular order:
3333
3334 =head2 order_by
3335
3336 =over 4
3337
3338 =item Value: ( $order_by | \@order_by | \%order_by )
3339
3340 =back
3341
3342 Which column(s) to order the results by.
3343
3344 [The full list of suitable values is documented in
3345 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3346 common options.]
3347
3348 If a single column name, or an arrayref of names is supplied, the
3349 argument is passed through directly to SQL. The hashref syntax allows
3350 for connection-agnostic specification of ordering direction:
3351
3352  For descending order:
3353
3354   order_by => { -desc => [qw/col1 col2 col3/] }
3355
3356  For explicit ascending order:
3357
3358   order_by => { -asc => 'col' }
3359
3360 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3361 supported, although you are strongly encouraged to use the hashref
3362 syntax as outlined above.
3363
3364 =head2 columns
3365
3366 =over 4
3367
3368 =item Value: \@columns
3369
3370 =back
3371
3372 Shortcut to request a particular set of columns to be retrieved. Each
3373 column spec may be a string (a table column name), or a hash (in which
3374 case the key is the C<as> value, and the value is used as the C<select>
3375 expression). Adds C<me.> onto the start of any column without a C<.> in
3376 it and sets C<select> from that, then auto-populates C<as> from
3377 C<select> as normal. (You may also use the C<cols> attribute, as in
3378 earlier versions of DBIC.)
3379
3380 Essentially C<columns> does the same as L</select> and L</as>.
3381
3382     columns => [ 'foo', { bar => 'baz' } ]
3383
3384 is the same as
3385
3386     select => [qw/foo baz/],
3387     as => [qw/foo bar/]
3388
3389 =head2 +columns
3390
3391 =over 4
3392
3393 =item Value: \@columns
3394
3395 =back
3396
3397 Indicates additional columns to be selected from storage. Works the same
3398 as L</columns> but adds columns to the selection. (You may also use the
3399 C<include_columns> attribute, as in earlier versions of DBIC). For
3400 example:-
3401
3402   $schema->resultset('CD')->search(undef, {
3403     '+columns' => ['artist.name'],
3404     join => ['artist']
3405   });
3406
3407 would return all CDs and include a 'name' column to the information
3408 passed to object inflation. Note that the 'artist' is the name of the
3409 column (or relationship) accessor, and 'name' is the name of the column
3410 accessor in the related table.
3411
3412 =head2 include_columns
3413
3414 =over 4
3415
3416 =item Value: \@columns
3417
3418 =back
3419
3420 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3421
3422 =head2 select
3423
3424 =over 4
3425
3426 =item Value: \@select_columns
3427
3428 =back
3429
3430 Indicates which columns should be selected from the storage. You can use
3431 column names, or in the case of RDBMS back ends, function or stored procedure
3432 names:
3433
3434   $rs = $schema->resultset('Employee')->search(undef, {
3435     select => [
3436       'name',
3437       { count => 'employeeid' },
3438       { max => { length => 'name' }, -as => 'longest_name' }
3439     ]
3440   });
3441
3442   # Equivalent SQL
3443   SELECT name, COUNT( employeeid ), MAX( LENGTH( name ) ) AS longest_name FROM employee
3444
3445 B<NOTE:> You will almost always need a corresponding L</as> attribute when you
3446 use L</select>, to instruct DBIx::Class how to store the result of the column.
3447 Also note that the L</as> attribute has nothing to do with the SQL-side 'AS'
3448 identifier aliasing. You can however alias a function, so you can use it in
3449 e.g. an C<ORDER BY> clause. This is done via the C<-as> B<select function
3450 attribute> supplied as shown in the example above.
3451
3452 =head2 +select
3453
3454 =over 4
3455
3456 Indicates additional columns to be selected from storage.  Works the same as
3457 L</select> but adds columns to the default selection, instead of specifying
3458 an explicit list.
3459
3460 =back
3461
3462 =head2 +as
3463
3464 =over 4
3465
3466 Indicates additional column names for those added via L</+select>. See L</as>.
3467
3468 =back
3469
3470 =head2 as
3471
3472 =over 4
3473
3474 =item Value: \@inflation_names
3475
3476 =back
3477
3478 Indicates column names for object inflation. That is L</as> indicates the
3479 slot name in which the column value will be stored within the
3480 L<Row|DBIx::Class::Row> object. The value will then be accessible via this
3481 identifier by the C<get_column> method (or via the object accessor B<if one
3482 with the same name already exists>) as shown below. The L</as> attribute has
3483 B<nothing to do> with the SQL-side C<AS>. See L</select> for details.
3484
3485   $rs = $schema->resultset('Employee')->search(undef, {
3486     select => [
3487       'name',
3488       { count => 'employeeid' },
3489       { max => { length => 'name' }, -as => 'longest_name' }
3490     ],
3491     as => [qw/
3492       name
3493       employee_count
3494       max_name_length
3495     /],
3496   });
3497
3498 If the object against which the search is performed already has an accessor
3499 matching a column name specified in C<as>, the value can be retrieved using
3500 the accessor as normal:
3501
3502   my $name = $employee->name();
3503
3504 If on the other hand an accessor does not exist in the object, you need to
3505 use C<get_column> instead:
3506
3507   my $employee_count = $employee->get_column('employee_count');
3508
3509 You can create your own accessors if required - see
3510 L<DBIx::Class::Manual::Cookbook> for details.
3511
3512 =head2 join
3513
3514 =over 4
3515
3516 =item Value: ($rel_name | \@rel_names | \%rel_names)
3517
3518 =back
3519
3520 Contains a list of relationships that should be joined for this query.  For
3521 example:
3522
3523   # Get CDs by Nine Inch Nails
3524   my $rs = $schema->resultset('CD')->search(
3525     { 'artist.name' => 'Nine Inch Nails' },
3526     { join => 'artist' }
3527   );
3528
3529 Can also contain a hash reference to refer to the other relation's relations.
3530 For example:
3531
3532   package MyApp::Schema::Track;
3533   use base qw/DBIx::Class/;
3534   __PACKAGE__->table('track');
3535   __PACKAGE__->add_columns(qw/trackid cd position title/);
3536   __PACKAGE__->set_primary_key('trackid');
3537   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3538   1;
3539
3540   # In your application
3541   my $rs = $schema->resultset('Artist')->search(
3542     { 'track.title' => 'Teardrop' },
3543     {
3544       join     => { cd => 'track' },
3545       order_by => 'artist.name',
3546     }
3547   );
3548
3549 You need to use the relationship (not the table) name in  conditions,
3550 because they are aliased as such. The current table is aliased as "me", so
3551 you need to use me.column_name in order to avoid ambiguity. For example:
3552
3553   # Get CDs from 1984 with a 'Foo' track
3554   my $rs = $schema->resultset('CD')->search(
3555     {
3556       'me.year' => 1984,
3557       'tracks.name' => 'Foo'
3558     },
3559     { join => 'tracks' }
3560   );
3561
3562 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3563 similarly for a third time). For e.g.
3564
3565   my $rs = $schema->resultset('Artist')->search({
3566     'cds.title'   => 'Down to Earth',
3567     'cds_2.title' => 'Popular',
3568   }, {
3569     join => [ qw/cds cds/ ],
3570   });
3571
3572 will return a set of all artists that have both a cd with title 'Down
3573 to Earth' and a cd with title 'Popular'.
3574
3575 If you want to fetch related objects from other tables as well, see C<prefetch>
3576 below.
3577
3578 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
3579
3580 =head2 prefetch
3581
3582 =over 4
3583
3584 =item Value: ($rel_name | \@rel_names | \%rel_names)
3585
3586 =back
3587
3588 Contains one or more relationships that should be fetched along with
3589 the main query (when they are accessed afterwards the data will
3590 already be available, without extra queries to the database).  This is
3591 useful for when you know you will need the related objects, because it
3592 saves at least one query:
3593
3594   my $rs = $schema->resultset('Tag')->search(
3595     undef,
3596     {
3597       prefetch => {
3598         cd => 'artist'
3599       }
3600     }
3601   );
3602
3603 The initial search results in SQL like the following:
3604
3605   SELECT tag.*, cd.*, artist.* FROM tag
3606   JOIN cd ON tag.cd = cd.cdid
3607   JOIN artist ON cd.artist = artist.artistid
3608
3609 L<DBIx::Class> has no need to go back to the database when we access the
3610 C<cd> or C<artist> relationships, which saves us two SQL statements in this
3611 case.
3612
3613 Simple prefetches will be joined automatically, so there is no need
3614 for a C<join> attribute in the above search.
3615
3616 C<prefetch> can be used with the following relationship types: C<belongs_to>,
3617 C<has_one> (or if you're using C<add_relationship>, any relationship declared
3618 with an accessor type of 'single' or 'filter'). A more complex example that
3619 prefetches an artists cds, the tracks on those cds, and the tags associated
3620 with that artist is given below (assuming many-to-many from artists to tags):
3621
3622  my $rs = $schema->resultset('Artist')->search(
3623    undef,
3624    {
3625      prefetch => [
3626        { cds => 'tracks' },
3627        { artist_tags => 'tags' }
3628      ]
3629    }
3630  );
3631
3632
3633 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
3634 attributes will be ignored.
3635
3636 B<CAVEATs>: Prefetch does a lot of deep magic. As such, it may not behave
3637 exactly as you might expect.
3638
3639 =over 4
3640
3641 =item *
3642
3643 Prefetch uses the L</cache> to populate the prefetched relationships. This
3644 may or may not be what you want.
3645
3646 =item *
3647
3648 If you specify a condition on a prefetched relationship, ONLY those
3649 rows that match the prefetched condition will be fetched into that relationship.
3650 This means that adding prefetch to a search() B<may alter> what is returned by
3651 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
3652
3653   my $artist_rs = $schema->resultset('Artist')->search({
3654       'cds.year' => 2008,
3655   }, {
3656       join => 'cds',
3657   });
3658
3659   my $count = $artist_rs->first->cds->count;
3660
3661   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
3662
3663   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
3664
3665   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
3666
3667 that cmp_ok() may or may not pass depending on the datasets involved. This
3668 behavior may or may not survive the 0.09 transition.
3669
3670 =back
3671
3672 =head2 page
3673
3674 =over 4
3675
3676 =item Value: $page
3677
3678 =back
3679
3680 Makes the resultset paged and specifies the page to retrieve. Effectively
3681 identical to creating a non-pages resultset and then calling ->page($page)
3682 on it.
3683
3684 If L<rows> attribute is not specified it defaults to 10 rows per page.
3685
3686 When you have a paged resultset, L</count> will only return the number
3687 of rows in the page. To get the total, use the L</pager> and call
3688 C<total_entries> on it.
3689
3690 =head2 rows
3691
3692 =over 4
3693
3694 =item Value: $rows
3695
3696 =back
3697
3698 Specifies the maximum number of rows for direct retrieval or the number of
3699 rows per page if the page attribute or method is used.
3700
3701 =head2 offset
3702
3703 =over 4
3704
3705 =item Value: $offset
3706
3707 =back
3708
3709 Specifies the (zero-based) row number for the  first row to be returned, or the
3710 of the first row of the first page if paging is used.
3711
3712 =head2 group_by
3713
3714 =over 4
3715
3716 =item Value: \@columns
3717
3718 =back
3719
3720 A arrayref of columns to group by. Can include columns of joined tables.
3721
3722   group_by => [qw/ column1 column2 ... /]
3723
3724 =head2 having
3725
3726 =over 4
3727
3728 =item Value: $condition
3729
3730 =back
3731
3732 HAVING is a select statement attribute that is applied between GROUP BY and
3733 ORDER BY. It is applied to the after the grouping calculations have been
3734 done.
3735
3736   having => { 'count(employee)' => { '>=', 100 } }
3737
3738 =head2 distinct
3739
3740 =over 4
3741
3742 =item Value: (0 | 1)
3743
3744 =back
3745
3746 Set to 1 to group by all columns. If the resultset already has a group_by
3747 attribute, this setting is ignored and an appropriate warning is issued.
3748
3749 =head2 where
3750
3751 =over 4
3752
3753 Adds to the WHERE clause.
3754
3755   # only return rows WHERE deleted IS NULL for all searches
3756   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
3757
3758 Can be overridden by passing C<< { where => undef } >> as an attribute
3759 to a resultset.
3760
3761 =back
3762
3763 =head2 cache
3764
3765 Set to 1 to cache search results. This prevents extra SQL queries if you
3766 revisit rows in your ResultSet:
3767
3768   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
3769
3770   while( my $artist = $resultset->next ) {
3771     ... do stuff ...
3772   }
3773
3774   $rs->first; # without cache, this would issue a query
3775
3776 By default, searches are not cached.
3777
3778 For more examples of using these attributes, see
3779 L<DBIx::Class::Manual::Cookbook>.
3780
3781 =head2 for
3782
3783 =over 4
3784
3785 =item Value: ( 'update' | 'shared' )
3786
3787 =back
3788
3789 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
3790 ... FOR SHARED.
3791
3792 =cut
3793
3794 1;