Refactor insert logic (Row should not handle SQLA options)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Row.pm
index abc34ab..b8c4da7 100644 (file)
@@ -4,10 +4,16 @@ use strict;
 use warnings;
 
 use base qw/DBIx::Class/;
-use Carp::Clan qw/^DBIx::Class/;
-use Scalar::Util ();
-use Scope::Guard;
 
+use DBIx::Class::Exception;
+use Scalar::Util 'blessed';
+use Try::Tiny;
+use namespace::clean;
+
+###
+### Internal method
+### Do not use
+###
 BEGIN {
   *MULTICREATE_DEBUG =
     $ENV{DBIC_MULTICREATE_DEBUG}
@@ -73,6 +79,23 @@ passed objects.
 
 For a more involved explanation, see L<DBIx::Class::ResultSet/create>.
 
+Please note that if a value is not passed to new, no value will be sent
+in the SQL INSERT call, and the column will therefore assume whatever
+default value was specified in your database. While DBIC will retrieve the
+value of autoincrement columns, it will never make an explicit database
+trip to retrieve default values assigned by the RDBMS. You can explicitly
+request that all values be fetched back from the database by calling
+L</discard_changes>, or you can supply an explicit C<undef> to columns
+with NULL as the default, and save yourself a SELECT.
+
+ CAVEAT:
+
+ The behavior described above will backfire if you use a foreign key column
+ with a database-defined default. If you call the relationship accessor on
+ an object that doesn't have a set value for the FK column, DBIC will throw
+ an exception, as it has no way of knowing the PK of the related object (if
+ there is one).
+
 =cut
 
 ## It needs to store the new objects somewhere, and call insert on that list later when insert is called on this object. We may need an accessor for these so the user can retrieve them, if just doing ->new().
@@ -84,23 +107,43 @@ For a more involved explanation, see L<DBIx::Class::ResultSet/create>.
 
 sub __new_related_find_or_new_helper {
   my ($self, $relname, $data) = @_;
-  if ($self->__their_pk_needs_us($relname, $data)) {
-    return $self->result_source
-                ->related_source($relname)
-                ->resultset
-                ->new_result($data);
+
+  my $rsrc = $self->result_source;
+
+  # create a mock-object so all new/set_column component overrides will run:
+  my $rel_rs = $rsrc->related_source($relname)->resultset;
+  my $new_rel_obj = $rel_rs->new_result($data);
+  my $proc_data = { $new_rel_obj->get_columns };
+
+  if ($self->__their_pk_needs_us($relname)) {
+    MULTICREATE_DEBUG and warn "MC $self constructing $relname via new_result";
+    return $new_rel_obj;
+  }
+  elsif ($rsrc->_pk_depends_on($relname, $proc_data )) {
+    if (! keys %$proc_data) {
+      # there is nothing to search for - blind create
+      MULTICREATE_DEBUG and warn "MC $self constructing default-insert $relname";
+    }
+    else {
+      MULTICREATE_DEBUG and warn "MC $self constructing $relname via find_or_new";
+      # this is not *really* find or new, as we don't want to double-new the
+      # data (thus potentially double encoding or whatever)
+      my $exists = $rel_rs->find ($proc_data);
+      return $exists if $exists;
+    }
+    return $new_rel_obj;
   }
-  if ($self->result_source->pk_depends_on($relname, $data)) {
-    return $self->result_source
-                ->related_source($relname)
-                ->resultset
-                ->find_or_create($data);
+  else {
+    my $us = $rsrc->source_name;
+    $self->throw_exception (
+      "Unable to determine relationship '$relname' direction from '$us', "
+    . "possibly due to a missing reverse-relationship on '$relname' to '$us'."
+    );
   }
-  return $self->find_or_new_related($relname, $data);
 }
 
 sub __their_pk_needs_us { # this should maybe be in resultsource.
-  my ($self, $relname, $data) = @_;
+  my ($self, $relname) = @_;
   my $source = $self->result_source;
   my $reverse = $source->reverse_relationship_info($relname);
   my $rel_source = $source->related_source($relname);
@@ -108,7 +151,7 @@ sub __their_pk_needs_us { # this should maybe be in resultsource.
   foreach my $key (keys %$reverse) {
     # if their primary key depends on us, then we have to
     # just create a result and we'll fill it out afterwards
-    return 1 if $rel_source->pk_depends_on($key, $us);
+    return 1 if $rel_source->_pk_depends_on($key, $us);
   }
   return 0;
 }
@@ -131,71 +174,71 @@ sub new {
     $new->result_source($source);
   }
 
+  if (my $related = delete $attrs->{-cols_from_relations}) {
+    @{$new->{_ignore_at_insert}={}}{@$related} = ();
+  }
+
   if ($attrs) {
     $new->throw_exception("attrs must be a hashref")
       unless ref($attrs) eq 'HASH';
-    
+
     my ($related,$inflated);
-    ## Pretend all the rels are actual objects, unset below if not, for insert() to fix
-    $new->{_rel_in_storage} = 1;
 
     foreach my $key (keys %$attrs) {
       if (ref $attrs->{$key}) {
         ## Can we extract this lot to use with update(_or .. ) ?
-        confess "Can't do multi-create without result source" unless $source;
+        $new->throw_exception("Can't do multi-create without result source")
+          unless $source;
         my $info = $source->relationship_info($key);
-        if ($info && $info->{attrs}{accessor}
-          && $info->{attrs}{accessor} eq 'single')
-        {
+        my $acc_type = $info->{attrs}{accessor} || '';
+        if ($acc_type eq 'single') {
           my $rel_obj = delete $attrs->{$key};
-          if(!Scalar::Util::blessed($rel_obj)) {
+          if(!blessed $rel_obj) {
             $rel_obj = $new->__new_related_find_or_new_helper($key, $rel_obj);
           }
 
           if ($rel_obj->in_storage) {
+            $new->{_rel_in_storage}{$key} = 1;
             $new->set_from_related($key, $rel_obj);
           } else {
-            $new->{_rel_in_storage} = 0;
-            MULTICREATE_DEBUG and warn "MC $new: uninserted $key $rel_obj\n";
+            MULTICREATE_DEBUG and warn "MC $new uninserted $key $rel_obj\n";
           }
 
           $related->{$key} = $rel_obj;
           next;
-        } elsif ($info && $info->{attrs}{accessor}
-            && $info->{attrs}{accessor} eq 'multi'
-            && ref $attrs->{$key} eq 'ARRAY') {
+        }
+        elsif ($acc_type eq 'multi' && ref $attrs->{$key} eq 'ARRAY' ) {
           my $others = delete $attrs->{$key};
           my $total = @$others;
           my @objects;
           foreach my $idx (0 .. $#$others) {
             my $rel_obj = $others->[$idx];
-            if(!Scalar::Util::blessed($rel_obj)) {
+            if(!blessed $rel_obj) {
               $rel_obj = $new->__new_related_find_or_new_helper($key, $rel_obj);
             }
 
             if ($rel_obj->in_storage) {
-              $new->set_from_related($key, $rel_obj);
+              $rel_obj->throw_exception ('A multi relationship can not be pre-existing when doing multicreate. Something went wrong');
             } else {
-              $new->{_rel_in_storage} = 0;
               MULTICREATE_DEBUG and
-                warn "MC $new: uninserted $key $rel_obj ($idx of $total)\n";
+                warn "MC $new uninserted $key $rel_obj (${\($idx+1)} of $total)\n";
             }
-            $new->set_from_related($key, $rel_obj) if $rel_obj->in_storage;
             push(@objects, $rel_obj);
           }
           $related->{$key} = \@objects;
           next;
-        } elsif ($info && $info->{attrs}{accessor}
-          && $info->{attrs}{accessor} eq 'filter')
-        {
+        }
+        elsif ($acc_type eq 'filter') {
           ## 'filter' should disappear and get merged in with 'single' above!
           my $rel_obj = delete $attrs->{$key};
-          if(!Scalar::Util::blessed($rel_obj)) {
+          if(!blessed $rel_obj) {
             $rel_obj = $new->__new_related_find_or_new_helper($key, $rel_obj);
           }
-          unless ($rel_obj->in_storage) {
-            $new->{_rel_in_storage} = 0;
-            MULTICREATE_DEBUG and warn "MC $new: uninserted $key $rel_obj";
+          if ($rel_obj->in_storage) {
+            $new->{_rel_in_storage}{$key} = 1;
+          }
+          else {
+            MULTICREATE_DEBUG and warn "MC $new uninserted $key $rel_obj";
           }
           $inflated->{$key} = $rel_obj;
           next;
@@ -207,7 +250,7 @@ sub new {
       }
       $new->throw_exception("No such column $key on $class")
         unless $class->has_column($key);
-      $new->store_column($key => $attrs->{$key});          
+      $new->store_column($key => $attrs->{$key});
     }
 
     $new->{_relationship_data} = $related if $related;
@@ -252,105 +295,138 @@ sub insert {
   $self->throw_exception("No result_source set on this object; can't insert")
     unless $source;
 
+  my $storage = $source->storage;
+
   my $rollback_guard;
 
   # Check if we stored uninserted relobjs here in new()
-  my %related_stuff = (%{$self->{_relationship_data} || {}}, 
+  my %related_stuff = (%{$self->{_relationship_data} || {}},
                        %{$self->{_inflated_column} || {}});
 
-  if(!$self->{_rel_in_storage}) {
+  # insert what needs to be inserted before us
+  my %pre_insert;
+  for my $relname (keys %related_stuff) {
+    my $rel_obj = $related_stuff{$relname};
 
-    # The guard will save us if we blow out of this scope via die
-    $rollback_guard = $source->storage->txn_scope_guard;
+    if (! $self->{_rel_in_storage}{$relname}) {
+      next unless (blessed $rel_obj && $rel_obj->isa('DBIx::Class::Row'));
 
-    ## Should all be in relationship_data, but we need to get rid of the
-    ## 'filter' reltype..
-    ## These are the FK rels, need their IDs for the insert.
+      next unless $source->_pk_depends_on(
+                    $relname, { $rel_obj->get_columns }
+                  );
 
-    my @pri = $self->primary_columns;
+      # The guard will save us if we blow out of this scope via die
+      $rollback_guard ||= $storage->txn_scope_guard;
 
-    REL: foreach my $relname (keys %related_stuff) {
+      MULTICREATE_DEBUG and warn "MC $self pre-reconstructing $relname $rel_obj\n";
 
-      my $rel_obj = $related_stuff{$relname};
+      my $them = { %{$rel_obj->{_relationship_data} || {} }, $rel_obj->get_columns };
+      my $existing;
 
-      next REL unless (Scalar::Util::blessed($rel_obj)
-                       && $rel_obj->isa('DBIx::Class::Row'));
+      # if there are no keys - nothing to search for
+      if (keys %$them and $existing = $self->result_source
+                                           ->related_source($relname)
+                                           ->resultset
+                                           ->find($them)
+      ) {
+        %{$rel_obj} = %{$existing};
+      }
+      else {
+        $rel_obj->insert;
+      }
 
-      next REL unless $source->pk_depends_on(
-                        $relname, { $rel_obj->get_columns }
-                      );
+      $self->{_rel_in_storage}{$relname} = 1;
+    }
 
-      MULTICREATE_DEBUG and warn "MC $self pre-inserting $relname $rel_obj\n";
+    $self->set_from_related($relname, $rel_obj);
+    delete $related_stuff{$relname};
+  }
 
-      $rel_obj->insert();
-      $self->set_from_related($relname, $rel_obj);
-      delete $related_stuff{$relname};
-    }
+  # start a transaction here if not started yet and there is more stuff
+  # to insert after us
+  if (keys %related_stuff) {
+    $rollback_guard ||= $storage->txn_scope_guard
   }
 
-  MULTICREATE_DEBUG and warn "MC $self inserting self\n";
-  my $updated_cols = $source->storage->insert($source, { $self->get_columns });
-  foreach my $col (keys %$updated_cols) {
-    $self->store_column($col, $updated_cols->{$col});
+  MULTICREATE_DEBUG and do {
+    no warnings 'uninitialized';
+    warn "MC $self inserting (".join(', ', $self->get_columns).")\n";
+  };
+
+  my %current_rowdata = $self->get_columns;
+
+  # perform the insert - the storage may return some stuff for us right there
+  #
+  my $returned_cols = $storage->insert(
+    $source,
+    \%current_rowdata,
+  );
+
+  for (keys %$returned_cols) {
+    $self->store_column(
+      $_,
+      ( $current_rowdata{$_} = $returned_cols->{$_} )
+    );
   }
 
-  ## PK::Auto
-  my @auto_pri = grep {
-                   !defined $self->get_column($_) || 
-                   ref($self->get_column($_)) eq 'SCALAR'
-                 } $self->primary_columns;
-
-  if (@auto_pri) {
-    #$self->throw_exception( "More than one possible key found for auto-inc on ".ref $self )
-    #  if defined $too_many;
-    MULTICREATE_DEBUG and warn "MC $self fetching missing PKs ".join(', ', @auto_pri)."\n";
-    my $storage = $self->result_source->storage;
+  # see if any of the pcols still need filling (or re-querying in case of scalarrefs)
+  my @missing_pri = grep
+    { ! defined $current_rowdata{$_} or ref $current_rowdata{$_} eq 'SCALAR' }
+    $self->primary_columns
+  ;
+
+  if (@missing_pri) {
+    MULTICREATE_DEBUG and warn "MC $self fetching missing PKs ".join(', ', @missing_pri )."\n";
+
     $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
       unless $storage->can('last_insert_id');
-    my @ids = $storage->last_insert_id($self->result_source,@auto_pri);
+
+    my @pri_values = $storage->last_insert_id($self->result_source, @missing_pri);
+
     $self->throw_exception( "Can't get last insert id" )
-      unless (@ids == @auto_pri);
-    $self->store_column($auto_pri[$_] => $ids[$_]) for 0 .. $#ids;
-#use Data::Dumper; warn Dumper($self);
-  }
+      unless (@pri_values == @missing_pri);
 
+    $self->store_column($missing_pri[$_] => $pri_values[$_]) for 0 .. $#missing_pri;
+  }
 
   $self->{_dirty_columns} = {};
   $self->{related_resultsets} = {};
 
-  if(!$self->{_rel_in_storage}) {
-    ## Now do the has_many rels, that need $selfs ID.
-    foreach my $relname (keys %related_stuff) {
-      my $rel_obj = $related_stuff{$relname};
-      my @cands;
-      if (Scalar::Util::blessed($rel_obj)
-          && $rel_obj->isa('DBIx::Class::Row')) {
-        @cands = ($rel_obj);
-      } elsif (ref $rel_obj eq 'ARRAY') {
-        @cands = @$rel_obj;
-      }
-      if (@cands) {
-        my $reverse = $source->reverse_relationship_info($relname);
-        foreach my $obj (@cands) {
-          $obj->set_from_related($_, $self) for keys %$reverse;
-          my $them = { %{$obj->{_relationship_data} || {} }, $obj->get_inflated_columns };
-          if ($self->__their_pk_needs_us($relname, $them)) {
-            MULTICREATE_DEBUG and warn "MC $self re-creating $relname $obj";
-            my $re = $self->find_or_create_related($relname, $them);
-            $obj->{_column_data} = $re->{_column_data};
-            MULTICREATE_DEBUG and warn "MC $self new $relname $obj";
-          } else {
-            MULTICREATE_DEBUG and warn "MC $self post-inserting $obj";
-            $obj->insert();
+  foreach my $relname (keys %related_stuff) {
+    next unless $source->has_relationship ($relname);
+
+    my @cands = ref $related_stuff{$relname} eq 'ARRAY'
+      ? @{$related_stuff{$relname}}
+      : $related_stuff{$relname}
+    ;
+
+    if (@cands && blessed $cands[0] && $cands[0]->isa('DBIx::Class::Row')
+    ) {
+      my $reverse = $source->reverse_relationship_info($relname);
+      foreach my $obj (@cands) {
+        $obj->set_from_related($_, $self) for keys %$reverse;
+        if ($self->__their_pk_needs_us($relname)) {
+          if (exists $self->{_ignore_at_insert}{$relname}) {
+            MULTICREATE_DEBUG and warn "MC $self skipping post-insert on $relname";
           }
+          else {
+            MULTICREATE_DEBUG and warn "MC $self inserting $relname $obj";
+            $obj->insert;
+          }
+        } else {
+          MULTICREATE_DEBUG and warn "MC $self post-inserting $obj";
+          $obj->insert();
         }
       }
     }
-    $rollback_guard->commit;
   }
 
   $self->in_storage(1);
-  undef $self->{_orig_ident};
+  delete $self->{_orig_ident};
+  delete $self->{_orig_ident_failreason};
+  delete $self->{_ignore_at_insert};
+  $rollback_guard->commit if $rollback_guard;
+
   return $self;
 }
 
@@ -370,7 +446,7 @@ sub insert {
 Indicates whether the object exists as a row in the database or
 not. This is set to true when L<DBIx::Class::ResultSet/find>,
 L<DBIx::Class::ResultSet/create> or L<DBIx::Class::ResultSet/insert>
-are used. 
+are used.
 
 Creating a row object using L<DBIx::Class::ResultSet/new>, or calling
 L</delete> on one, sets it to false.
@@ -380,7 +456,7 @@ L</delete> on one, sets it to false.
 sub in_storage {
   my ($self, $val) = @_;
   $self->{_in_storage} = $val if @_ > 1;
-  return $self->{_in_storage};
+  return $self->{_in_storage} ? 1 : 0;
 }
 
 =head2 update
@@ -399,9 +475,13 @@ Throws an exception if the row object is not yet in the database,
 according to L</in_storage>.
 
 This method issues an SQL UPDATE query to commit any changes to the
-object to the database if required.
+object to the database if required (see L</get_dirty_columns>).
+It throws an exception if a proper WHERE clause uniquely identifying
+the database row can not be constructed (see
+L<significance of primary keys|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
+for more details).
 
-Also takes an optional hashref of C<< column_name => value> >> pairs
+Also takes an optional hashref of C<< column_name => value >> pairs
 to update on the object first. Be aware that the hashref will be
 passed to C<set_inflated_columns>, which might edit it in place, so
 don't rely on it being the same after a call to C<update>.  If you
@@ -409,7 +489,7 @@ need to preserve the hashref, it is sufficient to pass a shallow copy
 to C<update>, e.g. ( { %{ $href } } )
 
 If the values passed or any of the column values set on the object
-contain scalar references, eg:
+contain scalar references, e.g.:
 
   $row->last_modified(\'NOW()');
   # OR
@@ -435,18 +515,21 @@ this method.
 
 sub update {
   my ($self, $upd) = @_;
-  $self->throw_exception( "Not in database" ) unless $self->in_storage;
-  my $ident_cond = $self->ident_condition;
-  $self->throw_exception("Cannot safely update a row in a PK-less table")
-    if ! keys %$ident_cond;
+
+  my $ident_cond = $self->{_orig_ident} || $self->ident_condition;
 
   $self->set_inflated_columns($upd) if $upd;
   my %to_update = $self->get_dirty_columns;
   return $self unless keys %to_update;
+
+  $self->throw_exception( "Not in database" ) unless $self->in_storage;
+
+  $self->throw_exception($self->{_orig_ident_failreason})
+    if ! keys %$ident_cond;
+
   my $rows = $self->result_source->storage->update(
-               $self->result_source, \%to_update,
-               $self->{_orig_ident} || $ident_cond
-             );
+    $self->result_source, \%to_update, $ident_cond
+  );
   if ($rows == 0) {
     $self->throw_exception( "Can't update ${self}: row not found" );
   } elsif ($rows > 1) {
@@ -454,7 +537,7 @@ sub update {
   }
   $self->{_dirty_columns} = {};
   $self->{related_resultsets} = {};
-  undef $self->{_orig_ident};
+  delete $self->{_orig_ident};
   return $self;
 }
 
@@ -471,19 +554,23 @@ sub update {
 =back
 
 Throws an exception if the object is not in the database according to
-L</in_storage>. Runs an SQL DELETE statement using the primary key
-values to locate the row.
+L</in_storage>. Also throws an exception if a proper WHERE clause
+uniquely identifying the database row can not be constructed (see
+L<significance of primary keys|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
+for more details).
 
 The object is still perfectly usable, but L</in_storage> will
 now return 0 and the object must be reinserted using L</insert>
-before it can be used to L</update> the row again. 
+before it can be used to L</update> the row again.
 
 If you delete an object in a class with a C<has_many> relationship, an
 attempt is made to delete all the related objects as well. To turn
 this behaviour off, pass C<< cascade_delete => 0 >> in the C<$attr>
 hashref of the relationship, see L<DBIx::Class::Relationship>. Any
 database-level cascade or restrict will take precedence over a
-DBIx-Class-based cascading delete. 
+DBIx-Class-based cascading delete, since DBIx-Class B<deletes the
+main row first> and only then attempts to delete any remaining related
+rows.
 
 If you delete an object within a txn_do() (see L<DBIx::Class::Storage/txn_do>)
 and the transaction subsequently fails, the row object will remain marked as
@@ -501,17 +588,19 @@ sub delete {
   my $self = shift;
   if (ref $self) {
     $self->throw_exception( "Not in database" ) unless $self->in_storage;
+
     my $ident_cond = $self->{_orig_ident} || $self->ident_condition;
-    $self->throw_exception("Cannot safely delete a row in a PK-less table")
+    $self->throw_exception($self->{_orig_ident_failreason})
       if ! keys %$ident_cond;
-    foreach my $column (keys %$ident_cond) {
-            $self->throw_exception("Can't delete the object unless it has loaded the primary keys")
-              unless exists $self->{_column_data}{$column};
-    }
+
     $self->result_source->storage->delete(
-      $self->result_source, $ident_cond);
+      $self->result_source, $ident_cond
+    );
+
+    delete $self->{_orig_ident};  # no longer identifiable
     $self->in_storage(undef);
-  } else {
+  }
+  else {
     $self->throw_exception("Can't do class delete without a ResultSource instance")
       unless $self->can('result_source_instance');
     my $attrs = @_ > 1 && ref $_[$#_] eq 'HASH' ? { %{pop(@_)} } : {};
@@ -557,7 +646,7 @@ sub get_column {
   return $self->{_column_data}{$column} if exists $self->{_column_data}{$column};
   if (exists $self->{_inflated_column}{$column}) {
     return $self->store_column($column,
-      $self->_deflated_column($column, $self->{_inflated_column}{$column}));   
+      $self->_deflated_column($column, $self->{_inflated_column}{$column}));
   }
   $self->throw_exception( "No such column '${column}'" ) unless $self->has_column($column);
   return undef;
@@ -604,6 +693,8 @@ sub has_column_loaded {
 Returns all loaded column data as a hash, containing raw values. To
 get just one value for a particular column, use L</get_column>.
 
+See L</get_inflated_columns> to get the inflated values.
+
 =cut
 
 sub get_columns {
@@ -657,7 +748,7 @@ sub get_dirty_columns {
 Throws an exception if the column does not exist.
 
 Marks a column as having been changed regardless of whether it has
-really changed.  
+really changed.
 
 =cut
 sub make_column_dirty {
@@ -665,7 +756,21 @@ sub make_column_dirty {
 
   $self->throw_exception( "No such column '${column}'" )
     unless exists $self->{_column_data}{$column} || $self->has_column($column);
+
+  # the entire clean/dirty code relies on exists, not on true/false
+  return 1 if exists $self->{_dirty_columns}{$column};
+
   $self->{_dirty_columns}{$column} = 1;
+
+  # if we are just now making the column dirty, and if there is an inflated
+  # value, force it over the deflated one
+  if (exists $self->{_inflated_column}{$column}) {
+    $self->store_column($column,
+      $self->_deflated_column(
+        $column, $self->{_inflated_column}{$column}
+      )
+    );
+  }
 }
 
 =head2 get_inflated_columns
@@ -691,10 +796,40 @@ See L<DBIx::Class::InflateColumn> for how to setup inflation.
 
 sub get_inflated_columns {
   my $self = shift;
-  return map {
-    my $accessor = $self->column_info($_)->{'accessor'} || $_;
-    ($_ => $self->$accessor);
-  } $self->columns;
+
+  my $loaded_colinfo = $self->columns_info ([
+    grep { $self->has_column_loaded($_) } $self->columns
+  ]);
+
+  my %inflated;
+  for my $col (keys %$loaded_colinfo) {
+    if (exists $loaded_colinfo->{$col}{accessor}) {
+      my $acc = $loaded_colinfo->{$col}{accessor};
+      $inflated{$col} = $self->$acc if defined $acc;
+    }
+    else {
+      $inflated{$col} = $self->$col;
+    }
+  }
+
+  # return all loaded columns with the inflations overlayed on top
+  return %{ { $self->get_columns, %inflated } };
+}
+
+sub _is_column_numeric {
+   my ($self, $column) = @_;
+    my $colinfo = $self->column_info ($column);
+
+    # cache for speed (the object may *not* have a resultsource instance)
+    if (! defined $colinfo->{is_numeric} && $self->_source_handle) {
+      $colinfo->{is_numeric} =
+        $self->result_source->schema->storage->is_datatype_numeric ($colinfo->{data_type})
+          ? 1
+          : 0
+        ;
+    }
+
+    return $colinfo->{is_numeric};
 }
 
 =head2 set_column
@@ -722,12 +857,31 @@ instead, see L</set_inflated_columns>.
 sub set_column {
   my ($self, $column, $new_value) = @_;
 
-  $self->{_orig_ident} ||= $self->ident_condition;
+  # if we can't get an ident condition on first try - mark the object as unidentifiable
+  # (by using an empty hashref) and store the error for further diag
+  unless ($self->{_orig_ident}) {
+    try {
+      $self->{_orig_ident} = $self->ident_condition
+    }
+    catch {
+      $self->{_orig_ident_failreason} = $_;
+      $self->{_orig_ident} = {};
+    };
+  }
+
   my $old_value = $self->get_column($column);
+  $new_value = $self->store_column($column, $new_value);
+
+  my $dirty =
+    $self->{_dirty_columns}{$column}
+      ||
+    $self->in_storage # no point tracking dirtyness on uninserted data
+      ? ! $self->_eq_column_values ($column, $old_value, $new_value)
+      : 1
+  ;
 
-  $self->store_column($column, $new_value);
-  $self->{_dirty_columns}{$column} = 1
-    if (defined $old_value xor defined $new_value) || (defined $old_value && $old_value ne $new_value);
+  # FIXME sadly the update code just checks for keys, not for their value
+  $self->{_dirty_columns}{$column} = 1 if $dirty;
 
   # XXX clear out the relation cache for this column
   delete $self->{related_resultsets}{$column};
@@ -735,11 +889,31 @@ sub set_column {
   return $new_value;
 }
 
+sub _eq_column_values {
+  my ($self, $col, $old, $new) = @_;
+
+  if (defined $old xor defined $new) {
+    return 0;
+  }
+  elsif (not defined $old) {  # both undef
+    return 1;
+  }
+  elsif ($old eq $new) {
+    return 1;
+  }
+  elsif ($self->_is_column_numeric($col)) {  # do a numeric comparison if datatype allows it
+    return $old == $new;
+  }
+  else {
+    return 0;
+  }
+}
+
 =head2 set_columns
 
   $row->set_columns({ $col => $val, ... });
 
-=over 
+=over
 
 =item Arguments: \%columndata
 
@@ -774,7 +948,7 @@ sub set_columns {
 =back
 
 Sets more than one column value at once. Any inflated values are
-deflated and the raw values stored. 
+deflated and the raw values stored.
 
 Any related values passed as Row objects, using the relation name as a
 key, are reduced to the appropriate foreign key values and stored. If
@@ -785,7 +959,7 @@ Will even accept arrayrefs of data as a value to a
 L<DBIx::Class::Relationship/has_many> key, and create the related
 objects if necessary.
 
-Be aware that the input hashref might be edited in place, so dont rely
+Be aware that the input hashref might be edited in place, so don't rely
 on it being the same after a call to C<set_inflated_columns>. If you
 need to preserve the hashref, it is sufficient to pass a shallow copy
 to C<set_inflated_columns>, e.g. ( { %{ $href } } )
@@ -799,26 +973,23 @@ sub set_inflated_columns {
   foreach my $key (keys %$upd) {
     if (ref $upd->{$key}) {
       my $info = $self->relationship_info($key);
-      if ($info && $info->{attrs}{accessor}
-        && $info->{attrs}{accessor} eq 'single')
-      {
+      my $acc_type = $info->{attrs}{accessor} || '';
+      if ($acc_type eq 'single') {
         my $rel = delete $upd->{$key};
         $self->set_from_related($key => $rel);
         $self->{_relationship_data}{$key} = $rel;
-      } elsif ($info && $info->{attrs}{accessor}
-        && $info->{attrs}{accessor} eq 'multi') {
-          $self->throw_exception(
-            "Recursive update is not supported over relationships of type multi ($key)"
-          );
       }
-      elsif ($self->has_column($key)
-        && exists $self->column_info($key)->{_inflate_info})
-      {
+      elsif ($acc_type eq 'multi') {
+        $self->throw_exception(
+          "Recursive update is not supported over relationships of type '$acc_type' ($key)"
+        );
+      }
+      elsif ($self->has_column($key) && exists $self->column_info($key)->{_inflate_info}) {
         $self->set_inflated_column($key, delete $upd->{$key});
       }
     }
   }
-  $self->set_columns($upd);    
+  $self->set_columns($upd);
 }
 
 =head2 copy
@@ -835,12 +1006,16 @@ sub set_inflated_columns {
 
 Inserts a new row into the database, as a copy of the original
 object. If a hashref of replacement data is supplied, these will take
-precedence over data in the original.
+precedence over data in the original. Also any columns which have
+the L<column info attribute|DBIx::Class::ResultSource/add_columns>
+C<< is_auto_increment => 1 >> are explicitly removed before the copy,
+so that the database can insert its own autoincremented values into
+the new object.
 
-If the row has related objects in a
-L<DBIx::Class::Relationship/has_many> then those objects may be copied
-too depending on the L<cascade_copy|DBIx::Class::Relationship>
-relationship attribute.
+Relationships will be followed by the copy procedure B<only> if the
+relationship specifies a true value for its
+L<cascade_copy|DBIx::Class::Relationship::Base> attribute. C<cascade_copy>
+is set by default on C<has_many> relationships and unset on all others.
 
 =cut
 
@@ -848,9 +1023,11 @@ sub copy {
   my ($self, $changes) = @_;
   $changes ||= {};
   my $col_data = { %{$self->{_column_data}} };
+
+  my $colinfo = $self->columns_info([ keys %$col_data ]);
   foreach my $col (keys %$col_data) {
     delete $col_data->{$col}
-      if $self->result_source->column_info($col)->{is_auto_increment};
+      if $colinfo->{$col}{is_auto_increment};
   }
 
   my $new = { _column_data => $col_data };
@@ -860,8 +1037,8 @@ sub copy {
   $new->set_inflated_columns($changes);
   $new->insert;
 
-  # Its possible we'll have 2 relations to the same Source. We need to make 
-  # sure we don't try to insert the same row twice esle we'll violate unique
+  # Its possible we'll have 2 relations to the same Source. We need to make
+  # sure we don't try to insert the same row twice else we'll violate unique
   # constraints
   my $rels_copied = {};
 
@@ -869,8 +1046,8 @@ sub copy {
     my $rel_info = $self->result_source->relationship_info($rel);
 
     next unless $rel_info->{attrs}{cascade_copy};
-  
-    my $resolved = $self->result_source->resolve_condition(
+
+    my $resolved = $self->result_source->_resolve_condition(
       $rel_info->{cond}, $rel, $new
     );
 
@@ -881,7 +1058,7 @@ sub copy {
       $copied->{$id_str} = 1;
       my $rel_copy = $related->copy($resolved);
     }
+
   }
   return $new;
 }
@@ -937,6 +1114,9 @@ for example to rebless the result into a different class.
 Reblessing can also be done more easily by setting C<result_class> in
 your Result class. See L<DBIx::Class::ResultSource/result_class>.
 
+Different types of results can also be created from a particular
+L<DBIx::Class::ResultSet>, see L<DBIx::Class::ResultSet/result_class>.
+
 =cut
 
 sub inflate_result {
@@ -945,56 +1125,68 @@ sub inflate_result {
   my ($source_handle) = $source;
 
   if ($source->isa('DBIx::Class::ResultSourceHandle')) {
-      $source = $source_handle->resolve
-  } else {
-      $source_handle = $source->handle
+    $source = $source_handle->resolve
+  }
+  else {
+    $source_handle = $source->handle
   }
 
   my $new = {
     _source_handle => $source_handle,
     _column_data => $me,
-    _in_storage => 1
   };
   bless $new, (ref $class || $class);
 
-  my $schema;
   foreach my $pre (keys %{$prefetch||{}}) {
-    my $pre_val = $prefetch->{$pre};
-    my $pre_source = $source->related_source($pre);
-    $class->throw_exception("Can't prefetch non-existent relationship ${pre}")
-      unless $pre_source;
-    if (ref($pre_val->[0]) eq 'ARRAY') { # multi
-      my @pre_objects;
-      foreach my $pre_rec (@$pre_val) {
-        unless ($pre_source->primary_columns == grep { exists $pre_rec->[0]{$_}
-           and defined $pre_rec->[0]{$_} } $pre_source->primary_columns) {
-          next;
+
+    my $pre_source = $source->related_source($pre)
+      or $class->throw_exception("Can't prefetch non-existent relationship ${pre}");
+
+    my $accessor = $source->relationship_info($pre)->{attrs}{accessor}
+      or $class->throw_exception("No accessor for prefetched $pre");
+
+    my @pre_vals;
+    if (ref $prefetch->{$pre}[0] eq 'ARRAY') {
+      @pre_vals = @{$prefetch->{$pre}};
+    }
+    elsif ($accessor eq 'multi') {
+      $class->throw_exception("Implicit prefetch (via select/columns) not supported with accessor 'multi'");
+    }
+    else {
+      @pre_vals = $prefetch->{$pre};
+    }
+
+    my @pre_objects;
+    for my $me_pref (@pre_vals) {
+
+        # FIXME - this should not be necessary
+        # the collapser currently *could* return bogus elements with all
+        # columns set to undef
+        my $has_def;
+        for (values %{$me_pref->[0]}) {
+          if (defined $_) {
+            $has_def++;
+            last;
+          }
         }
-        push(@pre_objects, $pre_source->result_class->inflate_result(
-                             $pre_source, @{$pre_rec}));
-      }
-      $new->related_resultset($pre)->set_cache(\@pre_objects);
-    } elsif (defined $pre_val->[0]) {
-      my $fetched;
-      unless ($pre_source->primary_columns == grep { exists $pre_val->[0]{$_}
-         and !defined $pre_val->[0]{$_} } $pre_source->primary_columns)
-      {
-        $fetched = $pre_source->result_class->inflate_result(
-                      $pre_source, @{$pre_val});
-      }
-      $new->related_resultset($pre)->set_cache([ $fetched ]);
-      my $accessor = $source->relationship_info($pre)->{attrs}{accessor};
-      $class->throw_exception("No accessor for prefetched $pre")
-       unless defined $accessor;
-      if ($accessor eq 'single') {
-        $new->{_relationship_data}{$pre} = $fetched;
-      } elsif ($accessor eq 'filter') {
-        $new->{_inflated_column}{$pre} = $fetched;
-      } else {
-       $class->throw_exception("Prefetch not supported with accessor '$accessor'");
-      }
+        next unless $has_def;
+
+        push @pre_objects, $pre_source->result_class->inflate_result(
+          $pre_source, @$me_pref
+        );
+    }
+
+    if ($accessor eq 'single') {
+      $new->{_relationship_data}{$pre} = $pre_objects[0];
     }
+    elsif ($accessor eq 'filter') {
+      $new->{_inflated_column}{$pre} = $pre_objects[0];
+    }
+
+    $new->related_resultset($pre)->set_cache(\@pre_objects);
   }
+
+  $new->in_storage (1);
   return $new;
 }
 
@@ -1143,8 +1335,11 @@ sub register_column {
 =back
 
 Fetches a fresh copy of the Row object from the database and returns it.
-
-If passed the \%attrs argument, will first apply these attributes to
+Throws an exception if a proper WHERE clause identifying the database row
+can not be constructed (i.e. if the original object does not contain its
+entire
+ L<primary key|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
+). If passed the \%attrs argument, will first apply these attributes to
 the resultset used to find the row.
 
 This copy can then be used to compare to an existing row object, to
@@ -1163,14 +1358,77 @@ sub get_from_storage {
     my $self = shift @_;
     my $attrs = shift @_;
     my $resultset = $self->result_source->resultset;
-    
+
     if(defined $attrs) {
-       $resultset = $resultset->search(undef, $attrs);
+      $resultset = $resultset->search(undef, $attrs);
     }
-    
-    return $resultset->find($self->{_orig_ident} || $self->ident_condition);
+
+    my $ident_cond = $self->{_orig_ident} || $self->ident_condition;
+
+    $self->throw_exception($self->{_orig_ident_failreason})
+      if ! keys %$ident_cond;
+
+    return $resultset->find($ident_cond);
+}
+
+=head2 discard_changes ($attrs?)
+
+  $row->discard_changes
+
+=over
+
+=item Arguments: none or $attrs
+
+=item Returns: self (updates object in-place)
+
+=back
+
+Re-selects the row from the database, losing any changes that had
+been made. Throws an exception if a proper C<WHERE> clause identifying
+the database row can not be constructed (i.e. if the original object
+does not contain its entire
+L<primary key|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>).
+
+This method can also be used to refresh from storage, retrieving any
+changes made since the row was last read from storage.
+
+$attrs, if supplied, is expected to be a hashref of attributes suitable for passing as the
+second argument to C<< $resultset->search($cond, $attrs) >>;
+
+Note: If you are using L<DBIx::Class::Storage::DBI::Replicated> as your
+storage, please kept in mind that if you L</discard_changes> on a row that you
+just updated or created, you should wrap the entire bit inside a transaction.
+Otherwise you run the risk that you insert or update to the master database
+but read from a replicant database that has not yet been updated from the
+master.  This will result in unexpected results.
+
+=cut
+
+sub discard_changes {
+  my ($self, $attrs) = @_;
+  return unless $self->in_storage; # Don't reload if we aren't real!
+
+  # add a replication default to read from the master only
+  $attrs = { force_pool => 'master', %{$attrs||{}} };
+
+  if( my $current_storage = $self->get_from_storage($attrs)) {
+
+    # Set $self to the current.
+    %$self = %$current_storage;
+
+    # Avoid a possible infinite loop with
+    # sub DESTROY { $_[0]->discard_changes }
+    bless $current_storage, 'Do::Not::Exist';
+
+    return $self;
+  }
+  else {
+    $self->in_storage(0);
+    return $self;
+  }
 }
 
+
 =head2 throw_exception
 
 See L<DBIx::Class::Schema/throw_exception>.
@@ -1179,10 +1437,12 @@ See L<DBIx::Class::Schema/throw_exception>.
 
 sub throw_exception {
   my $self=shift;
+
   if (ref $self && ref $self->result_source && $self->result_source->schema) {
-    $self->result_source->schema->throw_exception(@_);
-  } else {
-    croak(@_);
+    $self->result_source->schema->throw_exception(@_)
+  }
+  else {
+    DBIx::Class::Exception->throw(@_);
   }
 }
 
@@ -1201,27 +1461,6 @@ sub throw_exception {
 Returns the primary key(s) for a row. Can't be called as a class method.
 Actually implemented in L<DBIx::Class::PK>
 
-=head2 discard_changes
-
-  $row->discard_changes
-
-=over
-
-=item Arguments: none
-
-=item Returns: nothing (updates object in-place)
-
-=back
-
-Retrieves and sets the row object data from the database, losing any
-local changes made.
-
-This method can also be used to refresh from storage, retrieving any
-changes made since the row was last read from storage. Actually
-implemented in L<DBIx::Class::PK>
-
-=cut
-
 1;
 
 =head1 AUTHORS
@@ -1233,3 +1472,5 @@ Matt S. Trout <mst@shadowcatsystems.co.uk>
 You may distribute this code under the same terms as Perl itself.
 
 =cut
+
+1;