cf572d69d62eaa9c29d93f7274c4ddc0ffb0f9bf
[scpubgit/Object-Remote.git] / lib / Object / Remote / MiniLoop.pm
1 package Object::Remote::MiniLoop;
2
3 use IO::Select;
4 use Time::HiRes qw(time);
5 use Object::Remote::Logging qw( :log :dlog router );
6 use Moo;
7
8 BEGIN { router()->exclude_forwarding }
9
10 # this is ro because we only actually set it using local in sub run
11 has is_running => (is => 'ro', clearer => 'stop');
12 #maximum duration that select() will block - undef means indefinite,
13 #0 means no blocking, otherwise maximum time in seconds
14 has block_duration => ( is => 'rw' );
15
16 has _read_watches => (is => 'ro', default => sub { {} });
17 has _read_select => (is => 'ro', default => sub { IO::Select->new });
18
19 has _write_watches => (is => 'ro', default => sub { {} });
20 has _write_select => (is => 'ro', default => sub { IO::Select->new });
21
22 has _timers => (is => 'ro', default => sub { [] });
23
24 sub pass_watches_to {
25   my ($self, $new_loop) = @_;
26   log_debug { "passing watches to new run loop" };
27   foreach my $fh ($self->_read_select->handles) {
28     $new_loop->watch_io(
29       handle => $fh,
30       on_read_ready => $self->_read_watches->{$fh}
31     );
32   }
33   foreach my $fh ($self->_write_select->handles) {
34     $new_loop->watch_io(
35       handle => $fh,
36       on_write_ready => $self->_write_watches->{$fh}
37     );
38   }
39 }
40
41 sub watch_io {
42   my ($self, %watch) = @_;
43   my $fh = $watch{handle};
44   Dlog_debug { "Adding IO watch for $_" } $fh;
45   
46   if (my $cb = $watch{on_read_ready}) {
47     log_trace { "IO watcher is registering with select for reading" };
48     $self->_read_select->add($fh);
49     $self->_read_watches->{$fh} = $cb;
50   }
51   if (my $cb = $watch{on_write_ready}) {
52     log_trace { "IO watcher is registering with select for writing" };
53     $self->_write_select->add($fh);
54     $self->_write_watches->{$fh} = $cb;
55   }
56   return;
57 }
58
59 sub unwatch_io {
60   my ($self, %watch) = @_;
61   my $fh = $watch{handle};
62   Dlog_debug { "Removing IO watch for $_" } $fh;
63   if ($watch{on_read_ready}) {
64     log_trace { "IO watcher is removing read from select()" };
65     $self->_read_select->remove($fh);
66     delete $self->_read_watches->{$fh};
67   }
68   if ($watch{on_write_ready}) {
69     log_trace { "IO watcher is removing write from select()" };
70     $self->_write_select->remove($fh);
71     delete $self->_write_watches->{$fh};
72   }
73   return;
74 }
75
76 sub _sort_timers {
77   my ($self, @new) = @_;
78   my $timers = $self->_timers; 
79   
80   log_trace { "Sorting timers" };
81   
82   @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, @new;
83   return;   
84 }
85
86 sub watch_time {
87   my ($self, %watch) = @_;
88   my $at; 
89   
90   Dlog_trace { "watch_time() invoked with $_" } \%watch;
91  
92   if (exists($watch{every})) {
93     $at = time() + $watch{every};
94   } elsif (exists($watch{after})) {
95     $at = time() + $watch{after}; 
96   } elsif (exists($watch{at})) {
97     $at = $watch{at}; 
98   } else {
99     die "watch_time requires every, after or at";
100   }
101   
102   die "watch_time requires code" unless my $code = $watch{code};
103   my $timers = $self->_timers;
104   my $new = [ $at => $code, $watch{every} ];
105   $self->_sort_timers($new); 
106   log_debug { "Created new timer with id '$new' that expires at '$at'" };
107   return "$new";
108 }
109
110 sub unwatch_time {
111   my ($self, $id) = @_;
112   log_trace { "Removing timer with id of '$id'" };
113   @$_ = grep !($_ eq $id), @$_ for $self->_timers;
114   return;
115 }
116
117 sub _next_timer_expires_delay {
118   my ($self) = @_;
119   my $timers = $self->_timers;
120   my $delay_max = $self->block_duration;
121     
122   return $delay_max unless @$timers;
123   my $duration = $timers->[0]->[0] - time;
124
125   log_trace { "next timer fires in '$duration' seconds " };
126   
127   if ($duration < 0) {
128     $duration = 0; 
129   } elsif (defined $delay_max && $duration > $delay_max) {
130     $duration = $delay_max;
131   }
132   
133   return $duration; 
134 }
135
136 sub loop_once {
137   my ($self) = @_;
138   my $read = $self->_read_watches;
139   my $write = $self->_write_watches;
140   our $Loop_Entered = 1; 
141   my $read_count = 0;
142   my $write_count = 0; 
143   my @c = caller;
144   my $wait_time = $self->_next_timer_expires_delay;
145   log_trace {  sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
146       scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) };
147   my ($readable, $writeable) = IO::Select->select(
148     $self->_read_select, $self->_write_select, undef, $wait_time
149   ); 
150   log_trace { 
151     my $readable_count = defined $readable ? scalar(@$readable) : 0;
152     my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
153     "Run loop: select returned readable:$readable_count writeable:$writable_count";
154   };
155   # I would love to trap errors in the select call but IO::Select doesn't
156   # differentiate between an error and a timeout.
157   #   -- no, love, mst.
158
159   local $Loop_Entered;
160
161   log_trace { "Reading from all ready filehandles" };
162   foreach my $fh (@$readable) {
163     next unless $read->{$fh};
164     $read_count++;
165     $read->{$fh}();
166     last if $Loop_Entered;
167   }
168   log_trace { "Writing to all ready filehandles" };
169   foreach my $fh (@$writeable) {
170     next unless $write->{$fh};
171     $write_count++;
172     $write->{$fh}();
173     last if $Loop_Entered;
174   }
175   
176   log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" };
177   my $timers = $self->_timers;
178   my $now = time();
179   log_trace { "Checking timers" };
180   while (@$timers and $timers->[0][0] <= $now) {
181     my $active = $timers->[0]; 
182     Dlog_trace { "Found timer that needs to be executed: '$active'" };
183      
184     if (defined($active->[2])) {
185       #handle the case of an 'every' timer
186       $active->[0] = time() + $active->[2]; 
187       Dlog_trace { "scheduling timer for repeat execution at $_"} $active->[0];
188       $self->_sort_timers;
189     } else {
190       #it doesn't repeat again so get rid of it  
191       shift(@$timers);    
192     }
193
194     #execute the timer
195     $active->[1]->();
196      
197     last if $Loop_Entered;
198   }
199   
200   log_trace { "Run loop: single loop is completed" };
201   return;
202 }
203
204 sub want_run {
205   my ($self) = @_;
206   Dlog_debug { "Run loop: Incrimenting want_running, is now $_" }
207     ++$self->{want_running};
208 }
209
210 sub run_while_wanted {
211   my ($self) = @_;
212   log_debug { my $wr = $self->{want_running}; "Run loop: run_while_wanted() invoked; want_running: $wr" };
213   $self->loop_once while $self->{want_running};
214   log_debug { "Run loop: run_while_wanted() completed" };
215   return;
216 }
217
218 sub want_stop {
219   my ($self) = @_;
220   if (! $self->{want_running}) {
221     log_debug { "Run loop: want_stop() was called but want_running was not true" };
222     return; 
223   }
224   Dlog_debug { "Run loop: decrimenting want_running, is now $_" }
225     --$self->{want_running};
226 }
227
228 sub run {
229   my ($self) = @_;
230   log_trace { "Run loop: run() invoked" };
231   local $self->{is_running} = 1;
232   while ($self->is_running) {
233     $self->loop_once;
234   }
235   log_trace { "Run loop: run() completed" };
236   return;
237 }
238
239 1;