Merge 'trunk' into 'mssql_top_fixes'
Peter Rabbitson [Tue, 30 Jun 2009 15:09:03 +0000 (17:09 +0200)]
12 files changed:
lib/DBIx/Class.pm
lib/DBIx/Class/ResultSet.pm
lib/DBIx/Class/SQLAHacks.pm
lib/DBIx/Class/Schema.pm
lib/DBIx/Class/Storage/DBI.pm
lib/DBIx/Class/Storage/DBI/AmbiguousGlob.pm [new file with mode: 0644]
lib/DBIx/Class/Storage/DBI/MSSQL.pm
lib/DBIx/Class/Storage/DBI/ODBC.pm
lib/DBIx/Class/Storage/DBI/ODBC/Microsoft_SQL_Server.pm
lib/DBIx/Class/Storage/DBI/mysql.pm
t/42toplimit.t
t/746mssql.t

index bed13d2..677ddb7 100644 (file)
@@ -3,11 +3,12 @@ package DBIx::Class;
 use strict;
 use warnings;
 
+use MRO::Compat;
+
 use vars qw($VERSION);
 use base qw/DBIx::Class::Componentised Class::Accessor::Grouped/;
 use DBIx::Class::StartupCheck;
 
-
 sub mk_classdata {
   shift->mk_classaccessor(@_);
 }
