Backout sybase changes
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
index 1cbc7ef..89cf5a3 100644 (file)
@@ -1,10 +1,12 @@
 package DBIx::Class::Storage::DBI;
 # -*- mode: cperl; cperl-indent-level: 2 -*-
 
+use strict;
+use warnings;
+
 use base 'DBIx::Class::Storage';
+use mro 'c3';
 
-use strict;    
-use warnings;
 use Carp::Clan qw/^DBIx::Class/;
 use DBI;
 use DBIx::Class::Storage::DBI::Cursor;
@@ -12,9 +14,14 @@ use DBIx::Class::Storage::Statistics;
 use Scalar::Util();
 use List::Util();
 
+# what version of sqlt do we require if deploy() without a ddl_dir is invoked
+# when changing also adjust the corresponding author_require in Makefile.PL
+my $minimum_sqlt_version = '0.11002';
+
+
 __PACKAGE__->mk_group_accessors('simple' =>
-  qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
-     _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
+  qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
+     _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
 );
 
 # the values for these accessors are picked out (and deleted) from
@@ -42,7 +49,14 @@ DBIx::Class::Storage::DBI - DBI storage handler
   my $schema = MySchema->connect('dbi:SQLite:my.db');
 
   $schema->storage->debug(1);
-  $schema->dbh_do("DROP TABLE authors");
+
+  my @stuff = $schema->storage->dbh_do(
+    sub {
+      my ($storage, $dbh, @args) = @_;
+      $dbh->do("DROP TABLE authors");
+    },
+    @column_list
+  );
 
   $schema->resultset('Book')->search({
      written_on => $schema->storage->datetime_parser(DateTime->now)
@@ -90,8 +104,8 @@ recognized by DBIx::Class:
 
 =item *
 
-A single code reference which returns a connected 
-L<DBI database handle|DBI/connect> optionally followed by 
+A single code reference which returns a connected
+L<DBI database handle|DBI/connect> optionally followed by
 L<extra attributes|/DBIx::Class specific connection attributes> recognized
 by DBIx::Class:
 
@@ -110,7 +124,13 @@ mixed together:
     %extra_attributes,
   }];
 
-This is particularly useful for L<Catalyst> based applications, allowing the 
+  $connect_info_args = [{
+    dbh_maker => sub { DBI->connect (...) },
+    %dbi_attributes,
+    %extra_attributes,
+  }];
+
+This is particularly useful for L<Catalyst> based applications, allowing the
 following config (L<Config::General> style):
 
   <Model::DB>
@@ -123,13 +143,17 @@ following config (L<Config::General> style):
     </connect_info>
   </Model::DB>
 
+The C<dsn>/C<user>/C<password> combination can be substituted by the
+C<dbh_maker> key whose value is a coderef that returns a connected
+L<DBI database handle|DBI/connect>
+
 =back
 
 Please note that the L<DBI> docs recommend that you always explicitly
 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
 recommends that it be set to I<1>, and that you perform transactions
 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
-to I<1> if you do not do explicitly set it to zero.  This is the default 
+to I<1> if you do not do explicitly set it to zero.  This is the default
 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
 
 =head3 DBIx::Class specific connection attributes
@@ -268,7 +292,7 @@ storage object.
 If set to a true value, this option will disable the caching of
 statement handles via L<DBI/prepare_cached>.
 
-=item limit_dialect 
+=item limit_dialect
 
 Sets the limit dialect. This is useful for JDBC-bridge among others
 where the remote SQL-dialect cannot be determined by the name of the
@@ -276,7 +300,7 @@ driver alone. See also L<SQL::Abstract::Limit>.
 
 =item quote_char
 
-Specifies what characters to use to quote table and column names. If 
+Specifies what characters to use to quote table and column names. If
 you use this you will want to specify L</name_sep> as well.
 
 C<quote_char> expects either a single character, in which case is it
@@ -288,8 +312,8 @@ SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
 
 =item name_sep
 
-This only needs to be used in conjunction with C<quote_char>, and is used to 
-specify the charecter that seperates elements (schemas, tables, columns) from 
+This only needs to be used in conjunction with C<quote_char>, and is used to
+specify the charecter that seperates elements (schemas, tables, columns) from
 each other. In most cases this is simply a C<.>.
 
 The consequences of not supplying this value is that L<SQL::Abstract>
@@ -335,6 +359,12 @@ L<DBIx::Class::Schema/connect>
   # Connect via subref
   ->connect_info([ sub { DBI->connect(...) } ]);
 
