use strict;
use warnings;
-use base qw/DBIx::Class::Storage::DBI::UniqueIdentifier/;
+use base qw/
+ DBIx::Class::Storage::DBI::UniqueIdentifier
+ DBIx::Class::Storage::DBI::IdentityInsert
+/;
use mro 'c3';
+
use Try::Tiny;
use List::Util 'first';
use namespace::clean;
__PACKAGE__->mk_group_accessors(simple => qw/
- _identity _identity_method
+ _identity _identity_method _no_scope_identity_query
/);
-__PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks::MSSQL');
-
-sub _set_identity_insert {
- my ($self, $table) = @_;
-
- my $sql = sprintf (
- 'SET IDENTITY_INSERT %s ON',
- $self->sql_maker->_quote ($table),
- );
+__PACKAGE__->sql_maker_class('DBIx::Class::SQLMaker::MSSQL');
- my $dbh = $self->_get_dbh;
- try { $dbh->do ($sql) }
- catch {
- $self->throw_exception (sprintf "Error executing '%s': %s",
- $sql,
- $dbh->errstr,
- );
- };
-}
-
-sub _unset_identity_insert {
- my ($self, $table) = @_;
-
- my $sql = sprintf (
- 'SET IDENTITY_INSERT %s OFF',
- $self->sql_maker->_quote ($table),
- );
-
- my $dbh = $self->_get_dbh;
- $dbh->do ($sql);
-}
+__PACKAGE__->sql_quote_char([qw/[ ]/]);
-sub insert_bulk {
- my $self = shift;
- my ($source, $cols, $data) = @_;
+__PACKAGE__->datetime_parser_type (
+ 'DBIx::Class::Storage::DBI::MSSQL::DateTime::Format'
+);
- my $is_identity_insert =
- (first { $source->column_info ($_)->{is_auto_increment} } @{$cols}) ? 1 : 0;
+__PACKAGE__->new_guid('NEWID()');
- if ($is_identity_insert) {
- $self->_set_identity_insert ($source->name);
- }
+sub __sql_server_x_or_higher {
+ my ($self, $version) = @_;
- $self->next::method(@_);
-
- if ($is_identity_insert) {
- $self->_unset_identity_insert ($source->name);
+ if (exists $_[0]->_server_info->{normalized_dbms_version}) {
+ if ($_[0]->_server_info->{normalized_dbms_version} >= $version) {
+ return 1
+ } else {
+ return 0
+ }
}
+ return undef;
}
-sub insert {
- my $self = shift;
- my ($source, $to_insert) = @_;
-
- my $supplied_col_info = $self->_resolve_column_info($source, [keys %$to_insert] );
-
- my $is_identity_insert =
- (first { $_->{is_auto_increment} } values %$supplied_col_info) ? 1 : 0;
-
- if ($is_identity_insert) {
- $self->_set_identity_insert ($source->name);
- }
-
- my $updated_cols = $self->next::method(@_);
-
- if ($is_identity_insert) {
- $self->_unset_identity_insert ($source->name);
- }
+sub __offset_bindtype { +{ dbd_attrs => DBI::SQL_INTEGER() } }
+sub __rows_bindtype { +{ dbd_attrs => DBI::SQL_INTEGER() } }
- return $updated_cols;
-}
+sub _sql_server_2005_or_higher { shift->__sql_server_x_or_higher(9) }
+sub _sql_server_2012_or_higher { shift->__sql_server_x_or_higher(11) }
sub _prep_for_execute {
my $self = shift;
- my ($op, $extra_bind, $ident, $args) = @_;
+ my ($op, $ident, $args) = @_;
# cast MONEY values properly
if ($op eq 'insert' || $op eq 'update') {
my $fields = $args->[0];
+ my $colinfo = $ident->columns_info([keys %$fields]);
+
for my $col (keys %$fields) {
# $ident is a result source object with INSERT/UPDATE ops
- if ($ident->column_info ($col)->{data_type}
- &&
- $ident->column_info ($col)->{data_type} =~ /^money\z/i) {
+ if (
+ $colinfo->{$col}{data_type}
+ &&
+ $colinfo->{$col}{data_type} =~ /^money\z/i
+ ) {
my $val = $fields->{$col};
$fields->{$col} = \['CAST(? AS MONEY)', [ $col => $val ]];
}
my ($sql, $bind) = $self->next::method (@_);
- if ($op eq 'insert') {
- $sql .= ';SELECT SCOPE_IDENTITY()';
-
+ # SELECT SCOPE_IDENTITY only works within a statement scope. We
+ # must try to always use this particular idiom first, as it is the
+ # only one that guarantees retrieving the correct id under high
+ # concurrency. When this fails we will fall back to whatever secondary
+ # retrieval method is specified in _identity_method, but at this
+ # point we don't have many guarantees we will get what we expected.
+ # http://msdn.microsoft.com/en-us/library/ms190315.aspx
+ # http://davidhayden.com/blog/dave/archive/2006/01/17/2736.aspx
+ if ($self->_perform_autoinc_retrieval and not $self->_no_scope_identity_query) {
+ $sql .= "\nSELECT SCOPE_IDENTITY()";
}
return ($sql, $bind);
sub _execute {
my $self = shift;
- my ($op) = @_;
- my ($rv, $sth, @bind) = $self->dbh_do($self->can('_dbh_execute'), @_);
+ # always list ctx - we need the $sth
+ my ($rv, $sth, @bind) = $self->next::method(@_);
- if ($op eq 'insert') {
+ if ($self->_perform_autoinc_retrieval) {
- # this should bring back the result of SELECT SCOPE_IDENTITY() we tacked
+ # attempt to bring back the result of SELECT SCOPE_IDENTITY() we tacked
# on in _prep_for_execute above
- my ($identity) = try { $sth->fetchrow_array };
+ my $identity;
+
+ # we didn't even try on ftds
+ unless ($self->_no_scope_identity_query) {
+ ($identity) = try { $sth->fetchrow_array };
+ $sth->finish;
+ }
# SCOPE_IDENTITY failed, but we can do something else
if ( (! $identity) && $self->_identity_method) {
}
$self->_identity($identity);
- $sth->finish;
}
return wantarray ? ($rv, $sth, @bind) : $rv;
# http://sqladvice.com/forums/permalink/18496/22931/ShowThread.aspx#22931
#
sub _select_args_to_query {
+ #my ($self, $ident, $select, $cond, $attrs) = @_;
my $self = shift;
+ my $attrs = $_[3];
- my ($sql, $prep_bind, @rest) = $self->next::method (@_);
+ my $sql_bind = $self->next::method (@_);
# see if this is an ordered subquery
- my $attrs = $_[3];
if (
- $sql !~ /^ \s* SELECT \s+ TOP \s+ \d+ \s+ /xi
- &&
- scalar $self->_parse_order_by ($attrs->{order_by})
+ $$sql_bind->[0] !~ /^ \s* \( \s* SELECT \s+ TOP \s+ \d+ \s+ /xi
+ and
+ scalar $self->_extract_order_criteria ($attrs->{order_by})
) {
$self->throw_exception(
- 'An ordered subselect encountered - this is not safe! Please see "Ordered Subselects" in DBIx::Class::Storage::DBI::MSSQL
- ') unless $attrs->{unsafe_subselect_ok};
- my $max = $self->sql_maker->__max_int;
- $sql =~ s/^ \s* SELECT \s/SELECT TOP $max /xi;
+ 'An ordered subselect encountered - this is not safe! Please see "Ordered Subselects" in DBIx::Class::Storage::DBI::MSSQL'
+ ) unless $attrs->{unsafe_subselect_ok};
+
+ $$sql_bind->[0] =~ s/^ \s* \( \s* SELECT (?=\s) / '(SELECT TOP ' . $self->sql_maker->__max_int /exi;
}
- return wantarray
- ? ($sql, $prep_bind, @rest)
- : \[ "($sql)", @$prep_bind ]
- ;
+ $sql_bind;
}
# savepoint syntax is the same as in Sybase ASE
-sub _svp_begin {
+sub _exec_svp_begin {
my ($self, $name) = @_;
- $self->_get_dbh->do("SAVE TRANSACTION $name");
+ $self->_dbh->do("SAVE TRANSACTION $name");
}
# A new SAVE TRANSACTION with the same name releases the previous one.
-sub _svp_release { 1 }
+sub _exec_svp_release { 1 }
-sub _svp_rollback {
+sub _exec_svp_rollback {
my ($self, $name) = @_;
- $self->_get_dbh->do("ROLLBACK TRANSACTION $name");
-}
-
-sub datetime_parser_type {
- 'DBIx::Class::Storage::DBI::MSSQL::DateTime::Format'
+ $self->_dbh->do("ROLLBACK TRANSACTION $name");
}
sub sqlt_type { 'SQLServer' }
sub sql_limit_dialect {
my $self = shift;
- my $supports_rno = 0;
+ my $supports_ofn = $self->_sql_server_2012_or_higher;
- if (exists $self->_server_info->{normalized_dbms_version}) {
- $supports_rno = 1 if $self->_server_info->{normalized_dbms_version} >= 9;
+ unless (defined $supports_ofn) {
+ # User is connecting via DBD::Sybase and has no permission to run
+ # stored procedures like xp_msver, or version detection failed for some
+ # other reason.
+ # So, we use a query to check if OFN is implemented.
+ try {
+ $self->_get_dbh->selectrow_array('SELECT 1 ORDER BY 1 OFFSET 0 ROWS');
+ $supports_ofn = 1;
+ };
}
- else {
+ return 'OffsetFetchNext' if $supports_ofn;
+
+ my $supports_rno = $self->_sql_server_2005_or_higher;
+
+ unless (defined $supports_rno) {
# User is connecting via DBD::Sybase and has no permission to run
# stored procedures like xp_msver, or version detection failed for some
# other reason.
$supports_rno = 1;
};
}
+ return 'RowNumberOver' if $supports_rno;
- return $supports_rno ? 'RowNumberOver' : 'Top';
+ return 'Top';
}
sub _ping {