3951bf9d3eb4701f9e8b6cc708706da121072775
[scpubgit/Object-Remote.git] / lib / Object / Remote / MiniLoop.pm
1 package Object::Remote::MiniLoop;
2
3 use IO::Select;
4 use Time::HiRes qw(time);
5 use Moo;
6
7 # this is ro because we only actually set it using local in sub run
8
9 has is_running => (is => 'ro', clearer => 'stop');
10
11 has _read_watches => (is => 'ro', default => sub { {} });
12 has _read_select => (is => 'ro', default => sub { IO::Select->new });
13
14 has _write_watches => (is => 'ro', default => sub { {} });
15 has _write_select => (is => 'ro', default => sub { IO::Select->new });
16
17 has _timers => (is => 'ro', default => sub { [] });
18
19 sub pass_watches_to {
20   my ($self, $new_loop) = @_;
21   foreach my $fh ($self->_read_select->handles) {
22     $new_loop->watch_io(
23       handle => $fh,
24       on_read_ready => $self->_read_watches->{$fh}
25     );
26   }
27   foreach my $fh ($self->_write_select->handles) {
28     $new_loop->watch_io(
29       handle => $fh,
30       on_write_ready => $self->_write_watches->{$fh}
31     );
32   }
33 }
34
35 sub watch_io {
36   my ($self, %watch) = @_;
37   my $fh = $watch{handle};
38   if (my $cb = $watch{on_read_ready}) {
39     $self->_read_select->add($fh);
40     $self->_read_watches->{$fh} = $cb;
41   }
42   if (my $cb = $watch{on_write_ready}) {
43     $self->_write_select->add($fh);
44     $self->_write_watches->{$fh} = $cb;
45   }
46   return;
47 }
48
49 sub unwatch_io {
50   my ($self, %watch) = @_;
51   my $fh = $watch{handle};
52   if ($watch{on_read_ready}) {
53     $self->_read_select->remove($fh);
54     delete $self->_read_watches->{$fh};
55   }
56   if ($watch{on_write_ready}) {
57     $self->_write_select->remove($fh);
58     delete $self->_write_watches->{$fh};
59   }
60   return;
61 }
62
63 sub watch_time {
64   my ($self, %watch) = @_;
65   my $at = $watch{at} || do {
66     die "watch_time requires at or after" unless my $after = $watch{after};
67     time() + $after;
68   };
69   die "watch_time requires code" unless my $code = $watch{code};
70   my $timers = $self->_timers;
71   my $new = [ $at => $code ];
72   @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, $new;
73   return "$new";
74 }
75
76 sub unwatch_time {
77   my ($self, $id) = @_;
78   @$_ = grep !($_ eq $id), @$_ for $self->_timers;
79   return;
80 }
81
82 sub loop_once {
83   my ($self) = @_;
84   my $read = $self->_read_watches;
85   my $write = $self->_write_watches;
86   my ($readable, $writeable) = IO::Select->select(
87     $self->_read_select, $self->_write_select, undef, 0.5
88   );
89   # I would love to trap errors in the select call but IO::Select doesn't
90   # differentiate between an error and a timeout.
91   #   -- no, love, mst.
92   foreach my $fh (@$readable) {
93     $read->{$fh}() if $read->{$fh};
94   }
95   foreach my $fh (@$writeable) {
96     $write->{$fh}() if $write->{$fh};
97   }
98   my $timers = $self->_timers;
99   my $now = time();
100   while (@$timers and $timers->[0][0] <= $now) {
101     (shift @$timers)->[1]->();
102   }
103   return;
104 }
105
106 sub want_run {
107   my ($self) = @_;
108   $self->{want_running}++;
109 }
110
111 sub run_while_wanted {
112   my ($self) = @_;
113   $self->loop_once while $self->{want_running};
114   return;
115 }
116
117 sub want_stop {
118   my ($self) = @_;
119   $self->{want_running}-- if $self->{want_running};
120 }
121
122 sub run {
123   my ($self) = @_;
124   local $self->{is_running} = 1;
125   while ($self->is_running) {
126     $self->loop_once;
127   }
128   return;
129 }
130
131 1;