Add explicit grouping for rs update/delete operations if the parameters warrant it...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
index 7cb026d..3937cb5 100644 (file)
@@ -1051,6 +1051,93 @@ sub delete {
   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
 }
 
+# We were sent here because the $rs contains a complex search
+# which will require a subquery to select the correct rows
+# (i.e. joined or limited resultsets)
+#
+# Genarating a single PK column subquery is trivial and supported
+# by all RDBMS. However if we have a multicolumn PK, things get ugly.
+# Look at multipk_update_delete()
+sub subq_update_delete {
+  my $self = shift;
+  my ($rs, $op, $values) = @_;
+
+  if ($rs->result_source->primary_columns == 1) {
+    return $self->_onepk_update_delete (@_);
+  }
+  else {
+    return $self->_multipk_update_delete (@_);
+  }
+}
+
+# Generally a single PK resultset operation is trivially expressed
+# with PK IN (subquery). However some databases (mysql) do not support
+# modification of a table mentioned in the subselect. This method
+# should be overriden in the appropriate storage class to be smarter
+# in such situations
+sub _onepk_update_delete {
+
+  my $self = shift;
+  my ($rs, $op, $values) = @_;
+
+  my $rsrc = $rs->result_source;
+  my $attrs = $rs->_resolved_attrs;
+  my @pcols = $rsrc->primary_columns;
+
+  $self->throw_exception ('_onepk_update_delete can not be called on resultsets selecting multiple columns')
+    if (ref $attrs->{select} eq 'ARRAY' and @{$attrs->{select}} > 1);
+
+  return $self->$op (
+    $rsrc,
+    $op eq 'update' ? $values : (),
+    { $pcols[0] => { -in => $rs->as_query } },
+  );
+}
+
+# ANSI SQL does not provide a reliable way to perform a multicol-PK
+# resultset update/delete involving subqueries. So resort to simple
+# (and inefficient) delete_all style per-row opearations, while allowing
+# specific storages to override this with a faster implementation.
+#
+# We do not use $row->$op style queries, because resultset update/delete
+# is not expected to cascade (this is what delete_all/update_all is for).
+#
+# There should be no race conditions as the entire operation is rolled
+# in a transaction.
+sub _multipk_update_delete {
+  my $self = shift;
+  my ($rs, $op, $values) = @_;
+
+  my $rsrc = $rs->result_source;
+  my @pcols = $rsrc->primary_columns;
+  my $attrs = $rs->_resolved_attrs;
+
+  $self->throw_exception ('Number of columns selected by supplied resultset does not match number of primary keys')
+    if ( ref $attrs->{select} ne 'ARRAY' or @{$attrs->{select}} != @pcols );
+
+  my $guard = $self->txn_scope_guard;
+
+  my $subrs_cur = $rs->cursor;
+  while (my @pks = $subrs_cur->next) {
+
+    my $cond;
+    for my $i (0.. $#pcols) {
+      $cond->{$pcols[$i]} = $pks[$i];
+    }
+
+    $self->$op (
+      $rsrc,
+      $op eq 'update' ? $values : (),
+      $cond,
+    );
+  }
+
+  $guard->commit;
+
+  return 1;
+}
+
+
 sub _select {
   my $self = shift;
   my $sql_maker = $self->sql_maker;
@@ -1302,7 +1389,7 @@ sub create_ddl_dir {
   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
 
   if(!$dir || !-d $dir) {
-    warn "No directory given, using ./\n";
+    carp "No directory given, using ./\n";
     $dir = "./";
   }
   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
@@ -1325,7 +1412,8 @@ sub create_ddl_dir {
   my $sqlt = SQL::Translator->new( $sqltargs );
 
   $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
-  my $sqlt_schema = $sqlt->translate({ data => $schema }) or die $sqlt->error;
+  my $sqlt_schema = $sqlt->translate({ data => $schema })
+    or $self->throw_exception ($sqlt->error);
 
   foreach my $db (@$databases) {
     $sqlt->reset();
@@ -1336,13 +1424,13 @@ sub create_ddl_dir {
     my $filename = $schema->ddl_filename($db, $version, $dir);
     if (-e $filename && ($version eq $schema_version )) {
       # if we are dumping the current version, overwrite the DDL
-      warn "Overwriting existing DDL file - $filename";
+      carp "Overwriting existing DDL file - $filename";
       unlink($filename);
     }
 
     my $output = $sqlt->translate;
     if(!$output) {
-      warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
+      carp("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
       next;
     }
     if(!open($file, ">$filename")) {
@@ -1358,13 +1446,13 @@ sub create_ddl_dir {
 
     my $prefilename = $schema->ddl_filename($db, $preversion, $dir);
     if(!-e $prefilename) {
-      warn("No previous schema file found ($prefilename)");
+      carp("No previous schema file found ($prefilename)");
       next;
     }
 
     my $difffile = $schema->ddl_filename($db, $version, $dir, $preversion);
     if(-e $difffile) {
-      warn("Overwriting existing diff file - $difffile");
+      carp("Overwriting existing diff file - $difffile");
       unlink($difffile);
     }
     
@@ -1373,26 +1461,37 @@ sub create_ddl_dir {
       my $t = SQL::Translator->new($sqltargs);
       $t->debug( 0 );
       $t->trace( 0 );
-      $t->parser( $db )                       or die $t->error;
-      my $out = $t->translate( $prefilename ) or die $t->error;
+
+      $t->parser( $db )
+        or $self->throw_exception ($t->error);
+
+      my $out = $t->translate( $prefilename )
+        or $self->throw_exception ($t->error);
+
       $source_schema = $t->schema;
-      unless ( $source_schema->name ) {
-        $source_schema->name( $prefilename );
-      }
+
+      $source_schema->name( $prefilename )
+        unless ( $source_schema->name );
     }
 
     # The "new" style of producers have sane normalization and can support 
     # diffing a SQL file against a DBIC->SQLT schema. Old style ones don't
     # And we have to diff parsed SQL against parsed SQL.
     my $dest_schema = $sqlt_schema;
-    
+
     unless ( "SQL::Translator::Producer::$db"->can('preprocess_schema') ) {
       my $t = SQL::Translator->new($sqltargs);
       $t->debug( 0 );
       $t->trace( 0 );
-      $t->parser( $db )                    or die $t->error;
-      my $out = $t->translate( $filename ) or die $t->error;
+
+      $t->parser( $db )
+        or $self->throw_exception ($t->error);
+
+      my $out = $t->translate( $filename )
+        or $self->throw_exception ($t->error);
+
       $dest_schema = $t->schema;
+
       $dest_schema->name( $filename )
         unless $dest_schema->name;
     }
@@ -1484,7 +1583,7 @@ sub deploy {
       $self->dbh->do($line); # shouldn't be using ->dbh ?
     };
     if ($@) {
-      warn qq{$@ (running "${line}")};
+      carp qq{$@ (running "${line}")};
     }
     $self->_query_end($line);
   };