Commit | Line | Data |
9e72f0cf |
1 | package Object::Remote::MiniLoop; |
2 | |
3 | use IO::Select; |
befabdee |
4 | use Time::HiRes qw(time); |
9e72f0cf |
5 | use Moo; |
6 | |
# is_running is declared read-only because its only writer is the `local`
# assignment in sub run; the generated clearer is exposed as `stop`.
has is_running => (
  is      => 'ro',
  clearer => 'stop',
);

# Read side: callbacks keyed by handle, plus the IO::Select set that
# tracks which handles are being watched for readability.
has _read_watches => (
  is      => 'ro',
  default => sub { {} },
);
has _read_select => (
  is      => 'ro',
  default => sub { IO::Select->new },
);

# Write side: same layout as the read side, for writability.
has _write_watches => (
  is      => 'ro',
  default => sub { {} },
);
has _write_select => (
  is      => 'ro',
  default => sub { IO::Select->new },
);

# Pending timers as [ $at, $code ] pairs; watch_time keeps them ordered
# by $at, earliest first.
has _timers => (
  is      => 'ro',
  default => sub { [] },
);
18 | |
8a706e6c |
# Debug helper: writes one timestamped line to the currently selected
# output handle. The invocant is ignored; the remaining arguments are
# concatenated without a separator to form the message.
sub _log {
  my ($self, @parts) = @_;
  my $message = join '', @parts;
  printf "[%s] %s\n", scalar(localtime), $message;
}
20 | |
9e72f0cf |
# Re-registers every IO watch held by this loop on $new_loop by calling
# its watch_io method once per watched handle. Read watches are forwarded
# first, then write watches. This loop's own registrations are left as-is.
sub pass_watches_to {
  my ($self, $new_loop) = @_;

  # One row per direction: watch_io key, select set, callback table.
  my @directions = (
    [ on_read_ready  => $self->_read_select,  $self->_read_watches  ],
    [ on_write_ready => $self->_write_select, $self->_write_watches ],
  );

  for my $direction (@directions) {
    my ($event, $select, $callbacks) = @$direction;
    for my $fh ($select->handles) {
      $new_loop->watch_io(handle => $fh, $event => $callbacks->{$fh});
    }
  }
}
36 | |
# Starts watching a handle for readiness.
#
# Named arguments:
#   handle         - the filehandle to watch
#   on_read_ready  - callback to register for readability (optional)
#   on_write_ready - callback to register for writability (optional)
#
# For each callback supplied, the handle is added to the matching select
# set and the callback is stored under the handle. Returns nothing.
sub watch_io {
  my ($self, %watch) = @_;
  my $handle = $watch{handle};

  # One row per direction: argument key, select accessor, table accessor.
  my @directions = (
    [ on_read_ready  => '_read_select',  '_read_watches'  ],
    [ on_write_ready => '_write_select', '_write_watches' ],
  );

  for my $direction (@directions) {
    my ($event, $select_method, $watches_method) = @$direction;
    my $callback = $watch{$event} or next;
    $self->$select_method->add($handle);
    $self->$watches_method->{$handle} = $callback;
  }

  return;
}
50 | |
# Stops watching a handle.
#
# Named arguments:
#   handle         - the filehandle to stop watching
#   on_read_ready  - true to drop the read watch
#   on_write_ready - true to drop the write watch
#
# For each true flag, the handle is removed from the matching select set
# and its stored callback is deleted. Returns nothing.
sub unwatch_io {
  my ($self, %watch) = @_;
  my ($handle, $drop_read, $drop_write)
    = @watch{qw(handle on_read_ready on_write_ready)};

  if ($drop_read) {
    my $select    = $self->_read_select;
    my $callbacks = $self->_read_watches;
    $select->remove($handle);
    delete $callbacks->{$handle};
  }

  if ($drop_write) {
    my $select    = $self->_write_select;
    my $callbacks = $self->_write_watches;
    $select->remove($handle);
    delete $callbacks->{$handle};
  }

  return;
}
64 | |
# Schedules a callback to run from loop_once once a point in time passes.
#
# Named arguments:
#   at    - absolute time (same clock as time()) at which to fire
#   after - delay in seconds from now; used only when `at` is not given
#   code  - the callback to invoke (required)
#
# `at` and `after` are tested for definedness rather than truth, so
# `after => 0` ("fire on the next loop iteration") is accepted instead of
# being reported as a missing argument.
#
# Dies if neither `at` nor `after` is defined, or if `code` is missing.
# Returns an id string that can be handed to unwatch_time.
sub watch_time {
  my ($self, %watch) = @_;

  my $at;
  if (defined $watch{at}) {
    $at = $watch{at};
  }
  elsif (defined $watch{after}) {
    $at = time() + $watch{after};
  }
  else {
    die "watch_time requires at or after";
  }

  my $code = $watch{code}
    or die "watch_time requires code";

  my $timers = $self->_timers;
  my $new    = [ $at => $code ];

  # Keep the list ordered by fire time so the earliest timer is always
  # first; loop_once only ever inspects the front of the list.
  @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, $new;

  # The stringified reference doubles as the timer's id.
  return "$new";
}
77 | |
# Cancels a pending timer.
#
# $id is the string returned by watch_time. Every timer entry whose
# stringified form equals $id is dropped from the list, which is modified
# in place. Returns nothing.
sub unwatch_time {
  my ($self, $id) = @_;
  my $timers = $self->_timers;
  @$timers = grep { $_ ne $id } @$timers;
  return;
}
83 | |
# Runs a single iteration of the event loop:
#   1. waits in select() for watched handles to become ready,
#   2. invokes the read callbacks, then the write callbacks, of the
#      handles that are ready,
#   3. invokes, earliest first, every timer whose time has passed.
#
# The wait is capped at half a second and shortened to the time remaining
# until the earliest pending timer, so a due timer is not held back by a
# full select timeout when no handle is ready. Returns nothing.
sub loop_once {
  my ($self) = @_;
  my $read   = $self->_read_watches;
  my $write  = $self->_write_watches;
  my $timers = $self->_timers;

  # watch_time keeps the timer list sorted, so the first entry is the
  # next one to fire. Never pass select a negative timeout.
  my $timeout = 0.5;
  if (@$timers) {
    my $until_next = $timers->[0][0] - time();
    $until_next = 0 if $until_next < 0;
    $timeout = $until_next if $until_next < $timeout;
  }

  my ($readable, $writeable) = IO::Select->select(
    $self->_read_select, $self->_write_select, undef, $timeout
  );
  # I would love to trap errors in the select call but IO::Select doesn't
  # differentiate between an error and a timeout.
  #   -- no, love, mst.

  # A callback may have unwatched a later handle in the same batch, so
  # re-check that a callback is still registered before calling it.
  foreach my $fh (@$readable) {
    $self->_log("got a readable: $fh");
    $read->{$fh}() if $read->{$fh};
  }
  foreach my $fh (@$writeable) {
    $write->{$fh}() if $write->{$fh};
  }

  # Each timer is removed from the list before its callback is invoked.
  my $now = time();
  while (@$timers and $timers->[0][0] <= $now) {
    (shift @$timers)->[1]->();
  }
  return;
}
108 | |
6c597351 |
# Registers one more party that wants the loop kept running by bumping
# the want_running counter. Returns the counter's value from before the
# increment (0 on first use).
sub want_run {
  my ($self) = @_;
  return $self->{want_running}++;
}
113 | |
# Keeps calling loop_once for as long as the want_running counter is
# true, i.e. until every want_run has been balanced by a want_stop.
# Returns nothing.
sub run_while_wanted {
  my ($self) = @_;
  while ($self->{want_running}) {
    $self->loop_once;
  }
  return;
}
119 | |
# Withdraws one want_run request by decrementing the want_running
# counter. A counter that is already false is left untouched, so it never
# goes negative. Returns the counter's value from before the call.
sub want_stop {
  my ($self) = @_;
  my $pending = $self->{want_running};
  return $pending unless $pending;
  return $self->{want_running}--;
}
124 | |
9e72f0cf |
# Runs the loop until it is stopped.
#
# is_running is set with `local`, so the previous state of that slot is
# restored when this sub exits, however it exits. The loop ends once
# is_running turns false (the attribute's clearer is named `stop`).
# Returns nothing.
sub run {
  my ($self) = @_;
  local $self->{is_running} = 1;
  $self->loop_once while $self->is_running;
  return;
}
133 | |
134 | 1; |