Merge 'sybase' into 'sybase_bulk_insert'
Rafael Kitover [Fri, 18 Sep 2009 07:33:14 +0000 (03:33 -0400)]
1  2 
lib/DBIx/Class/Storage/DBI.pm
lib/DBIx/Class/Storage/DBI/Sybase.pm
t/746sybase.t

@@@ -1339,11 -1339,15 +1339,15 @@@ sub insert_bulk 
    my %colvalues;
    my $table = $source->from;
    @colvalues{@$cols} = (0..$#$cols);
- # XXX some bulk APIs require column list in database order
-   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
+   my ($sql, $bind) = $self->_prep_for_execute (
+     'insert', undef, $source, [\%colvalues]
+   );
+   my @bind = @$bind
+     or croak 'Cannot insert_bulk without support for placeholders';
  
    $self->_query_start( $sql, @bind );
 -  my $sth = $self->sth($sql);
 +  my $sth = $self->sth($sql, 'insert', $sth_attr);
  
  #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
  
      $placeholder_index++;
    }
    my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
+   $sth->finish;
 -  if (my $err = $@) {
 +  if (my $err = $@ || $sth->errstr) {
      my $i = 0;
      ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
  
        ),
      );
    }
 -  $self->throw_exception($sth->errstr) if !$rv;
  
-   $sth->finish;
    $self->_query_end( $sql, @bind );
    return (wantarray ? ($rv, $sth, @bind) : $rv);
  }
@@@ -19,7 -17,10 +19,10 @@@ __PACKAGE__->mk_group_accessors('simple
         _identity_method/
  );
  
 -my @also_proxy_to_writer_storage = qw/
 +my @also_proxy_to_extra_storages = qw/
+   connect_call_set_auto_cast auto_cast connect_call_blob_setup
+   connect_call_datetime_setup
    disconnect _connect_info _sql_maker _sql_maker_opts disable_sth_caching
    auto_savepoint unsafe cursor_class debug debugobj schema
  /;
@@@ -124,28 -125,14 +127,29 @@@ sub _init 
  
    my $writer_storage = (ref $self)->new;
  
 -  $writer_storage->_is_writer_storage(1);
 +  $writer_storage->_is_extra_storage(1);
    $writer_storage->connect_info($self->connect_info);
+   $writer_storage->auto_cast($self->auto_cast);
  
    $self->_writer_storage($writer_storage);
 +
 +# create a bulk storage unless connect_info is a coderef
 +  return
 +    if (Scalar::Util::reftype($self->_dbi_connect_info->[0])||'') eq 'CODE';
 +
 +  my $bulk_storage = (ref $self)->new;
 +
 +  $bulk_storage->_is_extra_storage(1);
 +  $bulk_storage->_is_bulk_storage(1); # for special ->disconnect acrobatics
 +  $bulk_storage->connect_info($self->connect_info);
 +
 +# this is why
 +  $bulk_storage->_dbi_connect_info->[0] .= ';bulkLogin=1';
 +  
 +  $self->_bulk_storage($bulk_storage);
  }
  
 -for my $method (@also_proxy_to_writer_storage) {
 +for my $method (@also_proxy_to_extra_storages) {
    no strict 'refs';
    no warnings 'redefine';
  
@@@ -426,13 -428,14 +469,14 @@@ sub update 
    return $wantarray ? @res : $res[0];
  }
  
 -### the insert_bulk stuff stolen from DBI/MSSQL.pm
 +### the insert_bulk partially stolen from DBI/MSSQL.pm
  
  sub _set_identity_insert {
-   my ($self, $table) = @_;
+   my ($self, $table, $op) = @_;
  
    my $sql = sprintf (
-     'SET IDENTITY_INSERT %s ON',
+     'SET IDENTITY_%s %s ON',
+     (uc($op) || 'INSERT'),
      $self->sql_maker->_quote ($table),
    );
  
@@@ -454,199 -464,44 +505,207 @@@ sub _unset_identity_insert 
      $self->sql_maker->_quote ($table),
    );
  
+   $self->_query_start($sql);
    my $dbh = $self->_get_dbh;
    $dbh->do ($sql);
+   $self->_query_end($sql);
  }
  
