discard changes now is forced to use master for replication. changed discard_changes...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
index 1a6d1f9..6c9b97d 100644 (file)
@@ -5,22 +5,29 @@ use base 'DBIx::Class::Storage';
 
 use strict;    
 use warnings;
+use Carp::Clan qw/^DBIx::Class/;
 use DBI;
 use SQL::Abstract::Limit;
 use DBIx::Class::Storage::DBI::Cursor;
 use DBIx::Class::Storage::Statistics;
-use IO::File;
-use Scalar::Util 'blessed';
+use Scalar::Util qw/blessed weaken/;
 
 __PACKAGE__->mk_group_accessors('simple' =>
     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
-       _conn_pid _conn_tid disable_sth_caching cursor on_connect_do
-       transaction_depth/
+       _conn_pid _conn_tid disable_sth_caching on_connect_do
+       on_disconnect_do transaction_depth unsafe _dbh_autocommit
+       auto_savepoint savepoints/
 );
 
+__PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
+
+__PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
+__PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
+
 BEGIN {
 
-package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
+package # Hide from PAUSE
+  DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
 
 use base qw/SQL::Abstract::Limit/;
 
@@ -82,6 +89,15 @@ sub select {
   my ($sql, @ret) = $self->SUPER::select(
     $table, $self->_recurse_fields($fields), $where, $order, @rest
   );
+  $sql .= 
+    $self->{for} ?
+    (
+      $self->{for} eq 'update' ? ' FOR UPDATE' :
+      $self->{for} eq 'shared' ? ' FOR SHARE'  :
+      ''
+    ) :
+    ''
+  ;
   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
 }
 
@@ -116,7 +132,7 @@ sub _emulate_limit {
 }
 
 sub _recurse_fields {
-  my ($self, $fields) = @_;
+  my ($self, $fields, $params) = @_;
   my $ref = ref $fields;
   return $self->_quote($fields) unless $ref;
   return $$fields if $ref eq 'SCALAR';
@@ -124,10 +140,10 @@ sub _recurse_fields {
   if ($ref eq 'ARRAY') {
     return join(', ', map {
       $self->_recurse_fields($_)
-      .(exists $self->{rownum_hack_count}
-         ? ' AS col'.$self->{rownum_hack_count}++
-         : '')
-     } @$fields);
+        .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
+          ? ' AS col'.$self->{rownum_hack_count}++
+          : '')
+      } @$fields);
   } elsif ($ref eq 'HASH') {
     foreach my $func (keys %$fields) {
       return $self->_sqlcase($func)
@@ -143,7 +159,7 @@ sub _order_by {
   if (ref $_[0] eq 'HASH') {
     if (defined $_[0]->{group_by}) {
       $ret = $self->_sqlcase(' group by ')
-               .$self->_recurse_fields($_[0]->{group_by});
+        .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
     }
     if (defined $_[0]->{having}) {
       my $frag;
@@ -312,9 +328,9 @@ documents DBI-specific methods and behaviors.
 sub new {
   my $new = shift->next::method(@_);
 
-  $new->cursor("DBIx::Class::Storage::DBI::Cursor");
   $new->transaction_depth(0);
   $new->_sql_maker_opts({});
+  $new->{savepoints} = [];
   $new->{_in_dbh_do} = 0;
   $new->{_dbh_gen} = 0;
 
@@ -347,9 +363,30 @@ connection-specific options:
 
 =item on_connect_do
 
-This can be set to an arrayref of literal sql statements, which will
-be executed immediately after making the connection to the database
-every time we [re-]connect.
+Specifies things to do immediately after connecting or re-connecting to
+the database.  Its value may contain:
+
+=over
+
+=item an array reference
+
+This contains SQL statements to execute in order.  Each element contains
+a string or a code reference that returns a string.
+
+=item a code reference
+
+This contains some code to execute.  Unlike code references within an
+array reference, its return value is ignored.
+
+=back
+
+=item on_disconnect_do
+
+Takes arguments in the same form as L<on_connect_do> and executes them
+immediately before disconnecting from the database.
+
+Note, this only runs if you explicitly call L<disconnect> on the
+storage object.
 
 =item disable_sth_caching
 
@@ -380,6 +417,28 @@ This only needs to be used in conjunction with L<quote_char>, and is used to
 specify the charecter that seperates elements (schemas, tables, columns) from 
 each other. In most cases this is simply a C<.>.
 
+=item unsafe
+
+This Storage driver normally installs its own C<HandleError>, sets
+C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
+all database handles, including those supplied by a coderef.  It does this
+so that it can have consistent and useful error behavior.
+
+If you set this option to a true value, Storage will not do its usual
+modifications to the database handle's attributes, and instead relies on
+the settings in your connect_info DBI options (or the values you set in
+your connection coderef, in the case that you are connecting via coderef).
+
+Note that your custom settings can cause Storage to malfunction,
+especially if you set a C<HandleError> handler that suppresses exceptions
+and/or disable C<RaiseError>.
+
+=item auto_savepoint
+
+If this option is true, L<DBIx::Class> will use savepoints when nesting
+transactions, making it possible to recover from failure in the inner
+transaction without having to abort all outer transactions.
+
 =back
 
 These options can be mixed in with your other L<DBI> connection attributes,
@@ -390,25 +449,19 @@ Every time C<connect_info> is invoked, any previous settings for
 these options will be cleared before setting the new ones, regardless of
 whether any options are specified in the new C<connect_info>.
 
-Important note:  DBIC expects the returned database handle provided by 
-a subref argument to have RaiseError set on it.  If it doesn't, things
-might not work very well, YMMV.  If you don't use a subref, DBIC will
-force this setting for you anyways.  Setting HandleError to anything
-other than simple exception object wrapper might cause problems too.
-
 Another Important Note:
 
 DBIC can do some wonderful magic with handling exceptions,
-disconnections, and transactions when you use C<AutoCommit =&gt; 1>
+disconnections, and transactions when you use C<< AutoCommit => 1 >>
 combined with C<txn_do> for transaction support.
 
-If you set C<AutoCommit =&gt; 0> in your connect info, then you are always
+If you set C<< AutoCommit => 0 >> in your connect info, then you are always
 in an assumed transaction between commits, and you're telling us you'd
 like to manage that manually.  A lot of DBIC's magic protections
 go away.  We can't protect you from exceptions due to database
 disconnects because we don't know anything about how to restart your
 transactions.  You're on your own for handling all sorts of exceptional
-cases if you choose the C<AutoCommit =&gt 0> path, just as you would
+cases if you choose the C<< AutoCommit => 0 >> path, just as you would
 be with raw DBI.
 
 Examples:
@@ -470,7 +523,12 @@ sub connect_info {
 
   my $last_info = $dbi_info->[-1];
   if(ref $last_info eq 'HASH') {
-    for my $storage_opt (qw/on_connect_do disable_sth_caching/) {
+    $last_info = { %$last_info }; # so delete is non-destructive
+    my @storage_option = qw(
+      on_connect_do on_disconnect_do disable_sth_caching unsafe cursor_class
+      auto_savepoint
+    );
+    for my $storage_opt (@storage_option) {
       if(my $value = delete $last_info->{$storage_opt}) {
         $self->$storage_opt($value);
       }
@@ -480,6 +538,8 @@ sub connect_info {
         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
       }
     }
+    # re-insert modified hashref
+    $dbi_info->[-1] = $last_info;
 
     # Get rid of any trailing empty hashref
     pop(@$dbi_info) if !keys %$last_info;
@@ -495,9 +555,10 @@ This method is deprecated in favor of setting via L</connect_info>.
 
 =head2 dbh_do
 
-Arguments: $subref, @extra_coderef_args?
+Arguments: ($subref | $method_name), @extra_coderef_args?
 
-Execute the given subref using the new exception-based connection management.
+Execute the given $subref or $method_name using the new exception-based
+connection management.
 
 The first two arguments will be the storage object that C<dbh_do> was called
 on and a database handle to use.  Any additional arguments will be passed
@@ -525,12 +586,11 @@ Example:
 
 sub dbh_do {
   my $self = shift;
-  my $coderef = shift;
+  my $code = shift;
 
-  ref $coderef eq 'CODE' or $self->throw_exception
-    ('$coderef must be a CODE reference');
+  my $dbh = $self->_dbh;
 
-  return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do}
+  return $self->$code($dbh, @_) if $self->{_in_dbh_do}
       || $self->{transaction_depth};
 
   local $self->{_in_dbh_do} = 1;
@@ -539,16 +599,20 @@ sub dbh_do {
   my $want_array = wantarray;
 
   eval {
-    $self->_verify_pid if $self->_dbh;
-    $self->_populate_dbh if !$self->_dbh;
+    $self->_verify_pid if $dbh;
+    if( !$dbh ) {
+        $self->_populate_dbh;
+        $dbh = $self->_dbh;
+    }
+
     if($want_array) {
-        @result = $coderef->($self, $self->_dbh, @_);
+        @result = $self->$code($dbh, @_);
     }
     elsif(defined $want_array) {
-        $result[0] = $coderef->($self, $self->_dbh, @_);
+        $result[0] = $self->$code($dbh, @_);
     }
     else {
-        $coderef->($self, $self->_dbh, @_);
+        $self->$code($dbh, @_);
     }
   };
 
@@ -560,7 +624,7 @@ sub dbh_do {
   # We were not connected - reconnect and retry, but let any
   #  exception fall right through this time
   $self->_populate_dbh;
-  $coderef->($self, $self->_dbh, @_);
+  $self->$code($self->_dbh, @_);
 }
 
 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
@@ -573,6 +637,8 @@ sub txn_do {
   ref $coderef eq 'CODE' or $self->throw_exception
     ('$coderef must be a CODE reference');
 
+  return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
+
   local $self->{_in_dbh_do} = 1;
 
   my @result;
@@ -633,7 +699,10 @@ sub disconnect {
   my ($self) = @_;
 
   if( $self->connected ) {
-    $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
+    my $connection_do = $self->on_disconnect_do;
+    $self->_do_connection_actions($connection_do) if ref($connection_do);
+
+    $self->_dbh->rollback unless $self->_dbh_autocommit;
     $self->_dbh->disconnect;
     $self->_dbh(undef);
     $self->{_dbh_gen}++;
@@ -651,6 +720,7 @@ sub connected {
       }
       else {
           $self->_verify_pid;
+          return 0 if !$self->_dbh;
       }
       return ($dbh->FETCH('Active') && $dbh->ping);
   }
@@ -663,7 +733,7 @@ sub connected {
 sub _verify_pid {
   my ($self) = @_;
 
-  return if $self->_conn_pid == $$;
+  return if defined $self->_conn_pid && $self->_conn_pid == $$;
 
   $self->_dbh->{InactiveDestroy} = 1;
   $self->_dbh(undef);
@@ -702,11 +772,14 @@ sub _sql_maker_args {
 sub sql_maker {
   my ($self) = @_;
   unless ($self->_sql_maker) {
-    $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
+    my $sql_maker_class = $self->sql_maker_class;
+    $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
   }
   return $self->_sql_maker;
 }
 
+sub _rebless {}
+
 sub _populate_dbh {
   my ($self) = @_;
   my @info = @{$self->_dbi_connect_info || []};
@@ -714,38 +787,65 @@ sub _populate_dbh {
 
   # Always set the transaction depth on connect, since
   #  there is no transaction in progress by definition
-  $self->{transaction_depth} = $self->_dbh->{AutoCommit} ? 0 : 1;
+  $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
 
   if(ref $self eq 'DBIx::Class::Storage::DBI') {
     my $driver = $self->_dbh->{Driver}->{Name};
     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
       bless $self, "DBIx::Class::Storage::DBI::${driver}";
-      $self->_rebless() if $self->can('_rebless');
+      $self->_rebless();
     }
   }
 
-  # if on-connect sql statements are given execute them
-  foreach my $sql_statement (@{$self->on_connect_do || []}) {
-    $self->debugobj->query_start($sql_statement) if $self->debug();
-    $self->_dbh->do($sql_statement);
-    $self->debugobj->query_end($sql_statement) if $self->debug();
-  }
+  my $connection_do = $self->on_connect_do;
+  $self->_do_connection_actions($connection_do) if ref($connection_do);
 
   $self->_conn_pid($$);
   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
 }
 
+sub _do_connection_actions {
+  my $self = shift;
+  my $connection_do = shift;
+
+  if (ref $connection_do eq 'ARRAY') {
+    $self->_do_query($_) foreach @$connection_do;
+  }
+  elsif (ref $connection_do eq 'CODE') {
+    $connection_do->();
+  }
+
+  return $self;
+}
+
+sub _do_query {
+  my ($self, $action) = @_;
+
+  if (ref $action eq 'CODE') {
+    $action = $action->($self);
+    $self->_do_query($_) foreach @$action;
+  }
+  else {
+    my @to_run = (ref $action eq 'ARRAY') ? (@$action) : ($action);
+    $self->_query_start(@to_run);
+    $self->_dbh->do(@to_run);
+    $self->_query_end(@to_run);
+  }
+
+  return $self;
+}
+
 sub _connect {
   my ($self, @info) = @_;
 
   $self->throw_exception("You failed to provide any connection info")
-      if !@info;
+    if !@info;
 
   my ($old_connect_via, $dbh);
 
   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
-      $old_connect_via = $DBI::connect_via;
-      $DBI::connect_via = 'connect';
+    $old_connect_via = $DBI::connect_via;
+    $DBI::connect_via = 'connect';
   }
 
   eval {
@@ -754,9 +854,17 @@ sub _connect {
     }
     else {
        $dbh = DBI->connect(@info);
-       $dbh->{RaiseError} = 1;
-       $dbh->{PrintError} = 0;
-       $dbh->{PrintWarn} = 0;
+    }
+
+    if($dbh && !$self->unsafe) {
+      my $weak_self = $self;
+      weaken($weak_self);
+      $dbh->{HandleError} = sub {
+          $weak_self->throw_exception("DBI Exception: $_[0]")
+      };
+      $dbh->{ShowErrorStatement} = 1;
+      $dbh->{RaiseError} = 1;
+      $dbh->{PrintError} = 0;
     }
   };
 
@@ -765,20 +873,110 @@ sub _connect {
   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
     if !$dbh || $@;
 
+  $self->_dbh_autocommit($dbh->{AutoCommit});
+
   $dbh;
 }
 
+sub svp_begin {
+  my ($self, $name) = @_;
+
+  $name = $self->_svp_generate_name
+    unless defined $name;
+
+  $self->throw_exception ("You can't use savepoints outside a transaction")
+    if $self->{transaction_depth} == 0;
+
+  $self->throw_exception ("Your Storage implementation doesn't support savepoints")
+    unless $self->can('_svp_begin');
+  
+  push @{ $self->{savepoints} }, $name;
+
+  $self->debugobj->svp_begin($name) if $self->debug;
+  
+  return $self->_svp_begin($name);
+}
+
+sub svp_release {
+  my ($self, $name) = @_;
+
+  $self->throw_exception ("You can't use savepoints outside a transaction")
+    if $self->{transaction_depth} == 0;
+
+  $self->throw_exception ("Your Storage implementation doesn't support savepoints")
+    unless $self->can('_svp_release');
+
+  if (defined $name) {
+    $self->throw_exception ("Savepoint '$name' does not exist")
+      unless grep { $_ eq $name } @{ $self->{savepoints} };
+
+    # Dig through the stack until we find the one we are releasing.  This keeps
+    # the stack up to date.
+    my $svp;
+
+    do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
+  } else {
+    $name = pop @{ $self->{savepoints} };
+  }
+
+  $self->debugobj->svp_release($name) if $self->debug;
+
+  return $self->_svp_release($name);
+}
+
+sub svp_rollback {
+  my ($self, $name) = @_;
+
+  $self->throw_exception ("You can't use savepoints outside a transaction")
+    if $self->{transaction_depth} == 0;
+
+  $self->throw_exception ("Your Storage implementation doesn't support savepoints")
+    unless $self->can('_svp_rollback');
+
+  if (defined $name) {
+      # If they passed us a name, verify that it exists in the stack
+      unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
+          $self->throw_exception("Savepoint '$name' does not exist!");
+      }
+
+      # Dig through the stack until we find the one we are releasing.  This keeps
+      # the stack up to date.
+      while(my $s = pop(@{ $self->{savepoints} })) {
+          last if($s eq $name);
+      }
+      # Add the savepoint back to the stack, as a rollback doesn't remove the
+      # named savepoint, only everything after it.
+      push(@{ $self->{savepoints} }, $name);
+  } else {
+      # We'll assume they want to rollback to the last savepoint
+      $name = $self->{savepoints}->[-1];
+  }
+
+  $self->debugobj->svp_rollback($name) if $self->debug;
+  
+  return $self->_svp_rollback($name);
+}
+
+sub _svp_generate_name {
+    my ($self) = @_;
+
+    return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
+}
 
 sub txn_begin {
   my $self = shift;
-  if($self->{transaction_depth}++ == 0) {
+  $self->ensure_connected();
+  if($self->{transaction_depth} == 0) {
     $self->debugobj->txn_begin()
       if $self->debug;
     # this isn't ->_dbh-> because
     #  we should reconnect on begin_work
     #  for AutoCommit users
     $self->dbh->begin_work;
+  } elsif ($self->auto_savepoint) {
+    $self->svp_begin;
   }
+  $self->{transaction_depth}++;
 }
 
 sub txn_commit {
@@ -789,28 +987,32 @@ sub txn_commit {
       if ($self->debug);
     $dbh->commit;
     $self->{transaction_depth} = 0
-      if $dbh->{AutoCommit};
+      if $self->_dbh_autocommit;
   }
   elsif($self->{transaction_depth} > 1) {
-    $self->{transaction_depth}--
+    $self->{transaction_depth}--;
+    $self->svp_release
+      if $self->auto_savepoint;
   }
 }
 
 sub txn_rollback {
   my $self = shift;
   my $dbh = $self->_dbh;
-  my $autocommit;
   eval {
-    $autocommit = $dbh->{AutoCommit};
     if ($self->{transaction_depth} == 1) {
       $self->debugobj->txn_rollback()
         if ($self->debug);
-      $dbh->rollback;
       $self->{transaction_depth} = 0
-        if $autocommit;
+        if $self->_dbh_autocommit;
+      $dbh->rollback;
     }
     elsif($self->{transaction_depth} > 1) {
       $self->{transaction_depth}--;
+      if ($self->auto_savepoint) {
+        $self->svp_rollback;
+        $self->svp_release;
+      }
     }
     else {
       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
@@ -821,7 +1023,7 @@ sub txn_rollback {
     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
     $error =~ /$exception_class/ and $self->throw_exception($error);
     # ensure that a failed rollback resets the transaction depth
-    $self->{transaction_depth} = $autocommit ? 0 : 1;
+    $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
     $self->throw_exception($error);
   }
 }
@@ -830,69 +1032,95 @@ sub txn_rollback {
 #  easier to override in NoBindVars without duping the rest.  It takes up
 #  all of _execute's args, and emits $sql, @bind.
 sub _prep_for_execute {
-  my ($self, $op, $extra_bind, $ident, @args) = @_;
+  my ($self, $op, $extra_bind, $ident, $args) = @_;
 
-  my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
+  my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
   unshift(@bind,
     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
       if $extra_bind;
-  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
 
-  return ($sql, @bind);
+  return ($sql, \@bind);
 }
 
-sub _execute {
-  my ($self, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
+sub _fix_bind_params {
+    my ($self, @bind) = @_;
+
+    ### Turn @bind from something like this:
+    ###   ( [ "artist", 1 ], [ "cdid", 1, 3 ] )
+    ### to this:
+    ###   ( "'1'", "'1'", "'3'" )
+    return
+        map {
+            if ( defined( $_ && $_->[1] ) ) {
+                map { qq{'$_'}; } @{$_}[ 1 .. $#$_ ];
+            }
+            else { q{'NULL'}; }
+        } @bind;
+}
+
+sub _query_start {
+    my ( $self, $sql, @bind ) = @_;
+
+    if ( $self->debug ) {
+        @bind = $self->_fix_bind_params(@bind);
+        
+        $self->debugobj->query_start( $sql, @bind );
+    }
+}
+
+sub _query_end {
+    my ( $self, $sql, @bind ) = @_;
+
+    if ( $self->debug ) {
+        @bind = $self->_fix_bind_params(@bind);
+        $self->debugobj->query_end( $sql, @bind );
+    }
+}
+
+sub _dbh_execute {
+  my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
   
   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
     $ident = $ident->from();
   }
-  
-  my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
-  unshift(@bind,
-    map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
-      if $extra_bind;
 
-  if ($self->debug) {
-      my @debug_bind =
-        map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @bind;
-      $self->debugobj->query_start($sql, @debug_bind);
-  }
+  my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
 
-  my $sth = eval { $self->sth($sql,$op) };
-  $self->throw_exception("no sth generated via sql ($@): $sql") if $@;
+  $self->_query_start( $sql, @$bind );
 
-  my $rv = eval {
-    my $placeholder_index = 1; 
+  my $sth = $self->sth($sql,$op);
 
-    foreach my $bound (@bind) {
-      my $attributes = {};
-      my($column_name, @data) = @$bound;
+  my $placeholder_index = 1; 
 
-      if ($bind_attributes) {
-        $attributes = $bind_attributes->{$column_name}
-        if defined $bind_attributes->{$column_name};
-      }
+  foreach my $bound (@$bind) {
+    my $attributes = {};
+    my($column_name, @data) = @$bound;
 
-      foreach my $data (@data) {
-        $data = ref $data ? ''.$data : $data; # stringify args
-
-        $sth->bind_param($placeholder_index, $data, $attributes);
-        $placeholder_index++;
-      }
+    if ($bind_attributes) {
+      $attributes = $bind_attributes->{$column_name}
+      if defined $bind_attributes->{$column_name};
     }
-    $sth->execute();
-  };
 
-  $self->throw_exception("Error executing '$sql': " . ($@ || $sth->errstr))
-    if $@ || !$rv;
+    foreach my $data (@data) {
+      $data = ref $data ? ''.$data : $data; # stringify args
 
-  if ($self->debug) {
-     my @debug_bind =
-       map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @bind; 
-     $self->debugobj->query_end($sql, @debug_bind);
+      $sth->bind_param($placeholder_index, $data, $attributes);
+      $placeholder_index++;
+    }
   }
-  return (wantarray ? ($rv, $sth, @bind) : $rv);
+
+  # Can this fail without throwing an exception anyways???
+  my $rv = $sth->execute();
+  $self->throw_exception($sth->errstr) if !$rv;
+
+  $self->_query_end( $sql, @$bind );
+
+  return (wantarray ? ($rv, $sth, @$bind) : $rv);
+}
+
+sub _execute {
+    my $self = shift;
+    $self->dbh_do('_dbh_execute', @_)
 }
 
 sub insert {
@@ -901,12 +1129,18 @@ sub insert {
   my $ident = $source->from; 
   my $bind_attributes = $self->source_bind_attributes($source);
 
-  eval { $self->_execute('insert' => [], $source, $bind_attributes, $to_insert) };
-  $self->throw_exception(
-    "Couldn't insert ".join(', ',
-      map "$_ => $to_insert->{$_}", keys %$to_insert
-    )." into ${ident}: $@"
-  ) if $@;
+  foreach my $col ( $source->columns ) {
+    if ( !defined $to_insert->{$col} ) {
+      my $col_info = $source->column_info($col);
+
+      if ( $col_info->{auto_nextval} ) {
+        $self->ensure_connected; 
+        $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
+      }
+    }
+  }
+
+  $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
 
   return $to_insert;
 }
@@ -922,16 +1156,11 @@ sub insert_bulk {
   @colvalues{@$cols} = (0..$#$cols);
   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
   
-  if ($self->debug) {
-      my @debug_bind = map { defined $_->[1] ? qq{$_->[1]} : q{'NULL'} } @bind;
-      $self->debugobj->query_start($sql, @debug_bind);
-  }
+  $self->_query_start( $sql, @bind );
   my $sth = $self->sth($sql);
 
 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
 
-  my $rv;
-  
   ## This must be an arrayref, else nothing works!
   
   my $tuple_status = [];
@@ -939,51 +1168,33 @@ sub insert_bulk {
   ##use Data::Dumper;
   ##print STDERR Dumper( $data, $sql, [@bind] );
 
-  if ($sth) {
-  
-    my $time = time();
+  my $time = time();
 
-    ## Get the bind_attributes, if any exist
-    my $bind_attributes = $self->source_bind_attributes($source);
-
-    ## Bind the values and execute
-    $rv = eval {
+  ## Get the bind_attributes, if any exist
+  my $bind_attributes = $self->source_bind_attributes($source);
 
-     my $placeholder_index = 1; 
+  ## Bind the values and execute
+  my $placeholder_index = 1; 
 
-        foreach my $bound (@bind) {
+  foreach my $bound (@bind) {
 
-          my $attributes = {};
-          my ($column_name, $data_index) = @$bound;
+    my $attributes = {};
+    my ($column_name, $data_index) = @$bound;
 
-          if( $bind_attributes ) {
-            $attributes = $bind_attributes->{$column_name}
-            if defined $bind_attributes->{$column_name};
-          }
+    if( $bind_attributes ) {
+      $attributes = $bind_attributes->{$column_name}
+      if defined $bind_attributes->{$column_name};
+    }
 
-          my @data = map { $_->[$data_index] } @$data;
+    my @data = map { $_->[$data_index] } @$data;
 
-          $sth->bind_param_array( $placeholder_index, [@data], $attributes );
-          $placeholder_index++;
-      }
-      $sth->execute_array( {ArrayTupleStatus => $tuple_status} );
-
-    };
-   
-    if ($@ || !defined $rv) {
-      my $errors = '';
-      foreach my $tuple (@$tuple_status) {
-          $errors .= "\n" . $tuple->[1] if(ref $tuple);
-      }
-      $self->throw_exception("Error executing '$sql': ".($@ || $errors));
-    }
-  } else {
-    $self->throw_exception("'$sql' did not generate a statement.");
-  }
-  if ($self->debug) {
-      my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
-      $self->debugobj->query_end($sql, @debug_bind);
+    $sth->bind_param_array( $placeholder_index, [@data], $attributes );
+    $placeholder_index++;
   }
+  my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
+  $self->throw_exception($sth->errstr) if !$rv;
+
+  $self->_query_end( $sql, @bind );
   return (wantarray ? ($rv, $sth, @bind) : $rv);
 }
 
@@ -1008,9 +1219,15 @@ sub delete {
 sub _select {
   my ($self, $ident, $select, $condition, $attrs) = @_;
   my $order = $attrs->{order_by};
+
   if (ref $condition eq 'SCALAR') {
     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
   }
+
+  my $for = delete $attrs->{for};
+  my $sql_maker = $self->sql_maker;
+  local $sql_maker->{for} = $for;
+
   if (exists $attrs->{group_by} || $attrs->{having}) {
     $order = {
       group_by => $attrs->{group_by},
@@ -1026,8 +1243,12 @@ sub _select {
   } else {
     $self->throw_exception("rows attribute must be positive if present")
       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
+
+    # MySQL actually recommends this approach.  I cringe.
+    $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
     push @args, $attrs->{rows}, $attrs->{offset};
   }
+
   return $self->_execute(@args);
 }
 
@@ -1060,18 +1281,43 @@ Handle a SQL select statement.
 sub select {
   my $self = shift;
   my ($ident, $select, $condition, $attrs) = @_;
-  return $self->cursor->new($self, \@_, $attrs);
+  return $self->cursor_class->new($self, \@_, $attrs);
 }
 
 sub select_single {
   my $self = shift;
   my ($rv, $sth, @bind) = $self->_select(@_);
   my @row = $sth->fetchrow_array;
+  if(@row && $sth->fetchrow_array) {
+    carp "Query returned more than one row.  SQL that returns multiple rows is DEPRECATED for ->find and ->single";
+  }
   # Need to call finish() to work round broken DBDs
   $sth->finish();
   return @row;
 }
 
+sub reload_row { 
+  my ($self, $row) = @_;
+  delete $row->{_dirty_columns};
+  return unless $row->in_storage; # Don't reload if we aren't real!
+
+  my $reload = $row->result_source->resultset->find(
+    map { $row->$_ } $row->primary_columns
+  );
+  unless ($reload) { # If we got deleted in the mean-time
+    $row->in_storage(0);
+    return $row;
+  }
+
+  $row = %$reload;
+  
+  # Avoid a possible infinite loop with
+  # sub DESTROY { $_[0]->discard_changes }
+  bless $reload, 'Do::Not::Exist';
+
+  return $row;
+}
+
 =head2 sth
 
 =over 4
@@ -1094,14 +1340,14 @@ sub _dbh_sth {
 
   # XXX You would think RaiseError would make this impossible,
   #  but apparently that's not true :(
-  die $dbh->errstr if !$sth;
+  $self->throw_exception($dbh->errstr) if !$sth;
 
   $sth;
 }
 
 sub sth {
   my ($self, $sql) = @_;
-  $self->dbh_do($self->can('_dbh_sth'), $sql);
+  $self->dbh_do('_dbh_sth', $sql);
 }
 
 sub _dbh_columns_info_for {
@@ -1163,7 +1409,7 @@ sub _dbh_columns_info_for {
 
 sub columns_info_for {
   my ($self, $table) = @_;
-  $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
+  $self->dbh_do('_dbh_columns_info_for', $table);
 }
 
 =head2 last_insert_id
@@ -1180,7 +1426,7 @@ sub _dbh_last_insert_id {
 
 sub last_insert_id {
   my $self = shift;
-  $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
+  $self->dbh_do('_dbh_last_insert_id', @_);
 }
 
 =head2 sqlt_type
@@ -1205,7 +1451,7 @@ sub bind_attribute_by_data_type {
     return;
 }
 
-=head2 create_ddl_dir (EXPERIMENTAL)
+=head2 create_ddl_dir
 
 =over 4
 
@@ -1216,9 +1462,6 @@ sub bind_attribute_by_data_type {
 Creates a SQL file based on the Schema, for each of the specified
 database types, in the given directory.
 
-Note that this feature is currently EXPERIMENTAL and may not work correctly
-across all databases, or fully handle complex relationships.
-
 =cut
 
 sub create_ddl_dir
@@ -1235,20 +1478,20 @@ sub create_ddl_dir
   $version ||= $schema->VERSION || '1.x';
   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
 
-  eval "use SQL::Translator";
-  $self->throw_exception("Can't create a ddl file without SQL::Translator: $@") if $@;
+  $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09: '}
+      . $self->_check_sqlt_message . q{'})
+          if !$self->_check_sqlt_version;
+
+  my $sqlt = SQL::Translator->new( $sqltargs );
+
+  $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
+  my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
 
-  my $sqlt = SQL::Translator->new({
-#      debug => 1,
-      add_drop_table => 1,
-  });
   foreach my $db (@$databases)
   {
     $sqlt->reset();
-    $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
-#    $sqlt->parser_args({'DBIx::Class' => $schema);
     $sqlt = $self->configure_sqlt($sqlt, $db);
-    $sqlt->data($schema);
+    $sqlt->{schema} = $sqlt_schema;
     $sqlt->producer($db);
 
     my $file;
@@ -1256,31 +1499,25 @@ sub create_ddl_dir
     if(-e $filename)
     {
       warn("$filename already exists, skipping $db");
-      next;
-    }
-
-    my $output = $sqlt->translate;
-    if(!$output)
-    {
-      warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
-      next;
-    }
-    if(!open($file, ">$filename"))
-    {
-        $self->throw_exception("Can't open $filename for writing ($!)");
-        next;
-    }
-    print $file $output;
-    close($file);
-
-    if($preversion)
-    {
-      eval "use SQL::Translator::Diff";
-      if($@)
+      next unless ($preversion);
+    } else {
+      my $output = $sqlt->translate;
+      if(!$output)
       {
-        warn("Can't diff versions without SQL::Translator::Diff: $@");
+        warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
         next;
       }
+      if(!open($file, ">$filename"))
+      {
+          $self->throw_exception("Can't open $filename for writing ($!)");
+          next;
+      }
+      print $file $output;
+      close($file);
+    } 
+    if($preversion)
+    {
+      require SQL::Translator::Diff;
 
       my $prefilename = $schema->ddl_filename($db, $dir, $preversion);
 #      print "Previous version $prefilename\n";
@@ -1289,36 +1526,7 @@ sub create_ddl_dir
         warn("No previous schema file found ($prefilename)");
         next;
       }
-      #### We need to reparse the SQLite file we just wrote, so that 
-      ##   Diff doesnt get all confoosed, and Diff is *very* confused.
-      ##   FIXME: rip Diff to pieces!
-#      my $target_schema = $sqlt->schema;
-#      unless ( $target_schema->name ) {
-#        $target_schema->name( $filename );
-#      }
-      my @input;
-      push @input, {file => $prefilename, parser => $db};
-      push @input, {file => $filename, parser => $db};
-      my ( $source_schema, $source_db, $target_schema, $target_db ) = map {
-        my $file   = $_->{'file'};
-        my $parser = $_->{'parser'};
-
-        my $t = SQL::Translator->new;
-        $t->debug( 0 );
-        $t->trace( 0 );
-        $t->parser( $parser )            or die $t->error;
-        my $out = $t->translate( $file ) or die $t->error;
-        my $schema = $t->schema;
-        unless ( $schema->name ) {
-          $schema->name( $file );
-        }
-        ($schema, $parser);
-      } @input;
 
-      my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
-                                                    $target_schema, $db,
-                                                    {}
-                                                   );
       my $difffile = $schema->ddl_filename($db, $dir, $version, $preversion);
       print STDERR "Diff: $difffile: $db, $dir, $version, $preversion \n";
       if(-e $difffile)
@@ -1326,6 +1534,43 @@ sub create_ddl_dir
         warn("$difffile already exists, skipping");
         next;
       }
+
+      my $source_schema;
+      {
+        my $t = SQL::Translator->new($sqltargs);
+        $t->debug( 0 );
+        $t->trace( 0 );
+        $t->parser( $db )                       or die $t->error;
+        $t = $self->configure_sqlt($t, $db);
+        my $out = $t->translate( $prefilename ) or die $t->error;
+        $source_schema = $t->schema;
+        unless ( $source_schema->name ) {
+          $source_schema->name( $prefilename );
+        }
+      }
+
+      # The "new" style of producers have sane normalization and can support 
+      # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
+      # And we have to diff parsed SQL against parsed SQL.
+      my $dest_schema = $sqlt_schema;
+
+      unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
+        my $t = SQL::Translator->new($sqltargs);
+        $t->debug( 0 );
+        $t->trace( 0 );
+        $t->parser( $db )                    or die $t->error;
+        $t = $self->configure_sqlt($t, $db);
+        my $out = $t->translate( $filename ) or die $t->error;
+        $dest_schema = $t->schema;
+        $dest_schema->name( $filename )
+          unless $dest_schema->name;
+      }
+
+      $DB::single = 1;
+      my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
+                                                    $dest_schema,   $db,
+                                                    $sqltargs
+                                                   );
       if(!open $file, ">$difffile")
       { 
         $self->throw_exception("Can't write to $difffile ($!)");
@@ -1389,25 +1634,23 @@ sub deployment_statements {
       return join('', @rows);
   }
 
-  eval "use SQL::Translator";
-  if(!$@)
-  {
-    eval "use SQL::Translator::Parser::DBIx::Class;";
-    $self->throw_exception($@) if $@;
-    eval "use SQL::Translator::Producer::${type};";
-    $self->throw_exception($@) if $@;
-
-    # sources needs to be a parser arg, but for simplicty allow at top level 
-    # coming in
-    $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
-        if exists $sqltargs->{sources};
-
-    my $tr = SQL::Translator->new(%$sqltargs);
-    SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
-    return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
-  }
+  $self->throw_exception(q{Can't deploy without SQL::Translator 0.09: '}
+      . $self->_check_sqlt_message . q{'})
+          if !$self->_check_sqlt_version;
+
+  require SQL::Translator::Parser::DBIx::Class;
+  eval qq{use SQL::Translator::Producer::${type}};
+  $self->throw_exception($@) if $@;
+
+  # sources needs to be a parser arg, but for simplicty allow at top level 
+  # coming in
+  $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
+      if exists $sqltargs->{sources};
+
+  my $tr = SQL::Translator->new(%$sqltargs);
+  SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
+  return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
 
-  $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
   return;
 
 }
@@ -1415,16 +1658,21 @@ sub deployment_statements {
 sub deploy {
   my ($self, $schema, $type, $sqltargs, $dir) = @_;
   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
-    for ( split(";\n", $statement)) {
-      next if($_ =~ /^--/);
-      next if(!$_);
-#      next if($_ =~ /^DROP/m);
-      next if($_ =~ /^BEGIN TRANSACTION/m);
-      next if($_ =~ /^COMMIT/m);
-      next if $_ =~ /^\s+$/; # skip whitespace only
-      $self->debugobj->query_start($_) if $self->debug;
-      $self->dbh->do($_); # shouldn't be using ->dbh ?
-      $self->debugobj->query_end($_) if $self->debug;
+    foreach my $line ( split(";\n", $statement)) {
+      next if($line =~ /^--/);
+      next if(!$line);
+#      next if($line =~ /^DROP/m);
+      next if($line =~ /^BEGIN TRANSACTION/m);
+      next if($line =~ /^COMMIT/m);
+      next if $line =~ /^\s+$/; # skip whitespace only
+      $self->_query_start($line);
+      eval {
+        $self->dbh->do($line); # shouldn't be using ->dbh ?
+      };
+      if ($@) {
+        warn qq{$@ (running "${line}")};
+      }
+      $self->_query_end($line);
     }
   }
 }
@@ -1437,7 +1685,10 @@ Returns the datetime parser class
 
 sub datetime_parser {
   my $self = shift;
-  return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
+  return $self->{datetime_parser} ||= do {
+    $self->ensure_connected;
+    $self->build_datetime_parser(@_);
+  };
 }
 
 =head2 datetime_parser_type
@@ -1463,6 +1714,47 @@ sub build_datetime_parser {
   return $type;
 }
 
+{
+    my $_check_sqlt_version; # private
+    my $_check_sqlt_message; # private
+    sub _check_sqlt_version {
+        return $_check_sqlt_version if defined $_check_sqlt_version;
+        eval 'use SQL::Translator "0.09"';
+        $_check_sqlt_message = $@ || '';
+        $_check_sqlt_version = !$@;
+    }
+
+    sub _check_sqlt_message {
+        _check_sqlt_version if !defined $_check_sqlt_message;
+        $_check_sqlt_message;
+    }
+}
+
+=head2 is_replicating
+
+A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
+replicate from a master database.  Default is undef, which is the result
+returned by databases that don't support replication.
+
+=cut
+
+sub is_replicating {
+    return;
+    
+}
+
+=head2 lag_behind_master
+
+Returns a number that represents a certain amount of lag behind a master db
+when a given storage is replicating.  The number is database dependent, but
+starts at zero and increases with the amount of lag. Default in undef
+
+=cut
+
+sub lag_behind_master {
+    return;
+}
+
 sub DESTROY {
   my $self = shift;
   return if !$self->_dbh;