WriteLOBs storage component and 2 implementations
Rafael Kitover [Mon, 30 Jan 2012 11:08:54 +0000 (12:08 +0100)]
Factor out the LOB Writing code through an API from ::Sybase::ASE into a
generic component, ::Storage::DBI::WriteLOBs which requires the
composing class to implement just two methods, _write_lobs and _empty_lob
and implements insert, update and insert_bulk as well as a bunch of low
level methods (which are private.) These methods are documented in the
component as top comments.

::DBI::Oracle::Generic and ::Sybase::ASE implement this component.

The implementation is stackable, so that a composing class override of
LOB handling using the low level methods will not conflict with the
inherited methods, however for performance reasons a flag has been
added:

    local $self->{_skip_writelobs_impl} = 1;

to shortcircuit the implementation even faster. This flag is not
documented and will be replaced using the capability system in the
future, it is used in the ASE implementation of insert_bulk because of
the special identities handling it requires.

This refactor addresses a number of issues:

 - rows can now be identified by unique constraints, not just PKs for
   LOB ops

 - ASE UPDATEs with LIKE queries on TEXT columns in the WHERE condition
   now work

 - LOB ops now work in Oracle with quoting turned on

 - LOB ops work in Oracle with DBD::Oracle 1.23

 - insert_bulk now works with LOBs for Oracle, as long as the slices can
   be uniquely identified

Changes
lib/DBIx/Class/Storage/DBI.pm
lib/DBIx/Class/Storage/DBI/Oracle/Generic.pm
lib/DBIx/Class/Storage/DBI/Sybase/ASE.pm
lib/DBIx/Class/Storage/DBI/WriteLOBs.pm [new file with mode: 0644]
t/73oracle_blob.t
t/746sybase.t

diff --git a/Changes b/Changes
index 2fef676..44e40d4 100644 (file)
--- a/Changes
+++ b/Changes
@@ -1,5 +1,14 @@
 Revision history for DBIx::Class
 
+        - Loosen primary key requirement restriction on TEXT/IMAGE operations
+          for Sybase ASE to unique constraints
+        - Support TEXT/IMAGE UPDATE operations with TEXT queries in WHERE for
+          Sybase ASE
+        - Support BLOB/CLOB operations for Oracle with quoting enabled (by
+          switching to ora_auto_lob => 0)
+        - Support BLOB/CLOB operations for Oracle on DBD::Oracle version 1.23
+        - Support insert_bulk with BLOBs/CLOBs for Oracle
+
 0.08200 2012-08-24 (UTC)
     * Fixes
         - Change one of the new tests for the previous release to not require
index ac84176..e2ddd5b 100644 (file)
@@ -23,9 +23,11 @@ use namespace::clean;
 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
 
 __PACKAGE__->mk_group_accessors('inherited' => qw/
-  sql_limit_dialect sql_quote_char sql_name_sep
+  sql_limit_dialect sql_quote_char sql_name_sep _prepare_attributes
 /);
 
+__PACKAGE__->_prepare_attributes({}); # see _dbh_sth
+
 __PACKAGE__->mk_group_accessors('component_class' => qw/sql_maker_class datetime_parser_type/);
 
 __PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker');
@@ -2304,8 +2306,8 @@ sub _dbh_sth {
 
   # 3 is the if_active parameter which avoids active sth re-use
   my $sth = $self->disable_sth_caching
-    ? $dbh->prepare($sql)
-    : $dbh->prepare_cached($sql, {}, 3);
+    ? $dbh->prepare($sql, $self->_prepare_attributes)
+    : $dbh->prepare_cached($sql, $self->_prepare_attributes, 3);
 
   # XXX You would think RaiseError would make this impossible,
   #  but apparently that's not true :(
index c107934..ac2895d 100644 (file)
@@ -2,7 +2,7 @@ package DBIx::Class::Storage::DBI::Oracle::Generic;
 
 use strict;
 use warnings;
-use base qw/DBIx::Class::Storage::DBI/;
+use base qw/DBIx::Class::Storage::DBI::WriteLOBs/;
 use mro 'c3';
 use DBIx::Class::Carp;
 use Scope::Guard ();
@@ -456,6 +456,37 @@ sub bind_attribute_by_data_type {
   }
 }
 
+sub _empty_lob {
+  my ($self, $source, $col) = @_;
+
+  return $self->_is_text_lob_type($source->column_info($col)->{data_type}) ?
+    \'EMPTY_CLOB()' : \'EMPTY_BLOB()';
+}
+
+sub _write_lobs {
+  my ($self, $source, $lobs, $where) = @_;
+
+  my @lobs = keys %$lobs;
+
+  local $self->_prepare_attributes->{ora_auto_lob} = 0;
+
+  my $cursor = $self->select($source, \@lobs, $where, { for => 'update' });
+
+  my $dbh = $self->_get_dbh;
+
+  while (my @locators = $cursor->next) {
+    my %lobs;
+    @lobs{@lobs} = @locators;
+
+    foreach my $lob (@lobs) {
+      my $data = \$lobs->{$lob};
+
+      $dbh->ora_lob_trim($lobs{$lob}, 0);
+      $dbh->ora_lob_write($lobs{$lob}, 1, $$data);
+    }
+  }
+}
+
 # Handle blob columns in WHERE.
 #
 # For equality comparisons:
index 8d1419f..17c5ad6 100644 (file)
@@ -6,16 +6,15 @@ use warnings;
 use base qw/
   DBIx::Class::Storage::DBI::Sybase
   DBIx::Class::Storage::DBI::AutoCast