index b012e03..d34378a 100644 (file)
@@ -1254,8 +1254,14 @@ sub _count_subq_rs {
 
   $sub_attrs->{select} = $rsrc->storage->_subq_count_select ($rsrc, $sub_attrs);
 
+  # this is so that ordering can be thrown away in things like Top limit
+  $sub_attrs->{-for_count_only} = 1;
+
+  my $sub_rs = $rsrc->resultset_class->new ($rsrc, $sub_attrs);
   $attrs->{from} = [{
-    count_subq => $rsrc->resultset_class->new ($rsrc, $sub_attrs )->as_query
+    -alias => 'count_subq',
+    -source_handle => $rsrc->handle,
+    count_subq => $sub_rs->as_query,
   }];
 
   # the subquery replaces this
@@ -2708,8 +2714,10 @@ sub _resolved_attrs {
   # Although this is needed only if the order_by is not defined, it is
   # actually cheaper to just populate this rather than properly examining
   # order_by (stuf like [ {} ] and the like)
-  $attrs->{_virtual_order_by} = [ $self->result_source->primary_columns ];
-
+  my $prefix = $alias . ($source->schema->storage->sql_maker->{name_sep} || '.');
+  $attrs->{_virtual_order_by} = [
+    map { $prefix . $_ } ($source->primary_columns)
+  ];
 
   $attrs->{collapse} ||= {};
   if ( my $prefetch = delete $attrs->{prefetch} ) {
index a454cd5..0494a6a 100644 (file)
@@ -12,12 +12,13 @@ BEGIN {
   no warnings qw/redefine/;
   no strict qw/refs/;
   for my $f (qw/carp croak/) {
+
     my $orig = \&{"SQL::Abstract::$f"};
     *{"SQL::Abstract::$f"} = sub {
 
       local $Carp::CarpLevel = 1;   # even though Carp::Clan ignores this, $orig will not
 
-      if (Carp::longmess() =~ /DBIx::Class::SQLAHacks::[\w]+\(\) called/) {
+      if (Carp::longmess() =~ /DBIx::Class::SQLAHacks::[\w]+ .+? called \s at/x) {
         __PACKAGE__->can($f)->(@_);
       }
       else {
@@ -116,37 +117,179 @@ SQL
 sub _Top {
   my ( $self, $sql, $order, $rows, $offset ) = @_;
 
+  # mangle the input sql so it can be properly aliased in the outer queries
+  $sql =~ s/^ \s* SELECT \s+ (.+?) \s+ (?=FROM)//ix
+    or croak "Unrecognizable SELECT: $sql";
+  my $sql_select = $1;
+  my @sql_select = split (/\s*,\s*/, $sql_select);
+
+  # we can't support subqueries (in fact MSSQL can't) - croak
+  if (@sql_select != @{$self->{_dbic_rs_attrs}{select}}) {
+    croak (sprintf (
+      'SQL SELECT did not parse cleanly - retrieved %d comma separated elements, while '
+    . 'the resultset select attribure contains %d elements: %s',
+      scalar @sql_select,
+      scalar @{$self->{_dbic_rs_attrs}{select}},
+      $sql_select,
+    ));
+  }
+
+  my $name_sep = $self->name_sep || '.';
+  $name_sep = "\Q$name_sep\E";
+  my $col_re = qr/ ^ (?: (.+) $name_sep )? ([^$name_sep]+) $ /x;
+
+  # construct the new select lists, rename(alias) some columns if necessary
+  my (@outer_select, @inner_select, %seen_names, %col_aliases, %outer_col_aliases);
+
+  for (@{$self->{_dbic_rs_attrs}{select}}) {
+    next if ref $_;
+    my ($table, $orig_colname) = ( $_ =~ $col_re );
+    next unless $table;
+    $seen_names{$orig_colname}++;
+  }
+
+  for my $i (0 .. $#sql_select) {
+
+    my $colsel_arg = $self->{_dbic_rs_attrs}{select}[$i];
+    my $colsel_sql = $sql_select[$i];
+
+    # this may or may not work (in case of a scalarref or something)
+    my ($table, $orig_colname) = ( $colsel_arg =~ $col_re );
+
+    my $quoted_alias;
+    # do not attempt to understand non-scalar selects - alias numerically
+    if (ref $colsel_arg) {
+      $quoted_alias = $self->_quote ('column_' . (@inner_select + 1) );
+    }
+    # column name seen more than once - alias it
+    elsif ($orig_colname && ($seen_names{$orig_colname} > 1) ) {
+      $quoted_alias = $self->_quote ("${table}__${orig_colname}");
+    }
+
+    # we did rename - make a record and adjust
+    if ($quoted_alias) {
+      # alias inner
+      push @inner_select, "$colsel_sql AS $quoted_alias";
+
+      # push alias to outer
+      push @outer_select, $quoted_alias;
+
+      # Any aliasing accumulated here will be considered
+      # both for inner and outer adjustments of ORDER BY
+      $self->__record_alias (
+        \%col_aliases,
+        $quoted_alias,
+        $colsel_arg,
+        $table ? $orig_colname : undef,
+      );
+    }
+
+    # otherwise just leave things intact inside, and use the abbreviated one outside
+    # (as we do not have table names anymore)
+    else {
+      push @inner_select, $colsel_sql;
+
+      my $outer_quoted = $self->_quote ($orig_colname);  # it was not a duplicate so should just work
+      push @outer_select, $outer_quoted;
+      $self->__record_alias (
+        \%outer_col_aliases,
+        $outer_quoted,
+        $colsel_arg,
+        $table ? $orig_colname : undef,
+      );
+    }
+  }
+
+  my $outer_select = join (', ', @outer_select );
+  my $inner_select = join (', ', @inner_select );
+
+  %outer_col_aliases = (%outer_col_aliases, %col_aliases);
+
+  # deal with order
   croak '$order supplied to SQLAHacks limit emulators must be a hash'
     if (ref $order ne 'HASH');
 
   $order = { %$order }; #copy
 
-  my $last = $rows + $offset;
-
-  my $req_order = $self->_order_by ($order->{order_by});
+  my $req_order = $order->{order_by};
+  my $limit_order =
+    scalar $self->_order_by_chunks ($req_order) # examine normalized version, collapses nesting
+      ? $req_order
+      : $order->{_virtual_order_by}
+  ;
 
-  my $limit_order = $req_order ? $order->{order_by} : $order->{_virtual_order_by};
+  my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
+  my $order_by_requested = $self->_order_by ($req_order);
 
+  # generate the rest
   delete $order->{$_} for qw/order_by _virtual_order_by/;
   my $grpby_having = $self->_order_by ($order);
 
-  my ( $order_by_inner, $order_by_outer ) = $self->_order_directions($limit_order);
+  # short circuit for counts - the ordering complexity is needless
+  if ($self->{_dbic_rs_attrs}{-for_count_only}) {
+    return "SELECT TOP $rows $inner_select $sql $grpby_having $order_by_outer";
+  }
+
+  # we can't really adjust the order_by columns, as introspection is lacking
+  # resort to simple substitution
+  for my $col (keys %outer_col_aliases) {
+    for ($order_by_requested, $order_by_outer) {
+      $_ =~ s/\s+$col\s+/ $outer_col_aliases{$col} /g;
+    }
+  }
+  for my $col (keys %col_aliases) {
+    $order_by_inner =~ s/\s+$col\s+/$col_aliases{$col}/g;
+  }
 
-  $sql =~ s/^\s*(SELECT|select)//;
 
-  $sql = <<"SQL";
-  SELECT * FROM
-  (
-    SELECT TOP $rows * FROM
+  my $inner_lim = $rows + $offset;
+
+  $sql = "SELECT TOP $inner_lim $inner_select $sql $grpby_having $order_by_inner";
+
+  if ($offset) {
+    $sql = <<"SQL";
+
+    SELECT TOP $rows $outer_select FROM
     (
-        SELECT TOP $last $sql $grpby_having $order_by_inner
-    ) AS foo
+      $sql
+    ) AS me
     $order_by_outer
-  ) AS bar
-  $req_order
+SQL
 
+  }
+
+  if ($order_by_requested) {
+    $sql = <<"SQL";
+
+    SELECT $outer_select FROM
+      ( $sql ) AS me
+    $order_by_requested;
 SQL
-    return $sql;
+
+  }
+
+  return $sql;
+}
+
+# action at a distance to shorten Top code above
+sub __record_alias {
+  my ($self, $register, $alias, $fqcol, $col) = @_;
+
+  # record qualified name
+  $register->{$fqcol} = $alias;
+  $register->{$self->_quote($fqcol)} = $alias;
+
+  return unless $col;
+
+  # record unqialified name, undef (no adjustment) if a duplicate is found
+  if (exists $register->{$col}) {
+    $register->{$col} = undef;
+  }
+  else {
+    $register->{$col} = $alias;
+  }
+
+  $register->{$self->_quote($col)} = $register->{$col};
 }
 
 
@@ -158,6 +301,10 @@ sub _find_syntax {
   return $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
 }
 
+my $for_syntax = {
+  update => 'FOR UPDATE',
+  shared => 'FOR SHARE',
+};
 sub select {
   my ($self, $table, $fields, $where, $order, @rest) = @_;
 
@@ -177,15 +324,10 @@ sub select {
   my ($sql, @where_bind) = $self->SUPER::select(
     $table, $self->_recurse_fields($fields), $where, $order, @rest
   );
-  $sql .= 
-    $self->{for} ?
-    (
-      $self->{for} eq 'update' ? ' FOR UPDATE' :
-      $self->{for} eq 'shared' ? ' FOR SHARE'  :
-      ''
-    ) :
-    ''
-  ;
+  if (my $for = delete $self->{_dbic_rs_attrs}{for}) {
+    $sql .= " $for_syntax->{$for}" if $for_syntax->{$for};
+  }
+
   return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}, @{$self->{order_bind}} ) : $sql;
 }
 
index 80c43d9..bf2d76e 100644 (file)
@@ -7,9 +7,8 @@ use DBIx::Class::Exception;
 use Carp::Clan qw/^DBIx::Class/;
 use Scalar::Util qw/weaken/;
 use File::Spec;
-use MRO::Compat;
 use Sub::Name ();
-require Module::Find;
+use Module::Find();
 
 use base qw/DBIx::Class/;
 
index 1cbc7ef..6af9431 100644 (file)
@@ -3,7 +3,7 @@ package DBIx::Class::Storage::DBI;
 
 use base 'DBIx::Class::Storage';
 
-use strict;    
+use strict;
 use warnings;
 use Carp::Clan qw/^DBIx::Class/;
 use DBI;
@@ -90,8 +90,8 @@ recognized by DBIx::Class:
 
 =item *
 
-A single code reference which returns a connected 
-L<DBI database handle|DBI/connect> optionally followed by 
+A single code reference which returns a connected
+L<DBI database handle|DBI/connect> optionally followed by
 L<extra attributes|/DBIx::Class specific connection attributes> recognized
 by DBIx::Class:
 
@@ -110,7 +110,7 @@ mixed together:
     %extra_attributes,
   }];
 
-This is particularly useful for L<Catalyst> based applications, allowing the 
+This is particularly useful for L<Catalyst> based applications, allowing the
 following config (L<Config::General> style):
 
   <Model::DB>
@@ -129,7 +129,7 @@ Please note that the L<DBI> docs recommend that you always explicitly
 set C<AutoCommit> to either I<0> or I<1>.  L<DBIx::Class> further
 recommends that it be set to I<1>, and that you perform transactions
 via our L<DBIx::Class::Schema/txn_do> method.  L<DBIx::Class> will set it
-to I<1> if you do not do explicitly set it to zero.  This is the default 
+to I<1> if you do not do explicitly set it to zero.  This is the default
 for most DBDs. See L</DBIx::Class and AutoCommit> for details.
 
 =head3 DBIx::Class specific connection attributes
@@ -268,7 +268,7 @@ storage object.
 If set to a true value, this option will disable the caching of
 statement handles via L<DBI/prepare_cached>.
 
-=item limit_dialect 
+=item limit_dialect
 
 Sets the limit dialect. This is useful for JDBC-bridge among others
 where the remote SQL-dialect cannot be determined by the name of the
@@ -276,7 +276,7 @@ driver alone. See also L<SQL::Abstract::Limit>.
 
 =item quote_char
 
-Specifies what characters to use to quote table and column names. If 
+Specifies what characters to use to quote table and column names. If
 you use this you will want to specify L</name_sep> as well.
 
 C<quote_char> expects either a single character, in which case is it
@@ -288,8 +288,8 @@ SQL Server you should use C<< quote_char => [qw/[ ]/] >>.
 
 =item name_sep
 
-This only needs to be used in conjunction with C<quote_char>, and is used to 
-specify the charecter that seperates elements (schemas, tables, columns) from 
+This only needs to be used in conjunction with C<quote_char>, and is used to
+specify the charecter that seperates elements (schemas, tables, columns) from
 each other. In most cases this is simply a C<.>.
 
 The consequences of not supplying this value is that L<SQL::Abstract>
@@ -764,8 +764,10 @@ sub _determine_driver {
       ($driver) = $self->_dbi_connect_info->[0] =~ /dbi:([^:]+):/i;
     }
 
-    if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
-      bless $self, "DBIx::Class::Storage::DBI::${driver}";
+    my $storage_class = "DBIx::Class::Storage::DBI::${driver}";
+    if ($self->load_optional_class($storage_class)) {
+      mro::set_mro($storage_class, 'c3');
+      bless $self, $storage_class;
       $self->_rebless();
     }
   }
@@ -891,11 +893,11 @@ sub svp_begin {
 
   $self->throw_exception ("Your Storage implementation doesn't support savepoints")
     unless $self->can('_svp_begin');
-  
+
   push @{ $self->{savepoints} }, $name;
 
   $self->debugobj->svp_begin($name) if $self->debug;
-  
+
   return $self->_svp_begin($name);
 }
 
@@ -955,7 +957,7 @@ sub svp_rollback {
   }
 
   $self->debugobj->svp_rollback($name) if $self->debug;
-  
+
   return $self->_svp_rollback($name);
 }
 
@@ -1093,7 +1095,7 @@ sub _dbh_execute {
 
   my $sth = $self->sth($sql,$op);
 
-  my $placeholder_index = 1; 
+  my $placeholder_index = 1;
 
   foreach my $bound (@$bind) {
     my $attributes = {};
@@ -1152,7 +1154,7 @@ sub insert {
 }
 
 ## Still not quite perfect, and EXPERIMENTAL
-## Currently it is assumed that all values passed will be "normal", i.e. not 
+## Currently it is assumed that all values passed will be "normal", i.e. not
 ## scalar refs, or at least, all the same type as the first set, the statement is
 ## only prepped once.
 sub insert_bulk {
@@ -1161,7 +1163,7 @@ sub insert_bulk {
   my $table = $source->from;
   @colvalues{@$cols} = (0..$#$cols);
   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
-  
+
   $self->_query_start( $sql, @bind );
   my $sth = $self->sth($sql);
 
@@ -1174,7 +1176,7 @@ sub insert_bulk {
   my $bind_attributes = $self->source_bind_attributes($source);
 
   ## Bind the values and execute
-  my $placeholder_index = 1; 
+  my $placeholder_index = 1;
 
   foreach my $bound (@bind) {
 
@@ -1222,7 +1224,7 @@ sub update {
   my $self = shift @_;
   my $source = shift @_;
   my $bind_attributes = $self->source_bind_attributes($source);
-  
+
   return $self->_execute('update' => [], $source, $bind_attributes, @_);
 }
 
@@ -1230,9 +1232,9 @@ sub update {
 sub delete {
   my $self = shift @_;
   my $source = shift @_;
-  
+
   my $bind_attrs = $self->source_bind_attributes($source);
-  
+
   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
 }
 
@@ -1331,10 +1333,10 @@ sub _select {
   my $self = shift;
 
   # localization is neccessary as
-  # 1) there is no infrastructure to pass this around (easy to do, but will wait)
+  # 1) there is no infrastructure to pass this around before SQLA2
   # 2) _select_args sets it and _prep_for_execute consumes it
   my $sql_maker = $self->sql_maker;
-  local $sql_maker->{for};
+  local $sql_maker->{_dbic_rs_attrs};
 
   return $self->_execute($self->_select_args(@_));
 }
@@ -1343,10 +1345,10 @@ sub _select_args_to_query {
   my $self = shift;
 
   # localization is neccessary as
-  # 1) there is no infrastructure to pass this around (easy to do, but will wait)
+  # 1) there is no infrastructure to pass this around before SQLA2
   # 2) _select_args sets it and _prep_for_execute consumes it
   my $sql_maker = $self->sql_maker;
-  local $sql_maker->{for};
+  local $sql_maker->{_dbic_rs_attrs};
 
   # my ($op, $bind, $ident, $bind_attrs, $select, $cond, $order, $rows, $offset)
   #  = $self->_select_args($ident, $select, $cond, $attrs);
@@ -1367,7 +1369,14 @@ sub _select_args {
   my ($self, $ident, $select, $where, $attrs) = @_;
 
   my $sql_maker = $self->sql_maker;
-  my $alias2source = $self->_resolve_ident_sources ($ident);
+  $sql_maker->{_dbic_rs_attrs} = {
+    %$attrs,
+    select => $select,
+    from => $ident,
+    where => $where,
+  };
+
+  my ($alias2source, $root_alias) = $self->_resolve_ident_sources ($ident);
 
   # calculate bind_attrs before possible $ident mangling
   my $bind_attrs = {};
@@ -1379,7 +1388,7 @@ sub _select_args {
       $bind_attrs->{$fqcn} = $bindtypes->{$col} if $bindtypes->{$col};
 
       # so that unqualified searches can be bound too
-      $bind_attrs->{$col} = $bind_attrs->{$fqcn} if $alias eq 'me';
+      $bind_attrs->{$col} = $bind_attrs->{$fqcn} if $alias eq $root_alias;
     }
   }
 
@@ -1419,26 +1428,24 @@ sub _select_args {
   };
 
 
-  $sql_maker->{for} = delete $attrs->{for};
-
   return ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $where, $order, @limit);
 }
 
 sub _adjust_select_args_for_limited_prefetch {
   my ($self, $from, $select, $where, $attrs) = @_;
 
-  if ($attrs->{group_by} and @{$attrs->{group_by}}) {
-    $self->throw_exception ('Prefetch with limit (rows/offset) is not supported on resultsets with a group_by attribute');
+  if ($attrs->{group_by} && @{$attrs->{group_by}}) {
+    $self->throw_exception ('has_many prefetch with limit (rows/offset) is not supported on grouped resultsets');
   }
 
-  $self->throw_exception ('Prefetch with limit (rows/offset) is not supported on resultsets with a custom from attribute')
+  $self->throw_exception ('has_many prefetch with limit (rows/offset) is not supported on resultsets with a custom from attribute')
     if (ref $from ne 'ARRAY');
 
 
   # separate attributes
   my $sub_attrs = { %$attrs };
   delete $attrs->{$_} for qw/where bind rows offset/;
-  delete $sub_attrs->{$_} for qw/for collapse select order_by/;
+  delete $sub_attrs->{$_} for qw/for collapse select as order_by/;
 
   my $alias = $attrs->{alias};
 
@@ -1563,10 +1570,17 @@ sub _adjust_select_args_for_limited_prefetch {
   );
 
   # put it in the new {from}
-  unshift @outer_from, { $alias => $subq };
+  unshift @outer_from, {
+    -alias => $alias,
+    -source_handle => $select_root->{-source_handle},
+    $alias => $subq,
+  };
 
   # This is totally horrific - the $where ends up in both the inner and outer query
-  # Unfortunately not much can be done until SQLA2 introspection arrives
+  # Unfortunately not much can be done until SQLA2 introspection arrives, and even
+  # then if where conditions apply to the *right* side of the prefetch, you may have
+  # to both filter the inner select (e.g. to apply a limit) and then have to re-filter
+  # the outer select to exclude joins you didin't want in the first place
   #
   # OTOH it can be seen as a plus: <ash> (notes that this query would make a DBA cry ;)
   return (\@outer_from, $select, $where, $attrs);
@@ -1576,12 +1590,14 @@ sub _resolve_ident_sources {
   my ($self, $ident) = @_;
 
   my $alias2source = {};
+  my $root_alias;
 
   # the reason this is so contrived is that $ident may be a {from}
   # structure, specifying multiple tables to join
   if ( Scalar::Util::blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
     # this is compat mode for insert/update/delete which do not deal with aliases
     $alias2source->{me} = $ident;
+    $root_alias = 'me';
   }
   elsif (ref $ident eq 'ARRAY') {
 
@@ -1589,6 +1605,7 @@ sub _resolve_ident_sources {
       my $tabinfo;
       if (ref $_ eq 'HASH') {
         $tabinfo = $_;
+        $root_alias = $tabinfo->{-alias};
       }
       if (ref $_ eq 'ARRAY' and ref $_->[0] eq 'HASH') {
         $tabinfo = $_->[0];
@@ -1599,7 +1616,35 @@ sub _resolve_ident_sources {
     }
   }
 
-  return $alias2source;
+  return ($alias2source, $root_alias);
+}
+
+# Takes $ident, \@column_names
+#
+# returns { $column_name => \%column_info, ... }
+# also note: this adds -result_source => $rsrc to the column info
+#
+# usage:
+#   my $col_sources = $self->_resolve_column_info($ident, [map $_->[0], @{$bind}]);
+sub _resolve_column_info {
+  my ($self, $ident, $colnames) = @_;
+  my ($alias2src, $root_alias) = $self->_resolve_ident_sources($ident);
+
+  my $sep = $self->_sql_maker_opts->{name_sep} || '.';
+  $sep = "\Q$sep\E";
+
+  my (%return, %converted);
+  foreach my $col (@$colnames) {
+    my ($alias, $colname) = $col =~ m/^ (?: ([^$sep]+) $sep)? (.+) $/x;
+
+    # deal with unqualified cols - we assume the main alias for all
+    # unqualified ones, ugly but can't think of anything better right now
+    $alias ||= $root_alias;
+
+    my $rsrc = $alias2src->{$alias};
+    $return{$col} = $rsrc && { %{$rsrc->column_info($colname)}, -result_source => $rsrc };
+  }
+  return \%return;
 }
 
 # Returns a counting SELECT for a simple count
@@ -1881,13 +1926,13 @@ By default, C<\%sqlt_args> will have
 
  { add_drop_table => 1, ignore_constraint_names => 1, ignore_index_names => 1 }
 
-merged with the hash passed in. To disable any of those features, pass in a 
+merged with the hash passed in. To disable any of those features, pass in a
 hashref like the following
 
  { ignore_constraint_names => 0, # ... other options }
 
 
-Note that this feature is currently EXPERIMENTAL and may not work correctly 
+Note that this feature is currently EXPERIMENTAL and may not work correctly
 across all databases, or fully handle complex relationships.
 
 WARNING: Please check all SQL files created, before applying them.
@@ -1908,7 +1953,7 @@ sub create_ddl_dir {
   $version ||= $schema_version;
 
   $sqltargs = {
-    add_drop_table => 1, 
+    add_drop_table => 1,
     ignore_constraint_names => 1,
     ignore_index_names => 1,
     %{$sqltargs || {}}
@@ -1948,7 +1993,7 @@ sub create_ddl_dir {
     }
     print $file $output;
     close($file);
-  
+
     next unless ($preversion);
 
     require SQL::Translator::Diff;
@@ -1964,7 +2009,7 @@ sub create_ddl_dir {
       carp("Overwriting existing diff file - $difffile");
       unlink($difffile);
     }
-    
+
     my $source_schema;
     {
       my $t = SQL::Translator->new($sqltargs);
@@ -1983,7 +2028,7 @@ sub create_ddl_dir {
         unless ( $source_schema->name );
     }
 
-    # The "new" style of producers have sane normalization and can support 
+    # The "new" style of producers have sane normalization and can support
     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
     # And we have to diff parsed SQL against parsed SQL.
     my $dest_schema = $sqlt_schema;
@@ -2004,12 +2049,12 @@ sub create_ddl_dir {
       $dest_schema->name( $filename )
         unless $dest_schema->name;
     }
-    
+
     my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
                                                   $dest_schema,   $db,
                                                   $sqltargs
                                                  );
-    if(!open $file, ">$difffile") { 
+    if(!open $file, ">$difffile") {
       $self->throw_exception("Can't write to $difffile ($!)");
       next;
     }
@@ -2053,7 +2098,7 @@ sub deployment_statements {
   if(-f $filename)
   {
       my $file;
-      open($file, "<$filename") 
+      open($file, "<$filename")
         or $self->throw_exception("Can't open $filename ($!)");
       my @rows = <$file>;
       close($file);
@@ -2068,7 +2113,7 @@ sub deployment_statements {
   eval qq{use SQL::Translator::Producer::${type}};
   $self->throw_exception($@) if $@;
 
-  # sources needs to be a parser arg, but for simplicty allow at top level 
+  # sources needs to be a parser arg, but for simplicty allow at top level
   # coming in
   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
       if exists $sqltargs->{sources};
@@ -2173,7 +2218,7 @@ returned by databases that don't support replication.
 
 sub is_replicating {
     return;
-    
+
 }
 
 =head2 lag_behind_master
diff --git a/lib/DBIx/Class/Storage/DBI/AmbiguousGlob.pm b/lib/DBIx/Class/Storage/DBI/AmbiguousGlob.pm
new file mode 100644 (file)
index 0000000..a7ed94b
--- /dev/null
@@ -0,0 +1,43 @@
+package DBIx::Class::Storage::DBI::AmbiguousGlob;
+
+use strict;
+use warnings;
+
+use base 'DBIx::Class::Storage::DBI';
+
+=head1 NAME
+
+DBIx::Class::Storage::DBI::AmbiguousGlob - Storage component for RDBMS supporting multicolumn in clauses
+
+=head1 DESCRIPTION
+
+Some servers choke on things like:
+
+  COUNT(*) FROM (SELECT tab1.col, tab2.col FROM tab1 JOIN tab2 ... )
+
+claiming that col is a duplicate column (it loses the table specifiers by
+the time it gets to the *). Thus for any subquery count we select only the
+primary keys of the main table in the inner query. This hopefully still
+hits the indexes and keeps the server happy.
+
+At this point the only overriden method is C<_subq_count_select()>
+
+=cut
+
+sub _subq_count_select {
+  my ($self, $source, $rs_attrs) = @_;
+  my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
+  return @pcols ? \@pcols : [ 1 ];
+}
+
+=head1 AUTHORS
+
+See L<DBIx::Class/CONTRIBUTORS>
+
+=head1 LICENSE
+
+You may distribute this code under the same terms as Perl itself.
+
+=cut
+
+1;
index 40afe76..c6b9360 100644 (file)
@@ -3,7 +3,7 @@ package DBIx::Class::Storage::DBI::MSSQL;
 use strict;
 use warnings;
 
-use base qw/DBIx::Class::Storage::DBI/;
+use base qw/DBIx::Class::Storage::DBI::AmbiguousGlob DBIx::Class::Storage::DBI/;
 
 sub _dbh_last_insert_id {
   my ($self, $dbh, $source, $col) = @_;
index 09a7c6b..40d12aa 100644 (file)
@@ -11,9 +11,11 @@ sub _rebless {
     unless ( $@ ) {
         # Translate the backend name into a perl identifier
         $dbtype =~ s/\W/_/gi;
-        my $class = "DBIx::Class::Storage::DBI::ODBC::${dbtype}";
-        eval "require $class";
-        bless $self, $class unless $@;
+        my $subclass = "DBIx::Class::Storage::DBI::ODBC::${dbtype}";
+        if ($self->load_optional_class($subclass) && !$self->isa($subclass)) {
+            bless $self, $subclass;
+            $self->_rebless;
+        }
     }
 }
 
index 3a464fd..1b661f8 100644 (file)
@@ -3,15 +3,54 @@ use strict;
 use warnings;
 
 use base qw/DBIx::Class::Storage::DBI::MSSQL/;
+use List::Util();
+
+sub insert_bulk {
+  my ($self, $source, $cols, $data) = @_;
+
+  my $identity_insert = 0;
+
+  COLUMNS:
+  foreach my $col (@{$cols}) {
+    if ($source->column_info($col)->{is_auto_increment}) {
+      $identity_insert = 1;
+      last COLUMNS;
+    }
+  }
+
+  if ($identity_insert) {
+    my $table = $source->from;
+    $self->dbh->do("SET IDENTITY_INSERT $table ON");
+  }
+
+  next::method(@_);
+
+  if ($identity_insert) {
+    my $table = $source->from;
+    $self->dbh->do("SET IDENTITY_INSERT $table OFF");
+  }
+}
 
 sub _prep_for_execute {
-    my $self = shift;
-    my ($op, $extra_bind, $ident, $args) = @_;
+  my $self = shift;
+  my ($op, $extra_bind, $ident, $args) = @_;
+
+  my ($sql, $bind) = $self->next::method (@_);
 
-    my ($sql, $bind) = $self->next::method (@_);
-    $sql .= ';SELECT SCOPE_IDENTITY()' if $op eq 'insert';
+  if ($op eq 'insert') {
+    $sql .= ';SELECT SCOPE_IDENTITY()';
+
+    my $col_info = $self->_resolve_column_info($ident, [map $_->[0], @{$bind}]);
+    if (List::Util::first { $_->{is_auto_increment} } (values %$col_info) ) {
+
+      my $table = $ident->from;
+      my $identity_insert_on = "SET IDENTITY_INSERT $table ON";
+      my $identity_insert_off = "SET IDENTITY_INSERT $table OFF";
+      $sql = "$identity_insert_on; $sql; $identity_insert_off";
+    }
+  }
 
-    return ($sql, $bind);
+  return ($sql, $bind);
 }
 
 sub _execute {
@@ -64,3 +103,4 @@ Marc Mims C<< <marc@questright.com> >>
 You may distribute this code under the same terms as Perl itself.
 
 =cut
+# vim: sw=2 sts=2
index 53b8e16..9432d0c 100644 (file)
@@ -3,7 +3,11 @@ package DBIx::Class::Storage::DBI::mysql;
 use strict;
 use warnings;
 
-use base qw/DBIx::Class::Storage::DBI::MultiColumnIn/;
+use base qw/
+  DBIx::Class::Storage::DBI::MultiColumnIn
+  DBIx::Class::Storage::DBI::AmbiguousGlob
+  DBIx::Class::Storage::DBI
+/;
 
 __PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks::MySQL');
 
@@ -41,7 +45,7 @@ sub _svp_rollback {
 
     $self->dbh->do("ROLLBACK TO SAVEPOINT $name")
 }
+
 sub is_replicating {
     my $status = shift->dbh->selectrow_hashref('show slave status');
     return ($status->{Slave_IO_Running} eq 'Yes') && ($status->{Slave_SQL_Running} eq 'Yes');
@@ -57,19 +61,6 @@ sub _subq_update_delete {
   return shift->_per_row_update_delete (@_);
 }
 
-# MySql chokes on things like:
-# COUNT(*) FROM (SELECT tab1.col, tab2.col FROM tab1 JOIN tab2 ... )
-# claiming that col is a duplicate column (it loses the table specifiers by
-# the time it gets to the *). Thus for any subquery count we select only the
-# primary keys of the main table in the inner query. This hopefully still
-# hits the indexes and keeps mysql happy.
-# (mysql does not care if the SELECT and the GROUP BY match)
-sub _subq_count_select {
-  my ($self, $source, $rs_attrs) = @_;
-  my @pcols = map { join '.', $rs_attrs->{alias}, $_ } ($source->primary_columns);
-  return @pcols ? \@pcols : [ 1 ];
-}
-
 1;
 
 =head1 NAME
index f63b74c..96c4fa8 100644 (file)
@@ -14,7 +14,7 @@ my $schema = DBICTest->init_schema;
 delete $schema->storage->_sql_maker->{_cached_syntax};
 $schema->storage->_sql_maker->limit_dialect ('Top');
 
-my $rs = $schema->resultset ('FourKeys')->search ({}, { rows => 1, offset => 3 });
+my $rs = $schema->resultset ('BooksInLibrary')->search ({}, { prefetch => 'owner', rows => 1, offset => 3 });
 
 sub test_order {
   my $args = shift;
@@ -34,7 +34,7 @@ sub test_order {
       ) bar
       $req_order
     )",
-    [],
+    [ [ source => 'Library' ] ],
   );
 }
 
@@ -122,17 +122,22 @@ plan (tests => scalar @tests + 1);
 test_order ($_) for @tests;
 
 is_same_sql_bind (
-  $rs->search ({}, { group_by => 'bar', order_by => 'bar' })->as_query,
+  $rs->search ({}, { group_by => 'title', order_by => 'title' })->as_query,
   '(
-    SELECT * FROM
-    (
-      SELECT TOP 1 * FROM
-      (
-        SELECT TOP 4  me.foo, me.bar, me.hello, me.goodbye, me.sensors, me.read_count FROM fourkeys me GROUP BY bar ORDER BY bar ASC
-      ) AS foo
-      ORDER BY bar DESC
-    ) AS bar
-    ORDER BY bar
+    SELECT me__id, source, owner, title, price, owner__id, name 
+      FROM (
+        SELECT TOP 1 me__id, source, owner, title, price, owner__id, name 
+          FROM (
+            SELECT TOP 4 me.id AS me__id, me.source, me.owner, me.title, me.price, owner.id AS owner__id, owner.name
+              FROM books me
+              JOIN owners owner ON owner.id = me.owner
+            WHERE ( source = ? )
+            GROUP BY title
+            ORDER BY title ASC
+          ) AS me
+        ORDER BY title DESC
+      ) AS me
+    ORDER BY title;
   )',
-  [],
+  [ [ source => 'Library' ] ],
 );
index f53d49f..a7edb6f 100644 (file)
@@ -1,18 +1,20 @@
 use strict;
-use warnings;  
+use warnings;
 
 use Test::More;
+use Test::Exception;
 use lib qw(t/lib);
 use DBICTest;
+use DBIC::SqlMakerTest;
 
 my ($dsn, $user, $pass) = @ENV{map { "DBICTEST_MSSQL_ODBC_${_}" } qw/DSN USER PASS/};
 
 plan skip_all => 'Set $ENV{DBICTEST_MSSQL_ODBC_DSN}, _USER and _PASS to run this test'
   unless ($dsn && $user);
 
-plan tests => 13;
+plan tests => 27;
 
-my $schema = DBICTest::Schema->connect($dsn, $user, $pass, {AutoCommit => 1});
+my $schema = DBICTest::Schema->connect($dsn, $user, $pass);
 
 {
   no warnings 'redefine';
@@ -47,7 +49,7 @@ SQL
 my %seen_id;
 
 # fresh $schema so we start unconnected
-$schema = DBICTest::Schema->connect($dsn, $user, $pass, {AutoCommit => 1});
+$schema = DBICTest::Schema->connect($dsn, $user, $pass);
 
 # test primary key handling
 my $new = $schema->resultset('Artist')->create({ name => 'foo' });
@@ -73,10 +75,162 @@ $it->next;
 is( $it->next->name, "Artist 2", "iterator->next ok" );
 is( $it->next, undef, "next past end of resultset ok" );
 
+$schema->storage->dbh_do (sub {
+    my ($storage, $dbh) = @_;
+    eval { $dbh->do("DROP TABLE Owners") };
+    eval { $dbh->do("DROP TABLE Books") };
+    $dbh->do(<<'SQL');
+
+
+CREATE TABLE Books (
+   id INT IDENTITY (1, 1) NOT NULL,
+   source VARCHAR(100),
+   owner INT,
+   title VARCHAR(10),
+   price INT NULL
+)
+
+CREATE TABLE Owners (
+   id INT IDENTITY (1, 1) NOT NULL,
+   name VARCHAR(100),
+)
+
+SQL
+
+});
+
+lives_ok ( sub {
+  $schema->populate ('Owners', [
+    [qw/id  name  /],
+    [qw/1   wiggle/],
+    [qw/2   woggle/],
+    [qw/3   boggle/],
+    [qw/4   fREW/],
+    [qw/5   fRIOUX/],
+    [qw/6   fROOH/],
+    [qw/7   fRUE/],
+    [qw/8   fISMBoC/],
+    [qw/9   station/],
+    [qw/10   mirror/],
+    [qw/11   dimly/],
+    [qw/12   face_to_face/],
+    [qw/13   icarus/],
+    [qw/14   dream/],
+    [qw/15   dyrstyggyr/],
+  ]);
+}, 'populate with PKs supplied ok' );
+
+lives_ok ( sub {
+  $schema->populate ('BooksInLibrary', [
+    [qw/source  owner title   /],
+    [qw/Library 1     secrets0/],
+    [qw/Library 1     secrets1/],
+    [qw/Eatery  1     secrets2/],
+    [qw/Library 2     secrets3/],
+    [qw/Library 3     secrets4/],
+    [qw/Eatery  3     secrets5/],
+    [qw/Library 4     secrets6/],
+    [qw/Library 5     secrets7/],
+    [qw/Eatery  5     secrets8/],
+    [qw/Library 6     secrets9/],
+    [qw/Library 7     secrets10/],
+    [qw/Eatery  7     secrets11/],
+    [qw/Library 8     secrets12/],
+  ]);
+}, 'populate without PKs supplied ok' );
+
+#
+# try a prefetch on tables with identically named columns
+#
+
+# set quote char - make sure things work while quoted
+$schema->storage->_sql_maker->{quote_char} = [qw/[ ]/];
+$schema->storage->_sql_maker->{name_sep} = '.';
+
+{
+  # try a ->has_many direction
+  my $owners = $schema->resultset ('Owners')->search ({
+      'books.id' => { '!=', undef }
+    }, {
+      prefetch => 'books',
+      order_by => 'name',
+      rows     => 3,  # 8 results total
+    });
+
+  is ($owners->page(1)->all, 3, 'has_many prefetch returns correct number of rows');
+  is ($owners->page(1)->count, 3, 'has-many prefetch returns correct count');
+
+  TODO: {
+    local $TODO = 'limit past end of resultset problem';
+    is ($owners->page(3)->all, 2, 'has_many prefetch returns correct number of rows');
+    is ($owners->page(3)->count, 2, 'has-many prefetch returns correct count');
+    is ($owners->page(3)->count_rs->next, 2, 'has-many prefetch returns correct count_rs');
+
+    # make sure count does not become overly complex FIXME
+    is_same_sql_bind (
+      $owners->page(3)->count_rs->as_query,
+      '(
+        SELECT COUNT( * )
+          FROM (
+            SELECT TOP 3 [me].[id]
+              FROM [owners] [me]
+              LEFT JOIN [books] [books] ON [books].[owner] = [me].[id]
+            WHERE ( [books].[id] IS NOT NULL )
+            GROUP BY [me].[id]
+            ORDER BY [me].[id] DESC
+          ) [count_subq]
+      )',
+      [],
+    );
+  }
+
+  # try a ->belongs_to direction (no select collapse, group_by should work)
+  my $books = $schema->resultset ('BooksInLibrary')->search ({
+      'owner.name' => [qw/wiggle woggle/],
+    }, {
+      distinct => 1,
+      prefetch => 'owner',
+      order_by => 'name',
+      rows     => 2,  # 3 results total
+    });
+
+
+  is ($books->page(1)->all, 2, 'Prefetched grouped search returns correct number of rows');
+  is ($books->page(1)->count, 2, 'Prefetched grouped search returns correct count');
+
+  TODO: {
+    local $TODO = 'limit past end of resultset problem';
+    is ($books->page(2)->all, 1, 'Prefetched grouped search returns correct number of rows');
+    is ($books->page(2)->count, 1, 'Prefetched grouped search returns correct count');
+    is ($books->page(2)->count_rs->next, 1, 'Prefetched grouped search returns correct count_rs');
+
+    # make sure count does not become overly complex FIXME
+    is_same_sql_bind (
+      $books->page(2)->count_rs->as_query,
+      '(
+        SELECT COUNT( * )
+          FROM (
+            SELECT TOP 2 [me].[id]
+              FROM [books] [me]
+              JOIN [owners] [owner] ON [owner].[id] = [me].[owner]
+            WHERE ( ( ( [owner].[name] = ? OR [owner].[name] = ? ) AND [source] = ? ) )
+            GROUP BY [me].[id], [me].[source], [me].[owner], [me].[title], [me].[price], [owner].[id], [owner].[name]
+            ORDER BY [me].[id] DESC
+          ) [count_subq]
+      )',
+      [
+        [ 'owner.name' => 'wiggle' ],
+        [ 'owner.name' => 'woggle' ],
+        [ 'source' => 'Library' ],
+      ],
+    );
+  }
+
+}
 
 # clean up our mess
 END {
     my $dbh = eval { $schema->storage->_dbh };
     $dbh->do('DROP TABLE artist') if $dbh;
 }
-
+# vim:sw=2 sts=2