Conversion of eval => try (part 1)
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
index 2939f4a..98c9f91 100644 (file)
@@ -15,15 +15,14 @@ use Scalar::Util();
 use List::Util();
 use Data::Dumper::Concise();
 use Sub::Name ();
+use Try::Tiny;
 
-# what version of sqlt do we require if deploy() without a ddl_dir is invoked
-# when changing also adjust the corresponding author_require in Makefile.PL
-my $minimum_sqlt_version = '0.11002';
-
+use File::Path ();
 
 __PACKAGE__->mk_group_accessors('simple' =>
   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
-     _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
+     _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints
+     _server_info_hash/
 );
 
 # the values for these accessors are picked out (and deleted) from
@@ -38,13 +37,16 @@ __PACKAGE__->mk_group_accessors('simple' => @storage_options);
 # default cursor class, overridable in connect_info attributes
 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
 
-__PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
+__PACKAGE__->mk_group_accessors('inherited' => qw/
+  sql_maker_class
+  _supports_insert_returning
+/);
 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
 
-
 # Each of these methods need _determine_driver called before itself
 # in order to function reliably. This is a purely DRY optimization
 my @rdbms_specific_methods = qw/
+  deployment_statements
   sqlt_type
   build_datetime_parser
   datetime_parser_type
@@ -93,7 +95,7 @@ DBIx::Class::Storage::DBI - DBI storage handler
   );
 
   $schema->resultset('Book')->search({
-     written_on => $schema->storage->datetime_parser(DateTime->now)
+     written_on => $schema->storage->datetime_parser->format_datetime(DateTime->now)
   });
 
 =head1 DESCRIPTION
@@ -115,9 +117,101 @@ sub new {
   $new->{_in_dbh_do} = 0;
   $new->{_dbh_gen} = 0;
 
+  # read below to see what this does
+  $new->_arm_global_destructor;
+
   $new;
 }
 
+# This is hack to work around perl shooting stuff in random
+# order on exit(). If we do not walk the remaining storage
+# objects in an END block, there is a *small but real* chance
+# of a fork()ed child to kill the parent's shared DBI handle,
+# *before perl reaches the DESTROY in this package*
+# Yes, it is ugly and effective.
+{
+  my %seek_and_destroy;
+
+  sub _arm_global_destructor {
+    my $self = shift;
+    my $key = Scalar::Util::refaddr ($self);
+    $seek_and_destroy{$key} = $self;
+    Scalar::Util::weaken ($seek_and_destroy{$key});
+  }
+
+  END {
+    local $?; # just in case the DBI destructor changes it somehow
+
+    # destroy just the object if not native to this process/thread
+    $_->_preserve_foreign_dbh for (grep
+      { defined $_ }
+      values %seek_and_destroy
+    );
+  }
+}
+
+sub DESTROY {
+  my $self = shift;
+
+  # destroy just the object if not native to this process/thread
+  $self->_preserve_foreign_dbh;
+
+  # some databases need this to stop spewing warnings
+  if (my $dbh = $self->_dbh) {
+    try {
+      %{ $dbh->{CachedKids} } = ();
+      $dbh->disconnect;
+    };
+  }
+
+  $self->_dbh(undef);
+}
+
+sub _preserve_foreign_dbh {
+  my $self = shift;
+
+  return unless $self->_dbh;
+
+  $self->_verify_tid;
+
+  return unless $self->_dbh;
+
+  $self->_verify_pid;
+
+}
+
+# handle pid changes correctly - do not destroy parent's connection
+sub _verify_pid {
+  my $self = shift;
+
+  return if ( defined $self->_conn_pid and $self->_conn_pid == $$ );
+
+  $self->_dbh->{InactiveDestroy} = 1;
+  $self->_dbh(undef);
+  $self->{_dbh_gen}++;
+
+  return;
+}
+
+# very similar to above, but seems to FAIL if I set InactiveDestroy
+sub _verify_tid {
+  my $self = shift;
+
+  if ( ! defined $self->_conn_tid ) {
+    return; # no threads
+  }
+  elsif ( $self->_conn_tid == threads->tid ) {
+    return; # same thread
+  }
+
+  #$self->_dbh->{InactiveDestroy} = 1;  # why does t/51threads.t fail...?
+  $self->_dbh(undef);
+  $self->{_dbh_gen}++;
+
+  return;
+}
+
+
 =head2 connect_info
 
 This method is normally called by L<DBIx::Class::Schema/connection>, which
