I hate you all.
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index b694257..64bcd4e 100644 (file)
@@ -9,11 +9,9 @@ use overload
 use Carp::Clan qw/^DBIx::Class/;
 use Data::Page;
 use Storable;
-use Data::Dumper;
-use Scalar::Util qw/weaken/;
-use Data::Dumper;
 use DBIx::Class::ResultSetColumn;
 use base qw/DBIx::Class/;
+
 __PACKAGE__->load_components(qw/AccessorGroup/);
 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
 
@@ -85,9 +83,9 @@ will return a CD object, not a ResultSet.
 sub new {
   my $class = shift;
   return $class->new_result(@_) if ref $class;
-  
+
   my ($source, $attrs) = @_;
-  weaken $source;
+  #weaken $source;
 
   if ($attrs->{page}) {
     $attrs->{rows} ||= 10;
@@ -97,17 +95,18 @@ sub new {
 
   $attrs->{alias} ||= 'me';
 
-  bless {
+  my $self = {
     result_source => $source,
     result_class => $attrs->{result_class} || $source->result_class,
     cond => $attrs->{where},
-#    from => $attrs->{from},
-#    collapse => $collapse,
     count => undef,
-    page => delete $attrs->{page},
     pager => undef,
     attrs => $attrs
-  }, $class;
+  };
+
+  bless $self, $class;
+
+  return $self;
 }
 
 =head2 search
@@ -134,6 +133,8 @@ call it as C<search(undef, \%attrs)>.
     columns => [qw/name artistid/],
   });
 
