use mro 'c3';
use Carp::Clan qw/^DBIx::Class|^Try::Tiny/;
use Sub::Name;
+use Scalar::Util 'weaken';
use POE;
use POE::Component::EasyDBI;
use namespace::clean;
__PACKAGE__->mk_group_accessors(simple => qw/
- _normal_storage _easydbi
+ _normal_storage _easydbi _session_alias _promises
/);
my @proxy_to_normal_storage = qw/
L<POE::Component::EasyDBI>.
It can be used with L<POE> or any other asynchronous framework that has a L<POE>
-adaptor or L<POE::Loop> for it. For L<AnyEvent>, for example, you can use
-L<AnyEvent::Impl::POE>.
+adaptor or L<POE::Loop> for it. For example, L<AnyEvent>.
=head1 CAVEATS
};
}
+my $session_num = 1;
+
sub _init {
my $self = shift;
'coderef connect_info not supported by '.__PACKAGE__
) if ref $dsn eq 'CODE';
+ $self->_promises({});
+
my $easydbi = POE::Component::EasyDBI->new(
- alias => '',
- dsn => $dsn,
- username => $user,
- password => $pass,
- options => $opts
+ alias => '',
+ dsn => $dsn,
+ username => $user,
+ password => $pass,
+ options => $opts,
+ max_retries => -1,
);
$poe_kernel->detach_child($easydbi->ID);
$self->_easydbi($easydbi);
+
+ my $session_alias = "_dbic_poe_easydbi_".($session_num++);
+ $self->_session_alias($session_alias);
+
+ {
+ my $storage = $self;
+ weaken $storage;
+
+ POE::Session->create(
+ inline_states => {
+ _start => sub {
+ $_[KERNEL]->alias_set($session_alias);
+ $_[KERNEL]->detach_myself;
+ },
+ insert_done => sub {
+ my $res = $_[ARG0];
+
+ my $ret = $storage->_promises->{$res->{_promise_id}} = {};
+
+ $ret->{error} = $res->{error};
+ $ret->{done} = 1;
+ },
+ shutdown => sub {
+ $_[KERNEL]->alias_remove($session_alias);
+ $session_num--;
+ },
+ },
+ );
+ }
+}
+
+my $promise_cntr = 1;
+
+sub insert {
+ my ($self, $source, $to_insert) = @_;
+
+ my $table_name = $source->from;
+ $table_name = $$table_name if ref $table_name;
+
+ my $promise_id = $promise_cntr++;
+
+ $self->_easydbi->insert(
+ table => $table_name,
+ hash => $to_insert,
+ session => $self->_session_alias,
+ event => 'insert_done',
+ _promise_id => $promise_id,
+ );
+
+ while (not $self->_promises->{$promise_id}{done}) {
+ $poe_kernel->run_one_timeslice;
+ }
+
+ my $res = delete $self->_promises->{$promise_id};
+
+ $self->throw_exception($res->{error}) if $res->{error};
}
sub DESTROY {
if ($self->_easydbi) {
$self->_easydbi->shutdown;
}
+
+ if ($self->_session_alias) {
+ $poe_kernel->post($self->_session_alias, 'shutdown');
+ }
}
1;