Add support for +select and +as attributes to ResultSet
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index 328b706..84162f9 100644 (file)
@@ -8,7 +8,10 @@ use overload
         fallback => 1;
 use Data::Page;
 use Storable;
+use Data::Dumper;
+use Scalar::Util qw/weaken/;
 
+use DBIx::Class::ResultSetColumn;
 use base qw/DBIx::Class/;
 __PACKAGE__->load_components(qw/AccessorGroup/);
 __PACKAGE__->mk_group_accessors('simple' => qw/result_source result_class/);
@@ -20,7 +23,7 @@ DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
 =head1 SYNOPSIS
 
   my $rs   = $schema->resultset('User')->search(registered => 1);
-  my @rows = $schema->resultset('Foo')->search(bar => 'baz');
+  my @rows = $schema->resultset('CD')->search(year => 2005);
 
 =head1 DESCRIPTION
 
@@ -52,18 +55,30 @@ In the examples below, the following table classes are used:
 
 =head2 new
 
-=head3 Arguments: ($source, \%$attrs)
+=over 4
+
+=item Arguments: $source, \%$attrs
+
+=item Return Value: $rs
+
+=back
 
 The resultset constructor. Takes a source object (usually a
-L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
-below).  Does not perform any queries -- these are executed as needed by the
-other methods.
+L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see
+L</ATTRIBUTES> below).  Does not perform any queries -- these are
+executed as needed by the other methods.
 
 Generally you won't need to construct a resultset manually.  You'll
 automatically get one from e.g. a L</search> called in scalar context:
 
   my $rs = $schema->resultset('CD')->search({ title => '100th Window' });
 