+  # Connect via subref in hashref
+  ->connect_info([{
+    dbh_maker => sub { DBI->connect(...) },
+    on_connect_do => 'alter session ...',
+  }]);
+
   # A bit more complicated
   ->connect_info(
     [
@@ -405,8 +435,21 @@ sub connect_info {
   elsif (ref $args[0] eq 'HASH') { # single hashref (i.e. Catalyst config)
     %attrs = %{$args[0]};
     @args = ();
-    for (qw/password user dsn/) {
-      unshift @args, delete $attrs{$_};
+    if (my $code = delete $attrs{dbh_maker}) {
+      @args = $code;
+
+      my @ignored = grep { delete $attrs{$_} } (qw/dsn user password/);
+      if (@ignored) {
+        carp sprintf (
+            'Attribute(s) %s in connect_info were ignored, as they can not be applied '
+          . "to the result of 'dbh_maker'",
+
+          join (', ', map { "'$_'" } (@ignored) ),
+        );
+      }
+    }
+    else {
+      @args = delete @attrs{qw/dsn user password/};
     }
   }
   else {                # otherwise assume dsn/user/password + \%attrs + \%extra_attrs
@@ -435,12 +478,28 @@ sub connect_info {
     }
   }
 
-  %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
+  if (ref $args[0] eq 'CODE') {
+    # _connect() never looks past $args[0] in this case
+    %attrs = ()
+  } else {
+    %attrs = (
+      %{ $self->_default_dbi_connect_attributes || {} },
+      %attrs,
+    );
+  }
 
   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
   $self->_connect_info;
 }
 
+sub _default_dbi_connect_attributes {
+  return {
+    AutoCommit => 1,
+    RaiseError => 1,
+    PrintError => 0,
+  };
+}
+
 =head2 on_connect_do
 
 This method is deprecated in favour of setting via L</connect_info>.
@@ -509,7 +568,7 @@ sub dbh_do {
   my $self = shift;
   my $code = shift;
 
-  my $dbh = $self->_dbh;
+  my $dbh = $self->_get_dbh;
 
   return $self->$code($dbh, @_) if $self->{_in_dbh_do}
       || $self->{transaction_depth};
@@ -520,11 +579,6 @@ sub dbh_do {
   my $want_array = wantarray;
 
   eval {
-    $self->_verify_pid if $dbh;
-    if(!$self->_dbh) {
-        $self->_populate_dbh;
-        $dbh = $self->_dbh;
-    }
 
     if($want_array) {
         @result = $self->$code($dbh, @_);
@@ -537,6 +591,7 @@ sub dbh_do {
     }
   };
 
+  # ->connected might unset $@ - copy
   my $exception = $@;
   if(!$exception) { return $want_array ? @result : $result[0] }
 
@@ -544,6 +599,8 @@ sub dbh_do {
 
   # We were not connected - reconnect and retry, but let any
   #  exception fall right through this time
+  carp "Retrying $code after catching disconnected exception: $exception"
+    if $ENV{DBIC_DBIRETRY_DEBUG};
   $self->_populate_dbh;
   $self->$code($self->_dbh, @_);
 }
@@ -568,8 +625,7 @@ sub txn_do {
   my $tried = 0;
   while(1) {
     eval {
-      $self->_verify_pid if $self->_dbh;
-      $self->_populate_dbh if !$self->_dbh;
+      $self->_get_dbh;
 
       $self->txn_begin;
       if($want_array) {
@@ -584,10 +640,11 @@ sub txn_do {
       $self->txn_commit;
     };
 
+    # ->connected might unset $@ - copy
     my $exception = $@;
     if(!$exception) { return $want_array ? @result : $result[0] }
 
-    if($tried++ > 0 || $self->connected) {
+    if($tried++ || $self->connected) {
       eval { $self->txn_rollback };
       my $rollback_exception = $@;
       if($rollback_exception) {
@@ -605,6 +662,8 @@ sub txn_do {
 
     # We were not connected, and was first try - reconnect and retry
     # via the while loop
+    carp "Retrying $coderef after catching disconnected exception: $exception"
+      if $ENV{DBIC_DBIRETRY_DEBUG};
     $self->_populate_dbh;
   }
 }
@@ -619,7 +678,7 @@ database is not in C<AutoCommit> mode.
 sub disconnect {
   my ($self) = @_;
 
-  if( $self->connected ) {
+  if( $self->_dbh ) {
     my @actions;
 
     push @actions, ( $self->on_disconnect_call || () );
@@ -627,7 +686,8 @@ sub disconnect {
 
     $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
 
-    $self->_dbh->rollback unless $self->_dbh_autocommit;
+    $self->_dbh_rollback unless $self->_dbh_autocommit;
+
     $self->_dbh->disconnect;
     $self->_dbh(undef);
     $self->{_dbh_gen}++;
@@ -656,23 +716,57 @@ sub with_deferred_fk_checks {
   $sub->();
 }
 
+=head2 connected
+
+=over
+
+=item Arguments: none
+
+=item Return Value: 1|0
+
+=back
+
+Verifies that the the current database handle is active and ready to execute
+an SQL statement (i.e. the connection did not get stale, server is still
+answering, etc.) This method is used internally by L</dbh>.
+
+=cut
+
 sub connected {
-  my ($self) = @_;
+  my $self = shift;
+  return 0 unless $self->_seems_connected;
 
-  if(my $dbh = $self->_dbh) {
-      if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
-          $self->_dbh(undef);
-          $self->{_dbh_gen}++;
-          return;
-      }
-      else {
-          $self->_verify_pid;
-          return 0 if !$self->_dbh;
-      }
-      return ($dbh->FETCH('Active') && $dbh->ping);
+  #be on the safe side
+  local $self->_dbh->{RaiseError} = 1;
+
+  return $self->_ping;
+}
+
+sub _seems_connected {
+  my $self = shift;
+
+  my $dbh = $self->_dbh
+    or return 0;
+
+  if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
+    $self->_dbh(undef);
+    $self->{_dbh_gen}++;
+    return 0;
+  }
+  else {
+    $self->_verify_pid;
+    return 0 if !$self->_dbh;
   }
 
-  return 0;
+  return $dbh->FETCH('Active');
+}
+
+sub _ping {
+  my $self = shift;
+
+  my $dbh = $self->_dbh or return 0;
+
+  return $dbh->ping;
 }
 
 # handle pid changes correctly
@@ -699,21 +793,42 @@ sub ensure_connected {
 
 =head2 dbh
 
-Returns the dbh - a data base handle of class L<DBI>.
+Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
+is guaranteed to be healthy by implicitly calling L</connected>, and if
+necessary performing a reconnection before returning. Keep in mind that this
+is very B<expensive> on some database engines. Consider using L<dbh_do>
+instead.
 
 =cut
 
 sub dbh {
   my ($self) = @_;
 
-  $self->ensure_connected;
+  if (not $self->_dbh) {
+    $self->_populate_dbh;
+  } else {
+    $self->ensure_connected;
+  }
+  return $self->_dbh;
+}
+
+# this is the internal "get dbh or connect (don't check)" method
+sub _get_dbh {
+  my $self = shift;
+  $self->_verify_pid if $self->_dbh;
+  $self->_populate_dbh unless $self->_dbh;
   return $self->_dbh;
 }
 
 sub _sql_maker_args {
     my ($self) = @_;
 
-    return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
+    return (
+      bindtype=>'columns',
+      array_datatypes => 1,
+      limit_dialect => $self->_get_dbh,
+      %{$self->_sql_maker_opts}
+    );
 }
 
 sub sql_maker {
@@ -726,11 +841,15 @@ sub sql_maker {
   return $self->_sql_maker;
 }
 
+# nothing to do by default
 sub _rebless {}
+sub _init {}
 
 sub _populate_dbh {
   my ($self) = @_;
+
   my @info = @{$self->_dbi_connect_info || []};
+  $self->_dbh(undef); # in case ->connected failed we might get sent here
   $self->_dbh($self->_connect(@info));
 
   $self->_conn_pid($$);
@@ -742,6 +861,11 @@ sub _populate_dbh {
   #  there is no transaction in progress by definition
   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
 
+  $self->_run_connection_actions unless $self->{_in_determine_driver};
+}
+
+sub _run_connection_actions {
+  my $self = shift;
   my @actions;
 
   push @actions, ( $self->on_connect_call || () );
@@ -753,21 +877,43 @@ sub _populate_dbh {
 sub _determine_driver {
   my ($self) = @_;
 
-  if (ref $self eq 'DBIx::Class::Storage::DBI') {
-    my $driver;
+  if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
+    my $started_unconnected = 0;
+    local $self->{_in_determine_driver} = 1;
+
+    if (ref($self) eq __PACKAGE__) {
+      my $driver;
+      if ($self->_dbh) { # we are connected
+        $driver = $self->_dbh->{Driver}{Name};
+      } else {
+        # if connect_info is a CODEREF, we have no choice but to connect
+        if (ref $self->_dbi_connect_info->[0] &&
+            Scalar::Util::reftype($self->_dbi_connect_info->[0]) eq 'CODE') {
+          $self->_populate_dbh;
+          $driver = $self->_dbh->{Driver}{Name};
+        }
+        else {
+          # try to use dsn to not require being connected, the driver may still
+          # force a connection in _rebless to determine version
+          ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
+          $started_unconnected = 1;
+        }
+      }
 
-    if ($self->_dbh) { # we are connected
-      $driver = $self->_dbh->{Driver}{Name};
-    } else {
-      # try to use dsn to not require being connected, the driver may still
-      # force a connection in _rebless to determine version
-      ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
+      my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
+      if ($self->load_optional_class($storage_class)) {
+        mro::set_mro($storage_class, 'c3');
+        bless $self, $storage_class;
+        $self->_rebless();
+      }
     }
 
-    if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
-      bless $self, "DBIx::Class::Storage::DBI::${driver}";
-      $self->_rebless();
-    }
+    $self->_driver_determined(1);
+
+    $self->_init; # run driver-specific initializations
+
+    $self->_run_connection_actions
+        if $started_unconnected && defined $self->_dbh;
   }
 }
 
@@ -825,7 +971,7 @@ sub _do_query {
     my @bind = map { [ undef, $_ ] } @do_args;
 
     $self->_query_start($sql, @bind);
-    $self->_dbh->do($sql, $attrs, @do_args);
+    $self->_get_dbh->do($sql, $attrs, @do_args);
     $self->_query_end($sql, @bind);
   }
 
@@ -861,6 +1007,8 @@ sub _connect {
             $weak_self->throw_exception("DBI Exception: $_[0]");
           }
           else {
+            # the handler may be invoked by something totally out of
+            # the scope of DBIC
             croak ("DBI Exception: $_[0]");
           }
       };
@@ -891,11 +1039,11 @@ sub svp_begin {
 
   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
     unless $self->can('_svp_begin');
-  
+
   push @{ $self->{savepoints} }, $name;
 
   $self->debugobj->svp_begin($name) if $self->debug;
-  
+
   return $self->_svp_begin($name);
 }
 
@@ -955,7 +1103,7 @@ sub svp_rollback {
   }
 
   $self->debugobj->svp_rollback($name) if $self->debug;
-  
+
   return $self->_svp_rollback($name);
 }
 
@@ -967,27 +1115,39 @@ sub _svp_generate_name {
 
 sub txn_begin {
   my $self = shift;
-  $self->ensure_connected();
   if($self->{transaction_depth} == 0) {
     $self->debugobj->txn_begin()
       if $self->debug;
-    # this isn't ->_dbh-> because
-    #  we should reconnect on begin_work
-    #  for AutoCommit users
-    $self->dbh->begin_work;
-  } elsif ($self->auto_savepoint) {
+    $self->_dbh_begin_work;
+  }
+  elsif ($self->auto_savepoint) {
     $self->svp_begin;
   }
   $self->{transaction_depth}++;
 }
 
+sub _dbh_begin_work {
+  my $self = shift;
+
+  # if the user is utilizing txn_do - good for him, otherwise we need to
+  # ensure that the $dbh is healthy on BEGIN.
+  # We do this via ->dbh_do instead of ->dbh, so that the ->dbh "ping"
+  # will be replaced by a failure of begin_work itself (which will be
+  # then retried on reconnect)
+  if ($self->{_in_dbh_do}) {
+    $self->_dbh->begin_work;
+  } else {
+    $self->dbh_do(sub { $_[1]->begin_work });
+  }
+}
+
 sub txn_commit {
   my $self = shift;
   if ($self->{transaction_depth} == 1) {
     my $dbh = $self->_dbh;
     $self->debugobj->txn_commit()
       if ($self->debug);
-    $dbh->commit;
+    $self->_dbh_commit;
     $self->{transaction_depth} = 0
       if $self->_dbh_autocommit;
   }
@@ -998,6 +1158,11 @@ sub txn_commit {
   }
 }
 
+sub _dbh_commit {
+  my $self = shift;
+  $self->_dbh->commit;
+}
+
 sub txn_rollback {
   my $self = shift;
   my $dbh = $self->_dbh;
@@ -1007,7 +1172,7 @@ sub txn_rollback {
         if ($self->debug);
       $self->{transaction_depth} = 0
         if $self->_dbh_autocommit;
-      $dbh->rollback;
+      $self->_dbh_rollback;
     }
     elsif($self->{transaction_depth} > 1) {
       $self->{transaction_depth}--;
@@ -1030,6 +1195,11 @@ sub txn_rollback {
   }
 }
 
+sub _dbh_rollback {
+  my $self = shift;
+  $self->_dbh->rollback;
+}
+
 # This used to be the top-half of _execute.  It was split out to make it
 #  easier to override in NoBindVars without duping the rest.  It takes up
 #  all of _execute's args, and emits $sql, @bind.
@@ -1093,7 +1263,7 @@ sub _dbh_execute {
 
   my $sth = $self->sth($sql,$op);
 
-  my $placeholder_index = 1; 
+  my $placeholder_index = 1;
 
   foreach my $bound (@$bind) {
     my $attributes = {};
@@ -1124,24 +1294,33 @@ sub _dbh_execute {
 
 sub _execute {
     my $self = shift;
-    $self->dbh_do('_dbh_execute', @_)
+    $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
 }
 
 sub insert {
   my ($self, $source, $to_insert) = @_;
 
+# redispatch to insert method of storage we reblessed into, if necessary
+  if (not $self->_driver_determined) {
+    $self->_determine_driver;
+    goto $self->can('insert');
+  }
+
   my $ident = $source->from;
   my $bind_attributes = $self->source_bind_attributes($source);
 
   my $updated_cols = {};
 
-  $self->ensure_connected;
   foreach my $col ( $source->columns ) {
     if ( !defined $to_insert->{$col} ) {
       my $col_info = $source->column_info($col);
 
       if ( $col_info->{auto_nextval} ) {
-        $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
+        $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
+          'nextval',
+          $col_info->{sequence} ||
+            $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
+        );
       }
     }
   }
@@ -1152,16 +1331,23 @@ sub insert {
 }
 
 ## Still not quite perfect, and EXPERIMENTAL
-## Currently it is assumed that all values passed will be "normal", i.e. not 
+## Currently it is assumed that all values passed will be "normal", i.e. not
 ## scalar refs, or at least, all the same type as the first set, the statement is
 ## only prepped once.
 sub insert_bulk {
   my ($self, $source, $cols, $data) = @_;
+
+# redispatch to insert_bulk method of storage we reblessed into, if necessary
+  if (not $self->_driver_determined) {
+    $self->_determine_driver;
+    goto $self->can('insert_bulk');
+  }
+
   my %colvalues;
   my $table = $source->from;
   @colvalues{@$cols} = (0..$#$cols);
   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
-  
+
   $self->_query_start( $sql, @bind );
   my $sth = $self->sth($sql);
 
@@ -1174,7 +1360,7 @@ sub insert_bulk {
   my $bind_attributes = $self->source_bind_attributes($source);
 
   ## Bind the values and execute
-  my $placeholder_index = 1; 
+  my $placeholder_index = 1;
 
   foreach my $bound (@bind) {
 
@@ -1204,6 +1390,7 @@ sub insert_bulk {
     local $Data::Dumper::Indent = 1;
     local $Data::Dumper::Useqq = 1;
     local $Data::Dumper::Quotekeys = 0;
+    local $Data::Dumper::Sortkeys = 1;
 
     $self->throw_exception(sprintf "%s for populate slice:\n%s",
       $tuple_status->[$i][1],
@@ -1219,20 +1406,26 @@ sub insert_bulk {
 }
 
 sub update {
-  my $self = shift @_;
-  my $source = shift @_;
+  my ($self, $source, @args) = @_; 
+
+# redispatch to update method of storage we reblessed into, if necessary
+  if (not $self->_driver_determined) {
+    $self->_determine_driver;
+    goto $self->can('update');
+  }
+
   my $bind_attributes = $self->source_bind_attributes($source);
-  
-  return $self->_execute('update' => [], $source, $bind_attributes, @_);
+
+  return $self->_execute('update' => [], $source, $bind_attributes, @args);
 }
 
 
 sub delete {
   my $self = shift @_;
   my $source = shift @_;
-  
+  $self->_determine_driver;
   my $bind_attrs = $self->source_bind_attributes($source);
-  
+
   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
 }
 
@@ -1331,10 +1524,10 @@ sub _select {
   my $self = shift;
 
   # localization is neccessary as
-  # 1) there is no infrastructure to pass this around (easy to do, but will wait)
+  # 1) there is no infrastructure to pass this around before SQLA2
   # 2) _select_args sets it and _prep_for_execute consumes it
   my $sql_maker = $self->sql_maker;
-  local $sql_maker->{for};
+  local $sql_maker->{_dbic_rs_attrs};
 
   return $self->_execute($self->_select_args(@_));
 }
@@ -1343,10 +1536,10 @@ sub _select_args_to_query {
   my $self = shift;
 
   # localization is neccessary as
-  # 1) there is no infrastructure to pass this around (easy to do, but will wait)
+  # 1) there is no infrastructure to pass this around before SQLA2
   # 2) _select_args sets it and _prep_for_execute consumes it
   my $sql_maker = $self->sql_maker;
-  local $sql_maker->{for};
+  local $sql_maker->{_dbic_rs_attrs};
 
   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
   #  = $self->_select_args($ident, $select, $cond, $attrs);
@@ -1366,8 +1559,19 @@ sub _select_args_to_query {
 sub _select_args {
   my ($self, $ident, $select, $where, $attrs) = @_;
 
+  my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
+
   my $sql_maker = $self->sql_maker;
-  my $alias2source = $self->_resolve_ident_sources ($ident);
+  $sql_maker->{_dbic_rs_attrs} = {
+    %$attrs,
+    select => $select,
+    from => $ident,
+    where => $where,
+    $rs_alias
+      ? ( _source_handle => $alias2source->{$rs_alias}->handle )
+      : ()
+    ,
+  };
 
   # calculate bind_attrs before possible $ident mangling
   my $bind_attrs = {};
@@ -1378,29 +1582,55 @@ sub _select_args {
       my $fqcn = join ('.', $alias, $col);
       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
 
-      # so that unqualified searches can be bound too
-      $bind_attrs->{$col} = $bind_attrs->{$fqcn} if $alias eq 'me';
+      # Unqialified column names are nice, but at the same time can be
+      # rather ambiguous. What we do here is basically go along with
+      # the loop, adding an unqualified column slot to $bind_attrs,
+      # alongside the fully qualified name. As soon as we encounter
+      # another column by that name (which would imply another table)
+      # we unset the unqualified slot and never add any info to it
+      # to avoid erroneous type binding. If this happens the users
+      # only choice will be to fully qualify his column name
+
+      if (exists $bind_attrs->{$col}) {
+        $bind_attrs->{$col} = {};
+      }
+      else {
+        $bind_attrs->{$col} = $bind_attrs->{$fqcn};
+      }
     }
   }
 
-  my @limit;
-  if ($attrs->{software_limit} ||
-      $sql_maker->_default_limit_syntax eq "GenericSubQ") {
-        $attrs->{software_limit} = 1;
-  } else {
+  # adjust limits
+  if (
+    $attrs->{software_limit}
+      ||
+    $sql_maker->_default_limit_syntax eq "GenericSubQ"
+  ) {
+    $attrs->{software_limit} = 1;
+  }
+  else {
     $self->throw_exception("rows attribute must be positive if present")
       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
 
     # MySQL actually recommends this approach.  I cringe.
     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
+  }
 
-    if ($attrs->{rows} && keys %{$attrs->{collapse}}) {
-      ($ident, $select, $where, $attrs)
-        = $self->_adjust_select_args_for_limited_prefetch ($ident, $select, $where, $attrs);
-    }
-    else {
-      push @limit, $attrs->{rows}, $attrs->{offset};
-    }
+  my @limit;
+
+  # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
+  # otherwise delegate the limiting to the storage, unless software limit was requested
+  if (
+    ( $attrs->{rows} && keys %{$attrs->{collapse}} )
+       ||
+    ( $attrs->{group_by} && @{$attrs->{group_by}} &&
+      $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
+  ) {
+    ($ident, $select, $where, $attrs)
+      = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
+  }
+  elsif (! $attrs->{software_limit} ) {
+    push @limit, $attrs->{rows}, $attrs->{offset};
   }
 
 ###
@@ -1415,173 +1645,259 @@ sub _select_args {
 
   my $order = { map
     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
-    (qw/order_by group_by having _virtual_order_by/ )
+    (qw/order_by group_by having/ )
   };
 
-
-  $sql_maker->{for} = delete $attrs->{for};
-
   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
 }
 
-sub _adjust_select_args_for_limited_prefetch {
+#
+# This is the code producing joined subqueries like:
+# SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
+#
+sub _adjust_select_args_for_complex_prefetch {
   my ($self, $from, $select, $where, $attrs) = @_;
 
-  if ($attrs->{group_by} and @{$attrs->{group_by}}) {
-    $self->throw_exception ('Prefetch with limit (rows/offset) is not supported on resultsets with a group_by attribute');
-  }
+  $self->throw_exception ('Nothing to prefetch... how did we get here?!')
+    if not @{$attrs->{_prefetch_select}};
 
-  $self->throw_exception ('Prefetch with limit (rows/offset) is not supported on resultsets with a custom from attribute')
-    if (ref $from ne 'ARRAY');
+  $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
+    if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
 
 
-  # separate attributes
-  my $sub_attrs = { %$attrs };
-  delete $attrs->{$_} for qw/where bind rows offset/;
-  delete $sub_attrs->{$_} for qw/for collapse select order_by/;
+  # generate inner/outer attribute lists, remove stuff that doesn't apply
+  my $outer_attrs = { %$attrs };
+  delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
 
-  my $alias = $attrs->{alias};
+  my $inner_attrs = { %$attrs };
+  delete $inner_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
 
-  # create subquery select list
-  my $sub_select = [ grep { $_ =~ /^$alias\./ } @{$attrs->{select}} ];
 
   # bring over all non-collapse-induced order_by into the inner query (if any)
   # the outer one will have to keep them all
-  if (my $ord_cnt = @{$attrs->{order_by}} - @{$attrs->{_collapse_order_by}} ) {
-    $sub_attrs->{order_by} = [
-      @{$attrs->{order_by}}[ 0 .. ($#{$attrs->{order_by}} - $ord_cnt - 1) ]
+  delete $inner_attrs->{order_by};
+  if (my $ord_cnt = @{$outer_attrs->{order_by}} - @{$outer_attrs->{_collapse_order_by}} ) {
+    $inner_attrs->{order_by} = [
+      @{$outer_attrs->{order_by}}[ 0 .. $ord_cnt - 1]
     ];
   }
 
-  # mangle {from}
-  $from = [ @$from ];
-  my $select_root = shift @$from;
-  my @outer_from = @$from;
-
-  my %inner_joins;
-  my %join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
 
-  # in complex search_related chains $alias may *not* be 'me'
-  # so always include it in the inner join, and also shift away
-  # from the outer stack, so that the two datasets actually do
-  # meet
-  if ($select_root->{-alias} ne $alias) {
-    $inner_joins{$alias} = 1;
+  # generate the inner/outer select lists
+  # for inside we consider only stuff *not* brought in by the prefetch
+  # on the outside we substitute any function for its alias
+  my $outer_select = [ @$select ];
+  my $inner_select = [];
+  for my $i (0 .. ( @$outer_select - @{$outer_attrs->{_prefetch_select}} - 1) ) {
+    my $sel = $outer_select->[$i];
 
-    while (@outer_from && $outer_from[0][0]{-alias} ne $alias) {
-      shift @outer_from;
-    }
-    if (! @outer_from) {
-      $self->throw_exception ("Unable to find '$alias' in the {from} stack, something is wrong");
+    if (ref $sel eq 'HASH' ) {
+      $sel->{-as} ||= $attrs->{as}[$i];
+      $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
     }
 
-    shift @outer_from; # the new subquery will represent this alias, so get rid of it
+    push @$inner_select, $sel;
   }
 
+  # normalize a copy of $from, so it will be easier to work with further
+  # down (i.e. promote the initial hashref to an AoH)
+  $from = [ @$from ];
+  $from->[0] = [ $from->[0] ];
+  my %original_join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
 
-  # decide which parts of the join will remain on the inside
-  #
-  # this is not a very viable optimisation, but it was written
-  # before I realised this, so might as well remain. We can throw
-  # away _any_ branches of the join tree that are:
-  # 1) not mentioned in the condition/order
-  # 2) left-join leaves (or left-join leaf chains)
-  # Most of the join ocnditions will not satisfy this, but for real
-  # complex queries some might, and we might make some RDBMS happy.
-  #
-  #
-  # since we do not have introspectable SQLA, we fall back to ugly
-  # scanning of raw SQL for WHERE, and for pieces of ORDER BY
-  # in order to determine what goes into %inner_joins
+
+  # decide which parts of the join will remain in either part of
+  # the outer/inner query
+
+  # First we compose a list of which aliases are used in restrictions
+  # (i.e. conditions/order/grouping/etc). Since we do not have
+  # introspectable SQLA, we fall back to ugly scanning of raw SQL for
+  # WHERE, and for pieces of ORDER BY in order to determine which aliases
+  # need to appear in the resulting sql.
   # It may not be very efficient, but it's a reasonable stop-gap
+  # Also unqualified column names will not be considered, but more often
+  # than not this is actually ok
+  #
+  # In the same loop we enumerate part of the selection aliases, as
+  # it requires the same sqla hack for the time being
+  my ($restrict_aliases, $select_aliases, $prefetch_aliases);
   {
     # produce stuff unquoted, so it can be scanned
     my $sql_maker = $self->sql_maker;
     local $sql_maker->{quote_char};
+    my $sep = $self->_sql_maker_opts->{name_sep} || '.';
+    $sep = "\Q$sep\E";
 
-    my @order_by = (map
+    my $non_prefetch_select_sql = $sql_maker->_recurse_fields ($inner_select);
+    my $prefetch_select_sql = $sql_maker->_recurse_fields ($outer_attrs->{_prefetch_select});
+    my $where_sql = $sql_maker->where ($where);
+    my $group_by_sql = $sql_maker->_order_by({
+      map { $_ => $inner_attrs->{$_} } qw/group_by having/
+    });
+    my @non_prefetch_order_by_chunks = (map
       { ref $_ ? $_->[0] : $_ }
-      $sql_maker->_order_by_chunks ($sub_attrs->{order_by})
+      $sql_maker->_order_by_chunks ($inner_attrs->{order_by})
     );
 
-    my $where_sql = $sql_maker->where ($where);
 
-    # sort needed joins
-    for my $alias (keys %join_info) {
+    for my $alias (keys %original_join_info) {
+      my $seen_re = qr/\b $alias $sep/x;
 
-      # any table alias found on a column name in where or order_by
-      # gets included in %inner_joins
-      # Also any parent joins that are needed to reach this particular alias
-      for my $piece ($where_sql, @order_by ) {
-        if ($piece =~ /\b$alias\./) {
-          $inner_joins{$alias} = 1;
+      for my $piece ($where_sql, $group_by_sql, @non_prefetch_order_by_chunks ) {
+        if ($piece =~ $seen_re) {
+          $restrict_aliases->{$alias} = 1;
         }
       }
+
+      if ($non_prefetch_select_sql =~ $seen_re) {
+          $select_aliases->{$alias} = 1;
+      }
+
+      if ($prefetch_select_sql =~ $seen_re) {
+          $prefetch_aliases->{$alias} = 1;
+      }
+
     }
   }
 
-  # scan for non-leaf/non-left joins and mark as needed
-  # also mark all ancestor joins that are needed to reach this particular alias
-  # (e.g.  join => { cds => 'tracks' } - tracks will bring cds too )
-  #
-  # traverse by the size of the -join_path i.e. reverse depth first
-  for my $alias (sort { @{$join_info{$b}{-join_path}} <=> @{$join_info{$a}{-join_path}} } (keys %join_info) ) {
-
-    my $j = $join_info{$alias};
-    $inner_joins{$alias} = 1 if (! $j->{-join_type} || ($j->{-join_type} !~ /^left$/i) );
+  # Add any non-left joins to the restriction list (such joins are indeed restrictions)
+  for my $j (values %original_join_info) {
+    my $alias = $j->{-alias} or next;
+    $restrict_aliases->{$alias} = 1 if (
+      (not $j->{-join_type})
+        or
+      ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
+    );
+  }
 
-    if ($inner_joins{$alias}) {
-      $inner_joins{$_} = 1 for (@{$j->{-join_path}});
+  # mark all join parents as mentioned
+  # (e.g.  join => { cds => 'tracks' } - tracks will need to bring cds too )
+  for my $collection ($restrict_aliases, $select_aliases) {
+    for my $alias (keys %$collection) {
+      $collection->{$_} = 1
+        for (@{ $original_join_info{$alias}{-join_path} || [] });
     }
   }
 
   # construct the inner $from for the subquery
-  my $inner_from = [ $select_root ];
+  my %inner_joins = (map { %{$_ || {}} } ($restrict_aliases, $select_aliases) );
+  my @inner_from;
   for my $j (@$from) {
-    push @$inner_from, $j if $inner_joins{$j->[0]{-alias}};
+    push @inner_from, $j if $inner_joins{$j->[0]{-alias}};
   }
 
   # if a multi-type join was needed in the subquery ("multi" is indicated by
   # presence in {collapse}) - add a group_by to simulate the collapse in the subq
-
-  for my $alias (keys %inner_joins) {
-
-    # the dot comes from some weirdness in collapse
-    # remove after the rewrite
-    if ($attrs->{collapse}{".$alias"}) {
-      $sub_attrs->{group_by} = $sub_select;
-      last;
+  unless ($inner_attrs->{group_by}) {
+    for my $alias (keys %inner_joins) {
+
+      # the dot comes from some weirdness in collapse
+      # remove after the rewrite
+      if ($attrs->{collapse}{".$alias"}) {
+        $inner_attrs->{group_by} ||= $inner_select;
+        last;
+      }
     }
   }
 
+  # demote the inner_from head
+  $inner_from[0] = $inner_from[0][0];
+
   # generate the subquery
   my $subq = $self->_select_args_to_query (
-    $inner_from,
-    $sub_select,
+    \@inner_from,
+    $inner_select,
     $where,
-    $sub_attrs
+    $inner_attrs,
   );
 
-  # put it in the new {from}
-  unshift @outer_from, { $alias => $subq };
+  my $subq_joinspec = {
+    -alias => $attrs->{alias},
+    -source_handle => $inner_from[0]{-source_handle},
+    $attrs->{alias} => $subq,
+  };
+
+  # Generate the outer from - this is relatively easy (really just replace
+  # the join slot with the subquery), with a major caveat - we can not
+  # join anything that is non-selecting (not part of the prefetch), but at
+  # the same time is a multi-type relationship, as it will explode the result.
+  #
+  # There are two possibilities here
+  # - either the join is non-restricting, in which case we simply throw it away
+  # - it is part of the restrictions, in which case we need to collapse the outer
+  #   result by tackling yet another group_by to the outside of the query
+
+  # so first generate the outer_from, up to the substitution point
+  my @outer_from;
+  while (my $j = shift @$from) {
+    if ($j->[0]{-alias} eq $attrs->{alias}) { # time to swap
+      push @outer_from, [
+        $subq_joinspec,
+        @{$j}[1 .. $#$j],
+      ];
+      last; # we'll take care of what's left in $from below
+    }
+    else {
+      push @outer_from, $j;
+    }
+  }
+
+  # see what's left - throw away if not selecting/restricting
+  # also throw in a group_by if restricting to guard against
+  # cross-join explosions
+  #
+  while (my $j = shift @$from) {
+    my $alias = $j->[0]{-alias};
+
+    if ($select_aliases->{$alias} || $prefetch_aliases->{$alias}) {
+      push @outer_from, $j;
+    }
+    elsif ($restrict_aliases->{$alias}) {
+      push @outer_from, $j;
+
+      # FIXME - this should be obviated by SQLA2, as I'll be able to 
+      # have restrict_inner and restrict_outer... or something to that
+      # effect... I think...
+
+      # FIXME2 - I can't find a clean way to determine if a particular join
+      # is a multi - instead I am just treating everything as a potential
+      # explosive join (ribasushi)
+      #
+      # if (my $handle = $j->[0]{-source_handle}) {
+      #   my $rsrc = $handle->resolve;
+      #   ... need to bail out of the following if this is not a multi,
+      #       as it will be much easier on the db ...
+
+          $outer_attrs->{group_by} ||= $outer_select;
+      # }
+    }
+  }
+
+  # demote the outer_from head
+  $outer_from[0] = $outer_from[0][0];
 
   # This is totally horrific - the $where ends up in both the inner and outer query
-  # Unfortunately not much can be done until SQLA2 introspection arrives
+  # Unfortunately not much can be done until SQLA2 introspection arrives, and even
+  # then if where conditions apply to the *right* side of the prefetch, you may have
+  # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
+  # the outer select to exclude joins you didin't want in the first place
   #
   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
-  return (\@outer_from, $select, $where, $attrs);
+  return (\@outer_from, $outer_select, $where, $outer_attrs);
 }
 
 sub _resolve_ident_sources {
   my ($self, $ident) = @_;
 
   my $alias2source = {};
+  my $rs_alias;
 
   # the reason this is so contrived is that $ident may be a {from}
   # structure, specifying multiple tables to join
   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
     # this is compat mode for insert/update/delete which do not deal with aliases
     $alias2source->{me} = $ident;
+    $rs_alias = 'me';
   }
   elsif (ref $ident eq 'ARRAY') {
 
@@ -1589,6 +1905,7 @@ sub _resolve_ident_sources {
       my $tabinfo;
       if (ref $_ eq 'HASH') {
         $tabinfo = $_;
+        $rs_alias = $tabinfo->{-alias};
       }
       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
         $tabinfo = $_->[0];
@@ -1599,7 +1916,57 @@ sub _resolve_ident_sources {
     }
   }
 
-  return $alias2source;
+  return ($alias2source, $rs_alias);
+}
+
+# Takes $ident, \@column_names
+#
+# returns { $column_name => \%column_info, ... }
+# also note: this adds -result_source => $rsrc to the column info
+#
+# usage:
+#   my $col_sources = $self->_resolve_column_info($ident, @column_names);
+sub _resolve_column_info {
+  my ($self, $ident, $colnames) = @_;
+  my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
+
+  my $sep = $self->_sql_maker_opts->{name_sep} || '.';
+  $sep = "\Q$sep\E";
+
+  my (%return, %seen_cols);
+
+  # compile a global list of column names, to be able to properly
+  # disambiguate unqualified column names (if at all possible)
+  for my $alias (keys %$alias2src) {
+    my $rsrc = $alias2src->{$alias};
+    for my $colname ($rsrc->columns) {
+      push @{$seen_cols{$colname}}, $alias;
+    }
+  }
+
+  COLUMN:
+  foreach my $col (@$colnames) {
+    my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
+
+    unless ($alias) {
+      # see if the column was seen exactly once (so we know which rsrc it came from)
+      if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
+        $alias = $seen_cols{$colname}[0];
+      }
+      else {
+        next COLUMN;
+      }
+    }
+
+    my $rsrc = $alias2src->{$alias};
+    $return{$col} = $rsrc && {
+      %{$rsrc->column_info($colname)},
+      -result_source => $rsrc,
+      -source_alias => $alias,
+    };
+  }
+
+  return \%return;
 }
 
 # Returns a counting SELECT for a simple count
@@ -1701,7 +2068,7 @@ sub _dbh_sth {
 
 sub sth {
   my ($self, $sql) = @_;
-  $self->dbh_do('_dbh_sth', $sql);
+  $self->dbh_do('_dbh_sth', $sql);  # retry over disconnects
 }
 
 sub _dbh_columns_info_for {
@@ -1763,7 +2130,7 @@ sub _dbh_columns_info_for {
 
 sub columns_info_for {
   my ($self, $table) = @_;
-  $self->dbh_do('_dbh_columns_info_for', $table);
+  $self->_dbh_columns_info_for ($self->_get_dbh, $table);
 }
 
 =head2 last_insert_id
@@ -1789,7 +2156,68 @@ EOE
 
 sub last_insert_id {
   my $self = shift;
-  $self->dbh_do('_dbh_last_insert_id', @_);
+  $self->_dbh_last_insert_id ($self->_dbh, @_);
+}
+
+=head2 _native_data_type
+
+=over 4
+
+=item Arguments: $type_name
+
+=back
+
+This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
+currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
+L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
+
+The default implementation returns C<undef>, implement in your Storage driver if
+you need this functionality.
+
+Should map types from other databases to the native RDBMS type, for example
+C<VARCHAR2> to C<VARCHAR>.
+
+Types with modifiers should map to the underlying data type. For example,
+C<INTEGER AUTO_INCREMENT> should become C<INTEGER>.
+
+Composite types should map to the container type, for example
+C<ENUM(foo,bar,baz)> becomes C<ENUM>.
+
+=cut
+
+sub _native_data_type {
+  #my ($self, $data_type) = @_;
+  return undef
+}
+
+# Check if placeholders are supported at all
+sub _placeholders_supported {
+  my $self = shift;
+  my $dbh  = $self->_get_dbh;
+
+  # some drivers provide a $dbh attribute (e.g. Sybase and $dbh->{syb_dynamic_supported})
+  # but it is inaccurate more often than not
+  eval {
+    local $dbh->{PrintError} = 0;
+    local $dbh->{RaiseError} = 1;
+    $dbh->do('select ?', {}, 1);
+  };
+  return $@ ? 0 : 1;
+}
+
+# Check if placeholders bound to non-string types throw exceptions
+#
+sub _typeless_placeholders_supported {
+  my $self = shift;
+  my $dbh  = $self->_get_dbh;
+
+  eval {
+    local $dbh->{PrintError} = 0;
+    local $dbh->{RaiseError} = 1;
+    # this specifically tests a bind that is NOT a string
+    $dbh->do('select 1 where 1 = ?', {}, 1);
+  };
+  return $@ ? 0 : 1;
 }
 
 =head2 sqlt_type
@@ -1798,7 +2226,16 @@ Returns the database driver name.
 
 =cut
 
-sub sqlt_type { shift->dbh->{Driver}->{Name} }
+sub sqlt_type {
+  my ($self) = @_;
+
+  if (not $self->_driver_determined) {
+    $self->_determine_driver;
+    goto $self->can ('sqlt_type');
+  }
+
+  $self->_get_dbh->{Driver}->{Name};
+}
 
 =head2 bind_attribute_by_data_type
 
@@ -1881,13 +2318,13 @@ By default, C<\%sqlt_args> will have
 
  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
 
-merged with the hash passed in. To disable any of those features, pass in a 
+merged with the hash passed in. To disable any of those features, pass in a
 hashref like the following
 
  { ignore_constraint_names => 0, # ... other options }
 
 
-Note that this feature is currently EXPERIMENTAL and may not work correctly 
+Note that this feature is currently EXPERIMENTAL and may not work correctly
 across all databases, or fully handle complex relationships.
 
 WARNING: Please check all SQL files created, before applying them.
@@ -1908,15 +2345,14 @@ sub create_ddl_dir {
   $version ||= $schema_version;
 
   $sqltargs = {
-    add_drop_table => 1, 
+    add_drop_table => 1,
     ignore_constraint_names => 1,
     ignore_index_names => 1,
     %{$sqltargs || {}}
   };
 
-  $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09003: '}
-      . $self->_check_sqlt_message . q{'})
-          if !$self->_check_sqlt_version;
+  $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
+    if !$self->_sqlt_version_ok;
 
   my $sqlt = SQL::Translator->new( $sqltargs );
 
@@ -1948,7 +2384,7 @@ sub create_ddl_dir {
     }
     print $file $output;
     close($file);
-  
+
     next unless ($preversion);
 
     require SQL::Translator::Diff;
@@ -1964,7 +2400,7 @@ sub create_ddl_dir {
       carp("Overwriting existing diff file - $difffile");
       unlink($difffile);
     }
-    
+
     my $source_schema;
     {
       my $t = SQL::Translator->new($sqltargs);
@@ -1983,7 +2419,7 @@ sub create_ddl_dir {
         unless ( $source_schema->name );
     }
 
-    # The "new" style of producers have sane normalization and can support 
+    # The "new" style of producers have sane normalization and can support
     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
     # And we have to diff parsed SQL against parsed SQL.
     my $dest_schema = $sqlt_schema;
@@ -2004,12 +2440,12 @@ sub create_ddl_dir {
       $dest_schema->name( $filename )
         unless $dest_schema->name;
     }
-    
+
     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
                                                   $dest_schema,   $db,
                                                   $sqltargs
                                                  );
-    if(!open $file, ">$difffile") { 
+    if(!open $file, ">$difffile") {
       $self->throw_exception("Can't write to $difffile ($!)");
       next;
     }
@@ -2044,8 +2480,6 @@ See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
 
 sub deployment_statements {
   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
-  # Need to be connected to get the correct sqlt_type
-  $self->ensure_connected() unless $type;
   $type ||= $self->sqlt_type;
   $version ||= $schema->schema_version || '1.x';
   $dir ||= './';
@@ -2053,29 +2487,28 @@ sub deployment_statements {
   if(-f $filename)
   {
       my $file;
-      open($file, "<$filename") 
+      open($file, "<$filename")
         or $self->throw_exception("Can't open $filename ($!)");
       my @rows = <$file>;
       close($file);
       return join('', @rows);
   }
 
-  $self->throw_exception(q{Can't deploy without SQL::Translator 0.09003: '}
-      . $self->_check_sqlt_message . q{'})
-          if !$self->_check_sqlt_version;
+  $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
+    if !$self->_sqlt_version_ok;
 
-  require SQL::Translator::Parser::DBIx::Class;
-  eval qq{use SQL::Translator::Producer::${type}};
-  $self->throw_exception($@) if $@;
-
-  # sources needs to be a parser arg, but for simplicty allow at top level 
+  # sources needs to be a parser arg, but for simplicty allow at top level
   # coming in
   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
       if exists $sqltargs->{sources};
 
-  my $tr = SQL::Translator->new(%$sqltargs);
-  SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
-  return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
+  my $tr = SQL::Translator->new(
+    producer => "SQL::Translator::Producer::${type}",
+    %$sqltargs,
+    parser => 'SQL::Translator::Parser::DBIx::Class',
+    data => $schema,
+  );
+  return $tr->translate;
 }
 
 sub deploy {
@@ -2090,7 +2523,9 @@ sub deploy {
     return if $line =~ /^\s+$/; # skip whitespace only
     $self->_query_start($line);
     eval {
-      $self->dbh->do($line); # shouldn't be using ->dbh ?
+      # do a dbh_do cycle here, as we need some error checking in
+      # place (even though we will ignore errors)
+      $self->dbh_do (sub { $_[1]->do($line) });
     };
     if ($@) {
       carp qq{$@ (running "${line}")};
@@ -2119,7 +2554,6 @@ Returns the datetime parser class
 sub datetime_parser {
   my $self = shift;
   return $self->{datetime_parser} ||= do {
-    $self->ensure_connected;
     $self->build_datetime_parser(@_);
   };
 }
@@ -2140,28 +2574,17 @@ See L</datetime_parser>
 =cut
 
 sub build_datetime_parser {
+  if (not $_[0]->_driver_determined) {
+    $_[0]->_determine_driver;
+    goto $_[0]->can('build_datetime_parser');
+  }
+
   my $self = shift;
   my $type = $self->datetime_parser_type(@_);
-  eval "use ${type}";
-  $self->throw_exception("Couldn't load ${type}: $@") if $@;
+  $self->ensure_class_loaded ($type);
   return $type;
 }
 
-{
-    my $_check_sqlt_version; # private
-    my $_check_sqlt_message; # private
-    sub _check_sqlt_version {
-        return $_check_sqlt_version if defined $_check_sqlt_version;
-        eval 'use SQL::Translator "0.09003"';
-        $_check_sqlt_message = $@ || '';
-        $_check_sqlt_version = !$@;
-    }
-
-    sub _check_sqlt_message {
-        _check_sqlt_version if !defined $_check_sqlt_message;
-        $_check_sqlt_message;
-    }
-}
 
 =head2 is_replicating
 
@@ -2173,7 +2596,7 @@ returned by databases that don't support replication.
 
 sub is_replicating {
     return;
-    
+
 }
 
 =head2 lag_behind_master
@@ -2188,10 +2611,44 @@ sub lag_behind_master {
     return;
 }
 
+# SQLT version handling 
+{
+  my $_sqlt_version_ok;     # private 
+  my $_sqlt_version_error;  # private 
+
+  sub _sqlt_version_ok {
+    if (!defined $_sqlt_version_ok) {
+      eval "use SQL::Translator $minimum_sqlt_version";
+      if ($@) {
+        $_sqlt_version_ok = 0;
+        $_sqlt_version_error = $@;
+      }
+      else {
+        $_sqlt_version_ok = 1;
+      }
+    }
+    return $_sqlt_version_ok;
+  }
+
+  sub _sqlt_version_error {
+    shift->_sqlt_version_ok unless defined $_sqlt_version_ok;
+    return $_sqlt_version_error;
+  }
+
+  sub _sqlt_minimum_version { $minimum_sqlt_version };
+}
+
 sub DESTROY {
   my $self = shift;
-  return if !$self->_dbh;
-  $self->_verify_pid;
+
+  $self->_verify_pid if $self->_dbh;
+
+  # some databases need this to stop spewing warnings
+  if (my $dbh = $self->_dbh) {
+    local $@;
+    eval { $dbh->disconnect };
+  }
+
   $self->_dbh(undef);
 }
 
@@ -2203,7 +2660,7 @@ sub DESTROY {
 
 DBIx::Class can do some wonderful magic with handling exceptions,
 disconnections, and transactions when you use C<< AutoCommit => 1 >>
-combined with C<txn_do> for transaction support.
+(the default) combined with C<txn_do> for transaction support.
 
 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
 in an assumed transaction between commits, and you're telling us you'd
@@ -2215,7 +2672,6 @@ cases if you choose the C<< AutoCommit => 0 >> path, just as you would
 be with raw DBI.
 
 
-
 =head1 AUTHORS
 
 Matt S. Trout <mst@shadowcatsystems.co.uk>