Commit | Line | Data |
9e72f0cf |
1 | package Object::Remote::MiniLoop; |
2 | |
3 | use IO::Select; |
befabdee |
4 | use Time::HiRes qw(time); |
9e72f0cf |
5 | use Moo; |
6 | |
# True only while run() is executing.  The attribute is read-only because
# the only place that sets it is run(), which does so with local; the
# generated clearer is published under the name stop().
has is_running => (
  is      => 'ro',
  clearer => 'stop',
);

# Read side: callbacks keyed by the (stringified) handle, plus the
# IO::Select set holding the handles themselves.
has _read_watches => (
  is      => 'ro',
  default => sub { return {} },
);
has _read_select => (
  is      => 'ro',
  default => sub { return IO::Select->new },
);

# Write side: same layout as the read side.
has _write_watches => (
  is      => 'ro',
  default => sub { return {} },
);
has _write_select => (
  is      => 'ro',
  default => sub { return IO::Select->new },
);

# Pending timers as [ $at, $code ] pairs; watch_time keeps the list sorted
# by $at, earliest first.
has _timers => (
  is      => 'ro',
  default => sub { return [] },
);
18 | |
9e72f0cf |
# Register every IO watch held by this loop on $new_loop, calling its
# watch_io once per watched handle: read watches first, then write watches.
# Nothing is removed from this loop.
sub pass_watches_to {
  my ($self, $new_loop) = @_;

  my $read_callbacks = $self->_read_watches;
  for my $handle ($self->_read_select->handles) {
    $new_loop->watch_io(
      handle        => $handle,
      on_read_ready => $read_callbacks->{$handle},
    );
  }

  my $write_callbacks = $self->_write_watches;
  for my $handle ($self->_write_select->handles) {
    $new_loop->watch_io(
      handle         => $handle,
      on_write_ready => $write_callbacks->{$handle},
    );
  }
}
34 | |
# Start watching a handle.  Named arguments:
#   handle         - the filehandle to watch
#   on_read_ready  - callback to store for read readiness (optional)
#   on_write_ready - callback to store for write readiness (optional)
# For each callback supplied, the handle is added to the matching
# IO::Select set and the callback is stored under the stringified handle.
sub watch_io {
  my ($self, %watch) = @_;
  my ($handle, $on_read, $on_write)
    = @watch{qw(handle on_read_ready on_write_ready)};

  if ($on_read) {
    $self->_read_select->add($handle);
    $self->_read_watches->{$handle} = $on_read;
  }

  if ($on_write) {
    $self->_write_select->add($handle);
    $self->_write_watches->{$handle} = $on_write;
  }
}
47 | |
# Stop watching a handle.  Named arguments:
#   handle         - the filehandle to stop watching
#   on_read_ready  - true to drop the read watch
#   on_write_ready - true to drop the write watch
# For each true flag the handle is removed from the matching IO::Select set
# and its stored callback is deleted.  Returns nothing.
sub unwatch_io {
  my ($self, %watch) = @_;
  my $handle = $watch{handle};

  my @sides = (
    [ on_read_ready  => $self->_read_select,  $self->_read_watches  ],
    [ on_write_ready => $self->_write_select, $self->_write_watches ],
  );

  for my $side (@sides) {
    my ($flag, $select, $callbacks) = @$side;
    next unless $watch{$flag};
    $select->remove($handle);
    delete $callbacks->{$handle};
  }

  return;
}
61 | |
# Schedule a callback to be run by loop_once.  Named arguments:
#   at    - absolute time at which the callback becomes due, or
#   after - delay in seconds from now (used when "at" is not given)
#   code  - the callback to run (required)
# Dies if neither "at" nor "after" is supplied, or if "code" is missing.
# Returns an id (the stringified timer entry) that unwatch_time accepts.
sub watch_time {
  my ($self, %watch) = @_;

  my $at = $watch{at};
  unless ($at) {
    # A delay of 0 is a legitimate "as soon as possible" request, so the
    # relative form is checked for definedness rather than truth.
    die "watch_time requires at or after" unless defined $watch{after};
    $at = time() + $watch{after};
  }

  my $code = $watch{code}
    or die "watch_time requires code";

  my $timers = $self->_timers;
  my $new = [ $at => $code ];

  # Re-sort in place so the earliest timer is always first; loop_once only
  # ever inspects the head of the list.
  @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, $new;

  return "$new";
}
74 | |
# Cancel a pending timer.  $id is the value watch_time returned, i.e. the
# stringified timer entry; every entry whose stringification equals it is
# dropped.  The timer list is filtered in place.  Returns nothing.
sub unwatch_time {
  my ($self, $id) = @_;
  my $timers = $self->_timers;
  @$timers = grep { $_ ne $id } @$timers;
  return;
}
80 | |
# Run a single pass of the loop: wait for IO readiness, call the stored
# callback of every ready handle (readable handles first, then writeable),
# then run every timer that has come due.
#
# The wait is capped at 0.5 seconds and shortened to the time left until the
# earliest timer, so a timer due sooner than the cap (or already overdue) is
# not held back by the select call.  Returns nothing.
sub loop_once {
  my ($self) = @_;
  my $read = $self->_read_watches;
  my $write = $self->_write_watches;
  my $timers = $self->_timers;

  # The timer list is kept sorted by watch_time, so the head is the next
  # one due.
  my $wait = 0.5;
  if (@$timers) {
    my $until_next = $timers->[0][0] - time();
    $until_next = 0 if $until_next < 0;
    $wait = $until_next if $until_next < $wait;
  }

  my ($readable, $writeable) = IO::Select->select(
    $self->_read_select, $self->_write_select, undef, $wait
  );
  # Errors from the select call cannot be trapped here: IO::Select does not
  # differentiate between an error and a timeout, both produce no result.
  # In that case there is simply nothing ready to dispatch.
  foreach my $fh (@{ $readable || [] }) {
    # A callback run earlier in this pass may have unwatched this handle.
    $read->{$fh}() if $read->{$fh};
  }
  foreach my $fh (@{ $writeable || [] }) {
    $write->{$fh}() if $write->{$fh};
  }

  # Each due timer is removed from the list before its code runs.
  my $now = time();
  while (@$timers and $timers->[0][0] <= $now) {
    (shift @$timers)->[1]->();
  }
  return;
}
104 | |
6c597351 |
# Register one more request for the loop to keep running by bumping the
# want_running counter that run_while_wanted polls.
sub want_run {
  my ($self) = @_;
  return $self->{want_running}++;
}
109 | |
# Keep calling loop_once for as long as the want_running counter is true.
# Returns nothing.
sub run_while_wanted {
  my ($self) = @_;
  while ($self->{want_running}) {
    $self->loop_once;
  }
  return;
}
115 | |
# Withdraw one request for the loop to keep running.  The want_running
# counter is only decremented while it is true, so it never drops below
# zero.
sub want_stop {
  my ($self) = @_;
  return $self->{want_running} && $self->{want_running}--;
}
120 | |
9e72f0cf |
# Run the loop until it is stopped.  is_running is switched on with local
# for the duration of the call, so its previous state comes back when run
# returns; loop_once is called repeatedly until is_running reads false.
# Returns nothing.
sub run {
  my ($self) = @_;
  local $self->{is_running} = 1;
  $self->loop_once while $self->is_running;
  return;
}
129 | |
130 | 1; |