X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=ten%2Ften.tcl;h=fed99af8e1c0f593a80241d9f0359783aa786cef;hb=451af734a02bd95ae80362af9e0a4f6b6e4ee34e;hp=184b16181af234b5bae064c81971665d99c251ff;hpb=3c6a0ce7ac46c66d6b7b48d63cbfc0470ae991e1;p=scpubgit%2FTenDotTcl.git diff --git a/ten/ten.tcl b/ten/ten.tcl index 184b161..fed99af 100644 --- a/ten/ten.tcl +++ b/ten/ten.tcl @@ -1,16 +1,20 @@ -package require Tcl 8.4 +package require Tcl 8.5 package require snit +package require json::write +package require json namespace eval ::ten:: { set library [file dirname [info script]] + + proc nofuture {args} {} } snit::type ten::connector::perl { method connect {} { set conn_fh [open {|object-remote-node} r+] - set firstline [gets conn_fh] + set firstline [gets $conn_fh] switch $firstline { Shere {} default { error "Expected Shere, got $firstline" } @@ -24,7 +28,204 @@ snit::type ten::connector::perl { return $conn } -} +} + +snit::type ten::connection { + + option -send_to_fh + option -read_channel + + constructor {args} { + $self configurelist $args + fconfigure $options(-send_to_fh) -buffering none + $options(-read_channel) configurelist [list \ + -on_line_call [ mymethod Receive ] \ + -on_close_call [ mymethod ChannelClosed ] \ + ] + } + + method Receive {line} { + set tcl_line [ json::json2dict $line ] + set type [ lindex $tcl_line 0 ] + set rest [ lrange $tcl_line 1 end ] + $self "receive_$type" {*}$rest + } + + method receive_call_free {future_id id args} { + $self receive_call $future_id $id "" {*}$args + $self receive_free $id + } + + method receive_call {future_id id args} { + if {$future_id == "NULL"} { + set future ten::nofuture + } else { + set future $future_id + } + $self Invoke $future $id {*}$args + } + + method receive_free {id} { + $id free + } + + method Invoke {future local ctx method args} { + set result [$local $method {*}$args] + $future done $result + } + + method ChannelClosed {} { + error "Fail" + } + + method send {message_type args} { + set future [ten::future %AUTO%] + set call_args [list \ + [ json::write string $message_type ] \ + [ json::write string $future ] \ + {*}$args + ] + $self Send $call_args + return $future + } + + method Send {to_send} { + set send_this [ json::write array {*}$to_send ] + puts $options(-send_to_fh) $send_this + } + + method remote_object {id} { + return [ten::handle %AUTO% -connection $self -id $id] + } + +} + +snit::type ten::future { + + variable callbacks "" + variable is_ready 0 + variable result "" + variable failure "" + variable retain_count 1 + + method done {args} { + if [$self is_ready] { + error "Future $self already completed" + } + set result $args + $self MarkReady + } + + method fail {args} { + if [$self is_ready] { + error "Future $self already completed" + } + set failure $args + $self MarkReady + } + + method on_ready {cb_code} { + if [$self is_ready] { + eval [concat $cb_code $self] + } else { + lappend callbacks [list ready $cb_code] + } + } + + method on_done {cb_code} { + if [llength $result] { + eval [concat $cb_code [list $result]] + } else { + lappend callbacks [list done $cb_code] + } + } + + method on_fail {cb_code} { + if [llength $failure] { + eval [concat $cb_code [list $failure]] + } else { + lappend callbacks [list fail $cb_code] + } + } + + method MarkReady {} { + set is_ready 1 + foreach cb $callbacks { + set cb_type [lindex $cb 0] + set cb_code [lindex $cb 1] + switch $cb_type { + ready { eval [concat $cb_code $self] } + done { + if ![$self is_failure] { + eval [concat $cb_code [list $result]] + } + } + failed { + if [$self is_failure] { + eval [concat $cb_code [list $failure]] + } + } + } + } + } + + method AssertReady {} { + if {!$is_ready} { + error "Future not ready" + } + } + + method is_ready {} { return $is_ready } + + method get {} { + $self AssertReady + if [$self is_failure] { + error $failure + } + return $result + } + + method is_failure {} { + $self AssertReady + if [llength $failure] { + return 1 + } + return 0 + } + + method failure {} { + $self AssertReady + return $failure + } + + method await_ready {} { + vwait "${selfns}::is_ready" + } + + method await_get {} { + $self retain + $self await_ready + if [$self is_failure] { + set err [$self failure] + $self free + error $err + } + set res [$self get] + $self free + return $res + } + + method retain {} { + incr retain_count + } + + method free {} { + set retain_count [ expr $retain_count - 1 ] + if {$retain_count == 0} { + $self destroy + } + } +} snit::type ten::read_channel { option -fh @@ -45,12 +246,25 @@ snit::type ten::read_channel { } } else { if [llength $options(-on_line_call)] { - while {[llength [set line [gets $chan]]] > 0} { - eval [concat $options(-on_line_call) $line] + while {[string length [set line [gets $chan]]] > 0} { + eval [concat $options(-on_line_call) [ list $line ]] } } } } } -package provide ten 0.0.01 +snit::type ten::handle { + option -connection + option -id + + method call {name args} { + return [[$self start $name {*}$args] await_get] + } + + method start {name args} { + $options(-connection) send call [json::write string $options(-id)] 0 [json::write string $name] {*}$args + } +} + +package provide ten 0.0.1