Add retrieve_on_insert column_info flag, to autoretrieve RDBMS-side defaults
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
index cdaac30..1c7ea76 100644 (file)
@@ -1766,6 +1766,8 @@ sub _prefetch_autovalues {
         ! exists $to_insert->{$col}
           or
         ref $to_insert->{$col} eq 'SCALAR'
+          or
+        (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
       )
     ) {
       $values{$col} = $self->_sequence_fetch(
@@ -1785,33 +1787,43 @@ sub insert {
 
   my $prefetched_values = $self->_prefetch_autovalues($source, $to_insert);
 
-  # fuse the values
+  # fuse the values, but keep a separate list of prefetched_values so that
+  # they can be fused once again with the final return
   $to_insert = { %$to_insert, %$prefetched_values };
 
-  # list of primary keys we try to fetch from the database
-  # both not-exsists and scalarrefs are considered
-  my %fetch_pks;
-  for ($source->primary_columns) {
-    $fetch_pks{$_} = scalar keys %fetch_pks  # so we can preserve order for prettyness
-      if ! exists $to_insert->{$_} or ref $to_insert->{$_} eq 'SCALAR';
-  }
+  my $col_infos = $source->columns_info;
+  my %pcols = map { $_ => 1 } $source->primary_columns;
+  my %retrieve_cols;
+  for my $col ($source->columns) {
+    # nothing to retrieve when explicit values are supplied
+    next if (defined $to_insert->{$col} and ! (
+      ref $to_insert->{$col} eq 'SCALAR'
+        or
+      (ref $to_insert->{$col} eq 'REF' and ref ${$to_insert->{$col}} eq 'ARRAY')
+    ));
+
+    # the 'scalar keys' is a trick to preserve the ->columns declaration order
+    $retrieve_cols{$col} = scalar keys %retrieve_cols if (
+      $pcols{$col}
+        or
+      $col_infos->{$col}{retrieve_on_insert}
+    );
+  };
 
   my ($sqla_opts, @ir_container);
-  if ($self->_use_insert_returning) {
+  if (%retrieve_cols and $self->_use_insert_returning) {
+    $sqla_opts->{returning_container} = \@ir_container
+      if $self->_use_insert_returning_bound;
 
-    # retain order as declared in the resultsource
-    for (sort { $fetch_pks{$a} <=> $fetch_pks{$b} } keys %fetch_pks ) {
-      push @{$sqla_opts->{returning}}, $_;
-      $sqla_opts->{returning_container} = \@ir_container
-        if $self->_use_insert_returning_bound;
-    }
+    $sqla_opts->{returning} = [
+      sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols
+    ];
   }
 
   my ($rv, $sth) = $self->_execute('insert', $source, $to_insert, $sqla_opts);
 
-  my %returned_cols;
-
-  if (my $retlist = $sqla_opts->{returning}) {
+  my %returned_cols = %$to_insert;
+  if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
     @ir_container = try {
       local $SIG{__WARN__} = sub {};
       my @r = $sth->fetchrow_array;
@@ -1821,89 +1833,166 @@ sub insert {
 
     @returned_cols{@$retlist} = @ir_container if @ir_container;
   }
+  else {
+    # pull in PK if needed and then everything else
+    if (my @missing_pri = grep { $pcols{$_} } keys %retrieve_cols) {
 
-  return { %$prefetched_values, %returned_cols };
-}
+      $self->throw_exception( "Missing primary key but Storage doesn't support last_insert_id" )
+        unless $self->can('last_insert_id');
 
+      my @pri_values = $self->last_insert_id($source, @missing_pri);
 
-## Currently it is assumed that all values passed will be "normal", i.e. not
-## scalar refs, or at least, all the same type as the first set, the statement is
-## only prepped once.
-sub insert_bulk {
-  my ($self, $source, $cols, $data) = @_;
+      $self->throw_exception( "Can't get last insert id" )
+        unless (@pri_values == @missing_pri);
+
+      @returned_cols{@missing_pri} = @pri_values;
+      delete $retrieve_cols{$_} for @missing_pri;
+    }
+
+    # if there is more left to pull
+    if (%retrieve_cols) {
+      $self->throw_exception(
+        'Unable to retrieve additional columns without a Primary Key on ' . $source->source_name
+      ) unless %pcols;
+
+      my @left_to_fetch = sort { $retrieve_cols{$a} <=> $retrieve_cols{$b} } keys %retrieve_cols;
 
-  my %colvalues;
-  @colvalues{@$cols} = (0..$#$cols);
+      my $cur = DBIx::Class::ResultSet->new($source, {
+        where => { map { $_ => $returned_cols{$_} } (keys %pcols) },
+        select => \@left_to_fetch,
+      })->cursor;
 
-  for my $i (0..$#$cols) {
-    my $first_val = $data->[0][$i];
-    next unless ref $first_val eq 'SCALAR';
+      @returned_cols{@left_to_fetch} = $cur->next;
 
-    $colvalues{ $cols->[$i] } = $first_val;
+      $self->throw_exception('Duplicate row returned for PK-search after fresh insert')
+        if scalar $cur->next;
+    }
   }
 
-  # check for bad data and stringify stringifiable objects
-  my $bad_slice = sub {
-    my ($msg, $col_idx, $slice_idx) = @_;
-    $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
-      $msg,
-      $cols->[$col_idx],
-      do {
-        require Data::Dumper::Concise;
-        local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
-        Data::Dumper::Concise::Dumper ({
-          map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
-        }),
-      }
-    );
-  };
+  return { %$prefetched_values, %returned_cols };
+}
 
-  for my $datum_idx (0..$#$data) {
-    my $datum = $data->[$datum_idx];
+sub insert_bulk {
+  my ($self, $source, $cols, $data) = @_;
 
-    for my $col_idx (0..$#$cols) {
-      my $val            = $datum->[$col_idx];
-      my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
-      my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
+  # FIXME - perhaps this is not even needed? does DBI stringify?
+  #
+  # forcibly stringify whatever is stringifiable
+  for my $r (0 .. $#$data) {
+    for my $c (0 .. $#{$data->[$r]}) {
+      $data->[$r][$c] = "$data->[$r][$c]"
+        if ( ref $data->[$r][$c] and overload::Method($data->[$r][$c], '""') );
+    }
+  }
 
-      if ($is_literal_sql) {
-        if (not ref $val) {
-          $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
+  # check the data for consistency
+  # report a sensible error on bad data
+  #
+  # also create a list of dynamic binds (ones that will be changing
+  # for each row)
+  my $dyn_bind_idx;
+  for my $col_idx (0..$#$cols) {
+
+    # the first "row" is used as a point of reference
+    my $reference_val = $data->[0][$col_idx];
+    my $is_literal = ref $reference_val eq 'SCALAR';
+    my $is_literal_bind = ( !$is_literal and (
+      ref $reference_val eq 'REF'
+        and
+      ref $$reference_val eq 'ARRAY'
+    ) );
+
+    $dyn_bind_idx->{$col_idx} = 1
+      if (!$is_literal and !$is_literal_bind);
+
+    # use a closure for convenience (less to pass)
+    my $bad_slice = sub {
+      my ($msg, $slice_idx) = @_;
+      $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
+        $msg,
+        $cols->[$col_idx],
+        do {
+          require Data::Dumper::Concise;
+          local $Data::Dumper::Maxdepth = 2;
+          Data::Dumper::Concise::Dumper ({
+            map { $cols->[$_] =>
+              $data->[$slice_idx][$_]
+            } (0 .. $#$cols)
+          }),
         }
-        elsif ((my $reftype = ref $val) ne 'SCALAR') {
-          $bad_slice->("$reftype reference found where literal SQL expected",
-            $col_idx, $datum_idx);
+      );
+    };
+
+    for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
+      my $val = $data->[$row_idx][$col_idx];
+
+      if ($is_literal) {
+        if (ref $val ne 'SCALAR') {
+          $bad_slice->(
+            "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
+            $row_idx
+          );
+        }
+        elsif ($$val ne $$reference_val) {
+          $bad_slice->(
+            "Inconsistent literal SQL value (expecting \\'$$reference_val')",
+            $row_idx
+          );
         }
-        elsif ($$val ne $$sqla_bind){
-          $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
-            $col_idx, $datum_idx);
+      }
+      elsif ($is_literal_bind) {
+        if (ref $val ne 'REF' or ref $$val ne 'ARRAY') {
+          $bad_slice->(
+            "Incorrect value (expecting ARRAYREF-ref \\['${$reference_val}->[0]', ... ])",
+            $row_idx
+          );
+        }
+        elsif (${$val}->[0] ne ${$reference_val}->[0]) {
+          $bad_slice->(
+            "Inconsistent literal SQL-bind value (expecting \\['${$reference_val}->[0]', ... ])",
+            $row_idx
+          );
         }
       }
-      elsif (my $reftype = ref $val) {
-        require overload;
-        if (overload::Method($val, '""')) {
-          $datum->[$col_idx] = "".$val;
+      elsif (ref $val) {
+        if (ref $val eq 'SCALAR' or (ref $val eq 'REF' and ref $$val eq 'ARRAY') ) {
+          $bad_slice->("Literal SQL found where a plain bind value is expected", $row_idx);
         }
         else {
-          $bad_slice->("$reftype reference found where bind expected",
-            $col_idx, $datum_idx);
+          $bad_slice->("$val reference found where bind expected", $row_idx);
         }
       }
     }
   }
 
-  my ($sql, $bind) = $self->_prep_for_execute (
-    'insert', $source, [\%colvalues]
+  # Get the sql with bind values interpolated where necessary. For dynamic
+  # binds convert the values of the first row into a literal+bind combo, with
+  # extra positional info in the bind attr hashref. This will allow us to match
+  # the order properly, and is so contrived because a user-supplied literal
+  # bind (or something else specific to a resultsource and/or storage driver)
+  # can inject extra binds along the way, so one can't rely on "shift
+  # positions" ordering at all. Also we can't just hand SQLA a set of some
+  # known "values" (e.g. hashrefs that can be later matched up by address),
+  # because we want to supply a real value on which perhaps e.g. datatype
+  # checks will be performed
+  my ($sql, $proto_bind) = $self->_prep_for_execute (
+    'insert',
+    $source,
+    [ { map { $cols->[$_] => $dyn_bind_idx->{$_}
+      ? \[ '?', [
+          { dbic_colname => $cols->[$_], _bind_data_slice_idx => $_ }
+            =>
+          $data->[0][$_]
+        ] ]
+      : $data->[0][$_]
+    } (0..$#$cols) } ],
   );
 
-  if (! @$bind) {
-    # if the bindlist is empty - make sure all "values" are in fact
-    # literal scalarrefs. If not the case this means the storage ate
-    # them away (e.g. the NoBindVars component) and interpolated them
-    # directly into the SQL. This obviosly can't be good for multi-inserts
-
-    $self->throw_exception('Cannot insert_bulk without support for placeholders')
-      if first { ref $_ ne 'SCALAR' } values %colvalues;
+  if (! @$proto_bind and keys %$dyn_bind_idx) {
+    # if the bindlist is empty and we had some dynamic binds, this means the
+    # storage ate them away (e.g. the NoBindVars component) and interpolated
+    # them directly into the SQL. This obviosly can't be good for multi-inserts
+    $self->throw_exception('Cannot insert_bulk without support for placeholders');
   }
 
   # neither _execute_array, nor _execute_inserts_with_no_binds are
@@ -1911,12 +2000,13 @@ sub insert_bulk {
   # scope guard
   my $guard = $self->txn_scope_guard;
 
-  $self->_query_start( $sql, @$bind ? [[undef => '__BULK_INSERT__' ]] : () );
+  $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
   my $sth = $self->_sth($sql);
   my $rv = do {
-    if (@$bind) {
-      #@bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
-      $self->_execute_array( $source, $sth, $bind, $cols, $data );
+    if (@$proto_bind) {
+      # proto bind contains the information on which pieces of $data to pull
+      # $cols is passed in only for prettier error-reporting
+      $self->_execute_array( $source, $sth, $proto_bind, $cols, $data );
     }
     else {
       # bind_param_array doesn't work if there are no binds
@@ -1924,29 +2014,37 @@ sub insert_bulk {
     }
   };
 
-  $self->_query_end( $sql, @$bind ? [[ undef => '__BULK_INSERT__' ]] : () );
+  $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
 
   $guard->commit;
 
-  return (wantarray ? ($rv, $sth, @$bind) : $rv);
+  return (wantarray ? ($rv, $sth, @$proto_bind) : $rv);
 }
 
 sub _execute_array {
-  my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
+  my ($self, $source, $sth, $proto_bind, $cols, $data, @extra) = @_;
 
   ## This must be an arrayref, else nothing works!
   my $tuple_status = [];
 
-  # $bind contains colnames as keys and dbic-col-index as values
-  my $bind_attrs = $self->_dbi_attrs_for_bind($source, $bind);
+  my $bind_attrs = $self->_dbi_attrs_for_bind($source, $proto_bind);
 
   # Bind the values by column slices
-  for my $i (0 .. $#$bind) {
-    my $dbic_data_index = $bind->[$i][1];
+  for my $i (0 .. $#$proto_bind) {
+    my $data_slice_idx = (
+      ref $proto_bind->[$i][0] eq 'HASH'
+        and
+      exists $proto_bind->[$i][0]{_bind_data_slice_idx}
+    ) ? $proto_bind->[$i][0]{_bind_data_slice_idx} : undef;
 
     $sth->bind_param_array(
       $i+1, # DBI bind indexes are 1-based
-      [ map { $_->[$dbic_data_index] } @$data ],
+      defined $data_slice_idx
+        # either get a "column" of dynamic values, or just repeat the same
+        # bind over and over
+        ? [ map { $_->[$data_slice_idx] } @$data ]
+        : [ ($proto_bind->[$i][1]) x @$data ]
+      ,
       defined $bind_attrs->[$i] ? $bind_attrs->[$i] : (), # some DBDs throw up when given an undef
     );
   }
@@ -1983,7 +2081,7 @@ sub _execute_array {
       if ($i > $#$tuple_status);
 
     require Data::Dumper::Concise;
-    $self->throw_exception(sprintf "%s for populate slice:\n%s",
+    $self->throw_exception(sprintf "execute_array() aborted with '%s' at populate slice:\n%s",
       ($tuple_status->[$i][1] || $err),
       Data::Dumper::Concise::Dumper( { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) } ),
     );
@@ -1993,9 +2091,8 @@ sub _execute_array {
 }
 
 sub _dbh_execute_array {
-    my ($self, $sth, $tuple_status, @extra) = @_;
-
-    return $sth->execute_array({ArrayTupleStatus => $tuple_status});
+  #my ($self, $sth, $tuple_status, @extra) = @_;
+  return $_[1]->execute_array({ArrayTupleStatus => $_[2]});
 }
 
 sub _dbh_execute_inserts_with_no_binds {