Fix deployment_statements context sensitivity regression
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
index 341d9f9..405041c 100644 (file)
@@ -4,7 +4,7 @@ package DBIx::Class::Storage::DBI;
 use strict;
 use warnings;
 
-use base 'DBIx::Class::Storage';
+use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
 use mro 'c3';
 
 use Carp::Clan qw/^DBIx::Class/;
@@ -13,6 +13,13 @@ use DBIx::Class::Storage::DBI::Cursor;
 use DBIx::Class::Storage::Statistics;
 use Scalar::Util();
 use List::Util();
+use Data::Dumper::Concise();
+use Sub::Name ();
+
+# what version of sqlt do we require if deploy() without a ddl_dir is invoked
+# when changing also adjust the corresponding author_require in Makefile.PL
+my $minimum_sqlt_version = '0.11002';
+
 
 __PACKAGE__->mk_group_accessors('simple' =>
   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
@@ -35,6 +42,38 @@ __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
 
 
+# Each of these methods need _determine_driver called before itself
+# in order to function reliably. This is a purely DRY optimization
+my @rdbms_specific_methods = qw/
+  sqlt_type
+  build_datetime_parser
+  datetime_parser_type
+
+  insert
+  insert_bulk
+  update
+  delete
+  select
+  select_single
+/;
+
+for my $meth (@rdbms_specific_methods) {
+
+  my $orig = __PACKAGE__->can ($meth)
+    or next;
+
+  no strict qw/refs/;
+  no warnings qw/redefine/;
+  *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
+    if (not $_[0]->_driver_determined) {
+      $_[0]->_determine_driver;
+      goto $_[0]->can($meth);
+    }
+    $orig->(@_);
+  };
+}
+
+
 =head1 NAME
 
 DBIx::Class::Storage::DBI - DBI storage handler
@@ -44,7 +83,14 @@ DBIx::Class::Storage::DBI - DBI storage handler
   my $schema = MySchema->connect('dbi:SQLite:my.db');
 
   $schema->storage->debug(1);
-  $schema->dbh_do("DROP TABLE authors");
+
+  my @stuff = $schema->storage->dbh_do(
+    sub {
+      my ($storage, $dbh, @args) = @_;
+      $dbh->do("DROP TABLE authors");
+    },
+    @column_list
+  );
 
   $schema->resultset('Book')->search({
      written_on => $schema->storage->datetime_parser(DateTime->now)
@@ -112,6 +158,12 @@ mixed together:
     %extra_attributes,
   }];
 
+  $connect_info_args = [{
+    dbh_maker => sub { DBI->connect (...) },
+    %dbi_attributes,
+    %extra_attributes,
+  }];
+
 This is particularly useful for L<Catalyst> based applications, allowing the
 following config (L<Config::General> style):
 
@@ -125,6 +177,10 @@ following config (L<Config::General> style):
     </connect_info>
   </Model::DB>
 
+The C<dsn>/C<user>/C<password> combination can be substituted by the
+C<dbh_maker> key whose value is a coderef that returns a connected
+L<DBI database handle|DBI/connect>
+
 =back
 
 Please note that the L<DBI> docs recommend that you always explicitly
@@ -337,6 +393,12 @@ L<DBIx::Class::Schema/connect>
   # Connect via subref
   ->connect_info([ sub { DBI->connect(...) } ]);
 