+IMPORTANT: If called on an object, proxies to new_result instead so
+
+  my $cd = $schema->resultset('CD')->new({ title => 'Spoon' });
+
+will return a CD object, not a ResultSet.
+
 =cut
 
 sub new {
@@ -71,62 +86,7 @@ sub new {
   return $class->new_result(@_) if ref $class;
   
   my ($source, $attrs) = @_;
-  #use Data::Dumper; warn Dumper($attrs);
-  $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
-  my $alias = ($attrs->{alias} ||= 'me');
-  
-  $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
-  delete $attrs->{as} if $attrs->{columns};
-  $attrs->{columns} ||= [ $source->columns ] unless $attrs->{select};
-  $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @{delete $attrs->{columns}} ]
-    if $attrs->{columns};
-  $attrs->{as} ||= [ map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}} ];
-  if (my $include = delete $attrs->{include_columns}) {
-    push(@{$attrs->{select}}, @$include);
-    push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
-  }
-  #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
-
-  $attrs->{from} ||= [ { $alias => $source->from } ];
-  $attrs->{seen_join} ||= {};
-  my %seen;
-  if (my $join = delete $attrs->{join}) {
-    foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
-      if (ref $j eq 'HASH') {
-        $seen{$_} = 1 foreach keys %$j;
-      } else {
-        $seen{$j} = 1;
-      }
-    }
-    push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
-  }
-  
-  $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
-  $attrs->{order_by} = [ $attrs->{order_by} ] if $attrs->{order_by} and !ref($attrs->{order_by});
-  $attrs->{order_by} ||= [];
-
-  my $collapse = $attrs->{collapse} || {};
-  if (my $prefetch = delete $attrs->{prefetch}) {
-    my @pre_order;
-    foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
-      if ( ref $p eq 'HASH' ) {
-        foreach my $key (keys %$p) {
-          push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
-            unless $seen{$key};
-        }
-      } else {
-        push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
-            unless $seen{$p};
-      }
-      my @prefetch = $source->resolve_prefetch(
-           $p, $attrs->{alias}, {}, \@pre_order, $collapse);
-      push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
-      push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
-    }
-    push(@{$attrs->{order_by}}, @pre_order);
-  }
-  $attrs->{collapse} = $collapse;
-#  use Data::Dumper; warn Dumper($collapse) if keys %{$collapse};
+  weaken $source;
 
   if ($attrs->{page}) {
     $attrs->{rows} ||= 10;
@@ -134,12 +94,14 @@ sub new {
     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
   }
 
+  $attrs->{alias} ||= 'me';
+
   bless {
     result_source => $source,
     result_class => $attrs->{result_class} || $source->result_class,
     cond => $attrs->{where},
-    from => $attrs->{from},
-    collapse => $collapse,
+#    from => $attrs->{from},
+#    collapse => $collapse,
     count => undef,
     page => delete $attrs->{page},
     pager => undef,
@@ -149,71 +111,129 @@ sub new {
 
 =head2 search
 
-  my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
-  my $new_rs = $rs->search({ foo => 3 });
+=over 4
+
+=item Arguments: $cond, \%attrs?
+
+=item Return Value: $resultset (scalar context), @row_objs (list context)
+
+=back
+
+  my @cds    = $cd_rs->search({ year => 2001 }); # "... WHERE year = 2001"
+  my $new_rs = $cd_rs->search({ year => 2005 });
+
+  my $new_rs = $cd_rs->search([ { year => 2005 }, { year => 2004 } ]);
+                 # year = 2005 OR year = 2004
 
 If you need to pass in additional attributes but no additional condition,
-call it as C<search(undef, \%attrs);>.
+call it as C<search(undef, \%attrs)>.
 
-  # "SELECT foo, bar FROM $class_table"
-  my @all = $class->search(undef, { columns => [qw/foo bar/] });
+  # "SELECT name, artistid FROM $artist_table"
+  my @all_artists = $schema->resultset('Artist')->search(undef, {
+    columns => [qw/name artistid/],
+  });
 
 =cut
 
 sub search {
   my $self = shift;
+  my $rs = $self->search_rs( @_ );
+  return (wantarray ? $rs->all : $rs);
+}
 
-  my $rs;
-  if( @_ ) {
-    
-    my $attrs = { %{$self->{attrs}} };
-    my $having = delete $attrs->{having};
-    if (@_ > 1 && ref $_[$#_] eq 'HASH') {
-     $attrs = { %$attrs, %{ pop(@_) } };
-    }
+=head2 search_rs
 
-    my $where = (@_
-                  ? ((@_ == 1 || ref $_[0] eq "HASH")
-                      ? shift
-                      : ((@_ % 2)
-                          ? $self->throw_exception(
-                              "Odd number of arguments to search")
-                          : {@_}))
-                  : undef());
-    if (defined $where) {
-      $where = (defined $attrs->{where}
-                ? { '-and' =>
-                    [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
-                        $where, $attrs->{where} ] }
-                : $where);
-      $attrs->{where} = $where;
-    }
+=over 4
+
+=item Arguments: $cond, \%attrs?
 
-    if (defined $having) {
-      $having = (defined $attrs->{having}
-                ? { '-and' =>
-                    [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
-                        $having, $attrs->{having} ] }
-                : $having);
-      $attrs->{having} = $having;
+=item Return Value: $resultset
+
+=back
+
+This method does the same exact thing as search() except it will 
+always return a resultset, even in list context.
+
+=cut
+
+sub search_rs {
+  my $self = shift;
+
+  my $our_attrs = { %{$self->{attrs}} };
+  my $having = delete $our_attrs->{having};
+  my $attrs = {};
+  $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
+  
+  # merge new attrs into old
+  foreach my $key (qw/join prefetch/) {
+    next unless (exists $attrs->{$key});
+    if (exists $our_attrs->{$key}) {
+      $our_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
+    } else {
+      $our_attrs->{$key} = $attrs->{$key};
     }
+    delete $attrs->{$key};
+  }
 
-    $rs = (ref $self)->new($self->result_source, $attrs);
+  if (exists $our_attrs->{prefetch}) {
+      $our_attrs->{join} = $self->_merge_attr($our_attrs->{join}, $our_attrs->{prefetch}, 1);
   }
-  else {
-    $rs = $self;
-    $rs->reset();
+
+  my $new_attrs = { %{$our_attrs}, %{$attrs} };
+
+  # merge new where and having into old
+  my $where = (@_
+                ? ((@_ == 1 || ref $_[0] eq "HASH")
+                    ? shift
+                    : ((@_ % 2)
+                        ? $self->throw_exception(
+                            "Odd number of arguments to search")
+                        : {@_}))
+                : undef());
+  if (defined $where) {
+    $new_attrs->{where} = (defined $new_attrs->{where}
+              ? { '-and' =>
+                  [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
+                      $where, $new_attrs->{where} ] }
+              : $where);
   }
-  return (wantarray ? $rs->all : $rs);
+
+  if (defined $having) {
+    $new_attrs->{having} = (defined $new_attrs->{having}
+              ? { '-and' =>
+                  [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
+                      $having, $new_attrs->{having} ] }
+              : $having);
+  }
+
+  my $rs = (ref $self)->new($self->result_source, $new_attrs);
+  $rs->{_parent_rs} = $self->{_parent_rs} if ($self->{_parent_rs}); #XXX - hack to pass through parent of related resultsets
+
+  unless (@_) { # no search, effectively just a clone
+    my $rows = $self->get_cache;
+    if ($rows) {
+      $rs->set_cache($rows);
+    }
+  }
+  
+  return $rs;
 }
 
 =head2 search_literal
 
-  my @obj    = $rs->search_literal($literal_where_cond, @bind);
-  my $new_rs = $rs->search_literal($literal_where_cond, @bind);
+=over 4
+
+=item Arguments: $sql_fragment, @bind_values
+
+=item Return Value: $resultset (scalar context), @row_objs (list context)
+
+=back
+
+  my @cds   = $cd_rs->search_literal('year = ? AND title = ?', qw/2001 Reload/);
+  my $newrs = $artist_rs->search_literal('name = ?', 'Metallica');
 
 Pass a literal chunk of SQL to be added to the conditional part of the
-resultset.
+resultset query.
 
 =cut
 
@@ -226,14 +246,25 @@ sub search_literal {
 
 =head2 find
 
-=head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
+=over 4
+
+=item Arguments: @values | \%cols, \%attrs?
+
+=item Return Value: $row_object
 
-Finds a row based on its primary key or unique constraint. For example:
+=back
+
+Finds a row based on its primary key or unique constraint. For example, to find
+a row by its primary key:
 
   my $cd = $schema->resultset('CD')->find(5);
 
-Also takes an optional C<key> attribute, to search by a specific key or unique
-constraint. For example:
+You can also find a row by a specific unique constraint using the C<key>
+attribute. For example:
+
+  my $cd = $schema->resultset('CD')->find('Massive Attack', 'Mezzanine', { key => 'artist_title' });
+
+Additionally, you can specify the columns explicitly by name:
 
   my $cd = $schema->resultset('CD')->find(
     {
@@ -243,54 +274,115 @@ constraint. For example:
     { key => 'artist_title' }
   );
 
-See also L</find_or_create> and L</update_or_create>.
+If no C<key> is specified and you explicitly name columns, it searches on all
+unique constraints defined on the source, including the primary key.
+
+If the C<key> is specified as C<primary>, it searches only on the primary key.
+
+See also L</find_or_create> and L</update_or_create>. For information on how to
+declare unique constraints, see
+L<DBIx::Class::ResultSource/add_unique_constraint>.
 
 =cut
 
 sub find {
-  my ($self, @vals) = @_;
-  my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
-
-  my @cols = $self->result_source->primary_columns;
-  if (exists $attrs->{key}) {
-    my %uniq = $self->result_source->unique_constraints;
-    $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
-      unless exists $uniq{$attrs->{key}};
-    @cols = @{ $uniq{$attrs->{key}} };
+  my $self = shift;
+  my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
+
+  # Parse out a hash from input
+  my @cols = exists $attrs->{key}
+    ? $self->result_source->unique_constraint_columns($attrs->{key})
+    : $self->result_source->primary_columns;
+
+  my $hash;
+  if (ref $_[0] eq 'HASH') {
+    $hash = { %{$_[0]} };
   }
-  #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
-  $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
-    unless @cols;
-
-  my $query;
-  if (ref $vals[0] eq 'HASH') {
-    $query = { %{$vals[0]} };
-  } elsif (@cols == @vals) {
-    $query = {};
-    @{$query}{@cols} = @vals;
-  } else {
-    $query = {@vals};
+  elsif (@_ == @cols) {
+    $hash = {};
+    @{$hash}{@cols} = @_;
   }
-  foreach (keys %$query) {
-    next if m/\./;
-    $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
+  elsif (@_) {
+    # For backwards compatibility
+    $hash = {@_};
   }
-  #warn Dumper($query);
-  
+  else {
+    $self->throw_exception(
+      "Arguments to find must be a hashref or match the number of columns in the "
+        . (exists $attrs->{key} ? "$attrs->{key} unique constraint" : "primary key")
+    );
+  }
+
+  # Check the hash we just parsed against our source's unique constraints
+  my @constraint_names = exists $attrs->{key}
+    ? ($attrs->{key})
+    : $self->result_source->unique_constraint_names;
+  $self->throw_exception(
+    "Can't find unless a primary key or unique constraint is defined"
+  ) unless @constraint_names;
+
+  my @unique_queries;
+  foreach my $name (@constraint_names) {
+    my @unique_cols = $self->result_source->unique_constraint_columns($name);
+    my $unique_query = $self->_build_unique_query($hash, \@unique_cols);
+
+    # Add the ResultSet's alias
+    foreach my $key (grep { ! m/\./ } keys %$unique_query) {
+      my $alias = $self->{attrs}->{alias};
+      $unique_query->{"$alias.$key"} = delete $unique_query->{$key};
+    }
+
+    push @unique_queries, $unique_query if %$unique_query;
+  }
+
+  # Handle cases where the ResultSet already defines the query
+  my $query = @unique_queries ? \@unique_queries : undef;
+
+  # Run the query
   if (keys %$attrs) {
-      my $rs = $self->search($query,$attrs);
-      return keys %{$rs->{collapse}} ? $rs->next : $rs->single;
-  } else {
-      return keys %{$self->{collapse}} ? $self->search($query)->next : $self->single($query);
+    my $rs = $self->search($query, $attrs);
+    $rs->_resolve;
+    return keys %{$rs->{_attrs}->{collapse}} ? $rs->next : $rs->single;
+  }
+  else {
+    $self->_resolve;  
+    return (keys %{$self->{_attrs}->{collapse}})
+      ? $self->search($query)->next
+      : $self->single($query);
   }
 }
 
+# _build_unique_query
+#
+# Constrain the specified query hash based on the specified column names.
+
+sub _build_unique_query {
+  my ($self, $query, $unique_cols) = @_;
+
+  my %unique_query =
+    map  { $_ => $query->{$_} }
+    grep { exists $query->{$_} }
+    @$unique_cols;
+
+  return \%unique_query;
+}
+
 =head2 search_related
 
-  $rs->search_related('relname', $cond?, $attrs?);
+=over 4
+
+=item Arguments: $cond, \%attrs?
+
+=item Return Value: $new_resultset
+
+=back
+
+  $new_rs = $cd_rs->search_related('artist', {
+    name => 'Emo-R-Us',
+  });
 
-Search the specified relationship. Optionally specify a condition for matching
-records.
+Searches the specified relationship, optionally specifying a condition and
+attributes for matching records. See L</ATTRIBUTES> for more information.
 
 =cut
 
@@ -300,51 +392,110 @@ sub search_related {
 
 =head2 cursor
 
-Returns a storage-driven cursor to the given resultset.
+=over 4
+
+=item Arguments: none
+
+=item Return Value: $cursor
+
+=back
+
+Returns a storage-driven cursor to the given resultset. See
+L<DBIx::Class::Cursor> for more information.
 
 =cut
 
 sub cursor {
   my ($self) = @_;
-  my ($attrs) = $self->{attrs};
-  $attrs = { %$attrs };
+
+  $self->_resolve;
+  my $attrs = { %{$self->{_attrs}} };
   return $self->{cursor}
-    ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
+    ||= $self->result_source->storage->select($attrs->{from}, $attrs->{select},
           $attrs->{where},$attrs);
 }
 
 =head2 single
 
-Inflates the first result without creating a cursor
+=over 4
+
+=item Arguments: $cond?
+
+=item Return Value: $row_object?
+
+=back
+
+  my $cd = $schema->resultset('CD')->single({ year => 2001 });
+
+Inflates the first result without creating a cursor if the resultset has
+any records in it; if not returns nothing. Used by L</find> as an optimisation.
+
+Can optionally take an additional condition *only* - this is a fast-code-path
+method; if you need to add extra joins or similar call ->search and then
+->single without a condition on the $rs returned from that.
 
 =cut
 
 sub single {
-  my ($self, $extra) = @_;
-  my ($attrs) = $self->{attrs};
-  $attrs = { %$attrs };
-  if ($extra) {
+  my ($self, $where) = @_;
+  $self->_resolve;
+  my $attrs = { %{$self->{_attrs}} };
+  if ($where) {
     if (defined $attrs->{where}) {
       $attrs->{where} = {
-        '-and'
-          => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
-               delete $attrs->{where}, $extra ]
+        '-and' =>
+            [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
+               $where, delete $attrs->{where} ]
       };
     } else {
-      $attrs->{where} = $extra;
+      $attrs->{where} = $where;
     }
   }
+
   my @data = $self->result_source->storage->select_single(
-          $self->{from}, $attrs->{select},
+          $attrs->{from}, $attrs->{select},
           $attrs->{where},$attrs);
   return (@data ? $self->_construct_object(@data) : ());
 }
 
+=head2 get_column
+
+=over 4
+
+=item Arguments: $cond?
+
+=item Return Value: $resultsetcolumn
+
+=back
+
+  my $max_length = $rs->get_column('length')->max;
+
+Returns a ResultSetColumn instance for $column based on $self
+
+=cut
+
+sub get_column {
+  my ($self, $column) = @_;
+
+  my $new = DBIx::Class::ResultSetColumn->new($self, $column);
+  return $new;
+}
 
 =head2 search_like
 
-Perform a search, but use C<LIKE> instead of equality as the condition. Note
-that this is simply a convenience method; you most likely want to use
+=over 4
+
+=item Arguments: $cond, \%attrs?
+
+=item Return Value: $resultset (scalar context), @row_objs (list context)
+
+=back
+
+  # WHERE title LIKE '%blue%'
+  $cd_rs = $rs->search_like({ title => '%blue%'});
+
+Performs a search, but uses C<LIKE> instead of C<=> as the condition. Note
+that this is simply a convenience method. You most likely want to use
 L</search> with specific operators.
 
 For more information, see L<DBIx::Class::Manual::Cookbook>.
@@ -352,36 +503,52 @@ For more information, see L<DBIx::Class::Manual::Cookbook>.
 =cut
 
 sub search_like {
-  my $class    = shift;
-  my $attrs = { };
-  if (@_ > 1 && ref $_[$#_] eq 'HASH') {
-    $attrs = pop(@_);
-  }
-  my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
+  my $class = shift;
+  my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
+  my $query = ref $_[0] eq 'HASH' ? { %{shift()} }: {@_};
   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
   return $class->search($query, { %$attrs });
 }
 
 =head2 slice
 
-=head3 Arguments: ($first, $last)
+=over 4
+
+=item Arguments: $first, $last
+
+=item Return Value: $resultset (scalar context), @row_objs (list context)
+
+=back
 
-Returns a subset of elements from the resultset.
+Returns a resultset or object list representing a subset of elements from the
+resultset slice is called on. Indexes are from 0, i.e., to get the first
+three records, call:
+
+  my ($one, $two, $three) = $rs->slice(0, 2);
 
 =cut
 
 sub slice {
   my ($self, $min, $max) = @_;
-  my $attrs = { %{ $self->{attrs} || {} } };
-  $attrs->{offset} ||= 0;
+  my $attrs = {}; # = { %{ $self->{attrs} || {} } };
+  $attrs->{offset} = $self->{attrs}{offset} || 0;
   $attrs->{offset} += $min;
   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
-  my $slice = (ref $self)->new($self->result_source, $attrs);
-  return (wantarray ? $slice->all : $slice);
+  return $self->search(undef(), $attrs);
+  #my $slice = (ref $self)->new($self->result_source, $attrs);
+  #return (wantarray ? $slice->all : $slice);
 }
 
 =head2 next
 
+=over 4
+
+=item Arguments: none
+
+=item Return Value: $result?
+
+=back
+
 Returns the next element in the resultset (C<undef> is there is none).
 
 Can be used to efficiently iterate over records in the resultset:
@@ -391,46 +558,184 @@ Can be used to efficiently iterate over records in the resultset:
     print $cd->title;
   }
 
+Note that you need to store the resultset object, and call C<next> on it. 
+Calling C<< resultset('Table')->next >> repeatedly will always return the
+first record from the resultset.
+
 =cut
 
 sub next {
   my ($self) = @_;
-  my $cache;
-  if( @{$cache = $self->{all_cache} || []}) {
+  if (my $cache = $self->get_cache) {
     $self->{all_cache_position} ||= 0;
-    my $obj = $cache->[$self->{all_cache_position}];
-    $self->{all_cache_position}++;
-    return $obj;
+    return $cache->[$self->{all_cache_position}++];
   }
   if ($self->{attrs}{cache}) {
     $self->{all_cache_position} = 1;
     return ($self->all)[0];
   }
-  my @row = (exists $self->{stashed_row}
-               ? @{delete $self->{stashed_row}}
-               : $self->cursor->next);
-#  warn Dumper(\@row); use Data::Dumper;
+  my @row = (exists $self->{stashed_row} ?
+               @{delete $self->{stashed_row}} :
+               $self->cursor->next
+  );
   return unless (@row);
   return $self->_construct_object(@row);
 }
 
+sub _resolve {
+  my $self = shift;
+
+  return if(exists $self->{_attrs}); #return if _resolve has already been called
+
+  my $attrs = $self->{attrs};  
+  my $source = ($self->{_parent_rs}) ? $self->{_parent_rs} : $self->{result_source};
+
+  # XXX - lose storable dclone
+  my $record_filter = delete $attrs->{record_filter} if (defined $attrs->{record_filter});
+  $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
+  $attrs->{record_filter} = $record_filter if ($record_filter);
+  $self->{attrs}->{record_filter} = $record_filter if ($record_filter);
+
+  my $alias = $attrs->{alias};
+  $attrs->{columns} ||= delete $attrs->{cols} if $attrs->{cols};
+  delete $attrs->{as} if $attrs->{columns};
+  $attrs->{columns} ||= [ $self->{result_source}->columns ] unless $attrs->{select};
+  my $select_alias = ($self->{_parent_rs}) ? $self->{attrs}->{_live_join} : $alias;
+  $attrs->{select} = [
+                     map { m/\./ ? $_ : "${select_alias}.$_" } @{delete $attrs->{columns}}
+                     ] if $attrs->{columns};
+  $attrs->{as} ||= [
+                   map { m/^\Q$alias.\E(.+)$/ ? $1 : $_ } @{$attrs->{select}}
+                   ];
+  if (my $include = delete $attrs->{include_columns}) {
+      push(@{$attrs->{select}}, @$include);
+      push(@{$attrs->{as}}, map { m/([^.]+)$/; $1; } @$include);
+  }
+
+  $attrs->{from} ||= [ { $alias => $source->from } ];
+  $attrs->{seen_join} ||= {};
+  my %seen;
+  if (my $join = delete $attrs->{join}) {
+      foreach my $j (ref $join eq 'ARRAY' ? @$join : ($join)) {
+         if (ref $j eq 'HASH') {
+             $seen{$_} = 1 foreach keys %$j;
+         } else {
+             $seen{$j} = 1;
+         }
+      }
+
+      push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
+  }
+  $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
+  $attrs->{order_by} = [ $attrs->{order_by} ] if
+      $attrs->{order_by} and !ref($attrs->{order_by});
+  $attrs->{order_by} ||= [];
+
+ if(my $seladds = delete($attrs->{'+select'})) {
+   my @seladds = (ref($seladds) eq 'ARRAY' ? @$seladds : ($seladds));
+   $attrs->{select} = [
+     @{ $attrs->{select} },
+     map { (m/\./ || ref($_)) ? $_ : "${alias}.$_" } $seladds
+   ];
+ }
+ if(my $asadds = delete($attrs->{'+as'})) {
+   my @asadds = (ref($asadds) eq 'ARRAY' ? @$asadds : ($asadds));
+   $attrs->{as} = [ @{ $attrs->{as} }, @asadds ];
+ }
+  
+  my $collapse = $attrs->{collapse} || {};
+  if (my $prefetch = delete $attrs->{prefetch}) {
+      my @pre_order;
+      foreach my $p (ref $prefetch eq 'ARRAY' ? @$prefetch : ($prefetch)) {
+         if ( ref $p eq 'HASH' ) {
+             foreach my $key (keys %$p) {
+                 push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
+                     unless $seen{$key};
+             }
+         } else {
+             push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
+                 unless $seen{$p};
+         }
+         my @prefetch = $source->resolve_prefetch(
+                                                  $p, $attrs->{alias}, {}, \@pre_order, $collapse);
+         push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
+         push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
+      }
+      push(@{$attrs->{order_by}}, @pre_order);
+  }
+  $attrs->{collapse} = $collapse;
+  $self->{_attrs} = $attrs;
+}
+
+sub _merge_attr {
+  my ($self, $a, $b, $is_prefetch) = @_;
+    
+  return $b unless $a;
+  if (ref $b eq 'HASH' && ref $a eq 'HASH') {
+               foreach my $key (keys %{$b}) {
+                       if (exists $a->{$key}) {
+             $a->{$key} = $self->_merge_attr($a->{$key}, $b->{$key}, $is_prefetch);
+                       } else {
+             $a->{$key} = delete $b->{$key};
+                       }
+               }
+               return $a;
+  } else {
+               $a = [$a] unless (ref $a eq 'ARRAY');
+               $b = [$b] unless (ref $b eq 'ARRAY');
+
+               my $hash = {};
+               my $array = [];      
+               foreach ($a, $b) {
+                       foreach my $element (@{$_}) {
+             if (ref $element eq 'HASH') {
+                                       $hash = $self->_merge_attr($hash, $element, $is_prefetch);
+             } elsif (ref $element eq 'ARRAY') {
+                                       $array = [@{$array}, @{$element}];
+             } else {  
+                                       if (($b == $_) && $is_prefetch) {
+                                               $self->_merge_array($array, $element, $is_prefetch);
+                                       } else {
+                                               push(@{$array}, $element);
+                                       }
+             }
+                       }
+               }
+
+               if ((keys %{$hash}) && (scalar(@{$array} > 0))) {
+                       return [$hash, @{$array}];
+               } else {        
+                       return (keys %{$hash}) ? $hash : $array;
+               }
+  }
+}
+
+sub _merge_array {
+       my ($self, $a, $b) = @_;
+       $b = [$b] unless (ref $b eq 'ARRAY');
+       # add elements from @{$b} to @{$a} which aren't already in @{$a}
+       foreach my $b_element (@{$b}) {
+               push(@{$a}, $b_element) unless grep {$b_element eq $_} @{$a};
+       }
+}
+
 sub _construct_object {
   my ($self, @row) = @_;
-  my @as = @{ $self->{attrs}{as} };
+  my @as = @{ $self->{_attrs}{as} };
 
   my $info = $self->_collapse_result(\@as, \@row);
-
   my $new = $self->result_class->inflate_result($self->result_source, @$info);
-
-  $new = $self->{attrs}{record_filter}->($new)
-    if exists $self->{attrs}{record_filter};
+  $new = $self->{_attrs}{record_filter}->($new)
+    if exists $self->{_attrs}{record_filter};
   return $new;
 }
 
 sub _collapse_result {
   my ($self, $as, $row, $prefix) = @_;
 
+  my $live_join = $self->{attrs}->{_live_join} ||="";
   my %const;
 
   my @copy = @$row;
@@ -439,18 +744,18 @@ sub _collapse_result {
     if (defined $prefix) {
       if ($this_as =~ m/^\Q${prefix}.\E(.+)$/) {
         my $remain = $1;
-        $remain =~ /^(?:(.*)\.)?([^\.]+)$/;
+        $remain =~ /^(?:(.*)\.)?([^.]+)$/;
         $const{$1||''}{$2} = $val;
       }
     } else {
-      $this_as =~ /^(?:(.*)\.)?([^\.]+)$/;
+      $this_as =~ /^(?:(.*)\.)?([^.]+)$/;
       $const{$1||''}{$2} = $val;
     }
   }
 
   my $info = [ {}, {} ];
   foreach my $key (keys %const) {
-    if (length $key) {
+    if (length $key && $key ne $live_join) {
       my $target = $info;
       my @parts = split(/\./, $key);
       foreach my $p (@parts) {
@@ -462,10 +767,15 @@ sub _collapse_result {
     }
   }
 
-  my @collapse = (defined($prefix)
-                   ? (map { (m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()); }
-                       keys %{$self->{collapse}})
-                   : keys %{$self->{collapse}});
+  my @collapse;
+  if (defined $prefix) {
+    @collapse = map {
+        m/^\Q${prefix}.\E(.+)$/ ? ($1) : ()
+    } keys %{$self->{_attrs}->{collapse}}
+  } else {
+    @collapse = keys %{$self->{_attrs}->{collapse}};
+  };
+
   if (@collapse) {
     my ($c) = sort { length $a <=> length $b } @collapse;
     my $target = $info;
@@ -473,35 +783,51 @@ sub _collapse_result {
       $target = $target->[1]->{$p} ||= [];
     }
     my $c_prefix = (defined($prefix) ? "${prefix}.${c}" : $c);
-    my @co_key = @{$self->{collapse}{$c_prefix}};
+    my @co_key = @{$self->{_attrs}->{collapse}{$c_prefix}};
     my %co_check = map { ($_, $target->[0]->{$_}); } @co_key;
     my $tree = $self->_collapse_result($as, $row, $c_prefix);
     my (@final, @raw);
     while ( !(grep {
-                !defined($tree->[0]->{$_})
-                || $co_check{$_} ne $tree->[0]->{$_}
+                !defined($tree->[0]->{$_}) ||
+                $co_check{$_} ne $tree->[0]->{$_}
               } @co_key) ) {
       push(@final, $tree);
       last unless (@raw = $self->cursor->next);
       $row = $self->{stashed_row} = \@raw;
       $tree = $self->_collapse_result($as, $row, $c_prefix);
-      #warn Data::Dumper::Dumper($tree, $row);
     }
-    @{$target} = @final;
+    @$target = (@final ? @final : [ {}, {} ]); 
+      # single empty result to indicate an empty prefetched has_many
   }
-
   return $info;
 }
 
 =head2 result_source
 
-Returns a reference to the result source for this recordset.
+=over 4
+
+=item Arguments: $result_source?
+
+=item Return Value: $result_source
+
+=back
+
+An accessor for the primary ResultSource object from which this ResultSet
+is derived.
 
 =cut
 
 
 =head2 count
 
+=over 4
+
+=item Arguments: $cond, \%attrs??
+
+=item Return Value: $count
+
+=back
+
 Performs an SQL C<COUNT> with the same query as the resultset was built
 with to find the number of elements. If passed arguments, does a search
 on the resultset and counts the results of that.
@@ -516,52 +842,62 @@ clause.
 
 sub count {
   my $self = shift;
-  return $self->search(@_)->count if @_ && defined $_[0];
-  unless (defined $self->{count}) {
-    return scalar @{ $self->get_cache }
-      if @{ $self->get_cache };
-    my $group_by;
-    my $select = { 'count' => '*' };
-    my $attrs = { %{ $self->{attrs} } };
-    if( $group_by = delete $attrs->{group_by} ) {
-      delete $attrs->{having};
-      my @distinct = (ref $group_by ?  @$group_by : ($group_by));
-      # todo: try CONCAT for multi-column pk
-      my @pk = $self->result_source->primary_columns;
-      if( scalar(@pk) == 1 ) {
-        my $pk = shift(@pk);
-        my $alias = $attrs->{alias};
-        my $re = qr/^($alias\.)?$pk$/;
-        foreach my $column ( @distinct) {
-          if( $column =~ $re ) {
-            @distinct = ( $column );
-            last;
-          }
-        } 
-      }
+  return $self->search(@_)->count if @_ and defined $_[0];
+  return scalar @{ $self->get_cache } if $self->get_cache;
 
-      $select = { count => { 'distinct' => \@distinct } };
-      #use Data::Dumper; die Dumper $select;
-    }
+  my $count = $self->_count;
+  return 0 unless $count;
 
-    $attrs->{select} = $select;
-    $attrs->{as} = [ 'count' ];
-    # offset, order by and page are not needed to count. record_filter is cdbi
-    delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
-        
-    ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
-  }
-  return 0 unless $self->{count};
-  my $count = $self->{count};
   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
   $count = $self->{attrs}{rows} if
-    ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
+    $self->{attrs}{rows} and $self->{attrs}{rows} < $count;
+  return $count;
+}
+
+sub _count { # Separated out so pager can get the full count
+  my $self = shift;
+  my $select = { count => '*' };
+  
+  $self->_resolve;
+  my $attrs = { %{ $self->{_attrs} } };
+  if (my $group_by = delete $attrs->{group_by}) {
+    delete $attrs->{having};
+    my @distinct = (ref $group_by ?  @$group_by : ($group_by));
+    # todo: try CONCAT for multi-column pk
+    my @pk = $self->result_source->primary_columns;
+    if (@pk == 1) {
+      foreach my $column (@distinct) {
+        if ($column =~ qr/^(?:\Q$attrs->{alias}.\E)?$pk[0]$/) {
+          @distinct = ($column);
+          last;
+        }
+      }
+    }
+
+    $select = { count => { distinct => \@distinct } };
+  }
+
+  $attrs->{select} = $select;
+  $attrs->{as} = [qw/count/];
+
+  # offset, order by and page are not needed to count. record_filter is cdbi
+  delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
+  my ($count) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
   return $count;
 }
 
 =head2 count_literal
 
-Calls L</search_literal> with the passed arguments, then L</count>.
+=over 4
+
+=item Arguments: $sql_fragment, @bind_values
+
+=item Return Value: $count
+
+=back
+
+Counts the results in a literal query. Equivalent to calling L</search_literal>
+with the passed arguments, then L</count>.
 
 =cut
 
@@ -569,48 +905,66 @@ sub count_literal { shift->search_literal(@_)->count; }
 
 =head2 all
 
-Returns all elements in the resultset. Called implictly if the resultset
+=over 4
+
+=item Arguments: none
+
+=item Return Value: @objects
+
+=back
+
+Returns all elements in the resultset. Called implicitly if the resultset
 is returned in list context.
 
 =cut
 
 sub all {
   my ($self) = @_;
-  return @{ $self->get_cache }
-    if @{ $self->get_cache };
+  return @{ $self->get_cache } if $self->get_cache;
 
   my @obj;
 
-  if (keys %{$self->{collapse}}) {
+  # TODO: don't call resolve here
+  $self->_resolve;
+  if (keys %{$self->{_attrs}->{collapse}}) {
+#  if ($self->{attrs}->{prefetch}) {
       # Using $self->cursor->all is really just an optimisation.
       # If we're collapsing has_many prefetches it probably makes
       # very little difference, and this is cleaner than hacking
       # _construct_object to survive the approach
-    my @row;
-    $self->cursor->reset;
-    while (@row = $self->cursor->next) {
+    my @row = $self->cursor->next;
+    while (@row) {
       push(@obj, $self->_construct_object(@row));
+      @row = (exists $self->{stashed_row}
+               ? @{delete $self->{stashed_row}}
+               : $self->cursor->next);
     }
   } else {
-    @obj = map { $self->_construct_object(@$_); }
-             $self->cursor->all;
-  }
-
-  if( $self->{attrs}->{cache} ) {
-    $self->set_cache( \@obj );
+    @obj = map { $self->_construct_object(@$_) } $self->cursor->all;
   }
 
+  $self->set_cache(\@obj) if $self->{attrs}{cache};
   return @obj;
 }
 
 =head2 reset
 
+=over 4
+
+=item Arguments: none
+
+=item Return Value: $self
+
+=back
+
 Resets the resultset's cursor, so you can iterate through the elements again.
 
 =cut
 
 sub reset {
   my ($self) = @_;
+  delete $self->{_attrs} if (exists $self->{_attrs});
+
   $self->{all_cache_position} = 0;
   $self->cursor->reset;
   return $self;
@@ -618,7 +972,16 @@ sub reset {
 
 =head2 first
 
-Resets the resultset and returns the first element.
+=over 4
+
+=item Arguments: none
+
+=item Return Value: $object?
+
+=back
+
+Resets the resultset and returns an object for the first result (if the
+resultset returns anything).
 
 =cut
 
@@ -626,33 +989,118 @@ sub first {
   return $_[0]->reset->next;
 }
 
+# _cond_for_update_delete
+#
+# update/delete require the condition to be modified to handle
+# the differing SQL syntax available.  This transforms the $self->{cond}
+# appropriately, returning the new condition.
+
+sub _cond_for_update_delete {
+  my ($self) = @_;
+  my $cond = {};
+
+  if (!ref($self->{cond})) {
+    # No-op. No condition, we're updating/deleting everything
+  }
+  elsif (ref $self->{cond} eq 'ARRAY') {
+    $cond = [
+      map {
+        my %hash;
+        foreach my $key (keys %{$_}) {
+          $key =~ /([^.]+)$/;
+          $hash{$1} = $_->{$key};
+        }
+        \%hash;
+      } @{$self->{cond}}
+    ];
+  }
+  elsif (ref $self->{cond} eq 'HASH') {
+    if ((keys %{$self->{cond}})[0] eq '-and') {
+      $cond->{-and} = [];
+
+      my @cond = @{$self->{cond}{-and}};
+      for (my $i = 0; $i <= @cond - 1; $i++) {
+        my $entry = $cond[$i];
+
+        my %hash;
+        if (ref $entry eq 'HASH') {
+          foreach my $key (keys %{$entry}) {
+            $key =~ /([^.]+)$/;
+            $hash{$1} = $entry->{$key};
+          }
+        }
+        else {
+          $entry =~ /([^.]+)$/;
+          $hash{$1} = $cond[++$i];
+        }
+
+        push @{$cond->{-and}}, \%hash;
+      }
+    }
+    else {
+      foreach my $key (keys %{$self->{cond}}) {
+        $key =~ /([^.]+)$/;
+        $cond->{$1} = $self->{cond}{$key};
+      }
+    }
+  }
+  else {
+    $self->throw_exception(
+      "Can't update/delete on resultset with condition unless hash or array"
+    );
+  }
+
+  return $cond;
+}
+
+
 =head2 update
 
-=head3 Arguments: (\%values)
+=over 4
+
+=item Arguments: \%values
+
+=item Return Value: $storage_rv
+
+=back
 
-Sets the specified columns in the resultset to the supplied values.
+Sets the specified columns in the resultset to the supplied values in a
+single query. Return value will be true if the update succeeded or false
+if no records were updated; exact type of success value is storage-dependent.
 
 =cut
 
 sub update {
   my ($self, $values) = @_;
-  $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
+  $self->throw_exception("Values for update must be a hash")
+    unless ref $values eq 'HASH';
+
+  my $cond = $self->_cond_for_update_delete;
+
   return $self->result_source->storage->update(
-           $self->result_source->from, $values, $self->{cond});
+    $self->result_source->from, $values, $cond
+  );
 }
 
 =head2 update_all
 
-=head3 Arguments: (\%values)
+=over 4
+
+=item Arguments: \%values
 
-Fetches all objects and updates them one at a time.  Note that C<update_all>
-will run cascade triggers while L</update> will not.
+=item Return Value: 1
+
+=back
+
+Fetches all objects and updates them one at a time. Note that C<update_all>
+will run DBIC cascade triggers, while L</update> will not.
 
 =cut
 
 sub update_all {
   my ($self, $values) = @_;
-  $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
+  $self->throw_exception("Values for update must be a hash")
+    unless ref $values eq 'HASH';
   foreach my $obj ($self->all) {
     $obj->set_columns($values)->update;
   }
@@ -661,41 +1109,42 @@ sub update_all {
 
 =head2 delete
 
-Deletes the contents of the resultset from its result source.
+=over 4
+
+=item Arguments: none
+
+=item Return Value: 1
+
+=back
+
+Deletes the contents of the resultset from its result source. Note that this
+will not run DBIC cascade triggers. See L</delete_all> if you need triggers
+to run.
 
 =cut
 
 sub delete {
   my ($self) = @_;
   my $del = {};
-  $self->throw_exception("Can't delete on resultset with condition unless hash or array")
-    unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
-  if (ref $self->{cond} eq 'ARRAY') {
-    $del = [ map { my %hash;
-      foreach my $key (keys %{$_}) {
-        $key =~ /([^\.]+)$/;
-        $hash{$1} = $_->{$key};
-      }; \%hash; } @{$self->{cond}} ];
-  } elsif ((keys %{$self->{cond}})[0] eq '-and') {
-    $del->{-and} = [ map { my %hash;
-      foreach my $key (keys %{$_}) {
-        $key =~ /([^\.]+)$/;
-        $hash{$1} = $_->{$key};
-      }; \%hash; } @{$self->{cond}{-and}} ];
-  } else {
-    foreach my $key (keys %{$self->{cond}}) {
-      $key =~ /([^\.]+)$/;
-      $del->{$1} = $self->{cond}{$key};
-    }
-  }
-  $self->result_source->storage->delete($self->result_source->from, $del);
+
+  my $cond = $self->_cond_for_update_delete;
+
+  $self->result_source->storage->delete($self->result_source->from, $cond);
   return 1;
 }
 
 =head2 delete_all
 
-Fetches all objects and deletes them one at a time.  Note that C<delete_all>
-will run cascade triggers while L</delete> will not.
+=over 4
+
+=item Arguments: none
+
+=item Return Value: 1
+
+=back
+
+Fetches all objects and deletes them one at a time. Note that C<delete_all>
+will run DBIC cascade triggers, while L</delete> will not.
 
 =cut
 
@@ -707,7 +1156,15 @@ sub delete_all {
 
 =head2 pager
 
-Returns a L<Data::Page> object for the current resultset. Only makes
+=over 4
+
+=item Arguments: none
+
+=item Return Value: $pager
+
+=back
+
+Return Value a L<Data::Page> object for the current resultset. Only makes
 sense for queries with a C<page> attribute.
 
 =cut
@@ -715,18 +1172,26 @@ sense for queries with a C<page> attribute.
 sub pager {
   my ($self) = @_;
   my $attrs = $self->{attrs};
-  $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
+  $self->throw_exception("Can't create pager for non-paged rs")
+    unless $self->{page};
   $attrs->{rows} ||= 10;
-  $self->count;
   return $self->{pager} ||= Data::Page->new(
-    $self->{count}, $attrs->{rows}, $self->{page});
+    $self->_count, $attrs->{rows}, $self->{page});
 }
 
 =head2 page
 
-=head3 Arguments: ($page_num)
+=over 4
+
+=item Arguments: $page_number
+
+=item Return Value: $rs
 
-Returns a new resultset for the specified page.
+=back
+
+Returns a resultset for the $page_number page of the resultset on which page
+is called, where each page contains a number of rows equal to the 'rows'
+attribute set on the resultset (10 by default).
 
 =cut
 
@@ -739,9 +1204,15 @@ sub page {
 
 =head2 new_result
 
-=head3 Arguments: (\%vals)
+=over 4
+
+=item Arguments: \%vals
+
+=item Return Value: $object
+
+=back
 
-Creates a result in the resultset's result class.
+Creates an object in the resultset's result class and returns it.
 
 =cut
 
@@ -749,23 +1220,56 @@ sub new_result {
   my ($self, $values) = @_;
   $self->throw_exception( "new_result needs a hash" )
     unless (ref $values eq 'HASH');
-  $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
-    if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
+  $self->throw_exception(
+    "Can't abstract implicit construct, condition not a hash"
+  ) if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
   my %new = %$values;
   my $alias = $self->{attrs}{alias};
   foreach my $key (keys %{$self->{cond}||{}}) {
-    $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
+    $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:\Q${alias}.\E)?([^.]+)$/);
   }
   my $obj = $self->result_class->new(\%new);
   $obj->result_source($self->result_source) if $obj->can('result_source');
-  $obj;
+  return $obj;
+}
+
+=head2 find_or_new
+
+=over 4
+
+=item Arguments: \%vals, \%attrs?
+
+=item Return Value: $object
+
+=back
+
+Find an existing record from this resultset. If none exists, instantiate a new
+result object and return it. The object will not be saved into your storage
+until you call L<DBIx::Class::Row/insert> on it.
+
+If you want objects to be saved immediately, use L</find_or_create> instead.
+
+=cut
+
+sub find_or_new {
+  my $self     = shift;
+  my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
+  my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
+  my $exists   = $self->find($hash, $attrs);
+  return defined $exists ? $exists : $self->new_result($hash);
 }
 
 =head2 create
 
-=head3 Arguments: (\%vals)
+=over 4
 
-Inserts a record into the resultset and returns the object.
+=item Arguments: \%vals
+
+=item Return Value: $object
+
+=back
+
+Inserts a record into the resultset and returns the object representing it.
 
 Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
 
@@ -773,13 +1277,20 @@ Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
 
 sub create {
   my ($self, $attrs) = @_;
-  $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
+  $self->throw_exception( "create needs a hashref" )
+    unless ref $attrs eq 'HASH';
   return $self->new_result($attrs)->insert;
 }
 
 =head2 find_or_create
 
-=head3 Arguments: (\%vals, \%attrs?)
+=over 4
+
+=item Arguments: \%vals, \%attrs?
+
+=item Return Value: $object
+
+=back
 
   $class->find_or_create({ key => $val, ... });
 
@@ -804,25 +1315,34 @@ constraint. For example:
     { key => 'artist_title' }
   );
 
-See also L</find> and L</update_or_create>.
+See also L</find> and L</update_or_create>. For information on how to declare
+unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
 
 =cut
 
 sub find_or_create {
   my $self     = shift;
   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
-  my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
+  my $hash     = ref $_[0] eq 'HASH' ? shift : {@_};
   my $exists   = $self->find($hash, $attrs);
-  return defined($exists) ? $exists : $self->create($hash);
+  return defined $exists ? $exists : $self->create($hash);
 }
 
 =head2 update_or_create
 
-  $class->update_or_create({ key => $val, ... });
+=over 4
+
+=item Arguments: \%col_values, { key => $unique_constraint }?
 
-First, search for an existing row matching one of the unique constraints
-(including the primary key) on the source of this resultset.  If a row is
-found, update it with the other given column values.  Otherwise, create a new
+=item Return Value: $object
+
+=back
+
+  $class->update_or_create({ col => $val, ... });
+
+First, searches for an existing row matching one of the unique constraints
+(including the primary key) on the source of this resultset. If a row is
+found, updates it with the other given column values. Otherwise, creates a new
 row.
 
 Takes an optional C<key> attribute to search on a specific unique constraint.
@@ -841,137 +1361,133 @@ For example:
 If no C<key> is specified, it searches on all unique constraints defined on the
 source, including the primary key.
 
-If the C<key> is specified as C<primary>, search only on the primary key.
+If the C<key> is specified as C<primary>, it searches only on the primary key.
 
-See also L</find> and L</find_or_create>.
+See also L</find> and L</find_or_create>. For information on how to declare
+unique constraints, see L<DBIx::Class::ResultSource/add_unique_constraint>.
 
 =cut
 
 sub update_or_create {
   my $self = shift;
-
   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
-  my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
+  my $hash = ref $_[0] eq 'HASH' ? shift : {@_};
 
-  my %unique_constraints = $self->result_source->unique_constraints;
-  my @constraint_names   = (exists $attrs->{key}
-                            ? ($attrs->{key})
-                            : keys %unique_constraints);
-
-  my @unique_hashes;
-  foreach my $name (@constraint_names) {
-    my @unique_cols = @{ $unique_constraints{$name} };
-    my %unique_hash =
-      map  { $_ => $hash->{$_} }
-      grep { exists $hash->{$_} }
-      @unique_cols;
-
-    push @unique_hashes, \%unique_hash
-      if (scalar keys %unique_hash == scalar @unique_cols);
+  my $row = $self->find($hash, $attrs);
+  if (defined $row) {
+    $row->update($hash);
+    return $row;
   }
 
-  my $row;
-  if (@unique_hashes) {
-    $row = $self->search(\@unique_hashes, { rows => 1 })->first;
-    if ($row) {
-      $row->set_columns($hash);
-      $row->update;
-    }
-  }
-
-  unless ($row) {
-    $row = $self->create($hash);
-  }
-
-  return $row;
+  return $self->create($hash);
 }
 
 =head2 get_cache
 
-Gets the contents of the cache for the resultset.
+=over 4
+
+=item Arguments: none
+
+=item Return Value: \@cache_objects?
+
+=back
+
+Gets the contents of the cache for the resultset, if the cache is set.
 
 =cut
 
 sub get_cache {
-  my $self = shift;
-  return $self->{all_cache} || [];
+  shift->{all_cache};
 }
 
 =head2 set_cache
 
-Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
+=over 4
+
+=item Arguments: \@cache_objects
+
+=item Return Value: \@cache_objects
+
+=back
+
+Sets the contents of the cache for the resultset. Expects an arrayref
+of objects of the same class as those produced by the resultset. Note that
+if the cache is set the resultset will return the cached objects rather
+than re-querying the database even if the cache attr is not set.
 
 =cut
 
 sub set_cache {
   my ( $self, $data ) = @_;
   $self->throw_exception("set_cache requires an arrayref")
-    if ref $data ne 'ARRAY';
-  my $result_class = $self->result_class;
-  foreach( @$data ) {
-    $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
-      if ref $_ ne $result_class;
-  }
+      if defined($data) && (ref $data ne 'ARRAY');
   $self->{all_cache} = $data;
 }
 
 =head2 clear_cache
 
+=over 4
+
+=item Arguments: none
+
+=item Return Value: []
+
+=back
+
 Clears the cache for the resultset.
 
 =cut
 
 sub clear_cache {
-  my $self = shift;
-  $self->set_cache([]);
+  shift->set_cache(undef);
 }
 
 =head2 related_resultset
 
+=over 4
+
+=item Arguments: $relationship_name
+
+=item Return Value: $resultset
+
+=back
+
 Returns a related resultset for the supplied relationship name.
 
-  $rs = $rs->related_resultset('foo');
+  $artist_rs = $schema->resultset('CD')->related_resultset('Artist');
 
 =cut
 
 sub related_resultset {
-  my ( $self, $rel, @rest ) = @_;
+  my ( $self, $rel ) = @_;
+
   $self->{related_resultsets} ||= {};
-  my $resultsets = $self->{related_resultsets};
-  if( !exists $resultsets->{$rel} ) {
-    #warn "fetching related resultset for rel '$rel'";
-    my $rel_obj = $self->result_source->relationship_info($rel);
-    $self->throw_exception(
-      "search_related: result source '" . $self->result_source->name .
-      "' has no such relationship ${rel}")
-      unless $rel_obj; #die Dumper $self->{attrs};
-    my $rs = $self->search(undef, { join => $rel });
-    #if( $self->{attrs}->{cache} ) {
-    #  $rs = $self->search(undef);
-    #}
-    #else {
-    #}
-    #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
-    #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
-    my $alias = (defined $rs->{attrs}{seen_join}{$rel}
-                  && $rs->{attrs}{seen_join}{$rel} > 1
-                ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
-                : $rel);
-    $resultsets->{$rel} =
-      $self->result_source->schema->resultset($rel_obj->{class}
+  return $self->{related_resultsets}{$rel} ||= do {
+      #warn "fetching related resultset for rel '$rel' " . $self->result_source->{name};
+      my $rel_obj = $self->result_source->relationship_info($rel);
+      $self->throw_exception(
+        "search_related: result source '" . $self->result_source->name .
+        "' has no such relationship ${rel}")
+        unless $rel_obj; #die Dumper $self->{attrs};
+
+      my $rs = $self->result_source->schema->resultset($rel_obj->{class}
            )->search( undef,
-             { %{$rs->{attrs}},
-               alias => $alias,
-               select => undef(),
-               as => undef() }
-           )->search(@rest);
-  }
-  return $resultsets->{$rel};
+             { %{$self->{attrs}},
+               select => undef,
+               as => undef,
+              join => $rel,
+              _live_join => $rel }
+           );
+
+      # keep reference of the original resultset
+      $rs->{_parent_rs} = $self->result_source;
+      return $rs;
+  };
 }
 
 =head2 throw_exception
 
-See Schema's throw_exception
+See L<DBIx::Class::Schema/throw_exception> for details.
 
 =cut
 
@@ -980,6 +1496,8 @@ sub throw_exception {
   $self->result_source->schema->throw_exception(@_);
 }
 
+# XXX: FIXME: Attributes docs need clearing up
+
 =head1 ATTRIBUTES
 
 The resultset takes various attributes that modify its behavior. Here's an
@@ -987,12 +1505,23 @@ overview of them:
 
 =head2 order_by
 
-Which column(s) to order the results by. This is currently passed through
-directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
+=over 4
+
+=item Value: ($order_by | \@order_by)
+
+=back
+
+Which column(s) to order the results by. This is currently passed
+through directly to SQL, so you can give e.g. C<year DESC> for a
+descending order on the column `year'.
 
 =head2 columns
 
-=head3 Arguments: (arrayref)
+=over 4
+
+=item Value: \@columns
+
+=back
 
 Shortcut to request a particular set of columns to be retrieved.  Adds
 C<me.> onto the start of any column without a C<.> in it and sets C<select>
@@ -1001,74 +1530,111 @@ use the C<cols> attribute, as in earlier versions of DBIC.)
 
 =head2 include_columns
 
-=head3 Arguments: (arrayref)
+=over 4
+
+=item Value: \@columns
+
+=back
 
 Shortcut to include additional columns in the returned results - for example
 
-  { include_columns => ['foo.name'], join => ['foo'] }
+  $schema->resultset('CD')->search(undef, {
+    include_columns => ['artist.name'],
+    join => ['artist']
+  });
 
-would add a 'name' column to the information passed to object inflation
+would return all CDs and include a 'name' column to the information
+passed to object inflation
 
 =head2 select
 
-=head3 Arguments: (arrayref)
+=over 4
+
+=item Value: \@select_columns
+
+=back
 
 Indicates which columns should be selected from the storage. You can use
 column names, or in the case of RDBMS back ends, function or stored procedure
 names:
 
-  $rs = $schema->resultset('Foo')->search(
-    undef,
-    {
-      select => [
-        'column_name',
-        { count => 'column_to_count' },
-        { sum => 'column_to_sum' }
-      ]
-    }
-  );
+  $rs = $schema->resultset('Employee')->search(undef, {
+    select => [
+      'name',
+      { count => 'employeeid' },
+      { sum => 'salary' }
+    ]
+  });
 
 When you use function/stored procedure names and do not supply an C<as>
 attribute, the column names returned are storage-dependent. E.g. MySQL would
-return a column named C<count(column_to_count)> in the above example.
+return a column named C<count(employeeid)> in the above example.
+
+=head2 +select
+
+=over 4
+
+Indicates additional columns to be selected from storage.  Works the same as
+L<select> but adds columns to the selection.
+
+=back
+
+=head2 +as
+
+=over 4
+
+Indicates additional column names for those added via L<+select>.
+
+=back
 
 =head2 as
 
-=head3 Arguments: (arrayref)
+=over 4
+
+=item Value: \@inflation_names
+
+=back
 
 Indicates column names for object inflation. This is used in conjunction with
 C<select>, usually when C<select> contains one or more function or stored
 procedure names:
 
-  $rs = $schema->resultset('Foo')->search(
-    undef,
-    {
-      select => [
-        'column1',
-        { count => 'column2' }
-      ],
-      as => [qw/ column1 column2_count /]
-    }
-  );
+  $rs = $schema->resultset('Employee')->search(undef, {
+    select => [
+      'name',
+      { count => 'employeeid' }
+    ],
+    as => ['name', 'employee_count'],
+  });
 
-  my $foo = $rs->first(); # get the first Foo
+  my $employee = $rs->first(); # get the first Employee
 
 If the object against which the search is performed already has an accessor
 matching a column name specified in C<as>, the value can be retrieved using
 the accessor as normal:
 
-  my $column1 = $foo->column1();
+  my $name = $employee->name();
 
 If on the other hand an accessor does not exist in the object, you need to
 use C<get_column> instead:
 
-  my $column2_count = $foo->get_column('column2_count');
+  my $employee_count = $employee->get_column('employee_count');
 
 You can create your own accessors if required - see
 L<DBIx::Class::Manual::Cookbook> for details.
 
+Please note: This will NOT insert an C<AS employee_count> into the SQL statement
+produced, it is used for internal access only. Thus attempting to use the accessor
+in an C<order_by> clause or similar will fail misrably.
+
 =head2 join
 
+=over 4
+
+=item Value: ($rel_name | \@rel_names | \%rel_names)
+
+=back
+
 Contains a list of relationships that should be joined for this query.  For
 example:
 
@@ -1101,22 +1667,28 @@ For example:
 If the same join is supplied twice, it will be aliased to <rel>_2 (and
 similarly for a third time). For e.g.
 
-  my $rs = $schema->resultset('Artist')->search(
-    { 'cds.title'   => 'Foo',
-      'cds_2.title' => 'Bar' },
-    { join => [ qw/cds cds/ ] });
+  my $rs = $schema->resultset('Artist')->search({
+    'cds.title'   => 'Down to Earth',
+    'cds_2.title' => 'Popular',
+  }, {
+    join => [ qw/cds cds/ ],
+  });
 
-will return a set of all artists that have both a cd with title Foo and a cd
-with title Bar.
+will return a set of all artists that have both a cd with title 'Down
+to Earth' and a cd with title 'Popular'.
 
 If you want to fetch related objects from other tables as well, see C<prefetch>
 below.
 
 =head2 prefetch
 
-=head3 Arguments: arrayref/hashref
+=over 4
+
+=item Value: ($rel_name | \@rel_names | \%rel_names)
+
+=back
 
-Contains one or more relationships that should be fetched along with the main 
+Contains one or more relationships that should be fetched along with the main
 query (when they are accessed afterwards they will have already been
 "prefetched").  This is useful for when you know you will need the related
 objects, because it saves at least one query:
@@ -1149,30 +1721,125 @@ C<prefetch> can be used with the following relationship types: C<belongs_to>,
 C<has_one> (or if you're using C<add_relationship>, any relationship declared
 with an accessor type of 'single' or 'filter').
 
+=head2 page
+
+=over 4
+
+=item Value: $page
+
+=back
+
+Makes the resultset paged and specifies the page to retrieve. Effectively
+identical to creating a non-pages resultset and then calling ->page($page)
+on it.
+
+=head2 rows
+
+=over 4
+
+=item Value: $rows
+
+=back
+
+Specifes the maximum number of rows for direct retrieval or the number of
+rows per page if the page attribute or method is used.
+
+=head2 group_by
+
+=over 4
+
+=item Value: \@columns
+
+=back
+
+A arrayref of columns to group by. Can include columns of joined tables.
+
+  group_by => [qw/ column1 column2 ... /]
+
+=head2 having
+
+=over 4
+
+=item Value: $condition
+
+=back
+
+HAVING is a select statement attribute that is applied between GROUP BY and
+ORDER BY. It is applied to the after the grouping calculations have been
+done. 
+
+  having => { 'count(employee)' => { '>=', 100 } }
+
+=head2 distinct
+
+=over 4
+
+=item Value: (0 | 1)
+
+=back
+
+Set to 1 to group by all columns.
+
+=head2 cache
+
+Set to 1 to cache search results. This prevents extra SQL queries if you
+revisit rows in your ResultSet:
+
+  my $resultset = $schema->resultset('Artist')->search( undef, { cache => 1 } );
+  
+  while( my $artist = $resultset->next ) {
+    ... do stuff ...
+  }
+
+  $rs->first; # without cache, this would issue a query
+
+By default, searches are not cached.
+
+For more examples of using these attributes, see
+L<DBIx::Class::Manual::Cookbook>.
+
 =head2 from
 
-=head3 Arguments: (arrayref)
+=over 4
+
+=item Value: \@from_clause
+
+=back
 
 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
 clauses.
 
 NOTE: Use this on your own risk.  This allows you to shoot off your foot!
+
 C<join> will usually do what you need and it is strongly recommended that you
 avoid using C<from> unless you cannot achieve the desired result using C<join>.
+And we really do mean "cannot", not just tried and failed. Attempting to use
+this because you're having problems with C<join> is like trying to use x86
+ASM because you've got a syntax error in your C. Trust us on this.
 
-In simple terms, C<from> works as follows:
+Now, if you're still really, really sure you need to use this (and if you're
+not 100% sure, ask the mailing list first), here's an explanation of how this
+works.
 
-    [
-        { <alias> => <table>, -join-type => 'inner|left|right' }
-        [] # nested JOIN (optional)
-        { <table.column> = <foreign_table.foreign_key> }
-    ]
+The syntax is as follows -
 
-    JOIN
-        <alias> <table>
-        [JOIN ...]
-    ON <table.column> = <foreign_table.foreign_key>
+  [
+    { <alias1> => <table1> },
+    [
+      { <alias2> => <table2>, -join_type => 'inner|left|right' },
+      [], # nested JOIN (optional)
+      { <table1.column1> => <table2.column2>, ... (more conditions) },
+    ],
+    # More of the above [ ] may follow for additional joins
+  ]
+
+  <table1> <alias1>
+  JOIN
+    <table2> <alias2>
+    [JOIN ...]
+  ON <table1.column1> = <table2.column2>
+  <more joins may follow>
 
 An easy way to follow the examples below is to remember the following:
 
@@ -1227,7 +1894,7 @@ with a father in the person table, we could explicitly use C<INNER JOIN>:
             from => [
                 { child => 'person' },
                 [
-                    { father => 'person', -join-type => 'inner' },
+                    { father => 'person', -join_type => 'inner' },
                     { 'father.id' => 'child.father_id' }
                 ],
             ]
@@ -1238,34 +1905,6 @@ with a father in the person table, we could explicitly use C<INNER JOIN>:
     # SELECT child.* FROM person child
     # INNER JOIN person father ON child.father_id = father.id
 
-=head2 page
-
-For a paged resultset, specifies which page to retrieve.  Leave unset
-for an unpaged resultset.
-
-=head2 rows
-
-For a paged resultset, how many rows per page:
-
-  rows => 10
-
-Can also be used to simulate an SQL C<LIMIT>.
-
-=head2 group_by
-
-=head3 Arguments: (arrayref)
-
-A arrayref of columns to group by. Can include columns of joined tables.
-
-  group_by => [qw/ column1 column2 ... /]
-
-=head2 distinct
-
-Set to 1 to group by all columns.
-
-For more examples of using these attributes, see
-L<DBIx::Class::Manual::Cookbook>.
-
 =cut
 
 1;