+  DBIx::Class::Storage::DBI::WriteLOBs
   DBIx::Class::Storage::DBI::IdentityInsert
 /;
 use mro 'c3';
 use DBIx::Class::Carp;
-use Scalar::Util qw/blessed weaken/;
+use Scalar::Util qw/weaken/;
 use List::Util 'first';
 use Sub::Name();
-use Data::Dumper::Concise 'Dumper';
 use Try::Tiny;
-use Context::Preserve 'preserve_context';
 use namespace::clean;
 
 __PACKAGE__->sql_limit_dialect ('RowCountOrGenericSubQ');
@@ -247,12 +246,6 @@ sub connect_call_blob_setup {
     if exists $args{log_on_update};
 }
 
-sub _is_lob_column {
-  my ($self, $source, $column) = @_;
-
-  return $self->_is_lob_type($source->column_info($column)->{data_type});
-}
-
 sub _prep_for_execute {
   my $self = shift;
   my ($op, $ident) = @_;
@@ -346,8 +339,7 @@ sub insert {
       keys %$columns_info )
     || '';
 
-  # FIXME - this is duplication from DBI.pm. When refactored towards
-  # the LobWriter this can be folded back where it belongs.
+  # FIXME - this is duplication from DBI.pm
   local $self->{_autoinc_supplied_for_op} = exists $to_insert->{$identity_col}
     ? 1
     : 0
@@ -363,7 +355,7 @@ sub insert {
   # try to insert explicit 'DEFAULT's instead (except for identity, timestamp
   # and computed columns)
   if (not %$to_insert) {
-    for my $col ($source->columns) {
+    foreach my $col ($source->columns) {
       next if $col eq $identity_col;
 
       my $info = $source->column_info($col);
@@ -377,8 +369,6 @@ sub insert {
     }
   }
 
-  my $blob_cols = $self->_remove_blob_cols($source, $to_insert);
-
   # do we need the horrific SELECT MAX(COL) hack?
   my $need_dumb_last_insert_id = (
     $self->_perform_autoinc_retrieval
@@ -386,25 +376,19 @@ sub insert {
     ($self->_identity_method||'') ne '@@IDENTITY'
   );
 
-  my $next = $self->next::can;
-
-  # we are already in a transaction, or there are no blobs
+  # we are already in a transaction, or there are no lobs
   # and we don't need the PK - just (try to) do it
   if ($self->{transaction_depth}
-        || (!$blob_cols && !$need_dumb_last_insert_id)
+|| (!$self->_have_lob_fields($source, $to_insert) && !$need_dumb_last_insert_id)
   ) {
-    return $self->_insert (
-      $next, $source, $to_insert, $blob_cols, $identity_col
-    );
+    return $self->next::method(@_);
   }
 
   # otherwise use the _writer_storage to do the insert+transaction on another
   # connection
   my $guard = $self->_writer_storage->txn_scope_guard;
 
-  my $updated_cols = $self->_writer_storage->_insert (
-    $next, $source, $to_insert, $blob_cols, $identity_col
-  );
+  my $updated_cols = $self->_writer_storage->next::method(@_);
 
   $self->_identity($self->_writer_storage->_identity);
 
@@ -413,80 +397,16 @@ sub insert {
   return $updated_cols;
 }
 
-sub _insert {
-  my ($self, $next, $source, $to_insert, $blob_cols, $identity_col) = @_;
-
-  my $updated_cols = $self->$next ($source, $to_insert);
-
-  my $final_row = {
-    ($identity_col ?
-      ($identity_col => $self->last_insert_id($source, $identity_col)) : ()),
-    %$to_insert,
-    %$updated_cols,
-  };
-
-  $self->_insert_blobs ($source, $blob_cols, $final_row) if $blob_cols;
-
-  return $updated_cols;
-}
-
 sub update {
   my $self = shift;
   my ($source, $fields, $where, @rest) = @_;
 
-  #
-  # When *updating* identities, ASE requires SET IDENTITY_UPDATE called
-  #
-  if (my $blob_cols = $self->_remove_blob_cols($source, $fields)) {
-
-    # If there are any blobs in $where, Sybase will return a descriptive error
-    # message.
-    # XXX blobs can still be used with a LIKE query, and this should be handled.
+  my $columns_info = $source->columns_info([keys %$fields]);
 
-    # update+blob update(s) done atomically on separate connection
-    $self = $self->_writer_storage;
+  local $self->{_autoinc_supplied_for_op} = 1
+    if first { $columns_info->{$_}{is_auto_increment} } keys %$columns_info;
 
-    my $guard = $self->txn_scope_guard;
-
-    # First update the blob columns to be updated to '' (taken from $fields, where
-    # it is originally put by _remove_blob_cols .)
-    my %blobs_to_empty = map { ($_ => delete $fields->{$_}) } keys %$blob_cols;
-
-    # We can't only update NULL blobs, because blobs cannot be in the WHERE clause.
-    $self->next::method($source, \%blobs_to_empty, $where, @rest);
-
-    # Now update the blobs before the other columns in case the update of other
-    # columns makes the search condition invalid.
-    my $rv = $self->_update_blobs($source, $blob_cols, $where);
-
-    if (keys %$fields) {
-
-      # Now set the identity update flags for the actual update
-      local $self->{_autoinc_supplied_for_op} = (first
-        { $_->{is_auto_increment} }
-        values %{ $source->columns_info([ keys %$fields ]) }
-      ) ? 1 : 0;
-
-      my $next = $self->next::can;
-      my $args = \@_;
-      return preserve_context {
-        $self->$next(@$args);
-      } after => sub { $guard->commit };
-    }
-    else {
-      $guard->commit;
-      return $rv;
-    }
-  }
-  else {
-    # Set the identity update flags for the actual update
-    local $self->{_autoinc_supplied_for_op} = (first
-      { $_->{is_auto_increment} }
-      values %{ $source->columns_info([ keys %$fields ]) }
-    ) ? 1 : 0;
-
-    return $self->next::method(@_);
-  }
+  return $self->next::method(@_);
 }
 
 sub insert_bulk {
@@ -519,20 +439,22 @@ sub insert_bulk {
   }
 
   if (not $use_bulk_api) {
-    my $blob_cols = $self->_remove_blob_cols_array($source, $cols, $data);
+    my $lobs = $self->_replace_lob_fields_array($source, $cols, $data);
 
 # next::method uses a txn anyway, but it ends too early in case we need to
-# select max(col) to get the identity for inserting blobs.
+# select max(col) to get the identity for inserting lobs.
     ($self, my $guard) = $self->{transaction_depth} == 0 ?
       ($self->_writer_storage, $self->_writer_storage->txn_scope_guard)
       :
       ($self, undef);
 
+    local $self->{_skip_writelobs_impl} = 1;
+
     $self->next::method(@_);
 
-    if ($blob_cols) {
-      if ($self->_autoinc_supplied_for_op) {
-        $self->_insert_blobs_array ($source, $blob_cols, $cols, $data);
+    if ($lobs) {
+      if ($self->_autoinc_supplied_for_op || (not defined $identity_col)) {
+        $self->_write_lobs_array($source, $lobs, $cols, $data);
       }
       else {
         my @cols_with_identities = (@$cols, $identity_col);
@@ -548,8 +470,8 @@ sub insert_bulk {
 
         my @data_with_identities = map [@$_, shift @identities], @$data;
 
-        $self->_insert_blobs_array (
-          $source, $blob_cols, \@cols_with_identities, \@data_with_identities
+        $self->_write_lobs_array(
+          $source, $lobs, \@cols_with_identities, \@data_with_identities
         );
       }
     }
@@ -624,8 +546,11 @@ sub insert_bulk {
 #        identity_column => $identity_idx,
 #      }
 #    });
+
+    my $table_name = $source->name;
+
     my $sql = 'INSERT INTO ' .
-      $bulk->sql_maker->_quote($source->name) . ' (' .
+      $bulk->sql_maker->_quote($table_name) . ' (' .
 # colname list is ignored for BCP, but does no harm
       (join ', ', map $bulk->sql_maker->_quote($_), @source_columns) . ') '.
       ' VALUES ('.  (join ', ', ('?') x @source_columns) . ')';
@@ -682,119 +607,34 @@ sub insert_bulk {
   }
 }
 
-# Make sure blobs are not bound as placeholders, and return any non-empty ones
-# as a hash.
-sub _remove_blob_cols {
-  my ($self, $source, $fields) = @_;
+# Override from WriteLOBs for NULL uniqueness (in ASE null values in UCs are
+# unique.)
+sub _identifying_column_set {
+  my ($self, $source, $cols) = @_;
 
-  my %blob_cols;
+  my $colinfos = ref $cols eq 'HASH' ? $cols : $source->columns_info($cols||());
 
-  for my $col (keys %$fields) {
-    if ($self->_is_lob_column($source, $col)) {
-      my $blob_val = delete $fields->{$col};
-      if (not defined $blob_val) {
-        $fields->{$col} = \'NULL';
-      }
-      else {
-        $fields->{$col} = \"''";
-        $blob_cols{$col} = $blob_val unless $blob_val eq '';
-      }
-    }
-  }
+  local $colinfos->{$_}{is_nullable} = 0 for keys %$colinfos;
 
-  return %blob_cols ? \%blob_cols : undef;
+  return $source->_identifying_column_set($colinfos);
 }
 
-# same for insert_bulk
-sub _remove_blob_cols_array {
-  my ($self, $source, $cols, $data) = @_;
+sub _empty_lob { \"''" }
 
-  my @blob_cols;
+sub _open_cursors_while_writing_lobs_allowed { 0 }
 
-  for my $i (0..$#$cols) {
-    my $col = $cols->[$i];
+sub _write_lobs {
+  my ($self, $source, $lobs, $where) = @_;
 
-    if ($self->_is_lob_column($source, $col)) {
-      for my $j (0..$#$data) {
-        my $blob_val = delete $data->[$j][$i];
-        if (not defined $blob_val) {
-          $data->[$j][$i] = \'NULL';
-        }
-        else {
-          $data->[$j][$i] = \"''";
-          $blob_cols[$j][$i] = $blob_val
-            unless $blob_val eq '';
-        }
-      }
-    }
-  }
-
-  return @blob_cols ? \@blob_cols : undef;
-}
-
-sub _update_blobs {
-  my ($self, $source, $blob_cols, $where) = @_;
-
-  my @primary_cols = try
-    { $source->_pri_cols }
-    catch {
-      $self->throw_exception("Cannot update TEXT/IMAGE column(s): $_")
-    };
-
-  my @pks_to_update;
-  if (
-    ref $where eq 'HASH'
-      and
-    @primary_cols == grep { defined $where->{$_} } @primary_cols
-  ) {
-    my %row_to_update;
-    @row_to_update{@primary_cols} = @{$where}{@primary_cols};
-    @pks_to_update = \%row_to_update;
-  }
-  else {
-    my $cursor = $self->select ($source, \@primary_cols, $where, {});
-    @pks_to_update = map {
-      my %row; @row{@primary_cols} = @$_; \%row
-    } $cursor->all;
-  }
-
-  for my $ident (@pks_to_update) {
-    $self->_insert_blobs($source, $blob_cols, $ident);
-  }
-}
-
-sub _insert_blobs {
-  my ($self, $source, $blob_cols, $row) = @_;
   my $dbh = $self->_get_dbh;
 
-  my $table = $source->name;
+  foreach my $col (keys %$lobs) {
+    my $lob = $lobs->{$col};
 
-  my %row = %$row;
-  my @primary_cols = try
-    { $source->_pri_cols }
-    catch {
-      $self->throw_exception("Cannot update TEXT/IMAGE column(s): $_")
-    };
-
-  $self->throw_exception('Cannot update TEXT/IMAGE column(s) without primary key values')
-    if ((grep { defined $row{$_} } @primary_cols) != @primary_cols);
-
-  for my $col (keys %$blob_cols) {
-    my $blob = $blob_cols->{$col};
-
-    my %where = map { ($_, $row{$_}) } @primary_cols;
-
-    my $cursor = $self->select ($source, [$col], \%where, {});
+    my $cursor = $self->select($source, [$col], $where, {});
     $cursor->next;
     my $sth = $cursor->sth;
 
-    if (not $sth) {
-      $self->throw_exception(
-          "Could not find row in table '$table' for blob update:\n"
-        . (Dumper \%where)
-      );
-    }
-
     try {
       do {
         $sth->func('CS_GET', 1, 'ct_data_info') or die $sth->errstr;
@@ -806,11 +646,11 @@ sub _insert_blobs {
       $log_on_update    = 1 if not defined $log_on_update;
 
       $sth->func('CS_SET', 1, {
-        total_txtlen => length($blob),
+        total_txtlen => length($lob),
         log_on_update => $log_on_update
       }, 'ct_data_info') or die $sth->errstr;
 
-      $sth->func($blob, length($blob), 'ct_send_data') or die $sth->errstr;
+      $sth->func($lob, length($lob), 'ct_send_data') or die $sth->errstr;
 
       $sth->func('ct_finish_send') or die $sth->errstr;
     }
@@ -830,26 +670,6 @@ sub _insert_blobs {
   }
 }
 
-sub _insert_blobs_array {
-  my ($self, $source, $blob_cols, $cols, $data) = @_;
-
-  for my $i (0..$#$data) {
-    my $datum = $data->[$i];
-
-    my %row;
-    @row{ @$cols } = @$datum;
-
-    my %blob_vals;
-    for my $j (0..$#$cols) {
-      if (exists $blob_cols->[$i][$j]) {
-        $blob_vals{ $cols->[$j] } = $blob_cols->[$i][$j];
-      }
-    }
-
-    $self->_insert_blobs ($source, \%blob_vals, \%row);
-  }
-}
-
 =head2 connect_call_datetime_setup
 
 Used as:
@@ -1176,10 +996,6 @@ Real limits and limited counts using stored procedures deployed on startup.
 
 =item *
 
-Blob update with a LIKE query on a blob, without invalidating the WHERE condition.
-
-=item *
-
 bulk_insert using prepare_cached (see comments.)
 
 =back
diff --git a/lib/DBIx/Class/Storage/DBI/WriteLOBs.pm b/lib/DBIx/Class/Storage/DBI/WriteLOBs.pm
new file mode 100644 (file)
index 0000000..5d77601
--- /dev/null
@@ -0,0 +1,390 @@
+package DBIx::Class::Storage::DBI::WriteLOBs;
+
+use strict;
+use warnings;
+use base 'DBIx::Class::Storage::DBI';
+use mro 'c3';
+use Data::Dumper::Concise 'Dumper';
+use Try::Tiny;
+use namespace::clean;
+
+=head1 NAME
+
+DBIx::Class::Storage::DBI::WriteLOBs - Storage component for RDBMS drivers that
+need to use a special API for writing LOBs
+
+=head1 DESCRIPTION
+
+This is a storage component for database drivers that need to use an API outside
+of the normal L<DBI> APIs for writing LOB values. This component implements
+C<insert>, C<update> and C<insert_bulk>.
+
+=cut
+
+# REQUIRED METHODS
+#
+# The following methods must be implemented by the composing class:
+#
+# _write_lobs
+#
+# Arguments: $source, \%lobs, \%where
+#
+# Writes %lobs which is a column-value hash to the row pointed to by %where using
+# the driver's DBD API. It is expected that the columns are already LOBs, the
+# method is expected to truncate them before writing to them.
+#
+# _empty_lob
+#
+# Arguments: $source, $col
+#
+# Return Value: \"literal SQL"
+#
+# Returns the field to bind in the insert/update query in place of the LOB, for
+# example C<\"''"> or C<\'EMPTY_BLOB()'>.
+#
+# _open_cursors_while_writing_lobs_allowed
+#
+# Arguments: NONE
+#
+# Optional. Should return 0 if the database does not allow having open cursors
+# while writing LOBs. If not defined, it is assumed to be true.
+#
+# PROVIDED METHODS
+#
+# Private methods for your own implementations of the DML operations. If you
+# implement them yourself, you may also set:
+#
+#   local $self->{_skip_writelobs_impl} = 1;
+#
+# to shortcircuit the inherited ones for a minor speedup.
+
+sub _is_lob_column {
+  my ($self, $source, $column) = @_;
+
+  return $self->_is_lob_type($source->column_info($column)->{data_type});
+}
+
+# _have_lob_fields
+#
+# Arguments: $source, \%fields
+#
+# Return Value: $yes_no
+#
+# Returns true if any of %fields are non-empty LOBs.
+
+sub _have_lob_fields {
+  my ($self, $source, $fields) = @_;
+
+  for my $col (keys %$fields) {
+    if ($self->_is_lob_column($source, $col)) {
+      return 1 if defined $fields->{$col} && $fields->{$col} ne '';
+    }
+  }
+
+  return 0;
+}
+
+# _replace_lob_fields
+#
+# Arguments: $source, \%fields
+#
+# Return Value: \%lob_fields
+#
+# Replace LOB fields with L</_empty_lob> values, and return any non-empty ones as
+# a hash keyed by field name.
+
+sub _replace_lob_fields {
+  my ($self, $source, $fields) = @_;
+
+  my %lob_cols;
+
+  for my $col (keys %$fields) {
+    if ($self->_is_lob_column($source, $col)) {
+      my $lob_val = delete $fields->{$col};
+      if (not defined $lob_val) {
+        $fields->{$col} = \'NULL';
+      }
+      elsif (ref $lob_val && $$lob_val eq ${ $self->_empty_lob($source, $col) })
+      {
+        # put back, composing class is handling LOBs itself most likely
+        $fields->{$col} = $lob_val;
+      }
+      else {
+        $fields->{$col} = $self->_empty_lob($source, $col);
+        $lob_cols{$col} = $lob_val unless $lob_val eq '';
+      }
+    }
+  }
+
+  return %lob_cols ? \%lob_cols : undef;
+}
+
+# _remove_lob_fields
+#
+# Arguments: $source, \%fields
+#
+# Return Value: \%lob_fields
+#
+# Remove LOB fields from %fields entirely, and return any non-empty ones as a
+# hash keyed by field name.
+
+sub _remove_lob_fields {
+  my ($self, $source, $fields) = @_;
+
+  my %lob_cols;
+
+  for my $col (keys %$fields) {
+    if ($self->_is_lob_column($source, $col)) {
+      my $lob_val = delete $fields->{$col};
+      if (not defined $lob_val) {
+        $fields->{$col} = \'NULL';
+      }
+      else {
+        delete $fields->{$col};
+        $lob_cols{$col} = $lob_val unless $lob_val eq '';
+      }
+    }
+  }
+
+  return %lob_cols ? \%lob_cols : undef;
+}
+
+# _replace_lob_fields_array
+#
+# Arguments: $source, \@cols, \@data
+#
+# Return Value: \@rows_of_lob_values
+#
+# Like L</_replace_lob_fields> above, but operates on a set of rows in @data
+# with @cols as the column names as passed to
+# L<DBIx::Class::Storage::DBI/insert_bulk>.
+#
+# Returns a set of rows of LOB values with the LOBs in the original positions
+# they were in @data.
+
+sub _replace_lob_fields_array {
+  my ($self, $source, $cols, $data) = @_;
+
+  my @lob_cols;
+
+  for my $i (0..$#$cols) {
+    my $col = $cols->[$i];
+
+    if ($self->_is_lob_column($source, $col)) {
+      for my $j (0..$#$data) {
+        my $lob_val = delete $data->[$j][$i];
+        if (not defined $lob_val) {
+          $data->[$j][$i] = \'NULL';
+        }
+        elsif (ref $lob_val && $$lob_val eq ${ $self->_empty_lob($source, $col)})
+        {
+          # put back, composing class is handling LOBs itself most likely
+          $data->[$j][$i] = $lob_val;
+        }
+        else {
+          $data->[$j][$i] = $self->_empty_lob($source, $col);
+          $lob_cols[$j][$i] = $lob_val
+            unless $lob_val eq '';
+        }
+      }
+    }
+  }
+
+  return @lob_cols ? \@lob_cols : undef;
+}
+
+# _write_lobs_array
+#
+# Arguments: $source, \@lobs, \@cols, \@data
+#
+# Uses the L</_write_lobs> API to write out each row of the @lobs array
+# identified by the @data slice.
+#
+# The @lobs array is as prepared by L</_replace_lob_fields_array> above.
+
+sub _write_lobs_array {
+  my ($self, $source, $lobs, $cols, $data) = @_;
+
+  for my $i (0..$#$data) {
+    my $datum = $data->[$i];
+
+    my %row;
+    @row{@$cols} = @$datum;
+
+    %row = %{ $self->_ident_cond_for_cols($source, \%row) }
+      or $self->throw_exception(
+        'cannot identify slice for LOB insert '
+        . Dumper($datum)
+      );
+
+    my %lob_vals;
+    for my $j (0..$#$cols) {
+      if (exists $lobs->[$i][$j]) {
+        $lob_vals{ $cols->[$j] } = $lobs->[$i][$j];
+      }
+    }
+
+    $self->_write_lobs($source, \%lob_vals, \%row);
+  }
+}
+
+# Proxy for ResultSource, for overriding in ASE
+sub _identifying_column_set {
+  my ($self, $source, @args) = @_;
+  return $source->_identifying_column_set(@args);
+}
+
+# _ident_cond_for_cols
+#
+# Arguments: $source, \%row
+#
+# Return Value: \%condition||undef
+#
+# Attempts to identify %row (as, for example, made by
+# L<DBIx::Class::Row/get_columns> or L<DBIx::Class::ResultClass::HashRefInflator>)
+# by unique constraints and extract them into the
+# %condition that can be used for a select. Returns undef if the %row is
+# ambiguous, or there are no unique constraints.
+#
+# Returns the smallest possible identifying condition, giving preference to the
+# primary key.
+#
+# Uses _identifying_column_set from DBIx::Class::ResultSource.
+
+sub _ident_cond_for_cols {
+  my ($self, $source, $row) = @_;
+
+  my $colinfos = $source->columns_info([keys %$row]);
+
+  my $nullable = { map +($_, +{ is_nullable => $colinfos->{$_}{is_nullable} }),
+                        keys %$row };
+
+  # Don't skip keys with nullable columns if the column has a defined value in
+  # the %row.
+  local $colinfos->{$_}{is_nullable} =
+    defined $row->{$_} ? 0 : $nullable->{$_} for keys %$row;
+
+  my $cols = $self->_identifying_column_set($source, $colinfos);
+
+  return undef if not $cols;
+
+  return +{ map +($_, $row->{$_}), @$cols };
+}
+
+sub insert {
+  my $self = shift;
+
+  return $self->next::method(@_) if $self->{_skip_writelobs_impl};
+
+  my ($source, $to_insert) = @_;
+
+  my $lobs = $self->_replace_lob_fields($source, $to_insert);
+
+  return $self->next::method(@_) unless $lobs;
+
+  my $guard = $self->txn_scope_guard;
+
+  my $updated_cols = $self->next::method(@_);
+
+  my $row = { %$to_insert, %$updated_cols };
+
+  my $where = $self->_ident_cond_for_cols($source, $row)
+    or $self->throw_exception(
+      'Could not identify row for LOB insert '
+      . Dumper($row)
+    );
+
+  $self->_write_lobs($source, $lobs, $where);
+
+  $guard->commit;
+
+  return $updated_cols;
+}
+
+sub update {
+  my $self = shift;
+
+  return $self->next::method(@_) if $self->{_skip_writelobs_impl};
+
+  my ($source, $fields, $where, @rest) = @_;
+
+  my $lobs = $self->_remove_lob_fields($source, $fields);
+
+  return $self->next::method(@_) unless $lobs;
+
+  my @key_cols = @{ $self->_identifying_column_set($source) || [] }
+    or $self->throw_exception(
+         'must be able to uniquely identify rows for LOB updates'
+       );
+
+  my $autoinc_supplied_for_op = $self->_autoinc_supplied_for_op;
+
+  $self = $self->_writer_storage if $self->can('_writer_storage'); # for ASE
+
+  local $self->{_autoinc_supplied_for_op} = $autoinc_supplied_for_op
+    if $autoinc_supplied_for_op;
+
+  my $guard = $self->txn_scope_guard;
+
+  my ($cursor, @rows);
+  {
+    local $self->{_autoinc_supplied_for_op} = 0;
+    $cursor = $self->select($source, \@key_cols, $where, {});
+
+    if ($self->can('_open_cursors_while_writing_lobs_allowed')
+      && (not $self->_open_cursors_while_writing_lobs_allowed)) { # e.g. ASE
+      @rows   = $cursor->all;
+      $cursor = undef;
+    }
+  }
+
+  my $count = "0E0";
+
+  while (my $cond = shift @rows
+          || ($cursor && do { my @r = $cursor->next; @r && \@r })) {
+    $cond = do { my %c; @c{@key_cols} = @$cond; \%c };
+    {
+      local $self->{_autoinc_supplied_for_op} = 0;
+      $self->_write_lobs($source, $lobs, $cond);
+    }
+
+    $self->next::method($source, $fields, $cond, @rest) if %$fields;
+
+    $count++;
+  }
+
+  $guard->commit;
+
+  return $count;
+}
+
+sub insert_bulk {
+  my $self = shift;
+
+  return $self->next::method(@_) if $self->{_skip_writelobs_impl};
+
+  my ($source, $cols, $data) = @_;
+
+  my $lobs = $self->_replace_lob_fields_array($source, $cols, $data);
+
+  my $guard = $self->txn_scope_guard;
+
+  $self->next::method(@_);
+
+  $self->_write_lobs_array($source, $lobs, $cols, $data) if $lobs;
+
+  $guard->commit;
+}
+
+=head1 AUTHOR
+
+See L<DBIx::Class/AUTHOR> and L<DBIx::Class/CONTRIBUTORS>.
+
+=head1 LICENSE
+
+You may distribute this code under the same terms as Perl itself.
+
+=cut
+
+1;
+# vim:sts=2 sw=2:
index 3965ea3..e1e2745 100644 (file)
@@ -8,6 +8,23 @@ use Try::Tiny;
 use DBIx::Class::Optional::Dependencies ();
 
 use lib qw(t/lib);
+
+# add extra columns for the bindtype tests
+BEGIN {
+  require DBICTest::RunMode;
+  require DBICTest::Schema::BindType;
+  DBICTest::Schema::BindType->add_columns(
+    'blob2' => {
+      data_type => 'blob',
+      is_nullable => 1,
+    },
+    'clob2' => {
+      data_type => 'clob',
+      is_nullable => 1,
+    },
+  );
+}
+
 use DBICTest;
 use DBIC::SqlMakerTest;
 
@@ -51,53 +68,44 @@ for my $opt (@tryopt) {
 
   do_creates($dbh, $q);
 
-  _run_blob_tests($schema, $opt);
+  _run_tests($schema, $opt);
 }
 
-sub _run_blob_tests {
-SKIP: {
-TODO: {
+sub _run_tests {
   my ($schema, $opt) = @_;
+
+  my $q = $schema->storage->sql_maker->quote_char || '';
+
   my %binstr = ( 'small' => join('', map { chr($_) } ( 1 .. 127 )) );
   $binstr{'large'} = $binstr{'small'} x 1024;
 
-  my $maxloblen = (length $binstr{'large'}) + 5;
+  my $maxloblen = (length $binstr{'large'}) + 6;
   note "Localizing LongReadLen to $maxloblen to avoid truncation of test data";
   local $dbh->{'LongReadLen'} = $maxloblen;
 
   my $rs = $schema->resultset('BindType');
 
-  if ($DBD::Oracle::VERSION eq '1.23') {
-    throws_ok { $rs->create({ id => 1, blob => $binstr{large} }) }
-      qr/broken/,
-      'throws on blob insert with DBD::Oracle == 1.23';
-    skip 'buggy BLOB support in DBD::Oracle 1.23', 1;
-  }
-
-  my $q = $schema->storage->sql_maker->quote_char || '';
-  local $TODO = 'Something is confusing column bindtype assignment when quotes are active'
-              . ': https://rt.cpan.org/Ticket/Display.html?id=64206'
-    if $q;
-
-  # so we can disable BLOB mega-output
+  # disable BLOB mega-output
   my $orig_debug = $schema->storage->debug;
 
   my $id;
   foreach my $size (qw( small large )) {
     $id++;
 
-    local $schema->storage->{debug} = $size eq 'large'
-      ? 0
-      : $orig_debug
-    ;
+    if ($size eq 'small') {
+      $schema->storage->debug($orig_debug);
+    }
+    elsif ($size eq 'large') {
+      $schema->storage->debug(0);
+    }
 
     my $str = $binstr{$size};
     lives_ok {
-      $rs->create( { 'id' => $id, blob => "blob:$str", clob => "clob:$str" } )
+      $rs->create( { 'id' => $id, blob => "blob:$str", blob2 => "blob2:$str", clob => "clob:$str", clob2 => "clob2:$str" } )
     } "inserted $size without dying";
 
     my %kids = %{$schema->storage->_dbh->{CachedKids}};
-    my @objs = $rs->search({ blob => "blob:$str", clob => "clob:$str" })->all;
+    my @objs = $rs->search({ blob => "blob:$str", blob2 => "blob2:$str", clob => "clob:$str", clob2 => "clob2:$str" })->all;
     is_deeply (
       $schema->storage->_dbh->{CachedKids},
       \%kids,
@@ -105,7 +113,22 @@ TODO: {
     ) if $size eq 'large';
     is @objs, 1, 'One row found matching on both LOBs';
     ok (try { $objs[0]->blob }||'' eq "blob:$str", 'blob inserted/retrieved correctly');
+    ok (try { $objs[0]->blob2 }||'' eq "blob2:$str", 'blob2 inserted/retrieved correctly');
+    ok (try { $objs[0]->clob }||'' eq "clob:$str", 'clob inserted/retrieved correctly');
+    ok (try { $objs[0]->clob2 }||'' eq "clob2:$str", 'clob2 inserted/retrieved correctly');
+
+    $rs->find($id)->delete;
+
+    lives_ok {
+      $rs->populate( [ { 'id' => $id, blob => "blob:$str", blob2 => "blob2:$str", clob => "clob:$str", clob2 => "clob2:$str" } ] )
+    } "inserted $size via insert_bulk without dying";
+
+    @objs = $rs->search({ blob => "blob:$str", blob2 => "blob2:$str", clob => "clob:$str", clob2 => "clob2:$str" })->all;
+    is @objs, 1, 'One row found matching on both LOBs';
+    ok (try { $objs[0]->blob }||'' eq "blob:$str", 'blob inserted/retrieved correctly');
+    ok (try { $objs[0]->blob2 }||'' eq "blob2:$str", 'blob2 inserted/retrieved correctly');
     ok (try { $objs[0]->clob }||'' eq "clob:$str", 'clob inserted/retrieved correctly');
+    ok (try { $objs[0]->clob2 }||'' eq "clob2:$str", 'clob2 inserted/retrieved correctly');
 
     TODO: {
       local $TODO = '-like comparison on blobs not tested before ora 10 (fails on 8i)'
@@ -118,7 +141,7 @@ TODO: {
     }
 
     ok(my $subq = $rs->search(
-      { blob => "blob:$str", clob => "clob:$str" },
+      { blob => "blob:$str", blob2 => "blob2:$str", clob => "clob:$str", clob2 => "clob2:$str" },
       {
         from => \ "(SELECT * FROM ${q}bindtype_test${q} WHERE ${q}id${q} != ?) ${q}me${q}",
         bind => [ [ undef => 12345678 ] ],
@@ -129,35 +152,64 @@ TODO: {
     is (@objs, 1, 'One row found matching on both LOBs as a subquery');
 
     lives_ok {
-      $rs->search({ id => $id, blob => "blob:$str", clob => "clob:$str" })
-        ->update({ blob => 'updated blob', clob => 'updated clob' });
+      $rs->search({ id => $id, blob => "blob:$str", blob2 => "blob2:$str", clob => "clob:$str", clob2 => "clob2:$str" })
+        ->update({ id => 9999 });
     } 'blob UPDATE with blobs in WHERE clause survived';
 
-    @objs = $rs->search({ blob => "updated blob", clob => 'updated clob' })->all;
+    @objs = $rs->search({ id => 9999, blob => "blob:$str", blob2 => "blob2:$str", clob => "clob:$str", clob2 => "clob2:$str" })->all;
+    is @objs, 1, 'found updated row';
+
+    lives_ok {
+      $rs->search({ id => 9999 })->update({ blob => 'updated blob', blob2 => 'updated blob2', clob => 'updated clob', clob2 => 'updated clob2' });
+    } 'blob UPDATE survived';
+
+    @objs = $rs->search({ blob => "updated blob", blob2 => "updated blob2", clob => 'updated clob', clob2 => 'updated clob2' })->all;
     is @objs, 1, 'found updated row';
     ok (try { $objs[0]->blob }||'' eq "updated blob", 'blob updated/retrieved correctly');
+    ok (try { $objs[0]->blob2 }||'' eq "updated blob2", 'blob2 updated/retrieved correctly');
     ok (try { $objs[0]->clob }||'' eq "updated clob", 'clob updated/retrieved correctly');
+    ok (try { $objs[0]->clob2 }||'' eq "updated clob2", 'clob2 updated/retrieved correctly');
+
+    # test multirow update
+    $rs->create({ id => $id+1, blob => 'updated blob', blob2 => 'updated blob2', clob => 'updated clob', clob2 => 'updated clob2' });
+
+    lives_ok {
+      $rs->search({ id => [ 9999, $id+1 ], blob => 'updated blob', blob2 => 'updated blob2', clob => 'updated clob', clob2 => 'updated clob2' })->update({ blob => 'updated blob again', blob2 => 'updated blob2 again', clob => 'updated clob again', clob2 => 'updated clob2 again' });
+    } 'lob multirow UPDATE based on lobs in WHERE clause survived';
+
+    @objs = $rs->search({ blob => "updated blob again", blob2 => "updated blob2 again", clob => 'updated clob again', clob2 => 'updated clob2 again' })->all;
+    is @objs, 2, 'found updated rows';
+    foreach my $idx (0..1) {
+      ok (try { $objs[$idx]->blob }||'' eq "updated blob again", 'blob updated/retrieved correctly');
+      ok (try { $objs[$idx]->blob2 }||'' eq "updated blob2 again", 'blob2 updated/retrieved correctly');
+      ok (try { $objs[$idx]->clob }||'' eq "updated clob again", 'clob updated/retrieved correctly');
+      ok (try { $objs[$idx]->clob2 }||'' eq "updated clob2 again", 'clob2 updated/retrieved correctly');
+    }
+
+    $rs->find($id+1)->delete;
+    $rs->find(9999)->update({ id => $id });
 
     lives_ok {
       $rs->search({ id => $id  })
-        ->update({ blob => 're-updated blob', clob => 're-updated clob' });
+        ->update({ blob => 're-updated blob', blob2 => 're-updated blob2', clob => 're-updated clob', clob2 => 're-updated clob2' });
     } 'blob UPDATE without blobs in WHERE clause survived';
 
-    @objs = $rs->search({ blob => 're-updated blob', clob => 're-updated clob' })->all;
+    @objs = $rs->search({ blob => 're-updated blob', blob2 => 're-updated blob2', clob => 're-updated clob', clob2 => 're-updated clob2' })->all;
     is @objs, 1, 'found updated row';
     ok (try { $objs[0]->blob }||'' eq 're-updated blob', 'blob updated/retrieved correctly');
+    ok (try { $objs[0]->blob2 }||'' eq 're-updated blob', 'blob2 updated/retrieved correctly');
     ok (try { $objs[0]->clob }||'' eq 're-updated clob', 'clob updated/retrieved correctly');
+    ok (try { $objs[0]->clob2 }||'' eq 're-updated clob2', 'clob2 updated/retrieved correctly');
 
     lives_ok {
-      $rs->search({ blob => "re-updated blob", clob => "re-updated clob" })
+      $rs->search({ blob => "re-updated blob", blob2 => "re-updated blob2", clob => "re-updated clob", clob2 => "re-updated clob2" })
         ->delete;
     } 'blob DELETE with WHERE clause survived';
-    @objs = $rs->search({ blob => "re-updated blob", clob => 're-updated clob' })->all;
+    @objs = $rs->search({ blob => "re-updated blob", blob2 => "re-updated blob2", clob => 're-updated clob', clob2 => 're-updated clob2' })->all;
     is @objs, 0, 'row deleted successfully';
   }
 
   $schema->storage->debug ($orig_debug);
-}}
 
   do_clean ($dbh);
 }
index abf6551..c12918a 100644 (file)
@@ -434,6 +434,19 @@ SQL
 
     $rs->delete;
 
+    lives_ok {
+      $rs->create({ id => 1, clob => "foobar$binstr{large}" })
+    } 'inserted large TEXT without dying with manual PK';
+
+    lives_and {
+      $rs->search({ clob => { -like => 'foobar%' } })->update({
+        clob => 'updated TEXT'
+      });
+      is((grep $_->clob eq 'updated TEXT', $rs->all), 1);
+    } 'TEXT UPDATE with LIKE query in WHERE';
+
+    $rs->delete;
+
     # now try insert_bulk with blobs and only blobs
     $new_str = $binstr{large} . 'bar';
     lives_ok {