add some debugging
[scpubgit/Object-Remote.git] / lib / Object / Remote / MiniLoop.pm
1 package Object::Remote::MiniLoop;
2
3 use IO::Select;
4 use Time::HiRes qw(time);
5 use Moo;
6
7 # this is ro because we only actually set it using local in sub run
8
9 has is_running => (is => 'ro', clearer => 'stop');
10
11 has _read_watches => (is => 'ro', default => sub { {} });
12 has _read_select => (is => 'ro', default => sub { IO::Select->new });
13
14 has _write_watches => (is => 'ro', default => sub { {} });
15 has _write_select => (is => 'ro', default => sub { IO::Select->new });
16
17 has _timers => (is => 'ro', default => sub { [] });
18
19 sub _log { shift; printf "[%s] %s\n", scalar(localtime), join '', @_ }
20
21 sub pass_watches_to {
22   my ($self, $new_loop) = @_;
23   foreach my $fh ($self->_read_select->handles) {
24     $new_loop->watch_io(
25       handle => $fh,
26       on_read_ready => $self->_read_watches->{$fh}
27     );
28   }
29   foreach my $fh ($self->_write_select->handles) {
30     $new_loop->watch_io(
31       handle => $fh,
32       on_write_ready => $self->_write_watches->{$fh}
33     );
34   }
35 }
36
37 sub watch_io {
38   my ($self, %watch) = @_;
39   my $fh = $watch{handle};
40   if (my $cb = $watch{on_read_ready}) {
41     $self->_read_select->add($fh);
42     $self->_read_watches->{$fh} = $cb;
43   }
44   if (my $cb = $watch{on_write_ready}) {
45     $self->_write_select->add($fh);
46     $self->_write_watches->{$fh} = $cb;
47   }
48   return;
49 }
50
51 sub unwatch_io {
52   my ($self, %watch) = @_;
53   my $fh = $watch{handle};
54   if ($watch{on_read_ready}) {
55     $self->_read_select->remove($fh);
56     delete $self->_read_watches->{$fh};
57   }
58   if ($watch{on_write_ready}) {
59     $self->_write_select->remove($fh);
60     delete $self->_write_watches->{$fh};
61   }
62   return;
63 }
64
65 sub watch_time {
66   my ($self, %watch) = @_;
67   my $at = $watch{at} || do {
68     die "watch_time requires at or after" unless my $after = $watch{after};
69     time() + $after;
70   };
71   die "watch_time requires code" unless my $code = $watch{code};
72   my $timers = $self->_timers;
73   my $new = [ $at => $code ];
74   @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, $new;
75   return "$new";
76 }
77
78 sub unwatch_time {
79   my ($self, $id) = @_;
80   @$_ = grep !($_ eq $id), @$_ for $self->_timers;
81   return;
82 }
83
84 sub loop_once {
85   my ($self) = @_;
86   my $read = $self->_read_watches;
87   my $write = $self->_write_watches;
88   my ($readable, $writeable) = IO::Select->select(
89     $self->_read_select, $self->_write_select, undef, 0.5
90   );
91   # I would love to trap errors in the select call but IO::Select doesn't
92   # differentiate between an error and a timeout.
93   #   -- no, love, mst.
94   foreach my $fh (@$readable) {
95     $self->_log("got a readable: $fh");
96     $read->{$fh}() if $read->{$fh};
97   }
98   foreach my $fh (@$writeable) {
99     $write->{$fh}() if $write->{$fh};
100   }
101   my $timers = $self->_timers;
102   my $now = time();
103   while (@$timers and $timers->[0][0] <= $now) {
104     (shift @$timers)->[1]->();
105   }
106   return;
107 }
108
109 sub want_run {
110   my ($self) = @_;
111   $self->{want_running}++;
112 }
113
114 sub run_while_wanted {
115   my ($self) = @_;
116   $self->loop_once while $self->{want_running};
117   return;
118 }
119
120 sub want_stop {
121   my ($self) = @_;
122   $self->{want_running}-- if $self->{want_running};
123 }
124
125 sub run {
126   my ($self) = @_;
127   local $self->{is_running} = 1;
128   while ($self->is_running) {
129     $self->loop_once;
130   }
131   return;
132 }
133
134 1;