f71743573d39fd109343b0ddace5c796c2ff9c9d
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => "count",
7         'bool'   => "_bool",
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use List::Util ();
15 use Scalar::Util ();
16 use base qw/DBIx::Class/;
17
18 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
23
24 =head1 SYNOPSIS
25
26   my $rs   = $schema->resultset('User')->search({ registered => 1 });
27   my @rows = $schema->resultset('CD')->search({ year => 2005 })->all();
28
29 =head1 DESCRIPTION
30
31 The resultset is also known as an iterator. It is responsible for handling
32 queries that may return an arbitrary number of rows, e.g. via L</search>
33 or a C<has_many> relationship.
34
35 In the examples below, the following table classes are used:
36
37   package MyApp::Schema::Artist;
38   use base qw/DBIx::Class/;
39   __PACKAGE__->load_components(qw/Core/);
40   __PACKAGE__->table('artist');
41   __PACKAGE__->add_columns(qw/artistid name/);
42   __PACKAGE__->set_primary_key('artistid');
43   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
44   1;
45
46   package MyApp::Schema::CD;
47   use base qw/DBIx::Class/;
48   __PACKAGE__->load_components(qw/Core/);
49   __PACKAGE__->table('cd');
50   __PACKAGE__->add_columns(qw/cdid artist title year/);
51   __PACKAGE__->set_primary_key('cdid');
52   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
53   1;
54
55 =head1 OVERLOADING
56
57 If a resultset is used in a numeric context it returns the L</count>.
58 However, if it is used in a booleand context it is always true.  So if
59 you want to check if a resultset has any results use C<if $rs != 0>.
60 C<if $rs> will always be true.
61
62 =head1 METHODS
63
64 =head2 new
65
66 =over 4
67
68 =item Arguments: $source, \%$attrs
69
70 =item Return Value: $rs
71
72 =back
73
74 The resultset constructor. Takes a source object (usually a
75 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
76 L</ATTRIBUTES> below).  Does not perform any queries -- these are
77 executed as needed by the other methods.
78
79 Generally you won't need to construct a resultset manually.  You'll
80 automatically get one from e.g. a L</search> called in scalar context:
81
82   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
83
84 IMPORTANT: If called on an object, proxies to new_result instead so
85
86   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
87
88 will return a CD object, not a ResultSet.
89
90 =cut
91
92 sub new {
93   my $class = shift;
94   return $class->new_result(@_) if ref $class;
95
96   my ($source, $attrs) = @_;
97   $source = $source->handle 
98     unless $source->isa('DBIx::Class::ResultSourceHandle');
99   $attrs = { %{$attrs||{}} };
100
101   if ($attrs->{page}) {
102     $attrs->{rows} ||= 10;
103   }
104
105   $attrs->{alias} ||= 'me';
106
107   # Creation of {} and bless separated to mitigate RH perl bug
108   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
109   my $self = {
110     _source_handle => $source,
111     cond => $attrs->{where},
112     count => undef,
113     pager => undef,
114     attrs => $attrs
115   };
116
117   bless $self, $class;
118
119   $self->result_class(
120     $attrs->{result_class} || $source->resolve->result_class
121   );
122
123   return $self;
124 }
125
126 =head2 search
127
128 =over 4
129
130 =item Arguments: $cond, \%attrs?
131
132 =item Return Value: $resultset (scalar context), @row_objs (list context)
133
134 =back
135
136   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
137   my $new_rs = $cd_rs->search({ year => 2005 });
138
139   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
140                  # year = 2005 OR year = 2004
141
142 If you need to pass in additional attributes but no additional condition,
143 call it as C<search(undef, \%attrs)>.
144
145   # "SELECT name, artistid FROM $artist_table"
146   my @all_artists = $schema->resultset('Artist')->search(undef, {
147     columns => [qw/name artistid/],
148   });
149
150 For a list of attributes that can be passed to C<search>, see
151 L</ATTRIBUTES>. For more examples of using this function, see
152 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
153 documentation for the first argument, see L<SQL::Abstract>.
154
155 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
156
157 =cut
158
159 sub search {
160   my $self = shift;
161   my $rs = $self->search_rs( @_ );
162   return (wantarray ? $rs->all : $rs);
163 }
164
165 =head2 search_rs
166
167 =over 4
168
169 =item Arguments: $cond, \%attrs?
170
171 =item Return Value: $resultset
172
173 =back
174
175 This method does the same exact thing as search() except it will
176 always return a resultset, even in list context.
177
178 =cut
179
180 sub search_rs {
181   my $self = shift;
182
183   my $attrs = {};
184   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
185   my $our_attrs = { %{$self->{attrs}} };
186   my $having = delete $our_attrs->{having};
187   my $where = delete $our_attrs->{where};
188
189   my $rows;
190
191   my %safe = (alias => 1, cache => 1);
192
193   unless (
194     (@_ && defined($_[0])) # @_ == () or (undef)
195     || 
196     (keys %$attrs # empty attrs or only 'safe' attrs
197     && List::Util::first { !$safe{$_} } keys %$attrs)
198   ) {
199     # no search, effectively just a clone
200     $rows = $self->get_cache;
201   }
202
203   my $new_attrs = { %{$our_attrs}, %{$attrs} };
204
205   # merge new attrs into inherited
206   foreach my $key (qw/join prefetch +select +as/) {
207     next unless exists $attrs->{$key};
208     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
209   }
210
211   my $cond = (@_
212     ? (
213         (@_ == 1 || ref $_[0] eq "HASH")
214           ? (
215               (ref $_[0] eq 'HASH')
216                 ? (
217                     (keys %{ $_[0] }  > 0)
218                       ? shift
219                       : undef
220                    )
221                 :  shift
222              )
223           : (
224               (@_ % 2)
225                 ? $self->throw_exception("Odd number of arguments to search")
226                 : {@_}
227              )
228       )
229     : undef
230   );
231
232   if (defined $where) {
233     $new_attrs->{where} = (
234       defined $new_attrs->{where}
235         ? { '-and' => [
236               map {
237                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
238               } $where, $new_attrs->{where}
239             ]
240           }
241         : $where);
242   }
243
244   if (defined $cond) {
245     $new_attrs->{where} = (
246       defined $new_attrs->{where}
247         ? { '-and' => [
248               map {
249                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
250               } $cond, $new_attrs->{where}
251             ]
252           }
253         : $cond);
254   }
255
256   if (defined $having) {
257     $new_attrs->{having} = (
258       defined $new_attrs->{having}
259         ? { '-and' => [
260               map {
261                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
262               } $having, $new_attrs->{having}
263             ]
264           }
265         : $having);
266   }
267
268   my $rs = (ref $self)->new($self->result_source, $new_attrs);
269   if ($rows) {
270     $rs->set_cache($rows);
271   }
272   return $rs;
273 }
274
275 =head2 search_literal
276
277 =over 4
278
279 =item Arguments: $sql_fragment, @bind_values
280
281 =item Return Value: $resultset (scalar context), @row_objs (list context)
282
283 =back
284
285   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
286   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
287
288 Pass a literal chunk of SQL to be added to the conditional part of the
289 resultset query.
290
291 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
292 only be used in that context. There are known problems using C<search_literal>
293 in chained queries; it can result in bind values in the wrong order.  See
294 L<DBIx::Class::Manual::Cookbook/Searching> and
295 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
296 require C<search_literal>.
297
298 =cut
299
300 sub search_literal {
301   my ($self, $cond, @vals) = @_;
302   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
303   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
304   return $self->search(\$cond, $attrs);
305 }
306
307 =head2 find
308
309 =over 4
310
311 =item Arguments: @values | \%cols, \%attrs?
312
313 =item Return Value: $row_object | undef
314
315 =back
316
317 Finds a row based on its primary key or unique constraint. For example, to find
318 a row by its primary key:
319
320   my $cd = $schema->resultset('CD')->find(5);
321
322 You can also find a row by a specific unique constraint using the C<key>
323 attribute. For example:
324
325   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
326     key => 'cd_artist_title'
327   });
328
329 Additionally, you can specify the columns explicitly by name:
330
331   my $cd = $schema->resultset('CD')->find(
332     {
333       artist => 'Massive Attack',
334       title  => 'Mezzanine',
335     },
336     { key => 'cd_artist_title' }
337   );
338
339 If the C<key> is specified as C<primary>, it searches only on the primary key.
340
341 If no C<key> is specified, it searches on all unique constraints defined on the
342 source for which column data is provided, including the primary key.
343
344 If your table does not have a primary key, you B<must> provide a value for the
345 C<key> attribute matching one of the unique constraints on the source.
346
347 In addition to C<key>, L</find> recognizes and applies standard
348 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
349
350 Note: If your query does not return only one row, a warning is generated:
351
352   Query returned more than one row
353
354 See also L</find_or_create> and L</update_or_create>. For information on how to
355 declare unique constraints, see
356 L<DBIx::Class::ResultSource/add_unique_constraint>.
357
358 =cut
359
360 sub find {
361   my $self = shift;
362   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
363
364   # Default to the primary key, but allow a specific key
365   my @cols = exists $attrs->{key}
366     ? $self->result_source->unique_constraint_columns($attrs->{key})
367     : $self->result_source->primary_columns;
368   $self->throw_exception(
369     "Can't find unless a primary key is defined or unique constraint is specified"
370   ) unless @cols;
371
372   # Parse out a hashref from input
373   my $input_query;
374   if (ref $_[0] eq 'HASH') {
375     $input_query = { %{$_[0]} };
376   }
377   elsif (@_ == @cols) {
378     $input_query = {};
379     @{$input_query}{@cols} = @_;
380   }
381   else {
382     # Compatibility: Allow e.g. find(id => $value)
383     carp "Find by key => value deprecated; please use a hashref instead";
384     $input_query = {@_};
385   }
386
387   my (%related, $info);
388
389   KEY: foreach my $key (keys %$input_query) {
390     if (ref($input_query->{$key})
391         && ($info = $self->result_source->relationship_info($key))) {
392       my $val = delete $input_query->{$key};
393       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
394       my $rel_q = $self->result_source->resolve_condition(
395                     $info->{cond}, $val, $key
396                   );
397       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
398       @related{keys %$rel_q} = values %$rel_q;
399     }
400   }
401   if (my @keys = keys %related) {
402     @{$input_query}{@keys} = values %related;
403   }
404
405
406   # Build the final query: Default to the disjunction of the unique queries,
407   # but allow the input query in case the ResultSet defines the query or the
408   # user is abusing find
409   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
410   my $query;
411   if (exists $attrs->{key}) {
412     my @unique_cols = $self->result_source->unique_constraint_columns($attrs->{key});
413     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
414     $query = $self->_add_alias($unique_query, $alias);
415   }
416   else {
417     my @unique_queries = $self->_unique_queries($input_query, $attrs);
418     $query = @unique_queries
419       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
420       : $self->_add_alias($input_query, $alias);
421   }
422
423   # Run the query
424   if (keys %$attrs) {
425     my $rs = $self->search($query, $attrs);
426     if (keys %{$rs->_resolved_attrs->{collapse}}) {
427       my $row = $rs->next;
428       carp "Query returned more than one row" if $rs->next;
429       return $row;
430     }
431     else {
432       return $rs->single;
433     }
434   }
435   else {
436     if (keys %{$self->_resolved_attrs->{collapse}}) {
437       my $rs = $self->search($query);
438       my $row = $rs->next;
439       carp "Query returned more than one row" if $rs->next;
440       return $row;
441     }
442     else {
443       return $self->single($query);
444     }
445   }
446 }
447
448 # _add_alias
449 #
450 # Add the specified alias to the specified query hash. A copy is made so the
451 # original query is not modified.
452
453 sub _add_alias {
454   my ($self, $query, $alias) = @_;
455
456   my %aliased = %$query;
457   foreach my $col (grep { ! m/\./ } keys %aliased) {
458     $aliased{"$alias.$col"} = delete $aliased{$col};
459   }
460
461   return \%aliased;
462 }
463
464 # _unique_queries
465 #
466 # Build a list of queries which satisfy unique constraints.
467
468 sub _unique_queries {
469   my ($self, $query, $attrs) = @_;
470
471   my @constraint_names = exists $attrs->{key}
472     ? ($attrs->{key})
473     : $self->result_source->unique_constraint_names;
474
475   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
476   my $num_where = scalar keys %$where;
477
478   my @unique_queries;
479   foreach my $name (@constraint_names) {
480     my @unique_cols = $self->result_source->unique_constraint_columns($name);
481     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
482
483     my $num_cols = scalar @unique_cols;
484     my $num_query = scalar keys %$unique_query;
485
486     my $total = $num_query + $num_where;
487     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
488       # The query is either unique on its own or is unique in combination with
489       # the existing where clause
490       push @unique_queries, $unique_query;
491     }
492   }
493
494   return @unique_queries;
495 }
496
497 # _build_unique_query
498 #
499 # Constrain the specified query hash based on the specified column names.
500
501 sub _build_unique_query {
502   my ($self, $query, $unique_cols) = @_;
503
504   return {
505     map  { $_ => $query->{$_} }
506     grep { exists $query->{$_} }
507       @$unique_cols
508   };
509 }
510
511 =head2 search_related
512
513 =over 4
514
515 =item Arguments: $rel, $cond, \%attrs?
516
517 =item Return Value: $new_resultset
518
519 =back
520
521   $new_rs = $cd_rs->search_related('artist', {
522     name => 'Emo-R-Us',
523   });
524
525 Searches the specified relationship, optionally specifying a condition and
526 attributes for matching records. See L</ATTRIBUTES> for more information.
527
528 =cut
529
530 sub search_related {
531   return shift->related_resultset(shift)->search(@_);
532 }
533
534 =head2 search_related_rs
535
536 This method works exactly the same as search_related, except that
537 it guarantees a restultset, even in list context.
538
539 =cut
540
541 sub search_related_rs {
542   return shift->related_resultset(shift)->search_rs(@_);
543 }
544
545 =head2 cursor
546
547 =over 4
548
549 =item Arguments: none
550
551 =item Return Value: $cursor
552
553 =back
554
555 Returns a storage-driven cursor to the given resultset. See
556 L<DBIx::Class::Cursor> for more information.
557
558 =cut
559
560 sub cursor {
561   my ($self) = @_;
562
563   my $attrs = { %{$self->_resolved_attrs} };
564   return $self->{cursor}
565     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
566           $attrs->{where},$attrs);
567 }
568
569 =head2 single
570
571 =over 4
572
573 =item Arguments: $cond?
574
575 =item Return Value: $row_object?
576
577 =back
578
579   my $cd = $schema->resultset('CD')->single({ year => 2001 });
580
581 Inflates the first result without creating a cursor if the resultset has
582 any records in it; if not returns nothing. Used by L</find> as a lean version of
583 L</search>.
584
585 While this method can take an optional search condition (just like L</search>)
586 being a fast-code-path it does not recognize search attributes. If you need to
587 add extra joins or similar, call L</search> and then chain-call L</single> on the
588 L<DBIx::Class::ResultSet> returned.
589
590 =over
591
592 =item B<Note>
593
594 As of 0.08100, this method enforces the assumption that the preceeding
595 query returns only one row. If more than one row is returned, you will receive
596 a warning:
597
598   Query returned more than one row
599
600 In this case, you should be using L</first> or L</find> instead, or if you really
601 know what you are doing, use the L</rows> attribute to explicitly limit the size 
602 of the resultset.
603
604 =back
605
606 =cut
607
608 sub single {
609   my ($self, $where) = @_;
610   my $attrs = { %{$self->_resolved_attrs} };
611   if ($where) {
612     if (defined $attrs->{where}) {
613       $attrs->{where} = {
614         '-and' =>
615             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
616                $where, delete $attrs->{where} ]
617       };
618     } else {
619       $attrs->{where} = $where;
620     }
621   }
622
623 #  XXX: Disabled since it doesn't infer uniqueness in all cases
624 #  unless ($self->_is_unique_query($attrs->{where})) {
625 #    carp "Query not guaranteed to return a single row"
626 #      . "; please declare your unique constraints or use search instead";
627 #  }
628
629   my @data = $self->result_source->storage->select_single(
630     $attrs->{from}, $attrs->{select},
631     $attrs->{where}, $attrs
632   );
633
634   return (@data ? ($self->_construct_object(@data))[0] : undef);
635 }
636
637 # _is_unique_query
638 #
639 # Try to determine if the specified query is guaranteed to be unique, based on
640 # the declared unique constraints.
641
642 sub _is_unique_query {
643   my ($self, $query) = @_;
644
645   my $collapsed = $self->_collapse_query($query);
646   my $alias = $self->{attrs}{alias};
647
648   foreach my $name ($self->result_source->unique_constraint_names) {
649     my @unique_cols = map {
650       "$alias.$_"
651     } $self->result_source->unique_constraint_columns($name);
652
653     # Count the values for each unique column
654     my %seen = map { $_ => 0 } @unique_cols;
655
656     foreach my $key (keys %$collapsed) {
657       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
658       next unless exists $seen{$aliased};  # Additional constraints are okay
659       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
660     }
661
662     # If we get 0 or more than 1 value for a column, it's not necessarily unique
663     return 1 unless grep { $_ != 1 } values %seen;
664   }
665
666   return 0;
667 }
668
669 # _collapse_query
670 #
671 # Recursively collapse the query, accumulating values for each column.
672
673 sub _collapse_query {
674   my ($self, $query, $collapsed) = @_;
675
676   $collapsed ||= {};
677
678   if (ref $query eq 'ARRAY') {
679     foreach my $subquery (@$query) {
680       next unless ref $subquery;  # -or
681 #      warn "ARRAY: " . Dumper $subquery;
682       $collapsed = $self->_collapse_query($subquery, $collapsed);
683     }
684   }
685   elsif (ref $query eq 'HASH') {
686     if (keys %$query and (keys %$query)[0] eq '-and') {
687       foreach my $subquery (@{$query->{-and}}) {
688 #        warn "HASH: " . Dumper $subquery;
689         $collapsed = $self->_collapse_query($subquery, $collapsed);
690       }
691     }
692     else {
693 #      warn "LEAF: " . Dumper $query;
694       foreach my $col (keys %$query) {
695         my $value = $query->{$col};
696         $collapsed->{$col}{$value}++;
697       }
698     }
699   }
700
701   return $collapsed;
702 }
703
704 =head2 get_column
705
706 =over 4
707
708 =item Arguments: $cond?
709
710 =item Return Value: $resultsetcolumn
711
712 =back
713
714   my $max_length = $rs->get_column('length')->max;
715
716 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
717
718 =cut
719
720 sub get_column {
721   my ($self, $column) = @_;
722   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
723   return $new;
724 }
725
726 =head2 search_like
727
728 =over 4
729
730 =item Arguments: $cond, \%attrs?
731
732 =item Return Value: $resultset (scalar context), @row_objs (list context)
733
734 =back
735
736   # WHERE title LIKE '%blue%'
737   $cd_rs = $rs->search_like({ title => '%blue%'});
738
739 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
740 that this is simply a convenience method retained for ex Class::DBI users.
741 You most likely want to use L</search> with specific operators.
742
743 For more information, see L<DBIx::Class::Manual::Cookbook>.
744
745 =cut
746
747 sub search_like {
748   my $class = shift;
749   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
750   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
751   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
752   return $class->search($query, { %$attrs });
753 }
754
755 =head2 slice
756
757 =over 4
758
759 =item Arguments: $first, $last
760
761 =item Return Value: $resultset (scalar context), @row_objs (list context)
762
763 =back
764
765 Returns a resultset or object list representing a subset of elements from the
766 resultset slice is called on. Indexes are from 0, i.e., to get the first
767 three records, call:
768
769   my ($one, $two, $three) = $rs->slice(0, 2);
770
771 =cut
772
773 sub slice {
774   my ($self, $min, $max) = @_;
775   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
776   $attrs->{offset} = $self->{attrs}{offset} || 0;
777   $attrs->{offset} += $min;
778   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
779   return $self->search(undef(), $attrs);
780   #my $slice = (ref $self)->new($self->result_source, $attrs);
781   #return (wantarray ? $slice->all : $slice);
782 }
783
784 =head2 next
785
786 =over 4
787
788 =item Arguments: none
789
790 =item Return Value: $result?
791
792 =back
793
794 Returns the next element in the resultset (C<undef> is there is none).
795
796 Can be used to efficiently iterate over records in the resultset:
797
798   my $rs = $schema->resultset('CD')->search;
799   while (my $cd = $rs->next) {
800     print $cd->title;
801   }
802
803 Note that you need to store the resultset object, and call C<next> on it.
804 Calling C<< resultset('Table')->next >> repeatedly will always return the
805 first record from the resultset.
806
807 =cut
808
809 sub next {
810   my ($self) = @_;
811   if (my $cache = $self->get_cache) {
812     $self->{all_cache_position} ||= 0;
813     return $cache->[$self->{all_cache_position}++];
814   }
815   if ($self->{attrs}{cache}) {
816     $self->{all_cache_position} = 1;
817     return ($self->all)[0];
818   }
819   if ($self->{stashed_objects}) {
820     my $obj = shift(@{$self->{stashed_objects}});
821     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
822     return $obj;
823   }
824   my @row = (
825     exists $self->{stashed_row}
826       ? @{delete $self->{stashed_row}}
827       : $self->cursor->next
828   );
829   return undef unless (@row);
830   my ($row, @more) = $self->_construct_object(@row);
831   $self->{stashed_objects} = \@more if @more;
832   return $row;
833 }
834
835 sub _construct_object {
836   my ($self, @row) = @_;
837   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
838   my @new = $self->result_class->inflate_result($self->result_source, @$info);
839   @new = $self->{_attrs}{record_filter}->(@new)
840     if exists $self->{_attrs}{record_filter};
841   return @new;
842 }
843
844 sub _collapse_result {
845   my ($self, $as_proto, $row) = @_;
846
847   my @copy = @$row;
848
849   # 'foo'         => [ undef, 'foo' ]
850   # 'foo.bar'     => [ 'foo', 'bar' ]
851   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
852
853   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
854
855   my %collapse = %{$self->{_attrs}{collapse}||{}};
856
857   my @pri_index;
858
859   # if we're doing collapsing (has_many prefetch) we need to grab records
860   # until the PK changes, so fill @pri_index. if not, we leave it empty so
861   # we know we don't have to bother.
862
863   # the reason for not using the collapse stuff directly is because if you
864   # had for e.g. two artists in a row with no cds, the collapse info for
865   # both would be NULL (undef) so you'd lose the second artist
866
867   # store just the index so we can check the array positions from the row
868   # without having to contruct the full hash
869
870   if (keys %collapse) {
871     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
872     foreach my $i (0 .. $#construct_as) {
873       next if defined($construct_as[$i][0]); # only self table
874       if (delete $pri{$construct_as[$i][1]}) {
875         push(@pri_index, $i);
876       }
877       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
878     }
879   }
880
881   # no need to do an if, it'll be empty if @pri_index is empty anyway
882
883   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
884
885   my @const_rows;
886
887   do { # no need to check anything at the front, we always want the first row
888
889     my %const;
890   
891     foreach my $this_as (@construct_as) {
892       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
893     }
894
895     push(@const_rows, \%const);
896
897   } until ( # no pri_index => no collapse => drop straight out
898       !@pri_index
899     or
900       do { # get another row, stash it, drop out if different PK
901
902         @copy = $self->cursor->next;
903         $self->{stashed_row} = \@copy;
904
905         # last thing in do block, counts as true if anything doesn't match
906
907         # check xor defined first for NULL vs. NOT NULL then if one is
908         # defined the other must be so check string equality
909
910         grep {
911           (defined $pri_vals{$_} ^ defined $copy[$_])
912           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
913         } @pri_index;
914       }
915   );
916
917   my $alias = $self->{attrs}{alias};
918   my $info = [];
919
920   my %collapse_pos;
921
922   my @const_keys;
923
924   foreach my $const (@const_rows) {
925     scalar @const_keys or do {
926       @const_keys = sort { length($a) <=> length($b) } keys %$const;
927     };
928     foreach my $key (@const_keys) {
929       if (length $key) {
930         my $target = $info;
931         my @parts = split(/\./, $key);
932         my $cur = '';
933         my $data = $const->{$key};
934         foreach my $p (@parts) {
935           $target = $target->[1]->{$p} ||= [];
936           $cur .= ".${p}";
937           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
938             # collapsing at this point and on final part
939             my $pos = $collapse_pos{$cur};
940             CK: foreach my $ck (@ckey) {
941               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
942                 $collapse_pos{$cur} = $data;
943                 delete @collapse_pos{ # clear all positioning for sub-entries
944                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
945                 };
946                 push(@$target, []);
947                 last CK;
948               }
949             }
950           }
951           if (exists $collapse{$cur}) {
952             $target = $target->[-1];
953           }
954         }
955         $target->[0] = $data;
956       } else {
957         $info->[0] = $const->{$key};
958       }
959     }
960   }
961
962   return $info;
963 }
964
965 =head2 result_source
966
967 =over 4
968
969 =item Arguments: $result_source?
970
971 =item Return Value: $result_source
972
973 =back
974
975 An accessor for the primary ResultSource object from which this ResultSet
976 is derived.
977
978 =head2 result_class
979
980 =over 4
981
982 =item Arguments: $result_class?
983
984 =item Return Value: $result_class
985
986 =back
987
988 An accessor for the class to use when creating row objects. Defaults to 
989 C<< result_source->result_class >> - which in most cases is the name of the 
990 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
991
992 =cut
993
994 sub result_class {
995   my ($self, $result_class) = @_;
996   if ($result_class) {
997     $self->ensure_class_loaded($result_class);
998     $self->_result_class($result_class);
999   }
1000   $self->_result_class;
1001 }
1002
1003 =head2 count
1004
1005 =over 4
1006
1007 =item Arguments: $cond, \%attrs??
1008
1009 =item Return Value: $count
1010
1011 =back
1012
1013 Performs an SQL C<COUNT> with the same query as the resultset was built
1014 with to find the number of elements. If passed arguments, does a search
1015 on the resultset and counts the results of that.
1016
1017 Note: When using C<count> with C<group_by>, L<DBIx::Class> emulates C<GROUP BY>
1018 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
1019 not support C<DISTINCT> with multiple columns. If you are using such a
1020 database, you should only use columns from the main table in your C<group_by>
1021 clause.
1022
1023 =cut
1024
1025 sub count {
1026   my $self = shift;
1027   return $self->search(@_)->count if @_ and defined $_[0];
1028   return scalar @{ $self->get_cache } if $self->get_cache;
1029   my $count = $self->_count;
1030   return 0 unless $count;
1031
1032   # need to take offset from resolved attrs
1033
1034   $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
1035   $count = $self->{attrs}{rows} if
1036     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
1037   $count = 0 if ($count < 0);
1038   return $count;
1039 }
1040
1041 sub _count { # Separated out so pager can get the full count
1042   my $self = shift;
1043   my $select = { count => '*' };
1044
1045   my $attrs = { %{$self->_resolved_attrs} };
1046   if (my $group_by = delete $attrs->{group_by}) {
1047     delete $attrs->{having};
1048     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1049     # todo: try CONCAT for multi-column pk
1050     my @pk = $self->result_source->primary_columns;
1051     if (@pk == 1) {
1052       my $alias = $attrs->{alias};
1053       foreach my $column (@distinct) {
1054         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1055           @distinct = ($column);
1056           last;
1057         }
1058       }
1059     }
1060
1061     $select = { count => { distinct => \@distinct } };
1062   }
1063
1064   $attrs->{select} = $select;
1065   $attrs->{as} = [qw/count/];
1066
1067   # offset, order by and page are not needed to count. record_filter is cdbi
1068   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1069
1070   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1071   my ($count) = $tmp_rs->cursor->next;
1072   return $count;
1073 }
1074
1075 sub _bool {
1076   return 1;
1077 }
1078
1079 =head2 count_literal
1080
1081 =over 4
1082
1083 =item Arguments: $sql_fragment, @bind_values
1084
1085 =item Return Value: $count
1086
1087 =back
1088
1089 Counts the results in a literal query. Equivalent to calling L</search_literal>
1090 with the passed arguments, then L</count>.
1091
1092 =cut
1093
1094 sub count_literal { shift->search_literal(@_)->count; }
1095
1096 =head2 all
1097
1098 =over 4
1099
1100 =item Arguments: none
1101
1102 =item Return Value: @objects
1103
1104 =back
1105
1106 Returns all elements in the resultset. Called implicitly if the resultset
1107 is returned in list context.
1108
1109 =cut
1110
1111 sub all {
1112   my ($self) = @_;
1113   return @{ $self->get_cache } if $self->get_cache;
1114
1115   my @obj;
1116
1117   # TODO: don't call resolve here
1118   if (keys %{$self->_resolved_attrs->{collapse}}) {
1119 #  if ($self->{attrs}{prefetch}) {
1120       # Using $self->cursor->all is really just an optimisation.
1121       # If we're collapsing has_many prefetches it probably makes
1122       # very little difference, and this is cleaner than hacking
1123       # _construct_object to survive the approach
1124     my @row = $self->cursor->next;
1125     while (@row) {
1126       push(@obj, $self->_construct_object(@row));
1127       @row = (exists $self->{stashed_row}
1128                ? @{delete $self->{stashed_row}}
1129                : $self->cursor->next);
1130     }
1131   } else {
1132     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1133   }
1134
1135   $self->set_cache(\@obj) if $self->{attrs}{cache};
1136   return @obj;
1137 }
1138
1139 =head2 reset
1140
1141 =over 4
1142
1143 =item Arguments: none
1144
1145 =item Return Value: $self
1146
1147 =back
1148
1149 Resets the resultset's cursor, so you can iterate through the elements again.
1150
1151 =cut
1152
1153 sub reset {
1154   my ($self) = @_;
1155   delete $self->{_attrs} if exists $self->{_attrs};
1156   $self->{all_cache_position} = 0;
1157   $self->cursor->reset;
1158   return $self;
1159 }
1160
1161 =head2 first
1162
1163 =over 4
1164
1165 =item Arguments: none
1166
1167 =item Return Value: $object?
1168
1169 =back
1170
1171 Resets the resultset and returns an object for the first result (if the
1172 resultset returns anything).
1173
1174 =cut
1175
1176 sub first {
1177   return $_[0]->reset->next;
1178 }
1179
1180 # _cond_for_update_delete
1181 #
1182 # update/delete require the condition to be modified to handle
1183 # the differing SQL syntax available.  This transforms the $self->{cond}
1184 # appropriately, returning the new condition.
1185
1186 sub _cond_for_update_delete {
1187   my ($self, $full_cond) = @_;
1188   my $cond = {};
1189
1190   $full_cond ||= $self->{cond};
1191   # No-op. No condition, we're updating/deleting everything
1192   return $cond unless ref $full_cond;
1193
1194   if (ref $full_cond eq 'ARRAY') {
1195     $cond = [
1196       map {
1197         my %hash;
1198         foreach my $key (keys %{$_}) {
1199           $key =~ /([^.]+)$/;
1200           $hash{$1} = $_->{$key};
1201         }
1202         \%hash;
1203       } @{$full_cond}
1204     ];
1205   }
1206   elsif (ref $full_cond eq 'HASH') {
1207     if ((keys %{$full_cond})[0] eq '-and') {
1208       $cond->{-and} = [];
1209
1210       my @cond = @{$full_cond->{-and}};
1211       for (my $i = 0; $i < @cond; $i++) {
1212         my $entry = $cond[$i];
1213
1214         my $hash;
1215         if (ref $entry eq 'HASH') {
1216           $hash = $self->_cond_for_update_delete($entry);
1217         }
1218         else {
1219           $entry =~ /([^.]+)$/;
1220           $hash->{$1} = $cond[++$i];
1221         }
1222
1223         push @{$cond->{-and}}, $hash;
1224       }
1225     }
1226     else {
1227       foreach my $key (keys %{$full_cond}) {
1228         $key =~ /([^.]+)$/;
1229         $cond->{$1} = $full_cond->{$key};
1230       }
1231     }
1232   }
1233   else {
1234     $self->throw_exception(
1235       "Can't update/delete on resultset with condition unless hash or array"
1236     );
1237   }
1238
1239   return $cond;
1240 }
1241
1242
1243 =head2 update
1244
1245 =over 4
1246
1247 =item Arguments: \%values
1248
1249 =item Return Value: $storage_rv
1250
1251 =back
1252
1253 Sets the specified columns in the resultset to the supplied values in a
1254 single query. Return value will be true if the update succeeded or false
1255 if no records were updated; exact type of success value is storage-dependent.
1256
1257 =cut
1258
1259 sub update {
1260   my ($self, $values) = @_;
1261   $self->throw_exception("Values for update must be a hash")
1262     unless ref $values eq 'HASH';
1263
1264   my $cond = $self->_cond_for_update_delete;
1265    
1266   return $self->result_source->storage->update(
1267     $self->result_source, $values, $cond
1268   );
1269 }
1270
1271 =head2 update_all
1272
1273 =over 4
1274
1275 =item Arguments: \%values
1276
1277 =item Return Value: 1
1278
1279 =back
1280
1281 Fetches all objects and updates them one at a time. Note that C<update_all>
1282 will run DBIC cascade triggers, while L</update> will not.
1283
1284 =cut
1285
1286 sub update_all {
1287   my ($self, $values) = @_;
1288   $self->throw_exception("Values for update must be a hash")
1289     unless ref $values eq 'HASH';
1290   foreach my $obj ($self->all) {
1291     $obj->set_columns($values)->update;
1292   }
1293   return 1;
1294 }
1295
1296 =head2 delete
1297
1298 =over 4
1299
1300 =item Arguments: none
1301
1302 =item Return Value: 1
1303
1304 =back
1305
1306 Deletes the contents of the resultset from its result source. Note that this
1307 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1308 to run. See also L<DBIx::Class::Row/delete>.
1309
1310 delete may not generate correct SQL for a query with joins or a resultset
1311 chained from a related resultset.  In this case it will generate a warning:-
1312
1313   WARNING! Currently $rs->delete() does not generate proper SQL on
1314   joined resultsets, and may delete rows well outside of the contents
1315   of $rs. Use at your own risk
1316
1317 In these cases you may find that delete_all is more appropriate, or you
1318 need to respecify your query in a way that can be expressed without a join.
1319
1320 =cut
1321
1322 sub delete {
1323   my ($self) = @_;
1324   $self->throw_exception("Delete should not be passed any arguments")
1325     if $_[1];
1326   carp(   'WARNING! Currently $rs->delete() does not generate proper SQL'
1327         . ' on joined resultsets, and may delete rows well outside of the'
1328         . ' contents of $rs. Use at your own risk' )
1329     if ( $self->{attrs}{seen_join} );
1330   my $cond = $self->_cond_for_update_delete;
1331
1332   $self->result_source->storage->delete($self->result_source, $cond);
1333   return 1;
1334 }
1335
1336 =head2 delete_all
1337
1338 =over 4
1339
1340 =item Arguments: none
1341
1342 =item Return Value: 1
1343
1344 =back
1345
1346 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1347 will run DBIC cascade triggers, while L</delete> will not.
1348
1349 =cut
1350
1351 sub delete_all {
1352   my ($self) = @_;
1353   $_->delete for $self->all;
1354   return 1;
1355 }
1356
1357 =head2 populate
1358
1359 =over 4
1360
1361 =item Arguments: \@data;
1362
1363 =back
1364
1365 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1366 For the arrayref of hashrefs style each hashref should be a structure suitable
1367 forsubmitting to a $resultset->create(...) method.
1368
1369 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1370 to insert the data, as this is a faster method.  
1371
1372 Otherwise, each set of data is inserted into the database using
1373 L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
1374 objects is returned.
1375
1376 Example:  Assuming an Artist Class that has many CDs Classes relating:
1377
1378   my $Artist_rs = $schema->resultset("Artist");
1379   
1380   ## Void Context Example 
1381   $Artist_rs->populate([
1382      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1383         { title => 'My First CD', year => 2006 },
1384         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1385       ],
1386      },
1387      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1388         { title => 'My parents sold me to a record company' ,year => 2005 },
1389         { title => 'Why Am I So Ugly?', year => 2006 },
1390         { title => 'I Got Surgery and am now Popular', year => 2007 }
1391       ],
1392      },
1393   ]);
1394   
1395   ## Array Context Example
1396   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1397     { name => "Artist One"},
1398     { name => "Artist Two"},
1399     { name => "Artist Three", cds=> [
1400     { title => "First CD", year => 2007},
1401     { title => "Second CD", year => 2008},
1402   ]}
1403   ]);
1404   
1405   print $ArtistOne->name; ## response is 'Artist One'
1406   print $ArtistThree->cds->count ## reponse is '2'
1407
1408 For the arrayref of arrayrefs style,  the first element should be a list of the
1409 fieldsnames to which the remaining elements are rows being inserted.  For
1410 example:
1411
1412   $Arstist_rs->populate([
1413     [qw/artistid name/],
1414     [100, 'A Formally Unknown Singer'],
1415     [101, 'A singer that jumped the shark two albums ago'],
1416     [102, 'An actually cool singer.'],
1417   ]);
1418
1419 Please note an important effect on your data when choosing between void and
1420 wantarray context. Since void context goes straight to C<insert_bulk> in 
1421 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1422 c<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to 
1423 create primary keys for you, you will find that your PKs are empty.  In this 
1424 case you will have to use the wantarray context in order to create those 
1425 values.
1426
1427 =cut
1428
1429 sub populate {
1430   my $self = shift @_;
1431   my $data = ref $_[0][0] eq 'HASH'
1432     ? $_[0] : ref $_[0][0] eq 'ARRAY' ? $self->_normalize_populate_args($_[0]) :
1433     $self->throw_exception('Populate expects an arrayref of hashes or arrayref of arrayrefs');
1434   
1435   if(defined wantarray) {
1436     my @created;
1437     foreach my $item (@$data) {
1438       push(@created, $self->create($item));
1439     }
1440     return @created;
1441   } else {
1442     my ($first, @rest) = @$data;
1443
1444     my @names = grep {!ref $first->{$_}} keys %$first;
1445     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1446     my @pks = $self->result_source->primary_columns;  
1447
1448     ## do the belongs_to relationships  
1449     foreach my $index (0..$#$data) {
1450       if( grep { !defined $data->[$index]->{$_} } @pks ) {
1451         my @ret = $self->populate($data);
1452         return;
1453       }
1454     
1455       foreach my $rel (@rels) {
1456         next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
1457         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1458         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1459         my $related = $result->result_source->resolve_condition(
1460           $result->result_source->relationship_info($reverse)->{cond},
1461           $self,        
1462           $result,        
1463         );
1464
1465         delete $data->[$index]->{$rel};
1466         $data->[$index] = {%{$data->[$index]}, %$related};
1467       
1468         push @names, keys %$related if $index == 0;
1469       }
1470     }
1471
1472     ## do bulk insert on current row
1473     my @values = map { [ @$_{@names} ] } @$data;
1474
1475     $self->result_source->storage->insert_bulk(
1476       $self->result_source, 
1477       \@names, 
1478       \@values,
1479     );
1480
1481     ## do the has_many relationships
1482     foreach my $item (@$data) {
1483
1484       foreach my $rel (@rels) {
1485         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1486
1487         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
1488      || $self->throw_exception('Cannot find the relating object.');
1489      
1490         my $child = $parent->$rel;
1491     
1492         my $related = $child->result_source->resolve_condition(
1493           $parent->result_source->relationship_info($rel)->{cond},
1494           $child,
1495           $parent,
1496         );
1497
1498         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1499         my @populate = map { {%$_, %$related} } @rows_to_add;
1500
1501         $child->populate( \@populate );
1502       }
1503     }
1504   }
1505 }
1506
1507 =head2 _normalize_populate_args ($args)
1508
1509 Private method used by L</populate> to normalize its incoming arguments.  Factored
1510 out in case you want to subclass and accept new argument structures to the
1511 L</populate> method.
1512
1513 =cut
1514
1515 sub _normalize_populate_args {
1516   my ($self, $data) = @_;
1517   my @names = @{shift(@$data)};
1518   my @results_to_create;
1519   foreach my $datum (@$data) {
1520     my %result_to_create;
1521     foreach my $index (0..$#names) {
1522       $result_to_create{$names[$index]} = $$datum[$index];
1523     }
1524     push @results_to_create, \%result_to_create;    
1525   }
1526   return \@results_to_create;
1527 }
1528
1529 =head2 pager
1530
1531 =over 4
1532
1533 =item Arguments: none
1534
1535 =item Return Value: $pager
1536
1537 =back
1538
1539 Return Value a L<Data::Page> object for the current resultset. Only makes
1540 sense for queries with a C<page> attribute.
1541
1542 =cut
1543
1544 sub pager {
1545   my ($self) = @_;
1546   my $attrs = $self->{attrs};
1547   $self->throw_exception("Can't create pager for non-paged rs")
1548     unless $self->{attrs}{page};
1549   $attrs->{rows} ||= 10;
1550   return $self->{pager} ||= Data::Page->new(
1551     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1552 }
1553
1554 =head2 page
1555
1556 =over 4
1557
1558 =item Arguments: $page_number
1559
1560 =item Return Value: $rs
1561
1562 =back
1563
1564 Returns a resultset for the $page_number page of the resultset on which page
1565 is called, where each page contains a number of rows equal to the 'rows'
1566 attribute set on the resultset (10 by default).
1567
1568 =cut
1569
1570 sub page {
1571   my ($self, $page) = @_;
1572   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1573 }
1574
1575 =head2 new_result
1576
1577 =over 4
1578
1579 =item Arguments: \%vals
1580
1581 =item Return Value: $rowobject
1582
1583 =back
1584
1585 Creates a new row object in the resultset's result class and returns
1586 it. The row is not inserted into the database at this point, call
1587 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
1588 will tell you whether the row object has been inserted or not.
1589
1590 Passes the hashref of input on to L<DBIx::Class::Row/new>.
1591
1592 =cut
1593
1594 sub new_result {
1595   my ($self, $values) = @_;
1596   $self->throw_exception( "new_result needs a hash" )
1597     unless (ref $values eq 'HASH');
1598
1599   my %new;
1600   my $alias = $self->{attrs}{alias};
1601
1602   if (
1603     defined $self->{cond}
1604     && $self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION
1605   ) {
1606     %new = %{$self->{attrs}{related_objects}};
1607   } else {
1608     $self->throw_exception(
1609       "Can't abstract implicit construct, condition not a hash"
1610     ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1611   
1612     my $collapsed_cond = (
1613       $self->{cond}
1614         ? $self->_collapse_cond($self->{cond})
1615         : {}
1616     );
1617   
1618     # precendence must be given to passed values over values inherited from
1619     # the cond, so the order here is important.
1620     my %implied =  %{$self->_remove_alias($collapsed_cond, $alias)};
1621     while( my($col,$value) = each %implied ){
1622       if(ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '='){
1623         $new{$col} = $value->{'='};
1624         next;
1625       }
1626       $new{$col} = $value if $self->_is_deterministic_value($value);
1627     }
1628   }
1629
1630   %new = (
1631     %new,
1632     %{ $self->_remove_alias($values, $alias) },
1633     -source_handle => $self->_source_handle,
1634     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1635   );
1636
1637   return $self->result_class->new(\%new);
1638 }
1639
1640 # _is_deterministic_value
1641 #
1642 # Make an effor to strip non-deterministic values from the condition, 
1643 # to make sure new_result chokes less
1644
1645 sub _is_deterministic_value {
1646   my $self = shift;
1647   my $value = shift;
1648   my $ref_type = ref $value;
1649   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
1650   return 1 if Scalar::Util::blessed($value);
1651   return 0;
1652 }
1653
1654 # _collapse_cond
1655 #
1656 # Recursively collapse the condition.
1657
1658 sub _collapse_cond {
1659   my ($self, $cond, $collapsed) = @_;
1660
1661   $collapsed ||= {};
1662
1663   if (ref $cond eq 'ARRAY') {
1664     foreach my $subcond (@$cond) {
1665       next unless ref $subcond;  # -or
1666 #      warn "ARRAY: " . Dumper $subcond;
1667       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1668     }
1669   }
1670   elsif (ref $cond eq 'HASH') {
1671     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1672       foreach my $subcond (@{$cond->{-and}}) {
1673 #        warn "HASH: " . Dumper $subcond;
1674         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1675       }
1676     }
1677     else {
1678 #      warn "LEAF: " . Dumper $cond;
1679       foreach my $col (keys %$cond) {
1680         my $value = $cond->{$col};
1681         $collapsed->{$col} = $value;
1682       }
1683     }
1684   }
1685
1686   return $collapsed;
1687 }
1688
1689 # _remove_alias
1690 #
1691 # Remove the specified alias from the specified query hash. A copy is made so
1692 # the original query is not modified.
1693
1694 sub _remove_alias {
1695   my ($self, $query, $alias) = @_;
1696
1697   my %orig = %{ $query || {} };
1698   my %unaliased;
1699
1700   foreach my $key (keys %orig) {
1701     if ($key !~ /\./) {
1702       $unaliased{$key} = $orig{$key};
1703       next;
1704     }
1705     $unaliased{$1} = $orig{$key}
1706       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1707   }
1708
1709   return \%unaliased;
1710 }
1711
1712 =head2 as_query
1713
1714 =over 4
1715
1716 =item Arguments: none
1717
1718 =item Return Value: \[ $sql, @bind ]
1719
1720 =back
1721
1722 Returns the SQL query and bind vars associated with the invocant.
1723
1724 =cut
1725
1726 sub as_query { return shift->cursor->as_query(@_) }
1727
1728 =head2 as_subselect
1729
1730 =over 4
1731
1732 =item Arguments: none
1733
1734 =item Return Value: \[ $sql, @bind ]
1735
1736 =back
1737
1738 Returns the SQL query and bind vars associated with the invocant.
1739
1740 The SQL will be wrapped in parentheses, ready for use as a subselect.
1741
1742 =cut
1743
1744 sub as_subselect {
1745   my $self = shift;
1746   my $arr = ${$self->as_query(@_)};
1747   $arr->[0] = '( ' . $arr->[0] . ' )';
1748   return \$arr;
1749 }
1750
1751 =head2 as_query
1752
1753 =over 4
1754
1755 =item Arguments: none
1756
1757 =item Return Value: $sql
1758
1759 =back
1760
1761 Returns the SQL query associated with the invocant. All bind vars
1762 will have been bound using C<< DBI->quote() >>.
1763
1764 =cut
1765
1766 sub as_sql {
1767   my $self = shift;
1768   my $arr = ${$self->as_query(@_)};
1769   my $sql = shift @$arr;
1770   my $dbh = $self->result_source->schema->storage->dbh;
1771   $sql =~ s/\?/$dbh->quote((shift @$arr)->[1])/eg;
1772   return $sql
1773 }
1774
1775 =head2 find_or_new
1776
1777 =over 4
1778
1779 =item Arguments: \%vals, \%attrs?
1780
1781 =item Return Value: $rowobject
1782
1783 =back
1784
1785   my $artist = $schema->resultset('Artist')->find_or_new(
1786     { artist => 'fred' }, { key => 'artists' });
1787
1788   $cd->cd_to_producer->find_or_new({ producer => $producer },
1789                                    { key => 'primary });
1790
1791 Find an existing record from this resultset, based on its primary
1792 key, or a unique constraint. If none exists, instantiate a new result
1793 object and return it. The object will not be saved into your storage
1794 until you call L<DBIx::Class::Row/insert> on it.
1795
1796 You most likely want this method when looking for existing rows using
1797 a unique constraint that is not the primary key, or looking for
1798 related rows.
1799
1800 If you want objects to be saved immediately, use L</find_or_create> instead.
1801
1802 B<Note>: C<find_or_new> is probably not what you want when creating a
1803 new row in a table that uses primary keys supplied by the
1804 database. Passing in a primary key column with a value of I<undef>
1805 will cause L</find> to attempt to search for a row with a value of
1806 I<NULL>.
1807
1808 =cut
1809
1810 sub find_or_new {
1811   my $self     = shift;
1812   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1813   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1814   my $exists   = $self->find($hash, $attrs);
1815   return defined $exists ? $exists : $self->new_result($hash);
1816 }
1817
1818 =head2 create
1819
1820 =over 4
1821
1822 =item Arguments: \%vals
1823
1824 =item Return Value: a L<DBIx::Class::Row> $object
1825
1826 =back
1827
1828 Attempt to create a single new row or a row with multiple related rows
1829 in the table represented by the resultset (and related tables). This
1830 will not check for duplicate rows before inserting, use
1831 L</find_or_create> to do that.
1832
1833 To create one row for this resultset, pass a hashref of key/value
1834 pairs representing the columns of the table and the values you wish to
1835 store. If the appropriate relationships are set up, foreign key fields
1836 can also be passed an object representing the foreign row, and the
1837 value will be set to its primary key.
1838
1839 To create related objects, pass a hashref for the value if the related
1840 item is a foreign key relationship (L<DBIx::Class::Relationship/belongs_to>),
1841 and use the name of the relationship as the key. (NOT the name of the field,
1842 necessarily). For C<has_many> and C<has_one> relationships, pass an arrayref
1843 of hashrefs containing the data for each of the rows to create in the foreign
1844 tables, again using the relationship name as the key.
1845
1846 Instead of hashrefs of plain related data (key/value pairs), you may
1847 also pass new or inserted objects. New objects (not inserted yet, see
1848 L</new>), will be inserted into their appropriate tables.
1849
1850 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1851
1852 Example of creating a new row.
1853
1854   $person_rs->create({
1855     name=>"Some Person",
1856         email=>"somebody@someplace.com"
1857   });
1858   
1859 Example of creating a new row and also creating rows in a related C<has_many>
1860 or C<has_one> resultset.  Note Arrayref.
1861
1862   $artist_rs->create(
1863      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1864         { title => 'My First CD', year => 2006 },
1865         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1866       ],
1867      },
1868   );
1869
1870 Example of creating a new row and also creating a row in a related
1871 C<belongs_to>resultset. Note Hashref.
1872
1873   $cd_rs->create({
1874     title=>"Music for Silly Walks",
1875         year=>2000,
1876         artist => {
1877           name=>"Silly Musician",
1878         }
1879   });
1880
1881 =cut
1882
1883 sub create {
1884   my ($self, $attrs) = @_;
1885   $self->throw_exception( "create needs a hashref" )
1886     unless ref $attrs eq 'HASH';
1887   return $self->new_result($attrs)->insert;
1888 }
1889
1890 =head2 find_or_create
1891
1892 =over 4
1893
1894 =item Arguments: \%vals, \%attrs?
1895
1896 =item Return Value: $rowobject
1897
1898 =back
1899
1900   $cd->cd_to_producer->find_or_create({ producer => $producer },
1901                                       { key => 'primary });
1902
1903 Tries to find a record based on its primary key or unique constraints; if none
1904 is found, creates one and returns that instead.
1905
1906   my $cd = $schema->resultset('CD')->find_or_create({
1907     cdid   => 5,
1908     artist => 'Massive Attack',
1909     title  => 'Mezzanine',
1910     year   => 2005,
1911   });
1912
1913 Also takes an optional C<key> attribute, to search by a specific key or unique
1914 constraint. For example:
1915
1916   my $cd = $schema->resultset('CD')->find_or_create(
1917     {
1918       artist => 'Massive Attack',
1919       title  => 'Mezzanine',
1920     },
1921     { key => 'cd_artist_title' }
1922   );
1923
1924 B<Note>: Because find_or_create() reads from the database and then
1925 possibly inserts based on the result, this method is subject to a race
1926 condition. Another process could create a record in the table after
1927 the find has completed and before the create has started. To avoid
1928 this problem, use find_or_create() inside a transaction.
1929
1930 B<Note>: C<find_or_create> is probably not what you want when creating
1931 a new row in a table that uses primary keys supplied by the
1932 database. Passing in a primary key column with a value of I<undef>
1933 will cause L</find> to attempt to search for a row with a value of
1934 I<NULL>.
1935
1936 See also L</find> and L</update_or_create>. For information on how to declare
1937 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1938
1939 =cut
1940
1941 sub find_or_create {
1942   my $self     = shift;
1943   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1944   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1945   my $exists   = $self->find($hash, $attrs);
1946   return defined $exists ? $exists : $self->create($hash);
1947 }
1948
1949 =head2 update_or_create
1950
1951 =over 4
1952
1953 =item Arguments: \%col_values, { key => $unique_constraint }?
1954
1955 =item Return Value: $rowobject
1956
1957 =back
1958
1959   $resultset->update_or_create({ col => $val, ... });
1960
1961 First, searches for an existing row matching one of the unique constraints
1962 (including the primary key) on the source of this resultset. If a row is
1963 found, updates it with the other given column values. Otherwise, creates a new
1964 row.
1965
1966 Takes an optional C<key> attribute to search on a specific unique constraint.
1967 For example:
1968
1969   # In your application
1970   my $cd = $schema->resultset('CD')->update_or_create(
1971     {
1972       artist => 'Massive Attack',
1973       title  => 'Mezzanine',
1974       year   => 1998,
1975     },
1976     { key => 'cd_artist_title' }
1977   );
1978
1979   $cd->cd_to_producer->update_or_create({ 
1980     producer => $producer, 
1981     name => 'harry',
1982   }, { 
1983     key => 'primary,
1984   });
1985
1986
1987 If no C<key> is specified, it searches on all unique constraints defined on the
1988 source, including the primary key.
1989
1990 If the C<key> is specified as C<primary>, it searches only on the primary key.
1991
1992 See also L</find> and L</find_or_create>. For information on how to declare
1993 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1994
1995 B<Note>: C<update_or_create> is probably not what you want when
1996 looking for a row in a table that uses primary keys supplied by the
1997 database, unless you actually have a key value. Passing in a primary
1998 key column with a value of I<undef> will cause L</find> to attempt to
1999 search for a row with a value of I<NULL>.
2000
2001 =cut
2002
2003 sub update_or_create {
2004   my $self = shift;
2005   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
2006   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
2007
2008   my $row = $self->find($cond, $attrs);
2009   if (defined $row) {
2010     $row->update($cond);
2011     return $row;
2012   }
2013
2014   return $self->create($cond);
2015 }
2016
2017 =head2 get_cache
2018
2019 =over 4
2020
2021 =item Arguments: none
2022
2023 =item Return Value: \@cache_objects?
2024
2025 =back
2026
2027 Gets the contents of the cache for the resultset, if the cache is set.
2028
2029 The cache is populated either by using the L</prefetch> attribute to
2030 L</search> or by calling L</set_cache>.
2031
2032 =cut
2033
2034 sub get_cache {
2035   shift->{all_cache};
2036 }
2037
2038 =head2 set_cache
2039
2040 =over 4
2041
2042 =item Arguments: \@cache_objects
2043
2044 =item Return Value: \@cache_objects
2045
2046 =back
2047
2048 Sets the contents of the cache for the resultset. Expects an arrayref
2049 of objects of the same class as those produced by the resultset. Note that
2050 if the cache is set the resultset will return the cached objects rather
2051 than re-querying the database even if the cache attr is not set.
2052
2053 The contents of the cache can also be populated by using the
2054 L</prefetch> attribute to L</search>.
2055
2056 =cut
2057
2058 sub set_cache {
2059   my ( $self, $data ) = @_;
2060   $self->throw_exception("set_cache requires an arrayref")
2061       if defined($data) && (ref $data ne 'ARRAY');
2062   $self->{all_cache} = $data;
2063 }
2064
2065 =head2 clear_cache
2066
2067 =over 4
2068
2069 =item Arguments: none
2070
2071 =item Return Value: []
2072
2073 =back
2074
2075 Clears the cache for the resultset.
2076
2077 =cut
2078
2079 sub clear_cache {
2080   shift->set_cache(undef);
2081 }
2082
2083 =head2 related_resultset
2084
2085 =over 4
2086
2087 =item Arguments: $relationship_name
2088
2089 =item Return Value: $resultset
2090
2091 =back
2092
2093 Returns a related resultset for the supplied relationship name.
2094
2095   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2096
2097 =cut
2098
2099 sub related_resultset {
2100   my ($self, $rel) = @_;
2101
2102   $self->{related_resultsets} ||= {};
2103   return $self->{related_resultsets}{$rel} ||= do {
2104     my $rel_obj = $self->result_source->relationship_info($rel);
2105
2106     $self->throw_exception(
2107       "search_related: result source '" . $self->result_source->source_name .
2108         "' has no such relationship $rel")
2109       unless $rel_obj;
2110     
2111     my ($from,$seen) = $self->_resolve_from($rel);
2112
2113     my $join_count = $seen->{$rel};
2114     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
2115
2116     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2117     my %attrs = %{$self->{attrs}||{}};
2118     delete @attrs{qw(result_class alias)};
2119
2120     my $new_cache;
2121
2122     if (my $cache = $self->get_cache) {
2123       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2124         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2125                         @$cache ];
2126       }
2127     }
2128
2129     my $rel_source = $self->result_source->related_source($rel);
2130
2131     my $new = do {
2132
2133       # The reason we do this now instead of passing the alias to the
2134       # search_rs below is that if you wrap/overload resultset on the
2135       # source you need to know what alias it's -going- to have for things
2136       # to work sanely (e.g. RestrictWithObject wants to be able to add
2137       # extra query restrictions, and these may need to be $alias.)
2138
2139       my $attrs = $rel_source->resultset_attributes;
2140       local $attrs->{alias} = $alias;
2141
2142       $rel_source->resultset
2143                  ->search_rs(
2144                      undef, {
2145                        %attrs,
2146                        join => undef,
2147                        prefetch => undef,
2148                        select => undef,
2149                        as => undef,
2150                        where => $self->{cond},
2151                        seen_join => $seen,
2152                        from => $from,
2153                    });
2154     };
2155     $new->set_cache($new_cache) if $new_cache;
2156     $new;
2157   };
2158 }
2159
2160 =head2 current_source_alias
2161
2162 =over 4
2163
2164 =item Arguments: none
2165
2166 =item Return Value: $source_alias
2167
2168 =back
2169
2170 Returns the current table alias for the result source this resultset is built
2171 on, that will be used in the SQL query. Usually it is C<me>.
2172
2173 Currently the source alias that refers to the result set returned by a
2174 L</search>/L</find> family method depends on how you got to the resultset: it's
2175 C<me> by default, but eg. L</search_related> aliases it to the related result
2176 source name (and keeps C<me> referring to the original result set). The long
2177 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2178 (and make this method unnecessary).
2179
2180 Thus it's currently necessary to use this method in predefined queries (see
2181 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2182 source alias of the current result set:
2183
2184   # in a result set class
2185   sub modified_by {
2186     my ($self, $user) = @_;
2187
2188     my $me = $self->current_source_alias;
2189
2190     return $self->search(
2191       "$me.modified" => $user->id,
2192     );
2193   }
2194
2195 =cut
2196
2197 sub current_source_alias {
2198   my ($self) = @_;
2199
2200   return ($self->{attrs} || {})->{alias} || 'me';
2201 }
2202
2203 sub _resolve_from {
2204   my ($self, $extra_join) = @_;
2205   my $source = $self->result_source;
2206   my $attrs = $self->{attrs};
2207   
2208   my $from = $attrs->{from}
2209     || [ { $attrs->{alias} => $source->from } ];
2210     
2211   my $seen = { %{$attrs->{seen_join}||{}} };
2212
2213   my $join = ($attrs->{join}
2214                ? [ $attrs->{join}, $extra_join ]
2215                : $extra_join);
2216
2217   # we need to take the prefetch the attrs into account before we 
2218   # ->resolve_join as otherwise they get lost - captainL
2219   my $merged = $self->_merge_attr( $join, $attrs->{prefetch} );
2220
2221   $from = [
2222     @$from,
2223     ($join ? $source->resolve_join($merged, $attrs->{alias}, $seen) : ()),
2224   ];
2225
2226   return ($from,$seen);
2227 }
2228
2229 sub _resolved_attrs {
2230   my $self = shift;
2231   return $self->{_attrs} if $self->{_attrs};
2232
2233   my $attrs = { %{$self->{attrs}||{}} };
2234   my $source = $self->result_source;
2235   my $alias = $attrs->{alias};
2236
2237   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2238   if ($attrs->{columns}) {
2239     delete $attrs->{as};
2240   } elsif (!$attrs->{select}) {
2241     $attrs->{columns} = [ $source->columns ];
2242   }
2243  
2244   $attrs->{select} = 
2245     ($attrs->{select}
2246       ? (ref $attrs->{select} eq 'ARRAY'
2247           ? [ @{$attrs->{select}} ]
2248           : [ $attrs->{select} ])
2249       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
2250     );
2251   $attrs->{as} =
2252     ($attrs->{as}
2253       ? (ref $attrs->{as} eq 'ARRAY'
2254           ? [ @{$attrs->{as}} ]
2255           : [ $attrs->{as} ])
2256       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
2257     );
2258   
2259   my $adds;
2260   if ($adds = delete $attrs->{include_columns}) {
2261     $adds = [$adds] unless ref $adds eq 'ARRAY';
2262     push(@{$attrs->{select}}, @$adds);
2263     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
2264   }
2265   if ($adds = delete $attrs->{'+select'}) {
2266     $adds = [$adds] unless ref $adds eq 'ARRAY';
2267     push(@{$attrs->{select}},
2268            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
2269   }
2270   if (my $adds = delete $attrs->{'+as'}) {
2271     $adds = [$adds] unless ref $adds eq 'ARRAY';
2272     push(@{$attrs->{as}}, @$adds);
2273   }
2274
2275   $attrs->{from} ||= [ { 'me' => $source->from } ];
2276
2277   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
2278     my $join = delete $attrs->{join} || {};
2279
2280     if (defined $attrs->{prefetch}) {
2281       $join = $self->_merge_attr(
2282         $join, $attrs->{prefetch}
2283       );
2284       
2285     }
2286
2287     $attrs->{from} =   # have to copy here to avoid corrupting the original
2288       [
2289         @{$attrs->{from}}, 
2290         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
2291       ];
2292
2293   }
2294
2295   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
2296   if ($attrs->{order_by}) {
2297     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
2298                            ? [ @{$attrs->{order_by}} ]
2299                            : [ $attrs->{order_by} ]);
2300   } else {
2301     $attrs->{order_by} = [];    
2302   }
2303
2304   my $collapse = $attrs->{collapse} || {};
2305   if (my $prefetch = delete $attrs->{prefetch}) {
2306     $prefetch = $self->_merge_attr({}, $prefetch);
2307     my @pre_order;
2308     my $seen = $attrs->{seen_join} || {};
2309     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
2310       # bring joins back to level of current class
2311       my @prefetch = $source->resolve_prefetch(
2312         $p, $alias, $seen, \@pre_order, $collapse
2313       );
2314       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
2315       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
2316     }
2317     push(@{$attrs->{order_by}}, @pre_order);
2318   }
2319   $attrs->{collapse} = $collapse;
2320
2321   if ($attrs->{page}) {
2322     $attrs->{offset} ||= 0;
2323     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
2324   }
2325
2326   return $self->{_attrs} = $attrs;
2327 }
2328
2329 sub _rollout_attr {
2330   my ($self, $attr) = @_;
2331   
2332   if (ref $attr eq 'HASH') {
2333     return $self->_rollout_hash($attr);
2334   } elsif (ref $attr eq 'ARRAY') {
2335     return $self->_rollout_array($attr);
2336   } else {
2337     return [$attr];
2338   }
2339 }
2340
2341 sub _rollout_array {
2342   my ($self, $attr) = @_;
2343
2344   my @rolled_array;
2345   foreach my $element (@{$attr}) {
2346     if (ref $element eq 'HASH') {
2347       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
2348     } elsif (ref $element eq 'ARRAY') {
2349       #  XXX - should probably recurse here
2350       push( @rolled_array, @{$self->_rollout_array($element)} );
2351     } else {
2352       push( @rolled_array, $element );
2353     }
2354   }
2355   return \@rolled_array;
2356 }
2357
2358 sub _rollout_hash {
2359   my ($self, $attr) = @_;
2360
2361   my @rolled_array;
2362   foreach my $key (keys %{$attr}) {
2363     push( @rolled_array, { $key => $attr->{$key} } );
2364   }
2365   return \@rolled_array;
2366 }
2367
2368 sub _calculate_score {
2369   my ($self, $a, $b) = @_;
2370
2371   if (ref $b eq 'HASH') {
2372     my ($b_key) = keys %{$b};
2373     if (ref $a eq 'HASH') {
2374       my ($a_key) = keys %{$a};
2375       if ($a_key eq $b_key) {
2376         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
2377       } else {
2378         return 0;
2379       }
2380     } else {
2381       return ($a eq $b_key) ? 1 : 0;
2382     }       
2383   } else {
2384     if (ref $a eq 'HASH') {
2385       my ($a_key) = keys %{$a};
2386       return ($b eq $a_key) ? 1 : 0;
2387     } else {
2388       return ($b eq $a) ? 1 : 0;
2389     }
2390   }
2391 }
2392
2393 sub _merge_attr {
2394   my ($self, $orig, $import) = @_;
2395
2396   return $import unless defined($orig);
2397   return $orig unless defined($import);
2398   
2399   $orig = $self->_rollout_attr($orig);
2400   $import = $self->_rollout_attr($import);
2401
2402   my $seen_keys;
2403   foreach my $import_element ( @{$import} ) {
2404     # find best candidate from $orig to merge $b_element into
2405     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
2406     foreach my $orig_element ( @{$orig} ) {
2407       my $score = $self->_calculate_score( $orig_element, $import_element );
2408       if ($score > $best_candidate->{score}) {
2409         $best_candidate->{position} = $position;
2410         $best_candidate->{score} = $score;
2411       }
2412       $position++;
2413     }
2414     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
2415
2416     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
2417       push( @{$orig}, $import_element );
2418     } else {
2419       my $orig_best = $orig->[$best_candidate->{position}];
2420       # merge orig_best and b_element together and replace original with merged
2421       if (ref $orig_best ne 'HASH') {
2422         $orig->[$best_candidate->{position}] = $import_element;
2423       } elsif (ref $import_element eq 'HASH') {
2424         my ($key) = keys %{$orig_best};
2425         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
2426       }
2427     }
2428     $seen_keys->{$import_key} = 1; # don't merge the same key twice
2429   }
2430
2431   return $orig;
2432 }
2433
2434 sub result_source {
2435     my $self = shift;
2436
2437     if (@_) {
2438         $self->_source_handle($_[0]->handle);
2439     } else {
2440         $self->_source_handle->resolve;
2441     }
2442 }
2443
2444 =head2 throw_exception
2445
2446 See L<DBIx::Class::Schema/throw_exception> for details.
2447
2448 =cut
2449
2450 sub throw_exception {
2451   my $self=shift;
2452   if (ref $self && $self->_source_handle->schema) {
2453     $self->_source_handle->schema->throw_exception(@_)
2454   } else {
2455     croak(@_);
2456   }
2457
2458 }
2459
2460 # XXX: FIXME: Attributes docs need clearing up
2461
2462 =head1 ATTRIBUTES
2463
2464 The resultset takes various attributes that modify its behavior. Here's an
2465 overview of them:
2466
2467 =head2 order_by
2468
2469 =over 4
2470
2471 =item Value: ($order_by | \@order_by)
2472
2473 =back
2474
2475 Which column(s) to order the results by. This is currently passed
2476 through directly to SQL, so you can give e.g. C<year DESC> for a
2477 descending order on the column `year'.
2478
2479 Please note that if you have C<quote_char> enabled (see
2480 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
2481 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
2482 so you will need to manually quote things as appropriate.)
2483
2484 If your L<SQL::Abstract> version supports it (>=1.50), you can also use
2485 C<{-desc => 'year'}>, which takes care of the quoting for you. This is the
2486 recommended syntax.
2487
2488 =head2 columns
2489
2490 =over 4
2491
2492 =item Value: \@columns
2493
2494 =back
2495
2496 Shortcut to request a particular set of columns to be retrieved.  Adds
2497 C<me.> onto the start of any column without a C<.> in it and sets C<select>
2498 from that, then auto-populates C<as> from C<select> as normal. (You may also
2499 use the C<cols> attribute, as in earlier versions of DBIC.)
2500
2501 =head2 include_columns
2502
2503 =over 4
2504
2505 =item Value: \@columns
2506
2507 =back
2508
2509 Shortcut to include additional columns in the returned results - for example
2510
2511   $schema->resultset('CD')->search(undef, {
2512     include_columns => ['artist.name'],
2513     join => ['artist']
2514   });
2515
2516 would return all CDs and include a 'name' column to the information
2517 passed to object inflation. Note that the 'artist' is the name of the
2518 column (or relationship) accessor, and 'name' is the name of the column
2519 accessor in the related table.
2520
2521 =head2 select
2522
2523 =over 4
2524
2525 =item Value: \@select_columns
2526
2527 =back
2528
2529 Indicates which columns should be selected from the storage. You can use
2530 column names, or in the case of RDBMS back ends, function or stored procedure
2531 names:
2532
2533   $rs = $schema->resultset('Employee')->search(undef, {
2534     select => [
2535       'name',
2536       { count => 'employeeid' },
2537       { sum => 'salary' }
2538     ]
2539   });
2540
2541 When you use function/stored procedure names and do not supply an C<as>
2542 attribute, the column names returned are storage-dependent. E.g. MySQL would
2543 return a column named C<count(employeeid)> in the above example.
2544
2545 =head2 +select
2546
2547 =over 4
2548
2549 Indicates additional columns to be selected from storage.  Works the same as
2550 L</select> but adds columns to the selection.
2551
2552 =back
2553
2554 =head2 +as
2555
2556 =over 4
2557
2558 Indicates additional column names for those added via L</+select>.
2559
2560 =back
2561
2562 =head2 as
2563
2564 =over 4
2565
2566 =item Value: \@inflation_names
2567
2568 =back
2569
2570 Indicates column names for object inflation. That is, C<as>
2571 indicates the name that the column can be accessed as via the
2572 C<get_column> method (or via the object accessor, B<if one already
2573 exists>).  It has nothing to do with the SQL code C<SELECT foo AS bar>.
2574
2575 The C<as> attribute is used in conjunction with C<select>,
2576 usually when C<select> contains one or more function or stored
2577 procedure names:
2578
2579   $rs = $schema->resultset('Employee')->search(undef, {
2580     select => [
2581       'name',
2582       { count => 'employeeid' }
2583     ],
2584     as => ['name', 'employee_count'],
2585   });
2586
2587   my $employee = $rs->first(); # get the first Employee
2588
2589 If the object against which the search is performed already has an accessor
2590 matching a column name specified in C<as>, the value can be retrieved using
2591 the accessor as normal:
2592
2593   my $name = $employee->name();
2594
2595 If on the other hand an accessor does not exist in the object, you need to
2596 use C<get_column> instead:
2597
2598   my $employee_count = $employee->get_column('employee_count');
2599
2600 You can create your own accessors if required - see
2601 L<DBIx::Class::Manual::Cookbook> for details.
2602
2603 Please note: This will NOT insert an C<AS employee_count> into the SQL
2604 statement produced, it is used for internal access only. Thus
2605 attempting to use the accessor in an C<order_by> clause or similar
2606 will fail miserably.
2607
2608 To get around this limitation, you can supply literal SQL to your
2609 C<select> attibute that contains the C<AS alias> text, eg:
2610
2611   select => [\'myfield AS alias']
2612
2613 =head2 join
2614
2615 =over 4
2616
2617 =item Value: ($rel_name | \@rel_names | \%rel_names)
2618
2619 =back
2620
2621 Contains a list of relationships that should be joined for this query.  For
2622 example:
2623
2624   # Get CDs by Nine Inch Nails
2625   my $rs = $schema->resultset('CD')->search(
2626     { 'artist.name' => 'Nine Inch Nails' },
2627     { join => 'artist' }
2628   );
2629
2630 Can also contain a hash reference to refer to the other relation's relations.
2631 For example:
2632
2633   package MyApp::Schema::Track;
2634   use base qw/DBIx::Class/;
2635   __PACKAGE__->table('track');
2636   __PACKAGE__->add_columns(qw/trackid cd position title/);
2637   __PACKAGE__->set_primary_key('trackid');
2638   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
2639   1;
2640
2641   # In your application
2642   my $rs = $schema->resultset('Artist')->search(
2643     { 'track.title' => 'Teardrop' },
2644     {
2645       join     => { cd => 'track' },
2646       order_by => 'artist.name',
2647     }
2648   );
2649
2650 You need to use the relationship (not the table) name in  conditions, 
2651 because they are aliased as such. The current table is aliased as "me", so 
2652 you need to use me.column_name in order to avoid ambiguity. For example:
2653
2654   # Get CDs from 1984 with a 'Foo' track 
2655   my $rs = $schema->resultset('CD')->search(
2656     { 
2657       'me.year' => 1984,
2658       'tracks.name' => 'Foo'
2659     },
2660     { join => 'tracks' }
2661   );
2662   
2663 If the same join is supplied twice, it will be aliased to <rel>_2 (and
2664 similarly for a third time). For e.g.
2665
2666   my $rs = $schema->resultset('Artist')->search({
2667     'cds.title'   => 'Down to Earth',
2668     'cds_2.title' => 'Popular',
2669   }, {
2670     join => [ qw/cds cds/ ],
2671   });
2672
2673 will return a set of all artists that have both a cd with title 'Down
2674 to Earth' and a cd with title 'Popular'.
2675
2676 If you want to fetch related objects from other tables as well, see C<prefetch>
2677 below.
2678
2679 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
2680
2681 =head2 prefetch
2682
2683 =over 4
2684
2685 =item Value: ($rel_name | \@rel_names | \%rel_names)
2686
2687 =back
2688
2689 Contains one or more relationships that should be fetched along with
2690 the main query (when they are accessed afterwards the data will
2691 already be available, without extra queries to the database).  This is
2692 useful for when you know you will need the related objects, because it
2693 saves at least one query:
2694
2695   my $rs = $schema->resultset('Tag')->search(
2696     undef,
2697     {
2698       prefetch => {
2699         cd => 'artist'
2700       }
2701     }
2702   );
2703
2704 The initial search results in SQL like the following:
2705
2706   SELECT tag.*, cd.*, artist.* FROM tag
2707   JOIN cd ON tag.cd = cd.cdid
2708   JOIN artist ON cd.artist = artist.artistid
2709
2710 L<DBIx::Class> has no need to go back to the database when we access the
2711 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2712 case.
2713
2714 Simple prefetches will be joined automatically, so there is no need
2715 for a C<join> attribute in the above search. 
2716
2717 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2718 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2719 with an accessor type of 'single' or 'filter'). A more complex example that
2720 prefetches an artists cds, the tracks on those cds, and the tags associted 
2721 with that artist is given below (assuming many-to-many from artists to tags):
2722
2723  my $rs = $schema->resultset('Artist')->search(
2724    undef,
2725    {
2726      prefetch => [
2727        { cds => 'tracks' },
2728        { artist_tags => 'tags' }
2729      ]
2730    }
2731  );
2732  
2733
2734 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
2735 attributes will be ignored.
2736
2737 =head2 page
2738
2739 =over 4
2740
2741 =item Value: $page
2742
2743 =back
2744
2745 Makes the resultset paged and specifies the page to retrieve. Effectively
2746 identical to creating a non-pages resultset and then calling ->page($page)
2747 on it.
2748
2749 If L<rows> attribute is not specified it defualts to 10 rows per page.
2750
2751 =head2 rows
2752
2753 =over 4
2754
2755 =item Value: $rows
2756
2757 =back
2758
2759 Specifes the maximum number of rows for direct retrieval or the number of
2760 rows per page if the page attribute or method is used.
2761
2762 =head2 offset
2763
2764 =over 4
2765
2766 =item Value: $offset
2767
2768 =back
2769
2770 Specifies the (zero-based) row number for the  first row to be returned, or the
2771 of the first row of the first page if paging is used.
2772
2773 =head2 group_by
2774
2775 =over 4
2776
2777 =item Value: \@columns
2778
2779 =back
2780
2781 A arrayref of columns to group by. Can include columns of joined tables.
2782
2783   group_by => [qw/ column1 column2 ... /]
2784
2785 =head2 having
2786
2787 =over 4
2788
2789 =item Value: $condition
2790
2791 =back
2792
2793 HAVING is a select statement attribute that is applied between GROUP BY and
2794 ORDER BY. It is applied to the after the grouping calculations have been
2795 done.
2796
2797   having => { 'count(employee)' => { '>=', 100 } }
2798
2799 =head2 distinct
2800
2801 =over 4
2802
2803 =item Value: (0 | 1)
2804
2805 =back
2806
2807 Set to 1 to group by all columns.
2808
2809 =head2 where
2810
2811 =over 4
2812
2813 Adds to the WHERE clause.
2814
2815   # only return rows WHERE deleted IS NULL for all searches
2816   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2817
2818 Can be overridden by passing C<{ where => undef }> as an attribute
2819 to a resulset.
2820
2821 =back
2822
2823 =head2 cache
2824
2825 Set to 1 to cache search results. This prevents extra SQL queries if you
2826 revisit rows in your ResultSet:
2827
2828   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2829
2830   while( my $artist = $resultset->next ) {
2831     ... do stuff ...
2832   }
2833
2834   $rs->first; # without cache, this would issue a query
2835
2836 By default, searches are not cached.
2837
2838 For more examples of using these attributes, see
2839 L<DBIx::Class::Manual::Cookbook>.
2840
2841 =head2 from
2842
2843 =over 4
2844
2845 =item Value: \@from_clause
2846
2847 =back
2848
2849 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2850 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2851 clauses.
2852
2853 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2854
2855 C<join> will usually do what you need and it is strongly recommended that you
2856 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2857 And we really do mean "cannot", not just tried and failed. Attempting to use
2858 this because you're having problems with C<join> is like trying to use x86
2859 ASM because you've got a syntax error in your C. Trust us on this.
2860
2861 Now, if you're still really, really sure you need to use this (and if you're
2862 not 100% sure, ask the mailing list first), here's an explanation of how this
2863 works.
2864
2865 The syntax is as follows -
2866
2867   [
2868     { <alias1> => <table1> },
2869     [
2870       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2871       [], # nested JOIN (optional)
2872       { <table1.column1> => <table2.column2>, ... (more conditions) },
2873     ],
2874     # More of the above [ ] may follow for additional joins
2875   ]
2876
2877   <table1> <alias1>
2878   JOIN
2879     <table2> <alias2>
2880     [JOIN ...]
2881   ON <table1.column1> = <table2.column2>
2882   <more joins may follow>
2883
2884 An easy way to follow the examples below is to remember the following:
2885
2886     Anything inside "[]" is a JOIN
2887     Anything inside "{}" is a condition for the enclosing JOIN
2888
2889 The following examples utilize a "person" table in a family tree application.
2890 In order to express parent->child relationships, this table is self-joined:
2891
2892     # Person->belongs_to('father' => 'Person');
2893     # Person->belongs_to('mother' => 'Person');
2894
2895 C<from> can be used to nest joins. Here we return all children with a father,
2896 then search against all mothers of those children:
2897
2898   $rs = $schema->resultset('Person')->search(
2899       undef,
2900       {
2901           alias => 'mother', # alias columns in accordance with "from"
2902           from => [
2903               { mother => 'person' },
2904               [
2905                   [
2906                       { child => 'person' },
2907                       [
2908                           { father => 'person' },
2909                           { 'father.person_id' => 'child.father_id' }
2910                       ]
2911                   ],
2912                   { 'mother.person_id' => 'child.mother_id' }
2913               ],
2914           ]
2915       },
2916   );
2917
2918   # Equivalent SQL:
2919   # SELECT mother.* FROM person mother
2920   # JOIN (
2921   #   person child
2922   #   JOIN person father
2923   #   ON ( father.person_id = child.father_id )
2924   # )
2925   # ON ( mother.person_id = child.mother_id )
2926
2927 The type of any join can be controlled manually. To search against only people
2928 with a father in the person table, we could explicitly use C<INNER JOIN>:
2929
2930     $rs = $schema->resultset('Person')->search(
2931         undef,
2932         {
2933             alias => 'child', # alias columns in accordance with "from"
2934             from => [
2935                 { child => 'person' },
2936                 [
2937                     { father => 'person', -join_type => 'inner' },
2938                     { 'father.id' => 'child.father_id' }
2939                 ],
2940             ]
2941         },
2942     );
2943
2944     # Equivalent SQL:
2945     # SELECT child.* FROM person child
2946     # INNER JOIN person father ON child.father_id = father.id
2947
2948 If you need to express really complex joins or you need a subselect, you
2949 can supply literal SQL to C<from> via a scalar reference. In this case
2950 the contents of the scalar will replace the table name asscoiated with the
2951 resultsource.
2952
2953 WARNING: This technique might very well not work as expected on chained
2954 searches - you have been warned.
2955
2956     # Assuming the Event resultsource is defined as:
2957
2958         MySchema::Event->add_columns (
2959             sequence => {
2960                 data_type => 'INT',
2961                 is_auto_increment => 1,
2962             },
2963             location => {
2964                 data_type => 'INT',
2965             },
2966             type => {
2967                 data_type => 'INT',
2968             },
2969         );
2970         MySchema::Event->set_primary_key ('sequence');
2971
2972     # This will get back the latest event for every location. The column
2973     # selector is still provided by DBIC, all we do is add a JOIN/WHERE
2974     # combo to limit the resultset
2975
2976     $rs = $schema->resultset('Event');
2977     $table = $rs->result_source->name;
2978     $latest = $rs->search (
2979         undef,
2980         { from => \ " 
2981             (SELECT e1.* FROM $table e1 
2982                 JOIN $table e2 
2983                     ON e1.location = e2.location 
2984                     AND e1.sequence < e2.sequence 
2985                 WHERE e2.sequence is NULL 
2986             ) me",
2987         },
2988     );
2989
2990     # Equivalent SQL (with the DBIC chunks added):
2991
2992     SELECT me.sequence, me.location, me.type FROM
2993        (SELECT e1.* FROM events e1
2994            JOIN events e2
2995                ON e1.location = e2.location
2996                AND e1.sequence < e2.sequence
2997            WHERE e2.sequence is NULL
2998        ) me;
2999
3000 =head2 for
3001
3002 =over 4
3003
3004 =item Value: ( 'update' | 'shared' )
3005
3006 =back
3007
3008 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
3009 ... FOR SHARED.
3010
3011 =cut
3012
3013 1;