@@ -195,7 +289,7 @@ for most DBDs. See L</DBIx::Class and AutoCommit> for details.
 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
 the following connection options. These options can be mixed in with your other
-L<DBI> connection attributes, or placed in a seperate hashref
+L<DBI> connection attributes, or placed in a separate hashref
 (C<\%extra_attributes>) as shown above.
 
 Every time C<connect_info> is invoked, any previous settings for
@@ -347,7 +441,7 @@ SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
 =item name_sep
 
 This only needs to be used in conjunction with C<quote_char>, and is used to
-specify the charecter that seperates elements (schemas, tables, columns) from
+specify the character that separates elements (schemas, tables, columns) from
 each other. In most cases this is simply a C<.>.
 
 The consequences of not supplying this value is that L<SQL::Abstract>
@@ -451,13 +545,50 @@ L<DBIx::Class::Schema/connect>
 =cut
 
 sub connect_info {
-  my ($self, $info_arg) = @_;
+  my ($self, $info) = @_;
 
-  return $self->_connect_info if !$info_arg;
+  return $self->_connect_info if !$info;
 
-  my @args = @$info_arg;  # take a shallow copy for further mutilation
-  $self->_connect_info([@args]); # copy for _connect_info
+  $self->_connect_info($info); # copy for _connect_info
+
+  $info = $self->_normalize_connect_info($info)
+    if ref $info eq 'ARRAY';
+
+  for my $storage_opt (keys %{ $info->{storage_options} }) {
+    my $value = $info->{storage_options}{$storage_opt};
+
+    $self->$storage_opt($value);
+  }
+
+  # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
+  #  the new set of options
+  $self->_sql_maker(undef);
+  $self->_sql_maker_opts({});
+
+  for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
+    my $value = $info->{sql_maker_options}{$sql_maker_opt};
+
+    $self->_sql_maker_opts->{$sql_maker_opt} = $value;
+  }
 
+  my %attrs = (
+    %{ $self->_default_dbi_connect_attributes || {} },
+    %{ $info->{attributes} || {} },
+  );
+
+  my @args = @{ $info->{arguments} };
+
+  $self->_dbi_connect_info([@args,
+    %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
+
+  return $self->_connect_info;
+}
+
+sub _normalize_connect_info {
+  my ($self, $info_arg) = @_;
+  my %info;
+
+  my @args = @$info_arg;  # take a shallow copy for further mutilation
 
   # combine/pre-parse arguments depending on invocation style
 
@@ -494,36 +625,23 @@ sub connect_info {
     @args = @args[0,1,2];
   }
 
-  # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
-  #  the new set of options
-  $self->_sql_maker(undef);
-  $self->_sql_maker_opts({});
+  $info{arguments} = \@args;
 
-  if(keys %attrs) {
-    for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
-      if(my $value = delete $attrs{$storage_opt}) {
-        $self->$storage_opt($value);
-      }
-    }
-    for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
-      if(my $opt_val = delete $attrs{$sql_maker_opt}) {
-        $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
-      }
-    }
-  }
+  my @storage_opts = grep exists $attrs{$_},
+    @storage_options, 'cursor_class';
 
-  if (ref $args[0] eq 'CODE') {
-    # _connect() never looks past $args[0] in this case
-    %attrs = ()
-  } else {
-    %attrs = (
-      %{ $self->_default_dbi_connect_attributes || {} },
-      %attrs,
-    );
-  }
+  @{ $info{storage_options} }{@storage_opts} =
+    delete @attrs{@storage_opts} if @storage_opts;
+
+  my @sql_maker_opts = grep exists $attrs{$_},
+    qw/limit_dialect quote_char name_sep/;
+
+  @{ $info{sql_maker_options} }{@sql_maker_opts} =
+    delete @attrs{@sql_maker_opts} if @sql_maker_opts;
+
+  $info{attributes} = \%attrs if %attrs;
 