+  # Connect via subref in hashref
+  ->connect_info([{
+    dbh_maker => sub { DBI->connect(...) },
+    on_connect_do => 'alter session ...',
+  }]);
+
   # A bit more complicated
   ->connect_info(
     [
@@ -407,8 +469,21 @@ sub connect_info {
   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
     %attrs = %{$args[0]};
     @args = ();
-    for (qw/password user dsn/) {
-      unshift @args, delete $attrs{$_};
+    if (my $code = delete $attrs{dbh_maker}) {
+      @args = $code;
+
+      my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
+      if (@ignored) {
+        carp sprintf (
+            'Attribute(s) %s in connect_info were ignored, as they can not be applied '
+          . "to the result of 'dbh_maker'",
+
+          join (', ', map { "'$_'" } (@ignored) ),
+        );
+      }
+    }
+    else {
+      @args = delete @attrs{qw/dsn user password/};
     }
   }
   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
@@ -452,7 +527,11 @@ sub connect_info {
 }
 
 sub _default_dbi_connect_attributes {
-  return { AutoCommit => 1 };
+  return {
+    AutoCommit => 1,
+    RaiseError => 1,
+    PrintError => 0,
+  };
 }
 
 =head2 on_connect_do
@@ -523,7 +602,7 @@ sub dbh_do {
   my $self = shift;
   my $code = shift;
 
-  my $dbh = $self->_dbh;
+  my $dbh = $self->_get_dbh;
 
   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
       || $self->{transaction_depth};
@@ -534,11 +613,6 @@ sub dbh_do {
   my $want_array = wantarray;
 
   eval {
-    $self->_verify_pid if $dbh;
-    if(!$self->_dbh) {
-        $self->_populate_dbh;
-        $dbh = $self->_dbh;
-    }
 
     if($want_array) {
         @result = $self->$code($dbh, @_);
@@ -551,6 +625,7 @@ sub dbh_do {
     }
   };
 
+  # ->connected might unset $@ - copy
   my $exception = $@;
   if(!$exception) { return $want_array ? @result : $result[0] }
 
@@ -558,6 +633,8 @@ sub dbh_do {
 
   # We were not connected - reconnect and retry, but let any
   #  exception fall right through this time
+  carp "Retrying $code after catching disconnected exception: $exception"
+    if $ENV{DBIC_DBIRETRY_DEBUG};
   $self->_populate_dbh;
   $self->$code($self->_dbh, @_);
 }
@@ -582,8 +659,7 @@ sub txn_do {
   my $tried = 0;
   while(1) {
     eval {
-      $self->_verify_pid if $self->_dbh;
-      $self->_populate_dbh if !$self->_dbh;
+      $self->_get_dbh;
 
       $self->txn_begin;
       if($want_array) {
@@ -598,6 +674,7 @@ sub txn_do {
       $self->txn_commit;
     };
 
+    # ->connected might unset $@ - copy
     my $exception = $@;
     if(!$exception) { return $want_array ? @result : $result[0] }
 
@@ -619,6 +696,8 @@ sub txn_do {
 
     # We were not connected, and was first try - reconnect and retry
     # via the while loop
+    carp "Retrying $coderef after catching disconnected exception: $exception"
+      if $ENV{DBIC_DBIRETRY_DEBUG};
     $self->_populate_dbh;
   }
 }
@@ -667,7 +746,6 @@ in MySQL's case disabled entirely.
 # Storage subclasses should override this
 sub with_deferred_fk_checks {
   my ($self, $sub) = @_;
-
   $sub->();
 }
 
@@ -688,22 +766,32 @@ answering, etc.) This method is used internally by L</dbh>.
 =cut
 
 sub connected {
-  my ($self) = @_;
+  my $self = shift;
+  return 0 unless $self->_seems_connected;
 
-  if(my $dbh = $self->_dbh) {
-      if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
-          $self->_dbh(undef);
-          $self->{_dbh_gen}++;
-          return;
-      }
-      else {
-          $self->_verify_pid;
-          return 0 if !$self->_dbh;
-      }
-      return ($dbh->FETCH('Active') && $self->_ping);
+  #be on the safe side
+  local $self->_dbh->{RaiseError} = 1;
+
+  return $self->_ping;
+}
+
+sub _seems_connected {
+  my $self = shift;
+
+  my $dbh = $self->_dbh
+    or return 0;
+
+  if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
+    $self->_dbh(undef);
+    $self->{_dbh_gen}++;
+    return 0;
+  }
+  else {
+    $self->_verify_pid;
+    return 0 if !$self->_dbh;
   }
 
-  return 0;
+  return $dbh->FETCH('Active');
 }
 
 sub _ping {
@@ -740,7 +828,9 @@ sub ensure_connected {
 
 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
 is guaranteed to be healthy by implicitly calling L</connected>, and if
-necessary performing a reconnection before returning.
+necessary performing a reconnection before returning. Keep in mind that this
+is very B<expensive> on some database engines. Consider using L<dbh_do>
+instead.
 
 =cut
 
@@ -755,18 +845,10 @@ sub dbh {
   return $self->_dbh;
 }
 
-=head2 last_dbh
-
-This returns the B<last> available C<$dbh> if any, or attempts to
-connect and returns the resulting handle. This method differs from
-L</dbh> by not validating if a preexisting handle is still healthy
-via L</connected>. Make sure you take appropriate precautions
-when using this method, as the C<$dbh> may be useless at this point.
-
-=cut
-
-sub last_dbh {
+# this is the internal "get dbh or connect (don't check)" method
+sub _get_dbh {
   my $self = shift;
+  $self->_verify_pid if $self->_dbh;
   $self->_populate_dbh unless $self->_dbh;
   return $self->_dbh;
 }
@@ -777,7 +859,7 @@ sub _sql_maker_args {
     return (
       bindtype=>'columns',
       array_datatypes => 1,
-      limit_dialect => $self->last_dbh,
+      limit_dialect => $self->_get_dbh,
       %{$self->_sql_maker_opts}
     );
 }
@@ -792,12 +874,15 @@ sub sql_maker {
   return $self->_sql_maker;
 }
 
+# nothing to do by default
 sub _rebless {}
+sub _init {}
 
 sub _populate_dbh {
   my ($self) = @_;
 
   my @info = @{$self->_dbi_connect_info || []};
+  $self->_dbh(undef); # in case ->connected failed we might get sent here
   $self->_dbh($self->_connect(@info));
 
   $self->_conn_pid($$);
@@ -826,18 +911,26 @@ sub _determine_driver {
   my ($self) = @_;
 
   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
-    my $started_unconnected = 0;
+    my $started_connected = 0;
     local $self->{_in_determine_driver} = 1;
 
     if (ref($self) eq __PACKAGE__) {
       my $driver;
       if ($self->_dbh) { # we are connected
         $driver = $self->_dbh->{Driver}{Name};
+        $started_connected = 1;
       } else {
-        # try to use dsn to not require being connected, the driver may still
-        # force a connection in _rebless to determine version
-        ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
-        $started_unconnected = 1;
+        # if connect_info is a CODEREF, we have no choice but to connect
+        if (ref $self->_dbi_connect_info->[0] &&
+            Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
+          $self->_populate_dbh;
+          $driver = $self->_dbh->{Driver}{Name};
+        }
+        else {
+          # try to use dsn to not require being connected, the driver may still
+          # force a connection in _rebless to determine version
+          ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
+        }
       }
 
       my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
@@ -850,8 +943,10 @@ sub _determine_driver {
 
     $self->_driver_determined(1);
 
+    $self->_init; # run driver-specific initializations
+
     $self->_run_connection_actions
-        if $started_unconnected && defined $self->_dbh;
+        if !$started_connected && defined $self->_dbh;
   }
 }
 
@@ -909,7 +1004,7 @@ sub _do_query {
     my @bind = map { [ undef, $_ ] } @do_args;
 
     $self->_query_start($sql, @bind);
-    $self->_dbh->do($sql, $attrs, @do_args);
+    $self->_get_dbh->do($sql, $attrs, @do_args);
     $self->_query_end($sql, @bind);
   }
 
@@ -945,6 +1040,8 @@ sub _connect {
             $weak_self->throw_exception("DBI Exception: $_[0]");
           }
           else {
+            # the handler may be invoked by something totally out of
+            # the scope of DBIC
             croak ("DBI Exception: $_[0]");
           }
       };
@@ -1064,18 +1161,22 @@ sub txn_begin {
 
 sub _dbh_begin_work {
   my $self = shift;
-  # being here implies we have AutoCommit => 1
-  # if the user is utilizing txn_do - good for
-  # him, otherwise we need to ensure that the
-  # $dbh is healthy on BEGIN
-  my $dbh_method = $self->{_in_dbh_do} ? '_dbh' : 'dbh';
-  $self->$dbh_method->begin_work;
+
+  # if the user is utilizing txn_do - good for him, otherwise we need to
+  # ensure that the $dbh is healthy on BEGIN.
+  # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
+  # will be replaced by a failure of begin_work itself (which will be
+  # then retried on reconnect)
+  if ($self->{_in_dbh_do}) {
+    $self->_dbh->begin_work;
+  } else {
+    $self->dbh_do(sub { $_[1]->begin_work });
+  }
 }
 
 sub txn_commit {
   my $self = shift;
   if ($self->{transaction_depth} == 1) {
-    my $dbh = $self->_dbh;
     $self->debugobj->txn_commit()
       if ($self->debug);
     $self->_dbh_commit;
@@ -1091,7 +1192,9 @@ sub txn_commit {
 
 sub _dbh_commit {
   my $self = shift;
-  $self->_dbh->commit;
+  my $dbh  = $self->_dbh
+    or $self->throw_exception('cannot COMMIT on a disconnected handle');
+  $dbh->commit;
 }
 
 sub txn_rollback {
@@ -1128,7 +1231,9 @@ sub txn_rollback {
 
 sub _dbh_rollback {
   my $self = shift;
-  $self->_dbh->rollback;
+  my $dbh  = $self->_dbh
+    or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
+  $dbh->rollback;
 }
 
 # This used to be the top-half of _execute.  It was split out to make it
@@ -1225,18 +1330,12 @@ sub _dbh_execute {
 
 sub _execute {
     my $self = shift;
-    $self->dbh_do('_dbh_execute', @_)
+    $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
 }
 
 sub insert {
   my ($self, $source, $to_insert) = @_;
 
-# redispatch to insert method of storage we reblessed into, if necessary
-  if (not $self->_driver_determined) {
-    $self->_determine_driver;
-    goto $self->can('insert');
-  }
-
   my $ident = $source->from;
   my $bind_attributes = $self->source_bind_attributes($source);
 
@@ -1250,7 +1349,7 @@ sub insert {
         $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
           'nextval',
           $col_info->{sequence} ||
-            $self->_dbh_get_autoinc_seq($self->last_dbh, $source)
+            $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
         );
       }
     }
@@ -1267,17 +1366,103 @@ sub insert {
 ## only prepped once.
 sub insert_bulk {
   my ($self, $source, $cols, $data) = @_;
+
   my %colvalues;
-  my $table = $source->from;
   @colvalues{@$cols} = (0..$#$cols);
-  my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
 
-  $self->_determine_driver;
+  for my $i (0..$#$cols) {
+    my $first_val = $data->[0][$i];
+    next unless ref $first_val eq 'SCALAR';
+
+    $colvalues{ $cols->[$i] } = $first_val;
+  }
+
+  # check for bad data and stringify stringifiable objects
+  my $bad_slice = sub {
+    my ($msg, $col_idx, $slice_idx) = @_;
+    $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
+      $msg,
+      $cols->[$col_idx],
+      do {
+        local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
+        Data::Dumper::Concise::Dumper({
+          map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
+        }),
+      }
+    );
+  };
+
+  for my $datum_idx (0..$#$data) {
+    my $datum = $data->[$datum_idx];
+
+    for my $col_idx (0..$#$cols) {
+      my $val            = $datum->[$col_idx];
+      my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
+      my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
+
+      if ($is_literal_sql) {
+        if (not ref $val) {
+          $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
+        }
+        elsif ((my $reftype = ref $val) ne 'SCALAR') {
+          $bad_slice->("$reftype reference found where literal SQL expected",
+            $col_idx, $datum_idx);
+        }
+        elsif ($$val ne $$sqla_bind){
+          $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
+            $col_idx, $datum_idx);
+        }
+      }
+      elsif (my $reftype = ref $val) {
+        require overload;
+        if (overload::Method($val, '""')) {
+          $datum->[$col_idx] = "".$val;
+        }
+        else {
+          $bad_slice->("$reftype reference found where bind expected",
+            $col_idx, $datum_idx);
+        }
+      }
+    }
+  }
+
+  my ($sql, $bind) = $self->_prep_for_execute (
+    'insert', undef, $source, [\%colvalues]
+  );
+  my @bind = @$bind;
+
+  my $empty_bind = 1 if (not @bind) &&
+    (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
 
-  $self->_query_start( $sql, @bind );
+  if ((not @bind) && (not $empty_bind)) {
+    $self->throw_exception(
+      'Cannot insert_bulk without support for placeholders'
+    );
+  }
+
+  $self->_query_start( $sql, ['__BULK__'] );
   my $sth = $self->sth($sql);
 
-#  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
+  my $rv = do {
+    if ($empty_bind) {
+      # bind_param_array doesn't work if there are no binds
+      $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
+    }
+    else {
+#      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
+      $self->_execute_array( $source, $sth, \@bind, $cols, $data );
+    }
+  };
+
+  $self->_query_end( $sql, ['__BULK__'] );
+
+  return (wantarray ? ($rv, $sth, @bind) : $rv);
+}
+
+sub _execute_array {
+  my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
+
+  my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
 
   ## This must be an arrayref, else nothing works!
   my $tuple_status = [];
@@ -1288,7 +1473,7 @@ sub insert_bulk {
   ## Bind the values and execute
   my $placeholder_index = 1;
 
-  foreach my $bound (@bind) {
+  foreach my $bound (@$bind) {
 
     my $attributes = {};
     my ($column_name, $data_index) = @$bound;
@@ -1303,62 +1488,89 @@ sub insert_bulk {
     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
     $placeholder_index++;
   }
-  my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
-  if (my $err = $@) {
+
+  my $rv = eval {
+    $self->_dbh_execute_array($sth, $tuple_status, @extra);
+  };
+  my $err = $@ || $sth->errstr;
+
+# Statement must finish even if there was an exception.
+  eval { $sth->finish };
+  $err = $@ unless $err;
+
+  if ($err) {
     my $i = 0;
     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
 
-    $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
+    $self->throw_exception("Unexpected populate error: $err")
       if ($i > $#$tuple_status);
 
-    require Data::Dumper;
-    local $Data::Dumper::Terse = 1;
-    local $Data::Dumper::Indent = 1;
-    local $Data::Dumper::Useqq = 1;
-    local $Data::Dumper::Quotekeys = 0;
-
     $self->throw_exception(sprintf "%s for populate slice:\n%s",
-      $tuple_status->[$i][1],
-      Data::Dumper::Dumper(
-        { map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols) }
-      ),
+      ($tuple_status->[$i][1] || $err),
+      Data::Dumper::Concise::Dumper({
+        map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
+      }),
     );
   }
-  $self->throw_exception($sth->errstr) if !$rv;
 
-  $self->_query_end( $sql, @bind );
-  return (wantarray ? ($rv, $sth, @bind) : $rv);
+  $guard->commit if $guard;
+
+  return $rv;
+}
+
+sub _dbh_execute_array {
+    my ($self, $sth, $tuple_status, @extra) = @_;
+
+    return $sth->execute_array({ArrayTupleStatus => $tuple_status});
+}
+
+sub _dbh_execute_inserts_with_no_binds {
+  my ($self, $sth, $count) = @_;
+
+  my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
+
+  eval {
+    my $dbh = $self->_get_dbh;
+    local $dbh->{RaiseError} = 1;
+    local $dbh->{PrintError} = 0;
+
+    $sth->execute foreach 1..$count;
+  };
+  my $exception = $@;
+
+# Make sure statement is finished even if there was an exception.
+  eval { $sth->finish };
+  $exception = $@ unless $exception;
+
+  $self->throw_exception($exception) if $exception;
+
+  $guard->commit if $guard;
+
+  return $count;
 }
 
 sub update {
   my ($self, $source, @args) = @_; 
 
-# redispatch to update method of storage we reblessed into, if necessary
-  if (not $self->_driver_determined) {
-    $self->_determine_driver;
-    goto $self->can('update');
-  }
-
-  my $bind_attributes = $self->source_bind_attributes($source);
+  my $bind_attrs = $self->source_bind_attributes($source);
 
-  return $self->_execute('update' => [], $source, $bind_attributes, @args);
+  return $self->_execute('update' => [], $source, $bind_attrs, @args);
 }
 
 
 sub delete {
-  my $self = shift @_;
-  my $source = shift @_;
-  $self->_determine_driver;
+  my ($self, $source, @args) = @_;
+
   my $bind_attrs = $self->source_bind_attributes($source);
 
-  return $self->_execute('delete' => [], $source, $bind_attrs, @_);
+  return $self->_execute('delete' => [], $source, $bind_attrs, @args);
 }
 
 # We were sent here because the $rs contains a complex search
 # which will require a subquery to select the correct rows
-# (i.e. joined or limited resultsets)
+# (i.e. joined or limited resultsets, or non-introspectable conditions)
 #
-# Genarating a single PK column subquery is trivial and supported
+# Generating a single PK column subquery is trivial and supported
 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
 # Look at _multipk_update_delete()
 sub _subq_update_delete {
@@ -1367,14 +1579,27 @@ sub _subq_update_delete {
 
   my $rsrc = $rs->result_source;
 
-  # we already check this, but double check naively just in case. Should be removed soon
+  # quick check if we got a sane rs on our hands
+  my @pcols = $rsrc->primary_columns;
+  unless (@pcols) {
+    $self->throw_exception (
+      sprintf (
+        "You must declare primary key(s) on source '%s' (via set_primary_key) in order to update or delete complex resultsets",
+        $rsrc->source_name || $rsrc->from
+      )
+    );
+  }
+
   my $sel = $rs->_resolved_attrs->{select};
   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
-  my @pcols = $rsrc->primary_columns;
-  if (@$sel != @pcols) {
+
+  if (
+      join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
+        ne
+      join ("\x00", sort @$sel )
+  ) {
     $self->throw_exception (
-      'Subquery update/delete can not be called on resultsets selecting a'
-     .' number of columns different than the number of primary keys'
+      '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
     );
   }
 
@@ -1548,12 +1773,52 @@ sub _select_args {
   if (
     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
        ||
-    ( $attrs->{group_by} && @{$attrs->{group_by}} &&
-      $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
+    ( $attrs->{group_by}
+        &&
+      @{$attrs->{group_by}}
+        &&
+      $attrs->{_prefetch_select}
+        &&
+      @{$attrs->{_prefetch_select}}
+    )
   ) {
+
     ($ident, $select, $where, $attrs)
       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
   }
+
+  elsif (
+    ($attrs->{rows} || $attrs->{offset})
+      &&
+    $sql_maker->limit_dialect eq 'RowNumberOver'
+      &&
+    (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
+      &&
+    scalar $sql_maker->_order_by_chunks ($attrs->{order_by})
+  ) {
+    # the RNO limit dialect above mangles the SQL such that the join gets lost
+    # wrap a subquery here
+
+    push @limit, delete @{$attrs}{qw/rows offset/};
+
+    my $subq = $self->_select_args_to_query (
+      $ident,
+      $select,
+      $where,
+      $attrs,
+    );
+
+    $ident = {
+      -alias => $attrs->{alias},
+      -source_handle => $ident->[0]{-source_handle},
+      $attrs->{alias} => $subq,
+    };
+
+    # all part of the subquery now
+    delete @{$attrs}{qw/order_by group_by having/};
+    $where = undef;
+  }
+
   elsif (! $attrs->{software_limit} ) {
     push @limit, $attrs->{rows}, $attrs->{offset};
   }
@@ -1576,279 +1841,6 @@ sub _select_args {
   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
 }
 
-#
-# This is the code producing joined subqueries like:
-# SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
-#
-sub _adjust_select_args_for_complex_prefetch {
-  my ($self, $from, $select, $where, $attrs) = @_;
-
-  $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
-    if (ref $from ne 'ARRAY');
-
-  # copies for mangling
-  $from = [ @$from ];
-  $select = [ @$select ];
-  $attrs = { %$attrs };
-
-  # separate attributes
-  my $sub_attrs = { %$attrs };
-  delete $attrs->{$_} for qw/where bind rows offset group_by having/;
-  delete $sub_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
-
-  my $select_root_alias = $attrs->{alias};
-  my $sql_maker = $self->sql_maker;
-
-  # create subquery select list - consider only stuff *not* brought in by the prefetch
-  my $sub_select = [];
-  my $sub_group_by;
-  for my $i (0 .. @{$attrs->{select}} - @{$attrs->{_prefetch_select}} - 1) {
-    my $sel = $attrs->{select}[$i];
-
-    # alias any functions to the dbic-side 'as' label
-    # adjust the outer select accordingly
-    if (ref $sel eq 'HASH' && !$sel->{-select}) {
-      $sel = { -select => $sel, -as => $attrs->{as}[$i] };
-      $select->[$i] = join ('.', $attrs->{alias}, ($attrs->{as}[$i] || "select_$i") );
-    }
-
-    push @$sub_select, $sel;
-  }
-
-  # bring over all non-collapse-induced order_by into the inner query (if any)
-  # the outer one will have to keep them all
-  delete $sub_attrs->{order_by};
-  if (my $ord_cnt = @{$attrs->{order_by}} - @{$attrs->{_collapse_order_by}} ) {
-    $sub_attrs->{order_by} = [
-      @{$attrs->{order_by}}[ 0 .. $ord_cnt - 1]
-    ];
-  }
-
-  # mangle {from}, keep in mind that $from is "headless" from here on
-  my $join_root = shift @$from;
-
-  my %inner_joins;
-  my %join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
-
-  # in complex search_related chains $select_root_alias may *not* be
-  # 'me' so always include it in the inner join
-  $inner_joins{$select_root_alias} = 1 if ($join_root->{-alias} ne $select_root_alias);
-
-
-  # decide which parts of the join will remain on the inside
-  #
-  # this is not a very viable optimisation, but it was written
-  # before I realised this, so might as well remain. We can throw
-  # away _any_ branches of the join tree that are:
-  # 1) not mentioned in the condition/order
-  # 2) left-join leaves (or left-join leaf chains)
-  # Most of the join conditions will not satisfy this, but for real
-  # complex queries some might, and we might make some RDBMS happy.
-  #
-  #
-  # since we do not have introspectable SQLA, we fall back to ugly
-  # scanning of raw SQL for WHERE, and for pieces of ORDER BY
-  # in order to determine what goes into %inner_joins
-  # It may not be very efficient, but it's a reasonable stop-gap
-  {
-    # produce stuff unquoted, so it can be scanned
-    local $sql_maker->{quote_char};
-    my $sep = $self->_sql_maker_opts->{name_sep} || '.';
-    $sep = "\Q$sep\E";
-
-    my @order_by = (map
-      { ref $_ ? $_->[0] : $_ }
-      $sql_maker->_order_by_chunks ($sub_attrs->{order_by})
-    );
-
-    my $where_sql = $sql_maker->where ($where);
-    my $select_sql = $sql_maker->_recurse_fields ($sub_select);
-
-    # sort needed joins
-    for my $alias (keys %join_info) {
-
-      # any table alias found on a column name in where or order_by
-      # gets included in %inner_joins
-      # Also any parent joins that are needed to reach this particular alias
-      for my $piece ($select_sql, $where_sql, @order_by ) {
-        if ($piece =~ /\b $alias $sep/x) {
-          $inner_joins{$alias} = 1;
-        }
-      }
-    }
-  }
-
-  # scan for non-leaf/non-left joins and mark as needed
-  # also mark all ancestor joins that are needed to reach this particular alias
-  # (e.g.  join => { cds => 'tracks' } - tracks will bring cds too )
-  #
-  # traverse by the size of the -join_path i.e. reverse depth first
-  for my $alias (sort { @{$join_info{$b}{-join_path}} <=> @{$join_info{$a}{-join_path}} } (keys %join_info) ) {
-
-    my $j = $join_info{$alias};
-    $inner_joins{$alias} = 1 if (! $j->{-join_type} || ($j->{-join_type} !~ /^left$/i) );
-
-    if ($inner_joins{$alias}) {
-      $inner_joins{$_} = 1 for (@{$j->{-join_path}});
-    }
-  }
-
-  # construct the inner $from for the subquery
-  my $inner_from = [ $join_root ];
-  for my $j (@$from) {
-    push @$inner_from, $j if $inner_joins{$j->[0]{-alias}};
-  }
-
-  # if a multi-type join was needed in the subquery ("multi" is indicated by
-  # presence in {collapse}) - add a group_by to simulate the collapse in the subq
-  unless ($sub_attrs->{group_by}) {
-    for my $alias (keys %inner_joins) {
-
-      # the dot comes from some weirdness in collapse
-      # remove after the rewrite
-      if ($attrs->{collapse}{".$alias"}) {
-        $sub_attrs->{group_by} ||= $sub_select;
-        last;
-      }
-    }
-  }
-
-  # generate the subquery
-  my $subq = $self->_select_args_to_query (
-    $inner_from,
-    $sub_select,
-    $where,
-    $sub_attrs
-  );
-  my $subq_joinspec = {
-    -alias => $select_root_alias,
-    -source_handle => $join_root->{-source_handle},
-    $select_root_alias => $subq,
-  };
-
-  # Generate a new from (really just replace the join slot with the subquery)
-  # Before we would start the outer chain from the subquery itself (i.e.
-  # SELECT ... FROM (SELECT ... ) alias JOIN ..., but this turned out to be
-  # a bad idea for search_related, as the root of the chain was effectively
-  # lost (i.e. $artist_rs->search_related ('cds'... ) would result in alias
-  # of 'cds', which would prevent from doing things like order_by artist.*)
-  # See t/prefetch/via_search_related.t for a better idea
-  my @outer_from;
-  if ($join_root->{-alias} eq $select_root_alias) { # just swap the root part and we're done
-    @outer_from = (
-      $subq_joinspec,
-      @$from,
-    )
-  }
-  else {  # this is trickier
-    @outer_from = ($join_root);
-
-    for my $j (@$from) {
-      if ($j->[0]{-alias} eq $select_root_alias) {
-        push @outer_from, [
-          $subq_joinspec,
-          @{$j}[1 .. $#$j],
-        ];
-      }
-      else {
-        push @outer_from, $j;
-      }
-    }
-  }
-
-  # This is totally horrific - the $where ends up in both the inner and outer query
-  # Unfortunately not much can be done until SQLA2 introspection arrives, and even
-  # then if where conditions apply to the *right* side of the prefetch, you may have
-  # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
-  # the outer select to exclude joins you didin't want in the first place
-  #
-  # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
-  return (\@outer_from, $select, $where, $attrs);
-}
-
-sub _resolve_ident_sources {
-  my ($self, $ident) = @_;
-
-  my $alias2source = {};
-  my $rs_alias;
-
-  # the reason this is so contrived is that $ident may be a {from}
-  # structure, specifying multiple tables to join
-  if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
-    # this is compat mode for insert/update/delete which do not deal with aliases
-    $alias2source->{me} = $ident;
-    $rs_alias = 'me';
-  }
-  elsif (ref $ident eq 'ARRAY') {
-
-    for (@$ident) {
-      my $tabinfo;
-      if (ref $_ eq 'HASH') {
-        $tabinfo = $_;
-        $rs_alias = $tabinfo->{-alias};
-      }
-      if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
-        $tabinfo = $_->[0];
-      }
-
-      $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
-        if ($tabinfo->{-source_handle});
-    }
-  }
-
-  return ($alias2source, $rs_alias);
-}
-
-# Takes $ident, \@column_names
-#
-# returns { $column_name => \%column_info, ... }
-# also note: this adds -result_source => $rsrc to the column info
-#
-# usage:
-#   my $col_sources = $self->_resolve_column_info($ident, @column_names);
-sub _resolve_column_info {
-  my ($self, $ident, $colnames) = @_;
-  my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
-
-  my $sep = $self->_sql_maker_opts->{name_sep} || '.';
-  $sep = "\Q$sep\E";
-
-  my (%return, %seen_cols);
-
-  # compile a global list of column names, to be able to properly
-  # disambiguate unqualified column names (if at all possible)
-  for my $alias (keys %$alias2src) {
-    my $rsrc = $alias2src->{$alias};
-    for my $colname ($rsrc->columns) {
-      push @{$seen_cols{$colname}}, $alias;
-    }
-  }
-
-  COLUMN:
-  foreach my $col (@$colnames) {
-    my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
-
-    unless ($alias) {
-      # see if the column was seen exactly once (so we know which rsrc it came from)
-      if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
-        $alias = $seen_cols{$colname}[0];
-      }
-      else {
-        next COLUMN;
-      }
-    }
-
-    my $rsrc = $alias2src->{$alias};
-    $return{$col} = $rsrc && {
-      %{$rsrc->column_info($colname)},
-      -result_source => $rsrc,
-      -source_alias => $alias,
-    };
-  }
-
-  return \%return;
-}
-
 # Returns a counting SELECT for a simple count
 # query. Abstracted so that a storage could override
 # this to { count => 'firstcol' } or whatever makes
@@ -1873,22 +1865,6 @@ sub _subq_count_select {
   return @pcols ? \@pcols : [ 1 ];
 }
 
-#
-# Returns an ordered list of column names before they are used
-# in a SELECT statement. By default simply returns the list
-# passed in.
-#
-# This may be overridden in a specific storage when there are
-# requirements such as moving BLOB columns to the end of the 
-# SELECT list.
-sub _order_select_columns {
-  #my ($self, $source, $columns) = @_;
-  return @{$_[2]};
-}
-
-
-
-
 sub source_bind_attributes {
   my ($self, $source) = @_;
 
@@ -1963,7 +1939,7 @@ sub _dbh_sth {
 
 sub sth {
   my ($self, $sql) = @_;
-  $self->dbh_do('_dbh_sth', $sql);
+  $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
 }
 
 sub _dbh_columns_info_for {
@@ -2025,7 +2001,7 @@ sub _dbh_columns_info_for {
 
 sub columns_info_for {
   my ($self, $table) = @_;
-  $self->dbh_do('_dbh_columns_info_for', $table);
+  $self->_dbh_columns_info_for ($self->_get_dbh, $table);
 }
 
 =head2 last_insert_id
@@ -2051,7 +2027,68 @@ EOE
 
 sub last_insert_id {
   my $self = shift;
-  $self->dbh_do('_dbh_last_insert_id', @_);
+  $self->_dbh_last_insert_id ($self->_dbh, @_);
+}
+
+=head2 _native_data_type
+
+=over 4
+
+=item Arguments: $type_name
+
+=back
+
+This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
+currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
+L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
+
+The default implementation returns C<undef>, implement in your Storage driver if
+you need this functionality.
+
+Should map types from other databases to the native RDBMS type, for example
+C<VARCHAR2> to C<VARCHAR>.
+
+Types with modifiers should map to the underlying data type. For example,
+C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
+
+Composite types should map to the container type, for example
+C<ENUM(foo,bar,baz)> becomes C<ENUM>.
+
+=cut
+
+sub _native_data_type {
+  #my ($self, $data_type) = @_;
+  return undef
+}
+
+# Check if placeholders are supported at all
+sub _placeholders_supported {
+  my $self = shift;
+  my $dbh  = $self->_get_dbh;
+
+  # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
+  # but it is inaccurate more often than not
+  eval {
+    local $dbh->{PrintError} = 0;
+    local $dbh->{RaiseError} = 1;
+    $dbh->do('select ?', {}, 1);
+  };
+  return $@ ? 0 : 1;
+}
+
+# Check if placeholders bound to non-string types throw exceptions
+#
+sub _typeless_placeholders_supported {
+  my $self = shift;
+  my $dbh  = $self->_get_dbh;
+
+  eval {
+    local $dbh->{PrintError} = 0;
+    local $dbh->{RaiseError} = 1;
+    # this specifically tests a bind that is NOT a string
+    $dbh->do('select 1 where 1 = ?', {}, 1);
+  };
+  return $@ ? 0 : 1;
 }
 
 =head2 sqlt_type
@@ -2060,7 +2097,9 @@ Returns the database driver name.
 
 =cut
 
-sub sqlt_type { shift->last_dbh->{Driver}->{Name} }
+sub sqlt_type {
+  shift->_get_dbh->{Driver}->{Name};
+}
 
 =head2 bind_attribute_by_data_type
 
@@ -2176,9 +2215,8 @@ sub create_ddl_dir {
     %{$sqltargs || {}}
   };
 
-  $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
-      . $self->_check_sqlt_message . q{'})
-          if !$self->_check_sqlt_version;
+  $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
+    if !$self->_sqlt_version_ok;
 
   my $sqlt = SQL::Translator->new( $sqltargs );
 
@@ -2306,8 +2344,6 @@ See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
 
 sub deployment_statements {
   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
-  # Need to be connected to get the correct sqlt_type
-  $self->last_dbh() unless $type;
   $type ||= $self->sqlt_type;
   $version ||= $schema->schema_version || '1.x';
   $dir ||= './';
@@ -2322,22 +2358,34 @@ sub deployment_statements {
       return join('', @rows);
   }
 
-  $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
-      . $self->_check_sqlt_message . q{'})
-          if !$self->_check_sqlt_version;
-
-  require SQL::Translator::Parser::DBIx::Class;
-  eval qq{use SQL::Translator::Producer::${type}};
-  $self->throw_exception($@) if $@;
+  $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
+    if !$self->_sqlt_version_ok;
 
   # sources needs to be a parser arg, but for simplicty allow at top level
   # coming in
   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
       if exists $sqltargs->{sources};
 
-  my $tr = SQL::Translator->new(%$sqltargs);
-  SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
-  return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
+  my $tr = SQL::Translator->new(
+    producer => "SQL::Translator::Producer::${type}",
+    %$sqltargs,
+    parser => 'SQL::Translator::Parser::DBIx::Class',
+    data => $schema,
+  );
+
+  my @ret;
+  my $wa = wantarray;
+  if ($wa) {
+    @ret = $tr->translate;
+  }
+  else {
+    $ret[0] = $tr->translate;
+  }
+
+  $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
+    unless (@ret && defined $ret[0]);
+
+  return $wa ? @ret : $ret[0];
 }
 
 sub deploy {
@@ -2352,10 +2400,9 @@ sub deploy {
     return if $line =~ /^\s+$/; # skip whitespace only
     $self->_query_start($line);
     eval {
-      # a previous error may invalidate $dbh - thus we need to use dbh()
-      # to guarantee a healthy $dbh (this is temporary until we get
-      # proper error handling on deploy() )
-      $self->dbh->do($line);
+      # do a dbh_do cycle here, as we need some error checking in
+      # place (even though we will ignore errors)
+      $self->dbh_do (sub { $_[1]->do($line) });
     };
     if ($@) {
       carp qq{$@ (running "${line}")};
@@ -2384,7 +2431,6 @@ Returns the datetime parser class
 sub datetime_parser {
   my $self = shift;
   return $self->{datetime_parser} ||= do {
-    $self->last_dbh;
     $self->build_datetime_parser(@_);
   };
 }
@@ -2407,26 +2453,10 @@ See L</datetime_parser>
 sub build_datetime_parser {
   my $self = shift;
   my $type = $self->datetime_parser_type(@_);
-  eval "use ${type}";
-  $self->throw_exception("Couldn't load ${type}: $@") if $@;
+  $self->ensure_class_loaded ($type);
   return $type;
 }
 
-{
-    my $_check_sqlt_version; # private
-    my $_check_sqlt_message; # private
-    sub _check_sqlt_version {
-        return $_check_sqlt_version if defined $_check_sqlt_version;
-        eval 'use SQL::Translator "0.09003"';
-        $_check_sqlt_message = $@ || '';
-        $_check_sqlt_version = !$@;
-    }
-
-    sub _check_sqlt_message {
-        _check_sqlt_version if !defined $_check_sqlt_message;
-        $_check_sqlt_message;
-    }
-}
 
 =head2 is_replicating
 
@@ -2453,10 +2483,44 @@ sub lag_behind_master {
     return;
 }
 
+# SQLT version handling
+{
+  my $_sqlt_version_ok;     # private
+  my $_sqlt_version_error;  # private
+
+  sub _sqlt_version_ok {
+    if (!defined $_sqlt_version_ok) {
+      eval "use SQL::Translator $minimum_sqlt_version";
+      if ($@) {
+        $_sqlt_version_ok = 0;
+        $_sqlt_version_error = $@;
+      }
+      else {
+        $_sqlt_version_ok = 1;
+      }
+    }
+    return $_sqlt_version_ok;
+  }
+
+  sub _sqlt_version_error {
+    shift->_sqlt_version_ok unless defined $_sqlt_version_ok;
+    return $_sqlt_version_error;
+  }
+
+  sub _sqlt_minimum_version { $minimum_sqlt_version };
+}
+
 sub DESTROY {
   my $self = shift;
-  return if !$self->_dbh;
-  $self->_verify_pid;
+
+  $self->_verify_pid if $self->_dbh;
+
+  # some databases need this to stop spewing warnings
+  if (my $dbh = $self->_dbh) {
+    local $@;
+    eval { $dbh->disconnect };
+  }
+
   $self->_dbh(undef);
 }