remove incomplete non-blocking support; make select() timeout duration configurable...
[scpubgit/Object-Remote.git] / lib / Object / Remote / MiniLoop.pm
1 package Object::Remote::MiniLoop;
2
3 use IO::Select;
4 use Time::HiRes qw(time);
5 use Object::Remote::Logging qw( :log :dlog );
6 use Moo;
7
8 # this is ro because we only actually set it using local in sub run
9
10 has is_running => (is => 'ro', clearer => 'stop');
11 #maximum duration that select() will block - undef means indefinite,
12 #0 means no blocking, otherwise maximum time in seconds
13 has block_duration => ( is => 'rw' );
14
15 has _read_watches => (is => 'ro', default => sub { {} });
16 has _read_select => (is => 'ro', default => sub { IO::Select->new });
17
18 has _write_watches => (is => 'ro', default => sub { {} });
19 has _write_select => (is => 'ro', default => sub { IO::Select->new });
20
21 has _timers => (is => 'ro', default => sub { [] });
22
23 sub pass_watches_to {
24   my ($self, $new_loop) = @_;
25   log_debug { "passing watches to new run loop" };
26   foreach my $fh ($self->_read_select->handles) {
27     $new_loop->watch_io(
28       handle => $fh,
29       on_read_ready => $self->_read_watches->{$fh}
30     );
31   }
32   foreach my $fh ($self->_write_select->handles) {
33     $new_loop->watch_io(
34       handle => $fh,
35       on_write_ready => $self->_write_watches->{$fh}
36     );
37   }
38 }
39
40 sub watch_io {
41   my ($self, %watch) = @_;
42   my $fh = $watch{handle};
43   Dlog_debug { "Adding IO watch for $_" } $fh;
44   
45   if (my $cb = $watch{on_read_ready}) {
46     log_trace { "IO watcher is registering with select for reading" };
47     $self->_read_select->add($fh);
48     $self->_read_watches->{$fh} = $cb;
49   }
50   if (my $cb = $watch{on_write_ready}) {
51     log_trace { "IO watcher is registering with select for writing" };
52     $self->_write_select->add($fh);
53     $self->_write_watches->{$fh} = $cb;
54   }
55   return;
56 }
57
58 sub unwatch_io {
59   my ($self, %watch) = @_;
60   my $fh = $watch{handle};
61   Dlog_debug { "Removing IO watch for $_" } $fh;
62   if ($watch{on_read_ready}) {
63     log_trace { "IO watcher is removing read from select()" };
64     $self->_read_select->remove($fh);
65     delete $self->_read_watches->{$fh};
66   }
67   if ($watch{on_write_ready}) {
68     log_trace { "IO watcher is removing write from select()" };
69     $self->_write_select->remove($fh);
70     delete $self->_write_watches->{$fh};
71   }
72   return;
73 }
74
75 sub _sort_timers {
76   my ($self, @new) = @_;
77   my $timers = $self->_timers; 
78   
79   log_trace { "Sorting timers" };
80   
81   @{$timers} = sort { $a->[0] <=> $b->[0] } @{$timers}, @new;
82   return;   
83 }
84
85 sub watch_time {
86   my ($self, %watch) = @_;
87   my $at; 
88   
89   Dlog_trace { "watch_time() invoked with $_" } \%watch;
90  
91   if (exists($watch{every})) {
92     $at = time() + $watch{every};
93   } elsif (exists($watch{after})) {
94     $at = time() + $watch{after}; 
95   } elsif (exists($watch{at})) {
96       $at = $watch{at}; 
97   } else {
98       die "watch_time requires every, after or at";
99   }
100   
101   die "watch_time requires code" unless my $code = $watch{code};
102   my $timers = $self->_timers;
103   my $new = [ $at => $code, $watch{every} ];
104   $self->_sort_timers($new); 
105   log_debug { "Created new timer with id '$new' that expires at '$at'" };
106   return "$new";
107 }
108
109 sub unwatch_time {
110   my ($self, $id) = @_;
111   log_debug { "Removing timer with id of '$id'" };
112   @$_ = grep !($_ eq $id), @$_ for $self->_timers;
113   return;
114 }
115
116 sub _next_timer_expires_delay {
117   my ($self) = @_;
118   my $timers = $self->_timers;
119   my $delay_max = $self->block_duration;
120     
121   return $delay_max unless @$timers;
122   my $duration = $timers->[0]->[0] - time;
123
124   log_trace { "next timer fires in '$duration' seconds " };
125   
126   if ($duration < 0) {
127     $duration = 0; 
128   } elsif (defined $delay_max && $duration > $delay_max) {
129     $duration = $delay_max;
130   }
131   
132   return $duration; 
133 }
134
135 sub loop_once {
136   my ($self) = @_;
137   my $read = $self->_read_watches;
138   my $write = $self->_write_watches;
139   our $Loop_Entered = 1; 
140   my $read_count = 0;
141   my $write_count = 0; 
142   my @c = caller;
143   my $wait_time = $self->_next_timer_expires_delay;
144   log_trace {  sprintf("Run loop: loop_once() has been invoked by $c[1]:$c[2] with read:%i write:%i select timeout:%s",
145       scalar(keys(%$read)), scalar(keys(%$write)), defined $wait_time ? $wait_time : 'indefinite' ) };
146   my ($readable, $writeable) = IO::Select->select(
147     $self->_read_select, $self->_write_select, undef, $wait_time
148   ); 
149   log_trace { 
150     my $readable_count = defined $readable ? scalar(@$readable) : 0;
151     my $writable_count = defined $writeable ? scalar(@$writeable) : 0;
152     "Run loop: select returned readable:$readable_count writeable:$writable_count";
153   };
154   # I would love to trap errors in the select call but IO::Select doesn't
155   # differentiate between an error and a timeout.
156   #   -- no, love, mst.
157
158   local $Loop_Entered;
159
160   log_trace { "Reading from all ready filehandles" };
161   foreach my $fh (@$readable) {
162     next unless $read->{$fh};
163     $read_count++;
164     $read->{$fh}();
165     last if $Loop_Entered;
166   }
167   log_trace { "Writing to all ready filehandles" };
168   foreach my $fh (@$writeable) {
169     next unless $write->{$fh};
170     $write_count++;
171     $write->{$fh}();
172     last if $Loop_Entered;
173   }
174   
175   log_trace { "Read from $read_count filehandles; wrote to $write_count filehandles" };
176   my $timers = $self->_timers;
177   my $now = time();
178   log_trace { "Checking timers" };
179   while (@$timers and $timers->[0][0] <= $now) {
180     my $active = $timers->[0]; 
181     Dlog_trace { "Found timer that needs to be executed: '$active'" };
182      
183     if (defined($active->[2])) {
184       #handle the case of an 'every' timer
185       $active->[0] = time() + $active->[2]; 
186       Dlog_trace { "scheduling timer for repeat execution at $_"} $active->[0];
187       $self->_sort_timers;
188     } else {
189       #it doesn't repeat again so get rid of it  
190       shift(@$timers);    
191     }
192
193     #execute the timer
194     $active->[1]->();
195      
196     last if $Loop_Entered;
197   }
198   
199   log_trace { "Run loop: single loop is completed" };
200   return;
201 }
202
203 sub want_run {
204   my ($self) = @_;
205   Dlog_debug { "Run loop: Incrimenting want_running, is now $_" }
206     ++$self->{want_running};
207 }
208
209 sub run_while_wanted {
210   my ($self) = @_;
211   log_debug { my $wr = $self->{want_running}; "Run loop: run_while_wanted() invoked; want_running: $wr" };
212   $self->loop_once while $self->{want_running};
213   log_debug { "Run loop: run_while_wanted() completed" };
214   return;
215 }
216
217 sub want_stop {
218   my ($self) = @_;
219   if (! $self->{want_running}) {
220     log_debug { "Run loop: want_stop() was called but want_running was not true" };
221     return; 
222   }
223   Dlog_debug { "Run loop: decrimenting want_running, is now $_" }
224     --$self->{want_running};
225 }
226
227 sub run {
228   my ($self) = @_;
229   log_trace { "Run loop: run() invoked" };
230   local $self->{is_running} = 1;
231   while ($self->is_running) {
232     $self->loop_once;
233   }
234   log_trace { "Run loop: run() completed" };
235   return;
236 }
237
238 1;