first attempt at promise API
Rafael Kitover [Tue, 8 Mar 2011 14:32:39 +0000 (09:32 -0500)]
lib/DBIx/Class/Storage/DBI/POE/EasyDBI.pm
t/storage/poe_easydbi.t

index eb711c7..3a8ae7f 100644 (file)
@@ -13,12 +13,13 @@ use base qw/DBIx::Class::Storage::DBI/;
 use mro 'c3';
 use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
 use Sub::Name;
+use Scalar::Util 'weaken';
 use POE;
 use POE::Component::EasyDBI;
 use namespace::clean;
 
 __PACKAGE__->mk_group_accessors(simple => qw/
-  _normal_storage _easydbi
+  _normal_storage _easydbi _session_alias _promises
 /);
 
 my @proxy_to_normal_storage = qw/
@@ -42,8 +43,7 @@ This is a L<DBIx::Class> storage driver for asynchronous applications using
 L<POE::Component::EasyDBI>.
 
 It can be used with L<POE> or any other asynchronous framework that has a L<POE>
-adaptor or L<POE::Loop> for it. For L<AnyEvent>, for example, you can use
-L<AnyEvent::Impl::POE>.
+adaptor or L<POE::Loop> for it. For example, L<AnyEvent>.
 
 =head1 CAVEATS
 
@@ -84,6 +84,8 @@ for my $method (@proxy_to_normal_storage) {
   };
 }
 
+my $session_num = 1;
+
 sub _init {
   my $self = shift;
 
@@ -93,17 +95,76 @@ sub _init {
     'coderef connect_info not supported by '.__PACKAGE__
   ) if ref $dsn eq 'CODE';
 
+  $self->_promises({});
+
   my $easydbi = POE::Component::EasyDBI->new(
-    alias    => '',
-    dsn      => $dsn,
-    username => $user,
-    password => $pass,
-    options  => $opts
+    alias       => '',
+    dsn         => $dsn,
+    username    => $user,
+    password    => $pass,
+    options     => $opts,
+    max_retries => -1,
   );
 
   $poe_kernel->detach_child($easydbi->ID);
 
   $self->_easydbi($easydbi);
+
+  my $session_alias = "_dbic_poe_easydbi_".($session_num++);
+  $self->_session_alias($session_alias);
+
+  {
+    my $storage = $self;
+    weaken $storage;
+
+    POE::Session->create(
+      inline_states => {
+        _start => sub {
+          $_[KERNEL]->alias_set($session_alias);
+          $_[KERNEL]->detach_myself;
+        },
+        insert_done => sub {
+          my $res = $_[ARG0];
+
+          my $ret = $storage->_promises->{$res->{_promise_id}} = {};
+
+          $ret->{error} = $res->{error};
+          $ret->{done}  = 1;
+        },
+        shutdown => sub {
+          $_[KERNEL]->alias_remove($session_alias);
+          $session_num--;
+        },
+      },
+    );
+  }
+}
+
+my $promise_cntr = 1;
+
+sub insert {
+  my ($self, $source, $to_insert) = @_;
+
+  my $table_name = $source->from;
+  $table_name = $$table_name if ref $table_name;
+
+  my $promise_id = $promise_cntr++;
+
+  $self->_easydbi->insert(
+    table => $table_name,
+    hash => $to_insert,
+    session => $self->_session_alias,
+    event => 'insert_done',
+    _promise_id => $promise_id,
+  );
+
+  while (not $self->_promises->{$promise_id}{done}) {
+    $poe_kernel->run_one_timeslice;
+  }
+
+  my $res = delete $self->_promises->{$promise_id};
+
+  $self->throw_exception($res->{error}) if $res->{error};
 }
 
 sub DESTROY {
@@ -112,6 +173,10 @@ sub DESTROY {
   if ($self->_easydbi) {
     $self->_easydbi->shutdown;
   }
+
+  if ($self->_session_alias) {
+    $poe_kernel->post($self->_session_alias, 'shutdown');
+  }
 }
 
 1;
index 4215252..3cb9a7e 100644 (file)
@@ -21,6 +21,7 @@ POE::Session->create(
     _start => sub {
       $_[HEAP]{schema} = DBICTest->init_schema(
         no_populate  => 1,
+        sqlite_use_file => 1,
         storage_type => '::DBI::POE::EasyDBI',
       );
       $_[KERNEL]->yield('do_creates');