Overhaul populate code - fix \[] support and exotic values (arrays, etc.)
Peter Rabbitson [Wed, 9 Mar 2011 10:21:36 +0000 (11:21 +0100)]
In addition cleanup populate() error messages a bit

Changes
lib/DBIx/Class/Storage/DBI.pm
t/100populate.t
t/72pg.t
t/sqlmaker/literal_with_bind.t [new file with mode: 0644]

diff --git a/Changes b/Changes
index 49a1b35..fe69450 100644 (file)
--- a/Changes
+++ b/Changes
@@ -44,6 +44,7 @@ Revision history for DBIx::Class
         - Change SQLMaker carp-monkeypatch to be compatible with versions
           of SQL::Abstract >= 1.73
         - Fix using \[] literals in the from resultset attribute
+        - Fix populate() with \[], arrays (datatype) and other exotic values
 
     * Misc
         - Rewire all warnings to a new Carp-like implementation internal
index cdaac30..a4eb7c7 100644 (file)
@@ -1826,84 +1826,127 @@ sub insert {
 }
 
 
-## Currently it is assumed that all values passed will be "normal", i.e. not
-## scalar refs, or at least, all the same type as the first set, the statement is
-## only prepped once.
 sub insert_bulk {
   my ($self, $source, $cols, $data) = @_;
 
-  my %colvalues;
-  @colvalues{@$cols} = (0..$#$cols);
-
-  for my $i (0..$#$cols) {
-    my $first_val = $data->[0][$i];
-    next unless ref $first_val eq 'SCALAR';
-
-    $colvalues{ $cols->[$i] } = $first_val;
+  # FIXME - perhaps this is not even needed? does DBI stringify?
+  #
+  # forcibly stringify whatever is stringifiable
+  for my $r (0 .. $#$data) {
+    for my $c (0 .. $#{$data->[$r]}) {
+      $data->[$r][$c] = "$data->[$r][$c]"
+        if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
+    }
   }
 
-  # check for bad data and stringify stringifiable objects
-  my $bad_slice = sub {
-    my ($msg, $col_idx, $slice_idx) = @_;
-    $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
-      $msg,
-      $cols->[$col_idx],
-      do {
-        require Data::Dumper::Concise;
-        local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
-        Data::Dumper::Concise::Dumper ({
-          map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
-        }),
-      }
-    );
-  };
-
-  for my $datum_idx (0..$#$data) {
-    my $datum = $data->[$datum_idx];
+  # check the data for consistency
+  # report a sensible error on bad data
+  #
+  # also create a list of dynamic binds (ones that will be changing
+  # for each row)
+  my $dyn_bind_idx;
+  for my $col_idx (0..$#$cols) {
+
+    # the first "row" is used as a point of reference
+    my $reference_val = $data->[0][$col_idx];
+    my $is_literal = ref $reference_val eq 'SCALAR';
+    my $is_literal_bind = ( !$is_literal and (
+      ref $reference_val eq 'REF'
+        and
+      ref $$reference_val eq 'ARRAY'
+    ) );
+
+    $dyn_bind_idx->{$col_idx} = 1
+      if (!$is_literal and !$is_literal_bind);
+
+    # use a closure for convenience (less to pass)
+    my $bad_slice = sub {
+      my ($msg, $slice_idx) = @_;
+      $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
+        $msg,
+        $cols->[$col_idx],
+        do {
+          require Data::Dumper::Concise;
+          local $Data::Dumper::Maxdepth = 2;
+          Data::Dumper::Concise::Dumper ({
+            map { $cols->[$_] =>
+              $data->[$slice_idx][$_]
+            } (0 .. $#$cols)
+          }),
+        }
+      );
+    };
 
-    for my $col_idx (0..$#$cols) {
-      my $val            = $datum->[$col_idx];
-      my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
-      my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
+    for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
+      my $val = $data->[$row_idx][$col_idx];
 
-      if ($is_literal_sql) {
-        if (not ref $val) {
-          $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
+      if ($is_literal) {
+        if (ref $val ne 'SCALAR') {
+          $bad_slice->(
+            "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
+            $row_idx
+          );
         }
-        elsif ((my $reftype = ref $val) ne 'SCALAR') {
-          $bad_slice->("$reftype reference found where literal SQL expected",
-            $col_idx, $datum_idx);
+        elsif ($$val ne $$reference_val) {
+          $bad_slice->(
+            "Inconsistent literal SQL value (expecting \\'$$reference_val')",
+            $row_idx
+          );
         }
-        elsif ($$val ne $$sqla_bind){
-          $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
-            $col_idx, $datum_idx);
+      }
+      elsif ($is_literal_bind) {
+        if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
+          $bad_slice->(
+            "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
+            $row_idx
+          );
+        }
+        elsif (${$val}->[0] ne ${$reference_val}->[0]) {
+          $bad_slice->(
+            "Inconsistent literal SQL-bind value (expecting \\['${$reference_val}->[0]', ... ])",
+            $row_idx
+          );
         }
       }
-      elsif (my $reftype = ref $val) {
-        require overload;
-        if (overload::Method($val, '""')) {
-          $datum->[$col_idx] = "".$val;
+      elsif (ref $val) {
+        if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
+          $bad_slice->("Literal SQL found where a plain bind value is expected", $row_idx);
         }
         else {
-          $bad_slice->("$reftype reference found where bind expected",
-            $col_idx, $datum_idx);
+          $bad_slice->("$val reference found where bind expected", $row_idx);
         }
       }
     }
   }
 
-  my ($sql, $bind) = $self->_prep_for_execute (
-    'insert', $source, [\%colvalues]
+  # Get the sql with bind values interpolated where necessary. For dynamic
+  # binds convert the values of the first row into a literal+bind combo, with
+  # extra positional info in the bind attr hashref. This will allow us to match
+  # the order properly, and is so contrived because a user-supplied literal
+  # bind (or something else specific to a resultsource and/or storage driver)
+  # can inject extra binds along the way, so one can't rely on "shift
+  # positions" ordering at all. Also we can't just hand SQLA a set of some
+  # known "values" (e.g. hashrefs that can be later matched up by address),
+  # because we want to supply a real value on which perhaps e.g. datatype
+  # checks will be performed
+  my ($sql, $proto_bind) = $self->_prep_for_execute (
+    'insert',
+    $source,
+    [ { map { $cols->[$_] => $dyn_bind_idx->{$_}
+      ? \[ '?', [
+          { dbic_colname => $cols->[$_], _bind_data_slice_idx => $_ }
+            =>
+          $data->[0][$_]
+        ] ]
+      : $data->[0][$_]
+    } (0..$#$cols) } ],
   );
 
-  if (! @$bind) {
-    # if the bindlist is empty - make sure all "values" are in fact
-    # literal scalarrefs. If not the case this means the storage ate
-    # them away (e.g. the NoBindVars component) and interpolated them
-    # directly into the SQL. This obviosly can't be good for multi-inserts
-
-    $self->throw_exception('Cannot insert_bulk without support for placeholders')
-      if first { ref $_ ne 'SCALAR' } values %colvalues;
+  if (! @$proto_bind and keys %$dyn_bind_idx) {
+    # if the bindlist is empty and we had some dynamic binds, this means the
+    # storage ate them away (e.g. the NoBindVars component) and interpolated
+    # them directly into the SQL. This obviosly can't be good for multi-inserts
+    $self->throw_exception('Cannot insert_bulk without support for placeholders');
   }
 
   # neither _execute_array, nor _execute_inserts_with_no_binds are
@@ -1911,12 +1954,13 @@ sub insert_bulk {
   # scope guard
   my $guard = $self->txn_scope_guard;
 
-  $self->_query_start( $sql, @$bind ? [[undef => '__BULK_INSERT__' ]] : () );
+  $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
   my $sth = $self->_sth($sql);
   my $rv = do {
-    if (@$bind) {
-      #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
-      $self->_execute_array( $source, $sth, $bind, $cols, $data );
+    if (@$proto_bind) {
+      # proto bind contains the information on which pieces of $data to pull
+      # $cols is passed in only for prettier error-reporting
+      $self->_execute_array( $source, $sth, $proto_bind, $cols, $data );
     }
     else {
       # bind_param_array doesn't work if there are no binds
@@ -1924,29 +1968,37 @@ sub insert_bulk {
     }
   };
 
-  $self->_query_end( $sql, @$bind ? [[ undef => '__BULK_INSERT__' ]] : () );
+  $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
 
   $guard->commit;
 
-  return (wantarray ? ($rv, $sth, @$bind) : $rv);
+  return (wantarray ? ($rv, $sth, @$proto_bind) : $rv);
 }
 
 sub _execute_array {
-  my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
+  my ($self, $source, $sth, $proto_bind, $cols, $data, @extra) = @_;
 
   ## This must be an arrayref, else nothing works!
   my $tuple_status = [];
 
-  # $bind contains colnames as keys and dbic-col-index as values
-  my $bind_attrs = $self->_dbi_attrs_for_bind($source, $bind);
+  my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
 
   # Bind the values by column slices
-  for my $i (0 .. $#$bind) {
-    my $dbic_data_index = $bind->[$i][1];
+  for my $i (0 .. $#$proto_bind) {
+    my $data_slice_idx = (
+      ref $proto_bind->[$i][0] eq 'HASH'
+        and
+      exists $proto_bind->[$i][0]{_bind_data_slice_idx}
+    ) ? $proto_bind->[$i][0]{_bind_data_slice_idx} : undef;
 
     $sth->bind_param_array(
       $i+1, # DBI bind indexes are 1-based
-      [ map { $_->[$dbic_data_index] } @$data ],
+      defined $data_slice_idx
+        # either get a "column" of dynamic values, or just repeat the same
+        # bind over and over
+        ? [ map { $_->[$data_slice_idx] } @$data ]
+        : [ ($proto_bind->[$i][1]) x @$data ]
+      ,
       defined $bind_attrs->[$i] ? $bind_attrs->[$i] : (), # some DBDs throw up when given an undef
     );
   }
@@ -1983,7 +2035,7 @@ sub _execute_array {
       if ($i > $#$tuple_status);
 
     require Data::Dumper::Concise;
-    $self->throw_exception(sprintf "%s for populate slice:\n%s",
+    $self->throw_exception(sprintf "execute_array() aborted with '%s' at populate slice:\n%s",
       ($tuple_status->[$i][1] || $err),
       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
     );
@@ -1993,9 +2045,8 @@ sub _execute_array {
 }
 
 sub _dbh_execute_array {
-    my ($self, $sth, $tuple_status, @extra) = @_;
-
-    return $sth->execute_array({ArrayTupleStatus => $tuple_status});
+  #my ($self, $sth, $tuple_status, @extra) = @_;
+  return $_[1]->execute_array({ArrayTupleStatus => $_[2]});
 }
 
 sub _dbh_execute_inserts_with_no_binds {
index 0bd49da..9588f4e 100644 (file)
@@ -45,7 +45,7 @@ throws_ok ( sub {
       }
     } ('Huey', 'Dewey', $ex_title, 'Louie')
   ])
-}, qr/columns .+ are not unique for populate slice.+$ex_title/ms, 'Readable exception thrown for failed populate');
+}, qr/\Qexecute_array() aborted with 'constraint failed\E.+ at populate slice.+$ex_title/ms, 'Readable exception thrown for failed populate');
 
 ## make sure populate honors fields/orders in list context
 ## schema order
@@ -171,7 +171,7 @@ throws_ok {
             name => 'foo3',
         },
     ]);
-} qr/slice/, 'bad slice';
+} qr/\Qexecute_array() aborted with 'datatype mismatch'/, 'bad slice';
 
 is($rs->count, 0, 'populate is atomic');
 
@@ -189,7 +189,7 @@ throws_ok {
       name => \"'foo'",
     }
   ]);
-} qr/bind expected/, 'literal sql where bind expected throws';
+} qr/Literal SQL found where a plain bind value is expected/, 'literal sql where bind expected throws';
 
 # ... and vice-versa.
 
@@ -204,7 +204,7 @@ throws_ok {
       name => \"'foo'",
     }
   ]);
-} qr/literal SQL expected/i, 'bind where literal sql expected throws';
+} qr/\QIncorrect value (expecting SCALAR-ref/, 'bind where literal sql expected throws';
 
 throws_ok {
   $rs->populate([
@@ -217,7 +217,7 @@ throws_ok {
       name => \"'bar'",
     }
   ]);
-} qr/inconsistent/, 'literal sql must be the same in all slices';
+} qr/Inconsistent literal SQL value/, 'literal sql must be the same in all slices';
 
 # the stringification has nothing to do with the artist name
 # this is solely for testing consistency
index 5a79709..1f7312b 100644 (file)
--- a/t/72pg.t
+++ b/t/72pg.t
@@ -227,6 +227,13 @@ for my $use_insert_returning ($test_server_supports_insert_returning
       arrayfield => [5, 6],
     });
 
+    lives_ok {
+      $schema->populate('ArrayTest', [
+        [ qw/arrayfield/ ],
+        [ [0,0]          ],
+      ]);
+    } 'inserting arrayref using void ctx populate';
+
     # Search using arrays
     lives_ok {
       is_deeply (
diff --git a/t/sqlmaker/literal_with_bind.t b/t/sqlmaker/literal_with_bind.t
new file mode 100644 (file)
index 0000000..1024a62
--- /dev/null
@@ -0,0 +1,60 @@
+use strict;
+use warnings;
+use Test::More;
+use Test::Exception;
+use lib qw(t/lib);
+use DBICTest;
+
+my $schema = DBICTest->init_schema(no_populate => 1);
+my $ars    = $schema->resultset('Artist');
+
+my $rank = \13;
+my $ref1 = \['?', [name => 'foo']];
+my $ref2 = \['?', [name => 'bar']];
+my $ref3 = \['?', [name => 'baz']];
+
+# do it twice, make sure the args are untouched
+for (1,2) {
+  $ars->delete;
+
+  lives_ok {
+    $ars->create({ artistid => 666, name => $ref1, rank => $rank });
+  } 'inserted row using literal sql';
+
+  ok (($ars->search({ name => 'foo' })->first),
+    'row was inserted');
+
+  lives_ok {
+    $ars->search({ name => { '=' => $ref1} })->update({ name => $ref2, rank => $rank });
+  } 'search/updated row using literal sql';
+
+  ok (($ars->search({ name => 'bar' })->first),
+    'row was updated');
+
+  lives_ok {
+    $ars->populate([{ artistid => 777, name => $ref3, rank => $rank  }]);
+  } 'populated row using literal sql';
+
+  ok (($ars->search({ name => 'baz' })->first),
+    'row was populated');
+}
+
+is_deeply(
+  $ref1,
+  \['?', [name => 'foo']],
+  'ref1 unchanged',
+);
+is_deeply(
+  $ref2,
+  \['?', [name => 'bar']],
+  'ref2 unchanged',
+);
+is_deeply(
+  $ref3,
+  \['?', [name => 'baz']],
+  'ref3 unchanged',
+);
+
+done_testing;
+
+# vim:sts=2 sw=2: