Another overhaul (hopefully one of the last ones) of the rollback handling
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
index a53200d..0c388ed 100644 (file)
@@ -12,9 +12,8 @@ use Scalar::Util qw/refaddr weaken reftype blessed/;
 use List::Util qw/first/;
 use Context::Preserve 'preserve_context';
 use Try::Tiny;
-use Data::Compare (); # no imports!!! guard against insane architecture
 use SQL::Abstract qw(is_plain_value is_literal_value);
-use DBIx::Class::_Util qw(quote_sub perlstring);
+use DBIx::Class::_Util qw(quote_sub perlstring serialize detected_reinvoked_destructor scope_guard);
 use namespace::clean;
 
 # default cursor class, overridable in connect_info attributes
@@ -120,12 +119,16 @@ for my $meth (keys %$storage_accessor_idx, qw(
   my $orig = __PACKAGE__->can ($meth)
     or die "$meth is not a ::Storage::DBI method!";
 
-  my $is_getter = $storage_accessor_idx->{$meth} ? 0 : 1;
+  my $possibly_a_setter = $storage_accessor_idx->{$meth} ? 1 : 0;
 
   quote_sub
-    __PACKAGE__ ."::$meth", sprintf( <<'EOC', $is_getter, perlstring $meth ), { '$orig' => \$orig };
+    __PACKAGE__ ."::$meth", sprintf( <<'EOC', $possibly_a_setter, perlstring $meth ), { '$orig' => \$orig };
 
     if (
+      # if this is an actual *setter* - just set it, no need to connect
+      # and determine the driver
+      !( %1$s and @_ > 1 )
+        and
       # only fire when invoked on an instance, a valid class-based invocation
       # would e.g. be setting a default for an inherited accessor
       ref $_[0]
@@ -134,10 +137,6 @@ for my $meth (keys %$storage_accessor_idx, qw(
         and
       ! $_[0]->{_in_determine_driver}
         and
-      # if this is a known *setter* - just set it, no need to connect
-      # and determine the driver
-      ( %1$s or @_ <= 1 )
-        and
       # Only try to determine stuff if we have *something* that either is or can
       # provide a DSN. Allows for bare $schema's generated with a plain ->connect()
       # to still be marginally useful
@@ -225,13 +224,17 @@ sub new {
   }
 
   END {
-    local $?; # just in case the DBI destructor changes it somehow
 
-    # destroy just the object if not native to this process
-    $_->_verify_pid for (grep
-      { defined $_ }
-      values %seek_and_destroy
-    );
+    if(
+      ! DBIx::Class::_ENV_::BROKEN_FORK
+        and
+      my @instances = grep { defined $_ } values %seek_and_destroy
+    ) {
+      local $?; # just in case the DBI destructor changes it somehow
+
+      # disarm the handle if not native to this process (see comment on top)
+      $_->_verify_pid for @instances;
+    }
   }
 
   sub CLONE {
@@ -243,9 +246,7 @@ sub new {
 
     for (@instances) {
       $_->_dbh(undef);
-
-      $_->transaction_depth(0);
-      $_->savepoints([]);
+      $_->disconnect;
 
       # properly renumber existing refs
       $_->_arm_global_destructor
@@ -254,10 +255,16 @@ sub new {
 }
 
 sub DESTROY {
+  return if &detected_reinvoked_destructor;
+
   $_[0]->_verify_pid unless DBIx::Class::_ENV_::BROKEN_FORK;
+
   # some databases spew warnings on implicit disconnect
+  return unless defined $_[0]->_dbh;
+
   local $SIG{__WARN__} = sub {};
   $_[0]->_dbh(undef);
+  # not calling ->disconnect here - we are being destroyed - nothing to reset
 
   # this op is necessary, since the very last perl runtime statement
   # triggers a global destruction shootout, and the $SIG localization
@@ -274,8 +281,7 @@ sub _verify_pid {
   if( defined $pid and $pid != $$ and my $dbh = $_[0]->_dbh ) {
     $dbh->{InactiveDestroy} = 1;
     $_[0]->_dbh(undef);
-    $_[0]->transaction_depth(0);
-    $_[0]->savepoints([]);
+    $_[0]->disconnect;
   }
 
   return;
@@ -869,20 +875,35 @@ database is not in C<AutoCommit> mode.
 =cut
 
 sub disconnect {
+  my $self = shift;
+
+  # this physical disconnect below might very well throw
+  # in order to unambiguously reset the state - do the cleanup in guard
+
+  my $g = scope_guard {
+    $self->_dbh(undef);
+    $self->_dbh_details({});
+    $self->transaction_depth(undef);
+    $self->_dbh_autocommit(undef);
+    $self->savepoints([]);
+
+    # FIXME - this needs reenabling with the proper "no reset on same DSN" check
+    #$self->_sql_maker(undef); # this may also end up being different
+  };
 
-  if( my $dbh = $_[0]->_dbh ) {
+  if( my $dbh = $self->_dbh ) {
 
-    $_[0]->_do_connection_actions(disconnect_call_ => $_) for (
-      ( $_[0]->on_disconnect_call || () ),
-      $_[0]->_parse_connect_do ('on_disconnect_do')
+    $self->_do_connection_actions(disconnect_call_ => $_) for (
+      ( $self->on_disconnect_call || () ),
+      $self->_parse_connect_do ('on_disconnect_do')
     );
 
     # stops the "implicit rollback on disconnect" warning
-    $_[0]->_exec_txn_rollback unless $_[0]->_dbh_autocommit;
+    $self->_exec_txn_rollback unless $self->_dbh_autocommit;
 
     %{ $dbh->{CachedKids} } = ();
+
     $dbh->disconnect;
-    $_[0]->_dbh(undef);
   }
 }
 
@@ -1037,12 +1058,9 @@ sub _init {}
 
 sub _populate_dbh {
 
-  $_[0]->_dbh(undef); # in case ->connected failed we might get sent here
-
-  $_[0]->_dbh_details({}); # reset everything we know
-
-  # FIXME - this needs reenabling with the proper "no reset on same DSN" check
-  #$_[0]->_sql_maker(undef); # this may also end up being different
+  # reset internal states
+  # also in case ->connected failed we might get sent here
+  $_[0]->disconnect;
 
   $_[0]->_dbh($_[0]->_connect);
 
@@ -1052,7 +1070,7 @@ sub _populate_dbh {
 
   # Always set the transaction depth on connect, since
   #  there is no transaction in progress by definition
-  $_[0]->{transaction_depth} = $_[0]->_dbh_autocommit ? 0 : 1;
+  $_[0]->transaction_depth( $_[0]->_dbh_autocommit ? 0 : 1 );
 
   $_[0]->_run_connection_actions unless $_[0]->{_in_determine_driver};
 
@@ -1109,10 +1127,16 @@ sub get_dbms_capability {
 sub _server_info {
   my $self = shift;
 
-  my $info;
-  unless ($info = $self->_dbh_details->{info}) {
+  # FIXME - ideally this needs to be an ||= assignment, and the final
+  # assignment at the end of this do{} should be gone entirely. However
+  # this confuses CXSA: https://rt.cpan.org/Ticket/Display.html?id=103296
+  $self->_dbh_details->{info} || do {
+
+    # this guarantees that problematic conninfo won't be hidden
+    # by the try{} below
+    $self->ensure_connected;
 
-    $info = {};
+    my $info = {};
 
     my $server_version = try {
       $self->_get_server_version
@@ -1149,9 +1173,7 @@ sub _server_info {
     }
 
     $self->_dbh_details->{info} = $info;
-  }
-
-  return $info;
+  };
 }
 
 sub _get_server_version {
@@ -1330,7 +1352,7 @@ sub _extract_driver_from_connect_info {
 sub _determine_connector_driver {
   my ($self, $conn) = @_;
 
-  my $dbtype = $self->_dbh_get_info('SQL_DBMS_NAME');
+  my $dbtype = $self->_get_rdbms_name;
 
   if (not $dbtype) {
     $self->_warn_undetermined_driver(
@@ -1357,6 +1379,8 @@ sub _determine_connector_driver {
   }
 }
 
+sub _get_rdbms_name { shift->_dbh_get_info('SQL_DBMS_NAME') }
+
 sub _warn_undetermined_driver {
   my ($self, $msg) = @_;
 
@@ -1370,24 +1394,41 @@ sub _warn_undetermined_driver {
 }
 
 sub _do_connection_actions {
-  my $self          = shift;
-  my $method_prefix = shift;
-  my $call          = shift;
-
-  if (not ref($call)) {
-    my $method = $method_prefix . $call;
-    $self->$method(@_);
-  } elsif (ref($call) eq 'CODE') {
-    $self->$call(@_);
-  } elsif (ref($call) eq 'ARRAY') {
-    if (ref($call->[0]) ne 'ARRAY') {
-      $self->_do_connection_actions($method_prefix, $_) for @$call;
-    } else {
-      $self->_do_connection_actions($method_prefix, @$_) for @$call;
+  my ($self, $method_prefix, $call, @args) = @_;
+
+  try {
+    if (not ref($call)) {
+      my $method = $method_prefix . $call;
+      $self->$method(@args);
+    }
+    elsif (ref($call) eq 'CODE') {
+      $self->$call(@args);
+    }
+    elsif (ref($call) eq 'ARRAY') {
+      if (ref($call->[0]) ne 'ARRAY') {
+        $self->_do_connection_actions($method_prefix, $_) for @$call;
+      }
+      else {
+        $self->_do_connection_actions($method_prefix, @$_) for @$call;
+      }
+    }
+    else {
+      $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
     }
-  } else {
-    $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
   }
+  catch {
+    if ( $method_prefix =~ /^connect/ ) {
+      # this is an on_connect cycle - we can't just throw while leaving
+      # a handle in an undefined state in our storage object
+      # kill it with fire and rethrow
+      $self->_dbh(undef);
+      $self->disconnect;  # the $dbh is gone, but we still need to reset the rest
+      $self->throw_exception( $_[0] );
+    }
+    else {
+      carp "Disconnect action failed: $_[0]";
+    }
+  };
 
   return $self;
 }
@@ -1595,7 +1636,9 @@ sub _exec_txn_commit {
 sub txn_rollback {
   my $self = shift;
 
-  $self->throw_exception("Unable to txn_rollback() on a disconnected storage")
+  # do a minimal connectivity check due to weird shit like
+  # https://rt.cpan.org/Public/Bug/Display.html?id=62370
+  $self->throw_exception("lost connection to storage")
     unless $self->_seems_connected;
 
   # esoteric case for folks using external $dbh handles
@@ -1668,8 +1711,8 @@ sub _gen_sql_bind {
   ) {
     carp_unique 'DateTime objects passed to search() are not supported '
       . 'properly (InflateColumn::DateTime formats and settings are not '
-      . 'respected.) See "Formatting DateTime objects in queries" in '
-      . 'DBIx::Class::Manual::Cookbook. To disable this warning for good '
+      . 'respected.) See ".. format a DateTime object for searching?" in '
+      . 'DBIx::Class::Manual::FAQ. To disable this warning for good '
       . 'set $ENV{DBIC_DT_SEARCH_OK} to true'
   }
 
@@ -1699,7 +1742,6 @@ sub _resolve_bindattrs {
   };
 
   return [ map {
-    my $resolved =
       ( ref $_ ne 'ARRAY' or @$_ != 2 ) ? [ {}, $_ ]
     : ( ! defined $_->[0] )             ? [ {}, $_->[1] ]
     : (ref $_->[0] eq 'HASH')           ? [(
@@ -1716,31 +1758,6 @@ sub _resolve_bindattrs {
     :                                     [ $resolve_bindinfo->(
                                               { dbic_colname => $_->[0] }
                                             ), $_->[1] ]
-    ;
-
-    if (
-      ! exists $resolved->[0]{dbd_attrs}
-        and
-      ! $resolved->[0]{sqlt_datatype}
-        and
-      length ref $resolved->[1]
-        and
-      ! is_plain_value $resolved->[1]
-    ) {
-      require Data::Dumper;
-      local $Data::Dumper::Maxdepth = 1;
-      local $Data::Dumper::Terse = 1;
-      local $Data::Dumper::Useqq = 1;
-      local $Data::Dumper::Indent = 0;
-      local $Data::Dumper::Pad = ' ';
-      $self->throw_exception(
-        'You must supply a datatype/bindtype (see DBIx::Class::ResultSet/DBIC BIND VALUES) '
-      . 'for non-scalar value '. Data::Dumper::Dumper ($resolved->[1])
-      );
-    }
-
-    $resolved;
-
   } @$bind ];
 }
 
@@ -1774,31 +1791,28 @@ sub _query_end {
 }
 
 sub _dbi_attrs_for_bind {
-  my ($self, $ident, $bind) = @_;
+  #my ($self, $ident, $bind) = @_;
 
-  my @attrs;
+  return [ map {
 
-  for (map { $_->[0] } @$bind) {
-    push @attrs, do {
-      if (exists $_->{dbd_attrs}) {
-        $_->{dbd_attrs}
-      }
-      elsif($_->{sqlt_datatype}) {
-        # cache the result in the dbh_details hash, as it can not change unless
-        # we connect to something else
-        my $cache = $self->_dbh_details->{_datatype_map_cache} ||= {};
-        if (not exists $cache->{$_->{sqlt_datatype}}) {
-          $cache->{$_->{sqlt_datatype}} = $self->bind_attribute_by_data_type($_->{sqlt_datatype}) || undef;
-        }
-        $cache->{$_->{sqlt_datatype}};
-      }
-      else {
-        undef;  # always push something at this position
-      }
-    }
-  }
+    exists $_->{dbd_attrs}  ?  $_->{dbd_attrs}
+
+  : ! $_->{sqlt_datatype}   ? undef
+
+  :                           do {
+
+    # cache the result in the dbh_details hash, as it (usually) can not change
+    # unless we connect to something else
+    # FIXME: for the time being Oracle is an exception, pending a rewrite of
+    # the LOB storage
+    my $cache = $_[0]->_dbh_details->{_datatype_map_cache} ||= {};
 
-  return \@attrs;
+    $cache->{$_->{sqlt_datatype}} = $_[0]->bind_attribute_by_data_type($_->{sqlt_datatype})
+      if ! exists $cache->{$_->{sqlt_datatype}};
+
+    $cache->{$_->{sqlt_datatype}};
+
+  } } map { $_->[0] } @{$_[2]} ];
 }
 
 sub _execute {
@@ -1974,12 +1988,30 @@ sub insert {
 
   my %returned_cols = %$to_insert;
   if (my $retlist = $sqla_opts->{returning}) {  # if IR is supported - we will get everything in one set
-    @ir_container = try {
-      local $SIG{__WARN__} = sub {};
-      my @r = $sth->fetchrow_array;
-      $sth->finish;
-      @r;
-    } unless @ir_container;
+
+    unless( @ir_container ) {
+      try {
+
+        # FIXME - need to investigate why Caelum silenced this in 4d4dc518
+        local $SIG{__WARN__} = sub {};
+
+        @ir_container = $sth->fetchrow_array;
+        $sth->finish;
+
+      } catch {
+        # Evict the $sth from the cache in case we got here, since the finish()
+        # is crucial, at least on older Firebirds, possibly on other engines too
+        #
+        # It would be too complex to make this a proper subclass override,
+        # and besides we already take the try{} penalty, adding a catch that
+        # triggers infrequently is a no-brainer
+        #
+        if( my $kids = $self->_dbh->{CachedKids} ) {
+          $kids->{$_} == $sth and delete $kids->{$_}
+            for keys %$kids
+        }
+      };
+    }
 
     @returned_cols{@$retlist} = @ir_container if @ir_container;
   }
@@ -2070,7 +2102,7 @@ sub _insert_bulk {
   # can't just hand SQLA a set of some known "values" (e.g. hashrefs that
   # can be later matched up by address), because we want to supply a real
   # value on which perhaps e.g. datatype checks will be performed
-  my ($proto_data, $value_type_by_col_idx);
+  my ($proto_data, $serialized_bind_type_by_col_idx);
   for my $col_idx (0..$#$cols) {
     my $colname = $cols->[$col_idx];
     if (ref $data->[0][$col_idx] eq 'SCALAR') {
@@ -2089,7 +2121,7 @@ sub _insert_bulk {
 
       # store value-less (attrs only) bind info - we will be comparing all
       # supplied binds against this for sanity
-      $value_type_by_col_idx->{$col_idx} = [ map { $_->[0] } @$resolved_bind ];
+      $serialized_bind_type_by_col_idx->{$col_idx} = serialize [ map { $_->[0] } @$resolved_bind ];
 
       $proto_data->{$colname} = \[ $sql, map { [
         # inject slice order to use for $proto_bind construction
@@ -2100,7 +2132,7 @@ sub _insert_bulk {
       ];
     }
     else {
-      $value_type_by_col_idx->{$col_idx} = undef;
+      $serialized_bind_type_by_col_idx->{$col_idx} = undef;
 
       $proto_data->{$colname} = \[ '?', [
         { dbic_colname => $colname, _bind_data_slice_idx => $col_idx }
@@ -2116,7 +2148,7 @@ sub _insert_bulk {
     [ $proto_data ],
   );
 
-  if (! @$proto_bind and keys %$value_type_by_col_idx) {
+  if (! @$proto_bind and keys %$serialized_bind_type_by_col_idx) {
     # if the bindlist is empty and we had some dynamic binds, this means the
     # storage ate them away (e.g. the NoBindVars component) and interpolated
     # them directly into the SQL. This obviously can't be good for multi-inserts
@@ -2150,7 +2182,7 @@ sub _insert_bulk {
     for my $row_idx (1..$#$data) {  # we are comparing against what we got from [0] above, hence start from 1
       my $val = $data->[$row_idx][$col_idx];
 
-      if (! exists $value_type_by_col_idx->{$col_idx}) { # literal no binds
+      if (! exists $serialized_bind_type_by_col_idx->{$col_idx}) { # literal no binds
         if (ref $val ne 'SCALAR') {
           $bad_slice_report_cref->(
             "Incorrect value (expecting SCALAR-ref \\'$$reference_val')",
@@ -2166,7 +2198,7 @@ sub _insert_bulk {
           );
         }
       }
-      elsif (! defined $value_type_by_col_idx->{$col_idx} ) {  # regular non-literal value
+      elsif (! defined $serialized_bind_type_by_col_idx->{$col_idx} ) {  # regular non-literal value
         if (is_literal_value($val)) {
           $bad_slice_report_cref->("Literal SQL found where a plain bind value is expected", $row_idx, $col_idx);
         }
@@ -2194,16 +2226,17 @@ sub _insert_bulk {
           }
           # need to check the bind attrs - a bind will happen only once for
           # the entire dataset, so any changes further down will be ignored.
-          elsif (! Data::Compare::Compare(
-            $value_type_by_col_idx->{$col_idx},
-            [
+          elsif (
+            $serialized_bind_type_by_col_idx->{$col_idx}
+              ne
+            serialize [
               map
               { $_->[0] }
               @{$self->_resolve_bindattrs(
                 $source, [ @{$$val}[1 .. $#$$val] ], $colinfos,
               )}
-            ],
-          )) {
+            ]
+          ) {
             $bad_slice_report_cref->(
               'Differing bind attributes on literal/bind values not supported',
               $row_idx,
@@ -2220,7 +2253,7 @@ sub _insert_bulk {
   # scope guard
   my $guard = $self->txn_scope_guard;
 
-  $self->_query_start( $sql, @$proto_bind ? [[undef => '__BULK_INSERT__' ]] : () );
+  $self->_query_start( $sql, @$proto_bind ? [[ {} => '__BULK_INSERT__' ]] : () );
   my $sth = $self->_prepare_sth($self->_dbh, $sql);
   my $rv = do {
     if (@$proto_bind) {
@@ -2234,7 +2267,7 @@ sub _insert_bulk {
     }
   };
 
-  $self->_query_end( $sql, @$proto_bind ? [[ undef => '__BULK_INSERT__' ]] : () );
+  $self->_query_end( $sql, @$proto_bind ? [[ {} => '__BULK_INSERT__' ]] : () );
 
   $guard->commit;
 
@@ -2291,10 +2324,12 @@ sub _dbh_execute_for_fetch {
 
       # FIXME SUBOPTIMAL - DBI needs fixing to always stringify regardless of DBD
       # For the time being forcibly stringify whatever is stringifiable
-      (length ref $v and is_plain_value $v)
-        ? "$v"
-        : $v
-      ;
+      my $vref;
+
+      ( !length ref $v or ! ($vref = is_plain_value $v) )   ? $v
+    : defined blessed( $$vref )                             ? "$$vref"
+                                                            : $$vref
+    ;
     } map { $_->[0] } @$proto_bind ];
   };
 
@@ -2430,21 +2465,9 @@ sub _select_args {
     where => $where,
   };
 
-  # Sanity check the attributes (SQLMaker does it too, but
-  # in case of a software_limit we'll never reach there)
-  if (defined $attrs->{offset}) {
-    $self->throw_exception('A supplied offset attribute must be a non-negative integer')
-      if ( $attrs->{offset} =~ /\D/ or $attrs->{offset} < 0 );
-  }
-
-  if (defined $attrs->{rows}) {
-    $self->throw_exception("The rows attribute must be a positive integer if present")
-      if ( $attrs->{rows} =~ /\D/ or $attrs->{rows} <= 0 );
-  }
-  elsif ($attrs->{offset}) {
-    # MySQL actually recommends this approach.  I cringe.
-    $attrs->{rows} = $sql_maker->__max_int;
-  }
+  # MySQL actually recommends this approach.  I cringe.
+  $attrs->{rows} ||= $sql_maker->__max_int
+    if $attrs->{offset};
 
   # see if we will need to tear the prefetch apart to satisfy group_by == select
   # this is *extremely tricky* to get right, I am still not sure I did
@@ -2584,9 +2607,9 @@ see L<DBIx::Class::SQLMaker::LimitDialects>.
 sub _dbh_columns_info_for {
   my ($self, $dbh, $table) = @_;
 
-  if ($dbh->can('column_info')) {
-    my %result;
-    my $caught;
+  my %result;
+
+  if (! DBIx::Class::_ENV_::STRESSTEST_COLUMN_INFO_UNAWARE_STORAGE and $dbh->can('column_info')) {
     try {
       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
@@ -2603,39 +2626,75 @@ sub _dbh_columns_info_for {
         $result{$col_name} = \%column_info;
       }
     } catch {
-      $caught = 1;
+      %result = ();
     };
-    return \%result if !$caught && scalar keys %result;
+
+    return \%result if keys %result;
   }
 
-  my %result;
   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
   $sth->execute;
-  my @columns = @{$sth->{NAME_lc}};
-  for my $i ( 0 .. $#columns ){
-    my %column_info;
-    $column_info{data_type} = $sth->{TYPE}->[$i];
-    $column_info{size} = $sth->{PRECISION}->[$i];
-    $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
-
-    if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
-      $column_info{data_type} = $1;
-      $column_info{size}    = $2;
+
+### The acrobatics with lc names is necessary to support both the legacy
+### API that used NAME_lc exclusively, *AND* at the same time work properly
+### with column names differing in cas eonly (thanks pg!)
+
+  my ($columns, $seen_lcs);
+
+  ++$seen_lcs->{lc($_)} and $columns->{$_} = {
+    idx => scalar keys %$columns,
+    name => $_,
+    lc_name => lc($_),
+  } for @{$sth->{NAME}};
+
+  $seen_lcs->{$_->{lc_name}} == 1
+    and
+  $_->{name} = $_->{lc_name}
+    for values %$columns;
+
+  for ( values %$columns ) {
+    my $inf = {
+      data_type => $sth->{TYPE}->[$_->{idx}],
+      size => $sth->{PRECISION}->[$_->{idx}],
+      is_nullable => $sth->{NULLABLE}->[$_->{idx}] ? 1 : 0,
+    };
+
+    if ($inf->{data_type} =~ m/^(.*?)\((.*?)\)$/) {
+      @{$inf}{qw( data_type  size)} = ($1, $2);
     }
 
-    $result{$columns[$i]} = \%column_info;
+    $result{$_->{name}} = $inf;
   }
+
   $sth->finish;
 
-  foreach my $col (keys %result) {
-    my $colinfo = $result{$col};
-    my $type_num = $colinfo->{data_type};
-    my $type_name;
-    if(defined $type_num && $dbh->can('type_info')) {
-      my $type_info = $dbh->type_info($type_num);
-      $type_name = $type_info->{TYPE_NAME} if $type_info;
-      $colinfo->{data_type} = $type_name if $type_name;
+  if ($dbh->can('type_info')) {
+    for my $inf (values %result) {
+      next if ! defined $inf->{data_type};
+
+      $inf->{data_type} = (
+        (
+          (
+            $dbh->type_info( $inf->{data_type} )
+              ||
+            next
+          )
+            ||
+          next
+        )->{TYPE_NAME}
+          ||
+        next
+      );
+
+      # FIXME - this may be an artifact of the DBD::Pg implmentation alone
+      # needs more testing in the future...
+      $inf->{size} -= 4 if (
+        ( $inf->{size}||0 > 4 )
+          and
+        $inf->{data_type} =~ qr/^text$/i
+      );
     }
+
   }
 
   return \%result;
@@ -2871,8 +2930,8 @@ sub create_ddl_dir {
     %{$sqltargs || {}}
   };
 
-  unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
-    $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
+  if (my $missing = DBIx::Class::Optional::Dependencies->req_missing_for ('deploy')) {
+    $self->throw_exception("Can't create a ddl file without $missing");
   }
 
   my $sqlt = SQL::Translator->new( $sqltargs );
@@ -3028,8 +3087,8 @@ sub deployment_statements {
       return join('', @rows);
   }
 
-  unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
-    $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
+  if (my $missing = DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') ) {
+    $self->throw_exception("Can't deploy without a pregenerated 'ddl_dir' directory or $missing");
   }
 
   # sources needs to be a parser arg, but for simplicity allow at top level