use strict;
use warnings;
+use Carp::Clan qw/^DBIx::Class/;
use DBI;
use SQL::Abstract::Limit;
use DBIx::Class::Storage::DBI::Cursor;
__PACKAGE__->mk_group_accessors('simple' =>
qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
_conn_pid _conn_tid disable_sth_caching on_connect_do
- transaction_depth unsafe _dbh_autocommit/
+ on_disconnect_do transaction_depth unsafe _dbh_autocommit
+ auto_savepoint savepoints/
);
__PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
+__PACKAGE__->mk_group_accessors('inherited' => qw/sql_maker_class/);
+__PACKAGE__->sql_maker_class('DBIC::SQL::Abstract');
+
BEGIN {
-package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
+package # Hide from PAUSE
+ DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
use base qw/SQL::Abstract::Limit/;
my ($sql, @ret) = $self->SUPER::select(
$table, $self->_recurse_fields($fields), $where, $order, @rest
);
+ $sql .=
+ $self->{for} ?
+ (
+ $self->{for} eq 'update' ? ' FOR UPDATE' :
+ $self->{for} eq 'shared' ? ' FOR SHARE' :
+ ''
+ ) :
+ ''
+ ;
return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
}
$new->transaction_depth(0);
$new->_sql_maker_opts({});
+ $new->{savepoints} = [];
$new->{_in_dbh_do} = 0;
$new->{_dbh_gen} = 0;
=item on_connect_do
-This can be set to an arrayref of literal sql statements, which will
-be executed immediately after making the connection to the database
-every time we [re-]connect.
+Specifies things to do immediately after connecting or re-connecting to
+the database. Its value may contain:
+
+=over
+
+=item an array reference
+
+This contains SQL statements to execute in order. Each element contains
+a string or a code reference that returns a string.
+
+=item a code reference
+
+This contains some code to execute. Unlike code references within an
+array reference, its return value is ignored.
+
+=back
+
+=item on_disconnect_do
+
+Takes arguments in the same form as L<on_connect_do> and executes them
+immediately before disconnecting from the database.
+
+Note, this only runs if you explicitly call L<disconnect> on the
+storage object.
=item disable_sth_caching
especially if you set a C<HandleError> handler that suppresses exceptions
and/or disable C<RaiseError>.
+=item auto_savepoint
+
+If this option is true, L<DBIx::Class> will use savepoints when nesting
+transactions, making it possible to recover from failure in the inner
+transaction without having to abort all outer transactions.
+
=back
These options can be mixed in with your other L<DBI> connection attributes,
Another Important Note:
DBIC can do some wonderful magic with handling exceptions,
-disconnections, and transactions when you use C<AutoCommit => 1>
+disconnections, and transactions when you use C<< AutoCommit => 1 >>
combined with C<txn_do> for transaction support.
-If you set C<AutoCommit => 0> in your connect info, then you are always
+If you set C<< AutoCommit => 0 >> in your connect info, then you are always
in an assumed transaction between commits, and you're telling us you'd
like to manage that manually. A lot of DBIC's magic protections
go away. We can't protect you from exceptions due to database
disconnects because we don't know anything about how to restart your
transactions. You're on your own for handling all sorts of exceptional
-cases if you choose the C<AutoCommit => 0> path, just as you would
+cases if you choose the C<< AutoCommit => 0 >> path, just as you would
be with raw DBI.
Examples:
my $last_info = $dbi_info->[-1];
if(ref $last_info eq 'HASH') {
$last_info = { %$last_info }; # so delete is non-destructive
- for my $storage_opt (
- qw/on_connect_do disable_sth_caching unsafe cursor_class/
- ) {
+ my @storage_option = qw(
+ on_connect_do on_disconnect_do disable_sth_caching unsafe cursor_class
+ auto_savepoint
+ );
+ for my $storage_opt (@storage_option) {
if(my $value = delete $last_info->{$storage_opt}) {
$self->$storage_opt($value);
}
=head2 dbh_do
-Arguments: $subref, @extra_coderef_args?
+Arguments: ($subref | $method_name), @extra_coderef_args?
-Execute the given subref using the new exception-based connection management.
+Execute the given $subref or $method_name using the new exception-based
+connection management.
The first two arguments will be the storage object that C<dbh_do> was called
on and a database handle to use. Any additional arguments will be passed
sub dbh_do {
my $self = shift;
- my $coderef = shift;
+ my $code = shift;
- ref $coderef eq 'CODE' or $self->throw_exception
- ('$coderef must be a CODE reference');
+ my $dbh = $self->_dbh;
- return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do}
+ return $self->$code($dbh, @_) if $self->{_in_dbh_do}
|| $self->{transaction_depth};
local $self->{_in_dbh_do} = 1;
my $want_array = wantarray;
eval {
- $self->_verify_pid if $self->_dbh;
- $self->_populate_dbh if !$self->_dbh;
+ $self->_verify_pid if $dbh;
+ if(!$self->_dbh) {
+ $self->_populate_dbh;
+ $dbh = $self->_dbh;
+ }
+
if($want_array) {
- @result = $coderef->($self, $self->_dbh, @_);
+ @result = $self->$code($dbh, @_);
}
elsif(defined $want_array) {
- $result[0] = $coderef->($self, $self->_dbh, @_);
+ $result[0] = $self->$code($dbh, @_);
}
else {
- $coderef->($self, $self->_dbh, @_);
+ $self->$code($dbh, @_);
}
};
# We were not connected - reconnect and retry, but let any
# exception fall right through this time
$self->_populate_dbh;
- $coderef->($self, $self->_dbh, @_);
+ $self->$code($self->_dbh, @_);
}
# This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
ref $coderef eq 'CODE' or $self->throw_exception
('$coderef must be a CODE reference');
- return $coderef->(@_) if $self->{transaction_depth};
+ return $coderef->(@_) if $self->{transaction_depth} && ! $self->auto_savepoint;
local $self->{_in_dbh_do} = 1;
my ($self) = @_;
if( $self->connected ) {
+ my $connection_do = $self->on_disconnect_do;
+ $self->_do_connection_actions($connection_do) if ref($connection_do);
+
$self->_dbh->rollback unless $self->_dbh_autocommit;
$self->_dbh->disconnect;
$self->_dbh(undef);
sub _verify_pid {
my ($self) = @_;
- return if $self->_conn_pid == $$;
+ return if defined $self->_conn_pid && $self->_conn_pid == $$;
$self->_dbh->{InactiveDestroy} = 1;
$self->_dbh(undef);
sub sql_maker {
my ($self) = @_;
unless ($self->_sql_maker) {
- $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
+ my $sql_maker_class = $self->sql_maker_class;
+ $self->_sql_maker($sql_maker_class->new( $self->_sql_maker_args ));
}
return $self->_sql_maker;
}
+sub _rebless {}
+
sub _populate_dbh {
my ($self) = @_;
my @info = @{$self->_dbi_connect_info || []};
my $driver = $self->_dbh->{Driver}->{Name};
if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
bless $self, "DBIx::Class::Storage::DBI::${driver}";
- $self->_rebless() if $self->can('_rebless');
+ $self->_rebless();
}
}
- # if on-connect sql statements are given execute them
- foreach my $sql_statement (@{$self->on_connect_do || []}) {
- $self->_query_start($sql_statement);
- $self->_dbh->do($sql_statement);
- $self->_query_end($sql_statement);
- }
+ my $connection_do = $self->on_connect_do;
+ $self->_do_connection_actions($connection_do) if ref($connection_do);
$self->_conn_pid($$);
$self->_conn_tid(threads->tid) if $INC{'threads.pm'};
}
+sub _do_connection_actions {
+ my $self = shift;
+ my $connection_do = shift;
+
+ if (ref $connection_do eq 'ARRAY') {
+ $self->_do_query($_) foreach @$connection_do;
+ }
+ elsif (ref $connection_do eq 'CODE') {
+ $connection_do->();
+ }
+
+ return $self;
+}
+
+sub _do_query {
+ my ($self, $action) = @_;
+
+ if (ref $action eq 'CODE') {
+ $action = $action->($self);
+ $self->_do_query($_) foreach @$action;
+ }
+ else {
+ my @to_run = (ref $action eq 'ARRAY') ? (@$action) : ($action);
+ $self->_query_start(@to_run);
+ $self->_dbh->do(@to_run);
+ $self->_query_end(@to_run);
+ }
+
+ return $self;
+}
+
sub _connect {
my ($self, @info) = @_;
$dbh;
}
+sub svp_begin {
+ my ($self, $name) = @_;
+
+ $name = $self->_svp_generate_name
+ unless defined $name;
+
+ $self->throw_exception ("You can't use savepoints outside a transaction")
+ if $self->{transaction_depth} == 0;
+
+ $self->throw_exception ("Your Storage implementation doesn't support savepoints")
+ unless $self->can('_svp_begin');
+
+ push @{ $self->{savepoints} }, $name;
+
+ $self->debugobj->svp_begin($name) if $self->debug;
+
+ return $self->_svp_begin($name);
+}
+
+sub svp_release {
+ my ($self, $name) = @_;
+
+ $self->throw_exception ("You can't use savepoints outside a transaction")
+ if $self->{transaction_depth} == 0;
+
+ $self->throw_exception ("Your Storage implementation doesn't support savepoints")
+ unless $self->can('_svp_release');
+
+ if (defined $name) {
+ $self->throw_exception ("Savepoint '$name' does not exist")
+ unless grep { $_ eq $name } @{ $self->{savepoints} };
+
+ # Dig through the stack until we find the one we are releasing. This keeps
+ # the stack up to date.
+ my $svp;
+
+ do { $svp = pop @{ $self->{savepoints} } } while $svp ne $name;
+ } else {
+ $name = pop @{ $self->{savepoints} };
+ }
+
+ $self->debugobj->svp_release($name) if $self->debug;
+
+ return $self->_svp_release($name);
+}
+
+sub svp_rollback {
+ my ($self, $name) = @_;
+
+ $self->throw_exception ("You can't use savepoints outside a transaction")
+ if $self->{transaction_depth} == 0;
+
+ $self->throw_exception ("Your Storage implementation doesn't support savepoints")
+ unless $self->can('_svp_rollback');
+
+ if (defined $name) {
+ # If they passed us a name, verify that it exists in the stack
+ unless(grep({ $_ eq $name } @{ $self->{savepoints} })) {
+ $self->throw_exception("Savepoint '$name' does not exist!");
+ }
+
+ # Dig through the stack until we find the one we are releasing. This keeps
+ # the stack up to date.
+ while(my $s = pop(@{ $self->{savepoints} })) {
+ last if($s eq $name);
+ }
+ # Add the savepoint back to the stack, as a rollback doesn't remove the
+ # named savepoint, only everything after it.
+ push(@{ $self->{savepoints} }, $name);
+ } else {
+ # We'll assume they want to rollback to the last savepoint
+ $name = $self->{savepoints}->[-1];
+ }
+
+ $self->debugobj->svp_rollback($name) if $self->debug;
+
+ return $self->_svp_rollback($name);
+}
+
+sub _svp_generate_name {
+ my ($self) = @_;
+
+ return 'savepoint_'.scalar(@{ $self->{'savepoints'} });
+}
sub txn_begin {
my $self = shift;
# we should reconnect on begin_work
# for AutoCommit users
$self->dbh->begin_work;
+ } elsif ($self->auto_savepoint) {
+ $self->svp_begin;
}
$self->{transaction_depth}++;
}
if $self->_dbh_autocommit;
}
elsif($self->{transaction_depth} > 1) {
- $self->{transaction_depth}--
+ $self->{transaction_depth}--;
+ $self->svp_release
+ if $self->auto_savepoint;
}
}
}
elsif($self->{transaction_depth} > 1) {
$self->{transaction_depth}--;
+ if ($self->auto_savepoint) {
+ $self->svp_rollback;
+ $self->svp_release;
+ }
}
else {
die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
if ( $self->debug ) {
@bind = $self->_fix_bind_params(@bind);
+
$self->debugobj->query_start( $sql, @bind );
}
}
sub _execute {
my $self = shift;
- $self->dbh_do($self->can('_dbh_execute'), @_)
+ $self->dbh_do('_dbh_execute', @_)
}
sub insert {
my $ident = $source->from;
my $bind_attributes = $self->source_bind_attributes($source);
+ foreach my $col ( $source->columns ) {
+ if ( !defined $to_insert->{$col} ) {
+ my $col_info = $source->column_info($col);
+
+ if ( $col_info->{auto_nextval} ) {
+ $self->ensure_connected;
+ $to_insert->{$col} = $self->_sequence_fetch( 'nextval', $col_info->{sequence} || $self->_dbh_get_autoinc_seq($self->dbh, $source) );
+ }
+ }
+ }
+
$self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
return $to_insert;
sub _select {
my ($self, $ident, $select, $condition, $attrs) = @_;
my $order = $attrs->{order_by};
+
if (ref $condition eq 'SCALAR') {
$order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
}
+
+ my $for = delete $attrs->{for};
+ my $sql_maker = $self->sql_maker;
+ local $sql_maker->{for} = $for;
+
if (exists $attrs->{group_by} || $attrs->{having}) {
$order = {
group_by => $attrs->{group_by},
} else {
$self->throw_exception("rows attribute must be positive if present")
if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
+
+ # MySQL actually recommends this approach. I cringe.
+ $attrs->{rows} = 2**48 if not defined $attrs->{rows} and defined $attrs->{offset};
push @args, $attrs->{rows}, $attrs->{offset};
}
+
return $self->_execute(@args);
}
my $self = shift;
my ($rv, $sth, @bind) = $self->_select(@_);
my @row = $sth->fetchrow_array;
+ if(@row && $sth->fetchrow_array) {
+ carp "Query returned more than one row. SQL that returns multiple rows is DEPRECATED for ->find and ->single";
+ }
# Need to call finish() to work round broken DBDs
$sth->finish();
return @row;
}
+sub reload_row {
+ my ($self, $row) = @_;
+
+ my $reload = $row->result_source->resultset->find(
+ map { $row->$_ } $row->primary_columns
+ );
+
+ return $reload;
+}
+
=head2 sth
=over 4
sub sth {
my ($self, $sql) = @_;
- $self->dbh_do($self->can('_dbh_sth'), $sql);
+ $self->dbh_do('_dbh_sth', $sql);
}
sub _dbh_columns_info_for {
sub columns_info_for {
my ($self, $table) = @_;
- $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
+ $self->dbh_do('_dbh_columns_info_for', $table);
}
=head2 last_insert_id
sub last_insert_id {
my $self = shift;
- $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
+ $self->dbh_do('_dbh_last_insert_id', @_);
}
=head2 sqlt_type
=over 4
-=item Arguments: $schema \@databases, $version, $directory, $preversion, $sqlt_args
+=item Arguments: $schema \@databases, $version, $directory, $preversion, \%sqlt_args
=back
Creates a SQL file based on the Schema, for each of the specified
database types, in the given directory.
+By default, C<\%sqlt_args> will have
+
+ { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
+
+merged with the hash passed in. To disable any of those features, pass in a
+hashref like the following
+
+ { ignore_constraint_names => 0, # ... other options }
+
=cut
sub create_ddl_dir
$databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
$databases = [ $databases ] if(ref($databases) ne 'ARRAY');
$version ||= $schema->VERSION || '1.x';
- $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
+ $sqltargs = {
+ add_drop_table => 1,
+ ignore_constraint_names => 1,
+ ignore_index_names => 1,
+ %{$sqltargs || {}}
+ };
- $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.08: '}
+ $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.09: '}
. $self->_check_sqlt_message . q{'})
if !$self->_check_sqlt_version;
- my $sqlt = SQL::Translator->new({
-# debug => 1,
- add_drop_table => 1,
- });
+ my $sqlt = SQL::Translator->new( $sqltargs );
+
+ $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
+ my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
+
foreach my $db (@$databases)
{
$sqlt->reset();
- $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
-# $sqlt->parser_args({'DBIx::Class' => $schema);
$sqlt = $self->configure_sqlt($sqlt, $db);
- $sqlt->data($schema);
+ $sqlt->{schema} = $sqlt_schema;
$sqlt->producer($db);
my $file;
warn("No previous schema file found ($prefilename)");
next;
}
- #### We need to reparse the SQLite file we just wrote, so that
- ## Diff doesnt get all confoosed, and Diff is *very* confused.
- ## FIXME: rip Diff to pieces!
-# my $target_schema = $sqlt->schema;
-# unless ( $target_schema->name ) {
-# $target_schema->name( $filename );
-# }
- my @input;
- push @input, {file => $prefilename, parser => $db};
- push @input, {file => $filename, parser => $db};
- my ( $source_schema, $source_db, $target_schema, $target_db ) = map {
- my $file = $_->{'file'};
- my $parser = $_->{'parser'};
-
- my $t = SQL::Translator->new;
- $t->debug( 0 );
- $t->trace( 0 );
- $t->parser( $parser ) or die $t->error;
- my $out = $t->translate( $file ) or die $t->error;
- my $schema = $t->schema;
- unless ( $schema->name ) {
- $schema->name( $file );
- }
- ($schema, $parser);
- } @input;
- my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
- $target_schema, $db,
- {}
- );
my $difffile = $schema->ddl_filename($db, $dir, $version, $preversion);
print STDERR "Diff: $difffile: $db, $dir, $version, $preversion \n";
if(-e $difffile)
warn("$difffile already exists, skipping");
next;
}
+
+ my $source_schema;
+ {
+ my $t = SQL::Translator->new($sqltargs);
+ $t->debug( 0 );
+ $t->trace( 0 );
+ $t->parser( $db ) or die $t->error;
+ $t = $self->configure_sqlt($t, $db);
+ my $out = $t->translate( $prefilename ) or die $t->error;
+ $source_schema = $t->schema;
+ unless ( $source_schema->name ) {
+ $source_schema->name( $prefilename );
+ }
+ }
+
+ # The "new" style of producers have sane normalization and can support
+ # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
+ # And we have to diff parsed SQL against parsed SQL.
+ my $dest_schema = $sqlt_schema;
+
+ unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
+ my $t = SQL::Translator->new($sqltargs);
+ $t->debug( 0 );
+ $t->trace( 0 );
+ $t->parser( $db ) or die $t->error;
+ $t = $self->configure_sqlt($t, $db);
+ my $out = $t->translate( $filename ) or die $t->error;
+ $dest_schema = $t->schema;
+ $dest_schema->name( $filename )
+ unless $dest_schema->name;
+ }
+
+ my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
+ $dest_schema, $db,
+ $sqltargs
+ );
if(!open $file, ">$difffile")
{
$self->throw_exception("Can't write to $difffile ($!)");
return join('', @rows);
}
- $self->throw_exception(q{Can't deploy without SQL::Translator 0.08: '}
+ $self->throw_exception(q{Can't deploy without SQL::Translator 0.09: '}
. $self->_check_sqlt_message . q{'})
if !$self->_check_sqlt_version;
my $_check_sqlt_message; # private
sub _check_sqlt_version {
return $_check_sqlt_version if defined $_check_sqlt_version;
- eval 'use SQL::Translator 0.08';
- $_check_sqlt_message = $@ ? $@ : '';
- $_check_sqlt_version = $@ ? 0 : 1;
+ eval 'use SQL::Translator "0.09"';
+ $_check_sqlt_message = $@ || '';
+ $_check_sqlt_version = !$@;
}
sub _check_sqlt_message {
}
}
+=head2 is_replicating
+
+A boolean that reports if a particular L<DBIx::Class::Storage::DBI> is set to
+replicate from a master database. Default is undef, which is the result
+returned by databases that don't support replication.
+
+=cut
+
+sub is_replicating {
+ return;
+
+}
+
+=head2 lag_behind_master
+
+Returns a number that represents a certain amount of lag behind a master db
+when a given storage is replicating. The number is database dependent, but
+starts at zero and increases with the amount of lag. Default in undef
+
+=cut
+
+sub lag_behind_master {
+ return;
+}
+
sub DESTROY {
my $self = shift;
return if !$self->_dbh;