privatize dormant method - it may be useful for sybase at *some* point
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
index 1a1aeec..355edf9 100644 (file)
@@ -1,10 +1,12 @@
 package DBIx::Class::Storage::DBI;
 # -*- mode: cperl; cperl-indent-level: 2 -*-
 
+use strict;
+use warnings;
+
 use base 'DBIx::Class::Storage';
+use mro 'c3';
 
-use strict;    
-use warnings;
 use Carp::Clan qw/^DBIx::Class/;
 use DBI;
 use DBIx::Class::Storage::DBI::Cursor;
@@ -13,14 +15,15 @@ use Scalar::Util();
 use List::Util();
 
 __PACKAGE__->mk_group_accessors('simple' =>
-    qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
-       _conn_pid _conn_tid transaction_depth _dbh_autocommit savepoints/
+  qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid
+     _conn_tid transaction_depth _dbh_autocommit _driver_determined savepoints/
 );
 
 # the values for these accessors are picked out (and deleted) from
 # the attribute hashref passed to connect_info
 my @storage_options = qw/
-  on_connect_do on_disconnect_do disable_sth_caching unsafe auto_savepoint
+  on_connect_call on_disconnect_call on_connect_do on_disconnect_do
+  disable_sth_caching unsafe auto_savepoint
 /;
 __PACKAGE__->mk_group_accessors('simple' => @storage_options);
 
@@ -89,8 +92,8 @@ recognized by DBIx::Class:
 
 =item *
 
-A single code reference which returns a connected 
-L<DBI database handle|DBI/connect> optionally followed by 
+A single code reference which returns a connected
+L<DBI database handle|DBI/connect> optionally followed by
 L<extra attributes|/DBIx::Class specific connection attributes> recognized
 by DBIx::Class:
 
@@ -109,7 +112,7 @@ mixed together:
     %extra_attributes,
   }];
 
-This is particularly useful for L<Catalyst> based applications, allowing the 
+This is particularly useful for L<Catalyst> based applications, allowing the
 following config (L<Config::General> style):
 
   <Model::DB>
@@ -128,7 +131,7 @@ Please note that the L<DBI> docs recommend that you always explicitly
 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
 recommends that it be set to I<1>, and that you perform transactions
 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
-to I<1> if you do not do explicitly set it to zero.  This is the default 
+to I<1> if you do not do explicitly set it to zero.  This is the default
 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
 
 =head3 DBIx::Class specific connection attributes
@@ -177,12 +180,97 @@ immediately before disconnecting from the database.
 Note, this only runs if you explicitly call L</disconnect> on the
 storage object.
 
+=item on_connect_call
+
+A more generalized form of L</on_connect_do> that calls the specified
+C<connect_call_METHOD> methods in your storage driver.
+
+  on_connect_do => 'select 1'
+
+is equivalent to:
+
+  on_connect_call => [ [ do_sql => 'select 1' ] ]
+
+Its values may contain:
+
+=over
+
+=item a scalar
+
+Will call the C<connect_call_METHOD> method.
+
+=item a code reference
+
+Will execute C<< $code->($storage) >>
+
+=item an array reference
+
+Each value can be a method name or code reference.
+
+=item an array of arrays
+
+For each array, the first item is taken to be the C<connect_call_> method name
+or code reference, and the rest are parameters to it.
+
+=back
+
+Some predefined storage methods you may use:
+
+=over
+
+=item do_sql
+
+Executes a SQL string or a code reference that returns a SQL string. This is
+what L</on_connect_do> and L</on_disconnect_do> use.
+
+It can take:
+
+=over
+
+=item a scalar
+
+Will execute the scalar as SQL.
+
+=item an arrayref
+
+Taken to be arguments to L<DBI/do>, the SQL string optionally followed by the
+attributes hashref and bind values.
+
+=item a code reference
+
+Will execute C<< $code->($storage) >> and execute the return array refs as
+above.
+
+=back
+
+=item datetime_setup
+
+Execute any statements necessary to initialize the database session to return
+and accept datetime/timestamp values used with
+L<DBIx::Class::InflateColumn::DateTime>.
+
+Only necessary for some databases, see your specific storage driver for
+implementation details.
+
+=back
+
+=item on_disconnect_call
+
+Takes arguments in the same form as L</on_connect_call> and executes them
+immediately before disconnecting from the database.
+
+Calls the C<disconnect_call_METHOD> methods as opposed to the
+C<connect_call_METHOD> methods called by L</on_connect_call>.
+
+Note, this only runs if you explicitly call L</disconnect> on the
+storage object.
+
 =item disable_sth_caching
 
 If set to a true value, this option will disable the caching of
 statement handles via L<DBI/prepare_cached>.
 
-=item limit_dialect 
+=item limit_dialect
 
 Sets the limit dialect. This is useful for JDBC-bridge among others
 where the remote SQL-dialect cannot be determined by the name of the
@@ -190,7 +278,7 @@ driver alone. See also L<SQL::Abstract::Limit>.
 
 =item quote_char
 
-Specifies what characters to use to quote table and column names. If 
+Specifies what characters to use to quote table and column names. If
 you use this you will want to specify L</name_sep> as well.
 
 C<quote_char> expects either a single character, in which case is it
@@ -202,8 +290,8 @@ SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
 
 =item name_sep
 
-This only needs to be used in conjunction with C<quote_char>, and is used to 
-specify the charecter that seperates elements (schemas, tables, columns) from 
+This only needs to be used in conjunction with C<quote_char>, and is used to
+specify the charecter that seperates elements (schemas, tables, columns) from
 each other. In most cases this is simply a C<.>.
 
 The consequences of not supplying this value is that L<SQL::Abstract>
@@ -349,16 +437,56 @@ sub connect_info {
     }
   }
 