+For a list of attributes that can be passed to C<search>, see L</ATTRIBUTES>. For more examples of using this function, see L<Searching|DBIx::Class::Manual::Cookbook/Searching>.
+
 =cut
 
 sub search {
@@ -152,7 +153,7 @@ sub search {
 
 =back
 
-This method does the same exact thing as search() except it will 
+This method does the same exact thing as search() except it will
 always return a resultset, even in list context.
 
 =cut
@@ -160,62 +161,45 @@ always return a resultset, even in list context.
 sub search_rs {
   my $self = shift;
 
+  my $rows;
+
+  unless (@_) {                 # no search, effectively just a clone
+    $rows = $self->get_cache;
+  }
+
   my $attrs = {};
   $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
-  my $our_attrs = ($attrs->{_parent_attrs})
-    ? { %{$attrs->{_parent_attrs}} }
-      : { %{$self->{attrs}} };
+  my $our_attrs = { %{$self->{attrs}} };
   my $having = delete $our_attrs->{having};
+  my $where = delete $our_attrs->{where};
 
-  # XXX this is getting messy
-  if ($attrs->{_live_join_stack}) {
-    my $live_join = $attrs->{_live_join_stack};
-    foreach (reverse @{$live_join}) {
-      $attrs->{_live_join_h} = (defined $attrs->{_live_join_h}) ? { $_ => $attrs->{_live_join_h} } : $_;
-    }
-  }
+  my $new_attrs = { %{$our_attrs}, %{$attrs} };
 
-  # merge new attrs into old
+  # merge new attrs into inherited
   foreach my $key (qw/join prefetch/) {
-    next unless (exists $attrs->{$key});
-    if ($attrs->{_live_join_stack} || $our_attrs->{_live_join_stack}) {
-      my $live_join = $attrs->{_live_join_stack} ||
-                      $our_attrs->{_live_join_stack};
-      foreach (reverse @{$live_join}) {
-        $attrs->{$key} = { $_ => $attrs->{$key} };
-      }
-    }
-
-    if (exists $our_attrs->{$key}) {
-      $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
-    } else {
-      $our_attrs->{$key} = $attrs->{$key};
-    }
-    delete $attrs->{$key};
+    next unless exists $attrs->{$key};
+    $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
   }
 
-  $our_attrs->{join} = $self->_merge_attr(
-    $our_attrs->{join}, $attrs->{_live_join_h}, 1
-  ) if ($attrs->{_live_join_h});
-
-  if (defined $our_attrs->{prefetch}) {
-    $our_attrs->{join} = $self->_merge_attr(
-      $our_attrs->{join}, $our_attrs->{prefetch}, 1
-    );
-  }
-
-  my $new_attrs = { %{$our_attrs}, %{$attrs} };
-  my $where = (@_
+  my $cond = (@_
     ? (
         (@_ == 1 || ref $_[0] eq "HASH")
-          ? shift
+          ? (
+              (ref $_[0] eq 'HASH')
+                ? (
+                    (keys %{ $_[0] }  > 0)
+                      ? shift
+                      : undef
+                   )
+                :  shift
+             )
           : (
               (@_ % 2)
                 ? $self->throw_exception("Odd number of arguments to search")
                 : {@_}
              )
-       )
-    : undef()
+      )
+    : undef
   );
 
   if (defined $where) {
@@ -230,6 +214,18 @@ sub search_rs {
         : $where);
   }
 
+  if (defined $cond) {
+    $new_attrs->{where} = (
+      defined $new_attrs->{where}
+        ? { '-and' => [
+              map {
+                ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
+              } $cond, $new_attrs->{where}
+            ]
+          }
+        : $cond);
+  }
+
   if (defined $having) {
     $new_attrs->{having} = (
       defined $new_attrs->{having}
@@ -243,16 +239,9 @@ sub search_rs {
   }
 
   my $rs = (ref $self)->new($self->result_source, $new_attrs);
-  $rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs});
-       #XXX - hack to pass through parent of related resultsets
-
-  unless (@_) {                 # no search, effectively just a clone
-    my $rows = $self->get_cache;
-    if ($rows) {
-      $rs->set_cache($rows);
-    }
+  if ($rows) {
+    $rs->set_cache($rows);
   }
-  
   return $rs;
 }
 
@@ -318,6 +307,9 @@ If the C<key> is specified as C<primary>, it searches only on the primary key.
 If no C<key> is specified, it searches on all unique constraints defined on the
 source, including the primary key.
 
+If your table does not have a primary key, you B<must> provide a value for the
+C<key> attribute matching one of the unique constraints on the source.
+
 See also L</find_or_create> and L</update_or_create>. For information on how to
 declare unique constraints, see
 L<DBIx::Class::ResultSource/add_unique_constraint>.
@@ -333,7 +325,7 @@ sub find {
     ? $self->result_source->unique_constraint_columns($attrs->{key})
     : $self->result_source->primary_columns;
   $self->throw_exception(
-    "Can't find unless a primary key or unique constraint is defined"
+    "Can't find unless a primary key is defined or unique constraint is specified"
   ) unless @cols;
 
   # Parse out a hashref from input
@@ -351,26 +343,60 @@ sub find {
     $input_query = {@_};
   }
 
+  my (%related, $info);
+
+  foreach my $key (keys %$input_query) {
+    if (ref($input_query->{$key})
+        && ($info = $self->result_source->relationship_info($key))) {
+      my $rel_q = $self->result_source->resolve_condition(
+                    $info->{cond}, delete $input_query->{$key}, $key
+                  );
+      die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
+      @related{keys %$rel_q} = values %$rel_q;
+    }
+  }
+  if (my @keys = keys %related) {
+    @{$input_query}{@keys} = values %related;
+  }
+
   my @unique_queries = $self->_unique_queries($input_query, $attrs);
 
-  # Handle cases where the ResultSet defines the query, or where the user is
-  # abusing find
-  my $query = @unique_queries ? \@unique_queries : $input_query;
+  # Build the final query: Default to the disjunction of the unique queries,
+  # but allow the input query in case the ResultSet defines the query or the
+  # user is abusing find
+  my $alias = exists $attrs->{alias} ? $attrs->{alias} : $self->{attrs}{alias};
+  my $query = @unique_queries
+    ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
+    : $self->_add_alias($input_query, $alias);
 
   # Run the query
   if (keys %$attrs) {
     my $rs = $self->search($query, $attrs);
-    $rs->_resolve;
-    return keys %{$rs->{_attrs}->{collapse}} ? $rs->next : $rs->single;
+    return keys %{$rs->_resolved_attrs->{collapse}} ? $rs->next : $rs->single;
   }
   else {
-    $self->_resolve;  
-    return (keys %{$self->{_attrs}->{collapse}})
+    return keys %{$self->_resolved_attrs->{collapse}}
       ? $self->search($query)->next
       : $self->single($query);
   }
 }
 
+# _add_alias
+#
+# Add the specified alias to the specified query hash. A copy is made so the
+# original query is not modified.
+
+sub _add_alias {
+  my ($self, $query, $alias) = @_;
+
+  my %aliased = %$query;
+  foreach my $col (grep { ! m/\./ } keys %aliased) {
+    $aliased{"$alias.$col"} = delete $aliased{$col};
+  }
+
+  return \%aliased;
+}
+
 # _unique_queries
 #
 # Build a list of queries which satisfy unique constraints.
@@ -387,17 +413,16 @@ sub _unique_queries {
     my @unique_cols = $self->result_source->unique_constraint_columns($name);
     my $unique_query = $self->_build_unique_query($query, \@unique_cols);
 
-    next unless scalar keys %$unique_query;
-
-    # Add the ResultSet's alias
-    foreach my $key (grep { ! m/\./ } keys %$unique_query) {
-      my $alias = ($self->{attrs}->{_live_join})
-        ? $self->{attrs}->{_live_join}
-        : $self->{attrs}->{alias};
-      $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
-    }
+    my $num_query = scalar keys %$unique_query;
+    next unless $num_query;
 
-    push @unique_queries, $unique_query;
+    # XXX: Assuming quite a bit about $self->{attrs}{where}
+    my $num_cols = scalar @unique_cols;
+    my $num_where = exists $self->{attrs}{where}
+      ? scalar keys %{ $self->{attrs}{where} }
+      : 0;
+    push @unique_queries, $unique_query
+      if $num_query + $num_where == $num_cols;
   }
 
   return @unique_queries;
@@ -410,19 +435,18 @@ sub _unique_queries {
 sub _build_unique_query {
   my ($self, $query, $unique_cols) = @_;
 
-  my %unique_query =
+  return {
     map  { $_ => $query->{$_} }
     grep { exists $query->{$_} }
-    @$unique_cols;
-
-  return \%unique_query;
+      @$unique_cols
+  };
 }
 
 =head2 search_related
 
 =over 4
 
-=item Arguments: $cond, \%attrs?
+=item Arguments: $rel, $cond, \%attrs?
 
 =item Return Value: $new_resultset
 
@@ -459,8 +483,7 @@ L<DBIx::Class::Cursor> for more information.
 sub cursor {
   my ($self) = @_;
 
-  $self->_resolve;
-  my $attrs = { %{$self->{_attrs}} };
+  my $attrs = { %{$self->_resolved_attrs} };
   return $self->{cursor}
     ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
           $attrs->{where},$attrs);
@@ -489,8 +512,7 @@ method; if you need to add extra joins or similar call ->search and then
 
 sub single {
   my ($self, $where) = @_;
-  $self->_resolve;
-  my $attrs = { %{$self->{_attrs}} };
+  my $attrs = { %{$self->_resolved_attrs} };
   if ($where) {
     if (defined $attrs->{where}) {
       $attrs->{where} = {
@@ -503,14 +525,15 @@ sub single {
     }
   }
 
-  unless ($self->_is_unique_query($attrs->{where})) {
-    carp "Query not guaranteed to return a single row"
-      . "; please declare your unique constraints or use search instead";
-  }
+#  XXX: Disabled since it doesn't infer uniqueness in all cases
+#  unless ($self->_is_unique_query($attrs->{where})) {
+#    carp "Query not guaranteed to return a single row"
+#      . "; please declare your unique constraints or use search instead";
+#  }
 
   my @data = $self->result_source->storage->select_single(
     $attrs->{from}, $attrs->{select},
-    $attrs->{where},$attrs
+    $attrs->{where}, $attrs
   );
 
   return (@data ? $self->_construct_object(@data) : ());
@@ -525,9 +548,7 @@ sub _is_unique_query {
   my ($self, $query) = @_;
 
   my $collapsed = $self->_collapse_query($query);
-  my $alias = ($self->{attrs}->{_live_join})
-    ? $self->{attrs}->{_live_join}
-    : $self->{attrs}->{alias};
+  my $alias = $self->{attrs}{alias};
 
   foreach my $name ($self->result_source->unique_constraint_names) {
     my @unique_cols = map {
@@ -538,11 +559,9 @@ sub _is_unique_query {
     my %seen = map { $_ => 0 } @unique_cols;
 
     foreach my $key (keys %$collapsed) {
-      my $aliased = $key;
-      $aliased = "$alias.$key" unless $key =~ /\./;
-
+      my $aliased = $key =~ /\./ ? $key : "$alias.$key";
       next unless exists $seen{$aliased};  # Additional constraints are okay
-      $seen{$aliased} = scalar @{ $collapsed->{$key} };
+      $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
     }
 
     # If we get 0 or more than 1 value for a column, it's not necessarily unique
@@ -577,8 +596,9 @@ sub _collapse_query {
     }
     else {
 #      warn "LEAF: " . Dumper $query;
-      foreach my $key (keys %$query) {
-        push @{$collapsed->{$key}}, $query->{$key};
+      foreach my $col (keys %$query) {
+        my $value = $query->{$col};
+        $collapsed->{$col}{$value}++;
       }
     }
   }
@@ -598,7 +618,7 @@ sub _collapse_query {
 
   my $max_length = $rs->get_column('length')->max;
 
-Returns a ResultSetColumn instance for $column based on $self
+Returns a L<DBIx::Class::ResultSetColumn> instance for a column of the ResultSet.
 
 =cut
 
@@ -685,7 +705,7 @@ Can be used to efficiently iterate over records in the resultset:
     print $cd->title;
   }
 
-Note that you need to store the resultset object, and call C<next> on it. 
+Note that you need to store the resultset object, and call C<next> on it.
 Calling C<< resultset('Table')->next >> repeatedly will always return the
 first record from the resultset.
 
@@ -710,200 +730,9 @@ sub next {
   return $self->_construct_object(@row);
 }
 
-sub _resolve {
-  my $self = shift;
-
-  return if(exists $self->{_attrs}); #return if _resolve has already been called
-
-  my $attrs = $self->{attrs};    
-  my $source = ($self->{_parent_rs})
-    ? $self->{_parent_rs}
-    : $self->{result_source};
-
-  # XXX - lose storable dclone
-  my $record_filter = delete $attrs->{record_filter}
-    if (defined $attrs->{record_filter});
-  $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
-  $attrs->{record_filter} = $record_filter if ($record_filter);
-  $self->{attrs}->{record_filter} = $record_filter if ($record_filter);
-
-  my $alias = $attrs->{alias};
-  $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
-  delete $attrs->{as} if $attrs->{columns};
-  $attrs->{columns} ||= [ $self->{result_source}->columns ]
-    unless $attrs->{select};
-  my $select_alias = ($self->{_parent_rs})
-    ? $self->{attrs}->{_live_join}
-    : $alias;
-  $attrs->{select} = [
-    map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
-  ] if $attrs->{columns};
-  $attrs->{as} ||= [
-    map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
-  ];
-  if (my $include = delete $attrs->{include_columns}) {
-    push(@{$attrs->{select}}, @$include);
-    push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
-  }
-
-  $attrs->{from} ||= [ { $alias => $source->from } ];
-  $attrs->{seen_join} ||= {};
-  my %seen;
-  if (my $join = delete $attrs->{join}) {
-    foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
-      if (ref $j eq 'HASH') {
-        $seen{$_} = 1 foreach keys %$j;
-      } else {
-        $seen{$j} = 1;
-      }
-    }
-    push(@{$attrs->{from}},
-      $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join})
-    );
-  }
-  $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
-  $attrs->{order_by} = [ $attrs->{order_by} ] if
-      $attrs->{order_by} and !ref($attrs->{order_by});
-  $attrs->{order_by} ||= [];
-
-  if(my $seladds = delete($attrs->{'+select'})) {
-    my @seladds = (ref($seladds) eq 'ARRAY' ? @$seladds : ($seladds));
-    $attrs->{select} = [
-      @{ $attrs->{select} },
-      map { (m/\./ || ref($_)) ? $_ : "${alias}.$_" } $seladds
-    ];
-  }
-  if(my $asadds = delete($attrs->{'+as'})) {
-    my @asadds = (ref($asadds) eq 'ARRAY' ? @$asadds : ($asadds));
-    $attrs->{as} = [ @{ $attrs->{as} }, @asadds ];
-  }
-  my $collapse = $attrs->{collapse} || {};
-  if (my $prefetch = delete $attrs->{prefetch}) {
-    my @pre_order;
-    foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
-      if ( ref $p eq 'HASH' ) {
-        foreach my $key (keys %$p) {
-          push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
-            unless $seen{$key};
-        }
-      } else {
-        push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
-          unless $seen{$p};
-      }
-
-      # we're about to resolve_join on the current class, so we need to bring
-      # the joins (which are from the original class) to the right level
-      # XXX the below alg is ridiculous
-      if ($attrs->{_live_join_stack}) {
-      STACK:
-        foreach (@{$attrs->{_live_join_stack}}) {
-          if (ref $p eq 'HASH') {
-            if (exists $p->{$_}) {
-              $p = $p->{$_};
-            } else {
-              $p = undef;
-              last STACK;
-            }
-          } elsif (ref $p eq 'ARRAY') {
-            foreach my $pe (@{$p}) {
-              if ($pe eq $_) {
-                $p = undef;
-                last STACK;
-              }
-              next unless(ref $pe eq 'HASH');
-              next unless(exists $pe->{$_});
-              $p = $pe->{$_};
-              next STACK;
-            }                                           
-            $p = undef;
-            last STACK;
-          } else {
-            $p = undef;
-            last STACK;
-          }
-        }
-      }
-                
-      if ($p) {
-        my @prefetch = $self->result_source->resolve_prefetch(
-          $p, $attrs->{alias}, {}, \@pre_order, $collapse
-        );
-                
-        push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
-        push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
-      }
-    }
-    push(@{$attrs->{order_by}}, @pre_order);
-  }
-  $attrs->{collapse} = $collapse;
-  $self->{_attrs} = $attrs;
-}
-
-sub _merge_attr {
-  my ($self, $a, $b, $is_prefetch) = @_;
-    
-  return $b unless $a;
-  if (ref $b eq 'HASH' && ref $a eq 'HASH') {
-    foreach my $key (keys %{$b}) {
-      if (exists $a->{$key}) {
-        $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key}, $is_prefetch);
-      } else {
-        $a->{$key} = $b->{$key};
-      }
-    }
-    return $a;
-  } else {
-    $a = [$a] unless (ref $a eq 'ARRAY');
-    $b = [$b] unless (ref $b eq 'ARRAY');
-    
-    my $hash = {};
-    my $array = [];      
-    foreach ($a, $b) {
-      foreach my $element (@{$_}) {
-        if (ref $element eq 'HASH') {
-          $hash = $self->_merge_attr($hash, $element, $is_prefetch);
-        } elsif (ref $element eq 'ARRAY') {
-          $array = [@{$array}, @{$element}];
-        } else {        
-          if (($b == $_) && $is_prefetch) {
-            $self->_merge_array($array, $element, $is_prefetch);
-          } else {
-            push(@{$array}, $element);
-          }
-        }
-      }
-    }
-
-    my $final_array = [];
-    foreach my $element (@{$array}) {
-      push(@{$final_array}, $element) unless (exists $hash->{$element});
-    }
-    $array = $final_array;
-
-    if ((keys %{$hash}) && (scalar(@{$array} > 0))) {
-      return [$hash, @{$array}];
-    } else {    
-      return (keys %{$hash}) ? $hash : $array;
-    }
-  }
-}
-
-sub _merge_array {
-  my ($self, $a, $b) = @_;
-  
-  $b = [$b] unless (ref $b eq 'ARRAY');
-  # add elements from @{$b} to @{$a} which aren't already in @{$a}
-  foreach my $b_element (@{$b}) {
-    push(@{$a}, $b_element) unless grep {$b_element eq $_} @{$a};
-  }
-}
-
 sub _construct_object {
   my ($self, @row) = @_;
-  my @as = @{ $self->{_attrs}{as} };
-  
-  my $info = $self->_collapse_result(\@as, \@row);
+  my $info = $self->_collapse_result($self->{_attrs}{as}, \@row);
   my $new = $self->result_class->inflate_result($self->result_source, @$info);
   $new = $self->{_attrs}{record_filter}->($new)
     if exists $self->{_attrs}{record_filter};
@@ -913,10 +742,9 @@ sub _construct_object {
 sub _collapse_result {
   my ($self, $as, $row, $prefix) = @_;
 
-  my $live_join = $self->{attrs}->{_live_join} ||="";
   my %const;
-
   my @copy = @$row;
+  
   foreach my $this_as (@$as) {
     my $val = shift @copy;
     if (defined $prefix) {
@@ -931,9 +759,10 @@ sub _collapse_result {
     }
   }
 
+  my $alias = $self->{attrs}{alias};
   my $info = [ {}, {} ];
   foreach my $key (keys %const) {
-    if (length $key && $key ne $live_join) {
+    if (length $key && $key ne $alias) {
       my $target = $info;
       my @parts = split(/\./, $key);
       foreach my $p (@parts) {
@@ -944,14 +773,14 @@ sub _collapse_result {
       $info->[0] = $const{$key};
     }
   }
+  
   my @collapse;
-
   if (defined $prefix) {
     @collapse = map {
         m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
-    } keys %{$self->{_attrs}->{collapse}}
+    } keys %{$self->{_attrs}{collapse}}
   } else {
-    @collapse = keys %{$self->{_attrs}->{collapse}};
+    @collapse = keys %{$self->{_attrs}{collapse}};
   };
 
   if (@collapse) {
@@ -961,7 +790,7 @@ sub _collapse_result {
       $target = $target->[1]->{$p} ||= [];
     }
     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
-    my @co_key = @{$self->{_attrs}->{collapse}{$c_prefix}};
+    my @co_key = @{$self->{_attrs}{collapse}{$c_prefix}};
     my $tree = $self->_collapse_result($as, $row, $c_prefix);
     my %co_check = map { ($_, $tree->[0]->{$_}); } @co_key;
     my (@final, @raw);
@@ -978,7 +807,7 @@ sub _collapse_result {
       $row = $self->{stashed_row} = \@raw;
       $tree = $self->_collapse_result($as, $row, $c_prefix);
     }
-    @$target = (@final ? @final : [ {}, {} ]); 
+    @$target = (@final ? @final : [ {}, {} ]);
       # single empty result to indicate an empty prefetched has_many
   }
 
@@ -999,6 +828,20 @@ sub _collapse_result {
 An accessor for the primary ResultSource object from which this ResultSet
 is derived.
 
+=head2 result_class
+
+=over 4
+
+=item Arguments: $result_class?
+
+=item Return Value: $result_class
+
+=back
+
+An accessor for the class to use when creating row objects. Defaults to 
+C<< result_source->result_class >> - which in most cases is the name of the 
+L<"table"|DBIx::Class::Manual::Glossary/"ResultSource"> class.
+
 =cut
 
 
@@ -1040,17 +883,17 @@ sub count {
 sub _count { # Separated out so pager can get the full count
   my $self = shift;
   my $select = { count => '*' };
-  
-  $self->_resolve;
-  my $attrs = { %{ $self->{_attrs} } };
+
+  my $attrs = { %{$self->_resolved_attrs} };
   if (my $group_by = delete $attrs->{group_by}) {
     delete $attrs->{having};
     my @distinct = (ref $group_by ?  @$group_by : ($group_by));
     # todo: try CONCAT for multi-column pk
     my @pk = $self->result_source->primary_columns;
     if (@pk == 1) {
+      my $alias = $attrs->{alias};
       foreach my $column (@distinct) {
-        if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
+        if ($column =~ qr/^(?:\Q${alias}.\E)?$pk[0]$/) {
           @distinct = ($column);
           last;
         }
@@ -1065,10 +908,8 @@ sub _count { # Separated out so pager can get the full count
 
   # offset, order by and page are not needed to count. record_filter is cdbi
   delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
-        my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
-        $tmp_rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs});
-             #XXX - hack to pass through parent of related resultsets
 
+  my $tmp_rs = (ref $self)->new($self->result_source, $attrs);
   my ($count) = $tmp_rs->cursor->next;
   return $count;
 }
@@ -1112,9 +953,8 @@ sub all {
   my @obj;
 
   # TODO: don't call resolve here
-  $self->_resolve;
-  if (keys %{$self->{_attrs}->{collapse}}) {
-#  if ($self->{attrs}->{prefetch}) {
+  if (keys %{$self->_resolved_attrs->{collapse}}) {
+#  if ($self->{attrs}{prefetch}) {
       # Using $self->cursor->all is really just an optimisation.
       # If we're collapsing has_many prefetches it probably makes
       # very little difference, and this is cleaner than hacking
@@ -1150,8 +990,7 @@ Resets the resultset's cursor, so you can iterate through the elements again.
 
 sub reset {
   my ($self) = @_;
-  delete $self->{_attrs} if (exists $self->{_attrs});
-
+  delete $self->{_attrs} if exists $self->{_attrs};
   $self->{all_cache_position} = 0;
   $self->cursor->reset;
   return $self;
@@ -1183,13 +1022,14 @@ sub first {
 # appropriately, returning the new condition.
 
 sub _cond_for_update_delete {
-  my ($self) = @_;
+  my ($self, $full_cond) = @_;
   my $cond = {};
 
-  if (!ref($self->{cond})) {
-    # No-op. No condition, we're updating/deleting everything
-  }
-  elsif (ref $self->{cond} eq 'ARRAY') {
+  $full_cond ||= $self->{cond};
+  # No-op. No condition, we're updating/deleting everything
+  return $cond unless ref $full_cond;
+
+  if (ref $full_cond eq 'ARRAY') {
     $cond = [
       map {
         my %hash;
@@ -1198,36 +1038,33 @@ sub _cond_for_update_delete {
           $hash{$1} = $_->{$key};
         }
         \%hash;
-      } @{$self->{cond}}
+      } @{$full_cond}
     ];
   }
-  elsif (ref $self->{cond} eq 'HASH') {
-    if ((keys %{$self->{cond}})[0] eq '-and') {
+  elsif (ref $full_cond eq 'HASH') {
+    if ((keys %{$full_cond})[0] eq '-and') {
       $cond->{-and} = [];
 
-      my @cond = @{$self->{cond}{-and}};
-      for (my $i = 0; $i <= @cond - 1; $i++) {
+      my @cond = @{$full_cond->{-and}};
+      for (my $i = 0; $i < @cond; $i++) {
         my $entry = $cond[$i];
 
-        my %hash;
+        my $hash;
         if (ref $entry eq 'HASH') {
-          foreach my $key (keys %{$entry}) {
-            $key =~ /([^.]+)$/;
-            $hash{$1} = $entry->{$key};
-          }
+          $hash = $self->_cond_for_update_delete($entry);
         }
         else {
           $entry =~ /([^.]+)$/;
-          $hash{$1} = $cond[++$i];
+          $hash->{$1} = $cond[++$i];
         }
 
-        push @{$cond->{-and}}, \%hash;
+        push @{$cond->{-and}}, $hash;
       }
     }
     else {
-      foreach my $key (keys %{$self->{cond}}) {
+      foreach my $key (keys %{$full_cond}) {
         $key =~ /([^.]+)$/;
-        $cond->{$1} = $self->{cond}{$key};
+        $cond->{$1} = $full_cond->{$key};
       }
     }
   }
@@ -1306,13 +1143,12 @@ sub update_all {
 
 Deletes the contents of the resultset from its result source. Note that this
 will not run DBIC cascade triggers. See L</delete_all> if you need triggers
-to run.
+to run. See also L<DBIx::Class::Row/delete>.
 
 =cut
 
 sub delete {
   my ($self) = @_;
-  my $del = {};
 
   my $cond = $self->_cond_for_update_delete;
 
@@ -1360,10 +1196,10 @@ sub pager {
   my ($self) = @_;
   my $attrs = $self->{attrs};
   $self->throw_exception("Can't create pager for non-paged rs")
-    unless $self->{page};
+    unless $self->{attrs}{page};
   $attrs->{rows} ||= 10;
   return $self->{pager} ||= Data::Page->new(
-    $self->_count, $attrs->{rows}, $self->{page});
+    $self->_count, $attrs->{rows}, $self->{attrs}{page});
 }
 
 =head2 page
@@ -1384,9 +1220,7 @@ attribute set on the resultset (10 by default).
 
 sub page {
   my ($self, $page) = @_;
-  my $attrs = { %{$self->{attrs}} };
-  $attrs->{page} = $page;
-  return (ref $self)->new($self->result_source, $attrs);
+  return (ref $self)->new($self->result_source, { %{$self->{attrs}}, page => $page });
 }
 
 =head2 new_result
@@ -1410,16 +1244,77 @@ sub new_result {
   $self->throw_exception(
     "Can't abstract implicit construct, condition not a hash"
   ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
-  my %new = %$values;
+
   my $alias = $self->{attrs}{alias};
-  foreach my $key (keys %{$self->{cond}||{}}) {
-    $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
-  }
+  my $collapsed_cond = $self->{cond} ? $self->_collapse_cond($self->{cond}) : {};
+  my %new = (
+    %{ $self->_remove_alias($values, $alias) },
+    %{ $self->_remove_alias($collapsed_cond, $alias) },
+    -result_source => $self->result_source,
+  );
+
   my $obj = $self->result_class->new(\%new);
-  $obj->result_source($self->result_source) if $obj->can('result_source');
   return $obj;
 }
 
+# _collapse_cond
+#
+# Recursively collapse the condition.
+
+sub _collapse_cond {
+  my ($self, $cond, $collapsed) = @_;
+
+  $collapsed ||= {};
+
+  if (ref $cond eq 'ARRAY') {
+    foreach my $subcond (@$cond) {
+      next unless ref $subcond;  # -or
+#      warn "ARRAY: " . Dumper $subcond;
+      $collapsed = $self->_collapse_cond($subcond, $collapsed);
+    }
+  }
+  elsif (ref $cond eq 'HASH') {
+    if (keys %$cond and (keys %$cond)[0] eq '-and') {
+      foreach my $subcond (@{$cond->{-and}}) {
+#        warn "HASH: " . Dumper $subcond;
+        $collapsed = $self->_collapse_cond($subcond, $collapsed);
+      }
+    }
+    else {
+#      warn "LEAF: " . Dumper $cond;
+      foreach my $col (keys %$cond) {
+        my $value = $cond->{$col};
+        $collapsed->{$col} = $value;
+      }
+    }
+  }
+
+  return $collapsed;
+}
+
+# _remove_alias
+#
+# Remove the specified alias from the specified query hash. A copy is made so
+# the original query is not modified.
+
+sub _remove_alias {
+  my ($self, $query, $alias) = @_;
+
+  my %orig = %{ $query || {} };
+  my %unaliased;
+
+  foreach my $key (keys %orig) {
+    if ($key !~ /\./) {
+      $unaliased{$key} = $orig{$key};
+      next;
+    }
+    $unaliased{$1} = $orig{$key}
+      if $key =~ m/^(?:\Q$alias\E\.)?([^.]+)$/;
+  }
+
+  return \%unaliased;
+}
+
 =head2 find_or_new
 
 =over 4
@@ -1560,7 +1455,7 @@ sub update_or_create {
   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $cond = ref $_[0] eq 'HASH' ? shift : {@_};
 
-  my $row = $self->find($cond);
+  my $row = $self->find($cond, $attrs);
   if (defined $row) {
     $row->update($cond);
     return $row;
@@ -1646,41 +1541,193 @@ Returns a related resultset for the supplied relationship name.
 =cut
 
 sub related_resultset {
-  my ( $self, $rel ) = @_;
-  
+  my ($self, $rel) = @_;
+
   $self->{related_resultsets} ||= {};
   return $self->{related_resultsets}{$rel} ||= do {
-    #warn "fetching related resultset for rel '$rel' " . $self->result_source->{name};
     my $rel_obj = $self->result_source->relationship_info($rel);
-                #print Dumper($self->result_source->_relationships);
+
     $self->throw_exception(
       "search_related: result source '" . $self->result_source->name .
-        "' has no such relationship ${rel}")
-      unless $rel_obj; #die Dumper $self->{attrs};
+        "' has no such relationship $rel")
+      unless $rel_obj;
+    
+    my ($from,$seen) = $self->_resolve_from($rel);
 
-    my @live_join_stack = (
-      exists $self->{attrs}->{_live_join_stack})
-      ? @{$self->{attrs}->{_live_join_stack}}
-      : ();             
+    my $join_count = $seen->{$rel};
+    my $alias = ($join_count > 1 ? join('_', $rel, $join_count) : $rel);
 
-    push(@live_join_stack, $rel);
-                
-    my $rs = $self->result_source->schema->resultset($rel_obj->{class})->search(
+    $self->result_source->schema->resultset($rel_obj->{class})->search_rs(
       undef, {
+        %{$self->{attrs}||{}},
+        join => undef,
+        prefetch => undef,
         select => undef,
         as => undef,
-        _live_join => $rel, #the most recent
-        _live_join_stack => \@live_join_stack, #the trail of rels
-        _parent_attrs => $self->{attrs}}
-    );    
+        alias => $alias,
+        where => $self->{cond},
+        seen_join => $seen,
+        from => $from,
+    });
+  };
+}
 
-    # keep reference of the original resultset
-    $rs->{_parent_rs} = ($self->{_parent_rs})
-      ? $self->{_parent_rs}
-      : $self->result_source;
+sub _resolve_from {
+  my ($self, $extra_join) = @_;
+  my $source = $self->result_source;
+  my $attrs = $self->{attrs};
+  
+  my $from = $attrs->{from}
+    || [ { $attrs->{alias} => $source->from } ];
+    
+  my $seen = { %{$attrs->{seen_join}||{}} };
+
+  my $join = ($attrs->{join}
+               ? [ $attrs->{join}, $extra_join ]
+               : $extra_join);
+  $from = [
+    @$from,
+    ($join ? $source->resolve_join($join, $attrs->{alias}, $seen) : ()),
+  ];
 
-    return $rs;
-  };
+  return ($from,$seen);
+}
+
+sub _resolved_attrs {
+  my $self = shift;
+  return $self->{_attrs} if $self->{_attrs};
+
+  my $attrs = { %{$self->{attrs}||{}} };
+  my $source = $self->{result_source};
+  my $alias = $attrs->{alias};
+
+  $attrs->{columns} ||= delete $attrs->{cols} if exists $attrs->{cols};
+  if ($attrs->{columns}) {
+    delete $attrs->{as};
+  } elsif (!$attrs->{select}) {
+    $attrs->{columns} = [ $source->columns ];
+  }
+  $attrs->{select} = 
+    ($attrs->{select}
+      ? (ref $attrs->{select} eq 'ARRAY'
+          ? [ @{$attrs->{select}} ]
+          : [ $attrs->{select} ])
+      : [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
+    );
+  $attrs->{as} =
+    ($attrs->{as}
+      ? (ref $attrs->{as} eq 'ARRAY'
+          ? [ @{$attrs->{as}} ]
+          : [ $attrs->{as} ])
+      : [ map { m/^\Q${alias}.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ]
+    );
+  
+  my $adds;
+  if ($adds = delete $attrs->{include_columns}) {
+    $adds = [$adds] unless ref $adds eq 'ARRAY';
+    push(@{$attrs->{select}}, @$adds);
+    push(@{$attrs->{as}}, map { m/([^.]+)$/; $1 } @$adds);
+  }
+  if ($adds = delete $attrs->{'+select'}) {
+    $adds = [$adds] unless ref $adds eq 'ARRAY';
+    push(@{$attrs->{select}},
+           map { /\./ || ref $_ ? $_ : "${alias}.$_" } @$adds);
+  }
+  if (my $adds = delete $attrs->{'+as'}) {
+    $adds = [$adds] unless ref $adds eq 'ARRAY';
+    push(@{$attrs->{as}}, @$adds);
+  }
+
+  $attrs->{from} ||= [ { 'me' => $source->from } ];
+
+  if (exists $attrs->{join} || exists $attrs->{prefetch}) {
+    my $join = delete $attrs->{join} || {};
+
+    if (defined $attrs->{prefetch}) {
+      $join = $self->_merge_attr(
+        $join, $attrs->{prefetch}
+      );
+    }
+
+    $attrs->{from} =   # have to copy here to avoid corrupting the original
+      [
+        @{$attrs->{from}}, 
+        $source->resolve_join($join, $alias, { %{$attrs->{seen_join}||{}} })
+      ];
+  }
+
+  $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
+  if ($attrs->{order_by}) {
+    $attrs->{order_by} = (ref($attrs->{order_by}) eq 'ARRAY'
+                           ? [ @{$attrs->{order_by}} ]
+                           : [ $attrs->{order_by} ]);
+  } else {
+    $attrs->{order_by} = [];    
+  }
+
+  my $collapse = $attrs->{collapse} || {};
+  if (my $prefetch = delete $attrs->{prefetch}) {
+    $prefetch = $self->_merge_attr({}, $prefetch);
+    my @pre_order;
+    my $seen = $attrs->{seen_join} || {};
+    foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
+      # bring joins back to level of current class
+      my @prefetch = $source->resolve_prefetch(
+        $p, $alias, $seen, \@pre_order, $collapse
+      );
+      push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
+      push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
+    }
+    push(@{$attrs->{order_by}}, @pre_order);
+  }
+  $attrs->{collapse} = $collapse;
+
+  return $self->{_attrs} = $attrs;
+}
+
+sub _merge_attr {
+  my ($self, $a, $b) = @_;
+  return $b unless defined($a);
+  return $a unless defined($b);
+  
+  if (ref $b eq 'HASH' && ref $a eq 'HASH') {
+    foreach my $key (keys %{$b}) {
+      if (exists $a->{$key}) {
+        $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key});
+      } else {
+        $a->{$key} = $b->{$key};
+      }
+    }
+    return $a;
+  } else {
+    $a = [$a] unless ref $a eq 'ARRAY';
+    $b = [$b] unless ref $b eq 'ARRAY';
+
+    my $hash = {};
+    my @array;
+    foreach my $x ($a, $b) {
+      foreach my $element (@{$x}) {
+        if (ref $element eq 'HASH') {
+          $hash = $self->_merge_attr($hash, $element);
+        } elsif (ref $element eq 'ARRAY') {
+          push(@array, @{$element});
+        } else {
+          push(@array, $element) unless $b == $x
+            && grep { $_ eq $element } @array;
+        }
+      }
+    }
+    
+    @array = grep { !exists $hash->{$_} } @array;
+
+    return keys %{$hash}
+      ? ( scalar(@array)
+            ? [$hash, @array]
+            : $hash
+        )
+      : \@array;
+  }
 }
 
 =head2 throw_exception
@@ -1713,7 +1760,7 @@ Which column(s) to order the results by. This is currently passed
 through directly to SQL, so you can give e.g. C<year DESC> for a
 descending order on the column `year'.
 
-Please note that if you have quoting enabled (see 
+Please note that if you have quoting enabled (see
 L<DBIx::Class::Storage/quote_char>) you will need to do C<\'year DESC' > to
 specify an order. (The scalar ref causes it to be passed as raw sql to the DB,
 so you will need to manually quote things as appropriate.)
@@ -1826,9 +1873,15 @@ use C<get_column> instead:
 You can create your own accessors if required - see
 L<DBIx::Class::Manual::Cookbook> for details.
 
-Please note: This will NOT insert an C<AS employee_count> into the SQL statement
-produced, it is used for internal access only. Thus attempting to use the accessor
-in an C<order_by> clause or similar will fail misrably.
+Please note: This will NOT insert an C<AS employee_count> into the SQL
+statement produced, it is used for internal access only. Thus
+attempting to use the accessor in an C<order_by> clause or similar
+will fail miserably.
+
+To get around this limitation, you can supply literal SQL to your
+C<select> attibute that contains the C<AS alias> text, eg:
+
+  select => [\'myfield AS alias']
 
 =head2 join
 
@@ -1867,6 +1920,19 @@ For example:
     }
   );
 
+You need to use the relationship (not the table) name in  conditions, 
+because they are aliased as such. The current table is aliased as "me", so 
+you need to use me.column_name in order to avoid ambiguity. For example:
+
+  # Get CDs from 1984 with a 'Foo' track 
+  my $rs = $schema->resultset('CD')->search(
+    { 
+      'me.year' => 1984,
+      'tracks.name' => 'Foo'
+    },
+    { join => 'tracks' }
+  );
+  
 If the same join is supplied twice, it will be aliased to <rel>_2 (and
 similarly for a third time). For e.g.
 
@@ -1934,7 +2000,7 @@ with an accessor type of 'single' or 'filter').
 
 Makes the resultset paged and specifies the page to retrieve. Effectively
 identical to creating a non-pages resultset and then calling ->page($page)
-on it. 
+on it.
 
 If L<rows> attribute is not specified it defualts to 10 rows per page.
 
@@ -1982,7 +2048,7 @@ A arrayref of columns to group by. Can include columns of joined tables.
 
 HAVING is a select statement attribute that is applied between GROUP BY and
 ORDER BY. It is applied to the after the grouping calculations have been
-done. 
+done.
 
   having => { 'count(employee)' => { '>=', 100 } }
 
@@ -1996,13 +2062,27 @@ done.
 
 Set to 1 to group by all columns.
 
+=head2 where
+
+=over 4
+
+Adds to the WHERE clause.
+
+  # only return rows WHERE deleted IS NULL for all searches
+  __PACKAGE__->resultset_attributes({ where => { deleted => undef } }); )
+
+Can be overridden by passing C<{ where => undef }> as an attribute
+to a resulset.
+
+=back
+
 =head2 cache
 
 Set to 1 to cache search results. This prevents extra SQL queries if you
 revisit rows in your ResultSet:
 
   my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
-  
+
   while( my $artist = $resultset->next ) {
     ... do stuff ...
   }