has read_fh => (is => 'ro', required => 1);
has write_fh => (is => 'ro', required => 1);
+has _read_buf => (is => 'ro', default => sub { my $x = ''; \$x });
+
sub BUILD { shift->write_fh->autoflush(1); }
-sub read_message {
- my ($self) = @_;
- if (defined(my $line = readline($self->read_fh))) {
- log_trace { "Received $line" };
- if (my $unpacked = $self->_unpack_line($line)) {
- return $unpacked;
+sub read_messages {
+ my ($self, $cb) = @_;
+ my $rb = $self->_read_buf;
+ if (sysread($self->read_fh, $$rb, 1024, length($$rb)) > 0) {
+ while ($$rb =~ s/^(.*)\n//) {
+ my $line = $1;
+ log_trace { "Received $line" };
+ if (my $unpacked = $self->_unpack_line($line)) {
+ $cb->(@$unpacked);
+ }
}
} else {
- return [ 'close', 'channel' ];
+ log_trace { "Closing" };
+ $cb->('close', 'channel');
}
}
sub _raw_write_message {
my ($self, $raw) = @_;
#warn "Sending: ${raw}\n";
- print { $self->write_fh } $raw."\n";
+ print { $self->write_fh } $raw."\n"
+ or log_error { "Error writing: $!" };
}
1;