-  $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
-  $self->_connect_info;
+  return \%info;
 }
 
 sub _default_dbi_connect_attributes {
@@ -722,6 +840,7 @@ sub disconnect {
 
     $self->_dbh_rollback unless $self->_dbh_autocommit;
 
+    %{ $self->_dbh->{CachedKids} } = ();
     $self->_dbh->disconnect;
     $self->_dbh(undef);
     $self->{_dbh_gen}++;
@@ -759,8 +878,8 @@ sub with_deferred_fk_checks {
 
 =back
 
-Verifies that the the current database handle is active and ready to execute
-an SQL statement (i.e. the connection did not get stale, server is still
+Verifies that the current database handle is active and ready to execute
+an SQL statement (e.g. the connection did not get stale, server is still
 answering, etc.) This method is used internally by L</dbh>.
 
 =cut
@@ -778,19 +897,11 @@ sub connected {
 sub _seems_connected {
   my $self = shift;
 
+  $self->_preserve_foreign_dbh;
+
   my $dbh = $self->_dbh
     or return 0;
 
-  if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
-    $self->_dbh(undef);
-    $self->{_dbh_gen}++;
-    return 0;
-  }
-  else {
-    $self->_verify_pid;
-    return 0 if !$self->_dbh;
-  }
-
   return $dbh->FETCH('Active');
 }
 
@@ -802,20 +913,6 @@ sub _ping {
   return $dbh->ping;
 }
 
-# handle pid changes correctly
-#  NOTE: assumes $self->_dbh is a valid $dbh
-sub _verify_pid {
-  my ($self) = @_;
-
-  return if defined $self->_conn_pid && $self->_conn_pid == $$;
-
-  $self->_dbh->{InactiveDestroy} = 1;
-  $self->_dbh(undef);
-  $self->{_dbh_gen}++;
-
-  return;
-}
-
 sub ensure_connected {
   my ($self) = @_;
 
@@ -829,7 +926,7 @@ sub ensure_connected {
 Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
 is guaranteed to be healthy by implicitly calling L</connected>, and if
 necessary performing a reconnection before returning. Keep in mind that this
-is very B<expensive> on some database engines. Consider using L<dbh_do>
+is very B<expensive> on some database engines. Consider using L</dbh_do>
 instead.
 
 =cut
@@ -848,7 +945,7 @@ sub dbh {
 # this is the internal "get dbh or connect (don't check)" method
 sub _get_dbh {
   my $self = shift;
-  $self->_verify_pid if $self->_dbh;
+  $self->_preserve_foreign_dbh;
   $self->_populate_dbh unless $self->_dbh;
   return $self->_dbh;
 }
@@ -883,6 +980,7 @@ sub _populate_dbh {
 
   my @info = @{$self->_dbi_connect_info || []};
   $self->_dbh(undef); # in case ->connected failed we might get sent here
+  $self->_server_info_hash (undef);
   $self->_dbh($self->_connect(@info));
 
   $self->_conn_pid($$);
@@ -907,6 +1005,51 @@ sub _run_connection_actions {
   $self->_do_connection_actions(connect_call_ => $_) for @actions;
 }
 
+sub _server_info {
+  my $self = shift;
+
+  unless ($self->_server_info_hash) {
+
+    my %info;
+
+    my $server_version = do {
+      local $@; # might be happenin in some sort of destructor
+      eval { $self->_get_server_version };
+    };
+
+    if (defined $server_version) {
+      $info{dbms_version} = $server_version;
+
+      my ($numeric_version) = $server_version =~ /^([\d\.]+)/;
+      my @verparts = split (/\./, $numeric_version);
+      if (
+        @verparts
+          &&
+        $verparts[0] <= 999
+      ) {
+        # consider only up to 3 version parts, iff not more than 3 digits
+        my @use_parts;
+        while (@verparts && @use_parts < 3) {
+          my $p = shift @verparts;
+          last if $p > 999;
+          push @use_parts, $p;
+        }
+        push @use_parts, 0 while @use_parts < 3;
+
+        $info{normalized_dbms_version} = sprintf "%d.%03d%03d", @use_parts;
+      }
+    }
+
+    $self->_server_info_hash(\%info);
+  }
+
+  return $self->_server_info_hash
+}
+
+sub _get_server_version {
+  shift->_get_dbh->get_info(18);
+}
+
 sub _determine_driver {
   my ($self) = @_;
 
@@ -929,15 +1072,20 @@ sub _determine_driver {
         else {
           # try to use dsn to not require being connected, the driver may still
           # force a connection in _rebless to determine version
-          ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
+          # (dsn may not be supplied at all if all we do is make a mock-schema)
+          my $dsn = $self->_dbi_connect_info->[0] || $ENV{DBI_DSN} || '';
+          ($driver) = $dsn =~ /dbi:([^:]+):/i;
+          $driver ||= $ENV{DBI_DRIVER};
         }
       }
 
-      my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
-      if ($self->load_optional_class($storage_class)) {
-        mro::set_mro($storage_class, 'c3');
-        bless $self, $storage_class;
-        $self->_rebless();
+      if ($driver) {
+        my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
+        if ($self->load_optional_class($storage_class)) {
+          mro::set_mro($storage_class, 'c3');
+          bless $self, $storage_class;
+          $self->_rebless();
+        }
       }
     }
 
@@ -1026,7 +1174,7 @@ sub _connect {
 
   eval {
     if(ref $info[0] eq 'CODE') {
-       $dbh = &{$info[0]}
+       $dbh = $info[0]->();
     }
     else {
        $dbh = DBI->connect(@info);
@@ -1148,6 +1296,11 @@ sub _svp_generate_name {
 
 sub txn_begin {
   my $self = shift;
+
+  # this means we have not yet connected and do not know the AC status
+  # (e.g. coderef $dbh)
+  $self->ensure_connected if (! defined $self->_dbh_autocommit);
+
   if($self->{transaction_depth} == 0) {
     $self->debugobj->txn_begin()
       if $self->debug;
@@ -1333,34 +1486,60 @@ sub _execute {
     $self->dbh_do('_dbh_execute', @_);  # retry over disconnects
 }
 
-sub insert {
+sub _prefetch_insert_auto_nextvals {
   my ($self, $source, $to_insert) = @_;
 
-  my $ident = $source->from;
-  my $bind_attributes = $self->source_bind_attributes($source);
-
-  my $updated_cols = {};
+  my $upd = {};
 
   foreach my $col ( $source->columns ) {
     if ( !defined $to_insert->{$col} ) {
       my $col_info = $source->column_info($col);
 
       if ( $col_info->{auto_nextval} ) {
-        $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
+        $upd->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
           'nextval',
-          $col_info->{sequence} ||
-            $self->_dbh_get_autoinc_seq($self->_get_dbh, $source)
+          $col_info->{sequence} ||=
+            $self->_dbh_get_autoinc_seq($self->_get_dbh, $source, $col)
         );
       }
     }
   }
 
-  $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
+  return $upd;
+}
+
+sub insert {
+  my $self = shift;
+  my ($source, $to_insert, $opts) = @_;
+
+  my $updated_cols = $self->_prefetch_insert_auto_nextvals (@_);
+
+  my $bind_attributes = $self->source_bind_attributes($source);
+
+  my ($rv, $sth) = $self->_execute('insert' => [], $source, $bind_attributes, $to_insert, $opts);
+
+  if ($opts->{returning}) {
+    my @ret_cols = @{$opts->{returning}};
+
+    my @ret_vals = eval {
+      local $SIG{__WARN__} = sub {};
+      my @r = $sth->fetchrow_array;
+      $sth->finish;
+      @r;
+    };
+
+    my %ret;
+    @ret{@ret_cols} = @ret_vals if (@ret_vals);
+
+    $updated_cols = {
+      %$updated_cols,
+      %ret,
+    };
+  }
 
   return $updated_cols;
 }
 
-## Still not quite perfect, and EXPERIMENTAL
 ## Currently it is assumed that all values passed will be "normal", i.e. not
 ## scalar refs, or at least, all the same type as the first set, the statement is
 ## only prepped once.
@@ -1440,9 +1619,13 @@ sub insert_bulk {
     );
   }
 
+  # neither _execute_array, nor _execute_inserts_with_no_binds are
+  # atomic (even if _execute _array is a single call). Thus a safety
+  # scope guard
+  my $guard = $self->txn_scope_guard;
+
   $self->_query_start( $sql, ['__BULK__'] );
   my $sth = $self->sth($sql);
-
   my $rv = do {
     if ($empty_bind) {
       # bind_param_array doesn't work if there are no binds
@@ -1456,14 +1639,14 @@ sub insert_bulk {
 
   $self->_query_end( $sql, ['__BULK__'] );
 
+  $guard->commit;
+
   return (wantarray ? ($rv, $sth, @bind) : $rv);
 }
 
 sub _execute_array {
   my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
 
-  my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
-
   ## This must be an arrayref, else nothing works!
   my $tuple_status = [];
 
@@ -1485,7 +1668,11 @@ sub _execute_array {
 
     my @data = map { $_->[$data_index] } @$data;
 
-    $sth->bind_param_array( $placeholder_index, [@data], $attributes );
+    $sth->bind_param_array(
+      $placeholder_index,
+      [@data],
+      (%$attributes ?  $attributes : ()),
+    );
     $placeholder_index++;
   }
 
@@ -1512,9 +1699,6 @@ sub _execute_array {
       }),
     );
   }
-
-  $guard->commit if $guard;
-
   return $rv;
 }
 
@@ -1527,8 +1711,6 @@ sub _dbh_execute_array {
 sub _dbh_execute_inserts_with_no_binds {
   my ($self, $sth, $count) = @_;
 
-  my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
-
   eval {
     my $dbh = $self->_get_dbh;
     local $dbh->{RaiseError} = 1;
@@ -1544,13 +1726,11 @@ sub _dbh_execute_inserts_with_no_binds {
 
   $self->throw_exception($exception) if $exception;
 
-  $guard->commit if $guard;
-
   return $count;
 }
 
 sub update {
-  my ($self, $source, @args) = @_; 
+  my ($self, $source, @args) = @_;
 
   my $bind_attrs = $self->source_bind_attributes($source);
 
@@ -1580,15 +1760,7 @@ sub _subq_update_delete {
   my $rsrc = $rs->result_source;
 
   # quick check if we got a sane rs on our hands
-  my @pcols = $rsrc->primary_columns;
-  unless (@pcols) {
-    $self->throw_exception (
-      sprintf (
-        "You must declare primary key(s) on source '%s' (via set_primary_key) in order to update or delete complex resultsets",
-        $rsrc->source_name || $rsrc->from
-      )
-    );
-  }
+  my @pcols = $rsrc->_pri_cols;
 
   my $sel = $rs->_resolved_attrs->{select};
   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
@@ -1641,7 +1813,7 @@ sub _per_row_update_delete {
   my ($rs, $op, $values) = @_;
 
   my $rsrc = $rs->result_source;
-  my @pcols = $rsrc->primary_columns;
+  my @pcols = $rsrc->_pri_cols;
 
   my $guard = $self->txn_scope_guard;
 
@@ -1649,11 +1821,12 @@ sub _per_row_update_delete {
   my $row_cnt = '0E0';
 
   my $subrs_cur = $rs->cursor;
-  while (my @pks = $subrs_cur->next) {
+  my @all_pk = $subrs_cur->all;
+  for my $pks ( @all_pk) {
 
     my $cond;
     for my $i (0.. $#pcols) {
-      $cond->{$pcols[$i]} = $pks[$i];
+      $cond->{$pcols[$i]} = $pks->[$i];
     }
 
     $self->$op (
@@ -1672,31 +1845,18 @@ sub _per_row_update_delete {
 
 sub _select {
   my $self = shift;
-
-  # localization is neccessary as
-  # 1) there is no infrastructure to pass this around before SQLA2
-  # 2) _select_args sets it and _prep_for_execute consumes it
-  my $sql_maker = $self->sql_maker;
-  local $sql_maker->{_dbic_rs_attrs};
-
-  return $self->_execute($self->_select_args(@_));
+  $self->_execute($self->_select_args(@_));
 }
 
 sub _select_args_to_query {
   my $self = shift;
 
-  # localization is neccessary as
-  # 1) there is no infrastructure to pass this around before SQLA2
-  # 2) _select_args sets it and _prep_for_execute consumes it
-  my $sql_maker = $self->sql_maker;
-  local $sql_maker->{_dbic_rs_attrs};
-
-  # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
+  # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $rs_attrs, $rows, $offset)
   #  = $self->_select_args($ident, $select, $cond, $attrs);
   my ($op, $bind, $ident, $bind_attrs, @args) =
     $self->_select_args(@_);
 
-  # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $order, $rows, $offset ]);
+  # my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, [ $select, $cond, $rs_attrs, $rows, $offset ]);
   my ($sql, $prepared_bind) = $self->_prep_for_execute($op, $bind, $ident, \@args);
   $prepared_bind ||= [];
 
@@ -1709,16 +1869,16 @@ sub _select_args_to_query {
 sub _select_args {
   my ($self, $ident, $select, $where, $attrs) = @_;
 
+  my $sql_maker = $self->sql_maker;
   my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
 
-  my $sql_maker = $self->sql_maker;
-  $sql_maker->{_dbic_rs_attrs} = {
+  $attrs = {
     %$attrs,
     select => $select,
     from => $ident,
     where => $where,
-    $rs_alias
-      ? ( _source_handle => $alias2source->{$rs_alias}->handle )
+    $rs_alias && $alias2source->{$rs_alias}
+      ? ( _rsroot_source_handle => $alias2source->{$rs_alias}->handle )
       : ()
     ,
   };
@@ -1768,11 +1928,13 @@ sub _select_args {
 
   my @limit;
 
-  # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
-  # otherwise delegate the limiting to the storage, unless software limit was requested
+  # see if we need to tear the prefetch apart otherwise delegate the limiting to the
+  # storage, unless software limit was requested
   if (
+    #limited has_many
     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
        ||
+    # grouped prefetch (to satisfy group_by == select)
     ( $attrs->{group_by}
         &&
       @{$attrs->{group_by}}
@@ -1782,47 +1944,16 @@ sub _select_args {
       @{$attrs->{_prefetch_select}}
     )
   ) {
-
     ($ident, $select, $where, $attrs)
       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
   }
-
-  elsif (
-    ($attrs->{rows} || $attrs->{offset})
-      &&
-    $sql_maker->limit_dialect eq 'RowNumberOver'
-      &&
-    (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
-      &&
-    scalar $sql_maker->_order_by_chunks ($attrs->{order_by})
-  ) {
-    # the RNO limit dialect above mangles the SQL such that the join gets lost
-    # wrap a subquery here
-
-    push @limit, delete @{$attrs}{qw/rows offset/};
-
-    my $subq = $self->_select_args_to_query (
-      $ident,
-      $select,
-      $where,
-      $attrs,
-    );
-
-    $ident = {
-      -alias => $attrs->{alias},
-      -source_handle => $ident->[0]{-source_handle},
-      $attrs->{alias} => $subq,
-    };
-
-    # all part of the subquery now
-    delete @{$attrs}{qw/order_by group_by having/};
-    $where = undef;
-  }
-
   elsif (! $attrs->{software_limit} ) {
     push @limit, $attrs->{rows}, $attrs->{offset};
   }
 
+  # try to simplify the joinmap further (prune unreferenced type-single joins)
+  $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
+
 ###
   # This would be the point to deflate anything found in $where
   # (and leave $attrs->{bind} intact). Problem is - inflators historically
@@ -1833,12 +1964,7 @@ sub _select_args {
   # invoked, and that's just bad...
 ###
 
-  my $order = { map
-    { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
-    (qw/order_by group_by having/ )
-  };
-
-  return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
+  return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $attrs, @limit);
 }
 
 # Returns a counting SELECT for a simple count
@@ -1850,20 +1976,6 @@ sub _count_select {
   return { count => '*' };
 }
 
-# Returns a SELECT which will end up in the subselect
-# There may or may not be a group_by, as the subquery
-# might have been called to accomodate a limit
-#
-# Most databases would be happy with whatever ends up
-# here, but some choke in various ways.
-#
-sub _subq_count_select {
-  my ($self, $source, $rs_attrs) = @_;
-  return $rs_attrs->{group_by} if $rs_attrs->{group_by};
-
-  my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
-  return @pcols ? \@pcols : [ 1 ];
-}
 
 sub source_bind_attributes {
   my ($self, $source) = @_;
@@ -2011,18 +2123,14 @@ Return the row id of the last insert.
 =cut
 
 sub _dbh_last_insert_id {
-    # All Storage's need to register their own _dbh_last_insert_id
-    # the old SQLite-based method was highly inappropriate
+    my ($self, $dbh, $source, $col) = @_;
 
-    my $self = shift;
-    my $class = ref $self;
-    $self->throw_exception (<<EOE);
+    my $id = eval { $dbh->last_insert_id (undef, undef, $source->name, $col) };
 
-No _dbh_last_insert_id() method found in $class.
-Since the method of obtaining the autoincrement id of the last insert
-operation varies greatly between different databases, this method must be
-individually implemented for every storage class.
-EOE
+    return $id if defined $id;
+
+    my $class = ref $self;
+    $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
 }
 
 sub last_insert_id {
@@ -2136,7 +2244,7 @@ sub is_datatype_numeric {
 }
 
 
-=head2 create_ddl_dir (EXPERIMENTAL)
+=head2 create_ddl_dir
 
 =over 4
 
@@ -2188,20 +2296,24 @@ hashref like the following
  { ignore_constraint_names => 0, # ... other options }
 
 
-Note that this feature is currently EXPERIMENTAL and may not work correctly
-across all databases, or fully handle complex relationships.
-
-WARNING: Please check all SQL files created, before applying them.
+WARNING: You are strongly advised to check all SQL files created, before applying
+them.
 
 =cut
 
 sub create_ddl_dir {
   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
 
-  if(!$dir || !-d $dir) {
+  unless ($dir) {
     carp "No directory given, using ./\n";
-    $dir = "./";
+    $dir = './';
+  } else {
+      -d $dir or File::Path::mkpath($dir)
+          or $self->throw_exception("create_ddl_dir: $! creating dir '$dir'");
   }
+
+  $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
+
   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
 
@@ -2215,8 +2327,9 @@ sub create_ddl_dir {
     %{$sqltargs || {}}
   };
 
-  $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
-    if !$self->_sqlt_version_ok;
+  unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
+    $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
+  }
 
   my $sqlt = SQL::Translator->new( $sqltargs );
 
@@ -2358,8 +2471,9 @@ sub deployment_statements {
       return join('', @rows);
   }
 
-  $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
-    if !$self->_sqlt_version_ok;
+  unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
+    $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
+  }
 
   # sources needs to be a parser arg, but for simplicty allow at top level
   # coming in
@@ -2409,7 +2523,7 @@ sub deploy {
     }
     $self->_query_end($line);
   };
-  my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
+  my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
   if (@statements > 1) {
     foreach my $statement (@statements) {
       $deploy->( $statement );
@@ -2483,33 +2597,6 @@ sub lag_behind_master {
     return;
 }
 
-# SQLT version handling
-{
-  my $_sqlt_version_ok;     # private
-  my $_sqlt_version_error;  # private
-
-  sub _sqlt_version_ok {
-    if (!defined $_sqlt_version_ok) {
-      eval "use SQL::Translator $minimum_sqlt_version";
-      if ($@) {
-        $_sqlt_version_ok = 0;
-        $_sqlt_version_error = $@;
-      }
-      else {
-        $_sqlt_version_ok = 1;
-      }
-    }
-    return $_sqlt_version_ok;
-  }
-
-  sub _sqlt_version_error {
-    shift->_sqlt_version_ok unless defined $_sqlt_version_ok;
-    return $_sqlt_version_error;
-  }
-
-  sub _sqlt_minimum_version { $minimum_sqlt_version };
-}
-
 =head2 relname_to_table_alias
 
 =over 4
@@ -2524,8 +2611,8 @@ queries.
 This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
 way these aliases are named.
 
-The default behavior is C<"$relname_$join_count" if $join_count > 1>, otherwise
-C<"$relname">.
+The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
+otherwise C<"$relname">.
 
 =cut
 
@@ -2538,20 +2625,6 @@ sub relname_to_table_alias {
   return $alias;
 }
 
-sub DESTROY {
-  my $self = shift;
-
-  $self->_verify_pid if $self->_dbh;
-
-  # some databases need this to stop spewing warnings
-  if (my $dbh = $self->_dbh) {
-    local $@;
-    eval { $dbh->disconnect };
-  }
-
-  $self->_dbh(undef);
-}
-
 1;
 
 =head1 USAGE NOTES