Merge 'DBIx-Class-current' into 'trunk'
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index bb3d37b..af51f79 100644 (file)
@@ -2,14 +2,17 @@ package DBIx::Class::ResultSet;
 
 use strict;
 use warnings;
-use Carp qw/croak/;
 use overload
-        '0+'     => 'count',
+        '0+'     => \&count,
         'bool'   => sub { 1; },
         fallback => 1;
 use Data::Page;
 use Storable;
 
+use base qw/DBIx::Class/;
+__PACKAGE__->load_components(qw/AccessorGroup/);
+__PACKAGE__->mk_group_accessors('simple' => 'result_source');
+
 =head1 NAME
 
 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
@@ -29,6 +32,7 @@ In the examples below, the following table classes are used:
 
   package MyApp::Schema::Artist;
   use base qw/DBIx::Class/;
+  __PACKAGE__->load_components(qw/Core/);
   __PACKAGE__->table('artist');
   __PACKAGE__->add_columns(qw/artistid name/);
   __PACKAGE__->set_primary_key('artistid');
@@ -37,7 +41,8 @@ In the examples below, the following table classes are used:
 
   package MyApp::Schema::CD;
   use base qw/DBIx::Class/;
-  __PACKAGE__->table('artist');
+  __PACKAGE__->load_components(qw/Core/);
+  __PACKAGE__->table('cd');
   __PACKAGE__->add_columns(qw/cdid artist title year/);
   __PACKAGE__->set_primary_key('cdid');
   __PACKAGE__->belongs_to(artist => 'MyApp::Schema::Artist');
@@ -45,10 +50,12 @@ In the examples below, the following table classes are used:
 
 =head1 METHODS
 
-=head2 new($source, \%$attrs)
+=head2 new
+
+=head3 Arguments: ($source, \%$attrs)
 
 The resultset constructor. Takes a source object (usually a
-L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATRRIBUTES>
+L<DBIx::Class::ResultSourceProxy::Table>) and an attribute hash (see L</ATTRIBUTES>
 below).  Does not perform any queries -- these are executed as needed by the
 other methods.
 
@@ -75,8 +82,13 @@ sub new {
     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @cols ];
   }
   $attrs->{as} ||= [ map { m/^$alias\.(.*)$/ ? $1 : $_ } @{$attrs->{select}} ];
+  if (my $include = delete $attrs->{include_columns}) {
+    push(@{$attrs->{select}}, @$include);
+    push(@{$attrs->{as}}, map { m/([^\.]+)$/; $1; } @$include);
+  }
   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
   $attrs->{from} ||= [ { $alias => $source->from } ];
+  $attrs->{seen_join} ||= {};
   if (my $join = delete $attrs->{join}) {
     foreach my $j (ref $join eq 'ARRAY'
               ? (@{$join}) : ($join)) {
@@ -86,25 +98,37 @@ sub new {
         $seen{$j} = 1;
       }
     }
-    push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}));
+    push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}, $attrs->{seen_join}));
   }
   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
