Fix RT54063
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
index 01adf7d..7400316 100644 (file)
@@ -4,7 +4,7 @@ package DBIx::Class::Storage::DBI;
 use strict;
 use warnings;
 
-use base 'DBIx::Class::Storage';
+use base qw/DBIx::Class::Storage::DBIHacks DBIx::Class::Storage/;
 use mro 'c3';
 
 use Carp::Clan qw/^DBIx::Class/;
@@ -13,11 +13,8 @@ use DBIx::Class::Storage::DBI::Cursor;
 use DBIx::Class::Storage::Statistics;
 use Scalar::Util();
 use List::Util();
-
-# what version of sqlt do we require if deploy() without a ddl_dir is invoked
-# when changing also adjust the corresponding author_require in Makefile.PL
-my $minimum_sqlt_version = '0.11002';
-
+use Data::Dumper::Concise();
+use Sub::Name ();
 
 __PACKAGE__->mk_group_accessors('simple' =>
   qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
@@ -40,6 +37,39 @@ __PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks');
 
 
+# Each of these methods need _determine_driver called before itself
+# in order to function reliably. This is a purely DRY optimization
+my @rdbms_specific_methods = qw/
+  deployment_statements
+  sqlt_type
+  build_datetime_parser
+  datetime_parser_type
+
+  insert
+  insert_bulk
+  update
+  delete
+  select
+  select_single
+/;
+
+for my $meth (@rdbms_specific_methods) {
+
+  my $orig = __PACKAGE__->can ($meth)
+    or next;
+
+  no strict qw/refs/;
+  no warnings qw/redefine/;
+  *{__PACKAGE__ ."::$meth"} = Sub::Name::subname $meth => sub {
+    if (not $_[0]->_driver_determined) {
+      $_[0]->_determine_driver;
+      goto $_[0]->can($meth);
+    }
+    $orig->(@_);
+  };
+}
+
+
 =head1 NAME
 
 DBIx::Class::Storage::DBI - DBI storage handler
@@ -161,7 +191,7 @@ for most DBDs. See L</DBIx::Class and AutoCommit> for details.
 In addition to the standard L<DBI|DBI/ATTRIBUTES_COMMON_TO_ALL_HANDLES>
 L<connection|DBI/Database_Handle_Attributes> attributes, DBIx::Class recognizes
 the following connection options. These options can be mixed in with your other
-L<DBI> connection attributes, or placed in a seperate hashref
+L<DBI> connection attributes, or placed in a separate hashref
 (C<\%extra_attributes>) as shown above.
 
 Every time C<connect_info> is invoked, any previous settings for
@@ -313,7 +343,7 @@ SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
 =item name_sep
 
 This only needs to be used in conjunction with C<quote_char>, and is used to
-specify the charecter that seperates elements (schemas, tables, columns) from
+specify the character that separates elements (schemas, tables, columns) from
 each other. In most cases this is simply a C<.>.
 
 The consequences of not supplying this value is that L<SQL::Abstract>
@@ -417,13 +447,50 @@ L<DBIx::Class::Schema/connect>
 =cut
 
 sub connect_info {
-  my ($self, $info_arg) = @_;
+  my ($self, $info) = @_;
 
-  return $self->_connect_info if !$info_arg;
+  return $self->_connect_info if !$info;
 
-  my @args = @$info_arg;  # take a shallow copy for further mutilation
-  $self->_connect_info([@args]); # copy for _connect_info
+  $self->_connect_info($info); # copy for _connect_info
 
+  $info = $self->_normalize_connect_info($info)
+    if ref $info eq 'ARRAY';
+
+  for my $storage_opt (keys %{ $info->{storage_options} }) {
+    my $value = $info->{storage_options}{$storage_opt};
+
+    $self->$storage_opt($value);
+  }
+
+  # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
+  #  the new set of options
+  $self->_sql_maker(undef);
+  $self->_sql_maker_opts({});
+
+  for my $sql_maker_opt (keys %{ $info->{sql_maker_options} }) {
+    my $value = $info->{sql_maker_options}{$sql_maker_opt};
+
+    $self->_sql_maker_opts->{$sql_maker_opt} = $value;
+  }
+
+  my %attrs = (
+    %{ $self->_default_dbi_connect_attributes || {} },
+    %{ $info->{attributes} || {} },
+  );
+
+  my @args = @{ $info->{arguments} };
+
+  $self->_dbi_connect_info([@args,
+    %attrs && !(ref $args[0] eq 'CODE') ? \%attrs : ()]);
+
+  return $self->_connect_info;
+}
+
+sub _normalize_connect_info {
+  my ($self, $info_arg) = @_;
+  my %info;
+
+  my @args = @$info_arg;  # take a shallow copy for further mutilation
 
   # combine/pre-parse arguments depending on invocation style
 
@@ -460,36 +527,23 @@ sub connect_info {
     @args = @args[0,1,2];
   }
 
-  # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
-  #  the new set of options
-  $self->_sql_maker(undef);
-  $self->_sql_maker_opts({});
+  $info{arguments} = \@args;
 
-  if(keys %attrs) {
-    for my $storage_opt (@storage_options, 'cursor_class') {    # @storage_options is declared at the top of the module
-      if(my $value = delete $attrs{$storage_opt}) {
-        $self->$storage_opt($value);
-      }
-    }
-    for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
-      if(my $opt_val = delete $attrs{$sql_maker_opt}) {
-        $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
-      }
-    }
-  }
+  my @storage_opts = grep exists $attrs{$_},
+    @storage_options, 'cursor_class';
 
-  if (ref $args[0] eq 'CODE') {
-    # _connect() never looks past $args[0] in this case
-    %attrs = ()
-  } else {
-    %attrs = (
-      %{ $self->_default_dbi_connect_attributes || {} },
-      %attrs,
-    );
-  }
+  @{ $info{storage_options} }{@storage_opts} =
+    delete @attrs{@storage_opts} if @storage_opts;
+
+  my @sql_maker_opts = grep exists $attrs{$_},
+    qw/limit_dialect quote_char name_sep/;
+
+  @{ $info{sql_maker_options} }{@sql_maker_opts} =
+    delete @attrs{@sql_maker_opts} if @sql_maker_opts;
 
-  $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
-  $self->_connect_info;
+  $info{attributes} = \%attrs if %attrs;
+
+  return \%info;
 }
 
 sub _default_dbi_connect_attributes {
@@ -712,7 +766,6 @@ in MySQL's case disabled entirely.
 # Storage subclasses should override this
 sub with_deferred_fk_checks {
   my ($self, $sub) = @_;
-
   $sub->();
 }
 
@@ -726,8 +779,8 @@ sub with_deferred_fk_checks {
 
 =back
 
-Verifies that the the current database handle is active and ready to execute
-an SQL statement (i.e. the connection did not get stale, server is still
+Verifies that the current database handle is active and ready to execute
+an SQL statement (e.g. the connection did not get stale, server is still
 answering, etc.) This method is used internally by L</dbh>.
 
 =cut
@@ -878,13 +931,14 @@ sub _determine_driver {
   my ($self) = @_;
 
   if ((not $self->_driver_determined) && (not $self->{_in_determine_driver})) {
-    my $started_unconnected = 0;
+    my $started_connected = 0;
     local $self->{_in_determine_driver} = 1;
 
     if (ref($self) eq __PACKAGE__) {
       my $driver;
       if ($self->_dbh) { # we are connected
         $driver = $self->_dbh->{Driver}{Name};
+        $started_connected = 1;
       } else {
         # if connect_info is a CODEREF, we have no choice but to connect
         if (ref $self->_dbi_connect_info->[0] &&
@@ -895,16 +949,19 @@ sub _determine_driver {
         else {
           # try to use dsn to not require being connected, the driver may still
           # force a connection in _rebless to determine version
-          ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
-          $started_unconnected = 1;
+          # (dsn may not be supplied at all if all we do is make a mock-schema)
+          my $dsn = $self->_dbi_connect_info->[0] || '';
+          ($driver) = $dsn =~ /dbi:([^:]+):/i;
         }
       }
 
-      my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
-      if ($self->load_optional_class($storage_class)) {
-        mro::set_mro($storage_class, 'c3');
-        bless $self, $storage_class;
-        $self->_rebless();
+      if ($driver) {
+        my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
+        if ($self->load_optional_class($storage_class)) {
+          mro::set_mro($storage_class, 'c3');
+          bless $self, $storage_class;
+          $self->_rebless();
+        }
       }
     }
 
@@ -913,7 +970,7 @@ sub _determine_driver {
     $self->_init; # run driver-specific initializations
 
     $self->_run_connection_actions
-        if $started_unconnected && defined $self->_dbh;
+        if !$started_connected && defined $self->_dbh;
   }
 }
 
@@ -993,7 +1050,7 @@ sub _connect {
 
   eval {
     if(ref $info[0] eq 'CODE') {
-       $dbh = &{$info[0]}
+       $dbh = $info[0]->();
     }
     else {
        $dbh = DBI->connect(@info);
@@ -1115,6 +1172,11 @@ sub _svp_generate_name {
 
 sub txn_begin {
   my $self = shift;
+
+  # this means we have not yet connected and do not know the AC status
+  # (e.g. coderef $dbh)
+  $self->ensure_connected if (! defined $self->_dbh_autocommit);
+
   if($self->{transaction_depth} == 0) {
     $self->debugobj->txn_begin()
       if $self->debug;
@@ -1144,7 +1206,6 @@ sub _dbh_begin_work {
 sub txn_commit {
   my $self = shift;
   if ($self->{transaction_depth} == 1) {
-    my $dbh = $self->_dbh;
     $self->debugobj->txn_commit()
       if ($self->debug);
     $self->_dbh_commit;
@@ -1160,7 +1221,9 @@ sub txn_commit {
 
 sub _dbh_commit {
   my $self = shift;
-  $self->_dbh->commit;
+  my $dbh  = $self->_dbh
+    or $self->throw_exception('cannot COMMIT on a disconnected handle');
+  $dbh->commit;
 }
 
 sub txn_rollback {
@@ -1197,7 +1260,9 @@ sub txn_rollback {
 
 sub _dbh_rollback {
   my $self = shift;
-  $self->_dbh->rollback;
+  my $dbh  = $self->_dbh
+    or $self->throw_exception('cannot ROLLBACK on a disconnected handle');
+  $dbh->rollback;
 }
 
 # This used to be the top-half of _execute.  It was split out to make it
@@ -1300,12 +1365,6 @@ sub _execute {
 sub insert {
   my ($self, $source, $to_insert) = @_;
 
-# redispatch to insert method of storage we reblessed into, if necessary
-  if (not $self->_driver_determined) {
-    $self->_determine_driver;
-    goto $self->can('insert');
-  }
-
   my $ident = $source->from;
   my $bind_attributes = $self->source_bind_attributes($source);
 
@@ -1330,33 +1389,113 @@ sub insert {
   return $updated_cols;
 }
 
-## Still not quite perfect, and EXPERIMENTAL
 ## Currently it is assumed that all values passed will be "normal", i.e. not
 ## scalar refs, or at least, all the same type as the first set, the statement is
 ## only prepped once.
 sub insert_bulk {
   my ($self, $source, $cols, $data) = @_;
 
-# redispatch to insert_bulk method of storage we reblessed into, if necessary
-  if (not $self->_driver_determined) {
-    $self->_determine_driver;
-    goto $self->can('insert_bulk');
-  }
-
   my %colvalues;
-  my $table = $source->from;
   @colvalues{@$cols} = (0..$#$cols);
 
+  for my $i (0..$#$cols) {
+    my $first_val = $data->[0][$i];
+    next unless ref $first_val eq 'SCALAR';
+
+    $colvalues{ $cols->[$i] } = $first_val;
+  }
+
+  # check for bad data and stringify stringifiable objects
+  my $bad_slice = sub {
+    my ($msg, $col_idx, $slice_idx) = @_;
+    $self->throw_exception(sprintf "%s for column '%s' in populate slice:\n%s",
+      $msg,
+      $cols->[$col_idx],
+      do {
+        local $Data::Dumper::Maxdepth = 1; # don't dump objects, if any
+        Data::Dumper::Concise::Dumper({
+          map { $cols->[$_] => $data->[$slice_idx][$_] } (0 .. $#$cols)
+        }),
+      }
+    );
+  };
+
+  for my $datum_idx (0..$#$data) {
+    my $datum = $data->[$datum_idx];
+
+    for my $col_idx (0..$#$cols) {
+      my $val            = $datum->[$col_idx];
+      my $sqla_bind      = $colvalues{ $cols->[$col_idx] };
+      my $is_literal_sql = (ref $sqla_bind) eq 'SCALAR';
+
+      if ($is_literal_sql) {
+        if (not ref $val) {
+          $bad_slice->('bind found where literal SQL expected', $col_idx, $datum_idx);
+        }
+        elsif ((my $reftype = ref $val) ne 'SCALAR') {
+          $bad_slice->("$reftype reference found where literal SQL expected",
+            $col_idx, $datum_idx);
+        }
+        elsif ($$val ne $$sqla_bind){
+          $bad_slice->("inconsistent literal SQL value, expecting: '$$sqla_bind'",
+            $col_idx, $datum_idx);
+        }
+      }
+      elsif (my $reftype = ref $val) {
+        require overload;
+        if (overload::Method($val, '""')) {
+          $datum->[$col_idx] = "".$val;
+        }
+        else {
+          $bad_slice->("$reftype reference found where bind expected",
+            $col_idx, $datum_idx);
+        }
+      }
+    }
+  }
+
   my ($sql, $bind) = $self->_prep_for_execute (
     'insert', undef, $source, [\%colvalues]
   );
-  my @bind = @$bind
-    or croak 'Cannot insert_bulk without support for placeholders';
+  my @bind = @$bind;
+
+  my $empty_bind = 1 if (not @bind) &&
+    (grep { ref $_ eq 'SCALAR' } values %colvalues) == @$cols;
+
+  if ((not @bind) && (not $empty_bind)) {
+    $self->throw_exception(
+      'Cannot insert_bulk without support for placeholders'
+    );
+  }
+
+  # neither _execute_array, nor _execute_inserts_with_no_binds are
+  # atomic (even if _execute _array is a single call). Thus a safety
+  # scope guard
+  my $guard = $self->txn_scope_guard unless $self->{transaction_depth} != 0;
 
-  $self->_query_start( $sql, @bind );
+  $self->_query_start( $sql, ['__BULK__'] );
   my $sth = $self->sth($sql);
+  my $rv = do {
+    if ($empty_bind) {
+      # bind_param_array doesn't work if there are no binds
+      $self->_dbh_execute_inserts_with_no_binds( $sth, scalar @$data );
+    }
+    else {
+#      @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
+      $self->_execute_array( $source, $sth, \@bind, $cols, $data );
+    }
+  };
+
+  $self->_query_end( $sql, ['__BULK__'] );
 
-#  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
+
+  $guard->commit if $guard;
+
+  return (wantarray ? ($rv, $sth, @bind) : $rv);
+}
+
+sub _execute_array {
+  my ($self, $source, $sth, $bind, $cols, $data, @extra) = @_;
 
   ## This must be an arrayref, else nothing works!
   my $tuple_status = [];
@@ -1367,7 +1506,7 @@ sub insert_bulk {
   ## Bind the values and execute
   my $placeholder_index = 1;
 
-  foreach my $bound (@bind) {
+  foreach my $bound (@$bind) {
 
     my $attributes = {};
     my ($column_name, $data_index) = @$bound;
@@ -1382,57 +1521,82 @@ sub insert_bulk {
     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
     $placeholder_index++;
   }
-  my $rv = eval { $sth->execute_array({ArrayTupleStatus => $tuple_status}) };
-  $sth->finish;
-  if (my $err = $@) {
+
+  my $rv = eval {
+    $self->_dbh_execute_array($sth, $tuple_status, @extra);
+  };
+  my $err = $@ || $sth->errstr;
+
+# Statement must finish even if there was an exception.
+  eval { $sth->finish };
+  $err = $@ unless $err;
+
+  if ($err) {
     my $i = 0;
     ++$i while $i <= $#$tuple_status && !ref $tuple_status->[$i];
 
-    $self->throw_exception($sth->errstr || "Unexpected populate error: $err")
+    $self->throw_exception("Unexpected populate error: $err")
       if ($i > $#$tuple_status);
 
     $self->throw_exception(sprintf "%s for populate slice:\n%s",
-      $tuple_status->[$i][1],
-      $self->_pretty_print ({
+      ($tuple_status->[$i][1] || $err),
+      Data::Dumper::Concise::Dumper({
         map { $cols->[$_] => $data->[$i][$_] } (0 .. $#$cols)
       }),
     );
   }
-  $self->throw_exception($sth->errstr) if !$rv;
+  return $rv;
+}
 
-  $self->_query_end( $sql, @bind );
-  return (wantarray ? ($rv, $sth, @bind) : $rv);
+sub _dbh_execute_array {
+    my ($self, $sth, $tuple_status, @extra) = @_;
+
+    return $sth->execute_array({ArrayTupleStatus => $tuple_status});
 }
 
-sub update {
-  my ($self, $source, @args) = @_; 
+sub _dbh_execute_inserts_with_no_binds {
+  my ($self, $sth, $count) = @_;
 
-# redispatch to update method of storage we reblessed into, if necessary
-  if (not $self->_driver_determined) {
-    $self->_determine_driver;
-    goto $self->can('update');
-  }
+  eval {
+    my $dbh = $self->_get_dbh;
+    local $dbh->{RaiseError} = 1;
+    local $dbh->{PrintError} = 0;
 
-  my $bind_attributes = $self->source_bind_attributes($source);
+    $sth->execute foreach 1..$count;
+  };
+  my $exception = $@;
+
+# Make sure statement is finished even if there was an exception.
+  eval { $sth->finish };
+  $exception = $@ unless $exception;
+
+  $self->throw_exception($exception) if $exception;
+
+  return $count;
+}
+
+sub update {
+  my ($self, $source, @args) = @_;
+
+  my $bind_attrs = $self->source_bind_attributes($source);
 
-  return $self->_execute('update' => [], $source, $bind_attributes, @args);
+  return $self->_execute('update' => [], $source, $bind_attrs, @args);
 }
 
 
 sub delete {
-  my $self = shift @_;
-  my $source = shift @_;
-  $self->_determine_driver;
+  my ($self, $source, @args) = @_;
+
   my $bind_attrs = $self->source_bind_attributes($source);
 
-  return $self->_execute('delete' => [], $source, $bind_attrs, @_);
+  return $self->_execute('delete' => [], $source, $bind_attrs, @args);
 }
 
 # We were sent here because the $rs contains a complex search
 # which will require a subquery to select the correct rows
-# (i.e. joined or limited resultsets)
+# (i.e. joined or limited resultsets, or non-introspectable conditions)
 #
-# Genarating a single PK column subquery is trivial and supported
+# Generating a single PK column subquery is trivial and supported
 # by all RDBMS. However if we have a multicolumn PK, things get ugly.
 # Look at _multipk_update_delete()
 sub _subq_update_delete {
@@ -1441,14 +1605,19 @@ sub _subq_update_delete {
 
   my $rsrc = $rs->result_source;
 
-  # we already check this, but double check naively just in case. Should be removed soon
+  # quick check if we got a sane rs on our hands
+  my @pcols = $rsrc->_pri_cols;
+
   my $sel = $rs->_resolved_attrs->{select};
   $sel = [ $sel ] unless ref $sel eq 'ARRAY';
-  my @pcols = $rsrc->primary_columns;
-  if (@$sel != @pcols) {
+
+  if (
+      join ("\x00", map { join '.', $rs->{attrs}{alias}, $_ } sort @pcols)
+        ne
+      join ("\x00", sort @$sel )
+  ) {
     $self->throw_exception (
-      'Subquery update/delete can not be called on resultsets selecting a'
-     .' number of columns different than the number of primary keys'
+      '_subq_update_delete can not be called on resultsets selecting columns other than the primary keys'
     );
   }
 
@@ -1490,7 +1659,7 @@ sub _per_row_update_delete {
   my ($rs, $op, $values) = @_;
 
   my $rsrc = $rs->result_source;
-  my @pcols = $rsrc->primary_columns;
+  my @pcols = $rsrc->_pri_cols;
 
   my $guard = $self->txn_scope_guard;
 
@@ -1498,11 +1667,12 @@ sub _per_row_update_delete {
   my $row_cnt = '0E0';
 
   my $subrs_cur = $rs->cursor;
-  while (my @pks = $subrs_cur->next) {
+  my @all_pk = $subrs_cur->all;
+  for my $pks ( @all_pk) {
 
     my $cond;
     for my $i (0.. $#pcols) {
-      $cond->{$pcols[$i]} = $pks[$i];
+      $cond->{$pcols[$i]} = $pks->[$i];
     }
 
     $self->$op (
@@ -1566,7 +1736,7 @@ sub _select_args {
     select => $select,
     from => $ident,
     where => $where,
-    $rs_alias
+    $rs_alias && $alias2source->{$rs_alias}
       ? ( _source_handle => $alias2source->{$rs_alias}->handle )
       : ()
     ,
@@ -1617,21 +1787,76 @@ sub _select_args {
 
   my @limit;
 
-  # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
-  # otherwise delegate the limiting to the storage, unless software limit was requested
+  # see if we need to tear the prefetch apart otherwise delegate the limiting to the
+  # storage, unless software limit was requested
   if (
+    #limited has_many
     ( $attrs->{rows} && keys %{$attrs->{collapse}} )
        ||
-    ( $attrs->{group_by} && @{$attrs->{group_by}} &&
-      $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
+    # limited prefetch with RNO subqueries
+    (
+      $attrs->{rows}
+        &&
+      $sql_maker->limit_dialect eq 'RowNumberOver'
+        &&
+      $attrs->{_prefetch_select}
+        &&
+      @{$attrs->{_prefetch_select}}
+    )
+      ||
+    # grouped prefetch
+    ( $attrs->{group_by}
+        &&
+      @{$attrs->{group_by}}
+        &&
+      $attrs->{_prefetch_select}
+        &&
+      @{$attrs->{_prefetch_select}}
+    )
   ) {
     ($ident, $select, $where, $attrs)
       = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
   }
+
+  elsif (
+    ($attrs->{rows} || $attrs->{offset})
+      &&
+    $sql_maker->limit_dialect eq 'RowNumberOver'
+      &&
+    (ref $ident eq 'ARRAY' && @$ident > 1)  # indicates a join
+      &&
+    scalar $self->_parse_order_by ($attrs->{order_by})
+  ) {
+    # the RNO limit dialect above mangles the SQL such that the join gets lost
+    # wrap a subquery here
+
+    push @limit, delete @{$attrs}{qw/rows offset/};
+
+    my $subq = $self->_select_args_to_query (
+      $ident,
+      $select,
+      $where,
+      $attrs,
+    );
+
+    $ident = {
+      -alias => $attrs->{alias},
+      -source_handle => $ident->[0]{-source_handle},
+      $attrs->{alias} => $subq,
+    };
+
+    # all part of the subquery now
+    delete @{$attrs}{qw/order_by group_by having/};
+    $where = undef;
+  }
+
   elsif (! $attrs->{software_limit} ) {
     push @limit, $attrs->{rows}, $attrs->{offset};
   }
 
+  # try to simplify the joinmap further (prune unreferenced type-single joins)
+  $ident = $self->_prune_unused_joins ($ident, $select, $where, $attrs);
+
 ###
   # This would be the point to deflate anything found in $where
   # (and leave $attrs->{bind} intact). Problem is - inflators historically
@@ -1650,324 +1875,6 @@ sub _select_args {
   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
 }
 
-#
-# This is the code producing joined subqueries like:
-# SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
-#
-sub _adjust_select_args_for_complex_prefetch {
-  my ($self, $from, $select, $where, $attrs) = @_;
-
-  $self->throw_exception ('Nothing to prefetch... how did we get here?!')
-    if not @{$attrs->{_prefetch_select}};
-
-  $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
-    if (ref $from ne 'ARRAY' || ref $from->[0] ne 'HASH' || ref $from->[1] ne 'ARRAY');
-
-
-  # generate inner/outer attribute lists, remove stuff that doesn't apply
-  my $outer_attrs = { %$attrs };
-  delete $outer_attrs->{$_} for qw/where bind rows offset group_by having/;
-
-  my $inner_attrs = { %$attrs };
-  delete $inner_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
-
-
-  # bring over all non-collapse-induced order_by into the inner query (if any)
-  # the outer one will have to keep them all
-  delete $inner_attrs->{order_by};
-  if (my $ord_cnt = @{$outer_attrs->{order_by}} - @{$outer_attrs->{_collapse_order_by}} ) {
-    $inner_attrs->{order_by} = [
-      @{$outer_attrs->{order_by}}[ 0 .. $ord_cnt - 1]
-    ];
-  }
-
-
-  # generate the inner/outer select lists
-  # for inside we consider only stuff *not* brought in by the prefetch
-  # on the outside we substitute any function for its alias
-  my $outer_select = [ @$select ];
-  my $inner_select = [];
-  for my $i (0 .. ( @$outer_select - @{$outer_attrs->{_prefetch_select}} - 1) ) {
-    my $sel = $outer_select->[$i];
-
-    if (ref $sel eq 'HASH' ) {
-      $sel->{-as} ||= $attrs->{as}[$i];
-      $outer_select->[$i] = join ('.', $attrs->{alias}, ($sel->{-as} || "inner_column_$i") );
-    }
-
-    push @$inner_select, $sel;
-  }
-
-  # normalize a copy of $from, so it will be easier to work with further
-  # down (i.e. promote the initial hashref to an AoH)
-  $from = [ @$from ];
-  $from->[0] = [ $from->[0] ];
-  my %original_join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
-
-
-  # decide which parts of the join will remain in either part of
-  # the outer/inner query
-
-  # First we compose a list of which aliases are used in restrictions
-  # (i.e. conditions/order/grouping/etc). Since we do not have
-  # introspectable SQLA, we fall back to ugly scanning of raw SQL for
-  # WHERE, and for pieces of ORDER BY in order to determine which aliases
-  # need to appear in the resulting sql.
-  # It may not be very efficient, but it's a reasonable stop-gap
-  # Also unqualified column names will not be considered, but more often
-  # than not this is actually ok
-  #
-  # In the same loop we enumerate part of the selection aliases, as
-  # it requires the same sqla hack for the time being
-  my ($restrict_aliases, $select_aliases, $prefetch_aliases);
-  {
-    # produce stuff unquoted, so it can be scanned
-    my $sql_maker = $self->sql_maker;
-    local $sql_maker->{quote_char};
-    my $sep = $self->_sql_maker_opts->{name_sep} || '.';
-    $sep = "\Q$sep\E";
-
-    my $non_prefetch_select_sql = $sql_maker->_recurse_fields ($inner_select);
-    my $prefetch_select_sql = $sql_maker->_recurse_fields ($outer_attrs->{_prefetch_select});
-    my $where_sql = $sql_maker->where ($where);
-    my $group_by_sql = $sql_maker->_order_by({
-      map { $_ => $inner_attrs->{$_} } qw/group_by having/
-    });
-    my @non_prefetch_order_by_chunks = (map
-      { ref $_ ? $_->[0] : $_ }
-      $sql_maker->_order_by_chunks ($inner_attrs->{order_by})
-    );
-
-
-    for my $alias (keys %original_join_info) {
-      my $seen_re = qr/\b $alias $sep/x;
-
-      for my $piece ($where_sql, $group_by_sql, @non_prefetch_order_by_chunks ) {
-        if ($piece =~ $seen_re) {
-          $restrict_aliases->{$alias} = 1;
-        }
-      }
-
-      if ($non_prefetch_select_sql =~ $seen_re) {
-          $select_aliases->{$alias} = 1;
-      }
-
-      if ($prefetch_select_sql =~ $seen_re) {
-          $prefetch_aliases->{$alias} = 1;
-      }
-
-    }
-  }
-
-  # Add any non-left joins to the restriction list (such joins are indeed restrictions)
-  for my $j (values %original_join_info) {
-    my $alias = $j->{-alias} or next;
-    $restrict_aliases->{$alias} = 1 if (
-      (not $j->{-join_type})
-        or
-      ($j->{-join_type} !~ /^left (?: \s+ outer)? $/xi)
-    );
-  }
-
-  # mark all join parents as mentioned
-  # (e.g.  join => { cds => 'tracks' } - tracks will need to bring cds too )
-  for my $collection ($restrict_aliases, $select_aliases) {
-    for my $alias (keys %$collection) {
-      $collection->{$_} = 1
-        for (@{ $original_join_info{$alias}{-join_path} || [] });
-    }
-  }
-
-  # construct the inner $from for the subquery
-  my %inner_joins = (map { %{$_ || {}} } ($restrict_aliases, $select_aliases) );
-  my @inner_from;
-  for my $j (@$from) {
-    push @inner_from, $j if $inner_joins{$j->[0]{-alias}};
-  }
-
-  # if a multi-type join was needed in the subquery ("multi" is indicated by
-  # presence in {collapse}) - add a group_by to simulate the collapse in the subq
-  unless ($inner_attrs->{group_by}) {
-    for my $alias (keys %inner_joins) {
-
-      # the dot comes from some weirdness in collapse
-      # remove after the rewrite
-      if ($attrs->{collapse}{".$alias"}) {
-        $inner_attrs->{group_by} ||= $inner_select;
-        last;
-      }
-    }
-  }
-
-  # demote the inner_from head
-  $inner_from[0] = $inner_from[0][0];
-
-  # generate the subquery
-  my $subq = $self->_select_args_to_query (
-    \@inner_from,
-    $inner_select,
-    $where,
-    $inner_attrs,
-  );
-
-  my $subq_joinspec = {
-    -alias => $attrs->{alias},
-    -source_handle => $inner_from[0]{-source_handle},
-    $attrs->{alias} => $subq,
-  };
-
-  # Generate the outer from - this is relatively easy (really just replace
-  # the join slot with the subquery), with a major caveat - we can not
-  # join anything that is non-selecting (not part of the prefetch), but at
-  # the same time is a multi-type relationship, as it will explode the result.
-  #
-  # There are two possibilities here
-  # - either the join is non-restricting, in which case we simply throw it away
-  # - it is part of the restrictions, in which case we need to collapse the outer
-  #   result by tackling yet another group_by to the outside of the query
-
-  # so first generate the outer_from, up to the substitution point
-  my @outer_from;
-  while (my $j = shift @$from) {
-    if ($j->[0]{-alias} eq $attrs->{alias}) { # time to swap
-      push @outer_from, [
-        $subq_joinspec,
-        @{$j}[1 .. $#$j],
-      ];
-      last; # we'll take care of what's left in $from below
-    }
-    else {
-      push @outer_from, $j;
-    }
-  }
-
-  # see what's left - throw away if not selecting/restricting
-  # also throw in a group_by if restricting to guard against
-  # cross-join explosions
-  #
-  while (my $j = shift @$from) {
-    my $alias = $j->[0]{-alias};
-
-    if ($select_aliases->{$alias} || $prefetch_aliases->{$alias}) {
-      push @outer_from, $j;
-    }
-    elsif ($restrict_aliases->{$alias}) {
-      push @outer_from, $j;
-
-      # FIXME - this should be obviated by SQLA2, as I'll be able to 
-      # have restrict_inner and restrict_outer... or something to that
-      # effect... I think...
-
-      # FIXME2 - I can't find a clean way to determine if a particular join
-      # is a multi - instead I am just treating everything as a potential
-      # explosive join (ribasushi)
-      #
-      # if (my $handle = $j->[0]{-source_handle}) {
-      #   my $rsrc = $handle->resolve;
-      #   ... need to bail out of the following if this is not a multi,
-      #       as it will be much easier on the db ...
-
-          $outer_attrs->{group_by} ||= $outer_select;
-      # }
-    }
-  }
-
-  # demote the outer_from head
-  $outer_from[0] = $outer_from[0][0];
-
-  # This is totally horrific - the $where ends up in both the inner and outer query
-  # Unfortunately not much can be done until SQLA2 introspection arrives, and even
-  # then if where conditions apply to the *right* side of the prefetch, you may have
-  # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
-  # the outer select to exclude joins you didin't want in the first place
-  #
-  # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
-  return (\@outer_from, $outer_select, $where, $outer_attrs);
-}
-
-sub _resolve_ident_sources {
-  my ($self, $ident) = @_;
-
-  my $alias2source = {};
-  my $rs_alias;
-
-  # the reason this is so contrived is that $ident may be a {from}
-  # structure, specifying multiple tables to join
-  if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
-    # this is compat mode for insert/update/delete which do not deal with aliases
-    $alias2source->{me} = $ident;
-    $rs_alias = 'me';
-  }
-  elsif (ref $ident eq 'ARRAY') {
-
-    for (@$ident) {
-      my $tabinfo;
-      if (ref $_ eq 'HASH') {
-        $tabinfo = $_;
-        $rs_alias = $tabinfo->{-alias};
-      }
-      if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
-        $tabinfo = $_->[0];
-      }
-
-      $alias2source->{$tabinfo->{-alias}} = $tabinfo->{-source_handle}->resolve
-        if ($tabinfo->{-source_handle});
-    }
-  }
-
-  return ($alias2source, $rs_alias);
-}
-
-# Takes $ident, \@column_names
-#
-# returns { $column_name => \%column_info, ... }
-# also note: this adds -result_source => $rsrc to the column info
-#
-# usage:
-#   my $col_sources = $self->_resolve_column_info($ident, @column_names);
-sub _resolve_column_info {
-  my ($self, $ident, $colnames) = @_;
-  my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
-
-  my $sep = $self->_sql_maker_opts->{name_sep} || '.';
-  $sep = "\Q$sep\E";
-
-  my (%return, %seen_cols);
-
-  # compile a global list of column names, to be able to properly
-  # disambiguate unqualified column names (if at all possible)
-  for my $alias (keys %$alias2src) {
-    my $rsrc = $alias2src->{$alias};
-    for my $colname ($rsrc->columns) {
-      push @{$seen_cols{$colname}}, $alias;
-    }
-  }
-
-  COLUMN:
-  foreach my $col (@$colnames) {
-    my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
-
-    unless ($alias) {
-      # see if the column was seen exactly once (so we know which rsrc it came from)
-      if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
-        $alias = $seen_cols{$colname}[0];
-      }
-      else {
-        next COLUMN;
-      }
-    }
-
-    my $rsrc = $alias2src->{$alias};
-    $return{$col} = $rsrc && {
-      %{$rsrc->column_info($colname)},
-      -result_source => $rsrc,
-      -source_alias => $alias,
-    };
-  }
-
-  return \%return;
-}
-
 # Returns a counting SELECT for a simple count
 # query. Abstracted so that a storage could override
 # this to { count => 'firstcol' } or whatever makes
@@ -1986,25 +1893,38 @@ sub _count_select {
 #
 sub _subq_count_select {
   my ($self, $source, $rs_attrs) = @_;
-  return $rs_attrs->{group_by} if $rs_attrs->{group_by};
+
+  if (my $groupby = $rs_attrs->{group_by}) {
+
+    my $avail_columns = $self->_resolve_column_info ($rs_attrs->{from});
+
+    my $sel_index;
+    for my $sel (@{$rs_attrs->{select}}) {
+      if (ref $sel eq 'HASH' and $sel->{-as}) {
+        $sel_index->{$sel->{-as}} = $sel;
+      }
+    }
+
+    my @selection;
+    for my $g_part (@$groupby) {
+      if (ref $g_part or $avail_columns->{$g_part}) {
+        push @selection, $g_part;
+      }
+      elsif ($sel_index->{$g_part}) {
+        push @selection, $sel_index->{$g_part};
+      }
+      else {
+        $self->throw_exception ("group_by criteria '$g_part' not contained within current resultset source(s)");
+      }
+    }
+
+    return \@selection;
+  }
 
   my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
   return @pcols ? \@pcols : [ 1 ];
 }
 
-#
-# Returns an ordered list of column names before they are used
-# in a SELECT statement. By default simply returns the list
-# passed in.
-#
-# This may be overridden in a specific storage when there are
-# requirements such as moving BLOB columns to the end of the 
-# SELECT list.
-sub _order_select_columns {
-  #my ($self, $source, $columns) = @_;
-  return @{$_[2]};
-}
-
 sub source_bind_attributes {
   my ($self, $source) = @_;
 
@@ -2151,18 +2071,14 @@ Return the row id of the last insert.
 =cut
 
 sub _dbh_last_insert_id {
-    # All Storage's need to register their own _dbh_last_insert_id
-    # the old SQLite-based method was highly inappropriate
+    my ($self, $dbh, $source, $col) = @_;
 
-    my $self = shift;
-    my $class = ref $self;
-    $self->throw_exception (<<EOE);
+    my $id = eval { $dbh->last_insert_id (undef, undef, $source->name, $col) };
+
+    return $id if defined $id;
 
-No _dbh_last_insert_id() method found in $class.
-Since the method of obtaining the autoincrement id of the last insert
-operation varies greatly between different databases, this method must be
-individually implemented for every storage class.
-EOE
+    my $class = ref $self;
+    $self->throw_exception ("No storage specific _dbh_last_insert_id() method implemented in $class, and the generic DBI::last_insert_id() failed");
 }
 
 sub last_insert_id {
@@ -2180,7 +2096,7 @@ sub last_insert_id {
 
 This API is B<EXPERIMENTAL>, will almost definitely change in the future, and
 currently only used by L<::AutoCast|DBIx::Class::Storage::DBI::AutoCast> and
-L<::Sybase|DBIx::Class::Storage::DBI::Sybase>.
+L<::Sybase::ASE|DBIx::Class::Storage::DBI::Sybase::ASE>.
 
 The default implementation returns C<undef>, implement in your Storage driver if
 you need this functionality.
@@ -2238,14 +2154,7 @@ Returns the database driver name.
 =cut
 
 sub sqlt_type {
-  my ($self) = @_;
-
-  if (not $self->_driver_determined) {
-    $self->_determine_driver;
-    goto $self->can ('sqlt_type');
-  }
-
-  $self->_get_dbh->{Driver}->{Name};
+  shift->_get_dbh->{Driver}->{Name};
 }
 
 =head2 bind_attribute_by_data_type
@@ -2283,7 +2192,7 @@ sub is_datatype_numeric {
 }
 
 
-=head2 create_ddl_dir (EXPERIMENTAL)
+=head2 create_ddl_dir
 
 =over 4
 
@@ -2335,20 +2244,21 @@ hashref like the following
  { ignore_constraint_names => 0, # ... other options }
 
 
-Note that this feature is currently EXPERIMENTAL and may not work correctly
-across all databases, or fully handle complex relationships.
-
-WARNING: Please check all SQL files created, before applying them.
+WARNING: You are strongly advised to check all SQL files created, before applying
+them.
 
 =cut
 
 sub create_ddl_dir {
   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
 
-  if(!$dir || !-d $dir) {
+  unless ($dir) {
     carp "No directory given, using ./\n";
-    $dir = "./";
+    $dir = './';
   }
+
+  $self->throw_exception ("Directory '$dir' does not exist\n") unless(-d $dir);
+
   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
 
@@ -2362,8 +2272,9 @@ sub create_ddl_dir {
     %{$sqltargs || {}}
   };
 
-  $self->throw_exception("Can't create a ddl file without SQL::Translator: " . $self->_sqlt_version_error)
-    if !$self->_sqlt_version_ok;
+  unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy')) {
+    $self->throw_exception("Can't create a ddl file without " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
+  }
 
   my $sqlt = SQL::Translator->new( $sqltargs );
 
@@ -2505,8 +2416,9 @@ sub deployment_statements {
       return join('', @rows);
   }
 
-  $self->throw_exception("Can't deploy without either SQL::Translator or a ddl_dir: " . $self->_sqlt_version_error )
-    if !$self->_sqlt_version_ok;
+  unless (DBIx::Class::Optional::Dependencies->req_ok_for ('deploy') ) {
+    $self->throw_exception("Can't deploy without a ddl_dir or " . DBIx::Class::Optional::Dependencies->req_missing_for ('deploy') );
+  }
 
   # sources needs to be a parser arg, but for simplicty allow at top level
   # coming in
@@ -2519,7 +2431,20 @@ sub deployment_statements {
     parser => 'SQL::Translator::Parser::DBIx::Class',
     data => $schema,
   );
-  return $tr->translate;
+
+  my @ret;
+  my $wa = wantarray;
+  if ($wa) {
+    @ret = $tr->translate;
+  }
+  else {
+    $ret[0] = $tr->translate;
+  }
+
+  $self->throw_exception( 'Unable to produce deployment statements: ' . $tr->error)
+    unless (@ret && defined $ret[0]);
+
+  return $wa ? @ret : $ret[0];
 }
 
 sub deploy {
@@ -2543,7 +2468,7 @@ sub deploy {
     }
     $self->_query_end($line);
   };
-  my @statements = $self->deployment_statements($schema, $type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
+  my @statements = $schema->deployment_statements($type, undef, $dir, { %{ $sqltargs || {} }, no_comments => 1 } );
   if (@statements > 1) {
     foreach my $statement (@statements) {
       $deploy->( $statement );
@@ -2585,11 +2510,6 @@ See L</datetime_parser>
 =cut
 
 sub build_datetime_parser {
-  if (not $_[0]->_driver_determined) {
-    $_[0]->_determine_driver;
-    goto $_[0]->can('build_datetime_parser');
-  }
-
   my $self = shift;
   my $type = $self->datetime_parser_type(@_);
   $self->ensure_class_loaded ($type);
@@ -2622,31 +2542,32 @@ sub lag_behind_master {
     return;
 }
 
-# SQLT version handling 
-{
-  my $_sqlt_version_ok;     # private 
-  my $_sqlt_version_error;  # private 
+=head2 relname_to_table_alias
 
-  sub _sqlt_version_ok {
-    if (!defined $_sqlt_version_ok) {
-      eval "use SQL::Translator $minimum_sqlt_version";
-      if ($@) {
-        $_sqlt_version_ok = 0;
-        $_sqlt_version_error = $@;
-      }
-      else {
-        $_sqlt_version_ok = 1;
-      }
-    }
-    return $_sqlt_version_ok;
-  }
+=over 4
 
-  sub _sqlt_version_error {
-    shift->_sqlt_version_ok unless defined $_sqlt_version_ok;
-    return $_sqlt_version_error;
-  }
+=item Arguments: $relname, $join_count
+
+=back
 
-  sub _sqlt_minimum_version { $minimum_sqlt_version };
+L<DBIx::Class> uses L<DBIx::Class::Relationship> names as table aliases in
+queries.
+
+This hook is to allow specific L<DBIx::Class::Storage> drivers to change the
+way these aliases are named.
+
+The default behavior is C<< "$relname_$join_count" if $join_count > 1 >>,
+otherwise C<"$relname">.
+
+=cut
+
+sub relname_to_table_alias {
+  my ($self, $relname, $join_count) = @_;
+
+  my $alias = ($join_count && $join_count > 1 ?
+    join('_', $relname, $join_count) : $relname);
+
+  return $alias;
 }
 
 sub DESTROY {
@@ -2657,7 +2578,10 @@ sub DESTROY {
   # some databases need this to stop spewing warnings
   if (my $dbh = $self->_dbh) {
     local $@;
-    eval { $dbh->disconnect };
+    eval {
+      %{ $dbh->{CachedKids} } = ();
+      $dbh->disconnect;
+    };
   }
 
   $self->_dbh(undef);