parallelise connection setup
[scpubgit/Object-Remote.git] / lib / Object / Remote / MiniLoop.pm
1 package Object::Remote::MiniLoop;
2
3 use IO::Select;
4 use Time::HiRes qw(time);
5 use Moo;
6
7 # this is ro because we only actually set it using local in sub run
8
9 has is_running => (is => 'ro', clearer => 'stop');
10
11 has _read_watches => (is => 'ro', default => sub { {} });
12 has _read_select => (is => 'ro', default => sub { IO::Select->new });
13
14 has _write_watches => (is => 'ro', default => sub { {} });
15 has _write_select => (is => 'ro', default => sub { IO::Select->new });
16
17 has _timers => (is => 'ro', default => sub { [] });
18
19 sub pass_watches_to {
20   my ($self, $new_loop) = @_;
21   foreach my $fh ($self->_read_select->handles) {
22     $new_loop->watch_io(
23       handle => $fh,
24       on_read_ready => $self->_read_watches->{$fh}
25     );
26   }
27   foreach my $fh ($self->_write_select->handles) {
28     $new_loop->watch_io(
29       handle => $fh,
30       on_write_ready => $self->_write_watches->{$fh}
31     );
32   }
33 }
34
35 sub watch_io {
36   my ($self, %watch) = @_;
37   my $fh = $watch{handle};
38   if (my $cb = $watch{on_read_ready}) {
39     $self->_read_select->add($fh);
40     $self->_read_watches->{$fh} = $cb;
41   }
42   if (my $cb = $watch{on_write_ready}) {
43     $self->_write_select->add($fh);
44     $self->_write_watches->{$fh} = $cb;
45   }
46 }
47
48 sub unwatch_io {
49   my ($self, %watch) = @_;
50   my $fh = $watch{handle};
51   if ($watch{on_read_ready}) {
52     $self->_read_select->remove($fh);
53     delete $self->_read_watches->{$fh};
54   }
55   if ($watch{on_write_ready}) {
56     $self->_write_select->remove($fh);
57     delete $self->_write_watches->{$fh};
58   }
59   return;
60 }
61
62 sub watch_time {
63   my ($self, %watch) = @_;
64   my $at = $watch{at} || do {
65     die "watch_time requires at or after" unless my $after = $watch{after};
66     time() + $after;
67   };
68   die "watch_time requires code" unless my $code = $watch{code};
69   my $timers = $self->_timers;
70   my $new = [ $at => $code ];
71   @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, $new;
72   return "$new";
73 }
74
75 sub unwatch_time {
76   my ($self, $id) = @_;
77   @$_ = grep !($_ eq $id), @$_ for $self->_timers;
78   return;
79 }
80
81 sub loop_once {
82   my ($self) = @_;
83   my $read = $self->_read_watches;
84   my $write = $self->_write_watches;
85   my ($readable, $writeable) = IO::Select->select(
86     $self->_read_select, $self->_write_select, undef, 0.5
87   );
88   # I would love to trap errors in the select call but IO::Select doesn't
89   # differentiate between an error and a timeout.
90   #   -- no, love, mst.
91   foreach my $fh (@$readable) {
92     $read->{$fh}() if $read->{$fh};
93   }
94   foreach my $fh (@$writeable) {
95     $write->{$fh}() if $write->{$fh};
96   }
97   my $timers = $self->_timers;
98   my $now = time();
99   while (@$timers and $timers->[0][0] <= $now) {
100     (shift @$timers)->[1]->();
101   }
102   return;
103 }
104
105 sub want_run {
106   my ($self) = @_;
107   $self->{want_running}++;
108 }
109
110 sub run_while_wanted {
111   my ($self) = @_;
112   $self->loop_once while $self->{want_running};
113   return;
114 }
115
116 sub want_stop {
117   my ($self) = @_;
118   $self->{want_running}-- if $self->{want_running};
119 }
120
121 sub run {
122   my ($self) = @_;
123   local $self->{is_running} = 1;
124   while ($self->is_running) {
125     $self->loop_once;
126   }
127   return;
128 }
129
130 1;