-  foreach my $pre (@{delete $attrs->{prefetch} || []}) {
-    push(@{$attrs->{from}}, $source->resolve_join($pre, $attrs->{alias}))
-      unless $seen{$pre};
-    my @pre = 
-      map { "$pre.$_" }
-      $source->related_source($pre)->columns;
-    push(@{$attrs->{select}}, @pre);
-    push(@{$attrs->{as}}, @pre);
+
+  if (my $prefetch = delete $attrs->{prefetch}) {
+    foreach my $p (ref $prefetch eq 'ARRAY'
+              ? (@{$prefetch}) : ($prefetch)) {
+      if( ref $p eq 'HASH' ) {
+        foreach my $key (keys %$p) {
+          push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
+            unless $seen{$key};
+        }
+      }
+      else {
+        push(@{$attrs->{from}}, $source->resolve_join($p, $attrs->{alias}))
+            unless $seen{$p};
+      }
+      my @prefetch = $source->resolve_prefetch($p, $attrs->{alias});
+      #die Dumper \@cols;
+      push(@{$attrs->{select}}, map { $_->[0] } @prefetch);
+      push(@{$attrs->{as}}, map { $_->[1] } @prefetch);
+    }
   }
+
   if ($attrs->{page}) {
     $attrs->{rows} ||= 10;
     $attrs->{offset} ||= 0;
     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
   }
   my $new = {
-    source => $source,
+    result_source => $source,
     cond => $attrs->{where},
     from => $attrs->{from},
     count => undef,
@@ -131,25 +155,47 @@ call it as C<search({}, \%attrs);>.
 sub search {
   my $self = shift;
 
-  #use Data::Dumper;warn Dumper(@_);
-
-  my $attrs = { %{$self->{attrs}} };
-  if (@_ > 1 && ref $_[$#_] eq 'HASH') {
-    $attrs = { %$attrs, %{ pop(@_) } };
-  }
+  my $rs;
+  if( @_ ) {
+    
+    my $attrs = { %{$self->{attrs}} };
+    my $having = delete $attrs->{having};
+    if (@_ > 1 && ref $_[$#_] eq 'HASH') {
+     $attrs = { %$attrs, %{ pop(@_) } };
+    }
 
-  my $where = (@_ ? ((@_ == 1 || ref $_[0] eq "HASH") ? shift : {@_}) : undef());
-  if (defined $where) {
-    $where = (defined $attrs->{where}
+    my $where = (@_
+                  ? ((@_ == 1 || ref $_[0] eq "HASH")
+                      ? shift
+                      : ((@_ % 2)
+                          ? $self->throw_exception(
+                              "Odd number of arguments to search")
+                          : {@_}))
+                  : undef());
+    if (defined $where) {
+      $where = (defined $attrs->{where}
                 ? { '-and' =>
                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
                         $where, $attrs->{where} ] }
                 : $where);
-    $attrs->{where} = $where;
-  }
+      $attrs->{where} = $where;
+    }
 
-  my $rs = (ref $self)->new($self->{source}, $attrs);
+    if (defined $having) {
+      $having = (defined $attrs->{having}
+                ? { '-and' =>
+                    [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
+                        $having, $attrs->{having} ] }
+                : $having);
+      $attrs->{having} = $having;
+    }
 
+    $rs = (ref $self)->new($self->result_source, $attrs);
+  }
+  else {
+    $rs = $self;
+    $rs->reset();
+  }
   return (wantarray ? $rs->all : $rs);
 }
 
@@ -162,7 +208,7 @@ Pass a literal chunk of SQL to be added to the conditional part of the
 resultset.
 
 =cut
-                                                         
+
 sub search_literal {
   my ($self, $cond, @vals) = @_;
   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
@@ -170,7 +216,9 @@ sub search_literal {
   return $self->search(\$cond, $attrs);
 }
 
-=head2 find(@colvalues), find(\%cols, \%attrs?)
+=head2 find
+
+=head3 Arguments: (@colvalues) | (\%cols, \%attrs?)
 
 Finds a row based on its primary key or unique constraint. For example:
 
@@ -179,7 +227,7 @@ Finds a row based on its primary key or unique constraint. For example:
 Also takes an optional C<key> attribute, to search by a specific key or unique
 constraint. For example:
 
-  my $cd = $schema->resultset('CD')->find_or_create(
+  my $cd = $schema->resultset('CD')->find(
     {
       artist => 'Massive Attack',
       title  => 'Mezzanine',
@@ -195,32 +243,34 @@ sub find {
   my ($self, @vals) = @_;
   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
 
-  my @cols = $self->{source}->primary_columns;
+  my @cols = $self->result_source->primary_columns;
   if (exists $attrs->{key}) {
-    my %uniq = $self->{source}->unique_constraints;
+    my %uniq = $self->result_source->unique_constraints;
     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
       unless exists $uniq{$attrs->{key}};
     @cols = @{ $uniq{$attrs->{key}} };
   }
   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
-  $self->{source}->result_class->throw( "Can't find unless a primary key or unique constraint is defined" )
+  $self->throw_exception( "Can't find unless a primary key or unique constraint is defined" )
     unless @cols;
 
   my $query;
   if (ref $vals[0] eq 'HASH') {
-    $query = $vals[0];
+    $query = { %{$vals[0]} };
   } elsif (@cols == @vals) {
     $query = {};
     @{$query}{@cols} = @vals;
   } else {
     $query = {@vals};
   }
+  foreach (keys %$query) {
+    next if m/\./;
+    $query->{$self->{attrs}{alias}.'.'.$_} = delete $query->{$_};
+  }
   #warn Dumper($query);
-  # Useless -> disabled
-  #$self->{source}->result_class->throw( "Can't find unless all primary keys are specified" )
-  #  unless (keys %$query >= @pk); # If we check 'em we run afoul of uc/lc
-                                  # column names etc. Not sure what to do yet
-  return $self->search($query)->next;
+  return (keys %$attrs
+           ? $self->search($query,$attrs)->single
+           : $self->single($query));
 }
 
 =head2 search_related
@@ -233,19 +283,7 @@ records.
 =cut
 
 sub search_related {
-  my ($self, $rel, @rest) = @_;
-  my $rel_obj = $self->{source}->relationship_info($rel);
-  $self->{source}->result_class->throw(
-    "No such relationship ${rel} in search_related")
-      unless $rel_obj;
-  my $rs = $self->search(undef, { join => $rel });
-  return $self->{source}->schema->resultset($rel_obj->{class}
-           )->search( undef,
-             { %{$rs->{attrs}},
-               alias => $rel,
-               select => undef(),
-               as => undef() }
-           )->search(@rest);
+  return shift->related_resultset(shift)->search(@_);
 }
 
 =head2 cursor
@@ -256,13 +294,41 @@ Returns a storage-driven cursor to the given resultset.
 
 sub cursor {
   my ($self) = @_;
-  my ($source, $attrs) = @{$self}{qw/source attrs/};
+  my ($attrs) = $self->{attrs};
   $attrs = { %$attrs };
   return $self->{cursor}
-    ||= $source->storage->select($self->{from}, $attrs->{select},
+    ||= $self->result_source->storage->select($self->{from}, $attrs->{select},
+          $attrs->{where},$attrs);
+}
+
+=head2 single
+
+Inflates the first result without creating a cursor
+
+=cut
+
+sub single {
+  my ($self, $extra) = @_;
+  my ($attrs) = $self->{attrs};
+  $attrs = { %$attrs };
+  if ($extra) {
+    if (defined $attrs->{where}) {
+      $attrs->{where} = {
+        '-and'
+          => [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
+               delete $attrs->{where}, $extra ]
+      };
+    } else {
+      $attrs->{where} = $extra;
+    }
+  }
+  my @data = $self->result_source->storage->select_single(
+          $self->{from}, $attrs->{select},
           $attrs->{where},$attrs);
+  return (@data ? $self->_construct_object(@data) : ());
 }
 
+
 =head2 search_like
 
 Perform a search, but use C<LIKE> instead of equality as the condition. Note
@@ -284,7 +350,9 @@ sub search_like {
   return $class->search($query, { %$attrs });
 }
 
-=head2 slice($first, $last)
+=head2 slice
+
+=head3 Arguments: ($first, $last)
 
 Returns a subset of elements from the resultset.
 
@@ -296,7 +364,7 @@ sub slice {
   $attrs->{offset} ||= 0;
   $attrs->{offset} += $min;
   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
-  my $slice = (ref $self)->new($self->{source}, $attrs);
+  my $slice = (ref $self)->new($self->result_source, $attrs);
   return (wantarray ? $slice->all : $slice);
 }
 
@@ -315,6 +383,17 @@ Can be used to efficiently iterate over records in the resultset:
 
 sub next {
   my ($self) = @_;
+  my $cache;
+  if( @{$cache = $self->{all_cache} || []}) {
+    $self->{all_cache_position} ||= 0;
+    my $obj = $cache->[$self->{all_cache_position}];
+    $self->{all_cache_position}++;
+    return $obj;
+  }
+  if ($self->{attrs}{cache}) {
+    $self->{all_cache_position} = 0;
+    return ($self->all)[0];
+  }
   my @row = $self->cursor->next;
 #  warn Dumper(\@row); use Data::Dumper;
   return unless (@row);
@@ -323,22 +402,86 @@ sub next {
 
 sub _construct_object {
   my ($self, @row) = @_;
-  my @cols = @{ $self->{attrs}{as} };
+  my @row_orig = @row; # copy @row for key comparison later, because @row will change
+  my @as = @{ $self->{attrs}{as} };
+#use Data::Dumper; warn Dumper \@as;
   #warn "@cols -> @row";
-  my (%me, %pre);
-  foreach my $col (@cols) {
-    if ($col =~ /([^\.]+)\.([^\.]+)/) {
-      $pre{$1}[0]{$2} = shift @row;
-    } else {
-      $me{$col} = shift @row;
+  my $info = [ {}, {} ];
+  foreach my $as (@as) {
+    my $rs = $self;
+    my $target = $info;
+    my @parts = split(/\./, $as);
+    my $col = pop(@parts);
+    foreach my $p (@parts) {
+      $target = $target->[1]->{$p} ||= [];
+      
+      $rs = $rs->related_resultset($p) if $rs->{attrs}->{cache};
     }
+    
+    $target->[0]->{$col} = shift @row
+      if ref($target->[0]) ne 'ARRAY'; # arrayref is pre-inflated objects, do not overwrite
   }
-  my $new = $self->{source}->result_class->inflate_result(
-              $self->{source}, \%me, \%pre);
+  #use Data::Dumper; warn Dumper(\@as, $info);
+  my $new = $self->result_source->result_class->inflate_result(
+              $self->result_source, @$info);
   $new = $self->{attrs}{record_filter}->($new)
     if exists $self->{attrs}{record_filter};
+  if( $self->{attrs}->{cache} ) {
+    while( my( $rel, $rs ) = each( %{$self->{related_resultsets}} ) ) {
+      $rs->all;
+      #warn "$rel:", @{$rs->get_cache};
+    }
+    $self->build_rr( $self, $new );
+  }
   return $new;
 }
+  
+sub build_rr {
+  # build related resultsets for supplied object
+  my ( $self, $context, $obj ) = @_;
+  
+  my $re = qr/^\w+\./;
+  while( my ($rel, $rs) = each( %{$context->{related_resultsets}} ) ) {  
+    #warn "context:", $context->result_source->name, ", rel:$rel, rs:", $rs->result_source->name;
+    my @objs = ();
+    my $map = {};
+    my $cond = $context->result_source->relationship_info($rel)->{cond};
+    keys %$cond;
+    while( my( $rel_key, $pk ) = each(%$cond) ) {
+      $rel_key =~ s/$re//;
+      $pk =~ s/$re//;
+      $map->{$rel_key} = $pk;
+    }
+    
+    $rs->reset();
+    while( my $rel_obj = $rs->next ) {
+      while( my( $rel_key, $pk ) = each(%$map) ) {
+        if( $rel_obj->get_column($rel_key) eq $obj->get_column($pk) ) {
+          push @objs, $rel_obj;
+        }
+      }
+    }
+
+    my $rel_rs = $obj->related_resultset($rel);
+    $rel_rs->{attrs}->{cache} = 1;
+    $rel_rs->set_cache( \@objs );
+    
+    while( my $rel_obj = $rel_rs->next ) {
+      $self->build_rr( $rs, $rel_obj );
+    }
+    
+  }
+  
+}
+
+=head2 result_source
+
+Returns a reference to the result source for this recordset.
+
+=cut
+
 
 =head2 count
 
@@ -346,20 +489,50 @@ Performs an SQL C<COUNT> with the same query as the resultset was built
 with to find the number of elements. If passed arguments, does a search
 on the resultset and counts the results of that.
 
+Note: When using C<count> with C<group_by>, L<DBIX::Class> emulates C<GROUP BY>
+using C<COUNT( DISTINCT( columns ) )>. Some databases (notably SQLite) do
+not support C<DISTINCT> with multiple columns. If you are using such a
+database, you should only use columns from the main table in your C<group_by>
+clause.
+
 =cut
 
 sub count {
   my $self = shift;
   return $self->search(@_)->count if @_ && defined $_[0];
-  croak "Unable to ->count with a GROUP BY" if defined $self->{attrs}{group_by};
   unless (defined $self->{count}) {
-    my $attrs = { %{ $self->{attrs} },
-                  select => { 'count' => '*' },
-                  as => [ 'count' ] };
+    return scalar @{ $self->get_cache }
+      if @{ $self->get_cache };
+    my $group_by;
+    my $select = { 'count' => '*' };
+    my $attrs = { %{ $self->{attrs} } };
+    if( $group_by = delete $attrs->{group_by} ) {
+      delete $attrs->{having};
+      my @distinct = (ref $group_by ?  @$group_by : ($group_by));
+      # todo: try CONCAT for multi-column pk
+      my @pk = $self->result_source->primary_columns;
+      if( scalar(@pk) == 1 ) {
+        my $pk = shift(@pk);
+        my $alias = $attrs->{alias};
+        my $re = qr/^($alias\.)?$pk$/;
+        foreach my $column ( @distinct) {
+          if( $column =~ $re ) {
+            @distinct = ( $column );
+            last;
+          }
+        } 
+      }
+
+      $select = { count => { 'distinct' => \@distinct } };
+      #use Data::Dumper; die Dumper $select;
+    }
+
+    $attrs->{select} = $select;
+    $attrs->{as} = [ 'count' ];
     # offset, order by and page are not needed to count. record_filter is cdbi
     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
         
-    ($self->{count}) = (ref $self)->new($self->{source}, $attrs)->cursor->next;
+    ($self->{count}) = (ref $self)->new($self->result_source, $attrs)->cursor->next;
   }
   return 0 unless $self->{count};
   my $count = $self->{count};
@@ -386,6 +559,14 @@ is returned in list context.
 
 sub all {
   my ($self) = @_;
+  return @{ $self->get_cache }
+    if @{ $self->get_cache };
+  if( $self->{attrs}->{cache} ) {
+    my @obj = map { $self->_construct_object(@$_); }
+            $self->cursor->all;
+    $self->set_cache( \@obj );
+    return @obj;
+  }
   return map { $self->_construct_object(@$_); }
            $self->cursor->all;
 }
@@ -398,6 +579,7 @@ Resets the resultset's cursor, so you can iterate through the elements again.
 
 sub reset {
   my ($self) = @_;
+  $self->{all_cache_position} = 0;
   $self->cursor->reset;
   return $self;
 }
@@ -412,7 +594,9 @@ sub first {
   return $_[0]->reset->next;
 }
 
-=head2 update(\%values)
+=head2 update
+
+=head3 Arguments: (\%values)
 
 Sets the specified columns in the resultset to the supplied values.
 
@@ -420,12 +604,14 @@ Sets the specified columns in the resultset to the supplied values.
 
 sub update {
   my ($self, $values) = @_;
-  croak "Values for update must be a hash" unless ref $values eq 'HASH';
-  return $self->{source}->storage->update(
-           $self->{source}->from, $values, $self->{cond});
+  $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
+  return $self->result_source->storage->update(
+           $self->result_source->from, $values, $self->{cond});
 }
 
-=head2 update_all(\%values)
+=head2 update_all
+
+=head3 Arguments: (\%values)
 
 Fetches all objects and updates them one at a time.  Note that C<update_all>
 will run cascade triggers while L</update> will not.
@@ -434,7 +620,7 @@ will run cascade triggers while L</update> will not.
 
 sub update_all {
   my ($self, $values) = @_;
-  croak "Values for update must be a hash" unless ref $values eq 'HASH';
+  $self->throw_exception("Values for update must be a hash") unless ref $values eq 'HASH';
   foreach my $obj ($self->all) {
     $obj->set_columns($values)->update;
   }
@@ -449,7 +635,28 @@ Deletes the contents of the resultset from its result source.
 
 sub delete {
   my ($self) = @_;
-  $self->{source}->storage->delete($self->{source}->from, $self->{cond});
+  my $del = {};
+  $self->throw_exception("Can't delete on resultset with condition unless hash or array")
+    unless (ref($self->{cond}) eq 'HASH' || ref($self->{cond}) eq 'ARRAY');
+  if (ref $self->{cond} eq 'ARRAY') {
+    $del = [ map { my %hash;
+      foreach my $key (keys %{$_}) {
+        $key =~ /([^\.]+)$/;
+        $hash{$1} = $_->{$key};
+      }; \%hash; } @{$self->{cond}} ];
+  } elsif ((keys %{$self->{cond}})[0] eq '-and') {
+    $del->{-and} = [ map { my %hash;
+      foreach my $key (keys %{$_}) {
+        $key =~ /([^\.]+)$/;
+        $hash{$1} = $_->{$key};
+      }; \%hash; } @{$self->{cond}{-and}} ];
+  } else {
+    foreach my $key (keys %{$self->{cond}}) {
+      $key =~ /([^\.]+)$/;
+      $del->{$1} = $self->{cond}{$key};
+    }
+  }
+  $self->result_source->storage->delete($self->result_source->from, $del);
   return 1;
 }
 
@@ -476,14 +683,16 @@ sense for queries with a C<page> attribute.
 sub pager {
   my ($self) = @_;
   my $attrs = $self->{attrs};
-  croak "Can't create pager for non-paged rs" unless $self->{page};
+  $self->throw_exception("Can't create pager for non-paged rs") unless $self->{page};
   $attrs->{rows} ||= 10;
   $self->count;
   return $self->{pager} ||= Data::Page->new(
     $self->{count}, $attrs->{rows}, $self->{page});
 }
 
-=head2 page($page_num)
+=head2 page
+
+=head3 Arguments: ($page_num)
 
 Returns a new resultset for the specified page.
 
@@ -493,10 +702,12 @@ sub page {
   my ($self, $page) = @_;
   my $attrs = { %{$self->{attrs}} };
   $attrs->{page} = $page;
-  return (ref $self)->new($self->{source}, $attrs);
+  return (ref $self)->new($self->result_source, $attrs);
 }
 
-=head2 new_result(\%vals)
+=head2 new_result
+
+=head3 Arguments: (\%vals)
 
 Creates a result in the resultset's result class.
 
@@ -504,21 +715,23 @@ Creates a result in the resultset's result class.
 
 sub new_result {
   my ($self, $values) = @_;
-  $self->{source}->result_class->throw( "new_result needs a hash" )
+  $self->throw_exception( "new_result needs a hash" )
     unless (ref $values eq 'HASH');
-  $self->{source}->result_class->throw( "Can't abstract implicit construct, condition not a hash" )
+  $self->throw_exception( "Can't abstract implicit construct, condition not a hash" )
     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
   my %new = %$values;
   my $alias = $self->{attrs}{alias};
   foreach my $key (keys %{$self->{cond}||{}}) {
     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
   }
-  my $obj = $self->{source}->result_class->new(\%new);
-  $obj->result_source($self->{source}) if $obj->can('result_source');
+  my $obj = $self->result_source->result_class->new(\%new);
+  $obj->result_source($self->result_source) if $obj->can('result_source');
   $obj;
 }
 
-=head2 create(\%vals)
+=head2 create
+
+=head3 Arguments: (\%vals)
 
 Inserts a record into the resultset and returns the object.
 
@@ -528,16 +741,18 @@ Effectively a shortcut for C<< ->new_result(\%vals)->insert >>.
 
 sub create {
   my ($self, $attrs) = @_;
-  $self->{source}->result_class->throw( "create needs a hashref" ) unless ref $attrs eq 'HASH';
+  $self->throw_exception( "create needs a hashref" ) unless ref $attrs eq 'HASH';
   return $self->new_result($attrs)->insert;
 }
 
-=head2 find_or_create(\%vals, \%attrs?)
+=head2 find_or_create
+
+=head3 Arguments: (\%vals, \%attrs?)
 
   $class->find_or_create({ key => $val, ... });
 
-Searches for a record matching the search condition; if it doesn't find one,    
-creates one and returns that instead.                                       
+Searches for a record matching the search condition; if it doesn't find one,
+creates one and returns that instead.
 
   my $cd = $schema->resultset('CD')->find_or_create({
     cdid   => 5,
@@ -606,7 +821,7 @@ sub update_or_create {
   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
 
-  my %unique_constraints = $self->{source}->unique_constraints;
+  my %unique_constraints = $self->result_source->unique_constraints;
   my @constraint_names   = (exists $attrs->{key}
                             ? ($attrs->{key})
                             : keys %unique_constraints);
@@ -639,6 +854,101 @@ sub update_or_create {
   return $row;
 }
 
+=head2 get_cache
+
+Gets the contents of the cache for the resultset.
+
+=cut
+
+sub get_cache {
+  my $self = shift;
+  return $self->{all_cache} || [];
+}
+
+=head2 set_cache
+
+Sets the contents of the cache for the resultset. Expects an arrayref of objects of the same class as those produced by the resultset.
+
+=cut
+
+sub set_cache {
+  my ( $self, $data ) = @_;
+  $self->throw_exception("set_cache requires an arrayref")
+    if ref $data ne 'ARRAY';
+  my $result_class = $self->result_source->result_class;
+  foreach( @$data ) {
+    $self->throw_exception("cannot cache object of type '$_', expected '$result_class'")
+      if ref $_ ne $result_class;
+  }
+  $self->{all_cache} = $data;
+}
+
+=head2 clear_cache
+
+Clears the cache for the resultset.
+
+=cut
+
+sub clear_cache {
+  my $self = shift;
+  $self->set_cache([]);
+}
+
+=head2 related_resultset
+
+Returns a related resultset for the supplied relationship name.
+
+  $rs = $rs->related_resultset('foo');
+
+=cut
+
+sub related_resultset {
+  my ( $self, $rel, @rest ) = @_;
+  $self->{related_resultsets} ||= {};
+  my $resultsets = $self->{related_resultsets};
+  if( !exists $resultsets->{$rel} ) {
+    #warn "fetching related resultset for rel '$rel'";
+    my $rel_obj = $self->result_source->relationship_info($rel);
+    $self->throw_exception(
+      "search_related: result source '" . $self->result_source->name .
+      "' has no such relationship ${rel}")
+      unless $rel_obj; #die Dumper $self->{attrs};
+    my $rs;
+    if( $self->{attrs}->{cache} ) {
+      $rs = $self->search(undef);
+    }
+    else {
+      $rs = $self->search(undef, { join => $rel });
+    }
+    #use Data::Dumper; die Dumper $rs->{attrs};#$rs = $self->search( undef );
+    #use Data::Dumper; warn Dumper $self->{attrs}, Dumper $rs->{attrs};
+    my $alias = (defined $rs->{attrs}{seen_join}{$rel}
+                  && $rs->{attrs}{seen_join}{$rel} > 1
+                ? join('_', $rel, $rs->{attrs}{seen_join}{$rel})
+                : $rel);
+    $resultsets->{$rel} =
+      $self->result_source->schema->resultset($rel_obj->{class}
+           )->search( undef,
+             { %{$rs->{attrs}},
+               alias => $alias,
+               select => undef(),
+               as => undef() }
+           )->search(@rest);
+  }
+  return $resultsets->{$rel};
+}
+
+=head2 throw_exception
+
+See Schema's throw_exception
+
+=cut
+
+sub throw_exception {
+  my $self=shift;
+  $self->result_source->schema->throw_exception(@_);
+}
+
 =head1 ATTRIBUTES
 
 The resultset takes various attributes that modify its behavior. Here's an
@@ -649,13 +959,27 @@ overview of them:
 Which column(s) to order the results by. This is currently passed through
 directly to SQL, so you can give e.g. C<foo DESC> for a descending order.
 
-=head2 cols (arrayref)
+=head2 cols
+
+=head3 Arguments: (arrayref)
 
 Shortcut to request a particular set of columns to be retrieved.  Adds
 C<me.> onto the start of any column without a C<.> in it and sets C<select>
 from that, then auto-populates C<as> from C<select> as normal.
 
-=head2 select (arrayref)
+=head2 include_columns
+
+=head3 Arguments: (arrayref)
+
+Shortcut to include additional columns in the returned results - for example
+
+  { include_columns => ['foo.name'], join => ['foo'] }
+
+would add a 'name' column to the information passed to object inflation
+
+=head2 select
+
+=head3 Arguments: (arrayref)
 
 Indicates which columns should be selected from the storage. You can use
 column names, or in the case of RDBMS back ends, function or stored procedure
@@ -676,7 +1000,9 @@ When you use function/stored procedure names and do not supply an C<as>
 attribute, the column names returned are storage-dependent. E.g. MySQL would
 return a column named C<count(column_to_count)> in the above example.
 
-=head2 as (arrayref)
+=head2 as
+
+=head3 Arguments: (arrayref)
 
 Indicates column names for object inflation. This is used in conjunction with
 C<select>, usually when C<select> contains one or more function or stored
@@ -740,18 +1066,60 @@ For example:
     }
   );
 
-If you want to fetch the columns from the related table as well, see
-C<prefetch> below.
+If the same join is supplied twice, it will be aliased to <rel>_2 (and
+similarly for a third time). For e.g.
+
+  my $rs = $schema->resultset('Artist')->search(
+    { 'cds.title'   => 'Foo',
+      'cds_2.title' => 'Bar' },
+    { join => [ qw/cds cds/ ] });
+
+will return a set of all artists that have both a cd with title Foo and a cd
+with title Bar.
+
+If you want to fetch related objects from other tables as well, see C<prefetch>
+below.
 
 =head2 prefetch
 
-Contains a list of relationships that should be fetched along with the main 
+=head3 Arguments: arrayref/hashref
+
+Contains one or more relationships that should be fetched along with the main 
 query (when they are accessed afterwards they will have already been
 "prefetched").  This is useful for when you know you will need the related
-objects, because it saves a query.  Currently limited to prefetching
-one relationship deep, so unlike C<join>, prefetch must be an arrayref.
+objects, because it saves at least one query:
+
+  my $rs = $schema->resultset('Tag')->search(
+    {},
+    {
+      prefetch => {
+        cd => 'artist'
+      }
+    }
+  );
+
+The initial search results in SQL like the following:
+
+  SELECT tag.*, cd.*, artist.* FROM tag
+  JOIN cd ON tag.cd = cd.cdid
+  JOIN artist ON cd.artist = artist.artistid
+
+L<DBIx::Class> has no need to go back to the database when we access the
+C<cd> or C<artist> relationships, which saves us two SQL statements in this
+case.
 
-=head2 from (arrayref)
+Simple prefetches will be joined automatically, so there is no need
+for a C<join> attribute in the above search. If you're prefetching to
+depth (e.g. { cd => { artist => 'label' } or similar), you'll need to
+specify the join as well.
+
+C<prefetch> can be used with the following relationship types: C<belongs_to>,
+C<has_one> (or if you're using C<add_relationship>, any relationship declared
+with an accessor type of 'single' or 'filter').
+
+=head2 from
+
+=head3 Arguments: (arrayref)
 
 The C<from> attribute gives you manual control over the C<FROM> clause of SQL
 statements generated by L<DBIx::Class>, allowing you to express custom C<JOIN>
@@ -803,7 +1171,7 @@ then search against all mothers of those children:
                       ]
                   ],
                   { 'mother.person_id' => 'child.mother_id' }
-              ],                
+              ],
           ]
       },
   );
@@ -851,10 +1219,11 @@ For a paged resultset, how many rows per page:
 
 Can also be used to simulate an SQL C<LIMIT>.
 
-=head2 group_by (arrayref)
+=head2 group_by
+
+=head3 Arguments: (arrayref)
 
-A arrayref of columns to group by. Can include columns of joined tables. Note
-note that L</count> doesn't work on grouped resultsets.
+A arrayref of columns to group by. Can include columns of joined tables.
 
   group_by => [qw/ column1 column2 ... /]