Commit | Line | Data |
9e72f0cf |
1 | package Object::Remote::MiniLoop; |
2 | |
3 | use IO::Select; |
befabdee |
4 | use Time::HiRes qw(time); |
9e72f0cf |
5 | use Moo; |
6 | |
7 | # this is ro because we only actually set it using local in sub run |
8 | |
9 | has is_running => (is => 'ro', clearer => 'stop'); |
10 | |
11 | has _read_watches => (is => 'ro', default => sub { {} }); |
12 | has _read_select => (is => 'ro', default => sub { IO::Select->new }); |
13 | |
fbd3b8ec |
14 | has _write_watches => (is => 'ro', default => sub { {} }); |
15 | has _write_select => (is => 'ro', default => sub { IO::Select->new }); |
16 | |
befabdee |
17 | has _timers => (is => 'ro', default => sub { [] }); |
18 | |
9e72f0cf |
19 | sub pass_watches_to { |
20 | my ($self, $new_loop) = @_; |
21 | foreach my $fh ($self->_read_select->handles) { |
22 | $new_loop->watch_io( |
23 | handle => $fh, |
24 | on_read_ready => $self->_read_watches->{$fh} |
25 | ); |
26 | } |
fbd3b8ec |
27 | foreach my $fh ($self->_write_select->handles) { |
28 | $new_loop->watch_io( |
29 | handle => $fh, |
30 | on_write_ready => $self->_write_watches->{$fh} |
31 | ); |
32 | } |
9e72f0cf |
33 | } |
34 | |
35 | sub watch_io { |
36 | my ($self, %watch) = @_; |
37 | my $fh = $watch{handle}; |
38 | if (my $cb = $watch{on_read_ready}) { |
39 | $self->_read_select->add($fh); |
40 | $self->_read_watches->{$fh} = $cb; |
41 | } |
fbd3b8ec |
42 | if (my $cb = $watch{on_write_ready}) { |
43 | $self->_write_select->add($fh); |
44 | $self->_write_watches->{$fh} = $cb; |
45 | } |
498c4ad5 |
46 | return; |
9e72f0cf |
47 | } |
48 | |
49 | sub unwatch_io { |
50 | my ($self, %watch) = @_; |
51 | my $fh = $watch{handle}; |
52 | if ($watch{on_read_ready}) { |
53 | $self->_read_select->remove($fh); |
54 | delete $self->_read_watches->{$fh}; |
55 | } |
fbd3b8ec |
56 | if ($watch{on_write_ready}) { |
57 | $self->_write_select->remove($fh); |
58 | delete $self->_write_watches->{$fh}; |
59 | } |
befabdee |
60 | return; |
61 | } |
62 | |
63 | sub watch_time { |
64 | my ($self, %watch) = @_; |
65 | my $at = $watch{at} || do { |
66 | die "watch_time requires at or after" unless my $after = $watch{after}; |
67 | time() + $after; |
68 | }; |
69 | die "watch_time requires code" unless my $code = $watch{code}; |
70 | my $timers = $self->_timers; |
71 | my $new = [ $at => $code ]; |
72 | @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, $new; |
73 | return "$new"; |
74 | } |
75 | |
76 | sub unwatch_time { |
77 | my ($self, $id) = @_; |
78 | @$_ = grep !($_ eq $id), @$_ for $self->_timers; |
79 | return; |
9e72f0cf |
80 | } |
81 | |
82 | sub loop_once { |
83 | my ($self) = @_; |
84 | my $read = $self->_read_watches; |
fbd3b8ec |
85 | my $write = $self->_write_watches; |
86 | my ($readable, $writeable) = IO::Select->select( |
87 | $self->_read_select, $self->_write_select, undef, 0.5 |
88 | ); |
9e72f0cf |
89 | # I would love to trap errors in the select call but IO::Select doesn't |
90 | # differentiate between an error and a timeout. |
91 | # -- no, love, mst. |
92 | foreach my $fh (@$readable) { |
fbd3b8ec |
93 | $read->{$fh}() if $read->{$fh}; |
94 | } |
95 | foreach my $fh (@$writeable) { |
96 | $write->{$fh}() if $write->{$fh}; |
9e72f0cf |
97 | } |
befabdee |
98 | my $timers = $self->_timers; |
99 | my $now = time(); |
100 | while (@$timers and $timers->[0][0] <= $now) { |
101 | (shift @$timers)->[1]->(); |
102 | } |
103 | return; |
9e72f0cf |
104 | } |
105 | |
6c597351 |
106 | sub want_run { |
107 | my ($self) = @_; |
108 | $self->{want_running}++; |
109 | } |
110 | |
111 | sub run_while_wanted { |
112 | my ($self) = @_; |
113 | $self->loop_once while $self->{want_running}; |
befabdee |
114 | return; |
6c597351 |
115 | } |
116 | |
117 | sub want_stop { |
118 | my ($self) = @_; |
119 | $self->{want_running}-- if $self->{want_running}; |
120 | } |
121 | |
9e72f0cf |
122 | sub run { |
123 | my ($self) = @_; |
124 | local $self->{is_running} = 1; |
125 | while ($self->is_running) { |
126 | $self->loop_once; |
127 | } |
befabdee |
128 | return; |
9e72f0cf |
129 | } |
130 | |
131 | 1; |