loop_upgrade
[scpubgit/Tak.git] / lib / Tak / Loop.pm
1 package Tak::Loop;
2
3 use IO::Select;
4 use Moo;
5
6 has is_running => (is => 'rw', clearer => 'loop_stop');
7
8 has _read_watches => (is => 'ro', default => sub { {} });
9 has _read_select => (is => 'ro', default => sub { IO::Select->new });
10
11 sub pass_watches_to {
12   my ($self, $new_loop) = @_;
13   foreach my $fh ($self->_read_select->handles) {
14     $new_loop->watch_io(
15       handle => $fh,
16       on_read_ready => $self->_read_watches->{$fh}
17     );
18   }
19 }
20
21 sub watch_io {
22   my ($self, %watch) = @_;
23   my $fh = $watch{handle};
24   if (my $cb = $watch{on_read_ready}) {
25     $self->_read_select->add($fh);
26     $self->_read_watches->{$fh} = $cb;
27   }
28 }
29
30 sub unwatch_io {
31   my ($self, %watch) = @_;
32   my $fh = $watch{handle};
33   if ($watch{on_read_ready}) {
34     $self->_read_select->remove($fh);
35     delete $self->_read_watches->{$fh};
36   }
37 }
38
39 sub loop_once {
40   my ($self) = @_;
41   my $read = $self->_read_watches;
42   my ($readable) = IO::Select->select($self->_read_select, undef, undef, 0.5);
43   # I would love to trap errors in the select call but IO::Select doesn't
44   # differentiate between an error and a timeout.
45   #   -- no, love, mst.
46   foreach my $fh (@$readable) {
47     $read->{$fh}();
48   }
49 }
50
51 sub loop_forever {
52   my ($self) = @_;
53   $self->is_running(1);
54   while ($self->is_running) {
55     $self->loop_once;
56   }
57 }
58
59 1;