-  %attrs = () if (ref $args[0] eq 'CODE');  # _connect() never looks past $args[0] in this case
+  if (ref $args[0] eq 'CODE') {
+    # _connect() never looks past $args[0] in this case
+    %attrs = ()
+  } else {
+    %attrs = (
+      %{ $self->_default_dbi_connect_attributes || {} },
+      %attrs,
+    );
+  }
 
   $self->_dbi_connect_info([@args, keys %attrs ? \%attrs : ()]);
   $self->_connect_info;
 }
 
+sub _default_dbi_connect_attributes {
+  return { AutoCommit => 1 };
+}
+
 =head2 on_connect_do
 
 This method is deprecated in favour of setting via L</connect_info>.
 
+=cut
+
+=head2 on_disconnect_do
+
+This method is deprecated in favour of setting via L</connect_info>.
+
+=cut
+
+sub _parse_connect_do {
+  my ($self, $type) = @_;
+
+  my $val = $self->$type;
+  return () if not defined $val;
+
+  my @res;
+
+  if (not ref($val)) {
+    push @res, [ 'do_sql', $val ];
+  } elsif (ref($val) eq 'CODE') {
+    push @res, $val;
+  } elsif (ref($val) eq 'ARRAY') {
+    push @res, map { [ 'do_sql', $_ ] } @$val;
+  } else {
+    $self->throw_exception("Invalid type for $type: ".ref($val));
+  }
+
+  return \@res;
+}
 
 =head2 dbh_do
 
