aa6477484df5864fc65418e2d19a1441c18ab7e0
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => "count",
7         'bool'   => "_bool",
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use DBIx::Class::Exception;
11 use Data::Page;
12 use Storable;
13 use DBIx::Class::ResultSetColumn;
14 use DBIx::Class::ResultSourceHandle;
15 use List::Util ();
16 use Scalar::Util ();
17
18 use base qw/DBIx::Class/;
19
20 #use Test::Deep::NoTest (qw/eq_deeply/);
21 use Data::Dumper::Concise;
22
23 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
24
25 =head1 NAME
26
27 DBIx::Class::ResultSet - Represents a query used for fetching a set of results.
28
29 =head1 SYNOPSIS
30
31   my $users_rs   = $schema->resultset('User');
32   my $registered_users_rs   = $schema->resultset('User')->search({ registered => 1 });
33   my @cds_in_2005 = $schema->resultset('CD')->search({ year => 2005 })->all();
34
35 =head1 DESCRIPTION
36
37 A ResultSet is an object which stores a set of conditions representing
38 a query. It is the backbone of DBIx::Class (i.e. the really
39 important/useful bit).
40
41 No SQL is executed on the database when a ResultSet is created, it
42 just stores all the conditions needed to create the query.
43
44 A basic ResultSet representing the data of an entire table is returned
45 by calling C<resultset> on a L<DBIx::Class::Schema> and passing in a
46 L<Source|DBIx::Class::Manual::Glossary/Source> name.
47
48   my $users_rs = $schema->resultset('User');
49
50 A new ResultSet is returned from calling L</search> on an existing
51 ResultSet. The new one will contain all the conditions of the
52 original, plus any new conditions added in the C<search> call.
53
54 A ResultSet also incorporates an implicit iterator. L</next> and L</reset>
55 can be used to walk through all the L<DBIx::Class::Row>s the ResultSet
56 represents.
57
58 The query that the ResultSet represents is B<only> executed against
59 the database when these methods are called:
60 L</find> L</next> L</all> L</first> L</single> L</count>
61
62 =head1 EXAMPLES
63
64 =head2 Chaining resultsets
65
66 Let's say you've got a query that needs to be run to return some data
67 to the user. But, you have an authorization system in place that
68 prevents certain users from seeing certain information. So, you want
69 to construct the basic query in one method, but add constraints to it in
70 another.
71
72   sub get_data {
73     my $self = shift;
74     my $request = $self->get_request; # Get a request object somehow.
75     my $schema = $self->get_schema;   # Get the DBIC schema object somehow.
76
77     my $cd_rs = $schema->resultset('CD')->search({
78       title => $request->param('title'),
79       year => $request->param('year'),
80     });
81
82     $self->apply_security_policy( $cd_rs );
83
84     return $cd_rs->all();
85   }
86
87   sub apply_security_policy {
88     my $self = shift;
89     my ($rs) = @_;
90
91     return $rs->search({
92       subversive => 0,
93     });
94   }
95
96 =head3 Resolving conditions and attributes
97
98 When a resultset is chained from another resultset, conditions and
99 attributes with the same keys need resolving.
100
101 L</join>, L</prefetch>, L</+select>, L</+as> attributes are merged
102 into the existing ones from the original resultset.
103
104 The L</where>, L</having> attribute, and any search conditions are
105 merged with an SQL C<AND> to the existing condition from the original
106 resultset.
107
108 All other attributes are overridden by any new ones supplied in the
109 search attributes.
110
111 =head2 Multiple queries
112
113 Since a resultset just defines a query, you can do all sorts of
114 things with it with the same object.
115
116   # Don't hit the DB yet.
117   my $cd_rs = $schema->resultset('CD')->search({
118     title => 'something',
119     year => 2009,
120   });
121
122   # Each of these hits the DB individually.
123   my $count = $cd_rs->count;
124   my $most_recent = $cd_rs->get_column('date_released')->max();
125   my @records = $cd_rs->all;
126
127 And it's not just limited to SELECT statements.
128
129   $cd_rs->delete();
130
131 This is even cooler:
132
133   $cd_rs->create({ artist => 'Fred' });
134
135 Which is the same as:
136
137   $schema->resultset('CD')->create({
138     title => 'something',
139     year => 2009,
140     artist => 'Fred'
141   });
142
143 See: L</search>, L</count>, L</get_column>, L</all>, L</create>.
144
145 =head1 OVERLOADING
146
147 If a resultset is used in a numeric context it returns the L</count>.
148 However, if it is used in a boolean context it is always true.  So if
149 you want to check if a resultset has any results use C<if $rs != 0>.
150 C<if $rs> will always be true.
151
152 =head1 METHODS
153
154 =head2 new
155
156 =over 4
157
158 =item Arguments: $source, \%$attrs
159
160 =item Return Value: $rs
161
162 =back
163
164 The resultset constructor. Takes a source object (usually a
165 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
166 L</ATTRIBUTES> below).  Does not perform any queries -- these are
167 executed as needed by the other methods.
168
169 Generally you won't need to construct a resultset manually.  You'll
170 automatically get one from e.g. a L</search> called in scalar context:
171
172   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
173
174 IMPORTANT: If called on an object, proxies to new_result instead so
175
176   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
177
178 will return a CD object, not a ResultSet.
179
180 =cut
181
182 sub new {
183   my $class = shift;
184   return $class->new_result(@_) if ref $class;
185
186   my ($source, $attrs) = @_;
187   $source = $source->handle
188     unless $source->isa('DBIx::Class::ResultSourceHandle');
189   $attrs = { %{$attrs||{}} };
190
191   if ($attrs->{page}) {
192     $attrs->{rows} ||= 10;
193   }
194
195   $attrs->{alias} ||= 'me';
196
197   # Creation of {} and bless separated to mitigate RH perl bug
198   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
199   my $self = {
200     _source_handle => $source,
201     cond => $attrs->{where},
202     count => undef,
203     pager => undef,
204     attrs => $attrs
205   };
206
207   bless $self, $class;
208
209   $self->result_class(
210     $attrs->{result_class} || $source->resolve->result_class
211   );
212
213   return $self;
214 }
215
216 =head2 search
217
218 =over 4
219
220 =item Arguments: $cond, \%attrs?
221
222 =item Return Value: $resultset (scalar context), @row_objs (list context)
223
224 =back
225
226   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
227   my $new_rs = $cd_rs->search({ year => 2005 });
228
229   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
230                  # year = 2005 OR year = 2004
231
232 If you need to pass in additional attributes but no additional condition,
233 call it as C<search(undef, \%attrs)>.
234
235   # "SELECT name, artistid FROM $artist_table"
236   my @all_artists = $schema->resultset('Artist')->search(undef, {
237     columns => [qw/name artistid/],
238   });
239
240 For a list of attributes that can be passed to C<search>, see
241 L</ATTRIBUTES>. For more examples of using this function, see
242 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
243 documentation for the first argument, see L<SQL::Abstract>.
244
245 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
246
247 =cut
248
249 sub search {
250   my $self = shift;
251   my $rs = $self->search_rs( @_ );
252   return (wantarray ? $rs->all : $rs);
253 }
254
255 =head2 search_rs
256
257 =over 4
258
259 =item Arguments: $cond, \%attrs?
260
261 =item Return Value: $resultset
262
263 =back
264
265 This method does the same exact thing as search() except it will
266 always return a resultset, even in list context.
267
268 =cut
269
270 sub search_rs {
271   my $self = shift;
272
273   # Special-case handling for (undef, undef).
274   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
275     pop(@_); pop(@_);
276   }
277
278   my $attrs = {};
279   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
280   my $our_attrs = { %{$self->{attrs}} };
281   my $having = delete $our_attrs->{having};
282   my $where = delete $our_attrs->{where};
283
284   my $rows;
285
286   my %safe = (alias => 1, cache => 1);
287
288   unless (
289     (@_ && defined($_[0])) # @_ == () or (undef)
290     ||
291     (keys %$attrs # empty attrs or only 'safe' attrs
292     && List::Util::first { !$safe{$_} } keys %$attrs)
293   ) {
294     # no search, effectively just a clone
295     $rows = $self->get_cache;
296   }
297
298   # reset the selector list
299   if (List::Util::first { exists $attrs->{$_} } qw{columns select as}) {
300      delete @{$our_attrs}{qw{select as columns +select +as +columns include_columns}};
301   }
302
303   my $new_attrs = { %{$our_attrs}, %{$attrs} };
304
305   # merge new attrs into inherited
306   foreach my $key (qw/join prefetch +select +as +columns include_columns bind/) {
307     next unless exists $attrs->{$key};
308     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
309   }
310
311   my $cond = (@_
312     ? (
313         (@_ == 1 || ref $_[0] eq "HASH")
314           ? (
315               (ref $_[0] eq 'HASH')
316                 ? (
317                     (keys %{ $_[0] }  > 0)
318                       ? shift
319                       : undef
320                    )
321                 :  shift
322              )
323           : (
324               (@_ % 2)
325                 ? $self->throw_exception("Odd number of arguments to search")
326                 : {@_}
327              )
328       )
329     : undef
330   );
331
332   if (defined $where) {
333     $new_attrs->{where} = (
334       defined $new_attrs->{where}
335         ? { '-and' => [
336               map {
337                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
338               } $where, $new_attrs->{where}
339             ]
340           }
341         : $where);
342   }
343
344   if (defined $cond) {
345     $new_attrs->{where} = (
346       defined $new_attrs->{where}
347         ? { '-and' => [
348               map {
349                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
350               } $cond, $new_attrs->{where}
351             ]
352           }
353         : $cond);
354   }
355
356   if (defined $having) {
357     $new_attrs->{having} = (
358       defined $new_attrs->{having}
359         ? { '-and' => [
360               map {
361                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
362               } $having, $new_attrs->{having}
363             ]
364           }
365         : $having);
366   }
367
368   my $rs = (ref $self)->new($self->result_source, $new_attrs);
369
370   $rs->set_cache($rows) if ($rows);
371
372   return $rs;
373 }
374
375 =head2 search_literal
376
377 =over 4
378
379 =item Arguments: $sql_fragment, @bind_values
380
381 =item Return Value: $resultset (scalar context), @row_objs (list context)
382
383 =back
384
385   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
386   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
387
388 Pass a literal chunk of SQL to be added to the conditional part of the
389 resultset query.
390
391 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
392 only be used in that context. C<search_literal> is a convenience method.
393 It is equivalent to calling $schema->search(\[]), but if you want to ensure
394 columns are bound correctly, use C<search>.
395
396 Example of how to use C<search> instead of C<search_literal>
397
398   my @cds = $cd_rs->search_literal('cdid = ? AND (artist = ? OR artist = ?)', (2, 1, 2));
399   my @cds = $cd_rs->search(\[ 'cdid = ? AND (artist = ? OR artist = ?)', [ 'cdid', 2 ], [ 'artist', 1 ], [ 'artist', 2 ] ]);
400
401
402 See L<DBIx::Class::Manual::Cookbook/Searching> and
403 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
404 require C<search_literal>.
405
406 =cut
407
408 sub search_literal {
409   my ($self, $sql, @bind) = @_;
410   my $attr;
411   if ( @bind && ref($bind[-1]) eq 'HASH' ) {
412     $attr = pop @bind;
413   }
414   return $self->search(\[ $sql, map [ __DUMMY__ => $_ ], @bind ], ($attr || () ));
415 }
416
417 =head2 find
418
419 =over 4
420
421 =item Arguments: @values | \%cols, \%attrs?
422
423 =item Return Value: $row_object | undef
424
425 =back
426
427 Finds a row based on its primary key or unique constraint. For example, to find
428 a row by its primary key:
429
430   my $cd = $schema->resultset('CD')->find(5);
431
432 You can also find a row by a specific unique constraint using the C<key>
433 attribute. For example:
434
435   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
436     key => 'cd_artist_title'
437   });
438
439 Additionally, you can specify the columns explicitly by name:
440
441   my $cd = $schema->resultset('CD')->find(
442     {
443       artist => 'Massive Attack',
444       title  => 'Mezzanine',
445     },
446     { key => 'cd_artist_title' }
447   );
448
449 If the C<key> is specified as C<primary>, it searches only on the primary key.
450
451 If no C<key> is specified, it searches on all unique constraints defined on the
452 source for which column data is provided, including the primary key.
453
454 If your table does not have a primary key, you B<must> provide a value for the
455 C<key> attribute matching one of the unique constraints on the source.
456
457 In addition to C<key>, L</find> recognizes and applies standard
458 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
459
460 Note: If your query does not return only one row, a warning is generated:
461
462   Query returned more than one row
463
464 See also L</find_or_create> and L</update_or_create>. For information on how to
465 declare unique constraints, see
466 L<DBIx::Class::ResultSource/add_unique_constraint>.
467
468 =cut
469
470 sub find {
471   my $self = shift;
472   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
473
474   # Default to the primary key, but allow a specific key
475   my @cols = exists $attrs->{key}
476     ? $self->result_source->unique_constraint_columns($attrs->{key})
477     : $self->result_source->primary_columns;
478   $self->throw_exception(
479     "Can't find unless a primary key is defined or unique constraint is specified"
480   ) unless @cols;
481
482   # Parse out a hashref from input
483   my $input_query;
484   if (ref $_[0] eq 'HASH') {
485     $input_query = { %{$_[0]} };
486   }
487   elsif (@_ == @cols) {
488     $input_query = {};
489     @{$input_query}{@cols} = @_;
490   }
491   else {
492     # Compatibility: Allow e.g. find(id => $value)
493     carp "Find by key => value deprecated; please use a hashref instead";
494     $input_query = {@_};
495   }
496
497   my (%related, $info);
498
499   KEY: foreach my $key (keys %$input_query) {
500     if (ref($input_query->{$key})
501         && ($info = $self->result_source->relationship_info($key))) {
502       my $val = delete $input_query->{$key};
503       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
504       my $rel_q = $self->result_source->_resolve_condition(
505                     $info->{cond}, $val, $key
506                   );
507       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
508       @related{keys %$rel_q} = values %$rel_q;
509     }
510   }
511   if (my @keys = keys %related) {
512     @{$input_query}{@keys} = values %related;
513   }
514
515
516   # Build the final query: Default to the disjunction of the unique queries,
517   # but allow the input query in case the ResultSet defines the query or the
518   # user is abusing find
519   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
520   my $query;
521   if (exists $attrs->{key}) {
522     my @unique_cols = $self->result_source->unique_constraint_columns($attrs->{key});
523     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
524     $query = $self->_add_alias($unique_query, $alias);
525   }
526   elsif ($self->{attrs}{accessor} and $self->{attrs}{accessor} eq 'single') {
527     # This means that we got here after a merger of relationship conditions
528     # in ::Relationship::Base::search_related (the row method), and furthermore
529     # the relationship is of the 'single' type. This means that the condition
530     # provided by the relationship (already attached to $self) is sufficient,
531     # as there can be only one row in the database that would satisfy the
532     # relationship
533   }
534   else {
535     my @unique_queries = $self->_unique_queries($input_query, $attrs);
536     $query = @unique_queries
537       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
538       : $self->_add_alias($input_query, $alias);
539   }
540
541   # Run the query
542   my $rs = $self->search ($query, {result_class => $self->result_class, %$attrs});
543   if ($rs->_resolved_attrs->{collapse}) {
544     my $row = $rs->next;
545     carp "Query returned more than one row" if $rs->next;
546     return $row;
547   }
548   else {
549     return $rs->single;
550   }
551 }
552
553 # _add_alias
554 #
555 # Add the specified alias to the specified query hash. A copy is made so the
556 # original query is not modified.
557
558 sub _add_alias {
559   my ($self, $query, $alias) = @_;
560
561   my %aliased = %$query;
562   foreach my $col (grep { ! m/\./ } keys %aliased) {
563     $aliased{"$alias.$col"} = delete $aliased{$col};
564   }
565
566   return \%aliased;
567 }
568
569 # _unique_queries
570 #
571 # Build a list of queries which satisfy unique constraints.
572
573 sub _unique_queries {
574   my ($self, $query, $attrs) = @_;
575
576   my @constraint_names = exists $attrs->{key}
577     ? ($attrs->{key})
578     : $self->result_source->unique_constraint_names;
579
580   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
581   my $num_where = scalar keys %$where;
582
583   my (@unique_queries, %seen_column_combinations);
584   foreach my $name (@constraint_names) {
585     my @constraint_cols = $self->result_source->unique_constraint_columns($name);
586
587     my $constraint_sig = join "\x00", sort @constraint_cols;
588     next if $seen_column_combinations{$constraint_sig}++;
589
590     my $unique_query = $self->_build_unique_query($query, \@constraint_cols);
591
592     my $num_cols = scalar @constraint_cols;
593     my $num_query = scalar keys %$unique_query;
594
595     my $total = $num_query + $num_where;
596     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
597       # The query is either unique on its own or is unique in combination with
598       # the existing where clause
599       push @unique_queries, $unique_query;
600     }
601   }
602
603   return @unique_queries;
604 }
605
606 # _build_unique_query
607 #
608 # Constrain the specified query hash based on the specified column names.
609
610 sub _build_unique_query {
611   my ($self, $query, $unique_cols) = @_;
612
613   return {
614     map  { $_ => $query->{$_} }
615     grep { exists $query->{$_} }
616       @$unique_cols
617   };
618 }
619
620 =head2 search_related
621
622 =over 4
623
624 =item Arguments: $rel, $cond, \%attrs?
625
626 =item Return Value: $new_resultset
627
628 =back
629
630   $new_rs = $cd_rs->search_related('artist', {
631     name => 'Emo-R-Us',
632   });
633
634 Searches the specified relationship, optionally specifying a condition and
635 attributes for matching records. See L</ATTRIBUTES> for more information.
636
637 =cut
638
639 sub search_related {
640   return shift->related_resultset(shift)->search(@_);
641 }
642
643 =head2 search_related_rs
644
645 This method works exactly the same as search_related, except that
646 it guarantees a resultset, even in list context.
647
648 =cut
649
650 sub search_related_rs {
651   return shift->related_resultset(shift)->search_rs(@_);
652 }
653
654 =head2 cursor
655
656 =over 4
657
658 =item Arguments: none
659
660 =item Return Value: $cursor
661
662 =back
663
664 Returns a storage-driven cursor to the given resultset. See
665 L<DBIx::Class::Cursor> for more information.
666
667 =cut
668
669 sub cursor {
670   my ($self) = @_;
671
672   my $attrs = $self->_resolved_attrs_copy;
673
674   return $self->{cursor}
675     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
676           $attrs->{where},$attrs);
677 }
678
679 =head2 single
680
681 =over 4
682
683 =item Arguments: $cond?
684
685 =item Return Value: $row_object?
686
687 =back
688
689   my $cd = $schema->resultset('CD')->single({ year => 2001 });
690
691 Inflates the first result without creating a cursor if the resultset has
692 any records in it; if not returns nothing. Used by L</find> as a lean version of
693 L</search>.
694
695 While this method can take an optional search condition (just like L</search>)
696 being a fast-code-path it does not recognize search attributes. If you need to
697 add extra joins or similar, call L</search> and then chain-call L</single> on the
698 L<DBIx::Class::ResultSet> returned.
699
700 =over
701
702 =item B<Note>
703
704 As of 0.08100, this method enforces the assumption that the preceding
705 query returns only one row. If more than one row is returned, you will receive
706 a warning:
707
708   Query returned more than one row
709
710 In this case, you should be using L</next> or L</find> instead, or if you really
711 know what you are doing, use the L</rows> attribute to explicitly limit the size
712 of the resultset.
713
714 This method will also throw an exception if it is called on a resultset prefetching
715 has_many, as such a prefetch implies fetching multiple rows from the database in
716 order to assemble the resulting object.
717
718 =back
719
720 =cut
721
722 sub single {
723   my ($self, $where) = @_;
724   if(@_ > 2) {
725       $self->throw_exception('single() only takes search conditions, no attributes. You want ->search( $cond, $attrs )->single()');
726   }
727
728   my $attrs = $self->_resolved_attrs_copy;
729
730   if ($attrs->{collapse}) {
731     $self->throw_exception(
732       'single() can not be used on resultsets prefetching has_many. Use find( \%cond ) or next() instead'
733     );
734   }
735
736   if ($where) {
737     if (defined $attrs->{where}) {
738       $attrs->{where} = {
739         '-and' =>
740             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
741                $where, delete $attrs->{where} ]
742       };
743     } else {
744       $attrs->{where} = $where;
745     }
746   }
747
748 #  XXX: Disabled since it doesn't infer uniqueness in all cases
749 #  unless ($self->_is_unique_query($attrs->{where})) {
750 #    carp "Query not guaranteed to return a single row"
751 #      . "; please declare your unique constraints or use search instead";
752 #  }
753
754   my @data = $self->result_source->storage->select_single(
755     $attrs->{from}, $attrs->{select},
756     $attrs->{where}, $attrs
757   );
758
759   return @data
760     ? ($self->_construct_objects(@data))[0]
761     : undef
762   ;
763 }
764
765
766 # _is_unique_query
767 #
768 # Try to determine if the specified query is guaranteed to be unique, based on
769 # the declared unique constraints.
770
771 sub _is_unique_query {
772   my ($self, $query) = @_;
773
774   my $collapsed = $self->_collapse_query($query);
775   my $alias = $self->{attrs}{alias};
776
777   foreach my $name ($self->result_source->unique_constraint_names) {
778     my @unique_cols = map {
779       "$alias.$_"
780     } $self->result_source->unique_constraint_columns($name);
781
782     # Count the values for each unique column
783     my %seen = map { $_ => 0 } @unique_cols;
784
785     foreach my $key (keys %$collapsed) {
786       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
787       next unless exists $seen{$aliased};  # Additional constraints are okay
788       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
789     }
790
791     # If we get 0 or more than 1 value for a column, it's not necessarily unique
792     return 1 unless grep { $_ != 1 } values %seen;
793   }
794
795   return 0;
796 }
797
798 # _collapse_query
799 #
800 # Recursively collapse the query, accumulating values for each column.
801
802 sub _collapse_query {
803   my ($self, $query, $collapsed) = @_;
804
805   $collapsed ||= {};
806
807   if (ref $query eq 'ARRAY') {
808     foreach my $subquery (@$query) {
809       next unless ref $subquery;  # -or
810       $collapsed = $self->_collapse_query($subquery, $collapsed);
811     }
812   }
813   elsif (ref $query eq 'HASH') {
814     if (keys %$query and (keys %$query)[0] eq '-and') {
815       foreach my $subquery (@{$query->{-and}}) {
816         $collapsed = $self->_collapse_query($subquery, $collapsed);
817       }
818     }
819     else {
820       foreach my $col (keys %$query) {
821         my $value = $query->{$col};
822         $collapsed->{$col}{$value}++;
823       }
824     }
825   }
826
827   return $collapsed;
828 }
829
830 =head2 get_column
831
832 =over 4
833
834 =item Arguments: $cond?
835
836 =item Return Value: $resultsetcolumn
837
838 =back
839
840   my $max_length = $rs->get_column('length')->max;
841
842 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
843
844 =cut
845
846 sub get_column {
847   my ($self, $column) = @_;
848   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
849   return $new;
850 }
851
852 =head2 search_like
853
854 =over 4
855
856 =item Arguments: $cond, \%attrs?
857
858 =item Return Value: $resultset (scalar context), @row_objs (list context)
859
860 =back
861
862   # WHERE title LIKE '%blue%'
863   $cd_rs = $rs->search_like({ title => '%blue%'});
864
865 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
866 that this is simply a convenience method retained for ex Class::DBI users.
867 You most likely want to use L</search> with specific operators.
868
869 For more information, see L<DBIx::Class::Manual::Cookbook>.
870
871 This method is deprecated and will be removed in 0.09. Use L</search()>
872 instead. An example conversion is:
873
874   ->search_like({ foo => 'bar' });
875
876   # Becomes
877
878   ->search({ foo => { like => 'bar' } });
879
880 =cut
881
882 sub search_like {
883   my $class = shift;
884   carp (
885     'search_like() is deprecated and will be removed in DBIC version 0.09.'
886    .' Instead use ->search({ x => { -like => "y%" } })'
887    .' (note the outer pair of {}s - they are important!)'
888   );
889   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
890   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
891   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
892   return $class->search($query, { %$attrs });
893 }
894
895 =head2 slice
896
897 =over 4
898
899 =item Arguments: $first, $last
900
901 =item Return Value: $resultset (scalar context), @row_objs (list context)
902
903 =back
904
905 Returns a resultset or object list representing a subset of elements from the
906 resultset slice is called on. Indexes are from 0, i.e., to get the first
907 three records, call:
908
909   my ($one, $two, $three) = $rs->slice(0, 2);
910
911 =cut
912
913 sub slice {
914   my ($self, $min, $max) = @_;
915   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
916   $attrs->{offset} = $self->{attrs}{offset} || 0;
917   $attrs->{offset} += $min;
918   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
919   return $self->search(undef(), $attrs);
920   #my $slice = (ref $self)->new($self->result_source, $attrs);
921   #return (wantarray ? $slice->all : $slice);
922 }
923
924 =head2 next
925
926 =over 4
927
928 =item Arguments: none
929
930 =item Return Value: $result?
931
932 =back
933
934 Returns the next element in the resultset (C<undef> is there is none).
935
936 Can be used to efficiently iterate over records in the resultset:
937
938   my $rs = $schema->resultset('CD')->search;
939   while (my $cd = $rs->next) {
940     print $cd->title;
941   }
942
943 Note that you need to store the resultset object, and call C<next> on it.
944 Calling C<< resultset('Table')->next >> repeatedly will always return the
945 first record from the resultset.
946
947 =cut
948
949 sub next {
950   my ($self) = @_;
951   if (my $cache = $self->get_cache) {
952     $self->{all_cache_position} ||= 0;
953     return $cache->[$self->{all_cache_position}++];
954   }
955   if ($self->{attrs}{cache}) {
956     $self->{all_cache_position} = 1;
957     return ($self->all)[0];
958   }
959   if ($self->{stashed_objects}) {
960     my $obj = shift(@{$self->{stashed_objects}});
961     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
962     return $obj;
963   }
964   my @row = (
965     exists $self->{stashed_row}
966       ? @{delete $self->{stashed_row}}
967       : $self->cursor->next
968   );
969   return undef unless (@row);
970   my ($row, @more) = $self->_construct_objects(@row);
971   $self->{stashed_objects} = \@more if @more;
972   return $row;
973 }
974
975 # takes a single DBI-row of data and coinstructs as many objects
976 # as the resultset attributes call for.
977 # This can be a bit of an action at a distance - it takes as an argument
978 # the *current* cursor-row (already taken off the $sth), but if
979 # collapsing is requested it will keep advancing the cursor either
980 # until the current row-object is assembled (the collapser was able to
981 # order the result sensibly) OR until the cursor is exhausted (an
982 # unordered collapsing resultset effectively triggers ->all)
983
984 # FIXME: why the *FUCK* do we pass around DBI data by copy?! Sadly needs
985 # assessment before changing...
986 #
987 sub _construct_objects {
988   my ($self, @row) = @_;
989
990   my $attrs = $self->_resolved_attrs;
991   my $keep_collapsing = $attrs->{collapse};
992
993   my $res_index;
994 =begin
995   do {
996     my $me_pref_col = $attrs->{_row_parser}->($row_ref);
997
998     my $container;
999     if ($keep_collapsing) {
1000
1001       # FIXME - we should be able to remove these 2 checks after the design validates
1002       $self->throw_exception ('Collapsing without a top-level collapse-set... can not happen')
1003         unless @{$me_ref_col->[2]};
1004       $self->throw_exception ('Top-level collapse-set contains a NULL-value... can not happen')
1005         if grep { ! defined $_ }  @{$me_pref_col->[2]};
1006
1007       my $main_ident = join "\x00", @{$me_pref_col->[2]};
1008
1009       if (! $res_index->{$main_ident}) {
1010         # this is where we bail out IFF we are ordered, and the $main_ident changes
1011
1012         $res_index->{$main_ident} = {
1013           all_me_pref => [,
1014           index => scalar keys %$res_index,
1015         };
1016       }
1017     }
1018
1019
1020
1021       $container = $res_index->{$main_ident}{container};
1022     };
1023
1024     push @$container, [ @{$me_pref_col}[0,1] ];
1025
1026
1027
1028   } while (
1029     $keep_collapsing
1030       &&
1031     do { $row_ref = [$self->cursor->next]; $self->{stashed_row} = $row_ref if @$row_ref; scalar @$row_ref }
1032   );
1033
1034   # attempt collapse all rows with same collapse identity
1035   if (@to_collapse > 1) {
1036     my @collapsed;
1037     while (@to_collapse) {
1038       $self->_merge_result(\@collapsed, shift @to_collapse);
1039     }
1040   }
1041 =cut
1042
1043   my $mepref_structs = $self->_collapse_result(\@row)
1044     or return ();
1045
1046   my $rsrc = $self->result_source;
1047   my $res_class = $self->result_class;
1048   my $inflator = $res_class->can ('inflate_result');
1049
1050   my @objs = map {
1051     $res_class->$inflator ($rsrc, @$_)
1052   } (@$mepref_structs);
1053
1054   if (my $f = $attrs->{record_filter}) {
1055     @objs = map { $f->($_) } @objs;
1056   }
1057
1058   return @objs;
1059 }
1060
1061 =begin
1062
1063 # two arguments: $as_proto is an arrayref of column names,
1064 # $row_ref is an arrayref of the data. If none of the row data
1065 # is defined we return undef (that's copied from the old
1066 # _collapse_result). Next we decide whether we need to collapse
1067 # the resultset (i.e. we prefetch something) or not. $collapse
1068 # indicates that. The do-while loop will run once if we do not need
1069 # to collapse the result and will run as long as _merge_result returns
1070 # a true value. It will return undef if the current added row does not
1071 # match the previous row. A bit of stashing and cursor magic is
1072 # required so that the cursor is not mixed up.
1073
1074 # "$rows" is a bit misleading. In the end, there should only be one
1075 # element in this arrayref. 
1076
1077 sub _collapse_result {
1078     my ( $self, $as_proto, $row_ref ) = @_;
1079     my $has_def;
1080     for (@$row_ref) {
1081         if ( defined $_ ) {
1082             $has_def++;
1083             last;
1084         }
1085     }
1086     return undef unless $has_def;
1087
1088     my $collapse = $self->_resolved_attrs->{collapse};
1089     my $rows     = [];
1090     my @row      = @$row_ref;
1091     do {
1092         my $i = 0;
1093         my $row = { map { $_ => $row[ $i++ ] } @$as_proto };
1094         $row = $self->result_source->_parse_row($row, $collapse);
1095         unless ( scalar @$rows ) {
1096             push( @$rows, $row );
1097         }
1098         $collapse = undef unless ( $self->_merge_result( $rows, $row ) );
1099       } while (
1100         $collapse
1101         && do { @row = $self->cursor->next; $self->{stashed_row} = \@row if @row; }
1102       );
1103
1104     return $rows->[0];
1105
1106 }
1107
1108 # _merge_result accepts an arrayref of rows objects (again, an arrayref of two elements)
1109 # and a row object which should be merged into the first object.
1110 # First we try to find out whether $row is already in $rows. If this is the case
1111 # we try to merge them by iteration through their relationship data. We call
1112 # _merge_result again on them, so they get merged.
1113
1114 # If we don't find the $row in $rows, we append it to $rows and return undef.
1115 # _merge_result returns 1 otherwise (i.e. $row has been found in $rows).
1116
1117 sub _merge_result {
1118     my ( $self, $rows, $row ) = @_;
1119     my ( $columns, $rels ) = @$row;
1120     my $found = undef;
1121     foreach my $seen (@$rows) {
1122         my $match = 1;
1123         foreach my $column ( keys %$columns ) {
1124             if (   defined $seen->[0]->{$column} ^ defined $columns->{$column}
1125                 or defined $columns->{$column}
1126                 && $seen->[0]->{$column} ne $columns->{$column} )
1127             {
1128
1129                 $match = 0;
1130                 last;
1131             }
1132         }
1133         if ($match) {
1134             $found = $seen;
1135             last;
1136         }
1137     }
1138     if ($found) {
1139         foreach my $rel ( keys %$rels ) {
1140             my $old_rows = $found->[1]->{$rel};
1141             $self->_merge_result(
1142                 ref $found->[1]->{$rel}->[0] eq 'HASH' ? [ $found->[1]->{$rel} ]
1143                 : $found->[1]->{$rel},
1144                 ref $rels->{$rel}->[0] eq 'HASH' ? [ $rels->{$rel}->[0], $rels->{$rel}->[1] ]
1145                 : $rels->{$rel}->[0]
1146             );
1147
1148   my $attrs = $self->_resolved_attrs;
1149   my ($keep_collapsing, $set_ident) = @{$attrs}{qw/collapse _collapse_ident/};
1150
1151   # FIXME this is temporary, need to calculate in _resolved_attrs
1152   $set_ident ||= { me => [ $self->result_source->_pri_cols ], pref => {} };
1153
1154   my @cur_row = @$row_ref;
1155   my (@to_collapse, $last_ident);
1156
1157   do {
1158     my $row_hr = { map { $as_proto->[$_] => $cur_row[$_] } (0 .. $#$as_proto) };
1159
1160     # see if we are switching to another object
1161     # this can be turned off and things will still work
1162     # since _merge_prefetch knows about _collapse_ident
1163 #    my $cur_ident = [ @{$row_hr}{@$set_ident} ];
1164     my $cur_ident = [];
1165     $last_ident ||= $cur_ident;
1166
1167 #    if ($keep_collapsing = Test::Deep::eq_deeply ($cur_ident, $last_ident)) {
1168 #      push @to_collapse, $self->result_source->_parse_row (
1169 #        $row_hr,
1170 #      );
1171 #    }
1172   } while (
1173     $keep_collapsing
1174       &&
1175     do { @cur_row = $self->cursor->next; $self->{stashed_row} = \@cur_row if @cur_row; }
1176   );
1177
1178   die Dumper \@to_collapse;
1179
1180
1181   # attempt collapse all rows with same collapse identity
1182   if (@to_collapse > 1) {
1183     my @collapsed;
1184     while (@to_collapse) {
1185       $self->_merge_result(\@collapsed, shift @to_collapse);
1186     }
1187     @to_collapse = @collapsed;
1188   }
1189
1190   # still didn't fully collapse
1191   $self->throw_exception ('Resultset collapse failed (theoretically impossible). Maybe a wrong collapse_ident...?')
1192     if (@to_collapse > 1);
1193
1194   return $to_collapse[0];
1195 }
1196
1197
1198 # two arguments: $as_proto is an arrayref of 'as' column names,
1199 # $row_ref is an arrayref of the data. The do-while loop will run
1200 # once if we do not need to collapse the result and will run as long as
1201 # _merge_result returns a true value. It will return undef if the
1202 # current added row does not match the previous row, which in turn
1203 # means we need to stash the row for the subsequent ->next call
1204 sub _collapse_result {
1205   my ( $self, $as_proto, $row_ref ) = @_;
1206
1207   my $attrs = $self->_resolved_attrs;
1208   my ($keep_collapsing, $set_ident) = @{$attrs}{qw/collapse _collapse_ident/};
1209
1210   die Dumper [$as_proto, $row_ref, $keep_collapsing, $set_ident ];
1211
1212
1213   my @cur_row = @$row_ref;
1214   my (@to_collapse, $last_ident);
1215
1216   do {
1217     my $row_hr = { map { $as_proto->[$_] => $cur_row[$_] } (0 .. $#$as_proto) };
1218
1219     # see if we are switching to another object
1220     # this can be turned off and things will still work
1221     # since _merge_prefetch knows about _collapse_ident
1222 #    my $cur_ident = [ @{$row_hr}{@$set_ident} ];
1223     my $cur_ident = [];
1224     $last_ident ||= $cur_ident;
1225
1226 #    if ($keep_collapsing = eq_deeply ($cur_ident, $last_ident)) {
1227 #      push @to_collapse, $self->result_source->_parse_row (
1228 #        $row_hr,
1229 #      );
1230 #    }
1231   } while (
1232     $keep_collapsing
1233       &&
1234     do { @cur_row = $self->cursor->next; $self->{stashed_row} = \@cur_row if @cur_row; }
1235   );
1236
1237   # attempt collapse all rows with same collapse identity
1238 }
1239 =cut
1240
1241 # Takes an arrayref of me/pref pairs and a new me/pref pair that should
1242 # be merged on a preexisting matching me (or should be pushed into $merged
1243 # as a new me/pref pair for further invocations). It should be possible to
1244 # use this function to collapse complete ->all results,  provided _collapse_result() is adjusted
1245 # to provide everything to this sub not to barf when $merged contains more than one 
1246 # arrayref)
1247 sub _merge_prefetch {
1248   my ($self, $merged, $next_row) = @_;
1249
1250   unless (@$merged) {
1251     push @$merged, $next_row;
1252     return;
1253   }
1254
1255 }
1256
1257 =head2 result_source
1258
1259 =over 4
1260
1261 =item Arguments: $result_source?
1262
1263 =item Return Value: $result_source
1264
1265 =back
1266
1267 An accessor for the primary ResultSource object from which this ResultSet
1268 is derived.
1269
1270 =head2 result_class
1271
1272 =over 4
1273
1274 =item Arguments: $result_class?
1275
1276 =item Return Value: $result_class
1277
1278 =back
1279
1280 An accessor for the class to use when creating row objects. Defaults to
1281 C<< result_source->result_class >> - which in most cases is the name of the
1282 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
1283
1284 Note that changing the result_class will also remove any components
1285 that were originally loaded in the source class via
1286 L<DBIx::Class::ResultSource/load_components>. Any overloaded methods
1287 in the original source class will not run.
1288
1289 =cut
1290
1291 sub result_class {
1292   my ($self, $result_class) = @_;
1293   if ($result_class) {
1294     $self->ensure_class_loaded($result_class);
1295     $self->_result_class($result_class);
1296     $self->{attrs}{result_class} = $result_class if ref $self;
1297   }
1298   $self->_result_class;
1299 }
1300
1301 =head2 count
1302
1303 =over 4
1304
1305 =item Arguments: $cond, \%attrs??
1306
1307 =item Return Value: $count
1308
1309 =back
1310
1311 Performs an SQL C<COUNT> with the same query as the resultset was built
1312 with to find the number of elements. Passing arguments is equivalent to
1313 C<< $rs->search ($cond, \%attrs)->count >>
1314
1315 =cut
1316
1317 sub count {
1318   my $self = shift;
1319   return $self->search(@_)->count if @_ and defined $_[0];
1320   return scalar @{ $self->get_cache } if $self->get_cache;
1321
1322   my $attrs = $self->_resolved_attrs_copy;
1323
1324   # this is a little optimization - it is faster to do the limit
1325   # adjustments in software, instead of a subquery
1326   my $rows = delete $attrs->{rows};
1327   my $offset = delete $attrs->{offset};
1328
1329   my $crs;
1330   if ($self->_has_resolved_attr (qw/collapse group_by/)) {
1331     $crs = $self->_count_subq_rs ($attrs);
1332   }
1333   else {
1334     $crs = $self->_count_rs ($attrs);
1335   }
1336   my $count = $crs->next;
1337
1338   $count -= $offset if $offset;
1339   $count = $rows if $rows and $rows < $count;
1340   $count = 0 if ($count < 0);
1341
1342   return $count;
1343 }
1344
1345 =head2 count_rs
1346
1347 =over 4
1348
1349 =item Arguments: $cond, \%attrs??
1350
1351 =item Return Value: $count_rs
1352
1353 =back
1354
1355 Same as L</count> but returns a L<DBIx::Class::ResultSetColumn> object.
1356 This can be very handy for subqueries:
1357
1358   ->search( { amount => $some_rs->count_rs->as_query } )
1359
1360 As with regular resultsets the SQL query will be executed only after
1361 the resultset is accessed via L</next> or L</all>. That would return
1362 the same single value obtainable via L</count>.
1363
1364 =cut
1365
1366 sub count_rs {
1367   my $self = shift;
1368   return $self->search(@_)->count_rs if @_;
1369
1370   # this may look like a lack of abstraction (count() does about the same)
1371   # but in fact an _rs *must* use a subquery for the limits, as the
1372   # software based limiting can not be ported if this $rs is to be used
1373   # in a subquery itself (i.e. ->as_query)
1374   if ($self->_has_resolved_attr (qw/collapse group_by offset rows/)) {
1375     return $self->_count_subq_rs;
1376   }
1377   else {
1378     return $self->_count_rs;
1379   }
1380 }
1381
1382 #
1383 # returns a ResultSetColumn object tied to the count query
1384 #
1385 sub _count_rs {
1386   my ($self, $attrs) = @_;
1387
1388   my $rsrc = $self->result_source;
1389   $attrs ||= $self->_resolved_attrs;
1390
1391   my $tmp_attrs = { %$attrs };
1392
1393   # take off any limits, record_filter is cdbi, and no point of ordering a count
1394   delete $tmp_attrs->{$_} for (qw/select as rows offset order_by record_filter/);
1395
1396   # overwrite the selector (supplied by the storage)
1397   $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $tmp_attrs);
1398   $tmp_attrs->{as} = 'count';
1399
1400   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
1401
1402   return $tmp_rs;
1403 }
1404
1405 #
1406 # same as above but uses a subquery
1407 #
1408 sub _count_subq_rs {
1409   my ($self, $attrs) = @_;
1410
1411   my $rsrc = $self->result_source;
1412   $attrs ||= $self->_resolved_attrs_copy;
1413
1414   my $sub_attrs = { %$attrs };
1415
1416   # extra selectors do not go in the subquery and there is no point of ordering it
1417   delete $sub_attrs->{$_} for qw/collapse select _prefetch_select as order_by/;
1418
1419   # if we multi-prefetch we group_by primary keys only as this is what we would
1420   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
1421   if ( $attrs->{collapse}  ) {
1422     $sub_attrs->{group_by} = [ map { "$attrs->{alias}.$_" } ($rsrc->_pri_cols) ]
1423   }
1424
1425   $sub_attrs->{select} = $rsrc->storage->_subq_count_select ($rsrc, $attrs);
1426
1427   # this is so that the query can be simplified e.g.
1428   # * ordering can be thrown away in things like Top limit
1429   $sub_attrs->{-for_count_only} = 1;
1430
1431   my $sub_rs = $rsrc->resultset_class->new ($rsrc, $sub_attrs);
1432
1433   $attrs->{from} = [{
1434     -alias => 'count_subq',
1435     -source_handle => $rsrc->handle,
1436     count_subq => $sub_rs->as_query,
1437   }];
1438
1439   # the subquery replaces this
1440   delete $attrs->{$_} for qw/where bind collapse group_by having having_bind rows offset/;
1441
1442   return $self->_count_rs ($attrs);
1443 }
1444
1445 sub _bool {
1446   return 1;
1447 }
1448
1449 =head2 count_literal
1450
1451 =over 4
1452
1453 =item Arguments: $sql_fragment, @bind_values
1454
1455 =item Return Value: $count
1456
1457 =back
1458
1459 Counts the results in a literal query. Equivalent to calling L</search_literal>
1460 with the passed arguments, then L</count>.
1461
1462 =cut
1463
1464 sub count_literal { shift->search_literal(@_)->count; }
1465
1466 =head2 all
1467
1468 =over 4
1469
1470 =item Arguments: none
1471
1472 =item Return Value: @objects
1473
1474 =back
1475
1476 Returns all elements in the resultset. Called implicitly if the resultset
1477 is returned in list context.
1478
1479 =cut
1480
1481 sub all {
1482   my $self = shift;
1483   if(@_) {
1484       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1485   }
1486
1487   if (my $c = $self->get_cache) {
1488     return @$c;
1489   }
1490
1491   my @objects;
1492
1493   if ($self->_resolved_attrs->{collapse}) {
1494     # Using $self->cursor->all is really just an optimisation.
1495     # If we're collapsing has_many prefetches it probably makes
1496     # very little difference, and this is cleaner than hacking
1497     # _construct_objects to survive the approach
1498     $self->cursor->reset;
1499     my @row = $self->cursor->next;
1500     while (@row) {
1501       push(@objects, $self->_construct_objects(@row));
1502       @row = (exists $self->{stashed_row}
1503                ? @{delete $self->{stashed_row}}
1504                : $self->cursor->next);
1505     }
1506   } else {
1507     @objects = map { $self->_construct_objects($_) } $self->cursor->all;
1508   }
1509
1510   $self->set_cache(\@objects) if $self->{attrs}{cache};
1511
1512   return @objects;
1513 }
1514
1515 =head2 reset
1516
1517 =over 4
1518
1519 =item Arguments: none
1520
1521 =item Return Value: $self
1522
1523 =back
1524
1525 Resets the resultset's cursor, so you can iterate through the elements again.
1526 Implicitly resets the storage cursor, so a subsequent L</next> will trigger
1527 another query.
1528
1529 =cut
1530
1531 sub reset {
1532   my ($self) = @_;
1533   delete $self->{_attrs} if exists $self->{_attrs};
1534   $self->{all_cache_position} = 0;
1535   $self->cursor->reset;
1536   return $self;
1537 }
1538
1539 =head2 first
1540
1541 =over 4
1542
1543 =item Arguments: none
1544
1545 =item Return Value: $object?
1546
1547 =back
1548
1549 Resets the resultset and returns an object for the first result (if the
1550 resultset returns anything).
1551
1552 =cut
1553
1554 sub first {
1555   return $_[0]->reset->next;
1556 }
1557
1558
1559 # _rs_update_delete
1560 #
1561 # Determines whether and what type of subquery is required for the $rs operation.
1562 # If grouping is necessary either supplies its own, or verifies the current one
1563 # After all is done delegates to the proper storage method.
1564
1565 sub _rs_update_delete {
1566   my ($self, $op, $values) = @_;
1567
1568   my $rsrc = $self->result_source;
1569
1570   # if a condition exists we need to strip all table qualifiers
1571   # if this is not possible we'll force a subquery below
1572   my $cond = $rsrc->schema->storage->_strip_cond_qualifiers ($self->{cond});
1573
1574   my $needs_group_by_subq = $self->_has_resolved_attr (qw/collapse group_by -join/);
1575   my $needs_subq = $needs_group_by_subq || (not defined $cond) || $self->_has_resolved_attr(qw/row offset/);
1576
1577   if ($needs_group_by_subq or $needs_subq) {
1578
1579     # make a new $rs selecting only the PKs (that's all we really need)
1580     my $attrs = $self->_resolved_attrs_copy;
1581
1582     delete $attrs->{$_} for qw/collapse select as/;
1583     $attrs->{columns} = [ map { "$attrs->{alias}.$_" } ($self->result_source->_pri_cols) ];
1584
1585     if ($needs_group_by_subq) {
1586       # make sure no group_by was supplied, or if there is one - make sure it matches
1587       # the columns compiled above perfectly. Anything else can not be sanely executed
1588       # on most databases so croak right then and there
1589
1590       if (my $g = $attrs->{group_by}) {
1591         my @current_group_by = map
1592           { $_ =~ /\./ ? $_ : "$attrs->{alias}.$_" }
1593           @$g
1594         ;
1595
1596         if (
1597           join ("\x00", sort @current_group_by)
1598             ne
1599           join ("\x00", sort @{$attrs->{columns}} )
1600         ) {
1601           $self->throw_exception (
1602             "You have just attempted a $op operation on a resultset which does group_by"
1603             . ' on columns other than the primary keys, while DBIC internally needs to retrieve'
1604             . ' the primary keys in a subselect. All sane RDBMS engines do not support this'
1605             . ' kind of queries. Please retry the operation with a modified group_by or'
1606             . ' without using one at all.'
1607           );
1608         }
1609       }
1610       else {
1611         $attrs->{group_by} = $attrs->{columns};
1612       }
1613     }
1614
1615     my $subrs = (ref $self)->new($rsrc, $attrs);
1616
1617     return $self->result_source->storage->_subq_update_delete($subrs, $op, $values);
1618   }
1619   else {
1620     return $rsrc->storage->$op(
1621       $rsrc,
1622       $op eq 'update' ? $values : (),
1623       $cond,
1624     );
1625   }
1626 }
1627
1628 =head2 update
1629
1630 =over 4
1631
1632 =item Arguments: \%values
1633
1634 =item Return Value: $storage_rv
1635
1636 =back
1637
1638 Sets the specified columns in the resultset to the supplied values in a
1639 single query. Return value will be true if the update succeeded or false
1640 if no records were updated; exact type of success value is storage-dependent.
1641
1642 =cut
1643
1644 sub update {
1645   my ($self, $values) = @_;
1646   $self->throw_exception('Values for update must be a hash')
1647     unless ref $values eq 'HASH';
1648
1649   return $self->_rs_update_delete ('update', $values);
1650 }
1651
1652 =head2 update_all
1653
1654 =over 4
1655
1656 =item Arguments: \%values
1657
1658 =item Return Value: 1
1659
1660 =back
1661
1662 Fetches all objects and updates them one at a time. Note that C<update_all>
1663 will run DBIC cascade triggers, while L</update> will not.
1664
1665 =cut
1666
1667 sub update_all {
1668   my ($self, $values) = @_;
1669   $self->throw_exception('Values for update_all must be a hash')
1670     unless ref $values eq 'HASH';
1671   foreach my $obj ($self->all) {
1672     $obj->set_columns($values)->update;
1673   }
1674   return 1;
1675 }
1676
1677 =head2 delete
1678
1679 =over 4
1680
1681 =item Arguments: none
1682
1683 =item Return Value: $storage_rv
1684
1685 =back
1686
1687 Deletes the contents of the resultset from its result source. Note that this
1688 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1689 to run. See also L<DBIx::Class::Row/delete>.
1690
1691 Return value will be the amount of rows deleted; exact type of return value
1692 is storage-dependent.
1693
1694 =cut
1695
1696 sub delete {
1697   my $self = shift;
1698   $self->throw_exception('delete does not accept any arguments')
1699     if @_;
1700
1701   return $self->_rs_update_delete ('delete');
1702 }
1703
1704 =head2 delete_all
1705
1706 =over 4
1707
1708 =item Arguments: none
1709
1710 =item Return Value: 1
1711
1712 =back
1713
1714 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1715 will run DBIC cascade triggers, while L</delete> will not.
1716
1717 =cut
1718
1719 sub delete_all {
1720   my $self = shift;
1721   $self->throw_exception('delete_all does not accept any arguments')
1722     if @_;
1723
1724   $_->delete for $self->all;
1725   return 1;
1726 }
1727
1728 =head2 populate
1729
1730 =over 4
1731
1732 =item Arguments: \@data;
1733
1734 =back
1735
1736 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1737 For the arrayref of hashrefs style each hashref should be a structure suitable
1738 forsubmitting to a $resultset->create(...) method.
1739
1740 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1741 to insert the data, as this is a faster method.
1742
1743 Otherwise, each set of data is inserted into the database using
1744 L<DBIx::Class::ResultSet/create>, and the resulting objects are
1745 accumulated into an array. The array itself, or an array reference
1746 is returned depending on scalar or list context.
1747
1748 Example:  Assuming an Artist Class that has many CDs Classes relating:
1749
1750   my $Artist_rs = $schema->resultset("Artist");
1751
1752   ## Void Context Example
1753   $Artist_rs->populate([
1754      { artistid => 4, name => 'Manufactured Crap', cds => [
1755         { title => 'My First CD', year => 2006 },
1756         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1757       ],
1758      },
1759      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1760         { title => 'My parents sold me to a record company', year => 2005 },
1761         { title => 'Why Am I So Ugly?', year => 2006 },
1762         { title => 'I Got Surgery and am now Popular', year => 2007 }
1763       ],
1764      },
1765   ]);
1766
1767   ## Array Context Example
1768   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1769     { name => "Artist One"},
1770     { name => "Artist Two"},
1771     { name => "Artist Three", cds=> [
1772     { title => "First CD", year => 2007},
1773     { title => "Second CD", year => 2008},
1774   ]}
1775   ]);
1776
1777   print $ArtistOne->name; ## response is 'Artist One'
1778   print $ArtistThree->cds->count ## reponse is '2'
1779
1780 For the arrayref of arrayrefs style,  the first element should be a list of the
1781 fieldsnames to which the remaining elements are rows being inserted.  For
1782 example:
1783
1784   $Arstist_rs->populate([
1785     [qw/artistid name/],
1786     [100, 'A Formally Unknown Singer'],
1787     [101, 'A singer that jumped the shark two albums ago'],
1788     [102, 'An actually cool singer'],
1789   ]);
1790
1791 Please note an important effect on your data when choosing between void and
1792 wantarray context. Since void context goes straight to C<insert_bulk> in
1793 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1794 C<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to
1795 create primary keys for you, you will find that your PKs are empty.  In this
1796 case you will have to use the wantarray context in order to create those
1797 values.
1798
1799 =cut
1800
1801 sub populate {
1802   my $self = shift;
1803
1804   # cruft placed in standalone method
1805   my $data = $self->_normalize_populate_args(@_);
1806
1807   if(defined wantarray) {
1808     my @created;
1809     foreach my $item (@$data) {
1810       push(@created, $self->create($item));
1811     }
1812     return wantarray ? @created : \@created;
1813   } else {
1814     my $first = $data->[0];
1815
1816     # if a column is a registered relationship, and is a non-blessed hash/array, consider
1817     # it relationship data
1818     my (@rels, @columns);
1819     for (keys %$first) {
1820       my $ref = ref $first->{$_};
1821       $self->result_source->has_relationship($_) && ($ref eq 'ARRAY' or $ref eq 'HASH')
1822         ? push @rels, $_
1823         : push @columns, $_
1824       ;
1825     }
1826
1827     my @pks = $self->result_source->primary_columns;
1828
1829     ## do the belongs_to relationships
1830     foreach my $index (0..$#$data) {
1831
1832       # delegate to create() for any dataset without primary keys with specified relationships
1833       if (grep { !defined $data->[$index]->{$_} } @pks ) {
1834         for my $r (@rels) {
1835           if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
1836             my @ret = $self->populate($data);
1837             return;
1838           }
1839         }
1840       }
1841
1842       foreach my $rel (@rels) {
1843         next unless ref $data->[$index]->{$rel} eq "HASH";
1844         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1845         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1846         my $related = $result->result_source->_resolve_condition(
1847           $result->result_source->relationship_info($reverse)->{cond},
1848           $self,
1849           $result,
1850         );
1851
1852         delete $data->[$index]->{$rel};
1853         $data->[$index] = {%{$data->[$index]}, %$related};
1854
1855         push @columns, keys %$related if $index == 0;
1856       }
1857     }
1858
1859     ## inherit the data locked in the conditions of the resultset
1860     my ($rs_data) = $self->_merge_cond_with_data({});
1861     delete @{$rs_data}{@columns};
1862     my @inherit_cols = keys %$rs_data;
1863     my @inherit_data = values %$rs_data;
1864
1865     ## do bulk insert on current row
1866     $self->result_source->storage->insert_bulk(
1867       $self->result_source,
1868       [@columns, @inherit_cols],
1869       [ map { [ @$_{@columns}, @inherit_data ] } @$data ],
1870     );
1871
1872     ## do the has_many relationships
1873     foreach my $item (@$data) {
1874
1875       foreach my $rel (@rels) {
1876         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1877
1878         my $parent = $self->find({map { $_ => $item->{$_} } @pks})
1879      || $self->throw_exception('Cannot find the relating object.');
1880
1881         my $child = $parent->$rel;
1882
1883         my $related = $child->result_source->_resolve_condition(
1884           $parent->result_source->relationship_info($rel)->{cond},
1885           $child,
1886           $parent,
1887         );
1888
1889         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1890         my @populate = map { {%$_, %$related} } @rows_to_add;
1891
1892         $child->populate( \@populate );
1893       }
1894     }
1895   }
1896 }
1897
1898
1899 # populate() argumnets went over several incarnations
1900 # What we ultimately support is AoH
1901 sub _normalize_populate_args {
1902   my ($self, $arg) = @_;
1903
1904   if (ref $arg eq 'ARRAY') {
1905     if (ref $arg->[0] eq 'HASH') {
1906       return $arg;
1907     }
1908     elsif (ref $arg->[0] eq 'ARRAY') {
1909       my @ret;
1910       my @colnames = @{$arg->[0]};
1911       foreach my $values (@{$arg}[1 .. $#$arg]) {
1912         push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
1913       }
1914       return \@ret;
1915     }
1916   }
1917
1918   $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
1919 }
1920
1921 =head2 pager
1922
1923 =over 4
1924
1925 =item Arguments: none
1926
1927 =item Return Value: $pager
1928
1929 =back
1930
1931 Return Value a L<Data::Page> object for the current resultset. Only makes
1932 sense for queries with a C<page> attribute.
1933
1934 To get the full count of entries for a paged resultset, call
1935 C<total_entries> on the L<Data::Page> object.
1936
1937 =cut
1938
1939 sub pager {
1940   my ($self) = @_;
1941
1942   return $self->{pager} if $self->{pager};
1943
1944   my $attrs = $self->{attrs};
1945   $self->throw_exception("Can't create pager for non-paged rs")
1946     unless $self->{attrs}{page};
1947   $attrs->{rows} ||= 10;
1948
1949   # throw away the paging flags and re-run the count (possibly
1950   # with a subselect) to get the real total count
1951   my $count_attrs = { %$attrs };
1952   delete $count_attrs->{$_} for qw/rows offset page pager/;
1953   my $total_count = (ref $self)->new($self->result_source, $count_attrs)->count;
1954
1955   return $self->{pager} = Data::Page->new(
1956     $total_count,
1957     $attrs->{rows},
1958     $self->{attrs}{page}
1959   );
1960 }
1961
1962 =head2 page
1963
1964 =over 4
1965
1966 =item Arguments: $page_number
1967
1968 =item Return Value: $rs
1969
1970 =back
1971
1972 Returns a resultset for the $page_number page of the resultset on which page
1973 is called, where each page contains a number of rows equal to the 'rows'
1974 attribute set on the resultset (10 by default).
1975
1976 =cut
1977
1978 sub page {
1979   my ($self, $page) = @_;
1980   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1981 }
1982
1983 =head2 new_result
1984
1985 =over 4
1986
1987 =item Arguments: \%vals
1988
1989 =item Return Value: $rowobject
1990
1991 =back
1992
1993 Creates a new row object in the resultset's result class and returns
1994 it. The row is not inserted into the database at this point, call
1995 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
1996 will tell you whether the row object has been inserted or not.
1997
1998 Passes the hashref of input on to L<DBIx::Class::Row/new>.
1999
2000 =cut
2001
2002 sub new_result {
2003   my ($self, $values) = @_;
2004   $self->throw_exception( "new_result needs a hash" )
2005     unless (ref $values eq 'HASH');
2006
2007   my ($merged_cond, $cols_from_relations) = $self->_merge_cond_with_data($values);
2008
2009   my %new = (
2010     %$merged_cond,
2011     @$cols_from_relations
2012       ? (-cols_from_relations => $cols_from_relations)
2013       : (),
2014     -source_handle => $self->_source_handle,
2015     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
2016   );
2017
2018   return $self->result_class->new(\%new);
2019 }
2020
2021 # _merge_cond_with_data
2022 #
2023 # Takes a simple hash of K/V data and returns its copy merged with the
2024 # condition already present on the resultset. Additionally returns an
2025 # arrayref of value/condition names, which were inferred from related
2026 # objects (this is needed for in-memory related objects)
2027 sub _merge_cond_with_data {
2028   my ($self, $data) = @_;
2029
2030   my (%new_data, @cols_from_relations);
2031
2032   my $alias = $self->{attrs}{alias};
2033
2034   if (! defined $self->{cond}) {
2035     # just massage $data below
2036   }
2037   elsif ($self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION) {
2038     %new_data = %{ $self->{attrs}{related_objects} || {} };  # nothing might have been inserted yet
2039     @cols_from_relations = keys %new_data;
2040   }
2041   elsif (ref $self->{cond} ne 'HASH') {
2042     $self->throw_exception(
2043       "Can't abstract implicit construct, resultset condition not a hash"
2044     );
2045   }
2046   else {
2047     # precendence must be given to passed values over values inherited from
2048     # the cond, so the order here is important.
2049     my $collapsed_cond = $self->_collapse_cond($self->{cond});
2050     my %implied = %{$self->_remove_alias($collapsed_cond, $alias)};
2051
2052     while ( my($col, $value) = each %implied ) {
2053       if (ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '=') {
2054         $new_data{$col} = $value->{'='};
2055         next;
2056       }
2057       $new_data{$col} = $value if $self->_is_deterministic_value($value);
2058     }
2059   }
2060
2061   %new_data = (
2062     %new_data,
2063     %{ $self->_remove_alias($data, $alias) },
2064   );
2065
2066   return (\%new_data, \@cols_from_relations);
2067 }
2068
2069 # _is_deterministic_value
2070 #
2071 # Make an effor to strip non-deterministic values from the condition,
2072 # to make sure new_result chokes less
2073
2074 sub _is_deterministic_value {
2075   my $self = shift;
2076   my $value = shift;
2077   my $ref_type = ref $value;
2078   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
2079   return 1 if Scalar::Util::blessed($value);
2080   return 0;
2081 }
2082
2083 # _has_resolved_attr
2084 #
2085 # determines if the resultset defines at least one
2086 # of the attributes supplied
2087 #
2088 # used to determine if a subquery is neccessary
2089 #
2090 # supports some virtual attributes:
2091 #   -join
2092 #     This will scan for any joins being present on the resultset.
2093 #     It is not a mere key-search but a deep inspection of {from}
2094 #
2095
2096 sub _has_resolved_attr {
2097   my ($self, @attr_names) = @_;
2098
2099   my $attrs = $self->_resolved_attrs;
2100
2101   my %extra_checks;
2102
2103   for my $n (@attr_names) {
2104     if (grep { $n eq $_ } (qw/-join/) ) {
2105       $extra_checks{$n}++;
2106       next;
2107     }
2108
2109     my $attr =  $attrs->{$n};
2110
2111     next if not defined $attr;
2112
2113     if (ref $attr eq 'HASH') {
2114       return 1 if keys %$attr;
2115     }
2116     elsif (ref $attr eq 'ARRAY') {
2117       return 1 if @$attr;
2118     }
2119     else {
2120       return 1 if $attr;
2121     }
2122   }
2123
2124   # a resolved join is expressed as a multi-level from
2125   return 1 if (
2126     $extra_checks{-join}
2127       and
2128     ref $attrs->{from} eq 'ARRAY'
2129       and
2130     @{$attrs->{from}} > 1
2131   );
2132
2133   return 0;
2134 }
2135
2136 # _collapse_cond
2137 #
2138 # Recursively collapse the condition.
2139
2140 sub _collapse_cond {
2141   my ($self, $cond, $collapsed) = @_;
2142
2143   $collapsed ||= {};
2144
2145   if (ref $cond eq 'ARRAY') {
2146     foreach my $subcond (@$cond) {
2147       next unless ref $subcond;  # -or
2148       $collapsed = $self->_collapse_cond($subcond, $collapsed);
2149     }
2150   }
2151   elsif (ref $cond eq 'HASH') {
2152     if (keys %$cond and (keys %$cond)[0] eq '-and') {
2153       foreach my $subcond (@{$cond->{-and}}) {
2154         $collapsed = $self->_collapse_cond($subcond, $collapsed);
2155       }
2156     }
2157     else {
2158       foreach my $col (keys %$cond) {
2159         my $value = $cond->{$col};
2160         $collapsed->{$col} = $value;
2161       }
2162     }
2163   }
2164
2165   return $collapsed;
2166 }
2167
2168 # _remove_alias
2169 #
2170 # Remove the specified alias from the specified query hash. A copy is made so
2171 # the original query is not modified.
2172
2173 sub _remove_alias {
2174   my ($self, $query, $alias) = @_;
2175
2176   my %orig = %{ $query || {} };
2177   my %unaliased;
2178
2179   foreach my $key (keys %orig) {
2180     if ($key !~ /\./) {
2181       $unaliased{$key} = $orig{$key};
2182       next;
2183     }
2184     $unaliased{$1} = $orig{$key}
2185       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
2186   }
2187
2188   return \%unaliased;
2189 }
2190
2191 =head2 as_query
2192
2193 =over 4
2194
2195 =item Arguments: none
2196
2197 =item Return Value: \[ $sql, @bind ]
2198
2199 =back
2200
2201 Returns the SQL query and bind vars associated with the invocant.
2202
2203 This is generally used as the RHS for a subquery.
2204
2205 =cut
2206
2207 sub as_query {
2208   my $self = shift;
2209
2210   my $attrs = $self->_resolved_attrs_copy;
2211
2212   # For future use:
2213   #
2214   # in list ctx:
2215   # my ($sql, \@bind, \%dbi_bind_attrs) = _select_args_to_query (...)
2216   # $sql also has no wrapping parenthesis in list ctx
2217   #
2218   my $sqlbind = $self->result_source->storage
2219     ->_select_args_to_query ($attrs->{from}, $attrs->{select}, $attrs->{where}, $attrs);
2220
2221   return $sqlbind;
2222 }
2223
2224 =head2 find_or_new
2225
2226 =over 4
2227
2228 =item Arguments: \%vals, \%attrs?
2229
2230 =item Return Value: $rowobject
2231
2232 =back
2233
2234   my $artist = $schema->resultset('Artist')->find_or_new(
2235     { artist => 'fred' }, { key => 'artists' });
2236
2237   $cd->cd_to_producer->find_or_new({ producer => $producer },
2238                                    { key => 'primary });
2239
2240 Find an existing record from this resultset, based on its primary
2241 key, or a unique constraint. If none exists, instantiate a new result
2242 object and return it. The object will not be saved into your storage
2243 until you call L<DBIx::Class::Row/insert> on it.
2244
2245 You most likely want this method when looking for existing rows using
2246 a unique constraint that is not the primary key, or looking for
2247 related rows.
2248
2249 If you want objects to be saved immediately, use L</find_or_create>
2250 instead.
2251
2252 B<Note>: Take care when using C<find_or_new> with a table having
2253 columns with default values that you intend to be automatically
2254 supplied by the database (e.g. an auto_increment primary key column).
2255 In normal usage, the value of such columns should NOT be included at
2256 all in the call to C<find_or_new>, even when set to C<undef>.
2257
2258 =cut
2259
2260 sub find_or_new {
2261   my $self     = shift;
2262   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2263   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2264   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2265     return $row;
2266   }
2267   return $self->new_result($hash);
2268 }
2269
2270 =head2 create
2271
2272 =over 4
2273
2274 =item Arguments: \%vals
2275
2276 =item Return Value: a L<DBIx::Class::Row> $object
2277
2278 =back
2279
2280 Attempt to create a single new row or a row with multiple related rows
2281 in the table represented by the resultset (and related tables). This
2282 will not check for duplicate rows before inserting, use
2283 L</find_or_create> to do that.
2284
2285 To create one row for this resultset, pass a hashref of key/value
2286 pairs representing the columns of the table and the values you wish to
2287 store. If the appropriate relationships are set up, foreign key fields
2288 can also be passed an object representing the foreign row, and the
2289 value will be set to its primary key.
2290
2291 To create related objects, pass a hashref of related-object column values
2292 B<keyed on the relationship name>. If the relationship is of type C<multi>
2293 (L<DBIx::Class::Relationship/has_many>) - pass an arrayref of hashrefs.
2294 The process will correctly identify columns holding foreign keys, and will
2295 transparently populate them from the keys of the corresponding relation.
2296 This can be applied recursively, and will work correctly for a structure
2297 with an arbitrary depth and width, as long as the relationships actually
2298 exists and the correct column data has been supplied.
2299
2300
2301 Instead of hashrefs of plain related data (key/value pairs), you may
2302 also pass new or inserted objects. New objects (not inserted yet, see
2303 L</new>), will be inserted into their appropriate tables.
2304
2305 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
2306
2307 Example of creating a new row.
2308
2309   $person_rs->create({
2310     name=>"Some Person",
2311     email=>"somebody@someplace.com"
2312   });
2313
2314 Example of creating a new row and also creating rows in a related C<has_many>
2315 or C<has_one> resultset.  Note Arrayref.
2316
2317   $artist_rs->create(
2318      { artistid => 4, name => 'Manufactured Crap', cds => [
2319         { title => 'My First CD', year => 2006 },
2320         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
2321       ],
2322      },
2323   );
2324
2325 Example of creating a new row and also creating a row in a related
2326 C<belongs_to>resultset. Note Hashref.
2327
2328   $cd_rs->create({
2329     title=>"Music for Silly Walks",
2330     year=>2000,
2331     artist => {
2332       name=>"Silly Musician",
2333     }
2334   });
2335
2336 =over
2337
2338 =item WARNING
2339
2340 When subclassing ResultSet never attempt to override this method. Since
2341 it is a simple shortcut for C<< $self->new_result($attrs)->insert >>, a
2342 lot of the internals simply never call it, so your override will be
2343 bypassed more often than not. Override either L<new|DBIx::Class::Row/new>
2344 or L<insert|DBIx::Class::Row/insert> depending on how early in the
2345 L</create> process you need to intervene.
2346
2347 =back
2348
2349 =cut
2350
2351 sub create {
2352   my ($self, $attrs) = @_;
2353   $self->throw_exception( "create needs a hashref" )
2354     unless ref $attrs eq 'HASH';
2355   return $self->new_result($attrs)->insert;
2356 }
2357
2358 =head2 find_or_create
2359
2360 =over 4
2361
2362 =item Arguments: \%vals, \%attrs?
2363
2364 =item Return Value: $rowobject
2365
2366 =back
2367
2368   $cd->cd_to_producer->find_or_create({ producer => $producer },
2369                                       { key => 'primary' });
2370
2371 Tries to find a record based on its primary key or unique constraints; if none
2372 is found, creates one and returns that instead.
2373
2374   my $cd = $schema->resultset('CD')->find_or_create({
2375     cdid   => 5,
2376     artist => 'Massive Attack',
2377     title  => 'Mezzanine',
2378     year   => 2005,
2379   });
2380
2381 Also takes an optional C<key> attribute, to search by a specific key or unique
2382 constraint. For example:
2383
2384   my $cd = $schema->resultset('CD')->find_or_create(
2385     {
2386       artist => 'Massive Attack',
2387       title  => 'Mezzanine',
2388     },
2389     { key => 'cd_artist_title' }
2390   );
2391
2392 B<Note>: Because find_or_create() reads from the database and then
2393 possibly inserts based on the result, this method is subject to a race
2394 condition. Another process could create a record in the table after
2395 the find has completed and before the create has started. To avoid
2396 this problem, use find_or_create() inside a transaction.
2397
2398 B<Note>: Take care when using C<find_or_create> with a table having
2399 columns with default values that you intend to be automatically
2400 supplied by the database (e.g. an auto_increment primary key column).
2401 In normal usage, the value of such columns should NOT be included at
2402 all in the call to C<find_or_create>, even when set to C<undef>.
2403
2404 See also L</find> and L</update_or_create>. For information on how to declare
2405 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2406
2407 =cut
2408
2409 sub find_or_create {
2410   my $self     = shift;
2411   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2412   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
2413   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
2414     return $row;
2415   }
2416   return $self->create($hash);
2417 }
2418
2419 =head2 update_or_create
2420
2421 =over 4
2422
2423 =item Arguments: \%col_values, { key => $unique_constraint }?
2424
2425 =item Return Value: $rowobject
2426
2427 =back
2428
2429   $resultset->update_or_create({ col => $val, ... });
2430
2431 First, searches for an existing row matching one of the unique constraints
2432 (including the primary key) on the source of this resultset. If a row is
2433 found, updates it with the other given column values. Otherwise, creates a new
2434 row.
2435
2436 Takes an optional C<key> attribute to search on a specific unique constraint.
2437 For example:
2438
2439   # In your application
2440   my $cd = $schema->resultset('CD')->update_or_create(
2441     {
2442       artist => 'Massive Attack',
2443       title  => 'Mezzanine',
2444       year   => 1998,
2445     },
2446     { key => 'cd_artist_title' }
2447   );
2448
2449   $cd->cd_to_producer->update_or_create({
2450     producer => $producer,
2451     name => 'harry',
2452   }, {
2453     key => 'primary,
2454   });
2455
2456
2457 If no C<key> is specified, it searches on all unique constraints defined on the
2458 source, including the primary key.
2459
2460 If the C<key> is specified as C<primary>, it searches only on the primary key.
2461
2462 See also L</find> and L</find_or_create>. For information on how to declare
2463 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
2464
2465 B<Note>: Take care when using C<update_or_create> with a table having
2466 columns with default values that you intend to be automatically
2467 supplied by the database (e.g. an auto_increment primary key column).
2468 In normal usage, the value of such columns should NOT be included at
2469 all in the call to C<update_or_create>, even when set to C<undef>.
2470
2471 =cut
2472
2473 sub update_or_create {
2474   my $self = shift;
2475   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2476   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2477
2478   my $row = $self->find($cond, $attrs);
2479   if (defined $row) {
2480     $row->update($cond);
2481     return $row;
2482   }
2483
2484   return $self->create($cond);
2485 }
2486
2487 =head2 update_or_new
2488
2489 =over 4
2490
2491 =item Arguments: \%col_values, { key => $unique_constraint }?
2492
2493 =item Return Value: $rowobject
2494
2495 =back
2496
2497   $resultset->update_or_new({ col => $val, ... });
2498
2499 First, searches for an existing row matching one of the unique constraints
2500 (including the primary key) on the source of this resultset. If a row is
2501 found, updates it with the other given column values. Otherwise, instantiate
2502 a new result object and return it. The object will not be saved into your storage
2503 until you call L<DBIx::Class::Row/insert> on it.
2504
2505 Takes an optional C<key> attribute to search on a specific unique constraint.
2506 For example:
2507
2508   # In your application
2509   my $cd = $schema->resultset('CD')->update_or_new(
2510     {
2511       artist => 'Massive Attack',
2512       title  => 'Mezzanine',
2513       year   => 1998,
2514     },
2515     { key => 'cd_artist_title' }
2516   );
2517
2518   if ($cd->in_storage) {
2519       # the cd was updated
2520   }
2521   else {
2522       # the cd is not yet in the database, let's insert it
2523       $cd->insert;
2524   }
2525
2526 B<Note>: Take care when using C<update_or_new> with a table having
2527 columns with default values that you intend to be automatically
2528 supplied by the database (e.g. an auto_increment primary key column).
2529 In normal usage, the value of such columns should NOT be included at
2530 all in the call to C<update_or_new>, even when set to C<undef>.
2531
2532 See also L</find>, L</find_or_create> and L</find_or_new>.
2533
2534 =cut
2535
2536 sub update_or_new {
2537     my $self  = shift;
2538     my $attrs = ( @_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {} );
2539     my $cond  = ref $_[0] eq 'HASH' ? shift : {@_};
2540
2541     my $row = $self->find( $cond, $attrs );
2542     if ( defined $row ) {
2543         $row->update($cond);
2544         return $row;
2545     }
2546
2547     return $self->new_result($cond);
2548 }
2549
2550 =head2 get_cache
2551
2552 =over 4
2553
2554 =item Arguments: none
2555
2556 =item Return Value: \@cache_objects?
2557
2558 =back
2559
2560 Gets the contents of the cache for the resultset, if the cache is set.
2561
2562 The cache is populated either by using the L</prefetch> attribute to
2563 L</search> or by calling L</set_cache>.
2564
2565 =cut
2566
2567 sub get_cache {
2568   shift->{all_cache};
2569 }
2570
2571 =head2 set_cache
2572
2573 =over 4
2574
2575 =item Arguments: \@cache_objects
2576
2577 =item Return Value: \@cache_objects
2578
2579 =back
2580
2581 Sets the contents of the cache for the resultset. Expects an arrayref
2582 of objects of the same class as those produced by the resultset. Note that
2583 if the cache is set the resultset will return the cached objects rather
2584 than re-querying the database even if the cache attr is not set.
2585
2586 The contents of the cache can also be populated by using the
2587 L</prefetch> attribute to L</search>.
2588
2589 =cut
2590
2591 sub set_cache {
2592   my ( $self, $data ) = @_;
2593   $self->throw_exception("set_cache requires an arrayref")
2594       if defined($data) && (ref $data ne 'ARRAY');
2595   $self->{all_cache} = $data;
2596 }
2597
2598 =head2 clear_cache
2599
2600 =over 4
2601
2602 =item Arguments: none
2603
2604 =item Return Value: []
2605
2606 =back
2607
2608 Clears the cache for the resultset.
2609
2610 =cut
2611
2612 sub clear_cache {
2613   shift->set_cache(undef);
2614 }
2615
2616 =head2 is_paged
2617
2618 =over 4
2619
2620 =item Arguments: none
2621
2622 =item Return Value: true, if the resultset has been paginated
2623
2624 =back
2625
2626 =cut
2627
2628 sub is_paged {
2629   my ($self) = @_;
2630   return !!$self->{attrs}{page};
2631 }
2632
2633 =head2 is_ordered
2634
2635 =over 4
2636
2637 =item Arguments: none
2638
2639 =item Return Value: true, if the resultset has been ordered with C<order_by>.
2640
2641 =back
2642
2643 =cut
2644
2645 sub is_ordered {
2646   my ($self) = @_;
2647   return scalar $self->result_source->storage->_parse_order_by($self->{attrs}{order_by});
2648 }
2649
2650 =head2 related_resultset
2651
2652 =over 4
2653
2654 =item Arguments: $relationship_name
2655
2656 =item Return Value: $resultset
2657
2658 =back
2659
2660 Returns a related resultset for the supplied relationship name.
2661
2662   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2663
2664 =cut
2665
2666 sub related_resultset {
2667   my ($self, $rel) = @_;
2668
2669   $self->{related_resultsets} ||= {};
2670   return $self->{related_resultsets}{$rel} ||= do {
2671     my $rsrc = $self->result_source;
2672     my $rel_info = $rsrc->relationship_info($rel);
2673
2674     $self->throw_exception(
2675       "search_related: result source '" . $rsrc->source_name .
2676         "' has no such relationship $rel")
2677       unless $rel_info;
2678
2679     my $attrs = $self->_chain_relationship($rel);
2680
2681     my $join_count = $attrs->{seen_join}{$rel};
2682
2683     my $alias = $self->result_source->storage
2684         ->relname_to_table_alias($rel, $join_count);
2685
2686     # since this is search_related, and we already slid the select window inwards
2687     # (the select/as attrs were deleted in the beginning), we need to flip all
2688     # left joins to inner, so we get the expected results
2689     # read the comment on top of the actual function to see what this does
2690     $attrs->{from} = $rsrc->schema->storage->_straight_join_to_node ($attrs->{from}, $alias);
2691
2692
2693     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2694     delete @{$attrs}{qw(result_class alias)};
2695
2696     my $new_cache;
2697
2698     if (my $cache = $self->get_cache) {
2699       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2700         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2701                         @$cache ];
2702       }
2703     }
2704
2705     my $rel_source = $rsrc->related_source($rel);
2706
2707     my $new = do {
2708
2709       # The reason we do this now instead of passing the alias to the
2710       # search_rs below is that if you wrap/overload resultset on the
2711       # source you need to know what alias it's -going- to have for things
2712       # to work sanely (e.g. RestrictWithObject wants to be able to add
2713       # extra query restrictions, and these may need to be $alias.)
2714
2715       my $rel_attrs = $rel_source->resultset_attributes;
2716       local $rel_attrs->{alias} = $alias;
2717
2718       $rel_source->resultset
2719                  ->search_rs(
2720                      undef, {
2721                        %$attrs,
2722                        where => $attrs->{where},
2723                    });
2724     };
2725     $new->set_cache($new_cache) if $new_cache;
2726     $new;
2727   };
2728 }
2729
2730 =head2 current_source_alias
2731
2732 =over 4
2733
2734 =item Arguments: none
2735
2736 =item Return Value: $source_alias
2737
2738 =back
2739
2740 Returns the current table alias for the result source this resultset is built
2741 on, that will be used in the SQL query. Usually it is C<me>.
2742
2743 Currently the source alias that refers to the result set returned by a
2744 L</search>/L</find> family method depends on how you got to the resultset: it's
2745 C<me> by default, but eg. L</search_related> aliases it to the related result
2746 source name (and keeps C<me> referring to the original result set). The long
2747 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2748 (and make this method unnecessary).
2749
2750 Thus it's currently necessary to use this method in predefined queries (see
2751 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2752 source alias of the current result set:
2753
2754   # in a result set class
2755   sub modified_by {
2756     my ($self, $user) = @_;
2757
2758     my $me = $self->current_source_alias;
2759
2760     return $self->search(
2761       "$me.modified" => $user->id,
2762     );
2763   }
2764
2765 =cut
2766
2767 sub current_source_alias {
2768   my ($self) = @_;
2769
2770   return ($self->{attrs} || {})->{alias} || 'me';
2771 }
2772
2773 =head2 as_subselect_rs
2774
2775 =over 4
2776
2777 =item Arguments: none
2778
2779 =item Return Value: $resultset
2780
2781 =back
2782
2783 Act as a barrier to SQL symbols.  The resultset provided will be made into a
2784 "virtual view" by including it as a subquery within the from clause.  From this
2785 point on, any joined tables are inaccessible to ->search on the resultset (as if
2786 it were simply where-filtered without joins).  For example:
2787
2788  my $rs = $schema->resultset('Bar')->search({'x.name' => 'abc'},{ join => 'x' });
2789
2790  # 'x' now pollutes the query namespace
2791
2792  # So the following works as expected
2793  my $ok_rs = $rs->search({'x.other' => 1});
2794
2795  # But this doesn't: instead of finding a 'Bar' related to two x rows (abc and
2796  # def) we look for one row with contradictory terms and join in another table
2797  # (aliased 'x_2') which we never use
2798  my $broken_rs = $rs->search({'x.name' => 'def'});
2799
2800  my $rs2 = $rs->as_subselect_rs;
2801
2802  # doesn't work - 'x' is no longer accessible in $rs2, having been sealed away
2803  my $not_joined_rs = $rs2->search({'x.other' => 1});
2804
2805  # works as expected: finds a 'table' row related to two x rows (abc and def)
2806  my $correctly_joined_rs = $rs2->search({'x.name' => 'def'});
2807
2808 Another example of when one might use this would be to select a subset of
2809 columns in a group by clause:
2810
2811  my $rs = $schema->resultset('Bar')->search(undef, {
2812    group_by => [qw{ id foo_id baz_id }],
2813  })->as_subselect_rs->search(undef, {
2814    columns => [qw{ id foo_id }]
2815  });
2816
2817 In the above example normally columns would have to be equal to the group by,
2818 but because we isolated the group by into a subselect the above works.
2819
2820 =cut
2821
2822 sub as_subselect_rs {
2823    my $self = shift;
2824
2825    return $self->result_source->resultset->search( undef, {
2826       alias => $self->current_source_alias,
2827       from => [{
2828             $self->current_source_alias => $self->as_query,
2829             -alias         => $self->current_source_alias,
2830             -source_handle => $self->result_source->handle,
2831          }]
2832    });
2833 }
2834
2835 # This code is called by search_related, and makes sure there
2836 # is clear separation between the joins before, during, and
2837 # after the relationship. This information is needed later
2838 # in order to properly resolve prefetch aliases (any alias
2839 # with a relation_chain_depth less than the depth of the
2840 # current prefetch is not considered)
2841 #
2842 # The increments happen twice per join. An even number means a
2843 # relationship specified via a search_related, whereas an odd
2844 # number indicates a join/prefetch added via attributes
2845 #
2846 # Also this code will wrap the current resultset (the one we
2847 # chain to) in a subselect IFF it contains limiting attributes
2848 sub _chain_relationship {
2849   my ($self, $rel) = @_;
2850   my $source = $self->result_source;
2851   my $attrs = { %{$self->{attrs}||{}} };
2852
2853   # we need to take the prefetch the attrs into account before we
2854   # ->_resolve_join as otherwise they get lost - captainL
2855   my $join = $self->_merge_attr( $attrs->{join}, $attrs->{prefetch} );
2856
2857   delete @{$attrs}{qw/join prefetch collapse distinct select as columns +select +as +columns/};
2858
2859   my $seen = { %{ (delete $attrs->{seen_join}) || {} } };
2860
2861   my $from;
2862   my @force_subq_attrs = qw/offset rows group_by having/;
2863
2864   if (
2865     ($attrs->{from} && ref $attrs->{from} ne 'ARRAY')
2866       ||
2867     $self->_has_resolved_attr (@force_subq_attrs)
2868   ) {
2869     # Nuke the prefetch (if any) before the new $rs attrs
2870     # are resolved (prefetch is useless - we are wrapping
2871     # a subquery anyway).
2872     my $rs_copy = $self->search;
2873     $rs_copy->{attrs}{join} = $self->_merge_attr (
2874       $rs_copy->{attrs}{join},
2875       delete $rs_copy->{attrs}{prefetch},
2876     );
2877
2878     $from = [{
2879       -source_handle => $source->handle,
2880       -alias => $attrs->{alias},
2881       $attrs->{alias} => $rs_copy->as_query,
2882     }];
2883     delete @{$attrs}{@force_subq_attrs, 'where'};
2884     $seen->{-relation_chain_depth} = 0;
2885   }
2886   elsif ($attrs->{from}) {  #shallow copy suffices
2887     $from = [ @{$attrs->{from}} ];
2888   }
2889   else {
2890     $from = [{
2891       -source_handle => $source->handle,
2892       -alias => $attrs->{alias},
2893       $attrs->{alias} => $source->from,
2894     }];
2895   }
2896
2897   my $jpath = ($seen->{-relation_chain_depth})
2898     ? $from->[-1][0]{-join_path}
2899     : [];
2900
2901   my @requested_joins = $source->_resolve_join(
2902     $join,
2903     $attrs->{alias},
2904     $seen,
2905     $jpath,
2906   );
2907
2908   push @$from, @requested_joins;
2909
2910   $seen->{-relation_chain_depth}++;
2911
2912   # if $self already had a join/prefetch specified on it, the requested
2913   # $rel might very well be already included. What we do in this case
2914   # is effectively a no-op (except that we bump up the chain_depth on
2915   # the join in question so we could tell it *is* the search_related)
2916   my $already_joined;
2917
2918   # we consider the last one thus reverse
2919   for my $j (reverse @requested_joins) {
2920     my ($last_j) = keys %{$j->[0]{-join_path}[-1]};
2921     if ($rel eq $last_j) {
2922       $j->[0]{-relation_chain_depth}++;
2923       $already_joined++;
2924       last;
2925     }
2926   }
2927
2928   unless ($already_joined) {
2929     push @$from, $source->_resolve_join(
2930       $rel,
2931       $attrs->{alias},
2932       $seen,
2933       $jpath,
2934     );
2935   }
2936
2937   $seen->{-relation_chain_depth}++;
2938
2939   return {%$attrs, from => $from, seen_join => $seen};
2940 }
2941
2942 # too many times we have to do $attrs = { %{$self->_resolved_attrs} }
2943 sub _resolved_attrs_copy {
2944   my $self = shift;
2945   return { %{$self->_resolved_attrs (@_)} };
2946 }
2947
2948 sub _resolved_attrs {
2949   my $self = shift;
2950   return $self->{_attrs} if $self->{_attrs};
2951
2952   my $attrs  = { %{ $self->{attrs} || {} } };
2953   my $source = $self->result_source;
2954   my $alias  = $attrs->{alias};
2955
2956   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2957   my @colbits;
2958
2959   # build columns (as long as select isn't set) into a set of as/select hashes
2960   unless ( $attrs->{select} ) {
2961
2962     my @cols;
2963     if ( ref $attrs->{columns} eq 'ARRAY' ) {
2964       @cols = @{ delete $attrs->{columns}}
2965     } elsif ( defined $attrs->{columns} ) {
2966       @cols = delete $attrs->{columns}
2967     } else {
2968       @cols = $source->columns
2969     }
2970
2971     for (@cols) {
2972       if ( ref $_ eq 'HASH' ) {
2973         push @colbits, $_
2974       } else {
2975         my $key = /^\Q${alias}.\E(.+)$/
2976           ? "$1"
2977           : "$_";
2978         my $value = /\./
2979           ? "$_"
2980           : "${alias}.$_";
2981         push @colbits, { $key => $value };
2982       }
2983     }
2984   }
2985
2986   # add the additional columns on
2987   foreach (qw{include_columns +columns}) {
2988     if ( $attrs->{$_} ) {
2989       my @list = ( ref($attrs->{$_}) eq 'ARRAY' )
2990         ? @{ delete $attrs->{$_} }
2991         : delete $attrs->{$_};
2992       for (@list) {
2993         if ( ref($_) eq 'HASH' ) {
2994           push @colbits, $_
2995         } else {
2996           my $key = ( split /\./, $_ )[-1];
2997           my $value = ( /\./ ? $_ : "$alias.$_" );
2998           push @colbits, { $key => $value };
2999         }
3000       }
3001     }
3002   }
3003
3004   # start with initial select items
3005   if ( $attrs->{select} ) {
3006     $attrs->{select} =
3007         ( ref $attrs->{select} eq 'ARRAY' )
3008       ? [ @{ $attrs->{select} } ]
3009       : [ $attrs->{select} ];
3010
3011     if ( $attrs->{as} ) {
3012       $attrs->{as} =
3013         (
3014           ref $attrs->{as} eq 'ARRAY'
3015             ? [ @{ $attrs->{as} } ]
3016             : [ $attrs->{as} ]
3017         )
3018     } else {
3019       $attrs->{as} = [ map {
3020          m/^\Q${alias}.\E(.+)$/
3021            ? $1
3022            : $_
3023          } @{ $attrs->{select} }
3024       ]
3025     }
3026   }
3027   else {
3028     # otherwise we intialise select & as to empty
3029     $attrs->{select} = [];
3030     $attrs->{as}     = [];
3031   }
3032
3033   # now add colbits to select/as
3034   push @{ $attrs->{select} }, map values %{$_}, @colbits;
3035   push @{ $attrs->{as}     }, map keys   %{$_}, @colbits;
3036
3037   if ( my $adds = delete $attrs->{'+select'} ) {
3038     $adds = [$adds] unless ref $adds eq 'ARRAY';
3039     push @{ $attrs->{select} },
3040       map { /\./ || ref $_ ? $_ : "$alias.$_" } @$adds;
3041   }
3042   if ( my $adds = delete $attrs->{'+as'} ) {
3043     $adds = [$adds] unless ref $adds eq 'ARRAY';
3044     push @{ $attrs->{as} }, @$adds;
3045   }
3046
3047   $attrs->{from} ||= [{
3048     -source_handle => $source->handle,
3049     -alias => $self->{attrs}{alias},
3050     $self->{attrs}{alias} => $source->from,
3051   }];
3052
3053   if ( $attrs->{join} || $attrs->{prefetch} ) {
3054
3055     $self->throw_exception ('join/prefetch can not be used with a custom {from}')
3056       if ref $attrs->{from} ne 'ARRAY';
3057
3058     my $join = delete $attrs->{join} || {};
3059
3060     if ( defined $attrs->{prefetch} ) {
3061       $join = $self->_merge_attr( $join, $attrs->{prefetch} );
3062     }
3063
3064     $attrs->{from} =    # have to copy here to avoid corrupting the original
3065       [
3066         @{ $attrs->{from} },
3067         $source->_resolve_join(
3068           $join,
3069           $alias,
3070           { %{ $attrs->{seen_join} || {} } },
3071           ( $attrs->{seen_join} && keys %{$attrs->{seen_join}})
3072             ? $attrs->{from}[-1][0]{-join_path}
3073             : []
3074           ,
3075         )
3076       ];
3077   }
3078
3079   if ( defined $attrs->{order_by} ) {
3080     $attrs->{order_by} = (
3081       ref( $attrs->{order_by} ) eq 'ARRAY'
3082       ? [ @{ $attrs->{order_by} } ]
3083       : [ $attrs->{order_by} || () ]
3084     );
3085   }
3086
3087   if ($attrs->{group_by} and ref $attrs->{group_by} ne 'ARRAY') {
3088     $attrs->{group_by} = [ $attrs->{group_by} ];
3089   }
3090
3091   # generate the distinct induced group_by early, as prefetch will be carried via a
3092   # subquery (since a group_by is present)
3093   if (delete $attrs->{distinct}) {
3094     if ($attrs->{group_by}) {
3095       carp ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
3096     }
3097     else {
3098       $attrs->{group_by} = [ grep { !ref($_) || (ref($_) ne 'HASH') } @{$attrs->{select}} ];
3099
3100       # add any order_by parts that are not already present in the group_by
3101       # we need to be careful not to add any named functions/aggregates
3102       # i.e. select => [ ... { count => 'foo', -as 'foocount' } ... ]
3103       my %already_grouped = map { $_ => 1 } (@{$attrs->{group_by}});
3104
3105       my $storage = $self->result_source->schema->storage;
3106
3107       my $rs_column_list = $storage->_resolve_column_info ($attrs->{from});
3108
3109       for my $chunk ($storage->_parse_order_by($attrs->{order_by})) {
3110         if ($rs_column_list->{$chunk} && not $already_grouped{$chunk}++) {
3111           push @{$attrs->{group_by}}, $chunk;
3112         }
3113       }
3114     }
3115   }
3116
3117   # generate selections based on the prefetch helper
3118   if ( my $prefetch = delete $attrs->{prefetch} ) {
3119     $attrs->{collapse} = 1;
3120
3121     # this is a separate structure (we don't look in {from} directly)
3122     # as the resolver needs to shift things off the lists to work
3123     # properly (identical-prefetches on different branches)
3124     my $join_map = {};
3125     if (ref $attrs->{from} eq 'ARRAY') {
3126
3127       my $start_depth = $attrs->{seen_join}{-relation_chain_depth} || 0;
3128
3129       for my $j ( @{$attrs->{from}}[1 .. $#{$attrs->{from}} ] ) {
3130         next unless $j->[0]{-alias};
3131         next unless $j->[0]{-join_path};
3132         next if ($j->[0]{-relation_chain_depth} || 0) < $start_depth;
3133
3134         my @jpath = map { keys %$_ } @{$j->[0]{-join_path}};
3135
3136         my $p = $join_map;
3137         $p = $p->{$_} ||= {} for @jpath[ ($start_depth/2) .. $#jpath]; #only even depths are actual jpath boundaries
3138         push @{$p->{-join_aliases} }, $j->[0]{-alias};
3139       }
3140     }
3141
3142     my @prefetch = $source->_resolve_prefetch( $prefetch, $alias, $join_map );
3143
3144     # we need to somehow mark which columns came from prefetch
3145     $attrs->{_prefetch_select} = [ map { $_->[0] } @prefetch ];
3146
3147     push @{ $attrs->{select} }, @{$attrs->{_prefetch_select}};
3148     push @{ $attrs->{as} }, (map { $_->[1] } @prefetch);
3149   }
3150
3151   # run through the resulting joinstructure (starting from our current slot)
3152   # and unset collapse if proven unnesessary
3153   if ($attrs->{collapse} && ref $attrs->{from} eq 'ARRAY') {
3154
3155     if (@{$attrs->{from}} > 1) {
3156
3157       # find where our table-spec starts and consider only things after us
3158       my @fromlist = @{$attrs->{from}};
3159       while (@fromlist) {
3160         my $t = shift @fromlist;
3161         $t = $t->[0] if ref $t eq 'ARRAY';  #me vs join from-spec mismatch
3162         last if ($t->{-alias} && $t->{-alias} eq $alias);
3163       }
3164
3165       for (@fromlist) {
3166         $attrs->{collapse} = ! $_->[0]{-is_single}
3167           and last;
3168       }
3169     }
3170     else {
3171       # no joins - no collapse
3172       $attrs->{collapse} = 0;
3173     }
3174   }
3175
3176   # the row parser generates differently depending on whether collapsing is requested
3177   # the need to look at {select} is temporary
3178   $attrs->{_row_parser} = $source->_mk_row_parser (
3179     @{$attrs}{qw/as collapse select/}
3180   );
3181
3182   # if both page and offset are specified, produce a combined offset
3183   # even though it doesn't make much sense, this is what pre 081xx has
3184   # been doing
3185   if (my $page = delete $attrs->{page}) {
3186     $attrs->{offset} =
3187       ($attrs->{rows} * ($page - 1))
3188             +
3189       ($attrs->{offset} || 0)
3190     ;
3191   }
3192
3193   return $self->{_attrs} = $attrs;
3194 }
3195
3196 sub _rollout_attr {
3197   my ($self, $attr) = @_;
3198
3199   if (ref $attr eq 'HASH') {
3200     return $self->_rollout_hash($attr);
3201   } elsif (ref $attr eq 'ARRAY') {
3202     return $self->_rollout_array($attr);
3203   } else {
3204     return [$attr];
3205   }
3206 }
3207
3208 sub _rollout_array {
3209   my ($self, $attr) = @_;
3210
3211   my @rolled_array;
3212   foreach my $element (@{$attr}) {
3213     if (ref $element eq 'HASH') {
3214       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
3215     } elsif (ref $element eq 'ARRAY') {
3216       #  XXX - should probably recurse here
3217       push( @rolled_array, @{$self->_rollout_array($element)} );
3218     } else {
3219       push( @rolled_array, $element );
3220     }
3221   }
3222   return \@rolled_array;
3223 }
3224
3225 sub _rollout_hash {
3226   my ($self, $attr) = @_;
3227
3228   my @rolled_array;
3229   foreach my $key (keys %{$attr}) {
3230     push( @rolled_array, { $key => $attr->{$key} } );
3231   }
3232   return \@rolled_array;
3233 }
3234
3235 sub _calculate_score {
3236   my ($self, $a, $b) = @_;
3237
3238   if (defined $a xor defined $b) {
3239     return 0;
3240   }
3241   elsif (not defined $a) {
3242     return 1;
3243   }
3244
3245   if (ref $b eq 'HASH') {
3246     my ($b_key) = keys %{$b};
3247     if (ref $a eq 'HASH') {
3248       my ($a_key) = keys %{$a};
3249       if ($a_key eq $b_key) {
3250         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
3251       } else {
3252         return 0;
3253       }
3254     } else {
3255       return ($a eq $b_key) ? 1 : 0;
3256     }
3257   } else {
3258     if (ref $a eq 'HASH') {
3259       my ($a_key) = keys %{$a};
3260       return ($b eq $a_key) ? 1 : 0;
3261     } else {
3262       return ($b eq $a) ? 1 : 0;
3263     }
3264   }
3265 }
3266
3267 sub _merge_attr {
3268   my ($self, $orig, $import) = @_;
3269
3270   return $import unless defined($orig);
3271   return $orig unless defined($import);
3272
3273   $orig = $self->_rollout_attr($orig);
3274   $import = $self->_rollout_attr($import);
3275
3276   my $seen_keys;
3277   foreach my $import_element ( @{$import} ) {
3278     # find best candidate from $orig to merge $b_element into
3279     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
3280     foreach my $orig_element ( @{$orig} ) {
3281       my $score = $self->_calculate_score( $orig_element, $import_element );
3282       if ($score > $best_candidate->{score}) {
3283         $best_candidate->{position} = $position;
3284         $best_candidate->{score} = $score;
3285       }
3286       $position++;
3287     }
3288     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
3289
3290     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
3291       push( @{$orig}, $import_element );
3292     } else {
3293       my $orig_best = $orig->[$best_candidate->{position}];
3294       # merge orig_best and b_element together and replace original with merged
3295       if (ref $orig_best ne 'HASH') {
3296         $orig->[$best_candidate->{position}] = $import_element;
3297       } elsif (ref $import_element eq 'HASH') {
3298         my ($key) = keys %{$orig_best};
3299         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
3300       }
3301     }
3302     $seen_keys->{$import_key} = 1; # don't merge the same key twice
3303   }
3304
3305   return $orig;
3306 }
3307
3308 sub result_source {
3309     my $self = shift;
3310
3311     if (@_) {
3312         $self->_source_handle($_[0]->handle);
3313     } else {
3314         $self->_source_handle->resolve;
3315     }
3316 }
3317
3318 =head2 throw_exception
3319
3320 See L<DBIx::Class::Schema/throw_exception> for details.
3321
3322 =cut
3323
3324 sub throw_exception {
3325   my $self=shift;
3326
3327   if (ref $self && $self->_source_handle->schema) {
3328     $self->_source_handle->schema->throw_exception(@_)
3329   }
3330   else {
3331     DBIx::Class::Exception->throw(@_);
3332   }
3333 }
3334
3335 # XXX: FIXME: Attributes docs need clearing up
3336
3337 =head1 ATTRIBUTES
3338
3339 Attributes are used to refine a ResultSet in various ways when
3340 searching for data. They can be passed to any method which takes an
3341 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
3342 L</count>.
3343
3344 These are in no particular order:
3345
3346 =head2 order_by
3347
3348 =over 4
3349
3350 =item Value: ( $order_by | \@order_by | \%order_by )
3351
3352 =back
3353
3354 Which column(s) to order the results by.
3355
3356 [The full list of suitable values is documented in
3357 L<SQL::Abstract/"ORDER BY CLAUSES">; the following is a summary of
3358 common options.]
3359
3360 If a single column name, or an arrayref of names is supplied, the
3361 argument is passed through directly to SQL. The hashref syntax allows
3362 for connection-agnostic specification of ordering direction:
3363
3364  For descending order:
3365
3366   order_by => { -desc => [qw/col1 col2 col3/] }
3367
3368  For explicit ascending order:
3369
3370   order_by => { -asc => 'col' }
3371
3372 The old scalarref syntax (i.e. order_by => \'year DESC') is still
3373 supported, although you are strongly encouraged to use the hashref
3374 syntax as outlined above.
3375
3376 =head2 columns
3377
3378 =over 4
3379
3380 =item Value: \@columns
3381
3382 =back
3383
3384 Shortcut to request a particular set of columns to be retrieved. Each
3385 column spec may be a string (a table column name), or a hash (in which
3386 case the key is the C<as> value, and the value is used as the C<select>
3387 expression). Adds C<me.> onto the start of any column without a C<.> in
3388 it and sets C<select> from that, then auto-populates C<as> from
3389 C<select> as normal. (You may also use the C<cols> attribute, as in
3390 earlier versions of DBIC.)
3391
3392 =head2 +columns
3393
3394 =over 4
3395
3396 =item Value: \@columns
3397
3398 =back
3399
3400 Indicates additional columns to be selected from storage. Works the same
3401 as L</columns> but adds columns to the selection. (You may also use the
3402 C<include_columns> attribute, as in earlier versions of DBIC). For
3403 example:-
3404
3405   $schema->resultset('CD')->search(undef, {
3406     '+columns' => ['artist.name'],
3407     join => ['artist']
3408   });
3409
3410 would return all CDs and include a 'name' column to the information
3411 passed to object inflation. Note that the 'artist' is the name of the
3412 column (or relationship) accessor, and 'name' is the name of the column
3413 accessor in the related table.
3414
3415 =head2 include_columns
3416
3417 =over 4
3418
3419 =item Value: \@columns
3420
3421 =back
3422
3423 Deprecated.  Acts as a synonym for L</+columns> for backward compatibility.
3424
3425 =head2 select
3426
3427 =over 4
3428
3429 =item Value: \@select_columns
3430
3431 =back
3432
3433 Indicates which columns should be selected from the storage. You can use
3434 column names, or in the case of RDBMS back ends, function or stored procedure
3435 names:
3436
3437   $rs = $schema->resultset('Employee')->search(undef, {
3438     select => [
3439       'name',
3440       { count => 'employeeid' },
3441       { sum => 'salary' }
3442     ]
3443   });
3444
3445 When you use function/stored procedure names and do not supply an C<as>
3446 attribute, the column names returned are storage-dependent. E.g. MySQL would
3447 return a column named C<count(employeeid)> in the above example.
3448
3449 B<NOTE:> You will almost always need a corresponding 'as' entry when you use
3450 'select'.
3451
3452 =head2 +select
3453
3454 =over 4
3455
3456 Indicates additional columns to be selected from storage.  Works the same as
3457 L</select> but adds columns to the selection.
3458
3459 =back
3460
3461 =head2 +as
3462
3463 =over 4
3464
3465 Indicates additional column names for those added via L</+select>. See L</as>.
3466
3467 =back
3468
3469 =head2 as
3470
3471 =over 4
3472
3473 =item Value: \@inflation_names
3474
3475 =back
3476
3477 Indicates column names for object inflation. That is, C<as>
3478 indicates the name that the column can be accessed as via the
3479 C<get_column> method (or via the object accessor, B<if one already
3480 exists>).  It has nothing to do with the SQL code C<SELECT foo AS bar>.
3481
3482 The C<as> attribute is used in conjunction with C<select>,
3483 usually when C<select> contains one or more function or stored
3484 procedure names:
3485
3486   $rs = $schema->resultset('Employee')->search(undef, {
3487     select => [
3488       'name',
3489       { count => 'employeeid' }
3490     ],
3491     as => ['name', 'employee_count'],
3492   });
3493
3494   my $employee = $rs->first(); # get the first Employee
3495
3496 If the object against which the search is performed already has an accessor
3497 matching a column name specified in C<as>, the value can be retrieved using
3498 the accessor as normal:
3499
3500   my $name = $employee->name();
3501
3502 If on the other hand an accessor does not exist in the object, you need to
3503 use C<get_column> instead:
3504
3505   my $employee_count = $employee->get_column('employee_count');
3506
3507 You can create your own accessors if required - see
3508 L<DBIx::Class::Manual::Cookbook> for details.
3509
3510 Please note: This will NOT insert an C<AS employee_count> into the SQL
3511 statement produced, it is used for internal access only. Thus
3512 attempting to use the accessor in an C<order_by> clause or similar
3513 will fail miserably.
3514
3515 To get around this limitation, you can supply literal SQL to your
3516 C<select> attribute that contains the C<AS alias> text, e.g.
3517
3518   select => [\'myfield AS alias']
3519
3520 =head2 join
3521
3522 =over 4
3523
3524 =item Value: ($rel_name | \@rel_names | \%rel_names)
3525
3526 =back
3527
3528 Contains a list of relationships that should be joined for this query.  For
3529 example:
3530
3531   # Get CDs by Nine Inch Nails
3532   my $rs = $schema->resultset('CD')->search(
3533     { 'artist.name' => 'Nine Inch Nails' },
3534     { join => 'artist' }
3535   );
3536
3537 Can also contain a hash reference to refer to the other relation's relations.
3538 For example:
3539
3540   package MyApp::Schema::Track;
3541   use base qw/DBIx::Class/;
3542   __PACKAGE__->table('track');
3543   __PACKAGE__->add_columns(qw/trackid cd position title/);
3544   __PACKAGE__->set_primary_key('trackid');
3545   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
3546   1;
3547
3548   # In your application
3549   my $rs = $schema->resultset('Artist')->search(
3550     { 'track.title' => 'Teardrop' },
3551     {
3552       join     => { cd => 'track' },
3553       order_by => 'artist.name',
3554     }
3555   );
3556
3557 You need to use the relationship (not the table) name in  conditions,
3558 because they are aliased as such. The current table is aliased as "me", so
3559 you need to use me.column_name in order to avoid ambiguity. For example:
3560
3561   # Get CDs from 1984 with a 'Foo' track
3562   my $rs = $schema->resultset('CD')->search(
3563     {
3564       'me.year' => 1984,
3565       'tracks.name' => 'Foo'
3566     },
3567     { join => 'tracks' }
3568   );
3569
3570 If the same join is supplied twice, it will be aliased to <rel>_2 (and
3571 similarly for a third time). For e.g.
3572
3573   my $rs = $schema->resultset('Artist')->search({
3574     'cds.title'   => 'Down to Earth',
3575     'cds_2.title' => 'Popular',
3576   }, {
3577     join => [ qw/cds cds/ ],
3578   });
3579
3580 will return a set of all artists that have both a cd with title 'Down
3581 to Earth' and a cd with title 'Popular'.
3582
3583 If you want to fetch related objects from other tables as well, see C<prefetch>
3584 below.
3585
3586 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
3587
3588 =head2 prefetch
3589
3590 =over 4
3591
3592 =item Value: ($rel_name | \@rel_names | \%rel_names)
3593
3594 =back
3595
3596 Contains one or more relationships that should be fetched along with
3597 the main query (when they are accessed afterwards the data will
3598 already be available, without extra queries to the database).  This is
3599 useful for when you know you will need the related objects, because it
3600 saves at least one query:
3601
3602   my $rs = $schema->resultset('Tag')->search(
3603     undef,
3604     {
3605       prefetch => {
3606         cd => 'artist'
3607       }
3608     }
3609   );
3610
3611 The initial search results in SQL like the following:
3612
3613   SELECT tag.*, cd.*, artist.* FROM tag
3614   JOIN cd ON tag.cd = cd.cdid
3615   JOIN artist ON cd.artist = artist.artistid
3616
3617 L<DBIx::Class> has no need to go back to the database when we access the
3618 C<cd> or C<artist> relationships, which saves us two SQL statements in this
3619 case.
3620
3621 Simple prefetches will be joined automatically, so there is no need
3622 for a C<join> attribute in the above search.
3623
3624 C<prefetch> can be used with the following relationship types: C<belongs_to>,
3625 C<has_one> (or if you're using C<add_relationship>, any relationship declared
3626 with an accessor type of 'single' or 'filter'). A more complex example that
3627 prefetches an artists cds, the tracks on those cds, and the tags associated
3628 with that artist is given below (assuming many-to-many from artists to tags):
3629
3630  my $rs = $schema->resultset('Artist')->search(
3631    undef,
3632    {
3633      prefetch => [
3634        { cds => 'tracks' },
3635        { artist_tags => 'tags' }
3636      ]
3637    }
3638  );
3639
3640
3641 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
3642 attributes will be ignored.
3643
3644 B<CAVEATs>: Prefetch does a lot of deep magic. As such, it may not behave
3645 exactly as you might expect.
3646
3647 =over 4
3648
3649 =item *
3650
3651 Prefetch uses the L</cache> to populate the prefetched relationships. This
3652 may or may not be what you want.
3653
3654 =item *
3655
3656 If you specify a condition on a prefetched relationship, ONLY those
3657 rows that match the prefetched condition will be fetched into that relationship.
3658 This means that adding prefetch to a search() B<may alter> what is returned by
3659 traversing a relationship. So, if you have C<< Artist->has_many(CDs) >> and you do
3660
3661   my $artist_rs = $schema->resultset('Artist')->search({
3662       'cds.year' => 2008,
3663   }, {
3664       join => 'cds',
3665   });
3666
3667   my $count = $artist_rs->first->cds->count;
3668
3669   my $artist_rs_prefetch = $artist_rs->search( {}, { prefetch => 'cds' } );
3670
3671   my $prefetch_count = $artist_rs_prefetch->first->cds->count;
3672
3673   cmp_ok( $count, '==', $prefetch_count, "Counts should be the same" );
3674
3675 that cmp_ok() may or may not pass depending on the datasets involved. This
3676 behavior may or may not survive the 0.09 transition.
3677
3678 =back
3679
3680 =head2 page
3681
3682 =over 4
3683
3684 =item Value: $page
3685
3686 =back
3687
3688 Makes the resultset paged and specifies the page to retrieve. Effectively
3689 identical to creating a non-pages resultset and then calling ->page($page)
3690 on it.
3691
3692 If L<rows> attribute is not specified it defaults to 10 rows per page.
3693
3694 When you have a paged resultset, L</count> will only return the number
3695 of rows in the page. To get the total, use the L</pager> and call
3696 C<total_entries> on it.
3697
3698 =head2 rows
3699
3700 =over 4
3701
3702 =item Value: $rows
3703
3704 =back
3705
3706 Specifies the maximum number of rows for direct retrieval or the number of
3707 rows per page if the page attribute or method is used.
3708
3709 =head2 offset
3710
3711 =over 4
3712
3713 =item Value: $offset
3714
3715 =back
3716
3717 Specifies the (zero-based) row number for the  first row to be returned, or the
3718 of the first row of the first page if paging is used.
3719
3720 =head2 group_by
3721
3722 =over 4
3723
3724 =item Value: \@columns
3725
3726 =back
3727
3728 A arrayref of columns to group by. Can include columns of joined tables.
3729
3730   group_by => [qw/ column1 column2 ... /]
3731
3732 =head2 having
3733
3734 =over 4
3735
3736 =item Value: $condition
3737
3738 =back
3739
3740 HAVING is a select statement attribute that is applied between GROUP BY and
3741 ORDER BY. It is applied to the after the grouping calculations have been
3742 done.
3743
3744   having => { 'count(employee)' => { '>=', 100 } }
3745
3746 =head2 distinct
3747
3748 =over 4
3749
3750 =item Value: (0 | 1)
3751
3752 =back
3753
3754 Set to 1 to group by all columns. If the resultset already has a group_by
3755 attribute, this setting is ignored and an appropriate warning is issued.
3756
3757 =head2 where
3758
3759 =over 4
3760
3761 Adds to the WHERE clause.
3762
3763   # only return rows WHERE deleted IS NULL for all searches
3764   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
3765
3766 Can be overridden by passing C<< { where => undef } >> as an attribute
3767 to a resultset.
3768
3769 =back
3770
3771 =head2 cache
3772
3773 Set to 1 to cache search results. This prevents extra SQL queries if you
3774 revisit rows in your ResultSet:
3775
3776   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
3777
3778   while( my $artist = $resultset->next ) {
3779     ... do stuff ...
3780   }
3781
3782   $rs->first; # without cache, this would issue a query
3783
3784 By default, searches are not cached.
3785
3786 For more examples of using these attributes, see
3787 L<DBIx::Class::Manual::Cookbook>.
3788
3789 =head2 for
3790
3791 =over 4
3792
3793 =item Value: ( 'update' | 'shared' )
3794
3795 =back
3796
3797 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
3798 ... FOR SHARED.
3799
3800 =cut
3801
3802 1;