Fix MC double-object creation (important for e.g. IC::FS which otherwise leaves orpha...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Row.pm
index 554023b..cf4c24c 100644 (file)
@@ -4,9 +4,10 @@ use strict;
 use warnings;
 
 use base qw/DBIx::Class/;
-use Carp::Clan qw/^DBIx::Class/;
+
+use DBIx::Class::Exception;
 use Scalar::Util ();
-use Scope::Guard;
+use Try::Tiny;
 
 ###
 ### Internal method
@@ -105,26 +106,40 @@ with NULL as the default, and save yourself a SELECT.
 
 sub __new_related_find_or_new_helper {
   my ($self, $relname, $data) = @_;
-  if ($self->__their_pk_needs_us($relname, $data)) {
+
+  my $rsrc = $self->result_source;
+
+  # create a mock-object so all new/set_column component overrides will run:
+  my $rel_rs = $rsrc->related_source($relname)->resultset;
+  my $new_rel_obj = $rel_rs->new_result($data);
+  my $proc_data = { $new_rel_obj->get_columns };
+
+  if ($self->__their_pk_needs_us($relname)) {
     MULTICREATE_DEBUG and warn "MC $self constructing $relname via new_result";
-    return $self->result_source
-                ->related_source($relname)
-                ->resultset
-                ->new_result($data);
+    return $new_rel_obj;
   }
-  if ($self->result_source->_pk_depends_on($relname, $data)) {
-    MULTICREATE_DEBUG and warn "MC $self constructing $relname via find_or_new";
-    return $self->result_source
-                ->related_source($relname)
-                ->resultset
-                ->find_or_new($data);
+  elsif ($rsrc->_pk_depends_on($relname, $proc_data )) {
+    if (! keys %$proc_data) {
+      # there is nothing to search for - blind create
+      MULTICREATE_DEBUG and warn "MC $self constructing default-insert $relname";
+    }
+    else {
+      MULTICREATE_DEBUG and warn "MC $self constructing $relname via find_or_new";
+      # this is not *really* find or new, as we don't want to double-new the
+      # data (thus potentially double encoding or whatever)
+      my $exists = $rel_rs->find ($proc_data);
+      return $exists if $exists;
+    }
+    return $new_rel_obj;
+  }
+  else {
+    my $us = $rsrc->source_name;
+    $self->throw_exception ("'$us' neither depends nor is depended on by '$relname', something is wrong...");
   }
-  MULTICREATE_DEBUG and warn "MC $self constructing $relname via find_or_new_related";
-  return $self->find_or_new_related($relname, $data);
 }
 
 sub __their_pk_needs_us { # this should maybe be in resultsource.
-  my ($self, $relname, $data) = @_;
+  my ($self, $relname) = @_;
   my $source = $self->result_source;
   my $reverse = $source->reverse_relationship_info($relname);
   my $rel_source = $source->related_source($relname);
@@ -155,43 +170,40 @@ sub new {
     $new->result_source($source);
   }
 
-  if (my $related = delete $attrs->{-from_resultset}) {
+  if (my $related = delete $attrs->{-cols_from_relations}) {
     @{$new->{_ignore_at_insert}={}}{@$related} = ();
   }
 
   if ($attrs) {
     $new->throw_exception("attrs must be a hashref")
       unless ref($attrs) eq 'HASH';
-    
+
     my ($related,$inflated);
-    ## Pretend all the rels are actual objects, unset below if not, for insert() to fix
-    $new->{_rel_in_storage} = 1;
 
     foreach my $key (keys %$attrs) {
       if (ref $attrs->{$key}) {
         ## Can we extract this lot to use with update(_or .. ) ?
-        confess "Can't do multi-create without result source" unless $source;
+        $new->throw_exception("Can't do multi-create without result source")
+          unless $source;
         my $info = $source->relationship_info($key);
-        if ($info && $info->{attrs}{accessor}
-          && $info->{attrs}{accessor} eq 'single')
-        {
+        my $acc_type = $info->{attrs}{accessor} || '';
+        if ($acc_type eq 'single') {
           my $rel_obj = delete $attrs->{$key};
           if(!Scalar::Util::blessed($rel_obj)) {
             $rel_obj = $new->__new_related_find_or_new_helper($key, $rel_obj);
           }
 
           if ($rel_obj->in_storage) {
+            $new->{_rel_in_storage}{$key} = 1;
             $new->set_from_related($key, $rel_obj);
           } else {
-            $new->{_rel_in_storage} = 0;
             MULTICREATE_DEBUG and warn "MC $new uninserted $key $rel_obj\n";
           }
 
           $related->{$key} = $rel_obj;
           next;
-        } elsif ($info && $info->{attrs}{accessor}
-            && $info->{attrs}{accessor} eq 'multi'
-            && ref $attrs->{$key} eq 'ARRAY') {
+        }
+        elsif ($acc_type eq 'multi' && ref $attrs->{$key} eq 'ARRAY' ) {
           my $others = delete $attrs->{$key};
           my $total = @$others;
           my @objects;
@@ -202,27 +214,26 @@ sub new {
             }
 
             if ($rel_obj->in_storage) {
-              $new->set_from_related($key, $rel_obj);
+              $rel_obj->throw_exception ('A multi relationship can not be pre-existing when doing multicreate. Something went wrong');
             } else {
-              $new->{_rel_in_storage} = 0;
               MULTICREATE_DEBUG and
                 warn "MC $new uninserted $key $rel_obj (${\($idx+1)} of $total)\n";
             }
-            $new->set_from_related($key, $rel_obj) if $rel_obj->in_storage;
             push(@objects, $rel_obj);
           }
           $related->{$key} = \@objects;
           next;
-        } elsif ($info && $info->{attrs}{accessor}
-          && $info->{attrs}{accessor} eq 'filter')
-        {
+        }
+        elsif ($acc_type eq 'filter') {
           ## 'filter' should disappear and get merged in with 'single' above!
           my $rel_obj = delete $attrs->{$key};
           if(!Scalar::Util::blessed($rel_obj)) {
             $rel_obj = $new->__new_related_find_or_new_helper($key, $rel_obj);
           }
-          unless ($rel_obj->in_storage) {
-            $new->{_rel_in_storage} = 0;
+          if ($rel_obj->in_storage) {
+            $new->{_rel_in_storage}{$key} = 1;
+          }
+          else {
             MULTICREATE_DEBUG and warn "MC $new uninserted $key $rel_obj";
           }
           $inflated->{$key} = $rel_obj;
@@ -235,7 +246,7 @@ sub new {
       }
       $new->throw_exception("No such column $key on $class")
         unless $class->has_column($key);
-      $new->store_column($key => $attrs->{$key});          
+      $new->store_column($key => $attrs->{$key});
     }
 
     $new->{_relationship_data} = $related if $related;
@@ -283,41 +294,66 @@ sub insert {
   my $rollback_guard;
 
   # Check if we stored uninserted relobjs here in new()
-  my %related_stuff = (%{$self->{_relationship_data} || {}}, 
+  my %related_stuff = (%{$self->{_relationship_data} || {}},
                        %{$self->{_inflated_column} || {}});
 
-  if(!$self->{_rel_in_storage}) {
+  # insert what needs to be inserted before us
+  my %pre_insert;
+  for my $relname (keys %related_stuff) {
+    my $rel_obj = $related_stuff{$relname};
 
-    # The guard will save us if we blow out of this scope via die
-    $rollback_guard = $source->storage->txn_scope_guard;
+    if (! $self->{_rel_in_storage}{$relname}) {
+      next unless (Scalar::Util::blessed($rel_obj)
+                    && $rel_obj->isa('DBIx::Class::Row'));
 
-    ## Should all be in relationship_data, but we need to get rid of the
-    ## 'filter' reltype..
-    ## These are the FK rels, need their IDs for the insert.
+      next unless $source->_pk_depends_on(
+                    $relname, { $rel_obj->get_columns }
+                  );
 
-    my @pri = $self->primary_columns;
+      # The guard will save us if we blow out of this scope via die
+      $rollback_guard ||= $source->storage->txn_scope_guard;
 
-    REL: foreach my $relname (keys %related_stuff) {
+      MULTICREATE_DEBUG and warn "MC $self pre-reconstructing $relname $rel_obj\n";
+
+      my $them = { %{$rel_obj->{_relationship_data} || {} }, $rel_obj->get_columns };
+      my $existing;
 
-      my $rel_obj = $related_stuff{$relname};
+      # if there are no keys - nothing to search for
+      if (keys %$them and $existing = $self->result_source
+                                           ->related_source($relname)
+                                           ->resultset
+                                           ->find($them)
+      ) {
+        %{$rel_obj} = %{$existing};
+      }
+      else {
+        $rel_obj->insert;
+      }
 
-      next REL unless (Scalar::Util::blessed($rel_obj)
-                       && $rel_obj->isa('DBIx::Class::Row'));
+      $self->{_rel_in_storage}{$relname} = 1;
+    }
 
-      next REL unless $source->_pk_depends_on(
-                        $relname, { $rel_obj->get_columns }
-                      );
+    $self->set_from_related($relname, $rel_obj);
+    delete $related_stuff{$relname};
+  }
 
-      MULTICREATE_DEBUG and warn "MC $self pre-reconstructing $relname $rel_obj\n";
+  # start a transaction here if not started yet and there is more stuff
+  # to insert after us
+  if (keys %related_stuff) {
+    $rollback_guard ||= $source->storage->txn_scope_guard
+  }
 
-      my $them = { %{$rel_obj->{_relationship_data} || {} }, $rel_obj->get_inflated_columns };
-      my $re = $self->result_source
-                    ->related_source($relname)
-                    ->resultset
-                    ->find_or_create($them);
-      %{$rel_obj} = %{$re};
-      $self->set_from_related($relname, $rel_obj);
-      delete $related_stuff{$relname};
+  ## PK::Auto
+  my %auto_pri;
+  my $auto_idx = 0;
+  for ($self->primary_columns) {
+    if (
+      not defined $self->get_column($_)
+        ||
+      (ref($self->get_column($_)) eq 'SCALAR')
+    ) {
+      my $col_info = $source->column_info($_);
+      $auto_pri{$_} = $auto_idx++ unless $col_info->{auto_nextval};   # auto_nextval's are pre-fetched in the storage
     }
   }
 
@@ -325,75 +361,71 @@ sub insert {
     no warnings 'uninitialized';
     warn "MC $self inserting (".join(', ', $self->get_columns).")\n";
   };
-  my $updated_cols = $source->storage->insert($source, { $self->get_columns });
+  my $updated_cols = $source->storage->insert(
+    $source,
+    { $self->get_columns },
+    (keys %auto_pri) && $source->storage->_supports_insert_returning
+      ? { returning => [ sort { $auto_pri{$a} <=> $auto_pri{$b} } keys %auto_pri ] }
+      : ()
+    ,
+  );
+
   foreach my $col (keys %$updated_cols) {
     $self->store_column($col, $updated_cols->{$col});
+    delete $auto_pri{$col};
   }
 
-  ## PK::Auto
-  my @auto_pri = grep {
-                   !defined $self->get_column($_) || 
-                   ref($self->get_column($_)) eq 'SCALAR'
-                 } $self->primary_columns;
-
-  if (@auto_pri) {
-    #$self->throw_exception( "More than one possible key found for auto-inc on ".ref $self )
-    #  if defined $too_many;
-    MULTICREATE_DEBUG and warn "MC $self fetching missing PKs ".join(', ', @auto_pri)."\n";
+  if (keys %auto_pri) {
+    my @missing = sort { $auto_pri{$a} <=> $auto_pri{$b} } keys %auto_pri;
+    MULTICREATE_DEBUG and warn "MC $self fetching missing PKs ".join(', ', @missing )."\n";
     my $storage = $self->result_source->storage;
     $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
       unless $storage->can('last_insert_id');
-    my @ids = $storage->last_insert_id($self->result_source,@auto_pri);
+    my @ids = $storage->last_insert_id($self->result_source, @missing);
     $self->throw_exception( "Can't get last insert id" )
-      unless (@ids == @auto_pri);
-    $self->store_column($auto_pri[$_] => $ids[$_]) for 0 .. $#ids;
+      unless (@ids == @missing);
+    $self->store_column($missing[$_] => $ids[$_]) for 0 .. $#missing;
   }
 
-
   $self->{_dirty_columns} = {};
   $self->{related_resultsets} = {};
 
-  if(!$self->{_rel_in_storage}) {
-    ## Now do the relationships that need our ID (has_many etc.)
-    foreach my $relname (keys %related_stuff) {
-      my $rel_obj = $related_stuff{$relname};
-      my @cands;
-      if (Scalar::Util::blessed($rel_obj)
-          && $rel_obj->isa('DBIx::Class::Row')) {
-        @cands = ($rel_obj);
-      } elsif (ref $rel_obj eq 'ARRAY') {
-        @cands = @$rel_obj;
-      }
-      if (@cands) {
-        my $reverse = $source->reverse_relationship_info($relname);
-        foreach my $obj (@cands) {
-          $obj->set_from_related($_, $self) for keys %$reverse;
-          my $them = { %{$obj->{_relationship_data} || {} }, $obj->get_inflated_columns };
-          if ($self->__their_pk_needs_us($relname, $them)) {
-            if (exists $self->{_ignore_at_insert}{$relname}) {
-              MULTICREATE_DEBUG and warn "MC $self skipping post-insert on $relname";
-            } else {
-              MULTICREATE_DEBUG and warn "MC $self re-creating $relname $obj";
-              my $re = $self->result_source
-                            ->related_source($relname)
-                            ->resultset
-                            ->find_or_create($them);
-              %{$obj} = %{$re};
-              MULTICREATE_DEBUG and warn "MC $self new $relname $obj";
-            }
-          } else {
-            MULTICREATE_DEBUG and warn "MC $self post-inserting $obj";
-            $obj->insert();
+  foreach my $relname (keys %related_stuff) {
+    next unless $source->has_relationship ($relname);
+
+    my @cands = ref $related_stuff{$relname} eq 'ARRAY'
+      ? @{$related_stuff{$relname}}
+      : $related_stuff{$relname}
+    ;
+
+    if (@cands
+          && Scalar::Util::blessed($cands[0])
+            && $cands[0]->isa('DBIx::Class::Row')
+    ) {
+      my $reverse = $source->reverse_relationship_info($relname);
+      foreach my $obj (@cands) {
+        $obj->set_from_related($_, $self) for keys %$reverse;
+        if ($self->__their_pk_needs_us($relname)) {
+          if (exists $self->{_ignore_at_insert}{$relname}) {
+            MULTICREATE_DEBUG and warn "MC $self skipping post-insert on $relname";
           }
+          else {
+            MULTICREATE_DEBUG and warn "MC $self inserting $relname $obj";
+            $obj->insert;
+          }
+        } else {
+          MULTICREATE_DEBUG and warn "MC $self post-inserting $obj";
+          $obj->insert();
         }
       }
     }
-    delete $self->{_ignore_at_insert};
-    $rollback_guard->commit;
   }
 
   $self->in_storage(1);
-  undef $self->{_orig_ident};
+  delete $self->{_orig_ident};
+  delete $self->{_ignore_at_insert};
+  $rollback_guard->commit if $rollback_guard;
+
   return $self;
 }
 
@@ -413,7 +445,7 @@ sub insert {
 Indicates whether the object exists as a row in the database or
 not. This is set to true when L<DBIx::Class::ResultSet/find>,
 L<DBIx::Class::ResultSet/create> or L<DBIx::Class::ResultSet/insert>
-are used. 
+are used.
 
 Creating a row object using L<DBIx::Class::ResultSet/new>, or calling
 L</delete> on one, sets it to false.
@@ -423,7 +455,7 @@ L</delete> on one, sets it to false.
 sub in_storage {
   my ($self, $val) = @_;
   $self->{_in_storage} = $val if @_ > 1;
-  return $self->{_in_storage};
+  return $self->{_in_storage} ? 1 : 0;
 }
 
 =head2 update
@@ -442,9 +474,13 @@ Throws an exception if the row object is not yet in the database,
 according to L</in_storage>.
 
 This method issues an SQL UPDATE query to commit any changes to the
-object to the database if required.
+object to the database if required (see L</get_dirty_columns>).
+It throws an exception if a proper WHERE clause uniquely identifying
+the database row can not be constructed (see
+L<significance of primary keys|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
+for more details).
 
-Also takes an optional hashref of C<< column_name => value> >> pairs
+Also takes an optional hashref of C<< column_name => value >> pairs
 to update on the object first. Be aware that the hashref will be
 passed to C<set_inflated_columns>, which might edit it in place, so
 don't rely on it being the same after a call to C<update>.  If you
@@ -452,7 +488,7 @@ need to preserve the hashref, it is sufficient to pass a shallow copy
 to C<update>, e.g. ( { %{ $href } } )
 
 If the values passed or any of the column values set on the object
-contain scalar references, eg:
+contain scalar references, e.g.:
 
   $row->last_modified(\'NOW()');
   # OR
@@ -478,18 +514,21 @@ this method.
 
 sub update {
   my ($self, $upd) = @_;
-  $self->throw_exception( "Not in database" ) unless $self->in_storage;
-  my $ident_cond = $self->ident_condition;
-  $self->throw_exception("Cannot safely update a row in a PK-less table")
-    if ! keys %$ident_cond;
+
+  my $ident_cond = $self->{_orig_ident} || $self->ident_condition;
 
   $self->set_inflated_columns($upd) if $upd;
   my %to_update = $self->get_dirty_columns;
   return $self unless keys %to_update;
+
+  $self->throw_exception( "Not in database" ) unless $self->in_storage;
+
+  $self->throw_exception('Unable to update a row with incomplete or no identity')
+    if ! keys %$ident_cond;
+
   my $rows = $self->result_source->storage->update(
-               $self->result_source, \%to_update,
-               $self->{_orig_ident} || $ident_cond
-             );
+    $self->result_source, \%to_update, $ident_cond
+  );
   if ($rows == 0) {
     $self->throw_exception( "Can't update ${self}: row not found" );
   } elsif ($rows > 1) {
@@ -497,7 +536,7 @@ sub update {
   }
   $self->{_dirty_columns} = {};
   $self->{related_resultsets} = {};
-  undef $self->{_orig_ident};
+  delete $self->{_orig_ident};
   return $self;
 }
 
@@ -514,19 +553,23 @@ sub update {
 =back
 
 Throws an exception if the object is not in the database according to
-L</in_storage>. Runs an SQL DELETE statement using the primary key
-values to locate the row.
+L</in_storage>. Also throws an exception if a proper WHERE clause
+uniquely identifying the database row can not be constructed (see
+L<significance of primary keys|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
+for more details).
 
 The object is still perfectly usable, but L</in_storage> will
 now return 0 and the object must be reinserted using L</insert>
-before it can be used to L</update> the row again. 
+before it can be used to L</update> the row again.
 
 If you delete an object in a class with a C<has_many> relationship, an
 attempt is made to delete all the related objects as well. To turn
 this behaviour off, pass C<< cascade_delete => 0 >> in the C<$attr>
 hashref of the relationship, see L<DBIx::Class::Relationship>. Any
 database-level cascade or restrict will take precedence over a
-DBIx-Class-based cascading delete. 
+DBIx-Class-based cascading delete, since DBIx-Class B<deletes the
+main row first> and only then attempts to delete any remaining related
+rows.
 
 If you delete an object within a txn_do() (see L<DBIx::Class::Storage/txn_do>)
 and the transaction subsequently fails, the row object will remain marked as
@@ -544,17 +587,19 @@ sub delete {
   my $self = shift;
   if (ref $self) {
     $self->throw_exception( "Not in database" ) unless $self->in_storage;
+
     my $ident_cond = $self->{_orig_ident} || $self->ident_condition;
-    $self->throw_exception("Cannot safely delete a row in a PK-less table")
+    $self->throw_exception('Unable to delete a row with incomplete or no identity')
       if ! keys %$ident_cond;
-    foreach my $column (keys %$ident_cond) {
-            $self->throw_exception("Can't delete the object unless it has loaded the primary keys")
-              unless exists $self->{_column_data}{$column};
-    }
+
     $self->result_source->storage->delete(
-      $self->result_source, $ident_cond);
+      $self->result_source, $ident_cond
+    );
+
+    delete $self->{_orig_ident};
     $self->in_storage(undef);
-  } else {
+  }
+  else {
     $self->throw_exception("Can't do class delete without a ResultSource instance")
       unless $self->can('result_source_instance');
     my $attrs = @_ > 1 && ref $_[$#_] eq 'HASH' ? { %{pop(@_)} } : {};
@@ -600,7 +645,7 @@ sub get_column {
   return $self->{_column_data}{$column} if exists $self->{_column_data}{$column};
   if (exists $self->{_inflated_column}{$column}) {
     return $self->store_column($column,
-      $self->_deflated_column($column, $self->{_inflated_column}{$column}));   
+      $self->_deflated_column($column, $self->{_inflated_column}{$column}));
   }
   $self->throw_exception( "No such column '${column}'" ) unless $self->has_column($column);
   return undef;
@@ -702,7 +747,7 @@ sub get_dirty_columns {
 Throws an exception if the column does not exist.
 
 Marks a column as having been changed regardless of whether it has
-really changed.  
+really changed.
 
 =cut
 sub make_column_dirty {
@@ -711,7 +756,7 @@ sub make_column_dirty {
   $self->throw_exception( "No such column '${column}'" )
     unless exists $self->{_column_data}{$column} || $self->has_column($column);
 
-  # the entire clean/dirty code relieas on exists, not on true/false
+  # the entire clean/dirty code relies on exists, not on true/false
   return 1 if exists $self->{_dirty_columns}{$column};
 
   $self->{_dirty_columns}{$column} = 1;
@@ -750,10 +795,41 @@ See L<DBIx::Class::InflateColumn> for how to setup inflation.
 
 sub get_inflated_columns {
   my $self = shift;
-  return map {
-    my $accessor = $self->column_info($_)->{'accessor'} || $_;
-    ($_ => $self->$accessor);
-  } grep $self->has_column_loaded($_), $self->columns;
+
+  my %loaded_colinfo = (map
+    { $_ => $self->column_info($_) }
+    (grep { $self->has_column_loaded($_) } $self->columns)
+  );
+
+  my %inflated;
+  for my $col (keys %loaded_colinfo) {
+    if (exists $loaded_colinfo{$col}{accessor}) {
+      my $acc = $loaded_colinfo{$col}{accessor};
+      $inflated{$col} = $self->$acc if defined $acc;
+    }
+    else {
+      $inflated{$col} = $self->$col;
+    }
+  }
+
+  # return all loaded columns with the inflations overlayed on top
+  return ($self->get_columns, %inflated);
+}
+
+sub _is_column_numeric {
+   my ($self, $column) = @_;
+    my $colinfo = $self->column_info ($column);
+
+    # cache for speed (the object may *not* have a resultsource instance)
+    if (not defined $colinfo->{is_numeric} && $self->_source_handle) {
+      $colinfo->{is_numeric} =
+        $self->result_source->schema->storage->is_datatype_numeric ($colinfo->{data_type})
+          ? 1
+          : 0
+        ;
+    }
+
+    return $colinfo->{is_numeric};
 }
 
 =head2 set_column
@@ -781,42 +857,21 @@ instead, see L</set_inflated_columns>.
 sub set_column {
   my ($self, $column, $new_value) = @_;
 
-  $self->{_orig_ident} ||= $self->ident_condition;
-  my $old_value = $self->get_column($column);
-
-  $self->store_column($column, $new_value);
-
-  my $dirty;
-  if (defined $old_value xor defined $new_value) {
-    $dirty = 1;
-  }
-  elsif (not defined $old_value) {  # both undef
-    $dirty = 0;
-  }
-  elsif ($old_value eq $new_value) {
-    $dirty = 0;
-  }
-  else {  # do a numeric comparison if datatype allows it
-    my $colinfo = $self->column_info ($column);
+  # if we can't get an ident condition on first try - mark the object as unidentifiable
+  $self->{_orig_ident} ||= (try { $self->ident_condition }) || {};
 
-    # cache for speed
-    if (not defined $colinfo->{is_numeric} && $self->_source_handle) {
-      $colinfo->{is_numeric} =
-        $self->result_source->schema->storage->is_datatype_numeric ($colinfo->{data_type})
-          ? 1
-          : 0
-        ;
-    }
+  my $old_value = $self->get_column($column);
+  $new_value = $self->store_column($column, $new_value);
 
-    if ($colinfo->{is_numeric}) {
-      $dirty = $old_value != $new_value;
-    }
-    else {
-      $dirty = 1;
-    }
-  }
+  my $dirty =
+    $self->{_dirty_columns}{$column}
+      ||
+    $self->in_storage # no point tracking dirtyness on uninserted data
+      ? ! $self->_eq_column_values ($column, $old_value, $new_value)
+      : 1
+  ;
 
-  # sadly the update code just checks for keys, not for their value
+  # FIXME sadly the update code just checks for keys, not for their value
   $self->{_dirty_columns}{$column} = 1 if $dirty;
 
   # XXX clear out the relation cache for this column
@@ -825,11 +880,31 @@ sub set_column {
   return $new_value;
 }
 
+sub _eq_column_values {
+  my ($self, $col, $old, $new) = @_;
+
+  if (defined $old xor defined $new) {
+    return 0;
+  }
+  elsif (not defined $old) {  # both undef
+    return 1;
+  }
+  elsif ($old eq $new) {
+    return 1;
+  }
+  elsif ($self->_is_column_numeric($col)) {  # do a numeric comparison if datatype allows it
+    return $old == $new;
+  }
+  else {
+    return 0;
+  }
+}
+
 =head2 set_columns
 
   $row->set_columns({ $col => $val, ... });
 
-=over 
+=over
 
 =item Arguments: \%columndata
 
@@ -864,7 +939,7 @@ sub set_columns {
 =back
 
 Sets more than one column value at once. Any inflated values are
-deflated and the raw values stored. 
+deflated and the raw values stored.
 
 Any related values passed as Row objects, using the relation name as a
 key, are reduced to the appropriate foreign key values and stored. If
@@ -875,7 +950,7 @@ Will even accept arrayrefs of data as a value to a
 L<DBIx::Class::Relationship/has_many> key, and create the related
 objects if necessary.
 
-Be aware that the input hashref might be edited in place, so dont rely
+Be aware that the input hashref might be edited in place, so don't rely
 on it being the same after a call to C<set_inflated_columns>. If you
 need to preserve the hashref, it is sufficient to pass a shallow copy
 to C<set_inflated_columns>, e.g. ( { %{ $href } } )
@@ -889,26 +964,23 @@ sub set_inflated_columns {
   foreach my $key (keys %$upd) {
     if (ref $upd->{$key}) {
       my $info = $self->relationship_info($key);
-      if ($info && $info->{attrs}{accessor}
-        && $info->{attrs}{accessor} eq 'single')
-      {
+      my $acc_type = $info->{attrs}{accessor} || '';
+      if ($acc_type eq 'single') {
         my $rel = delete $upd->{$key};
         $self->set_from_related($key => $rel);
         $self->{_relationship_data}{$key} = $rel;
-      } elsif ($info && $info->{attrs}{accessor}
-        && $info->{attrs}{accessor} eq 'multi') {
-          $self->throw_exception(
-            "Recursive update is not supported over relationships of type multi ($key)"
-          );
       }
-      elsif ($self->has_column($key)
-        && exists $self->column_info($key)->{_inflate_info})
-      {
+      elsif ($acc_type eq 'multi') {
+        $self->throw_exception(
+          "Recursive update is not supported over relationships of type '$acc_type' ($key)"
+        );
+      }
+      elsif ($self->has_column($key) && exists $self->column_info($key)->{_inflate_info}) {
         $self->set_inflated_column($key, delete $upd->{$key});
       }
     }
   }
-  $self->set_columns($upd);    
+  $self->set_columns($upd);
 }
 
 =head2 copy
@@ -932,7 +1004,7 @@ so that the database can insert its own autoincremented values into
 the new object.
 
 Relationships will be followed by the copy procedure B<only> if the
-relationship specifes a true value for its
+relationship specifies a true value for its
 L<cascade_copy|DBIx::Class::Relationship::Base> attribute. C<cascade_copy>
 is set by default on C<has_many> relationships and unset on all others.
 
@@ -954,8 +1026,8 @@ sub copy {
   $new->set_inflated_columns($changes);
   $new->insert;
 
-  # Its possible we'll have 2 relations to the same Source. We need to make 
-  # sure we don't try to insert the same row twice esle we'll violate unique
+  # Its possible we'll have 2 relations to the same Source. We need to make
+  # sure we don't try to insert the same row twice else we'll violate unique
   # constraints
   my $rels_copied = {};
 
@@ -963,7 +1035,7 @@ sub copy {
     my $rel_info = $self->result_source->relationship_info($rel);
 
     next unless $rel_info->{attrs}{cascade_copy};
-  
+
     my $resolved = $self->result_source->_resolve_condition(
       $rel_info->{cond}, $rel, $new
     );
@@ -975,7 +1047,7 @@ sub copy {
       $copied->{$id_str} = 1;
       my $rel_copy = $related->copy($resolved);
     }
+
   }
   return $new;
 }
@@ -1042,56 +1114,68 @@ sub inflate_result {
   my ($source_handle) = $source;
 
   if ($source->isa('DBIx::Class::ResultSourceHandle')) {
-      $source = $source_handle->resolve
-  } else {
-      $source_handle = $source->handle
+    $source = $source_handle->resolve
+  } 
+  else {
+    $source_handle = $source->handle
   }
 
   my $new = {
     _source_handle => $source_handle,
     _column_data => $me,
-    _in_storage => 1
   };
   bless $new, (ref $class || $class);
 
-  my $schema;
   foreach my $pre (keys %{$prefetch||{}}) {
-    my $pre_val = $prefetch->{$pre};
-    my $pre_source = $source->related_source($pre);
-    $class->throw_exception("Can't prefetch non-existent relationship ${pre}")
-      unless $pre_source;
-    if (ref($pre_val->[0]) eq 'ARRAY') { # multi
-      my @pre_objects;
-      foreach my $pre_rec (@$pre_val) {
-        unless ($pre_source->primary_columns == grep { exists $pre_rec->[0]{$_}
-           and defined $pre_rec->[0]{$_} } $pre_source->primary_columns) {
-          next;
+
+    my $pre_source = $source->related_source($pre)
+      or $class->throw_exception("Can't prefetch non-existent relationship ${pre}");
+
+    my $accessor = $source->relationship_info($pre)->{attrs}{accessor}
+      or $class->throw_exception("No accessor for prefetched $pre");
+
+    my @pre_vals;
+    if (ref $prefetch->{$pre}[0] eq 'ARRAY') {
+      @pre_vals = @{$prefetch->{$pre}};
+    }
+    elsif ($accessor eq 'multi') {
+      $class->throw_exception("Implicit prefetch (via select/columns) not supported with accessor 'multi'");
+    }
+    else {
+      @pre_vals = $prefetch->{$pre};
+    }
+
+    my @pre_objects;
+    for my $me_pref (@pre_vals) {
+
+        # FIXME - this should not be necessary
+        # the collapser currently *could* return bogus elements with all
+        # columns set to undef
+        my $has_def;
+        for (values %{$me_pref->[0]}) {
+          if (defined $_) {
+            $has_def++;
+            last;
+          }
         }
-        push(@pre_objects, $pre_source->result_class->inflate_result(
-                             $pre_source, @{$pre_rec}));
-      }
-      $new->related_resultset($pre)->set_cache(\@pre_objects);
-    } elsif (defined $pre_val->[0]) {
-      my $fetched;
-      unless ($pre_source->primary_columns == grep { exists $pre_val->[0]{$_}
-         and !defined $pre_val->[0]{$_} } $pre_source->primary_columns)
-      {
-        $fetched = $pre_source->result_class->inflate_result(
-                      $pre_source, @{$pre_val});
-      }
-      my $accessor = $source->relationship_info($pre)->{attrs}{accessor};
-      $class->throw_exception("No accessor for prefetched $pre")
-       unless defined $accessor;
-      if ($accessor eq 'single') {
-        $new->{_relationship_data}{$pre} = $fetched;
-      } elsif ($accessor eq 'filter') {
-        $new->{_inflated_column}{$pre} = $fetched;
-      } else {
-       $class->throw_exception("Prefetch not supported with accessor '$accessor'");
-      }
-      $new->related_resultset($pre)->set_cache([ $fetched ]);
+        next unless $has_def;
+
+        push @pre_objects, $pre_source->result_class->inflate_result(
+          $pre_source, @$me_pref
+        );
+    }
+
+    if ($accessor eq 'single') {
+      $new->{_relationship_data}{$pre} = $pre_objects[0];
     }
+    elsif ($accessor eq 'filter') {
+      $new->{_inflated_column}{$pre} = $pre_objects[0];
+    }
+
+    $new->related_resultset($pre)->set_cache(\@pre_objects);
   }
+
+  $new->in_storage (1);
   return $new;
 }
 
@@ -1240,8 +1324,11 @@ sub register_column {
 =back
 
 Fetches a fresh copy of the Row object from the database and returns it.
-
-If passed the \%attrs argument, will first apply these attributes to
+Throws an exception if a proper WHERE clause identifying the database row
+can not be constructed (i.e. if the original object does not contain its
+entire
+ L<primary key|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
+). If passed the \%attrs argument, will first apply these attributes to
 the resultset used to find the row.
 
 This copy can then be used to compare to an existing row object, to
@@ -1260,14 +1347,61 @@ sub get_from_storage {
     my $self = shift @_;
     my $attrs = shift @_;
     my $resultset = $self->result_source->resultset;
-    
+
     if(defined $attrs) {
-       $resultset = $resultset->search(undef, $attrs);
+      $resultset = $resultset->search(undef, $attrs);
     }
-    
-    return $resultset->find($self->{_orig_ident} || $self->ident_condition);
+
+    my $ident_cond = $self->{_orig_ident} || $self->ident_condition;
+
+    $self->throw_exception('Unable to requery a row with incomplete or no identity')
+      if ! keys %$ident_cond;
+
+    return $resultset->find($ident_cond);
 }
 
+=head2 discard_changes ($attrs)
+
+Re-selects the row from the database, losing any changes that had
+been made. Throws an exception if a proper WHERE clause identifying
+the database row can not be constructed (i.e. if the original object
+does not contain its entire
+L<primary key|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
+).
+
+This method can also be used to refresh from storage, retrieving any
+changes made since the row was last read from storage.
+
+$attrs is expected to be a hashref of attributes suitable for passing as the
+second argument to $resultset->search($cond, $attrs);
+
+=cut
+
+sub discard_changes {
+  my ($self, $attrs) = @_;
+  return unless $self->in_storage; # Don't reload if we aren't real!
+
+  # add a replication default to read from the master only
+  $attrs = { force_pool => 'master', %{$attrs||{}} };
+
+  if( my $current_storage = $self->get_from_storage($attrs)) {
+
+    # Set $self to the current.
+    %$self = %$current_storage;
+
+    # Avoid a possible infinite loop with
+    # sub DESTROY { $_[0]->discard_changes }
+    bless $current_storage, 'Do::Not::Exist';
+
+    return $self;
+  }
+  else {
+    $self->in_storage(0);
+    return $self;
+  }
+}
+
+
 =head2 throw_exception
 
 See L<DBIx::Class::Schema/throw_exception>.
@@ -1276,10 +1410,12 @@ See L<DBIx::Class::Schema/throw_exception>.
 
 sub throw_exception {
   my $self=shift;
+
   if (ref $self && ref $self->result_source && $self->result_source->schema) {
-    $self->result_source->schema->throw_exception(@_);
-  } else {
-    croak(@_);
+    $self->result_source->schema->throw_exception(@_)
+  }
+  else {
+    DBIx::Class::Exception->throw(@_);
   }
 }
 
@@ -1317,6 +1453,13 @@ This method can also be used to refresh from storage, retrieving any
 changes made since the row was last read from storage. Actually
 implemented in L<DBIx::Class::PK>
 
+Note: If you are using L<DBIx::Class::Storage::DBI::Replicated> as your
+storage, please kept in mind that if you L</discard_changes> on a row that you
+just updated or created, you should wrap the entire bit inside a transaction.
+Otherwise you run the risk that you insert or update to the master database
+but read from a replicant database that has not yet been updated from the
+master.  This will result in unexpected results.
+
 =cut
 
 1;