@@ -473,7 +601,7 @@ sub txn_do {
     my $exception = $@;
     if(!$exception) { return $want_array ? @result : $result[0] }
 
-    if($tried++ > 0 || $self->connected) {
+    if($tried++ || $self->connected) {
       eval { $self->txn_rollback };
       my $rollback_exception = $@;
       if($rollback_exception) {
@@ -505,11 +633,16 @@ database is not in C<AutoCommit> mode.
 sub disconnect {
   my ($self) = @_;
 
-  if( $self->connected ) {
-    my $connection_do = $self->on_disconnect_do;
-    $self->_do_connection_actions($connection_do) if ref($connection_do);
+  if( $self->_dbh ) {
+    my @actions;
+
+    push @actions, ( $self->on_disconnect_call || () );
+    push @actions, $self->_parse_connect_do ('on_disconnect_do');
+
+    $self->_do_connection_actions(disconnect_call_ => $_) for @actions;
+
+    $self->_dbh_rollback unless $self->_dbh_autocommit;
 
-    $self->_dbh->rollback unless $self->_dbh_autocommit;
     $self->_dbh->disconnect;
     $self->_dbh(undef);
     $self->{_dbh_gen}++;
@@ -538,6 +671,22 @@ sub with_deferred_fk_checks {
   $sub->();
 }
 
+=head2 connected
+
+=over
+
+=item Arguments: none
+
+=item Return Value: 1|0
+
+=back
+
+Verifies that the the current database handle is active and ready to execute
+an SQL statement (i.e. the connection did not get stale, server is still
+answering, etc.) This method is used internally by L</dbh>.
+
+=cut
+
 sub connected {
   my ($self) = @_;
 
@@ -551,12 +700,20 @@ sub connected {
           $self->_verify_pid;
           return 0 if !$self->_dbh;
       }
-      return ($dbh->FETCH('Active') && $dbh->ping);
+      return ($dbh->FETCH('Active') && $self->_ping);
   }
 
   return 0;
 }
 
+sub _ping {
+  my $self = shift;
+
+  my $dbh = $self->_dbh or return 0;
+
+  return $dbh->ping;
+}
+
 # handle pid changes correctly
 #  NOTE: assumes $self->_dbh is a valid $dbh
 sub _verify_pid {
@@ -581,21 +738,48 @@ sub ensure_connected {
 
 =head2 dbh
 
-Returns the dbh - a data base handle of class L<DBI>.
+Returns a C<$dbh> - a data base handle of class L<DBI>. The returned handle
+is guaranteed to be healthy by implicitly calling L</connected>, and if
+necessary performing a reconnection before returning.
 
 =cut
 
 sub dbh {
   my ($self) = @_;
 
-  $self->ensure_connected;
+  if (not $self->_dbh) {
+    $self->_populate_dbh;
+  } else {
+    $self->ensure_connected;
+  }
+  return $self->_dbh;
+}
+
+=head2 last_dbh
+
+This returns the B<last> available C<$dbh> if any, or attempts to
+connect and returns the resulting handle. This method differs from
+L</dbh> by not validating if a preexisting handle is still healthy
+via L</connected>. Make sure you take appropriate precautions
+when using this method, as the C<$dbh> may be useless at this point.
+
+=cut
+
+sub last_dbh {
+  my $self = shift;
+  $self->_populate_dbh unless $self->_dbh;
   return $self->_dbh;
 }
 
 sub _sql_maker_args {
     my ($self) = @_;
-    
-    return ( bindtype=>'columns', array_datatypes => 1, limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
+
+    return (
+      bindtype=>'columns',
+      array_datatypes => 1,
+      limit_dialect => $self->last_dbh,
+      %{$self->_sql_maker_opts}
+    );
 }
 
 sub sql_maker {
@@ -612,6 +796,7 @@ sub _rebless {}
 
 sub _populate_dbh {
   my ($self) = @_;
+
   my @info = @{$self->_dbi_connect_info || []};
   $self->_dbh($self->_connect(@info));
 
@@ -624,51 +809,88 @@ sub _populate_dbh {
   #  there is no transaction in progress by definition
   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
 
-  my $connection_do = $self->on_connect_do;
-  $self->_do_connection_actions($connection_do) if $connection_do;
+  $self->_run_connection_actions unless $self->{_in_determine_driver};
+}
+
+sub _run_connection_actions {
+  my $self = shift;
+  my @actions;
+
+  push @actions, ( $self->on_connect_call || () );
+  push @actions, $self->_parse_connect_do ('on_connect_do');
+
+  $self->_do_connection_actions(connect_call_ => $_) for @actions;
 }
 
 sub _determine_driver {
   my ($self) = @_;
 
-  if (ref $self eq 'DBIx::Class::Storage::DBI') {
-    my $driver;
+  if (not $self->_driver_determined) {
+    my $started_unconnected = 0;
+    local $self->{_in_determine_driver} = 1;
+
+    if (ref($self) eq __PACKAGE__) {
+      my $driver;
+      if ($self->_dbh) { # we are connected
+        $driver = $self->_dbh->{Driver}{Name};
+      } else {
+        # try to use dsn to not require being connected, the driver may still
+        # force a connection in _rebless to determine version
+        ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
+        $started_unconnected = 1;
+      }
 
-    if ($self->_dbh) { # we are connected
-      $driver = $self->_dbh->{Driver}{Name};
-    } else {
-      # try to use dsn to not require being connected, the driver may still
-      # force a connection in _rebless to determine version
-      ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
+      my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
+      if ($self->load_optional_class($storage_class)) {
+        mro::set_mro($storage_class, 'c3');
+        bless $self, $storage_class;
+        $self->_rebless();
+      }
     }
 
-    if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
-      bless $self, "DBIx::Class::Storage::DBI::${driver}";
-      $self->_rebless();
-    }
+    $self->_driver_determined(1);
+
+    $self->_run_connection_actions
+        if $started_unconnected && defined $self->_dbh;
   }
 }
 
 sub _do_connection_actions {
-  my $self = shift;
-  my $connection_do = shift;
-
-  if (!ref $connection_do) {
-    $self->_do_query($connection_do);
-  }
-  elsif (ref $connection_do eq 'ARRAY') {
-    $self->_do_query($_) foreach @$connection_do;
-  }
-  elsif (ref $connection_do eq 'CODE') {
-    $connection_do->($self);
-  }
-  else {
-    $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref $connection_do) );
+  my $self          = shift;
+  my $method_prefix = shift;
+  my $call          = shift;
+
+  if (not ref($call)) {
+    my $method = $method_prefix . $call;
+    $self->$method(@_);
+  } elsif (ref($call) eq 'CODE') {
+    $self->$call(@_);
+  } elsif (ref($call) eq 'ARRAY') {
+    if (ref($call->[0]) ne 'ARRAY') {
+      $self->_do_connection_actions($method_prefix, $_) for @$call;
+    } else {
+      $self->_do_connection_actions($method_prefix, @$_) for @$call;
+    }
+  } else {
+    $self->throw_exception (sprintf ("Don't know how to process conection actions of type '%s'", ref($call)) );
   }
 
   return $self;
 }
 
+sub connect_call_do_sql {
+  my $self = shift;
+  $self->_do_query(@_);
+}
+
+sub disconnect_call_do_sql {
+  my $self = shift;
+  $self->_do_query(@_);
+}
+
+# override in db-specific backend when necessary
+sub connect_call_datetime_setup { 1 }
+
 sub _do_query {
   my ($self, $action) = @_;
 
@@ -753,11 +975,11 @@ sub svp_begin {
 
   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
     unless $self->can('_svp_begin');
-  
+
   push @{ $self->{savepoints} }, $name;
 
   $self->debugobj->svp_begin($name) if $self->debug;
-  
+
   return $self->_svp_begin($name);
 }
 
@@ -817,7 +1039,7 @@ sub svp_rollback {
   }
 
   $self->debugobj->svp_rollback($name) if $self->debug;
-  
+
   return $self->_svp_rollback($name);
 }
 
@@ -829,27 +1051,34 @@ sub _svp_generate_name {
 
 sub txn_begin {
   my $self = shift;
-  $self->ensure_connected();
   if($self->{transaction_depth} == 0) {
     $self->debugobj->txn_begin()
       if $self->debug;
-    # this isn't ->_dbh-> because
-    #  we should reconnect on begin_work
-    #  for AutoCommit users
-    $self->dbh->begin_work;
-  } elsif ($self->auto_savepoint) {
+    $self->_dbh_begin_work;
+  }
+  elsif ($self->auto_savepoint) {
     $self->svp_begin;
   }
   $self->{transaction_depth}++;
 }
 
+sub _dbh_begin_work {
+  my $self = shift;
+  # being here implies we have AutoCommit => 1
+  # if the user is utilizing txn_do - good for
+  # him, otherwise we need to ensure that the
+  # $dbh is healthy on BEGIN
+  my $dbh_method = $self->{_in_dbh_do} ? '_dbh' : 'dbh';
+  $self->$dbh_method->begin_work;
+}
+
 sub txn_commit {
   my $self = shift;
   if ($self->{transaction_depth} == 1) {
     my $dbh = $self->_dbh;
     $self->debugobj->txn_commit()
       if ($self->debug);
-    $dbh->commit;
+    $self->_dbh_commit;
     $self->{transaction_depth} = 0
       if $self->_dbh_autocommit;
   }
@@ -860,6 +1089,11 @@ sub txn_commit {
   }
 }
 
+sub _dbh_commit {
+  my $self = shift;
+  $self->_dbh->commit;
+}
+
 sub txn_rollback {
   my $self = shift;
   my $dbh = $self->_dbh;
@@ -869,7 +1103,7 @@ sub txn_rollback {
         if ($self->debug);
       $self->{transaction_depth} = 0
         if $self->_dbh_autocommit;
-      $dbh->rollback;
+      $self->_dbh_rollback;
     }
     elsif($self->{transaction_depth} > 1) {
       $self->{transaction_depth}--;
@@ -892,6 +1126,11 @@ sub txn_rollback {
   }
 }
 
+sub _dbh_rollback {
+  my $self = shift;
+  $self->_dbh->rollback;
+}
+
 # This used to be the top-half of _execute.  It was split out to make it
 #  easier to override in NoBindVars without duping the rest.  It takes up
 #  all of _execute's args, and emits $sql, @bind.
@@ -927,22 +1166,6 @@ sub _fix_bind_params {
         } @bind;
 }
 
-sub _flatten_bind_params {
-    my ($self, @bind) = @_;
-
-    ### Turn @bind from something like this:
-    ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
-    ### to this:
-    ###   ( 1, 1, 3 )
-    return
-        map {
-            if ( defined( $_ && $_->[1] ) ) {
-                @{$_}[ 1 .. $#$_ ];
-            }
-            else { undef; }
-        } @bind;
-}
-
 sub _query_start {
     my ( $self, $sql, @bind ) = @_;
 
@@ -971,7 +1194,7 @@ sub _dbh_execute {
 
   my $sth = $self->sth($sql,$op);
 
-  my $placeholder_index = 1; 
+  my $placeholder_index = 1;
 
   foreach my $bound (@$bind) {
     my $attributes = {};
@@ -1008,18 +1231,27 @@ sub _execute {
 sub insert {
   my ($self, $source, $to_insert) = @_;
 
+# redispatch to insert method of storage we reblessed into, if necessary
+  if (not $self->_driver_determined) {
+    $self->_determine_driver;
+    goto $self->can('insert');
+  }
+
   my $ident = $source->from;
   my $bind_attributes = $self->source_bind_attributes($source);
 
   my $updated_cols = {};
 
-  $self->ensure_connected;
   foreach my $col ( $source->columns ) {
     if ( !defined $to_insert->{$col} ) {
       my $col_info = $source->column_info($col);
 
       if ( $col_info->{auto_nextval} ) {
-        $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
+        $updated_cols->{$col} = $to_insert->{$col} = $self->_sequence_fetch(
+          'nextval',
+          $col_info->{sequence} ||
+            $self->_dbh_get_autoinc_seq($self->last_dbh, $source)
+        );
       }
     }
   }
@@ -1030,7 +1262,7 @@ sub insert {
 }
 
 ## Still not quite perfect, and EXPERIMENTAL
-## Currently it is assumed that all values passed will be "normal", i.e. not 
+## Currently it is assumed that all values passed will be "normal", i.e. not
 ## scalar refs, or at least, all the same type as the first set, the statement is
 ## only prepped once.
 sub insert_bulk {
@@ -1039,7 +1271,9 @@ sub insert_bulk {
   my $table = $source->from;
   @colvalues{@$cols} = (0..$#$cols);
   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
-  
+
+  $self->_determine_driver;
+
   $self->_query_start( $sql, @bind );
   my $sth = $self->sth($sql);
 
@@ -1052,7 +1286,7 @@ sub insert_bulk {
   my $bind_attributes = $self->source_bind_attributes($source);
 
   ## Bind the values and execute
-  my $placeholder_index = 1; 
+  my $placeholder_index = 1;
 
   foreach my $bound (@bind) {
 
@@ -1099,8 +1333,9 @@ sub insert_bulk {
 sub update {
   my $self = shift @_;
   my $source = shift @_;
+  $self->_determine_driver;
   my $bind_attributes = $self->source_bind_attributes($source);
-  
+
   return $self->_execute('update' => [], $source, $bind_attributes, @_);
 }
 
@@ -1108,9 +1343,9 @@ sub update {
 sub delete {
   my $self = shift @_;
   my $source = shift @_;
-  
+  $self->_determine_driver;
   my $bind_attrs = $self->source_bind_attributes($source);
-  
+
   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
 }
 
@@ -1209,10 +1444,10 @@ sub _select {
   my $self = shift;
 
   # localization is neccessary as
-  # 1) there is no infrastructure to pass this around (easy to do, but will wait)
+  # 1) there is no infrastructure to pass this around before SQLA2
   # 2) _select_args sets it and _prep_for_execute consumes it
   my $sql_maker = $self->sql_maker;
-  local $sql_maker->{for};
+  local $sql_maker->{_dbic_rs_attrs};
 
   return $self->_execute($self->_select_args(@_));
 }
@@ -1221,10 +1456,10 @@ sub _select_args_to_query {
   my $self = shift;
 
   # localization is neccessary as
-  # 1) there is no infrastructure to pass this around (easy to do, but will wait)
+  # 1) there is no infrastructure to pass this around before SQLA2
   # 2) _select_args sets it and _prep_for_execute consumes it
   my $sql_maker = $self->sql_maker;
-  local $sql_maker->{for};
+  local $sql_maker->{_dbic_rs_attrs};
 
   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
   #  = $self->_select_args($ident, $select, $cond, $attrs);
@@ -1244,8 +1479,19 @@ sub _select_args_to_query {
 sub _select_args {
   my ($self, $ident, $select, $where, $attrs) = @_;
 
+  my ($alias2source, $rs_alias) = $self->_resolve_ident_sources ($ident);
+
   my $sql_maker = $self->sql_maker;
-  my $alias2source = $self->_resolve_ident_sources ($ident);
+  $sql_maker->{_dbic_rs_attrs} = {
+    %$attrs,
+    select => $select,
+    from => $ident,
+    where => $where,
+    $rs_alias
+      ? ( _source_handle => $alias2source->{$rs_alias}->handle )
+      : ()
+    ,
+  };
 
   # calculate bind_attrs before possible $ident mangling
   my $bind_attrs = {};
@@ -1256,29 +1502,55 @@ sub _select_args {
       my $fqcn = join ('.', $alias, $col);
       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
 
-      # so that unqualified searches can be bound too
-      $bind_attrs->{$col} = $bind_attrs->{$fqcn} if $alias eq 'me';
+      # Unqialified column names are nice, but at the same time can be
+      # rather ambiguous. What we do here is basically go along with
+      # the loop, adding an unqualified column slot to $bind_attrs,
+      # alongside the fully qualified name. As soon as we encounter
+      # another column by that name (which would imply another table)
+      # we unset the unqualified slot and never add any info to it
+      # to avoid erroneous type binding. If this happens the users
+      # only choice will be to fully qualify his column name
+
+      if (exists $bind_attrs->{$col}) {
+        $bind_attrs->{$col} = {};
+      }
+      else {
+        $bind_attrs->{$col} = $bind_attrs->{$fqcn};
+      }
     }
   }
 
-  my @limit;
-  if ($attrs->{software_limit} ||
-      $sql_maker->_default_limit_syntax eq "GenericSubQ") {
-        $attrs->{software_limit} = 1;
-  } else {
+  # adjust limits
+  if (
+    $attrs->{software_limit}
+      ||
+    $sql_maker->_default_limit_syntax eq "GenericSubQ"
+  ) {
+    $attrs->{software_limit} = 1;
+  }
+  else {
     $self->throw_exception("rows attribute must be positive if present")
       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
 
     # MySQL actually recommends this approach.  I cringe.
     $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
+  }
 
-    if ($attrs->{rows} && keys %{$attrs->{collapse}}) {
-      ($ident, $select, $where, $attrs)
-        = $self->_adjust_select_args_for_limited_prefetch ($ident, $select, $where, $attrs);
-    }
-    else {
-      push @limit, $attrs->{rows}, $attrs->{offset};
-    }
+  my @limit;
+
+  # see if we need to tear the prefetch apart (either limited has_many or grouped prefetch)
+  # otherwise delegate the limiting to the storage, unless software limit was requested
+  if (
+    ( $attrs->{rows} && keys %{$attrs->{collapse}} )
+       ||
+    ( $attrs->{group_by} && @{$attrs->{group_by}} &&
+      $attrs->{_prefetch_select} && @{$attrs->{_prefetch_select}} )
+  ) {
+    ($ident, $select, $where, $attrs)
+      = $self->_adjust_select_args_for_complex_prefetch ($ident, $select, $where, $attrs);
+  }
+  elsif (! $attrs->{software_limit} ) {
+    push @limit, $attrs->{rows}, $attrs->{offset};
   }
 
 ###
@@ -1293,50 +1565,70 @@ sub _select_args {
 
   my $order = { map
     { $attrs->{$_} ? ( $_ => $attrs->{$_} ) : ()  }
-    (qw/order_by group_by having _virtual_order_by/ )
+    (qw/order_by group_by having/ )
   };
 
-
-  $sql_maker->{for} = delete $attrs->{for};
-
   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
 }
 
-sub _adjust_select_args_for_limited_prefetch {
+#
+# This is the code producing joined subqueries like:
+# SELECT me.*, other.* FROM ( SELECT me.* FROM ... ) JOIN other ON ... 
+#
+sub _adjust_select_args_for_complex_prefetch {
   my ($self, $from, $select, $where, $attrs) = @_;
 
-  if ($attrs->{group_by} and @{$attrs->{group_by}}) {
-    $self->throw_exception ('Prefetch with limit (rows/offset) is not supported on resultsets with a group_by attribute');
-  }
-
-  $self->throw_exception ('Prefetch with limit (rows/offset) is not supported on resultsets with a custom from attribute')
+  $self->throw_exception ('Complex prefetches are not supported on resultsets with a custom from attribute')
     if (ref $from ne 'ARRAY');
 
+  # copies for mangling
+  $from = [ @$from ];
+  $select = [ @$select ];
+  $attrs = { %$attrs };
+
   # separate attributes
   my $sub_attrs = { %$attrs };
-  delete $attrs->{$_} for qw/where bind rows offset/;
-  delete $sub_attrs->{$_} for qw/for collapse select order_by/;
+  delete $attrs->{$_} for qw/where bind rows offset group_by having/;
+  delete $sub_attrs->{$_} for qw/for collapse _prefetch_select _collapse_order_by select as/;
+
+  my $select_root_alias = $attrs->{alias};
+  my $sql_maker = $self->sql_maker;
 
-  my $alias = $attrs->{alias};
+  # create subquery select list - consider only stuff *not* brought in by the prefetch
+  my $sub_select = [];
+  my $sub_group_by;
+  for my $i (0 .. @{$attrs->{select}} - @{$attrs->{_prefetch_select}} - 1) {
+    my $sel = $attrs->{select}[$i];
+
+    # alias any functions to the dbic-side 'as' label
+    # adjust the outer select accordingly
+    if (ref $sel eq 'HASH' && !$sel->{-select}) {
+      $sel = { -select => $sel, -as => $attrs->{as}[$i] };
+      $select->[$i] = join ('.', $attrs->{alias}, ($attrs->{as}[$i] || "select_$i") );
+    }
 
-  # create subquery select list
-  my $sub_select = [ grep { $_ =~ /^$alias\./ } @{$attrs->{select}} ];
+    push @$sub_select, $sel;
+  }
 
   # bring over all non-collapse-induced order_by into the inner query (if any)
   # the outer one will have to keep them all
+  delete $sub_attrs->{order_by};
   if (my $ord_cnt = @{$attrs->{order_by}} - @{$attrs->{_collapse_order_by}} ) {
     $sub_attrs->{order_by} = [
-      @{$attrs->{order_by}}[ 0 .. ($#{$attrs->{order_by}} - $ord_cnt - 1) ]
+      @{$attrs->{order_by}}[ 0 .. $ord_cnt - 1]
     ];
   }
 
+  # mangle {from}, keep in mind that $from is "headless" from here on
+  my $join_root = shift @$from;
 
-  # mangle the head of the {from}
-  my $self_ident = shift @$from;
-
+  my %inner_joins;
   my %join_info = map { $_->[0]{-alias} => $_->[0] } (@$from);
 
-  my (%inner_joins);
+  # in complex search_related chains $select_root_alias may *not* be
+  # 'me' so always include it in the inner join
+  $inner_joins{$select_root_alias} = 1 if ($join_root->{-alias} ne $select_root_alias);
+
 
   # decide which parts of the join will remain on the inside
   #
@@ -1345,7 +1637,7 @@ sub _adjust_select_args_for_limited_prefetch {
   # away _any_ branches of the join tree that are:
   # 1) not mentioned in the condition/order
   # 2) left-join leaves (or left-join leaf chains)
-  # Most of the join ocnditions will not satisfy this, but for real
+  # Most of the join conditions will not satisfy this, but for real
   # complex queries some might, and we might make some RDBMS happy.
   #
   #
@@ -1355,8 +1647,9 @@ sub _adjust_select_args_for_limited_prefetch {
   # It may not be very efficient, but it's a reasonable stop-gap
   {
     # produce stuff unquoted, so it can be scanned
-    my $sql_maker = $self->sql_maker;
     local $sql_maker->{quote_char};
+    my $sep = $self->_sql_maker_opts->{name_sep} || '.';
+    $sep = "\Q$sep\E";
 
     my @order_by = (map
       { ref $_ ? $_->[0] : $_ }
@@ -1364,6 +1657,7 @@ sub _adjust_select_args_for_limited_prefetch {
     );
 
     my $where_sql = $sql_maker->where ($where);
+    my $select_sql = $sql_maker->_recurse_fields ($sub_select);
 
     # sort needed joins
     for my $alias (keys %join_info) {
@@ -1371,8 +1665,8 @@ sub _adjust_select_args_for_limited_prefetch {
       # any table alias found on a column name in where or order_by
       # gets included in %inner_joins
       # Also any parent joins that are needed to reach this particular alias
-      for my $piece ($where_sql, @order_by ) {
-        if ($piece =~ /\b$alias\./) {
+      for my $piece ($select_sql, $where_sql, @order_by ) {
+        if ($piece =~ /\b $alias $sep/x) {
           $inner_joins{$alias} = 1;
         }
       }
@@ -1395,20 +1689,20 @@ sub _adjust_select_args_for_limited_prefetch {
   }
 
   # construct the inner $from for the subquery
-  my $inner_from = [ $self_ident ];
-  if (keys %inner_joins) {
-    for my $j (@$from) {
-      push @$inner_from, $j if $inner_joins{$j->[0]{-alias}};
-    }
+  my $inner_from = [ $join_root ];
+  for my $j (@$from) {
+    push @$inner_from, $j if $inner_joins{$j->[0]{-alias}};
+  }
 
-    # if a multi-type join was needed in the subquery ("multi" is indicated by
-    # presence in {collapse}) - add a group_by to simulate the collapse in the subq
+  # if a multi-type join was needed in the subquery ("multi" is indicated by
+  # presence in {collapse}) - add a group_by to simulate the collapse in the subq
+  unless ($sub_attrs->{group_by}) {
     for my $alias (keys %inner_joins) {
 
       # the dot comes from some weirdness in collapse
       # remove after the rewrite
       if ($attrs->{collapse}{".$alias"}) {
-        $sub_attrs->{group_by} = $sub_select;
+        $sub_attrs->{group_by} ||= $sub_select;
         last;
       }
     }
@@ -1421,27 +1715,64 @@ sub _adjust_select_args_for_limited_prefetch {
     $where,
     $sub_attrs
   );
+  my $subq_joinspec = {
+    -alias => $select_root_alias,
+    -source_handle => $join_root->{-source_handle},
+    $select_root_alias => $subq,
+  };
 
-  # put it back in $from
-  unshift @$from, { $alias => $subq };
+  # Generate a new from (really just replace the join slot with the subquery)
+  # Before we would start the outer chain from the subquery itself (i.e.
+  # SELECT ... FROM (SELECT ... ) alias JOIN ..., but this turned out to be
+  # a bad idea for search_related, as the root of the chain was effectively
+  # lost (i.e. $artist_rs->search_related ('cds'... ) would result in alias
+  # of 'cds', which would prevent from doing things like order_by artist.*)
+  # See t/prefetch/via_search_related.t for a better idea
+  my @outer_from;
+  if ($join_root->{-alias} eq $select_root_alias) { # just swap the root part and we're done
+    @outer_from = (
+      $subq_joinspec,
+      @$from,
+    )
+  }
+  else {  # this is trickier
+    @outer_from = ($join_root);
+
+    for my $j (@$from) {
+      if ($j->[0]{-alias} eq $select_root_alias) {
+        push @outer_from, [
+          $subq_joinspec,
+          @{$j}[1 .. $#$j],
+        ];
+      }
+      else {
+        push @outer_from, $j;
+      }
+    }
+  }
 
   # This is totally horrific - the $where ends up in both the inner and outer query
-  # Unfortunately not much can be done until SQLA2 introspection arrives
+  # Unfortunately not much can be done until SQLA2 introspection arrives, and even
+  # then if where conditions apply to the *right* side of the prefetch, you may have
+  # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
+  # the outer select to exclude joins you didin't want in the first place
   #
   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
-  return ($from, $select, $where, $attrs);
+  return (\@outer_from, $select, $where, $attrs);
 }
 
 sub _resolve_ident_sources {
   my ($self, $ident) = @_;
 
   my $alias2source = {};
+  my $rs_alias;
 
   # the reason this is so contrived is that $ident may be a {from}
   # structure, specifying multiple tables to join
   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
     # this is compat mode for insert/update/delete which do not deal with aliases
     $alias2source->{me} = $ident;
+    $rs_alias = 'me';
   }
   elsif (ref $ident eq 'ARRAY') {
 
@@ -1449,6 +1780,7 @@ sub _resolve_ident_sources {
       my $tabinfo;
       if (ref $_ eq 'HASH') {
         $tabinfo = $_;
+        $rs_alias = $tabinfo->{-alias};
       }
       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
         $tabinfo = $_->[0];
@@ -1459,85 +1791,105 @@ sub _resolve_ident_sources {
     }
   }
 
-  return $alias2source;
+  return ($alias2source, $rs_alias);
 }
 
-sub _copy_attributes_for_count {
-  my ($self, $source, $attrs) = @_;
-  my %attrs = %$attrs;
-
-  # take off any column specs, any pagers, record_filter is cdbi, and no point of ordering a count
-  delete @attrs{qw/select as rows offset page order_by record_filter/};
-
-  return \%attrs;
-}
-
-sub count {
-  my ($self, $source, $attrs) = @_;
-
-  my $tmp_attrs = $self->_copy_attributes_for_count($source, $attrs);
-
-  # overwrite the selector
-  $tmp_attrs->{select} = { count => '*' };
-
-  my $tmp_rs = $source->resultset_class->new($source, $tmp_attrs);
-  my ($count) = $tmp_rs->cursor->next;
-
-  # if the offset/rows attributes are still present, we did not use
-  # a subquery, so we need to make the calculations in software
-  $count -= $attrs->{offset} if $attrs->{offset};
-  $count = $attrs->{rows} if $attrs->{rows} and $attrs->{rows} < $count;
-  $count = 0 if ($count < 0);
-
-  return $count;
-}
+# Takes $ident, \@column_names
+#
+# returns { $column_name => \%column_info, ... }
+# also note: this adds -result_source => $rsrc to the column info
+#
+# usage:
+#   my $col_sources = $self->_resolve_column_info($ident, @column_names);
+sub _resolve_column_info {
+  my ($self, $ident, $colnames) = @_;
+  my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
+
+  my $sep = $self->_sql_maker_opts->{name_sep} || '.';
+  $sep = "\Q$sep\E";
+
+  my (%return, %seen_cols);
+
+  # compile a global list of column names, to be able to properly
+  # disambiguate unqualified column names (if at all possible)
+  for my $alias (keys %$alias2src) {
+    my $rsrc = $alias2src->{$alias};
+    for my $colname ($rsrc->columns) {
+      push @{$seen_cols{$colname}}, $alias;
+    }
+  }
 
-sub count_grouped {
-  my ($self, $source, $attrs) = @_;
+  COLUMN:
+  foreach my $col (@$colnames) {
+    my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
 
-  # copy for the subquery, we need to do some adjustments to it too
-  my $sub_attrs = { %$attrs };
-
-  # these can not go in the subquery, and there is no point of ordering it
-  delete $sub_attrs->{$_} for qw/collapse select as order_by/;
+    unless ($alias) {
+      # see if the column was seen exactly once (so we know which rsrc it came from)
+      if ($seen_cols{$colname} and @{$seen_cols{$colname}} == 1) {
+        $alias = $seen_cols{$colname}[0];
+      }
+      else {
+        next COLUMN;
+      }
+    }
 
-  # if we prefetch, we group_by primary keys only as this is what we would get out of the rs via ->next/->all
-  # simply deleting group_by suffices, as the code below will re-fill it
-  # Note: we check $attrs, as $sub_attrs has collapse deleted
-  if (ref $attrs->{collapse} and keys %{$attrs->{collapse}} ) {
-    delete $sub_attrs->{group_by};
+    my $rsrc = $alias2src->{$alias};
+    $return{$col} = $rsrc && {
+      %{$rsrc->column_info($colname)},
+      -result_source => $rsrc,
+      -source_alias => $alias,
+    };
   }
 
-  $sub_attrs->{group_by} ||= [ map { "$attrs->{alias}.$_" } ($source->primary_columns) ];
-  $sub_attrs->{select} = $self->_grouped_count_select ($source, $sub_attrs);
+  return \%return;
+}
 
-  $attrs->{from} = [{
-    count_subq => $source->resultset_class->new ($source, $sub_attrs )->as_query
-  }];
+# Returns a counting SELECT for a simple count
+# query. Abstracted so that a storage could override
+# this to { count => 'firstcol' } or whatever makes
+# sense as a performance optimization
+sub _count_select {
+  #my ($self, $source, $rs_attrs) = @_;
+  return { count => '*' };
+}
 
-  # the subquery replaces this
-  delete $attrs->{$_} for qw/where bind collapse group_by having having_bind rows offset/;
+# Returns a SELECT which will end up in the subselect
+# There may or may not be a group_by, as the subquery
+# might have been called to accomodate a limit
+#
+# Most databases would be happy with whatever ends up
+# here, but some choke in various ways.
+#
+sub _subq_count_select {
+  my ($self, $source, $rs_attrs) = @_;
+  return $rs_attrs->{group_by} if $rs_attrs->{group_by};
 
-  return $self->count ($source, $attrs);
+  my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
+  return @pcols ? \@pcols : [ 1 ];
 }
 
 #
-# Returns a SELECT to go with a supplied GROUP BY
-# (caled by count_grouped so a group_by is present)
-# Most databases expect them to match, but some
-# choke in various ways.
+# Returns an ordered list of column names before they are used
+# in a SELECT statement. By default simply returns the list
+# passed in.
 #
-sub _grouped_count_select {
-  my ($self, $source, $rs_args) = @_;
-  return $rs_args->{group_by};
+# This may be overridden in a specific storage when there are
+# requirements such as moving BLOB columns to the end of the 
+# SELECT list.
+sub _order_select_columns {
+  #my ($self, $source, $columns) = @_;
+  return @{$_[2]};
 }
 
+
+
+
 sub source_bind_attributes {
   my ($self, $source) = @_;
-  
+
   my $bind_attributes;
   foreach my $column ($source->columns) {
-  
+
     my $data_type = $source->column_info($column)->{data_type} || '';
     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
      if $data_type;
@@ -1703,7 +2055,7 @@ Returns the database driver name.
 
 =cut
 
-sub sqlt_type { shift->dbh->{Driver}->{Name} }
+sub sqlt_type { shift->last_dbh->{Driver}->{Name} }
 
 =head2 bind_attribute_by_data_type
 
@@ -1786,13 +2138,13 @@ By default, C<\%sqlt_args> will have
 
  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
 
-merged with the hash passed in. To disable any of those features, pass in a 
+merged with the hash passed in. To disable any of those features, pass in a
 hashref like the following
 
  { ignore_constraint_names => 0, # ... other options }
 
 
-Note that this feature is currently EXPERIMENTAL and may not work correctly 
+Note that this feature is currently EXPERIMENTAL and may not work correctly
 across all databases, or fully handle complex relationships.
 
 WARNING: Please check all SQL files created, before applying them.
@@ -1813,7 +2165,7 @@ sub create_ddl_dir {
   $version ||= $schema_version;
 
   $sqltargs = {
-    add_drop_table => 1, 
+    add_drop_table => 1,
     ignore_constraint_names => 1,
     ignore_index_names => 1,
     %{$sqltargs || {}}
@@ -1853,7 +2205,7 @@ sub create_ddl_dir {
     }
     print $file $output;
     close($file);
-  
+
     next unless ($preversion);
 
     require SQL::Translator::Diff;
@@ -1869,7 +2221,7 @@ sub create_ddl_dir {
       carp("Overwriting existing diff file - $difffile");
       unlink($difffile);
     }
-    
+
     my $source_schema;
     {
       my $t = SQL::Translator->new($sqltargs);
@@ -1888,7 +2240,7 @@ sub create_ddl_dir {
         unless ( $source_schema->name );
     }
 
-    # The "new" style of producers have sane normalization and can support 
+    # The "new" style of producers have sane normalization and can support
     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
     # And we have to diff parsed SQL against parsed SQL.
     my $dest_schema = $sqlt_schema;
@@ -1909,12 +2261,12 @@ sub create_ddl_dir {
       $dest_schema->name( $filename )
         unless $dest_schema->name;
     }
-    
+
     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
                                                   $dest_schema,   $db,
                                                   $sqltargs
                                                  );
-    if(!open $file, ">$difffile") { 
+    if(!open $file, ">$difffile") {
       $self->throw_exception("Can't write to $difffile ($!)");
       next;
     }
@@ -1950,7 +2302,7 @@ See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
 sub deployment_statements {
   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
   # Need to be connected to get the correct sqlt_type
-  $self->ensure_connected() unless $type;
+  $self->last_dbh() unless $type;
   $type ||= $self->sqlt_type;
   $version ||= $schema->schema_version || '1.x';
   $dir ||= './';
@@ -1958,7 +2310,7 @@ sub deployment_statements {
   if(-f $filename)
   {
       my $file;
-      open($file, "<$filename") 
+      open($file, "<$filename")
         or $self->throw_exception("Can't open $filename ($!)");
       my @rows = <$file>;
       close($file);
@@ -1973,7 +2325,7 @@ sub deployment_statements {
   eval qq{use SQL::Translator::Producer::${type}};
   $self->throw_exception($@) if $@;
 
-  # sources needs to be a parser arg, but for simplicty allow at top level 
+  # sources needs to be a parser arg, but for simplicty allow at top level
   # coming in
   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
       if exists $sqltargs->{sources};
@@ -1995,7 +2347,10 @@ sub deploy {
     return if $line =~ /^\s+$/; # skip whitespace only
     $self->_query_start($line);
     eval {
-      $self->dbh->do($line); # shouldn't be using ->dbh ?
+      # a previous error may invalidate $dbh - thus we need to use dbh()
+      # to guarantee a healthy $dbh (this is temporary until we get
+      # proper error handling on deploy() )
+      $self->dbh->do($line);
     };
     if ($@) {
       carp qq{$@ (running "${line}")};
@@ -2024,7 +2379,7 @@ Returns the datetime parser class
 sub datetime_parser {
   my $self = shift;
   return $self->{datetime_parser} ||= do {
-    $self->ensure_connected;
+    $self->last_dbh;
     $self->build_datetime_parser(@_);
   };
 }
@@ -2078,7 +2433,7 @@ returned by databases that don't support replication.
 
 sub is_replicating {
     return;
-    
+
 }
 
 =head2 lag_behind_master
@@ -2108,7 +2463,7 @@ sub DESTROY {
 
 DBIx::Class can do some wonderful magic with handling exceptions,
 disconnections, and transactions when you use C<< AutoCommit => 1 >>
-combined with C<txn_do> for transaction support.
+(the default) combined with C<txn_do> for transaction support.
 
 If you set C<< AutoCommit => 0 >> in your connect info, then you are always
 in an assumed transaction between commits, and you're telling us you'd
@@ -2120,7 +2475,6 @@ cases if you choose the C<< AutoCommit => 0 >> path, just as you would
 be with raw DBI.
 
 
-
 =head1 AUTHORS
 
 Matt S. Trout <mst@shadowcatsystems.co.uk>