- ## XXX add blob support
+ # for tests
+ sub _can_insert_bulk { 1 }
 -# XXX this should use the DBD::Sybase bulk API, where possible
  sub insert_bulk {
    my $self = shift;
    my ($source, $cols, $data) = @_;
  
    my $is_identity_insert = (List::Util::first
 -      { $source->column_info ($_)->{is_auto_increment} }
 -      (@{$cols})
 -  )
 -     ? 1
 -     : 0;
 -
 -  if ($is_identity_insert) {
 -     $self->_set_identity_insert ($source->name);
 +    { $source->column_info ($_)->{is_auto_increment} } @{$cols}
 +  ) ? 1 : 0;
 +
 +  my @source_columns = $source->columns;
 +
 +  my $use_bulk_api =
 +    $self->_bulk_storage && 
 +    $self->_get_dbh->{syb_has_blk};
 +
 +  if ((not $use_bulk_api) &&
 +      (Scalar::Util::reftype($self->_dbi_connect_info->[0])||'') eq 'CODE' &&
 +      (not $self->_bulk_disabled_due_to_coderef_connect_info_warned)) {
 +    carp <<'EOF';
 +Bulk API support disabled due to use of a CODEREF connect_info. Reverting to
 +array inserts.
 +EOF
 +    $self->_bulk_disabled_due_to_coderef_connect_info_warned(1);
    }
  
 -  $self->next::method(@_);
 +  if (not $use_bulk_api) {
 +    if ($is_identity_insert) {
 +       $self->_set_identity_insert ($source->name);
 +    }
 +
 +    $self->next::method(@_);
 +
 +    if ($is_identity_insert) {
 +       $self->_unset_identity_insert ($source->name);
 +    }
  
 -  if ($is_identity_insert) {
 -     $self->_unset_identity_insert ($source->name);
 +    return;
    }
 -}
  
 -### end of stolen insert_bulk section
 +# otherwise, use the bulk API
 +
 +# rearrange @$data so that columns are in database order
 +  my %orig_idx;
 +  @orig_idx{@$cols} = 0..$#$cols;
 +
 +  my %new_idx;
 +  @new_idx{@source_columns} = 0..$#source_columns;
 +
 +  my @new_data;
 +  for my $datum (@$data) {
 +    my $new_datum = [];
 +    for my $col (@source_columns) {
 +# identity data will be 'undef' if not $is_identity_insert
 +# columns with defaults will also be 'undef'
 +      $new_datum->[ $new_idx{$col} ] =
 +        exists $orig_idx{$col} ? $datum->[ $orig_idx{$col} ] : undef;
 +    }
 +    push @new_data, $new_datum;
 +  }
 +
 +  my $identity_col = List::Util::first
 +    { $source->column_info($_)->{is_auto_increment} } @source_columns;
 +
 +# bcp identity index is 1-based
 +  my $identity_idx = exists $new_idx{$identity_col} ?
 +    $new_idx{$identity_col} + 1 : 0;
 +
 +## Set a client-side conversion error handler, straight from DBD::Sybase docs.
 +# This ignores any data conversion errors detected by the client side libs, as
 +# they are usually harmless.
 +  my $orig_cslib_cb = DBD::Sybase::set_cslib_cb(
 +    Sub::Name::subname insert_bulk => sub {
 +      my ($layer, $origin, $severity, $errno, $errmsg, $osmsg, $blkmsg) = @_;
 +
 +      return 1 if $errno == 36;
 +
 +      carp 
 +        "Layer: $layer, Origin: $origin, Severity: $severity, Error: $errno" .
 +        ($errmsg ? "\n$errmsg" : '') .
 +        ($osmsg  ? "\n$osmsg"  : '')  .
 +        ($blkmsg ? "\n$blkmsg" : '');
 +      
 +      return 0;
 +  });
 +
 +  eval {
 +    my $bulk = $self->_bulk_storage;
 +
 +    my $guard = $bulk->txn_scope_guard;
 +
 +## XXX get this to work instead of our own $sth
 +## will require SQLA or *Hacks changes for ordered columns
 +#    $bulk->next::method($source, \@source_columns, \@new_data, {
 +#      syb_bcp_attribs => {
 +#        identity_flag   => $is_identity_insert,
 +#        identity_column => $identity_idx, 
 +#      }
 +#    });
 +    my $sql = 'INSERT INTO ' .
 +      $bulk->sql_maker->_quote($source->name) . ' (' .
 +# colname list is ignored for BCP, but does no harm
 +      (join ', ', map $bulk->sql_maker->_quote($_), @source_columns) . ') '.
 +      ' VALUES ('.  (join ', ', ('?') x @source_columns) . ')';
 +
 +## XXX there's a bug in the DBD::Sybase bulk support that makes $sth->finish for
 +## a prepare_cached statement ineffective. Replace with ->sth when fixed, or
 +## better yet the version above. Should be fixed in DBD::Sybase .
 +    my $sth = $bulk->_get_dbh->prepare($sql,
 +#      'insert', # op
 +      {
 +        syb_bcp_attribs => {
 +          identity_flag   => $is_identity_insert,
 +          identity_column => $identity_idx, 
 +        }
 +      }
 +    );
 +
 +    my $bind_attributes = $self->source_bind_attributes($source);
 +
 +    foreach my $slice_idx (0..$#source_columns) {
 +      my $col = $source_columns[$slice_idx];
 +
 +      my $attributes = $bind_attributes->{$col}
 +        if $bind_attributes && defined $bind_attributes->{$col};
 +
 +      my @slice = map $_->[$slice_idx], @new_data;
 +
 +      $sth->bind_param_array(($slice_idx + 1), \@slice, $attributes);
 +    }
 +
 +    $bulk->_query_start($sql);
 +
 +# this is stolen from DBI::insert_bulk
 +    my $tuple_status = [];
 +    my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
 +
 +    if (my $err = $@ || $sth->errstr) {
 +      my $i = 0;
 +      ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
 +
 +      $self->throw_exception("Unexpected populate error: $err")
 +        if ($i > $#$tuple_status);
 +
 +      require Data::Dumper;
 +      local $Data::Dumper::Terse = 1;
 +      local $Data::Dumper::Indent = 1;
 +      local $Data::Dumper::Useqq = 1;
 +      local $Data::Dumper::Quotekeys = 0;
 +      local $Data::Dumper::Sortkeys = 1;
 +
 +      $self->throw_exception(sprintf "%s for populate slice:\n%s",
 +        ($tuple_status->[$i][1] || $err),
 +        Data::Dumper::Dumper(
 +          { map { $source_columns[$_] => $new_data[$i][$_] } (0 .. $#$cols) }
 +        ),
 +      );
 +    }
 +
 +    $guard->commit;
 +    $sth->finish;
 +
 +    $bulk->_query_end($sql);
 +  };
 +  my $exception = $@;
 +  if ($exception =~ /-Y option/) {
 +    carp <<"EOF";
 +
 +Sybase bulk API operation failed due to character set incompatibility, reverting
 +to regular array inserts:
 +
 +*** Try unsetting the LANG environment variable.
 +
 +$@
 +EOF
 +    $self->_bulk_storage(undef);
 +    DBD::Sybase::set_cslib_cb($orig_cslib_cb);
 +    unshift @_, $self;
 +    goto \&insert_bulk;
 +  }
 +  elsif ($exception) {
 +    DBD::Sybase::set_cslib_cb($orig_cslib_cb);
 +# rollback makes the bulkLogin connection unusable
 +    $self->_bulk_storage->disconnect;
 +    $self->throw_exception($exception);
 +  }
 +
 +  DBD::Sybase::set_cslib_cb($orig_cslib_cb);
 +}
  
+ # Make sure blobs are not bound as placeholders, and return any non-empty ones
+ # as a hash.
  sub _remove_blob_cols {
    my ($self, $source, $fields) = @_;
  
diff --cc t/746sybase.t
@@@ -11,7 -12,7 +12,7 @@@ require DBIx::Class::Storage::DBI::Syba
  
  my ($dsn, $user, $pass) = @ENV{map { "DBICTEST_SYBASE_${_}" } qw/DSN USER PASS/};
  
- my $TESTS = 55 + 2;
 -my $TESTS = 51 + 2;
++my $TESTS = 58 + 2;
  
  if (not ($dsn && $user)) {
    plan skip_all =>
      name => { -like => 'bulk artist %' }
    });
  
-   is $bulk_rs->count, 3, 'correct number inserted via insert_bulk';
 -# test insert_bulk using populate, this should always pass whether or not it
 -# does anything Sybase specific or not. Just here to aid debugging.
++# test insert_bulk using populate.
+   SKIP: {
+     skip 'insert_bulk not supported', 4
+       unless $schema->storage->_can_insert_bulk;
  
-   is ((grep $_->charfield eq 'foo', $bulk_rs->all), 3,
-     'column set correctly via insert_bulk');
+     lives_ok {
+       $schema->resultset('Artist')->populate([
+         {
+           name => 'bulk artist 1',
+           charfield => 'foo',
+         },
+         {
+           name => 'bulk artist 2',
+           charfield => 'foo',
+         },
+         {
+           name => 'bulk artist 3',
+           charfield => 'foo',
+         },
+       ]);
+     } 'insert_bulk via populate';
  
-   my %bulk_ids;
-   @bulk_ids{map $_->artistid, $bulk_rs->all} = ();
+     is $bulk_rs->count, 3, 'correct number inserted via insert_bulk';
  
-   is ((scalar keys %bulk_ids), 3,
-     'identities generated correctly in insert_bulk');
+     is ((grep $_->charfield eq 'foo', $bulk_rs->all), 3,
+       'column set correctly via insert_bulk');
  
-   $bulk_rs->delete;
+     my %bulk_ids;
+     @bulk_ids{map $_->artistid, $bulk_rs->all} = ();
+     is ((scalar keys %bulk_ids), 3,
+       'identities generated correctly in insert_bulk');
+     $bulk_rs->delete;
+   }
  
 +# make sure insert_bulk works a second time on the same connection
 +  lives_ok {
 +    $schema->resultset('Artist')->populate([
 +      {
 +        name => 'bulk artist 1',
 +        charfield => 'bar',
 +      },
 +      {
 +        name => 'bulk artist 2',
 +        charfield => 'bar',
 +      },
 +      {
 +        name => 'bulk artist 3',
 +        charfield => 'bar',
 +      },
 +    ]);
 +  } 'insert_bulk via populate called a second time';
 +
 +  is $bulk_rs->count, 3,
 +    'correct number inserted via insert_bulk';
 +
 +  is ((grep $_->charfield eq 'bar', $bulk_rs->all), 3,
 +    'column set correctly via insert_bulk');
 +
 +  $bulk_rs->delete;
 +
 +# test invalid insert_bulk (missing required column)
 +#
 +# There should be a rollback, reconnect and the next valid insert_bulk should
 +# succeed.
 +  throws_ok {
 +    $schema->resultset('Artist')->populate([
 +      {
 +        charfield => 'foo',
 +      }
 +    ]);
 +  } qr/no value or default|does not allow null/i,
 +# The second pattern is the error from fallback to regular array insert on
 +# incompatible charset.
 +  'insert_bulk with missing required column throws error';
 +
  # now test insert_bulk with IDENTITY_INSERT
-   lives_ok {
-     $schema->resultset('Artist')->populate([
-       {
-         artistid => 2001,
-         name => 'bulk artist 1',
-         charfield => 'foo',
-       },
-       {
-         artistid => 2002,
-         name => 'bulk artist 2',
-         charfield => 'foo',
-       },
-       {
-         artistid => 2003,
-         name => 'bulk artist 3',
-         charfield => 'foo',
-       },
-     ]);
-   } 'insert_bulk with IDENTITY_INSERT via populate';
+   SKIP: {
+     skip 'insert_bulk not supported', 3
+       unless $schema->storage->_can_insert_bulk;
  
-   is $bulk_rs->count, 3,
-     'correct number inserted via insert_bulk with IDENTITY_INSERT';
+     lives_ok {
+       $schema->resultset('Artist')->populate([
+         {
+           artistid => 2001,
+           name => 'bulk artist 1',
+           charfield => 'foo',
+         },
+         {
+           artistid => 2002,
+           name => 'bulk artist 2',
+           charfield => 'foo',
+         },
+         {
+           artistid => 2003,
+           name => 'bulk artist 3',
+           charfield => 'foo',
+         },
+       ]);
+     } 'insert_bulk with IDENTITY_INSERT via populate';
  
-   is ((grep $_->charfield eq 'foo', $bulk_rs->all), 3,
-     'column set correctly via insert_bulk with IDENTITY_INSERT');
+     is $bulk_rs->count, 3,
+       'correct number inserted via insert_bulk with IDENTITY_INSERT';
  
-   $bulk_rs->delete;
+     is ((grep $_->charfield eq 'foo', $bulk_rs->all), 3,
+       'column set correctly via insert_bulk with IDENTITY_INSERT');
+     $bulk_rs->delete;
+   }
  
  # test correlated subquery
    my $subq = $schema->resultset('Artist')->search({ artistid => { '>' => 3 } })
  
  # mostly stolen from the blob stuff Nniuq wrote for t/73oracle.t
    SKIP: {
-     skip 'TEXT/IMAGE support does not work with FreeTDS', 16
 -    skip 'TEXT/IMAGE support does not work with FreeTDS', 15
++    skip 'TEXT/IMAGE support does not work with FreeTDS', 18
        if $schema->storage->using_freetds;
  
      my $dbh = $schema->storage->_dbh;
        $schema = get_schema();
      }
  
 -    eval { $rs->search({ id => 1 })->update({ blob => $new_str }) };
 -    ok !$@, 'updated blob successfully';
 -    diag $@ if $@;
 -    $got = eval {
 -      $rs->find(1)->blob
 -    };
 -    diag $@ if $@;
 -    ok($got eq $new_str, "verified updated blob");
 +    lives_ok {
 +      $rs->search({ id => 1 })->update({ blob => $new_str })
 +    } 'updated blob successfully';
 +
 +    lives_and {
 +      ok($rs->find(1)->blob eq $new_str)
 +    } 'verified updated blob';
  
+     # try a blob update with IDENTITY_UPDATE
+     lives_and {
+       $new_str = $binstr{large} . 'hlagh';
+       $rs->find(1)->update({ id => 999, blob => $new_str });
+       ok($rs->find(999)->blob eq $new_str);
+     } 'verified updated blob with IDENTITY_UPDATE';
      ## try multi-row blob update
      # first insert some blobs
 -    $rs->delete;
 -    $rs->create({ blob => $binstr{large} }) for (1..3);
      $new_str = $binstr{large} . 'foo';
 -    $rs->update({ blob => $new_str });
 -    is((grep $_->blob eq $new_str, $rs->all), 3, 'multi-row blob update');
 +    lives_and {
 +      $rs->delete;
 +      $rs->create({ blob => $binstr{large} }) for (1..2);
 +      $rs->update({ blob => $new_str });
 +      is((grep $_->blob eq $new_str, $rs->all), 2);
 +    } 'multi-row blob update';
 +
 +    $rs->delete;
 +
 +    # now try insert_bulk with blobs
 +    $new_str = $binstr{large} . 'bar';
 +    lives_ok {
 +      $rs->populate([
 +        {
 +          bytea => 1,
 +          blob => $binstr{large},
 +          clob => $new_str,
 +        },
 +        {
 +          bytea => 1,
 +          blob => $binstr{large},
 +          clob => $new_str,
 +        },
 +      ]);
 +    } 'insert_bulk with blobs does not die';
 +
 +    is((grep $_->blob eq $binstr{large}, $rs->all), 2,
 +      'IMAGE column set correctly via insert_bulk');
 +
 +    is((grep $_->clob eq $new_str, $rs->all), 2,
 +      'TEXT column set correctly via insert_bulk');
+     # make sure impossible blob update throws
+     throws_ok {
+       $rs->update({ clob => 'foo' });
+       $rs->create({ clob => 'bar' });
+       $rs->search({ clob => 'foo' })->update({ clob => 'bar' });
+     } qr/impossible/, 'impossible blob update throws';
    }
  
  # test MONEY column support