Fix MC double-object creation (important for e.g. IC::FS which otherwise leaves orpha...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Row.pm
index e57699c..cf4c24c 100644 (file)
@@ -4,9 +4,21 @@ use strict;
 use warnings;
 
 use base qw/DBIx::Class/;
-use Carp::Clan qw/^DBIx::Class/;
+
+use DBIx::Class::Exception;
 use Scalar::Util ();
-use Scope::Guard;
+use Try::Tiny;
+
+###
+### Internal method
+### Do not use
+###
+BEGIN {
+  *MULTICREATE_DEBUG =
+    $ENV{DBIC_MULTICREATE_DEBUG}
+      ? sub () { 1 }
+      : sub () { 0 };
+}
 
 __PACKAGE__->mk_group_accessors('simple' => qw/_source_handle/);
 
@@ -66,6 +78,23 @@ passed objects.
 
 For a more involved explanation, see L<DBIx::Class::ResultSet/create>.
 
+Please note that if a value is not passed to new, no value will be sent
+in the SQL INSERT call, and the column will therefore assume whatever
+default value was specified in your database. While DBIC will retrieve the
+value of autoincrement columns, it will never make an explicit database
+trip to retrieve default values assigned by the RDBMS. You can explicitly
+request that all values be fetched back from the database by calling
+L</discard_changes>, or you can supply an explicit C<undef> to columns
+with NULL as the default, and save yourself a SELECT.
+
+ CAVEAT:
+
+ The behavior described above will backfire if you use a foreign key column
+ with a database-defined default. If you call the relationship accessor on
+ an object that doesn't have a set value for the FK column, DBIC will throw
+ an exception, as it has no way of knowing the PK of the related object (if
+ there is one).
+
 =cut
 
 ## It needs to store the new objects somewhere, and call insert on that list later when insert is called on this object. We may need an accessor for these so the user can retrieve them, if just doing ->new().
@@ -77,23 +106,40 @@ For a more involved explanation, see L<DBIx::Class::ResultSet/create>.
 
 sub __new_related_find_or_new_helper {
   my ($self, $relname, $data) = @_;
-  if ($self->__their_pk_needs_us($relname, $data)) {
-    return $self->result_source
-                ->related_source($relname)
-                ->resultset
-                ->new_result($data);
+
+  my $rsrc = $self->result_source;
+
+  # create a mock-object so all new/set_column component overrides will run:
+  my $rel_rs = $rsrc->related_source($relname)->resultset;
+  my $new_rel_obj = $rel_rs->new_result($data);
+  my $proc_data = { $new_rel_obj->get_columns };
+
+  if ($self->__their_pk_needs_us($relname)) {
+    MULTICREATE_DEBUG and warn "MC $self constructing $relname via new_result";
+    return $new_rel_obj;
   }
-  if ($self->result_source->pk_depends_on($relname, $data)) {
-    return $self->result_source
-                ->related_source($relname)
-                ->resultset
-                ->find_or_create($data);
+  elsif ($rsrc->_pk_depends_on($relname, $proc_data )) {
+    if (! keys %$proc_data) {
+      # there is nothing to search for - blind create
+      MULTICREATE_DEBUG and warn "MC $self constructing default-insert $relname";
+    }
+    else {
+      MULTICREATE_DEBUG and warn "MC $self constructing $relname via find_or_new";
+      # this is not *really* find or new, as we don't want to double-new the
+      # data (thus potentially double encoding or whatever)
+      my $exists = $rel_rs->find ($proc_data);
+      return $exists if $exists;
+    }
+    return $new_rel_obj;
+  }
+  else {
+    my $us = $rsrc->source_name;
+    $self->throw_exception ("'$us' neither depends nor is depended on by '$relname', something is wrong...");
   }
-  return $self->find_or_new_related($relname, $data);
 }
 
 sub __their_pk_needs_us { # this should maybe be in resultsource.
-  my ($self, $relname, $data) = @_;
+  my ($self, $relname) = @_;
   my $source = $self->result_source;
   my $reverse = $source->reverse_relationship_info($relname);
   my $rel_source = $source->related_source($relname);
@@ -101,7 +147,7 @@ sub __their_pk_needs_us { # this should maybe be in resultsource.
   foreach my $key (keys %$reverse) {
     # if their primary key depends on us, then we have to
     # just create a result and we'll fill it out afterwards
-    return 1 if $rel_source->pk_depends_on($key, $us);
+    return 1 if $rel_source->_pk_depends_on($key, $us);
   }
   return 0;
 }
@@ -124,55 +170,72 @@ sub new {
     $new->result_source($source);
   }
 
+  if (my $related = delete $attrs->{-cols_from_relations}) {
+    @{$new->{_ignore_at_insert}={}}{@$related} = ();
+  }
+
   if ($attrs) {
     $new->throw_exception("attrs must be a hashref")
       unless ref($attrs) eq 'HASH';
-    
+
     my ($related,$inflated);
-    ## Pretend all the rels are actual objects, unset below if not, for insert() to fix
-    $new->{_rel_in_storage} = 1;
 
     foreach my $key (keys %$attrs) {
       if (ref $attrs->{$key}) {
         ## Can we extract this lot to use with update(_or .. ) ?
-        confess "Can't do multi-create without result source" unless $source;
+        $new->throw_exception("Can't do multi-create without result source")
+          unless $source;
         my $info = $source->relationship_info($key);
-        if ($info && $info->{attrs}{accessor}
-          && $info->{attrs}{accessor} eq 'single')
-        {
+        my $acc_type = $info->{attrs}{accessor} || '';
+        if ($acc_type eq 'single') {
           my $rel_obj = delete $attrs->{$key};
           if(!Scalar::Util::blessed($rel_obj)) {
             $rel_obj = $new->__new_related_find_or_new_helper($key, $rel_obj);
           }
 
-          $new->{_rel_in_storage} = 0 unless ($rel_obj->in_storage);
+          if ($rel_obj->in_storage) {
+            $new->{_rel_in_storage}{$key} = 1;
+            $new->set_from_related($key, $rel_obj);
+          } else {
+            MULTICREATE_DEBUG and warn "MC $new uninserted $key $rel_obj\n";
+          }
 
-          $new->set_from_related($key, $rel_obj) if $rel_obj->in_storage;
           $related->{$key} = $rel_obj;
           next;
-        } elsif ($info && $info->{attrs}{accessor}
-            && $info->{attrs}{accessor} eq 'multi'
-            && ref $attrs->{$key} eq 'ARRAY') {
+        }
+        elsif ($acc_type eq 'multi' && ref $attrs->{$key} eq 'ARRAY' ) {
           my $others = delete $attrs->{$key};
-          foreach my $rel_obj (@$others) {
+          my $total = @$others;
+          my @objects;
+          foreach my $idx (0 .. $#$others) {
+            my $rel_obj = $others->[$idx];
             if(!Scalar::Util::blessed($rel_obj)) {
               $rel_obj = $new->__new_related_find_or_new_helper($key, $rel_obj);
             }
 
-            $new->{_rel_in_storage} = 0 unless ($rel_obj->in_storage);
-            $new->set_from_related($key, $rel_obj) if $rel_obj->in_storage;
+            if ($rel_obj->in_storage) {
+              $rel_obj->throw_exception ('A multi relationship can not be pre-existing when doing multicreate. Something went wrong');
+            } else {
+              MULTICREATE_DEBUG and
+                warn "MC $new uninserted $key $rel_obj (${\($idx+1)} of $total)\n";
+            }
+            push(@objects, $rel_obj);
           }
-          $related->{$key} = $others;
+          $related->{$key} = \@objects;
           next;
-        } elsif ($info && $info->{attrs}{accessor}
-          && $info->{attrs}{accessor} eq 'filter')
-        {
+        }
+        elsif ($acc_type eq 'filter') {
           ## 'filter' should disappear and get merged in with 'single' above!
           my $rel_obj = delete $attrs->{$key};
           if(!Scalar::Util::blessed($rel_obj)) {
             $rel_obj = $new->__new_related_find_or_new_helper($key, $rel_obj);
           }
-          $new->{_rel_in_storage} = 0 unless ($rel_obj->in_storage);
+          if ($rel_obj->in_storage) {
+            $new->{_rel_in_storage}{$key} = 1;
+          }
+          else {
+            MULTICREATE_DEBUG and warn "MC $new uninserted $key $rel_obj";
+          }
           $inflated->{$key} = $rel_obj;
           next;
         } elsif ($class->has_column($key)
@@ -183,7 +246,7 @@ sub new {
       }
       $new->throw_exception("No such column $key on $class")
         unless $class->has_column($key);
-      $new->store_column($key => $attrs->{$key});          
+      $new->store_column($key => $attrs->{$key});
     }
 
     $new->{_relationship_data} = $related if $related;
@@ -231,91 +294,138 @@ sub insert {
   my $rollback_guard;
 
   # Check if we stored uninserted relobjs here in new()
-  my %related_stuff = (%{$self->{_relationship_data} || {}}, 
+  my %related_stuff = (%{$self->{_relationship_data} || {}},
                        %{$self->{_inflated_column} || {}});
 
-  if(!$self->{_rel_in_storage}) {
-
-    # The guard will save us if we blow out of this scope via die
-    $rollback_guard = $source->storage->txn_scope_guard;
+  # insert what needs to be inserted before us
+  my %pre_insert;
+  for my $relname (keys %related_stuff) {
+    my $rel_obj = $related_stuff{$relname};
 
-    ## Should all be in relationship_data, but we need to get rid of the
-    ## 'filter' reltype..
-    ## These are the FK rels, need their IDs for the insert.
+    if (! $self->{_rel_in_storage}{$relname}) {
+      next unless (Scalar::Util::blessed($rel_obj)
+                    && $rel_obj->isa('DBIx::Class::Row'));
 
-    my @pri = $self->primary_columns;
+      next unless $source->_pk_depends_on(
+                    $relname, { $rel_obj->get_columns }
+                  );
 
-    REL: foreach my $relname (keys %related_stuff) {
+      # The guard will save us if we blow out of this scope via die
+      $rollback_guard ||= $source->storage->txn_scope_guard;
 
-      my $rel_obj = $related_stuff{$relname};
+      MULTICREATE_DEBUG and warn "MC $self pre-reconstructing $relname $rel_obj\n";
 
-      next REL unless (Scalar::Util::blessed($rel_obj)
-                       && $rel_obj->isa('DBIx::Class::Row'));
+      my $them = { %{$rel_obj->{_relationship_data} || {} }, $rel_obj->get_columns };
+      my $existing;
 
-      next REL unless $source->pk_depends_on(
-                        $relname, { $rel_obj->get_columns }
-                      );
+      # if there are no keys - nothing to search for
+      if (keys %$them and $existing = $self->result_source
+                                           ->related_source($relname)
+                                           ->resultset
+                                           ->find($them)
+      ) {
+        %{$rel_obj} = %{$existing};
+      }
+      else {
+        $rel_obj->insert;
+      }
 
-      $rel_obj->insert();
-      $self->set_from_related($relname, $rel_obj);
-      delete $related_stuff{$relname};
+      $self->{_rel_in_storage}{$relname} = 1;
     }
+
+    $self->set_from_related($relname, $rel_obj);
+    delete $related_stuff{$relname};
   }
 
-  my $updated_cols = $source->storage->insert($source, { $self->get_columns });
-  $self->set_columns($updated_cols);
+  # start a transaction here if not started yet and there is more stuff
+  # to insert after us
+  if (keys %related_stuff) {
+    $rollback_guard ||= $source->storage->txn_scope_guard
+  }
 
   ## PK::Auto
-  my @auto_pri = grep {
-                   !defined $self->get_column($_) || 
-                   ref($self->get_column($_)) eq 'SCALAR'
-                 } $self->primary_columns;
+  my %auto_pri;
+  my $auto_idx = 0;
+  for ($self->primary_columns) {
+    if (
+      not defined $self->get_column($_)
+        ||
+      (ref($self->get_column($_)) eq 'SCALAR')
+    ) {
+      my $col_info = $source->column_info($_);
+      $auto_pri{$_} = $auto_idx++ unless $col_info->{auto_nextval};   # auto_nextval's are pre-fetched in the storage
+    }
+  }
 
-  if (@auto_pri) {
-    #$self->throw_exception( "More than one possible key found for auto-inc on ".ref $self )
-    #  if defined $too_many;
+  MULTICREATE_DEBUG and do {
+    no warnings 'uninitialized';
+    warn "MC $self inserting (".join(', ', $self->get_columns).")\n";
+  };
+  my $updated_cols = $source->storage->insert(
+    $source,
+    { $self->get_columns },
+    (keys %auto_pri) && $source->storage->_supports_insert_returning
+      ? { returning => [ sort { $auto_pri{$a} <=> $auto_pri{$b} } keys %auto_pri ] }
+      : ()
+    ,
+  );
+
+  foreach my $col (keys %$updated_cols) {
+    $self->store_column($col, $updated_cols->{$col});
+    delete $auto_pri{$col};
+  }
 
+  if (keys %auto_pri) {
+    my @missing = sort { $auto_pri{$a} <=> $auto_pri{$b} } keys %auto_pri;
+    MULTICREATE_DEBUG and warn "MC $self fetching missing PKs ".join(', ', @missing )."\n";
     my $storage = $self->result_source->storage;
     $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
       unless $storage->can('last_insert_id');
-    my @ids = $storage->last_insert_id($self->result_source,@auto_pri);
+    my @ids = $storage->last_insert_id($self->result_source, @missing);
     $self->throw_exception( "Can't get last insert id" )
-      unless (@ids == @auto_pri);
-    $self->store_column($auto_pri[$_] => $ids[$_]) for 0 .. $#ids;
+      unless (@ids == @missing);
+    $self->store_column($missing[$_] => $ids[$_]) for 0 .. $#missing;
   }
 
   $self->{_dirty_columns} = {};
   $self->{related_resultsets} = {};
 
-  if(!$self->{_rel_in_storage}) {
-    ## Now do the has_many rels, that need $selfs ID.
-    foreach my $relname (keys %related_stuff) {
-      my $rel_obj = $related_stuff{$relname};
-      my @cands;
-      if (Scalar::Util::blessed($rel_obj)
-          && $rel_obj->isa('DBIx::Class::Row')) {
-        @cands = ($rel_obj);
-      } elsif (ref $rel_obj eq 'ARRAY') {
-        @cands = @$rel_obj;
-      }
-      if (@cands) {
-        my $reverse = $source->reverse_relationship_info($relname);
-        foreach my $obj (@cands) {
-          $obj->set_from_related($_, $self) for keys %$reverse;
-          my $them = { $obj->get_inflated_columns };
-          if ($self->__their_pk_needs_us($relname, $them)) {
-            $obj = $self->find_or_create_related($relname, $them);
-          } else {
-            $obj->insert();
+  foreach my $relname (keys %related_stuff) {
+    next unless $source->has_relationship ($relname);
+
+    my @cands = ref $related_stuff{$relname} eq 'ARRAY'
+      ? @{$related_stuff{$relname}}
+      : $related_stuff{$relname}
+    ;
+
+    if (@cands
+          && Scalar::Util::blessed($cands[0])
+            && $cands[0]->isa('DBIx::Class::Row')
+    ) {
+      my $reverse = $source->reverse_relationship_info($relname);
+      foreach my $obj (@cands) {
+        $obj->set_from_related($_, $self) for keys %$reverse;
+        if ($self->__their_pk_needs_us($relname)) {
+          if (exists $self->{_ignore_at_insert}{$relname}) {
+            MULTICREATE_DEBUG and warn "MC $self skipping post-insert on $relname";
           }
+          else {
+            MULTICREATE_DEBUG and warn "MC $self inserting $relname $obj";
+            $obj->insert;
+          }
+        } else {
+          MULTICREATE_DEBUG and warn "MC $self post-inserting $obj";
+          $obj->insert();
         }
       }
     }
-    $rollback_guard->commit;
   }
 
   $self->in_storage(1);
-  undef $self->{_orig_ident};
+  delete $self->{_orig_ident};
+  delete $self->{_ignore_at_insert};
+  $rollback_guard->commit if $rollback_guard;
+
   return $self;
 }
 
@@ -335,7 +445,7 @@ sub insert {
 Indicates whether the object exists as a row in the database or
 not. This is set to true when L<DBIx::Class::ResultSet/find>,
 L<DBIx::Class::ResultSet/create> or L<DBIx::Class::ResultSet/insert>
-are used. 
+are used.
 
 Creating a row object using L<DBIx::Class::ResultSet/new>, or calling
 L</delete> on one, sets it to false.
@@ -345,7 +455,7 @@ L</delete> on one, sets it to false.
 sub in_storage {
   my ($self, $val) = @_;
   $self->{_in_storage} = $val if @_ > 1;
-  return $self->{_in_storage};
+  return $self->{_in_storage} ? 1 : 0;
 }
 
 =head2 update
@@ -364,9 +474,13 @@ Throws an exception if the row object is not yet in the database,
 according to L</in_storage>.
 
 This method issues an SQL UPDATE query to commit any changes to the
-object to the database if required.
+object to the database if required (see L</get_dirty_columns>).
+It throws an exception if a proper WHERE clause uniquely identifying
+the database row can not be constructed (see
+L<significance of primary keys|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
+for more details).
 
-Also takes an optional hashref of C<< column_name => value> >> pairs
+Also takes an optional hashref of C<< column_name => value >> pairs
 to update on the object first. Be aware that the hashref will be
 passed to C<set_inflated_columns>, which might edit it in place, so
 don't rely on it being the same after a call to C<update>.  If you
@@ -374,7 +488,7 @@ need to preserve the hashref, it is sufficient to pass a shallow copy
 to C<update>, e.g. ( { %{ $href } } )
 
 If the values passed or any of the column values set on the object
-contain scalar references, eg:
+contain scalar references, e.g.:
 
   $row->last_modified(\'NOW()');
   # OR
@@ -400,18 +514,21 @@ this method.
 
 sub update {
   my ($self, $upd) = @_;
-  $self->throw_exception( "Not in database" ) unless $self->in_storage;
-  my $ident_cond = $self->ident_condition;
-  $self->throw_exception("Cannot safely update a row in a PK-less table")
-    if ! keys %$ident_cond;
+
+  my $ident_cond = $self->{_orig_ident} || $self->ident_condition;
 
   $self->set_inflated_columns($upd) if $upd;
   my %to_update = $self->get_dirty_columns;
   return $self unless keys %to_update;
+
+  $self->throw_exception( "Not in database" ) unless $self->in_storage;
+
+  $self->throw_exception('Unable to update a row with incomplete or no identity')
+    if ! keys %$ident_cond;
+
   my $rows = $self->result_source->storage->update(
-               $self->result_source, \%to_update,
-               $self->{_orig_ident} || $ident_cond
-             );
+    $self->result_source, \%to_update, $ident_cond
+  );
   if ($rows == 0) {
     $self->throw_exception( "Can't update ${self}: row not found" );
   } elsif ($rows > 1) {
@@ -419,7 +536,7 @@ sub update {
   }
   $self->{_dirty_columns} = {};
   $self->{related_resultsets} = {};
-  undef $self->{_orig_ident};
+  delete $self->{_orig_ident};
   return $self;
 }
 
@@ -436,19 +553,31 @@ sub update {
 =back
 
 Throws an exception if the object is not in the database according to
-L</in_storage>. Runs an SQL DELETE statement using the primary key
-values to locate the row.
+L</in_storage>. Also throws an exception if a proper WHERE clause
+uniquely identifying the database row can not be constructed (see
+L<significance of primary keys|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
+for more details).
 
 The object is still perfectly usable, but L</in_storage> will
 now return 0 and the object must be reinserted using L</insert>
-before it can be used to L</update> the row again. 
+before it can be used to L</update> the row again.
 
 If you delete an object in a class with a C<has_many> relationship, an
 attempt is made to delete all the related objects as well. To turn
 this behaviour off, pass C<< cascade_delete => 0 >> in the C<$attr>
 hashref of the relationship, see L<DBIx::Class::Relationship>. Any
 database-level cascade or restrict will take precedence over a
-DBIx-Class-based cascading delete. 
+DBIx-Class-based cascading delete, since DBIx-Class B<deletes the
+main row first> and only then attempts to delete any remaining related
+rows.
+
+If you delete an object within a txn_do() (see L<DBIx::Class::Storage/txn_do>)
+and the transaction subsequently fails, the row object will remain marked as
+not being in storage. If you know for a fact that the object is still in
+storage (i.e. by inspecting the cause of the transaction's failure), you can
+use C<< $obj->in_storage(1) >> to restore consistency between the object and
+the database. This would allow a subsequent C<< $obj->delete >> to work
+as expected.
 
 See also L<DBIx::Class::ResultSet/delete>.
 
@@ -458,17 +587,19 @@ sub delete {
   my $self = shift;
   if (ref $self) {
     $self->throw_exception( "Not in database" ) unless $self->in_storage;
+
     my $ident_cond = $self->{_orig_ident} || $self->ident_condition;
-    $self->throw_exception("Cannot safely delete a row in a PK-less table")
+    $self->throw_exception('Unable to delete a row with incomplete or no identity')
       if ! keys %$ident_cond;
-    foreach my $column (keys %$ident_cond) {
-            $self->throw_exception("Can't delete the object unless it has loaded the primary keys")
-              unless exists $self->{_column_data}{$column};
-    }
+
     $self->result_source->storage->delete(
-      $self->result_source, $ident_cond);
+      $self->result_source, $ident_cond
+    );
+
+    delete $self->{_orig_ident};
     $self->in_storage(undef);
-  } else {
+  }
+  else {
     $self->throw_exception("Can't do class delete without a ResultSource instance")
       unless $self->can('result_source_instance');
     my $attrs = @_ > 1 && ref $_[$#_] eq 'HASH' ? { %{pop(@_)} } : {};
@@ -514,7 +645,7 @@ sub get_column {
   return $self->{_column_data}{$column} if exists $self->{_column_data}{$column};
   if (exists $self->{_inflated_column}{$column}) {
     return $self->store_column($column,
-      $self->_deflated_column($column, $self->{_inflated_column}{$column}));   
+      $self->_deflated_column($column, $self->{_inflated_column}{$column}));
   }
   $self->throw_exception( "No such column '${column}'" ) unless $self->has_column($column);
   return undef;
@@ -561,6 +692,8 @@ sub has_column_loaded {
 Returns all loaded column data as a hash, containing raw values. To
 get just one value for a particular column, use L</get_column>.
 
+See L</get_inflated_columns> to get the inflated values.
+
 =cut
 
 sub get_columns {
@@ -614,7 +747,7 @@ sub get_dirty_columns {
 Throws an exception if the column does not exist.
 
 Marks a column as having been changed regardless of whether it has
-really changed.  
+really changed.
 
 =cut
 sub make_column_dirty {
@@ -622,7 +755,21 @@ sub make_column_dirty {
 
   $self->throw_exception( "No such column '${column}'" )
     unless exists $self->{_column_data}{$column} || $self->has_column($column);
+
+  # the entire clean/dirty code relies on exists, not on true/false
+  return 1 if exists $self->{_dirty_columns}{$column};
+
   $self->{_dirty_columns}{$column} = 1;
+
+  # if we are just now making the column dirty, and if there is an inflated
+  # value, force it over the deflated one
+  if (exists $self->{_inflated_column}{$column}) {
+    $self->store_column($column,
+      $self->_deflated_column(
+        $column, $self->{_inflated_column}{$column}
+      )
+    );
+  }
 }
 
 =head2 get_inflated_columns
@@ -648,10 +795,41 @@ See L<DBIx::Class::InflateColumn> for how to setup inflation.
 
 sub get_inflated_columns {
   my $self = shift;
-  return map {
-    my $accessor = $self->column_info($_)->{'accessor'} || $_;
-    ($_ => $self->$accessor);
-  } $self->columns;
+
+  my %loaded_colinfo = (map
+    { $_ => $self->column_info($_) }
+    (grep { $self->has_column_loaded($_) } $self->columns)
+  );
+
+  my %inflated;
+  for my $col (keys %loaded_colinfo) {
+    if (exists $loaded_colinfo{$col}{accessor}) {
+      my $acc = $loaded_colinfo{$col}{accessor};
+      $inflated{$col} = $self->$acc if defined $acc;
+    }
+    else {
+      $inflated{$col} = $self->$col;
+    }
+  }
+
+  # return all loaded columns with the inflations overlayed on top
+  return ($self->get_columns, %inflated);
+}
+
+sub _is_column_numeric {
+   my ($self, $column) = @_;
+    my $colinfo = $self->column_info ($column);
+
+    # cache for speed (the object may *not* have a resultsource instance)
+    if (not defined $colinfo->{is_numeric} && $self->_source_handle) {
+      $colinfo->{is_numeric} =
+        $self->result_source->schema->storage->is_datatype_numeric ($colinfo->{data_type})
+          ? 1
+          : 0
+        ;
+    }
+
+    return $colinfo->{is_numeric};
 }
 
 =head2 set_column
@@ -679,12 +857,22 @@ instead, see L</set_inflated_columns>.
 sub set_column {
   my ($self, $column, $new_value) = @_;
 
-  $self->{_orig_ident} ||= $self->ident_condition;
+  # if we can't get an ident condition on first try - mark the object as unidentifiable
+  $self->{_orig_ident} ||= (try { $self->ident_condition }) || {};
+
   my $old_value = $self->get_column($column);
+  $new_value = $self->store_column($column, $new_value);
 
-  $self->store_column($column, $new_value);
-  $self->{_dirty_columns}{$column} = 1
-    if (defined $old_value xor defined $new_value) || (defined $old_value && $old_value ne $new_value);
+  my $dirty =
+    $self->{_dirty_columns}{$column}
+      ||
+    $self->in_storage # no point tracking dirtyness on uninserted data
+      ? ! $self->_eq_column_values ($column, $old_value, $new_value)
+      : 1
+  ;
+
+  # FIXME sadly the update code just checks for keys, not for their value
+  $self->{_dirty_columns}{$column} = 1 if $dirty;
 
   # XXX clear out the relation cache for this column
   delete $self->{related_resultsets}{$column};
@@ -692,11 +880,31 @@ sub set_column {
   return $new_value;
 }
 
+sub _eq_column_values {
+  my ($self, $col, $old, $new) = @_;
+
+  if (defined $old xor defined $new) {
+    return 0;
+  }
+  elsif (not defined $old) {  # both undef
+    return 1;
+  }
+  elsif ($old eq $new) {
+    return 1;
+  }
+  elsif ($self->_is_column_numeric($col)) {  # do a numeric comparison if datatype allows it
+    return $old == $new;
+  }
+  else {
+    return 0;
+  }
+}
+
 =head2 set_columns
 
   $row->set_columns({ $col => $val, ... });
 
-=over 
+=over
 
 =item Arguments: \%columndata
 
@@ -731,7 +939,7 @@ sub set_columns {
 =back
 
 Sets more than one column value at once. Any inflated values are
-deflated and the raw values stored. 
+deflated and the raw values stored.
 
 Any related values passed as Row objects, using the relation name as a
 key, are reduced to the appropriate foreign key values and stored. If
@@ -742,7 +950,7 @@ Will even accept arrayrefs of data as a value to a
 L<DBIx::Class::Relationship/has_many> key, and create the related
 objects if necessary.
 
-Be aware that the input hashref might be edited in place, so dont rely
+Be aware that the input hashref might be edited in place, so don't rely
 on it being the same after a call to C<set_inflated_columns>. If you
 need to preserve the hashref, it is sufficient to pass a shallow copy
 to C<set_inflated_columns>, e.g. ( { %{ $href } } )
@@ -756,33 +964,23 @@ sub set_inflated_columns {
   foreach my $key (keys %$upd) {
     if (ref $upd->{$key}) {
       my $info = $self->relationship_info($key);
-      if ($info && $info->{attrs}{accessor}
-        && $info->{attrs}{accessor} eq 'single')
-      {
+      my $acc_type = $info->{attrs}{accessor} || '';
+      if ($acc_type eq 'single') {
         my $rel = delete $upd->{$key};
         $self->set_from_related($key => $rel);
-        $self->{_relationship_data}{$key} = $rel;          
-      } elsif ($info && $info->{attrs}{accessor}
-        && $info->{attrs}{accessor} eq 'multi'
-        && ref $upd->{$key} eq 'ARRAY') {
-        my $others = delete $upd->{$key};
-        foreach my $rel_obj (@$others) {
-          if(!Scalar::Util::blessed($rel_obj)) {
-            $rel_obj = $self->create_related($key, $rel_obj);
-          }
-        }
-        $self->{_relationship_data}{$key} = $others; 
-#            $related->{$key} = $others;
-        next;
+        $self->{_relationship_data}{$key} = $rel;
+      }
+      elsif ($acc_type eq 'multi') {
+        $self->throw_exception(
+          "Recursive update is not supported over relationships of type '$acc_type' ($key)"
+        );
       }
-      elsif ($self->has_column($key)
-        && exists $self->column_info($key)->{_inflate_info})
-      {
-        $self->set_inflated_column($key, delete $upd->{$key});          
+      elsif ($self->has_column($key) && exists $self->column_info($key)->{_inflate_info}) {
+        $self->set_inflated_column($key, delete $upd->{$key});
       }
     }
   }
-  $self->set_columns($upd);    
+  $self->set_columns($upd);
 }
 
 =head2 copy
@@ -799,12 +997,16 @@ sub set_inflated_columns {
 
 Inserts a new row into the database, as a copy of the original
 object. If a hashref of replacement data is supplied, these will take
-precedence over data in the original.
+precedence over data in the original. Also any columns which have
+the L<column info attribute|DBIx::Class::ResultSource/add_columns>
+C<< is_auto_increment => 1 >> are explicitly removed before the copy,
+so that the database can insert its own autoincremented values into
+the new object.
 
-If the row has related objects in a
-L<DBIx::Class::Relationship/has_many> then those objects may be copied
-too depending on the L<cascade_copy|DBIx::Class::Relationship>
-relationship attribute.
+Relationships will be followed by the copy procedure B<only> if the
+relationship specifies a true value for its
+L<cascade_copy|DBIx::Class::Relationship::Base> attribute. C<cascade_copy>
+is set by default on C<has_many> relationships and unset on all others.
 
 =cut
 
@@ -824,8 +1026,8 @@ sub copy {
   $new->set_inflated_columns($changes);
   $new->insert;
 
-  # Its possible we'll have 2 relations to the same Source. We need to make 
-  # sure we don't try to insert the same row twice esle we'll violate unique
+  # Its possible we'll have 2 relations to the same Source. We need to make
+  # sure we don't try to insert the same row twice else we'll violate unique
   # constraints
   my $rels_copied = {};
 
@@ -833,8 +1035,8 @@ sub copy {
     my $rel_info = $self->result_source->relationship_info($rel);
 
     next unless $rel_info->{attrs}{cascade_copy};
-  
-    my $resolved = $self->result_source->resolve_condition(
+
+    my $resolved = $self->result_source->_resolve_condition(
       $rel_info->{cond}, $rel, $new
     );
 
@@ -845,7 +1047,7 @@ sub copy {
       $copied->{$id_str} = 1;
       my $rel_copy = $related->copy($resolved);
     }
+
   }
   return $new;
 }
@@ -901,6 +1103,9 @@ for example to rebless the result into a different class.
 Reblessing can also be done more easily by setting C<result_class> in
 your Result class. See L<DBIx::Class::ResultSource/result_class>.
 
+Different types of results can also be created from a particular
+L<DBIx::Class::ResultSet>, see L<DBIx::Class::ResultSet/result_class>.
+
 =cut
 
 sub inflate_result {
@@ -909,56 +1114,68 @@ sub inflate_result {
   my ($source_handle) = $source;
 
   if ($source->isa('DBIx::Class::ResultSourceHandle')) {
-      $source = $source_handle->resolve
-  } else {
-      $source_handle = $source->handle
+    $source = $source_handle->resolve
+  } 
+  else {
+    $source_handle = $source->handle
   }
 
   my $new = {
     _source_handle => $source_handle,
     _column_data => $me,
-    _in_storage => 1
   };
   bless $new, (ref $class || $class);
 
-  my $schema;
   foreach my $pre (keys %{$prefetch||{}}) {
-    my $pre_val = $prefetch->{$pre};
-    my $pre_source = $source->related_source($pre);
-    $class->throw_exception("Can't prefetch non-existent relationship ${pre}")
-      unless $pre_source;
-    if (ref($pre_val->[0]) eq 'ARRAY') { # multi
-      my @pre_objects;
-      foreach my $pre_rec (@$pre_val) {
-        unless ($pre_source->primary_columns == grep { exists $pre_rec->[0]{$_}
-           and defined $pre_rec->[0]{$_} } $pre_source->primary_columns) {
-          next;
+
+    my $pre_source = $source->related_source($pre)
+      or $class->throw_exception("Can't prefetch non-existent relationship ${pre}");
+
+    my $accessor = $source->relationship_info($pre)->{attrs}{accessor}
+      or $class->throw_exception("No accessor for prefetched $pre");
+
+    my @pre_vals;
+    if (ref $prefetch->{$pre}[0] eq 'ARRAY') {
+      @pre_vals = @{$prefetch->{$pre}};
+    }
+    elsif ($accessor eq 'multi') {
+      $class->throw_exception("Implicit prefetch (via select/columns) not supported with accessor 'multi'");
+    }
+    else {
+      @pre_vals = $prefetch->{$pre};
+    }
+
+    my @pre_objects;
+    for my $me_pref (@pre_vals) {
+
+        # FIXME - this should not be necessary
+        # the collapser currently *could* return bogus elements with all
+        # columns set to undef
+        my $has_def;
+        for (values %{$me_pref->[0]}) {
+          if (defined $_) {
+            $has_def++;
+            last;
+          }
         }
-        push(@pre_objects, $pre_source->result_class->inflate_result(
-                             $pre_source, @{$pre_rec}));
-      }
-      $new->related_resultset($pre)->set_cache(\@pre_objects);
-    } elsif (defined $pre_val->[0]) {
-      my $fetched;
-      unless ($pre_source->primary_columns == grep { exists $pre_val->[0]{$_}
-         and !defined $pre_val->[0]{$_} } $pre_source->primary_columns)
-      {
-        $fetched = $pre_source->result_class->inflate_result(
-                      $pre_source, @{$pre_val});
-      }
-      $new->related_resultset($pre)->set_cache([ $fetched ]);
-      my $accessor = $source->relationship_info($pre)->{attrs}{accessor};
-      $class->throw_exception("No accessor for prefetched $pre")
-       unless defined $accessor;
-      if ($accessor eq 'single') {
-        $new->{_relationship_data}{$pre} = $fetched;
-      } elsif ($accessor eq 'filter') {
-        $new->{_inflated_column}{$pre} = $fetched;
-      } else {
-       $class->throw_exception("Prefetch not supported with accessor '$accessor'");
-      }
+        next unless $has_def;
+
+        push @pre_objects, $pre_source->result_class->inflate_result(
+          $pre_source, @$me_pref
+        );
     }
+
+    if ($accessor eq 'single') {
+      $new->{_relationship_data}{$pre} = $pre_objects[0];
+    }
+    elsif ($accessor eq 'filter') {
+      $new->{_inflated_column}{$pre} = $pre_objects[0];
+    }
+
+    $new->related_resultset($pre)->set_cache(\@pre_objects);
   }
+
+  $new->in_storage (1);
   return $new;
 }
 
@@ -1107,8 +1324,11 @@ sub register_column {
 =back
 
 Fetches a fresh copy of the Row object from the database and returns it.
-
-If passed the \%attrs argument, will first apply these attributes to
+Throws an exception if a proper WHERE clause identifying the database row
+can not be constructed (i.e. if the original object does not contain its
+entire
+ L<primary key|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
+). If passed the \%attrs argument, will first apply these attributes to
 the resultset used to find the row.
 
 This copy can then be used to compare to an existing row object, to
@@ -1127,14 +1347,61 @@ sub get_from_storage {
     my $self = shift @_;
     my $attrs = shift @_;
     my $resultset = $self->result_source->resultset;
-    
+
     if(defined $attrs) {
-       $resultset = $resultset->search(undef, $attrs);
+      $resultset = $resultset->search(undef, $attrs);
     }
-    
-    return $resultset->find($self->{_orig_ident} || $self->ident_condition);
+
+    my $ident_cond = $self->{_orig_ident} || $self->ident_condition;
+
+    $self->throw_exception('Unable to requery a row with incomplete or no identity')
+      if ! keys %$ident_cond;
+
+    return $resultset->find($ident_cond);
+}
+
+=head2 discard_changes ($attrs)
+
+Re-selects the row from the database, losing any changes that had
+been made. Throws an exception if a proper WHERE clause identifying
+the database row can not be constructed (i.e. if the original object
+does not contain its entire
+L<primary key|DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
+).
+
+This method can also be used to refresh from storage, retrieving any
+changes made since the row was last read from storage.
+
+$attrs is expected to be a hashref of attributes suitable for passing as the
+second argument to $resultset->search($cond, $attrs);
+
+=cut
+
+sub discard_changes {
+  my ($self, $attrs) = @_;
+  return unless $self->in_storage; # Don't reload if we aren't real!
+
+  # add a replication default to read from the master only
+  $attrs = { force_pool => 'master', %{$attrs||{}} };
+
+  if( my $current_storage = $self->get_from_storage($attrs)) {
+
+    # Set $self to the current.
+    %$self = %$current_storage;
+
+    # Avoid a possible infinite loop with
+    # sub DESTROY { $_[0]->discard_changes }
+    bless $current_storage, 'Do::Not::Exist';
+
+    return $self;
+  }
+  else {
+    $self->in_storage(0);
+    return $self;
+  }
 }
 
+
 =head2 throw_exception
 
 See L<DBIx::Class::Schema/throw_exception>.
@@ -1143,10 +1410,12 @@ See L<DBIx::Class::Schema/throw_exception>.
 
 sub throw_exception {
   my $self=shift;
+
   if (ref $self && ref $self->result_source && $self->result_source->schema) {
-    $self->result_source->schema->throw_exception(@_);
-  } else {
-    croak(@_);
+    $self->result_source->schema->throw_exception(@_)
+  }
+  else {
+    DBIx::Class::Exception->throw(@_);
   }
 }
 
@@ -1184,6 +1453,13 @@ This method can also be used to refresh from storage, retrieving any
 changes made since the row was last read from storage. Actually
 implemented in L<DBIx::Class::PK>
 
+Note: If you are using L<DBIx::Class::Storage::DBI::Replicated> as your
+storage, please kept in mind that if you L</discard_changes> on a row that you
+just updated or created, you should wrap the entire bit inside a transaction.
+Otherwise you run the risk that you insert or update to the master database
+but read from a replicant database that has not yet been updated from the
+master.  This will result in unexpected results.
+
 =cut
 
 1;