Add missing shallow copy of seen_join
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use overload
6         '0+'     => "count",
7         'bool'   => "_bool",
8         fallback => 1;
9 use Carp::Clan qw/^DBIx::Class/;
10 use Data::Page;
11 use Storable;
12 use DBIx::Class::ResultSetColumn;
13 use DBIx::Class::ResultSourceHandle;
14 use List::Util ();
15 use Scalar::Util ();
16 use base qw/DBIx::Class/;
17
18 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
19
20 =head1 NAME
21
22 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
23
24 =head1 SYNOPSIS
25
26   my $rs   = $schema->resultset('User')->search({ registered => 1 });
27   my @rows = $schema->resultset('CD')->search({ year => 2005 })->all();
28
29 =head1 DESCRIPTION
30
31 The resultset is also known as an iterator. It is responsible for handling
32 queries that may return an arbitrary number of rows, e.g. via L</search>
33 or a C<has_many> relationship.
34
35 In the examples below, the following table classes are used:
36
37   package MyApp::Schema::Artist;
38   use base qw/DBIx::Class/;
39   __PACKAGE__->load_components(qw/Core/);
40   __PACKAGE__->table('artist');
41   __PACKAGE__->add_columns(qw/artistid name/);
42   __PACKAGE__->set_primary_key('artistid');
43   __PACKAGE__->has_many(cds => 'MyApp::Schema::CD');
44   1;
45
46   package MyApp::Schema::CD;
47   use base qw/DBIx::Class/;
48   __PACKAGE__->load_components(qw/Core/);
49   __PACKAGE__->table('cd');
50   __PACKAGE__->add_columns(qw/cdid artist title year/);
51   __PACKAGE__->set_primary_key('cdid');
52   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
53   1;
54
55 =head1 OVERLOADING
56
57 If a resultset is used in a numeric context it returns the L</count>.
58 However, if it is used in a booleand context it is always true.  So if
59 you want to check if a resultset has any results use C<if $rs != 0>.
60 C<if $rs> will always be true.
61
62 =head1 METHODS
63
64 =head2 new
65
66 =over 4
67
68 =item Arguments: $source, \%$attrs
69
70 =item Return Value: $rs
71
72 =back
73
74 The resultset constructor. Takes a source object (usually a
75 L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
76 L</ATTRIBUTES> below).  Does not perform any queries -- these are
77 executed as needed by the other methods.
78
79 Generally you won't need to construct a resultset manually.  You'll
80 automatically get one from e.g. a L</search> called in scalar context:
81
82   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
83
84 IMPORTANT: If called on an object, proxies to new_result instead so
85
86   my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
87
88 will return a CD object, not a ResultSet.
89
90 =cut
91
92 sub new {
93   my $class = shift;
94   return $class->new_result(@_) if ref $class;
95
96   my ($source, $attrs) = @_;
97   $source = $source->handle 
98     unless $source->isa('DBIx::Class::ResultSourceHandle');
99   $attrs = { %{$attrs||{}} };
100
101   if ($attrs->{page}) {
102     $attrs->{rows} ||= 10;
103   }
104
105   $attrs->{alias} ||= 'me';
106
107   # Creation of {} and bless separated to mitigate RH perl bug
108   # see https://bugzilla.redhat.com/show_bug.cgi?id=196836
109   my $self = {
110     _source_handle => $source,
111     cond => $attrs->{where},
112     count => undef,
113     pager => undef,
114     attrs => $attrs
115   };
116
117   bless $self, $class;
118
119   $self->result_class(
120     $attrs->{result_class} || $source->resolve->result_class
121   );
122
123   return $self;
124 }
125
126 =head2 search
127
128 =over 4
129
130 =item Arguments: $cond, \%attrs?
131
132 =item Return Value: $resultset (scalar context), @row_objs (list context)
133
134 =back
135
136   my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
137   my $new_rs = $cd_rs->search({ year => 2005 });
138
139   my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
140                  # year = 2005 OR year = 2004
141
142 If you need to pass in additional attributes but no additional condition,
143 call it as C<search(undef, \%attrs)>.
144
145   # "SELECT name, artistid FROM $artist_table"
146   my @all_artists = $schema->resultset('Artist')->search(undef, {
147     columns => [qw/name artistid/],
148   });
149
150 For a list of attributes that can be passed to C<search>, see
151 L</ATTRIBUTES>. For more examples of using this function, see
152 L<Searching|DBIx::Class::Manual::Cookbook/Searching>. For a complete
153 documentation for the first argument, see L<SQL::Abstract>.
154
155 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
156
157 =cut
158
159 sub search {
160   my $self = shift;
161   my $rs = $self->search_rs( @_ );
162   return (wantarray ? $rs->all : $rs);
163 }
164
165 =head2 search_rs
166
167 =over 4
168
169 =item Arguments: $cond, \%attrs?
170
171 =item Return Value: $resultset
172
173 =back
174
175 This method does the same exact thing as search() except it will
176 always return a resultset, even in list context.
177
178 =cut
179
180 sub search_rs {
181   my $self = shift;
182
183   my $attrs = {};
184   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
185   my $our_attrs = { %{$self->{attrs}} };
186   my $having = delete $our_attrs->{having};
187   my $where = delete $our_attrs->{where};
188
189   my $rows;
190
191   my %safe = (alias => 1, cache => 1);
192
193   unless (
194     (@_ && defined($_[0])) # @_ == () or (undef)
195     || 
196     (keys %$attrs # empty attrs or only 'safe' attrs
197     && List::Util::first { !$safe{$_} } keys %$attrs)
198   ) {
199     # no search, effectively just a clone
200     $rows = $self->get_cache;
201   }
202
203   my $new_attrs = { %{$our_attrs}, %{$attrs} };
204
205   # merge new attrs into inherited
206   foreach my $key (qw/join prefetch +select +as/) {
207     next unless exists $attrs->{$key};
208     $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
209   }
210
211   my $cond = (@_
212     ? (
213         (@_ == 1 || ref $_[0] eq "HASH")
214           ? (
215               (ref $_[0] eq 'HASH')
216                 ? (
217                     (keys %{ $_[0] }  > 0)
218                       ? shift
219                       : undef
220                    )
221                 :  shift
222              )
223           : (
224               (@_ % 2)
225                 ? $self->throw_exception("Odd number of arguments to search")
226                 : {@_}
227              )
228       )
229     : undef
230   );
231
232   if (defined $where) {
233     $new_attrs->{where} = (
234       defined $new_attrs->{where}
235         ? { '-and' => [
236               map {
237                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
238               } $where, $new_attrs->{where}
239             ]
240           }
241         : $where);
242   }
243
244   if (defined $cond) {
245     $new_attrs->{where} = (
246       defined $new_attrs->{where}
247         ? { '-and' => [
248               map {
249                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
250               } $cond, $new_attrs->{where}
251             ]
252           }
253         : $cond);
254   }
255
256   if (defined $having) {
257     $new_attrs->{having} = (
258       defined $new_attrs->{having}
259         ? { '-and' => [
260               map {
261                 ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
262               } $having, $new_attrs->{having}
263             ]
264           }
265         : $having);
266   }
267
268   my $rs = (ref $self)->new($self->result_source, $new_attrs);
269   if ($rows) {
270     $rs->set_cache($rows);
271   }
272   return $rs;
273 }
274
275 =head2 search_literal
276
277 =over 4
278
279 =item Arguments: $sql_fragment, @bind_values
280
281 =item Return Value: $resultset (scalar context), @row_objs (list context)
282
283 =back
284
285   my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
286   my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
287
288 Pass a literal chunk of SQL to be added to the conditional part of the
289 resultset query.
290
291 CAVEAT: C<search_literal> is provided for Class::DBI compatibility and should
292 only be used in that context. There are known problems using C<search_literal>
293 in chained queries; it can result in bind values in the wrong order.  See
294 L<DBIx::Class::Manual::Cookbook/Searching> and
295 L<DBIx::Class::Manual::FAQ/Searching> for searching techniques that do not
296 require C<search_literal>.
297
298 =cut
299
300 sub search_literal {
301   my ($self, $cond, @vals) = @_;
302   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
303   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
304   return $self->search(\$cond, $attrs);
305 }
306
307 =head2 find
308
309 =over 4
310
311 =item Arguments: @values | \%cols, \%attrs?
312
313 =item Return Value: $row_object | undef
314
315 =back
316
317 Finds a row based on its primary key or unique constraint. For example, to find
318 a row by its primary key:
319
320   my $cd = $schema->resultset('CD')->find(5);
321
322 You can also find a row by a specific unique constraint using the C<key>
323 attribute. For example:
324
325   my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', {
326     key => 'cd_artist_title'
327   });
328
329 Additionally, you can specify the columns explicitly by name:
330
331   my $cd = $schema->resultset('CD')->find(
332     {
333       artist => 'Massive Attack',
334       title  => 'Mezzanine',
335     },
336     { key => 'cd_artist_title' }
337   );
338
339 If the C<key> is specified as C<primary>, it searches only on the primary key.
340
341 If no C<key> is specified, it searches on all unique constraints defined on the
342 source for which column data is provided, including the primary key.
343
344 If your table does not have a primary key, you B<must> provide a value for the
345 C<key> attribute matching one of the unique constraints on the source.
346
347 In addition to C<key>, L</find> recognizes and applies standard
348 L<resultset attributes|/ATTRIBUTES> in the same way as L</search> does.
349
350 Note: If your query does not return only one row, a warning is generated:
351
352   Query returned more than one row
353
354 See also L</find_or_create> and L</update_or_create>. For information on how to
355 declare unique constraints, see
356 L<DBIx::Class::ResultSource/add_unique_constraint>.
357
358 =cut
359
360 sub find {
361   my $self = shift;
362   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
363
364   # Default to the primary key, but allow a specific key
365   my @cols = exists $attrs->{key}
366     ? $self->result_source->unique_constraint_columns($attrs->{key})
367     : $self->result_source->primary_columns;
368   $self->throw_exception(
369     "Can't find unless a primary key is defined or unique constraint is specified"
370   ) unless @cols;
371
372   # Parse out a hashref from input
373   my $input_query;
374   if (ref $_[0] eq 'HASH') {
375     $input_query = { %{$_[0]} };
376   }
377   elsif (@_ == @cols) {
378     $input_query = {};
379     @{$input_query}{@cols} = @_;
380   }
381   else {
382     # Compatibility: Allow e.g. find(id => $value)
383     carp "Find by key => value deprecated; please use a hashref instead";
384     $input_query = {@_};
385   }
386
387   my (%related, $info);
388
389   KEY: foreach my $key (keys %$input_query) {
390     if (ref($input_query->{$key})
391         && ($info = $self->result_source->relationship_info($key))) {
392       my $val = delete $input_query->{$key};
393       next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
394       my $rel_q = $self->result_source->resolve_condition(
395                     $info->{cond}, $val, $key
396                   );
397       die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
398       @related{keys %$rel_q} = values %$rel_q;
399     }
400   }
401   if (my @keys = keys %related) {
402     @{$input_query}{@keys} = values %related;
403   }
404
405
406   # Build the final query: Default to the disjunction of the unique queries,
407   # but allow the input query in case the ResultSet defines the query or the
408   # user is abusing find
409   my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
410   my $query;
411   if (exists $attrs->{key}) {
412     my @unique_cols = $self->result_source->unique_constraint_columns($attrs->{key});
413     my $unique_query = $self->_build_unique_query($input_query, \@unique_cols);
414     $query = $self->_add_alias($unique_query, $alias);
415   }
416   else {
417     my @unique_queries = $self->_unique_queries($input_query, $attrs);
418     $query = @unique_queries
419       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
420       : $self->_add_alias($input_query, $alias);
421   }
422
423   # Run the query
424   if (keys %$attrs) {
425     my $rs = $self->search($query, $attrs);
426     if (keys %{$rs->_resolved_attrs->{collapse}}) {
427       my $row = $rs->next;
428       carp "Query returned more than one row" if $rs->next;
429       return $row;
430     }
431     else {
432       return $rs->single;
433     }
434   }
435   else {
436     if (keys %{$self->_resolved_attrs->{collapse}}) {
437       my $rs = $self->search($query);
438       my $row = $rs->next;
439       carp "Query returned more than one row" if $rs->next;
440       return $row;
441     }
442     else {
443       return $self->single($query);
444     }
445   }
446 }
447
448 # _add_alias
449 #
450 # Add the specified alias to the specified query hash. A copy is made so the
451 # original query is not modified.
452
453 sub _add_alias {
454   my ($self, $query, $alias) = @_;
455
456   my %aliased = %$query;
457   foreach my $col (grep { ! m/\./ } keys %aliased) {
458     $aliased{"$alias.$col"} = delete $aliased{$col};
459   }
460
461   return \%aliased;
462 }
463
464 # _unique_queries
465 #
466 # Build a list of queries which satisfy unique constraints.
467
468 sub _unique_queries {
469   my ($self, $query, $attrs) = @_;
470
471   my @constraint_names = exists $attrs->{key}
472     ? ($attrs->{key})
473     : $self->result_source->unique_constraint_names;
474
475   my $where = $self->_collapse_cond($self->{attrs}{where} || {});
476   my $num_where = scalar keys %$where;
477
478   my @unique_queries;
479   foreach my $name (@constraint_names) {
480     my @unique_cols = $self->result_source->unique_constraint_columns($name);
481     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
482
483     my $num_cols = scalar @unique_cols;
484     my $num_query = scalar keys %$unique_query;
485
486     my $total = $num_query + $num_where;
487     if ($num_query && ($num_query == $num_cols || $total == $num_cols)) {
488       # The query is either unique on its own or is unique in combination with
489       # the existing where clause
490       push @unique_queries, $unique_query;
491     }
492   }
493
494   return @unique_queries;
495 }
496
497 # _build_unique_query
498 #
499 # Constrain the specified query hash based on the specified column names.
500
501 sub _build_unique_query {
502   my ($self, $query, $unique_cols) = @_;
503
504   return {
505     map  { $_ => $query->{$_} }
506     grep { exists $query->{$_} }
507       @$unique_cols
508   };
509 }
510
511 =head2 search_related
512
513 =over 4
514
515 =item Arguments: $rel, $cond, \%attrs?
516
517 =item Return Value: $new_resultset
518
519 =back
520
521   $new_rs = $cd_rs->search_related('artist', {
522     name => 'Emo-R-Us',
523   });
524
525 Searches the specified relationship, optionally specifying a condition and
526 attributes for matching records. See L</ATTRIBUTES> for more information.
527
528 =cut
529
530 sub search_related {
531   return shift->related_resultset(shift)->search(@_);
532 }
533
534 =head2 search_related_rs
535
536 This method works exactly the same as search_related, except that
537 it guarantees a restultset, even in list context.
538
539 =cut
540
541 sub search_related_rs {
542   return shift->related_resultset(shift)->search_rs(@_);
543 }
544
545 =head2 cursor
546
547 =over 4
548
549 =item Arguments: none
550
551 =item Return Value: $cursor
552
553 =back
554
555 Returns a storage-driven cursor to the given resultset. See
556 L<DBIx::Class::Cursor> for more information.
557
558 =cut
559
560 sub cursor {
561   my ($self) = @_;
562
563   my $attrs = { %{$self->_resolved_attrs} };
564   return $self->{cursor}
565     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
566           $attrs->{where},$attrs);
567 }
568
569 =head2 single
570
571 =over 4
572
573 =item Arguments: $cond?
574
575 =item Return Value: $row_object?
576
577 =back
578
579   my $cd = $schema->resultset('CD')->single({ year => 2001 });
580
581 Inflates the first result without creating a cursor if the resultset has
582 any records in it; if not returns nothing. Used by L</find> as a lean version of
583 L</search>.
584
585 While this method can take an optional search condition (just like L</search>)
586 being a fast-code-path it does not recognize search attributes. If you need to
587 add extra joins or similar, call L</search> and then chain-call L</single> on the
588 L<DBIx::Class::ResultSet> returned.
589
590 =over
591
592 =item B<Note>
593
594 As of 0.08100, this method enforces the assumption that the preceeding
595 query returns only one row. If more than one row is returned, you will receive
596 a warning:
597
598   Query returned more than one row
599
600 In this case, you should be using L</first> or L</find> instead, or if you really
601 know what you are doing, use the L</rows> attribute to explicitly limit the size 
602 of the resultset.
603
604 =back
605
606 =cut
607
608 sub single {
609   my ($self, $where) = @_;
610   my $attrs = { %{$self->_resolved_attrs} };
611   if ($where) {
612     if (defined $attrs->{where}) {
613       $attrs->{where} = {
614         '-and' =>
615             [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
616                $where, delete $attrs->{where} ]
617       };
618     } else {
619       $attrs->{where} = $where;
620     }
621   }
622
623 #  XXX: Disabled since it doesn't infer uniqueness in all cases
624 #  unless ($self->_is_unique_query($attrs->{where})) {
625 #    carp "Query not guaranteed to return a single row"
626 #      . "; please declare your unique constraints or use search instead";
627 #  }
628
629   my @data = $self->result_source->storage->select_single(
630     $attrs->{from}, $attrs->{select},
631     $attrs->{where}, $attrs
632   );
633
634   return (@data ? ($self->_construct_object(@data))[0] : undef);
635 }
636
637 # _is_unique_query
638 #
639 # Try to determine if the specified query is guaranteed to be unique, based on
640 # the declared unique constraints.
641
642 sub _is_unique_query {
643   my ($self, $query) = @_;
644
645   my $collapsed = $self->_collapse_query($query);
646   my $alias = $self->{attrs}{alias};
647
648   foreach my $name ($self->result_source->unique_constraint_names) {
649     my @unique_cols = map {
650       "$alias.$_"
651     } $self->result_source->unique_constraint_columns($name);
652
653     # Count the values for each unique column
654     my %seen = map { $_ => 0 } @unique_cols;
655
656     foreach my $key (keys %$collapsed) {
657       my $aliased = $key =~ /\./ ? $key : "$alias.$key";
658       next unless exists $seen{$aliased};  # Additional constraints are okay
659       $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
660     }
661
662     # If we get 0 or more than 1 value for a column, it's not necessarily unique
663     return 1 unless grep { $_ != 1 } values %seen;
664   }
665
666   return 0;
667 }
668
669 # _collapse_query
670 #
671 # Recursively collapse the query, accumulating values for each column.
672
673 sub _collapse_query {
674   my ($self, $query, $collapsed) = @_;
675
676   $collapsed ||= {};
677
678   if (ref $query eq 'ARRAY') {
679     foreach my $subquery (@$query) {
680       next unless ref $subquery;  # -or
681 #      warn "ARRAY: " . Dumper $subquery;
682       $collapsed = $self->_collapse_query($subquery, $collapsed);
683     }
684   }
685   elsif (ref $query eq 'HASH') {
686     if (keys %$query and (keys %$query)[0] eq '-and') {
687       foreach my $subquery (@{$query->{-and}}) {
688 #        warn "HASH: " . Dumper $subquery;
689         $collapsed = $self->_collapse_query($subquery, $collapsed);
690       }
691     }
692     else {
693 #      warn "LEAF: " . Dumper $query;
694       foreach my $col (keys %$query) {
695         my $value = $query->{$col};
696         $collapsed->{$col}{$value}++;
697       }
698     }
699   }
700
701   return $collapsed;
702 }
703
704 =head2 get_column
705
706 =over 4
707
708 =item Arguments: $cond?
709
710 =item Return Value: $resultsetcolumn
711
712 =back
713
714   my $max_length = $rs->get_column('length')->max;
715
716 Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
717
718 =cut
719
720 sub get_column {
721   my ($self, $column) = @_;
722   my $new = DBIx::Class::ResultSetColumn->new($self, $column);
723   return $new;
724 }
725
726 =head2 search_like
727
728 =over 4
729
730 =item Arguments: $cond, \%attrs?
731
732 =item Return Value: $resultset (scalar context), @row_objs (list context)
733
734 =back
735
736   # WHERE title LIKE '%blue%'
737   $cd_rs = $rs->search_like({ title => '%blue%'});
738
739 Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
740 that this is simply a convenience method retained for ex Class::DBI users.
741 You most likely want to use L</search> with specific operators.
742
743 For more information, see L<DBIx::Class::Manual::Cookbook>.
744
745 =cut
746
747 sub search_like {
748   my $class = shift;
749   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
750   my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
751   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
752   return $class->search($query, { %$attrs });
753 }
754
755 =head2 slice
756
757 =over 4
758
759 =item Arguments: $first, $last
760
761 =item Return Value: $resultset (scalar context), @row_objs (list context)
762
763 =back
764
765 Returns a resultset or object list representing a subset of elements from the
766 resultset slice is called on. Indexes are from 0, i.e., to get the first
767 three records, call:
768
769   my ($one, $two, $three) = $rs->slice(0, 2);
770
771 =cut
772
773 sub slice {
774   my ($self, $min, $max) = @_;
775   my $attrs = {}; # = { %{ $self->{attrs} || {} } };
776   $attrs->{offset} = $self->{attrs}{offset} || 0;
777   $attrs->{offset} += $min;
778   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
779   return $self->search(undef(), $attrs);
780   #my $slice = (ref $self)->new($self->result_source, $attrs);
781   #return (wantarray ? $slice->all : $slice);
782 }
783
784 =head2 next
785
786 =over 4
787
788 =item Arguments: none
789
790 =item Return Value: $result?
791
792 =back
793
794 Returns the next element in the resultset (C<undef> is there is none).
795
796 Can be used to efficiently iterate over records in the resultset:
797
798   my $rs = $schema->resultset('CD')->search;
799   while (my $cd = $rs->next) {
800     print $cd->title;
801   }
802
803 Note that you need to store the resultset object, and call C<next> on it.
804 Calling C<< resultset('Table')->next >> repeatedly will always return the
805 first record from the resultset.
806
807 =cut
808
809 sub next {
810   my ($self) = @_;
811   if (my $cache = $self->get_cache) {
812     $self->{all_cache_position} ||= 0;
813     return $cache->[$self->{all_cache_position}++];
814   }
815   if ($self->{attrs}{cache}) {
816     $self->{all_cache_position} = 1;
817     return ($self->all)[0];
818   }
819   if ($self->{stashed_objects}) {
820     my $obj = shift(@{$self->{stashed_objects}});
821     delete $self->{stashed_objects} unless @{$self->{stashed_objects}};
822     return $obj;
823   }
824   my @row = (
825     exists $self->{stashed_row}
826       ? @{delete $self->{stashed_row}}
827       : $self->cursor->next
828   );
829   return undef unless (@row);
830   my ($row, @more) = $self->_construct_object(@row);
831   $self->{stashed_objects} = \@more if @more;
832   return $row;
833 }
834
835 sub _construct_object {
836   my ($self, @row) = @_;
837   my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
838   my @new = $self->result_class->inflate_result($self->result_source, @$info);
839   @new = $self->{_attrs}{record_filter}->(@new)
840     if exists $self->{_attrs}{record_filter};
841   return @new;
842 }
843
844 sub _collapse_result {
845   my ($self, $as_proto, $row) = @_;
846
847   my @copy = @$row;
848
849   # 'foo'         => [ undef, 'foo' ]
850   # 'foo.bar'     => [ 'foo', 'bar' ]
851   # 'foo.bar.baz' => [ 'foo.bar', 'baz' ]
852
853   my @construct_as = map { [ (/^(?:(.*)\.)?([^.]+)$/) ] } @$as_proto;
854
855   my %collapse = %{$self->{_attrs}{collapse}||{}};
856
857   my @pri_index;
858
859   # if we're doing collapsing (has_many prefetch) we need to grab records
860   # until the PK changes, so fill @pri_index. if not, we leave it empty so
861   # we know we don't have to bother.
862
863   # the reason for not using the collapse stuff directly is because if you
864   # had for e.g. two artists in a row with no cds, the collapse info for
865   # both would be NULL (undef) so you'd lose the second artist
866
867   # store just the index so we can check the array positions from the row
868   # without having to contruct the full hash
869
870   if (keys %collapse) {
871     my %pri = map { ($_ => 1) } $self->result_source->primary_columns;
872     foreach my $i (0 .. $#construct_as) {
873       next if defined($construct_as[$i][0]); # only self table
874       if (delete $pri{$construct_as[$i][1]}) {
875         push(@pri_index, $i);
876       }
877       last unless keys %pri; # short circuit (Johnny Five Is Alive!)
878     }
879   }
880
881   # no need to do an if, it'll be empty if @pri_index is empty anyway
882
883   my %pri_vals = map { ($_ => $copy[$_]) } @pri_index;
884
885   my @const_rows;
886
887   do { # no need to check anything at the front, we always want the first row
888
889     my %const;
890   
891     foreach my $this_as (@construct_as) {
892       $const{$this_as->[0]||''}{$this_as->[1]} = shift(@copy);
893     }
894
895     push(@const_rows, \%const);
896
897   } until ( # no pri_index => no collapse => drop straight out
898       !@pri_index
899     or
900       do { # get another row, stash it, drop out if different PK
901
902         @copy = $self->cursor->next;
903         $self->{stashed_row} = \@copy;
904
905         # last thing in do block, counts as true if anything doesn't match
906
907         # check xor defined first for NULL vs. NOT NULL then if one is
908         # defined the other must be so check string equality
909
910         grep {
911           (defined $pri_vals{$_} ^ defined $copy[$_])
912           || (defined $pri_vals{$_} && ($pri_vals{$_} ne $copy[$_]))
913         } @pri_index;
914       }
915   );
916
917   my $alias = $self->{attrs}{alias};
918   my $info = [];
919
920   my %collapse_pos;
921
922   my @const_keys;
923
924   foreach my $const (@const_rows) {
925     scalar @const_keys or do {
926       @const_keys = sort { length($a) <=> length($b) } keys %$const;
927     };
928     foreach my $key (@const_keys) {
929       if (length $key) {
930         my $target = $info;
931         my @parts = split(/\./, $key);
932         my $cur = '';
933         my $data = $const->{$key};
934         foreach my $p (@parts) {
935           $target = $target->[1]->{$p} ||= [];
936           $cur .= ".${p}";
937           if ($cur eq ".${key}" && (my @ckey = @{$collapse{$cur}||[]})) { 
938             # collapsing at this point and on final part
939             my $pos = $collapse_pos{$cur};
940             CK: foreach my $ck (@ckey) {
941               if (!defined $pos->{$ck} || $pos->{$ck} ne $data->{$ck}) {
942                 $collapse_pos{$cur} = $data;
943                 delete @collapse_pos{ # clear all positioning for sub-entries
944                   grep { m/^\Q${cur}.\E/ } keys %collapse_pos
945                 };
946                 push(@$target, []);
947                 last CK;
948               }
949             }
950           }
951           if (exists $collapse{$cur}) {
952             $target = $target->[-1];
953           }
954         }
955         $target->[0] = $data;
956       } else {
957         $info->[0] = $const->{$key};
958       }
959     }
960   }
961
962   return $info;
963 }
964
965 =head2 result_source
966
967 =over 4
968
969 =item Arguments: $result_source?
970
971 =item Return Value: $result_source
972
973 =back
974
975 An accessor for the primary ResultSource object from which this ResultSet
976 is derived.
977
978 =head2 result_class
979
980 =over 4
981
982 =item Arguments: $result_class?
983
984 =item Return Value: $result_class
985
986 =back
987
988 An accessor for the class to use when creating row objects. Defaults to 
989 C<< result_source->result_class >> - which in most cases is the name of the 
990 L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
991
992 =cut
993
994 sub result_class {
995   my ($self, $result_class) = @_;
996   if ($result_class) {
997     $self->ensure_class_loaded($result_class);
998     $self->_result_class($result_class);
999   }
1000   $self->_result_class;
1001 }
1002
1003 =head2 count
1004
1005 =over 4
1006
1007 =item Arguments: $cond, \%attrs??
1008
1009 =item Return Value: $count
1010
1011 =back
1012
1013 Performs an SQL C<COUNT> with the same query as the resultset was built
1014 with to find the number of elements. If passed arguments, does a search
1015 on the resultset and counts the results of that.
1016
1017 Note: When using C<count> with C<group_by>, L<DBIx::Class> emulates C<GROUP BY>
1018 using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
1019 not support C<DISTINCT> with multiple columns. If you are using such a
1020 database, you should only use columns from the main table in your C<group_by>
1021 clause.
1022
1023 =cut
1024
1025 sub count {
1026   my $self = shift;
1027   return $self->search(@_)->count if @_ and defined $_[0];
1028   return scalar @{ $self->get_cache } if $self->get_cache;
1029   my $count = $self->_count;
1030   return 0 unless $count;
1031
1032   # need to take offset from resolved attrs
1033
1034   $count -= $self->{_attrs}{offset} if $self->{_attrs}{offset};
1035   $count = $self->{attrs}{rows} if
1036     $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
1037   $count = 0 if ($count < 0);
1038   return $count;
1039 }
1040
1041 sub _count { # Separated out so pager can get the full count
1042   my $self = shift;
1043   my $select = { count => '*' };
1044
1045   my $attrs = { %{$self->_resolved_attrs} };
1046   if (my $group_by = delete $attrs->{group_by}) {
1047     delete $attrs->{having};
1048     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
1049     # todo: try CONCAT for multi-column pk
1050     my @pk = $self->result_source->primary_columns;
1051     if (@pk == 1) {
1052       my $alias = $attrs->{alias};
1053       foreach my $column (@distinct) {
1054         if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
1055           @distinct = ($column);
1056           last;
1057         }
1058       }
1059     }
1060
1061     $select = { count => { distinct => \@distinct } };
1062   }
1063
1064   $attrs->{select} = $select;
1065   $attrs->{as} = [qw/count/];
1066
1067   # offset, order by and page are not needed to count. record_filter is cdbi
1068   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
1069
1070   my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
1071   my ($count) = $tmp_rs->cursor->next;
1072   return $count;
1073 }
1074
1075 sub _bool {
1076   return 1;
1077 }
1078
1079 =head2 count_literal
1080
1081 =over 4
1082
1083 =item Arguments: $sql_fragment, @bind_values
1084
1085 =item Return Value: $count
1086
1087 =back
1088
1089 Counts the results in a literal query. Equivalent to calling L</search_literal>
1090 with the passed arguments, then L</count>.
1091
1092 =cut
1093
1094 sub count_literal { shift->search_literal(@_)->count; }
1095
1096 =head2 all
1097
1098 =over 4
1099
1100 =item Arguments: none
1101
1102 =item Return Value: @objects
1103
1104 =back
1105
1106 Returns all elements in the resultset. Called implicitly if the resultset
1107 is returned in list context.
1108
1109 =cut
1110
1111 sub all {
1112   my $self = shift;
1113   if(@_) {
1114       $self->throw_exception("all() doesn't take any arguments, you probably wanted ->search(...)->all()");
1115   }
1116
1117   return @{ $self->get_cache } if $self->get_cache;
1118
1119   my @obj;
1120
1121   # TODO: don't call resolve here
1122   if (keys %{$self->_resolved_attrs->{collapse}}) {
1123 #  if ($self->{attrs}{prefetch}) {
1124       # Using $self->cursor->all is really just an optimisation.
1125       # If we're collapsing has_many prefetches it probably makes
1126       # very little difference, and this is cleaner than hacking
1127       # _construct_object to survive the approach
1128     my @row = $self->cursor->next;
1129     while (@row) {
1130       push(@obj, $self->_construct_object(@row));
1131       @row = (exists $self->{stashed_row}
1132                ? @{delete $self->{stashed_row}}
1133                : $self->cursor->next);
1134     }
1135   } else {
1136     @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
1137   }
1138
1139   $self->set_cache(\@obj) if $self->{attrs}{cache};
1140   return @obj;
1141 }
1142
1143 =head2 reset
1144
1145 =over 4
1146
1147 =item Arguments: none
1148
1149 =item Return Value: $self
1150
1151 =back
1152
1153 Resets the resultset's cursor, so you can iterate through the elements again.
1154
1155 =cut
1156
1157 sub reset {
1158   my ($self) = @_;
1159   delete $self->{_attrs} if exists $self->{_attrs};
1160   $self->{all_cache_position} = 0;
1161   $self->cursor->reset;
1162   return $self;
1163 }
1164
1165 =head2 first
1166
1167 =over 4
1168
1169 =item Arguments: none
1170
1171 =item Return Value: $object?
1172
1173 =back
1174
1175 Resets the resultset and returns an object for the first result (if the
1176 resultset returns anything).
1177
1178 =cut
1179
1180 sub first {
1181   return $_[0]->reset->next;
1182 }
1183
1184 # _cond_for_update_delete
1185 #
1186 # update/delete require the condition to be modified to handle
1187 # the differing SQL syntax available.  This transforms the $self->{cond}
1188 # appropriately, returning the new condition.
1189
1190 sub _cond_for_update_delete {
1191   my ($self, $full_cond) = @_;
1192   my $cond = {};
1193
1194   $full_cond ||= $self->{cond};
1195   # No-op. No condition, we're updating/deleting everything
1196   return $cond unless ref $full_cond;
1197
1198   if (ref $full_cond eq 'ARRAY') {
1199     $cond = [
1200       map {
1201         my %hash;
1202         foreach my $key (keys %{$_}) {
1203           $key =~ /([^.]+)$/;
1204           $hash{$1} = $_->{$key};
1205         }
1206         \%hash;
1207       } @{$full_cond}
1208     ];
1209   }
1210   elsif (ref $full_cond eq 'HASH') {
1211     if ((keys %{$full_cond})[0] eq '-and') {
1212       $cond->{-and} = [];
1213
1214       my @cond = @{$full_cond->{-and}};
1215       for (my $i = 0; $i < @cond; $i++) {
1216         my $entry = $cond[$i];
1217
1218         my $hash;
1219         if (ref $entry eq 'HASH') {
1220           $hash = $self->_cond_for_update_delete($entry);
1221         }
1222         else {
1223           $entry =~ /([^.]+)$/;
1224           $hash->{$1} = $cond[++$i];
1225         }
1226
1227         push @{$cond->{-and}}, $hash;
1228       }
1229     }
1230     else {
1231       foreach my $key (keys %{$full_cond}) {
1232         $key =~ /([^.]+)$/;
1233         $cond->{$1} = $full_cond->{$key};
1234       }
1235     }
1236   }
1237   else {
1238     $self->throw_exception(
1239       "Can't update/delete on resultset with condition unless hash or array"
1240     );
1241   }
1242
1243   return $cond;
1244 }
1245
1246
1247 =head2 update
1248
1249 =over 4
1250
1251 =item Arguments: \%values
1252
1253 =item Return Value: $storage_rv
1254
1255 =back
1256
1257 Sets the specified columns in the resultset to the supplied values in a
1258 single query. Return value will be true if the update succeeded or false
1259 if no records were updated; exact type of success value is storage-dependent.
1260
1261 =cut
1262
1263 sub update {
1264   my ($self, $values) = @_;
1265   $self->throw_exception("Values for update must be a hash")
1266     unless ref $values eq 'HASH';
1267
1268   my $cond = $self->_cond_for_update_delete;
1269    
1270   return $self->result_source->storage->update(
1271     $self->result_source, $values, $cond
1272   );
1273 }
1274
1275 =head2 update_all
1276
1277 =over 4
1278
1279 =item Arguments: \%values
1280
1281 =item Return Value: 1
1282
1283 =back
1284
1285 Fetches all objects and updates them one at a time. Note that C<update_all>
1286 will run DBIC cascade triggers, while L</update> will not.
1287
1288 =cut
1289
1290 sub update_all {
1291   my ($self, $values) = @_;
1292   $self->throw_exception("Values for update must be a hash")
1293     unless ref $values eq 'HASH';
1294   foreach my $obj ($self->all) {
1295     $obj->set_columns($values)->update;
1296   }
1297   return 1;
1298 }
1299
1300 =head2 delete
1301
1302 =over 4
1303
1304 =item Arguments: none
1305
1306 =item Return Value: 1
1307
1308 =back
1309
1310 Deletes the contents of the resultset from its result source. Note that this
1311 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
1312 to run. See also L<DBIx::Class::Row/delete>.
1313
1314 delete may not generate correct SQL for a query with joins or a resultset
1315 chained from a related resultset.  In this case it will generate a warning:-
1316
1317   WARNING! Currently $rs->delete() does not generate proper SQL on
1318   joined resultsets, and may delete rows well outside of the contents
1319   of $rs. Use at your own risk
1320
1321 In these cases you may find that delete_all is more appropriate, or you
1322 need to respecify your query in a way that can be expressed without a join.
1323
1324 =cut
1325
1326 sub delete {
1327   my ($self) = @_;
1328   $self->throw_exception("Delete should not be passed any arguments")
1329     if $_[1];
1330   carp(   'WARNING! Currently $rs->delete() does not generate proper SQL'
1331         . ' on joined resultsets, and may delete rows well outside of the'
1332         . ' contents of $rs. Use at your own risk' )
1333     if ( $self->{attrs}{seen_join} );
1334   my $cond = $self->_cond_for_update_delete;
1335
1336   $self->result_source->storage->delete($self->result_source, $cond);
1337   return 1;
1338 }
1339
1340 =head2 delete_all
1341
1342 =over 4
1343
1344 =item Arguments: none
1345
1346 =item Return Value: 1
1347
1348 =back
1349
1350 Fetches all objects and deletes them one at a time. Note that C<delete_all>
1351 will run DBIC cascade triggers, while L</delete> will not.
1352
1353 =cut
1354
1355 sub delete_all {
1356   my ($self) = @_;
1357   $_->delete for $self->all;
1358   return 1;
1359 }
1360
1361 =head2 populate
1362
1363 =over 4
1364
1365 =item Arguments: \@data;
1366
1367 =back
1368
1369 Accepts either an arrayref of hashrefs or alternatively an arrayref of arrayrefs.
1370 For the arrayref of hashrefs style each hashref should be a structure suitable
1371 forsubmitting to a $resultset->create(...) method.
1372
1373 In void context, C<insert_bulk> in L<DBIx::Class::Storage::DBI> is used
1374 to insert the data, as this is a faster method.  
1375
1376 Otherwise, each set of data is inserted into the database using
1377 L<DBIx::Class::ResultSet/create>, and a arrayref of the resulting row
1378 objects is returned.
1379
1380 Example:  Assuming an Artist Class that has many CDs Classes relating:
1381
1382   my $Artist_rs = $schema->resultset("Artist");
1383   
1384   ## Void Context Example 
1385   $Artist_rs->populate([
1386      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1387         { title => 'My First CD', year => 2006 },
1388         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1389       ],
1390      },
1391      { artistid => 5, name => 'Angsty-Whiny Girl', cds => [
1392         { title => 'My parents sold me to a record company' ,year => 2005 },
1393         { title => 'Why Am I So Ugly?', year => 2006 },
1394         { title => 'I Got Surgery and am now Popular', year => 2007 }
1395       ],
1396      },
1397   ]);
1398   
1399   ## Array Context Example
1400   my ($ArtistOne, $ArtistTwo, $ArtistThree) = $Artist_rs->populate([
1401     { name => "Artist One"},
1402     { name => "Artist Two"},
1403     { name => "Artist Three", cds=> [
1404     { title => "First CD", year => 2007},
1405     { title => "Second CD", year => 2008},
1406   ]}
1407   ]);
1408   
1409   print $ArtistOne->name; ## response is 'Artist One'
1410   print $ArtistThree->cds->count ## reponse is '2'
1411
1412 For the arrayref of arrayrefs style,  the first element should be a list of the
1413 fieldsnames to which the remaining elements are rows being inserted.  For
1414 example:
1415
1416   $Arstist_rs->populate([
1417     [qw/artistid name/],
1418     [100, 'A Formally Unknown Singer'],
1419     [101, 'A singer that jumped the shark two albums ago'],
1420     [102, 'An actually cool singer.'],
1421   ]);
1422
1423 Please note an important effect on your data when choosing between void and
1424 wantarray context. Since void context goes straight to C<insert_bulk> in 
1425 L<DBIx::Class::Storage::DBI> this will skip any component that is overriding
1426 c<insert>.  So if you are using something like L<DBIx-Class-UUIDColumns> to 
1427 create primary keys for you, you will find that your PKs are empty.  In this 
1428 case you will have to use the wantarray context in order to create those 
1429 values.
1430
1431 =cut
1432
1433 sub populate {
1434   my $self = shift @_;
1435   my $data = ref $_[0][0] eq 'HASH'
1436     ? $_[0] : ref $_[0][0] eq 'ARRAY' ? $self->_normalize_populate_args($_[0]) :
1437     $self->throw_exception('Populate expects an arrayref of hashes or arrayref of arrayrefs');
1438   
1439   if(defined wantarray) {
1440     my @created;
1441     foreach my $item (@$data) {
1442       push(@created, $self->create($item));
1443     }
1444     return @created;
1445   } else {
1446     my ($first, @rest) = @$data;
1447
1448     my @names = grep {!ref $first->{$_}} keys %$first;
1449     my @rels = grep { $self->result_source->has_relationship($_) } keys %$first;
1450     my @pks = $self->result_source->primary_columns;  
1451
1452     ## do the belongs_to relationships  
1453     foreach my $index (0..$#$data) {
1454       if( grep { !defined $data->[$index]->{$_} } @pks ) {
1455         my @ret = $self->populate($data);
1456         return;
1457       }
1458     
1459       foreach my $rel (@rels) {
1460         next unless $data->[$index]->{$rel} && ref $data->[$index]->{$rel} eq "HASH";
1461         my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
1462         my ($reverse) = keys %{$self->result_source->reverse_relationship_info($rel)};
1463         my $related = $result->result_source->resolve_condition(
1464           $result->result_source->relationship_info($reverse)->{cond},
1465           $self,        
1466           $result,        
1467         );
1468
1469         delete $data->[$index]->{$rel};
1470         $data->[$index] = {%{$data->[$index]}, %$related};
1471       
1472         push @names, keys %$related if $index == 0;
1473       }
1474     }
1475
1476     ## do bulk insert on current row
1477     my @values = map { [ @$_{@names} ] } @$data;
1478
1479     $self->result_source->storage->insert_bulk(
1480       $self->result_source, 
1481       \@names, 
1482       \@values,
1483     );
1484
1485     ## do the has_many relationships
1486     foreach my $item (@$data) {
1487
1488       foreach my $rel (@rels) {
1489         next unless $item->{$rel} && ref $item->{$rel} eq "ARRAY";
1490
1491         my $parent = $self->find(map {{$_=>$item->{$_}} } @pks) 
1492      || $self->throw_exception('Cannot find the relating object.');
1493      
1494         my $child = $parent->$rel;
1495     
1496         my $related = $child->result_source->resolve_condition(
1497           $parent->result_source->relationship_info($rel)->{cond},
1498           $child,
1499           $parent,
1500         );
1501
1502         my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
1503         my @populate = map { {%$_, %$related} } @rows_to_add;
1504
1505         $child->populate( \@populate );
1506       }
1507     }
1508   }
1509 }
1510
1511 =head2 _normalize_populate_args ($args)
1512
1513 Private method used by L</populate> to normalize its incoming arguments.  Factored
1514 out in case you want to subclass and accept new argument structures to the
1515 L</populate> method.
1516
1517 =cut
1518
1519 sub _normalize_populate_args {
1520   my ($self, $data) = @_;
1521   my @names = @{shift(@$data)};
1522   my @results_to_create;
1523   foreach my $datum (@$data) {
1524     my %result_to_create;
1525     foreach my $index (0..$#names) {
1526       $result_to_create{$names[$index]} = $$datum[$index];
1527     }
1528     push @results_to_create, \%result_to_create;    
1529   }
1530   return \@results_to_create;
1531 }
1532
1533 =head2 pager
1534
1535 =over 4
1536
1537 =item Arguments: none
1538
1539 =item Return Value: $pager
1540
1541 =back
1542
1543 Return Value a L<Data::Page> object for the current resultset. Only makes
1544 sense for queries with a C<page> attribute.
1545
1546 =cut
1547
1548 sub pager {
1549   my ($self) = @_;
1550   my $attrs = $self->{attrs};
1551   $self->throw_exception("Can't create pager for non-paged rs")
1552     unless $self->{attrs}{page};
1553   $attrs->{rows} ||= 10;
1554   return $self->{pager} ||= Data::Page->new(
1555     $self->_count, $attrs->{rows}, $self->{attrs}{page});
1556 }
1557
1558 =head2 page
1559
1560 =over 4
1561
1562 =item Arguments: $page_number
1563
1564 =item Return Value: $rs
1565
1566 =back
1567
1568 Returns a resultset for the $page_number page of the resultset on which page
1569 is called, where each page contains a number of rows equal to the 'rows'
1570 attribute set on the resultset (10 by default).
1571
1572 =cut
1573
1574 sub page {
1575   my ($self, $page) = @_;
1576   return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
1577 }
1578
1579 =head2 new_result
1580
1581 =over 4
1582
1583 =item Arguments: \%vals
1584
1585 =item Return Value: $rowobject
1586
1587 =back
1588
1589 Creates a new row object in the resultset's result class and returns
1590 it. The row is not inserted into the database at this point, call
1591 L<DBIx::Class::Row/insert> to do that. Calling L<DBIx::Class::Row/in_storage>
1592 will tell you whether the row object has been inserted or not.
1593
1594 Passes the hashref of input on to L<DBIx::Class::Row/new>.
1595
1596 =cut
1597
1598 sub new_result {
1599   my ($self, $values) = @_;
1600   $self->throw_exception( "new_result needs a hash" )
1601     unless (ref $values eq 'HASH');
1602
1603   my %new;
1604   my $alias = $self->{attrs}{alias};
1605
1606   if (
1607     defined $self->{cond}
1608     && $self->{cond} eq $DBIx::Class::ResultSource::UNRESOLVABLE_CONDITION
1609   ) {
1610     %new = %{$self->{attrs}{related_objects}};
1611   } else {
1612     $self->throw_exception(
1613       "Can't abstract implicit construct, condition not a hash"
1614     ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
1615   
1616     my $collapsed_cond = (
1617       $self->{cond}
1618         ? $self->_collapse_cond($self->{cond})
1619         : {}
1620     );
1621   
1622     # precendence must be given to passed values over values inherited from
1623     # the cond, so the order here is important.
1624     my %implied =  %{$self->_remove_alias($collapsed_cond, $alias)};
1625     while( my($col,$value) = each %implied ){
1626       if(ref($value) eq 'HASH' && keys(%$value) && (keys %$value)[0] eq '='){
1627         $new{$col} = $value->{'='};
1628         next;
1629       }
1630       $new{$col} = $value if $self->_is_deterministic_value($value);
1631     }
1632   }
1633
1634   %new = (
1635     %new,
1636     %{ $self->_remove_alias($values, $alias) },
1637     -source_handle => $self->_source_handle,
1638     -result_source => $self->result_source, # DO NOT REMOVE THIS, REQUIRED
1639   );
1640
1641   return $self->result_class->new(\%new);
1642 }
1643
1644 # _is_deterministic_value
1645 #
1646 # Make an effor to strip non-deterministic values from the condition, 
1647 # to make sure new_result chokes less
1648
1649 sub _is_deterministic_value {
1650   my $self = shift;
1651   my $value = shift;
1652   my $ref_type = ref $value;
1653   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
1654   return 1 if Scalar::Util::blessed($value);
1655   return 0;
1656 }
1657
1658 # _collapse_cond
1659 #
1660 # Recursively collapse the condition.
1661
1662 sub _collapse_cond {
1663   my ($self, $cond, $collapsed) = @_;
1664
1665   $collapsed ||= {};
1666
1667   if (ref $cond eq 'ARRAY') {
1668     foreach my $subcond (@$cond) {
1669       next unless ref $subcond;  # -or
1670 #      warn "ARRAY: " . Dumper $subcond;
1671       $collapsed = $self->_collapse_cond($subcond, $collapsed);
1672     }
1673   }
1674   elsif (ref $cond eq 'HASH') {
1675     if (keys %$cond and (keys %$cond)[0] eq '-and') {
1676       foreach my $subcond (@{$cond->{-and}}) {
1677 #        warn "HASH: " . Dumper $subcond;
1678         $collapsed = $self->_collapse_cond($subcond, $collapsed);
1679       }
1680     }
1681     else {
1682 #      warn "LEAF: " . Dumper $cond;
1683       foreach my $col (keys %$cond) {
1684         my $value = $cond->{$col};
1685         $collapsed->{$col} = $value;
1686       }
1687     }
1688   }
1689
1690   return $collapsed;
1691 }
1692
1693 # _remove_alias
1694 #
1695 # Remove the specified alias from the specified query hash. A copy is made so
1696 # the original query is not modified.
1697
1698 sub _remove_alias {
1699   my ($self, $query, $alias) = @_;
1700
1701   my %orig = %{ $query || {} };
1702   my %unaliased;
1703
1704   foreach my $key (keys %orig) {
1705     if ($key !~ /\./) {
1706       $unaliased{$key} = $orig{$key};
1707       next;
1708     }
1709     $unaliased{$1} = $orig{$key}
1710       if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
1711   }
1712
1713   return \%unaliased;
1714 }
1715
1716 =head2 find_or_new
1717
1718 =over 4
1719
1720 =item Arguments: \%vals, \%attrs?
1721
1722 =item Return Value: $rowobject
1723
1724 =back
1725
1726   my $artist = $schema->resultset('Artist')->find_or_new(
1727     { artist => 'fred' }, { key => 'artists' });
1728
1729   $cd->cd_to_producer->find_or_new({ producer => $producer },
1730                                    { key => 'primary });
1731
1732 Find an existing record from this resultset, based on its primary
1733 key, or a unique constraint. If none exists, instantiate a new result
1734 object and return it. The object will not be saved into your storage
1735 until you call L<DBIx::Class::Row/insert> on it.
1736
1737 You most likely want this method when looking for existing rows using
1738 a unique constraint that is not the primary key, or looking for
1739 related rows.
1740
1741 If you want objects to be saved immediately, use L</find_or_create> instead.
1742
1743 B<Note>: C<find_or_new> is probably not what you want when creating a
1744 new row in a table that uses primary keys supplied by the
1745 database. Passing in a primary key column with a value of I<undef>
1746 will cause L</find> to attempt to search for a row with a value of
1747 I<NULL>.
1748
1749 =cut
1750
1751 sub find_or_new {
1752   my $self     = shift;
1753   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1754   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1755   my $exists   = $self->find($hash, $attrs);
1756   return defined $exists ? $exists : $self->new_result($hash);
1757 }
1758
1759 =head2 create
1760
1761 =over 4
1762
1763 =item Arguments: \%vals
1764
1765 =item Return Value: a L<DBIx::Class::Row> $object
1766
1767 =back
1768
1769 Attempt to create a single new row or a row with multiple related rows
1770 in the table represented by the resultset (and related tables). This
1771 will not check for duplicate rows before inserting, use
1772 L</find_or_create> to do that.
1773
1774 To create one row for this resultset, pass a hashref of key/value
1775 pairs representing the columns of the table and the values you wish to
1776 store. If the appropriate relationships are set up, foreign key fields
1777 can also be passed an object representing the foreign row, and the
1778 value will be set to its primary key.
1779
1780 To create related objects, pass a hashref for the value if the related
1781 item is a foreign key relationship (L<DBIx::Class::Relationship/belongs_to>),
1782 and use the name of the relationship as the key. (NOT the name of the field,
1783 necessarily). For C<has_many> and C<has_one> relationships, pass an arrayref
1784 of hashrefs containing the data for each of the rows to create in the foreign
1785 tables, again using the relationship name as the key.
1786
1787 Instead of hashrefs of plain related data (key/value pairs), you may
1788 also pass new or inserted objects. New objects (not inserted yet, see
1789 L</new>), will be inserted into their appropriate tables.
1790
1791 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
1792
1793 Example of creating a new row.
1794
1795   $person_rs->create({
1796     name=>"Some Person",
1797     email=>"somebody@someplace.com"
1798   });
1799   
1800 Example of creating a new row and also creating rows in a related C<has_many>
1801 or C<has_one> resultset.  Note Arrayref.
1802
1803   $artist_rs->create(
1804      { artistid => 4, name => 'Manufactured Crap', cds => [ 
1805         { title => 'My First CD', year => 2006 },
1806         { title => 'Yet More Tweeny-Pop crap', year => 2007 },
1807       ],
1808      },
1809   );
1810
1811 Example of creating a new row and also creating a row in a related
1812 C<belongs_to>resultset. Note Hashref.
1813
1814   $cd_rs->create({
1815     title=>"Music for Silly Walks",
1816     year=>2000,
1817     artist => {
1818       name=>"Silly Musician",
1819     }
1820   });
1821
1822 =cut
1823
1824 sub create {
1825   my ($self, $attrs) = @_;
1826   $self->throw_exception( "create needs a hashref" )
1827     unless ref $attrs eq 'HASH';
1828   return $self->new_result($attrs)->insert;
1829 }
1830
1831 =head2 find_or_create
1832
1833 =over 4
1834
1835 =item Arguments: \%vals, \%attrs?
1836
1837 =item Return Value: $rowobject
1838
1839 =back
1840
1841   $cd->cd_to_producer->find_or_create({ producer => $producer },
1842                                       { key => 'primary });
1843
1844 Tries to find a record based on its primary key or unique constraints; if none
1845 is found, creates one and returns that instead.
1846
1847   my $cd = $schema->resultset('CD')->find_or_create({
1848     cdid   => 5,
1849     artist => 'Massive Attack',
1850     title  => 'Mezzanine',
1851     year   => 2005,
1852   });
1853
1854 Also takes an optional C<key> attribute, to search by a specific key or unique
1855 constraint. For example:
1856
1857   my $cd = $schema->resultset('CD')->find_or_create(
1858     {
1859       artist => 'Massive Attack',
1860       title  => 'Mezzanine',
1861     },
1862     { key => 'cd_artist_title' }
1863   );
1864
1865 B<Note>: Because find_or_create() reads from the database and then
1866 possibly inserts based on the result, this method is subject to a race
1867 condition. Another process could create a record in the table after
1868 the find has completed and before the create has started. To avoid
1869 this problem, use find_or_create() inside a transaction.
1870
1871 B<Note>: C<find_or_create> is probably not what you want when creating
1872 a new row in a table that uses primary keys supplied by the
1873 database. Passing in a primary key column with a value of I<undef>
1874 will cause L</find> to attempt to search for a row with a value of
1875 I<NULL>.
1876
1877 See also L</find> and L</update_or_create>. For information on how to declare
1878 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1879
1880 =cut
1881
1882 sub find_or_create {
1883   my $self     = shift;
1884   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1885   my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
1886   my $exists   = $self->find($hash, $attrs);
1887   return defined $exists ? $exists : $self->create($hash);
1888 }
1889
1890 =head2 update_or_create
1891
1892 =over 4
1893
1894 =item Arguments: \%col_values, { key => $unique_constraint }?
1895
1896 =item Return Value: $rowobject
1897
1898 =back
1899
1900   $resultset->update_or_create({ col => $val, ... });
1901
1902 First, searches for an existing row matching one of the unique constraints
1903 (including the primary key) on the source of this resultset. If a row is
1904 found, updates it with the other given column values. Otherwise, creates a new
1905 row.
1906
1907 Takes an optional C<key> attribute to search on a specific unique constraint.
1908 For example:
1909
1910   # In your application
1911   my $cd = $schema->resultset('CD')->update_or_create(
1912     {
1913       artist => 'Massive Attack',
1914       title  => 'Mezzanine',
1915       year   => 1998,
1916     },
1917     { key => 'cd_artist_title' }
1918   );
1919
1920   $cd->cd_to_producer->update_or_create({ 
1921     producer => $producer, 
1922     name => 'harry',
1923   }, { 
1924     key => 'primary,
1925   });
1926
1927
1928 If no C<key> is specified, it searches on all unique constraints defined on the
1929 source, including the primary key.
1930
1931 If the C<key> is specified as C<primary>, it searches only on the primary key.
1932
1933 See also L</find> and L</find_or_create>. For information on how to declare
1934 unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
1935
1936 B<Note>: C<update_or_create> is probably not what you want when
1937 looking for a row in a table that uses primary keys supplied by the
1938 database, unless you actually have a key value. Passing in a primary
1939 key column with a value of I<undef> will cause L</find> to attempt to
1940 search for a row with a value of I<NULL>.
1941
1942 =cut
1943
1944 sub update_or_create {
1945   my $self = shift;
1946   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
1947   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
1948
1949   my $row = $self->find($cond, $attrs);
1950   if (defined $row) {
1951     $row->update($cond);
1952     return $row;
1953   }
1954
1955   return $self->create($cond);
1956 }
1957
1958 =head2 get_cache
1959
1960 =over 4
1961
1962 =item Arguments: none
1963
1964 =item Return Value: \@cache_objects?
1965
1966 =back
1967
1968 Gets the contents of the cache for the resultset, if the cache is set.
1969
1970 The cache is populated either by using the L</prefetch> attribute to
1971 L</search> or by calling L</set_cache>.
1972
1973 =cut
1974
1975 sub get_cache {
1976   shift->{all_cache};
1977 }
1978
1979 =head2 set_cache
1980
1981 =over 4
1982
1983 =item Arguments: \@cache_objects
1984
1985 =item Return Value: \@cache_objects
1986
1987 =back
1988
1989 Sets the contents of the cache for the resultset. Expects an arrayref
1990 of objects of the same class as those produced by the resultset. Note that
1991 if the cache is set the resultset will return the cached objects rather
1992 than re-querying the database even if the cache attr is not set.
1993
1994 The contents of the cache can also be populated by using the
1995 L</prefetch> attribute to L</search>.
1996
1997 =cut
1998
1999 sub set_cache {
2000   my ( $self, $data ) = @_;
2001   $self->throw_exception("set_cache requires an arrayref")
2002       if defined($data) && (ref $data ne 'ARRAY');
2003   $self->{all_cache} = $data;
2004 }
2005
2006 =head2 clear_cache
2007
2008 =over 4
2009
2010 =item Arguments: none
2011
2012 =item Return Value: []
2013
2014 =back
2015
2016 Clears the cache for the resultset.
2017
2018 =cut
2019
2020 sub clear_cache {
2021   shift->set_cache(undef);
2022 }
2023
2024 =head2 related_resultset
2025
2026 =over 4
2027
2028 =item Arguments: $relationship_name
2029
2030 =item Return Value: $resultset
2031
2032 =back
2033
2034 Returns a related resultset for the supplied relationship name.
2035
2036   $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
2037
2038 =cut
2039
2040 sub related_resultset {
2041   my ($self, $rel) = @_;
2042
2043   $self->{related_resultsets} ||= {};
2044   return $self->{related_resultsets}{$rel} ||= do {
2045     my $rel_obj = $self->result_source->relationship_info($rel);
2046
2047     $self->throw_exception(
2048       "search_related: result source '" . $self->result_source->source_name .
2049         "' has no such relationship $rel")
2050       unless $rel_obj;
2051     
2052     my ($from,$seen) = $self->_resolve_from($rel);
2053
2054     my $join_count = $seen->{$rel};
2055     my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
2056
2057     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
2058     my %attrs = %{$self->{attrs}||{}};
2059     delete @attrs{qw(result_class alias)};
2060
2061     my $new_cache;
2062
2063     if (my $cache = $self->get_cache) {
2064       if ($cache->[0] && $cache->[0]->related_resultset($rel)->get_cache) {
2065         $new_cache = [ map { @{$_->related_resultset($rel)->get_cache} }
2066                         @$cache ];
2067       }
2068     }
2069
2070     my $rel_source = $self->result_source->related_source($rel);
2071
2072     my $new = do {
2073
2074       # The reason we do this now instead of passing the alias to the
2075       # search_rs below is that if you wrap/overload resultset on the
2076       # source you need to know what alias it's -going- to have for things
2077       # to work sanely (e.g. RestrictWithObject wants to be able to add
2078       # extra query restrictions, and these may need to be $alias.)
2079
2080       my $attrs = $rel_source->resultset_attributes;
2081       local $attrs->{alias} = $alias;
2082
2083       $rel_source->resultset
2084                  ->search_rs(
2085                      undef, {
2086                        %attrs,
2087                        join => undef,
2088                        prefetch => undef,
2089                        select => undef,
2090                        as => undef,
2091                        where => $self->{cond},
2092                        seen_join => $seen,
2093                        from => $from,
2094                    });
2095     };
2096     $new->set_cache($new_cache) if $new_cache;
2097     $new;
2098   };
2099 }
2100
2101 =head2 current_source_alias
2102
2103 =over 4
2104
2105 =item Arguments: none
2106
2107 =item Return Value: $source_alias
2108
2109 =back
2110
2111 Returns the current table alias for the result source this resultset is built
2112 on, that will be used in the SQL query. Usually it is C<me>.
2113
2114 Currently the source alias that refers to the result set returned by a
2115 L</search>/L</find> family method depends on how you got to the resultset: it's
2116 C<me> by default, but eg. L</search_related> aliases it to the related result
2117 source name (and keeps C<me> referring to the original result set). The long
2118 term goal is to make L<DBIx::Class> always alias the current resultset as C<me>
2119 (and make this method unnecessary).
2120
2121 Thus it's currently necessary to use this method in predefined queries (see
2122 L<DBIx::Class::Manual::Cookbook/Predefined searches>) when referring to the
2123 source alias of the current result set:
2124
2125   # in a result set class
2126   sub modified_by {
2127     my ($self, $user) = @_;
2128
2129     my $me = $self->current_source_alias;
2130
2131     return $self->search(
2132       "$me.modified" => $user->id,
2133     );
2134   }
2135
2136 =cut
2137
2138 sub current_source_alias {
2139   my ($self) = @_;
2140
2141   return ($self->{attrs} || {})->{alias} || 'me';
2142 }
2143
2144 sub _resolve_from {
2145   my ($self, $extra_join) = @_;
2146   my $source = $self->result_source;
2147   my $attrs = $self->{attrs};
2148   
2149   my $from = $attrs->{from}
2150     || [ { $attrs->{alias} => $source->from } ];
2151     
2152   my $seen = { %{$attrs->{seen_join}||{}} };
2153
2154   my $join = ($attrs->{join}
2155                ? [ $attrs->{join}, $extra_join ]
2156                : $extra_join);
2157
2158   # we need to take the prefetch the attrs into account before we 
2159   # ->resolve_join as otherwise they get lost - captainL
2160   my $merged = $self->_merge_attr( $join, $attrs->{prefetch} );
2161
2162   $from = [
2163     @$from,
2164     ($join ? $source->resolve_join($merged, $attrs->{alias}, $seen) : ()),
2165   ];
2166
2167   return ($from,$seen);
2168 }
2169
2170 sub _resolved_attrs {
2171   my $self = shift;
2172   return $self->{_attrs} if $self->{_attrs};
2173
2174   my $attrs = { %{$self->{attrs}||{}} };
2175   my $source = $self->result_source;
2176   my $alias = $attrs->{alias};
2177
2178   $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
2179   if ($attrs->{columns}) {
2180     delete $attrs->{as};
2181   } elsif (!$attrs->{select}) {
2182     $attrs->{columns} = [ $source->columns ];
2183   }
2184  
2185   $attrs->{select} = 
2186     ($attrs->{select}
2187       ? (ref $attrs->{select} eq 'ARRAY'
2188           ? [ @{$attrs->{select}} ]
2189           : [ $attrs->{select} ])
2190       : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
2191     );
2192   $attrs->{as} =
2193     ($attrs->{as}
2194       ? (ref $attrs->{as} eq 'ARRAY'
2195           ? [ @{$attrs->{as}} ]
2196           : [ $attrs->{as} ])
2197       : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
2198     );
2199   
2200   my $adds;
2201   if ($adds = delete $attrs->{include_columns}) {
2202     $adds = [$adds] unless ref $adds eq 'ARRAY';
2203     push(@{$attrs->{select}}, @$adds);
2204     push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
2205   }
2206   if ($adds = delete $attrs->{'+select'}) {
2207     $adds = [$adds] unless ref $adds eq 'ARRAY';
2208     push(@{$attrs->{select}},
2209            map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
2210   }
2211   if (my $adds = delete $attrs->{'+as'}) {
2212     $adds = [$adds] unless ref $adds eq 'ARRAY';
2213     push(@{$attrs->{as}}, @$adds);
2214   }
2215
2216   $attrs->{from} ||= [ { 'me' => $source->from } ];
2217
2218   if (exists $attrs->{join} || exists $attrs->{prefetch}) {
2219     my $join = delete $attrs->{join} || {};
2220
2221     if (defined $attrs->{prefetch}) {
2222       $join = $self->_merge_attr(
2223         $join, $attrs->{prefetch}
2224       );
2225       
2226     }
2227
2228     $attrs->{from} =   # have to copy here to avoid corrupting the original
2229       [
2230         @{$attrs->{from}}, 
2231         $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
2232       ];
2233
2234   }
2235
2236   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
2237   if ($attrs->{order_by}) {
2238     $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
2239                            ? [ @{$attrs->{order_by}} ]
2240                            : [ $attrs->{order_by} ]);
2241   } else {
2242     $attrs->{order_by} = [];    
2243   }
2244
2245   my $collapse = $attrs->{collapse} || {};
2246   if (my $prefetch = delete $attrs->{prefetch}) {
2247     $prefetch = $self->_merge_attr({}, $prefetch);
2248     my @pre_order;
2249     my $seen = { %{ $attrs->{seen_join} || {} } };
2250     foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
2251       # bring joins back to level of current class
2252       my @prefetch = $source->resolve_prefetch(
2253         $p, $alias, $seen, \@pre_order, $collapse
2254       );
2255       push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
2256       push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
2257     }
2258     push(@{$attrs->{order_by}}, @pre_order);
2259   }
2260   $attrs->{collapse} = $collapse;
2261
2262   if ($attrs->{page}) {
2263     $attrs->{offset} ||= 0;
2264     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
2265   }
2266
2267   return $self->{_attrs} = $attrs;
2268 }
2269
2270 sub _rollout_attr {
2271   my ($self, $attr) = @_;
2272   
2273   if (ref $attr eq 'HASH') {
2274     return $self->_rollout_hash($attr);
2275   } elsif (ref $attr eq 'ARRAY') {
2276     return $self->_rollout_array($attr);
2277   } else {
2278     return [$attr];
2279   }
2280 }
2281
2282 sub _rollout_array {
2283   my ($self, $attr) = @_;
2284
2285   my @rolled_array;
2286   foreach my $element (@{$attr}) {
2287     if (ref $element eq 'HASH') {
2288       push( @rolled_array, @{ $self->_rollout_hash( $element ) } );
2289     } elsif (ref $element eq 'ARRAY') {
2290       #  XXX - should probably recurse here
2291       push( @rolled_array, @{$self->_rollout_array($element)} );
2292     } else {
2293       push( @rolled_array, $element );
2294     }
2295   }
2296   return \@rolled_array;
2297 }
2298
2299 sub _rollout_hash {
2300   my ($self, $attr) = @_;
2301
2302   my @rolled_array;
2303   foreach my $key (keys %{$attr}) {
2304     push( @rolled_array, { $key => $attr->{$key} } );
2305   }
2306   return \@rolled_array;
2307 }
2308
2309 sub _calculate_score {
2310   my ($self, $a, $b) = @_;
2311
2312   if (ref $b eq 'HASH') {
2313     my ($b_key) = keys %{$b};
2314     if (ref $a eq 'HASH') {
2315       my ($a_key) = keys %{$a};
2316       if ($a_key eq $b_key) {
2317         return (1 + $self->_calculate_score( $a->{$a_key}, $b->{$b_key} ));
2318       } else {
2319         return 0;
2320       }
2321     } else {
2322       return ($a eq $b_key) ? 1 : 0;
2323     }       
2324   } else {
2325     if (ref $a eq 'HASH') {
2326       my ($a_key) = keys %{$a};
2327       return ($b eq $a_key) ? 1 : 0;
2328     } else {
2329       return ($b eq $a) ? 1 : 0;
2330     }
2331   }
2332 }
2333
2334 sub _merge_attr {
2335   my ($self, $orig, $import) = @_;
2336
2337   return $import unless defined($orig);
2338   return $orig unless defined($import);
2339   
2340   $orig = $self->_rollout_attr($orig);
2341   $import = $self->_rollout_attr($import);
2342
2343   my $seen_keys;
2344   foreach my $import_element ( @{$import} ) {
2345     # find best candidate from $orig to merge $b_element into
2346     my $best_candidate = { position => undef, score => 0 }; my $position = 0;
2347     foreach my $orig_element ( @{$orig} ) {
2348       my $score = $self->_calculate_score( $orig_element, $import_element );
2349       if ($score > $best_candidate->{score}) {
2350         $best_candidate->{position} = $position;
2351         $best_candidate->{score} = $score;
2352       }
2353       $position++;
2354     }
2355     my ($import_key) = ( ref $import_element eq 'HASH' ) ? keys %{$import_element} : ($import_element);
2356
2357     if ($best_candidate->{score} == 0 || exists $seen_keys->{$import_key}) {
2358       push( @{$orig}, $import_element );
2359     } else {
2360       my $orig_best = $orig->[$best_candidate->{position}];
2361       # merge orig_best and b_element together and replace original with merged
2362       if (ref $orig_best ne 'HASH') {
2363         $orig->[$best_candidate->{position}] = $import_element;
2364       } elsif (ref $import_element eq 'HASH') {
2365         my ($key) = keys %{$orig_best};
2366         $orig->[$best_candidate->{position}] = { $key => $self->_merge_attr($orig_best->{$key}, $import_element->{$key}) };
2367       }
2368     }
2369     $seen_keys->{$import_key} = 1; # don't merge the same key twice
2370   }
2371
2372   return $orig;
2373 }
2374
2375 sub result_source {
2376     my $self = shift;
2377
2378     if (@_) {
2379         $self->_source_handle($_[0]->handle);
2380     } else {
2381         $self->_source_handle->resolve;
2382     }
2383 }
2384
2385 =head2 throw_exception
2386
2387 See L<DBIx::Class::Schema/throw_exception> for details.
2388
2389 =cut
2390
2391 sub throw_exception {
2392   my $self=shift;
2393   if (ref $self && $self->_source_handle->schema) {
2394     $self->_source_handle->schema->throw_exception(@_)
2395   } else {
2396     croak(@_);
2397   }
2398
2399 }
2400
2401 # XXX: FIXME: Attributes docs need clearing up
2402
2403 =head1 ATTRIBUTES
2404
2405 Attributes are used to refine a ResultSet in various ways when
2406 searching for data. They can be passed to any method which takes an
2407 C<\%attrs> argument. See L</search>, L</search_rs>, L</find>,
2408 L</count>.
2409
2410 These are in no particular order:
2411
2412 =head2 order_by
2413
2414 =over 4
2415
2416 =item Value: ($order_by | \@order_by)
2417
2418 =back
2419
2420 Which column(s) to order the results by. This is currently passed
2421 through directly to SQL, so you can give e.g. C<year DESC> for a
2422 descending order on the column `year'.
2423
2424 Please note that if you have C<quote_char> enabled (see
2425 L<DBIx::Class::Storage::DBI/connect_info>) you will need to do C<\'year DESC' > to
2426 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
2427 so you will need to manually quote things as appropriate.)
2428
2429 If your L<SQL::Abstract> version supports it (>=1.50), you can also use
2430 C<{-desc => 'year'}>, which takes care of the quoting for you. This is the
2431 recommended syntax.
2432
2433 =head2 columns
2434
2435 =over 4
2436
2437 =item Value: \@columns
2438
2439 =back
2440
2441 Shortcut to request a particular set of columns to be retrieved.  Adds
2442 C<me.> onto the start of any column without a C<.> in it and sets C<select>
2443 from that, then auto-populates C<as> from C<select> as normal. (You may also
2444 use the C<cols> attribute, as in earlier versions of DBIC.)
2445
2446 =head2 include_columns
2447
2448 =over 4
2449
2450 =item Value: \@columns
2451
2452 =back
2453
2454 Shortcut to include additional columns in the returned results - for example
2455
2456   $schema->resultset('CD')->search(undef, {
2457     include_columns => ['artist.name'],
2458     join => ['artist']
2459   });
2460
2461 would return all CDs and include a 'name' column to the information
2462 passed to object inflation. Note that the 'artist' is the name of the
2463 column (or relationship) accessor, and 'name' is the name of the column
2464 accessor in the related table.
2465
2466 =head2 select
2467
2468 =over 4
2469
2470 =item Value: \@select_columns
2471
2472 =back
2473
2474 Indicates which columns should be selected from the storage. You can use
2475 column names, or in the case of RDBMS back ends, function or stored procedure
2476 names:
2477
2478   $rs = $schema->resultset('Employee')->search(undef, {
2479     select => [
2480       'name',
2481       { count => 'employeeid' },
2482       { sum => 'salary' }
2483     ]
2484   });
2485
2486 When you use function/stored procedure names and do not supply an C<as>
2487 attribute, the column names returned are storage-dependent. E.g. MySQL would
2488 return a column named C<count(employeeid)> in the above example.
2489
2490 =head2 +select
2491
2492 =over 4
2493
2494 Indicates additional columns to be selected from storage.  Works the same as
2495 L</select> but adds columns to the selection.
2496
2497 =back
2498
2499 =head2 +as
2500
2501 =over 4
2502
2503 Indicates additional column names for those added via L</+select>. See L</as>.
2504
2505 =back
2506
2507 =head2 as
2508
2509 =over 4
2510
2511 =item Value: \@inflation_names
2512
2513 =back
2514
2515 Indicates column names for object inflation. That is, C<as>
2516 indicates the name that the column can be accessed as via the
2517 C<get_column> method (or via the object accessor, B<if one already
2518 exists>).  It has nothing to do with the SQL code C<SELECT foo AS bar>.
2519
2520 The C<as> attribute is used in conjunction with C<select>,
2521 usually when C<select> contains one or more function or stored
2522 procedure names:
2523
2524   $rs = $schema->resultset('Employee')->search(undef, {
2525     select => [
2526       'name',
2527       { count => 'employeeid' }
2528     ],
2529     as => ['name', 'employee_count'],
2530   });
2531
2532   my $employee = $rs->first(); # get the first Employee
2533
2534 If the object against which the search is performed already has an accessor
2535 matching a column name specified in C<as>, the value can be retrieved using
2536 the accessor as normal:
2537
2538   my $name = $employee->name();
2539
2540 If on the other hand an accessor does not exist in the object, you need to
2541 use C<get_column> instead:
2542
2543   my $employee_count = $employee->get_column('employee_count');
2544
2545 You can create your own accessors if required - see
2546 L<DBIx::Class::Manual::Cookbook> for details.
2547
2548 Please note: This will NOT insert an C<AS employee_count> into the SQL
2549 statement produced, it is used for internal access only. Thus
2550 attempting to use the accessor in an C<order_by> clause or similar
2551 will fail miserably.
2552
2553 To get around this limitation, you can supply literal SQL to your
2554 C<select> attibute that contains the C<AS alias> text, eg:
2555
2556   select => [\'myfield AS alias']
2557
2558 =head2 join
2559
2560 =over 4
2561
2562 =item Value: ($rel_name | \@rel_names | \%rel_names)
2563
2564 =back
2565
2566 Contains a list of relationships that should be joined for this query.  For
2567 example:
2568
2569   # Get CDs by Nine Inch Nails
2570   my $rs = $schema->resultset('CD')->search(
2571     { 'artist.name' => 'Nine Inch Nails' },
2572     { join => 'artist' }
2573   );
2574
2575 Can also contain a hash reference to refer to the other relation's relations.
2576 For example:
2577
2578   package MyApp::Schema::Track;
2579   use base qw/DBIx::Class/;
2580   __PACKAGE__->table('track');
2581   __PACKAGE__->add_columns(qw/trackid cd position title/);
2582   __PACKAGE__->set_primary_key('trackid');
2583   __PACKAGE__->belongs_to(cd => 'MyApp::Schema::CD');
2584   1;
2585
2586   # In your application
2587   my $rs = $schema->resultset('Artist')->search(
2588     { 'track.title' => 'Teardrop' },
2589     {
2590       join     => { cd => 'track' },
2591       order_by => 'artist.name',
2592     }
2593   );
2594
2595 You need to use the relationship (not the table) name in  conditions, 
2596 because they are aliased as such. The current table is aliased as "me", so 
2597 you need to use me.column_name in order to avoid ambiguity. For example:
2598
2599   # Get CDs from 1984 with a 'Foo' track 
2600   my $rs = $schema->resultset('CD')->search(
2601     { 
2602       'me.year' => 1984,
2603       'tracks.name' => 'Foo'
2604     },
2605     { join => 'tracks' }
2606   );
2607   
2608 If the same join is supplied twice, it will be aliased to <rel>_2 (and
2609 similarly for a third time). For e.g.
2610
2611   my $rs = $schema->resultset('Artist')->search({
2612     'cds.title'   => 'Down to Earth',
2613     'cds_2.title' => 'Popular',
2614   }, {
2615     join => [ qw/cds cds/ ],
2616   });
2617
2618 will return a set of all artists that have both a cd with title 'Down
2619 to Earth' and a cd with title 'Popular'.
2620
2621 If you want to fetch related objects from other tables as well, see C<prefetch>
2622 below.
2623
2624 For more help on using joins with search, see L<DBIx::Class::Manual::Joining>.
2625
2626 =head2 prefetch
2627
2628 =over 4
2629
2630 =item Value: ($rel_name | \@rel_names | \%rel_names)
2631
2632 =back
2633
2634 Contains one or more relationships that should be fetched along with
2635 the main query (when they are accessed afterwards the data will
2636 already be available, without extra queries to the database).  This is
2637 useful for when you know you will need the related objects, because it
2638 saves at least one query:
2639
2640   my $rs = $schema->resultset('Tag')->search(
2641     undef,
2642     {
2643       prefetch => {
2644         cd => 'artist'
2645       }
2646     }
2647   );
2648
2649 The initial search results in SQL like the following:
2650
2651   SELECT tag.*, cd.*, artist.* FROM tag
2652   JOIN cd ON tag.cd = cd.cdid
2653   JOIN artist ON cd.artist = artist.artistid
2654
2655 L<DBIx::Class> has no need to go back to the database when we access the
2656 C<cd> or C<artist> relationships, which saves us two SQL statements in this
2657 case.
2658
2659 Simple prefetches will be joined automatically, so there is no need
2660 for a C<join> attribute in the above search. 
2661
2662 C<prefetch> can be used with the following relationship types: C<belongs_to>,
2663 C<has_one> (or if you're using C<add_relationship>, any relationship declared
2664 with an accessor type of 'single' or 'filter'). A more complex example that
2665 prefetches an artists cds, the tracks on those cds, and the tags associted 
2666 with that artist is given below (assuming many-to-many from artists to tags):
2667
2668  my $rs = $schema->resultset('Artist')->search(
2669    undef,
2670    {
2671      prefetch => [
2672        { cds => 'tracks' },
2673        { artist_tags => 'tags' }
2674      ]
2675    }
2676  );
2677  
2678
2679 B<NOTE:> If you specify a C<prefetch> attribute, the C<join> and C<select>
2680 attributes will be ignored.
2681
2682 =head2 page
2683
2684 =over 4
2685
2686 =item Value: $page
2687
2688 =back
2689
2690 Makes the resultset paged and specifies the page to retrieve. Effectively
2691 identical to creating a non-pages resultset and then calling ->page($page)
2692 on it.
2693
2694 If L<rows> attribute is not specified it defualts to 10 rows per page.
2695
2696 =head2 rows
2697
2698 =over 4
2699
2700 =item Value: $rows
2701
2702 =back
2703
2704 Specifes the maximum number of rows for direct retrieval or the number of
2705 rows per page if the page attribute or method is used.
2706
2707 =head2 offset
2708
2709 =over 4
2710
2711 =item Value: $offset
2712
2713 =back
2714
2715 Specifies the (zero-based) row number for the  first row to be returned, or the
2716 of the first row of the first page if paging is used.
2717
2718 =head2 group_by
2719
2720 =over 4
2721
2722 =item Value: \@columns
2723
2724 =back
2725
2726 A arrayref of columns to group by. Can include columns of joined tables.
2727
2728   group_by => [qw/ column1 column2 ... /]
2729
2730 =head2 having
2731
2732 =over 4
2733
2734 =item Value: $condition
2735
2736 =back
2737
2738 HAVING is a select statement attribute that is applied between GROUP BY and
2739 ORDER BY. It is applied to the after the grouping calculations have been
2740 done.
2741
2742   having => { 'count(employee)' => { '>=', 100 } }
2743
2744 =head2 distinct
2745
2746 =over 4
2747
2748 =item Value: (0 | 1)
2749
2750 =back
2751
2752 Set to 1 to group by all columns.
2753
2754 =head2 where
2755
2756 =over 4
2757
2758 Adds to the WHERE clause.
2759
2760   # only return rows WHERE deleted IS NULL for all searches
2761   __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
2762
2763 Can be overridden by passing C<{ where => undef }> as an attribute
2764 to a resulset.
2765
2766 =back
2767
2768 =head2 cache
2769
2770 Set to 1 to cache search results. This prevents extra SQL queries if you
2771 revisit rows in your ResultSet:
2772
2773   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
2774
2775   while( my $artist = $resultset->next ) {
2776     ... do stuff ...
2777   }
2778
2779   $rs->first; # without cache, this would issue a query
2780
2781 By default, searches are not cached.
2782
2783 For more examples of using these attributes, see
2784 L<DBIx::Class::Manual::Cookbook>.
2785
2786 =head2 from
2787
2788 =over 4
2789
2790 =item Value: \@from_clause
2791
2792 =back
2793
2794 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
2795 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
2796 clauses.
2797
2798 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
2799
2800 C<join> will usually do what you need and it is strongly recommended that you
2801 avoid using C<from> unless you cannot achieve the desired result using C<join>.
2802 And we really do mean "cannot", not just tried and failed. Attempting to use
2803 this because you're having problems with C<join> is like trying to use x86
2804 ASM because you've got a syntax error in your C. Trust us on this.
2805
2806 Now, if you're still really, really sure you need to use this (and if you're
2807 not 100% sure, ask the mailing list first), here's an explanation of how this
2808 works.
2809
2810 The syntax is as follows -
2811
2812   [
2813     { <alias1> => <table1> },
2814     [
2815       { <alias2> => <table2>, -join_type => 'inner|left|right' },
2816       [], # nested JOIN (optional)
2817       { <table1.column1> => <table2.column2>, ... (more conditions) },
2818     ],
2819     # More of the above [ ] may follow for additional joins
2820   ]
2821
2822   <table1> <alias1>
2823   JOIN
2824     <table2> <alias2>
2825     [JOIN ...]
2826   ON <table1.column1> = <table2.column2>
2827   <more joins may follow>
2828
2829 An easy way to follow the examples below is to remember the following:
2830
2831     Anything inside "[]" is a JOIN
2832     Anything inside "{}" is a condition for the enclosing JOIN
2833
2834 The following examples utilize a "person" table in a family tree application.
2835 In order to express parent->child relationships, this table is self-joined:
2836
2837     # Person->belongs_to('father' => 'Person');
2838     # Person->belongs_to('mother' => 'Person');
2839
2840 C<from> can be used to nest joins. Here we return all children with a father,
2841 then search against all mothers of those children:
2842
2843   $rs = $schema->resultset('Person')->search(
2844       undef,
2845       {
2846           alias => 'mother', # alias columns in accordance with "from"
2847           from => [
2848               { mother => 'person' },
2849               [
2850                   [
2851                       { child => 'person' },
2852                       [
2853                           { father => 'person' },
2854                           { 'father.person_id' => 'child.father_id' }
2855                       ]
2856                   ],
2857                   { 'mother.person_id' => 'child.mother_id' }
2858               ],
2859           ]
2860       },
2861   );
2862
2863   # Equivalent SQL:
2864   # SELECT mother.* FROM person mother
2865   # JOIN (
2866   #   person child
2867   #   JOIN person father
2868   #   ON ( father.person_id = child.father_id )
2869   # )
2870   # ON ( mother.person_id = child.mother_id )
2871
2872 The type of any join can be controlled manually. To search against only people
2873 with a father in the person table, we could explicitly use C<INNER JOIN>:
2874
2875     $rs = $schema->resultset('Person')->search(
2876         undef,
2877         {
2878             alias => 'child', # alias columns in accordance with "from"
2879             from => [
2880                 { child => 'person' },
2881                 [
2882                     { father => 'person', -join_type => 'inner' },
2883                     { 'father.id' => 'child.father_id' }
2884                 ],
2885             ]
2886         },
2887     );
2888
2889     # Equivalent SQL:
2890     # SELECT child.* FROM person child
2891     # INNER JOIN person father ON child.father_id = father.id
2892
2893 If you need to express really complex joins or you need a subselect, you
2894 can supply literal SQL to C<from> via a scalar reference. In this case
2895 the contents of the scalar will replace the table name asscoiated with the
2896 resultsource.
2897
2898 WARNING: This technique might very well not work as expected on chained
2899 searches - you have been warned.
2900
2901     # Assuming the Event resultsource is defined as:
2902
2903         MySchema::Event->add_columns (
2904             sequence => {
2905                 data_type => 'INT',
2906                 is_auto_increment => 1,
2907             },
2908             location => {
2909                 data_type => 'INT',
2910             },
2911             type => {
2912                 data_type => 'INT',
2913             },
2914         );
2915         MySchema::Event->set_primary_key ('sequence');
2916
2917     # This will get back the latest event for every location. The column
2918     # selector is still provided by DBIC, all we do is add a JOIN/WHERE
2919     # combo to limit the resultset
2920
2921     $rs = $schema->resultset('Event');
2922     $table = $rs->result_source->name;
2923     $latest = $rs->search (
2924         undef,
2925         { from => \ " 
2926             (SELECT e1.* FROM $table e1 
2927                 JOIN $table e2 
2928                     ON e1.location = e2.location 
2929                     AND e1.sequence < e2.sequence 
2930                 WHERE e2.sequence is NULL 
2931             ) me",
2932         },
2933     );
2934
2935     # Equivalent SQL (with the DBIC chunks added):
2936
2937     SELECT me.sequence, me.location, me.type FROM
2938        (SELECT e1.* FROM events e1
2939            JOIN events e2
2940                ON e1.location = e2.location
2941                AND e1.sequence < e2.sequence
2942            WHERE e2.sequence is NULL
2943        ) me;
2944
2945 =head2 for
2946
2947 =over 4
2948
2949 =item Value: ( 'update' | 'shared' )
2950
2951 =back
2952
2953 Set to 'update' for a SELECT ... FOR UPDATE or 'shared' for a SELECT
2954 ... FOR SHARED.
2955
2956 =cut
2957
2958 1;