From: Malcolm Beattie Date: Thu, 10 Apr 1997 20:17:26 +0000 (+0000) Subject: Initial check-in of Thread module. X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=d9bb3666254d7e5b84344a4af97d56090a5c1e96;p=p5sagit%2Fp5-mst-13.2.git Initial check-in of Thread module. p4raw-id: //depot/perlext/Thread@6 --- d9bb3666254d7e5b84344a4af97d56090a5c1e96 diff --git a/Makefile.PL b/Makefile.PL new file mode 100644 index 0000000..414df14 --- /dev/null +++ b/Makefile.PL @@ -0,0 +1,2 @@ +use ExtUtils::MakeMaker; +WriteMakefile(); diff --git a/Notes b/Notes new file mode 100644 index 0000000..1505877 --- /dev/null +++ b/Notes @@ -0,0 +1,13 @@ +Should cvcache be per CV (keyed by thread) or per thread (keyed by CV)? + +Maybe ought to protect all SVs by a mutex for SvREFCNT_{dec,inc}, +upgrades and so on. Then use SvMUTEX instead of CvMUTEX for CVs. +On the other hand, people shouldn't expect concurrent operations +on non-lexicals to be safe anyway. + +Probably don't need to bother keeping track of CvOWNER on clones. + +Either @_ needs to be made lexical or other arrangments need to be +made so that some globs (or just *_) are per-thread. + +tokenbuf and buf probably ought to be global protected by a global lock. diff --git a/README b/README new file mode 100644 index 0000000..32a891c --- /dev/null +++ b/README @@ -0,0 +1,19 @@ +The file thrpatch-oct1 contains patches against perl5.001m which makes +a first stab at a multithreading perl5. If your version of patch can't +create file from scratch, then you'll need to create an empty thread.h +manually first. Perl itself will need to be built with -DUSE_THREADS +and very probably -DDEBUGGING since I haven't tested it without that +yet. If you're using MIT pthreads or another threads package that +needs pthread_init() to be called, then add -DNEED_PTHREAD_INIT. If +you're not using a threads library that follows the latest POSIX draft, +then you'll probably need to add -DOLD_PTHREADS_API. I haven't tested +-DOLD_PTHREADS_API properly yet and I think you may still have to tweak +a couple of the mutex calls to follow the old API. + +These patches are copyright Malcolm Beattie 1995 and are freely +distributable under your choice of the GNU Public License or the +Artistic License (see the main perl distribution). + +These are very preliminary patches and although it should be sufficient +to show roughly what's been going on, they're almost certainly not +going to produce a perl of any practical use yet. diff --git a/Thread.pm b/Thread.pm new file mode 100644 index 0000000..9ea8cd8 --- /dev/null +++ b/Thread.pm @@ -0,0 +1,20 @@ +package Thread; +require Exporter; +require DynaLoader; +@ISA = qw(Exporter DynaLoader); +@EXPORT_OK = qw(sync fast yield); + +warn "about to bootstrap Thread\n"; +bootstrap Thread; + +my $cv; +foreach $cv (\&yield, \&sync, \&join, \&fast, + \&waituntil, \&signal, \&broadcast) { + warn "Thread.pm: calling fast($cv)\n"; + fast($cv); +} + +sync(\&new); # not sure if this needs to be sync'd +sync(\&Thread::Cond::new); # this needs syncing because of condpair_table + +1; diff --git a/Thread.xs b/Thread.xs new file mode 100644 index 0000000..dcb2d36 --- /dev/null +++ b/Thread.xs @@ -0,0 +1,319 @@ +#include "EXTERN.h" +#include "perl.h" +#include "XSUB.h" + +typedef struct condpair { + pthread_mutex_t mutex; + pthread_cond_t cond; + Thread owner; +} condpair_t; + +AV *condpair_table; +typedef SSize_t Thread__Cond; + +static void * +threadstart(arg) +void *arg; +{ + Thread thr = (Thread) arg; + LOGOP myop; + dSP; + I32 oldmark = TOPMARK; + I32 oldscope = scopestack_ix; + I32 retval; + AV *returnav = newAV(); + int i; + + /* + * Wait until our creator releases us. If we didn't do this, then + * it would be potentially possible for out thread to carry on and + * do stuff before our creator fills in our "self" field. For example, + * if we went and created another thread which tried to pthread_join + * with us, then we'd be in a mess. + */ + MUTEX_LOCK(threadstart_mutexp); + MUTEX_UNLOCK(threadstart_mutexp); + MUTEX_DESTROY(threadstart_mutexp); /* don't need it any more */ + Safefree(threadstart_mutexp); + + DEBUG_L(fprintf(stderr, "new thread 0x%lx starting at %s\n", + (unsigned long) thr, SvPEEK(TOPs))); + /* + * It's safe to wait until now to set the thread-specific pointer + * from our pthread_t structure to our struct thread, since we're + * the only thread who can get at it anyway. + */ + if (pthread_setspecific(thr_key, (void *) thr)) + croak("panic: pthread_setspecific"); + + switch (Sigsetjmp(top_env,1)) { + case 3: + fprintf(stderr, "panic: top_env\n"); + /* fall through */ + case 1: +#ifdef VMS + statusvalue = 255; +#else + statusvalue = 1; +#endif + /* fall through */ + case 2: + av_store(returnav, 0, newSViv(statusvalue)); + goto finishoff; + } + + /* Now duplicate most of perl_call_sv but with a few twists */ + op = (OP*)&myop; + Zero(op, 1, LOGOP); + myop.op_flags = OPf_STACKED; + myop.op_next = Nullop; + myop.op_flags |= OPf_KNOW; + myop.op_flags |= OPf_LIST; + op = pp_entersub(ARGS); + if (op) + runops(); + retval = stack_sp - (stack_base + oldmark); + sp -= retval; + av_store(returnav, 0, newSVpv("", 0)); + for (i = 1; i <= retval; i++) + sv_setsv(*av_fetch(returnav, i, TRUE), *sp++); + + finishoff: + SvREFCNT_dec(stack); + SvREFCNT_dec(cvcache); + Safefree(markstack); + Safefree(scopestack); + Safefree(savestack); + Safefree(retstack); + Safefree(cxstack); + Safefree(tmps_stack); + + return (void *) returnav; /* Available for anyone to join with us */ +} + +Thread +newthread(startsv, initargs) +SV *startsv; +AV *initargs; +{ + dTHR; + dSP; + Thread savethread; + int i; + + savethread = thr; + New(53, thr, 1, struct thread); + init_stacks(ARGS); + SPAGAIN; + defstash = savethread->Tdefstash; /* XXX maybe these should */ + curstash = savethread->Tcurstash; /* always be set to main? */ + mainstack = stack; + /* top_env? */ + /* runlevel */ + cvcache = newHV(); + + /* The following pushes the arg list and startsv onto the *new* stack */ + PUSHMARK(sp); + /* Could easily speed up the following greatly */ + for (i = 0; i < AvFILL(initargs); i++) + XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE))); + XPUSHs(SvREFCNT_inc(startsv)); + PUTBACK; + + New(53, threadstart_mutexp, 1, pthread_mutex_t); + /* On your marks... */ + MUTEX_INIT(threadstart_mutexp); + MUTEX_LOCK(threadstart_mutexp); + /* Get set... + * Increment the global thread count. It is decremented + * by the destructor for the thread specific key thr_key. + */ + MUTEX_LOCK(&nthreads_mutex); + nthreads++; + MUTEX_UNLOCK(&nthreads_mutex); + if (pthread_create(&self, NULL, threadstart, (void*) thr)) + return NULL; /* XXX should clean up first */ + /* Go */ + MUTEX_UNLOCK(threadstart_mutexp); + return thr; +} + +void condpair_kick(SSize_t cond, SV *code, int broadcast_flag) { + condpair_t *condp; + HV *hvp; + GV *gvp; + CV *cv = sv_2cv(code, &hvp, &gvp, FALSE); + SV *sv = *av_fetch(condpair_table, cond, TRUE); + dTHR; + + if (!SvOK(sv)) + croak("bad Cond object argument"); + condp = (condpair_t *) SvPVX(sv); + /* Get ownership of condpair object */ + MUTEX_LOCK(&condp->mutex); + while (condp->owner && condp->owner != thr) + COND_WAIT(&condp->cond, &condp->mutex); + if (condp->owner == thr) { + MUTEX_UNLOCK(&condp->mutex); + croak("Recursing in Thread::Cond::waituntil"); + } + condp->owner = thr; + MUTEX_UNLOCK(&condp->mutex); + /* We now own the condpair object */ + perl_call_sv(code, G_SCALAR|G_NOARGS|G_DISCARD|G_EVAL); + /* Release condpair object */ + MUTEX_LOCK(&condp->mutex); + condp->owner = 0; + /* Signal or Broadcast condpair */ + if (broadcast_flag) + COND_BROADCAST(&condp->cond); + else + COND_SIGNAL(&condp->cond); + MUTEX_UNLOCK(&condp->mutex); + /* Check we don't need to propagate a die */ + sv = GvSV(gv_fetchpv("@", TRUE, SVt_PV)); + if (SvTRUE(sv)) + croak(SvPV(sv, na)); +} + +static SV * +fast(sv) +SV *sv; +{ + HV *hvp; + GV *gvp; + CV *cv = sv_2cv(sv, &hvp, &gvp, FALSE); + + if (!cv) + croak("Not a CODE reference"); + if (CvCONDP(cv)) { + COND_DESTROY(CvCONDP(cv)); + Safefree(CvCONDP(cv)); + CvCONDP(cv) = 0; + } + return sv; +} + +MODULE = Thread PACKAGE = Thread + +Thread +new(class, startsv, ...) + SV * class + SV * startsv + AV * av = av_make(items - 1, &ST(2)); + CODE: + RETVAL = newthread(startsv, av); + OUTPUT: + RETVAL + +void +sync(sv) + SV * sv + HV * hvp = NO_INIT + GV * gvp = NO_INIT + CODE: + SvFLAGS(sv_2cv(sv, &hvp, &gvp, FALSE)) |= SVpcv_SYNC; + ST(0) = sv_mortalcopy(sv); + +void +fast(sv) + SV * sv + CODE: + ST(0) = sv_mortalcopy(fast(sv)); + +void +join(t) + Thread t + AV * av = NO_INIT + int i = NO_INIT + PPCODE: + if (pthread_join(t->Tself, (void **) &av)) + croak("pthread_join failed"); + /* Could easily speed up the following if necessary */ + for (i = 0; i <= AvFILL(av); i++) + XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE))); + +void +yield(t) + Thread t + CODE: + pthread_yield(); + +MODULE = Thread PACKAGE = Thread::Cond + +Thread::Cond +new(class) + char * class + SV * sv = NO_INIT + condpair_t * condp = NO_INIT + CODE: + if (!condpair_table) + condpair_table = newAV(); + sv = newSVpv("", 0); + sv_grow(sv, sizeof(condpair_t)); + condp = (condpair_t *) SvPVX(sv); + MUTEX_INIT(&condp->mutex); + COND_INIT(&condp->cond); + condp->owner = 0; + av_push(condpair_table, sv); + RETVAL = AvFILL(condpair_table); + OUTPUT: + RETVAL + +void +waituntil(cond, code) + Thread::Cond cond + SV * code + SV * sv = NO_INIT + condpair_t * condp = NO_INIT + HV * hvp = NO_INIT + GV * gvp = NO_INIT + CV * cv = sv_2cv(code, &hvp, &gvp, FALSE); + I32 count = NO_INIT + CODE: + sv = *av_fetch(condpair_table, cond, TRUE); + if (!SvOK(sv)) + croak("bad Cond object argument"); + condp = (condpair_t *) SvPVX(sv); + do { + /* Get ownership of condpair object */ + MUTEX_LOCK(&condp->mutex); + while (condp->owner && condp->owner != thr) + COND_WAIT(&condp->cond, &condp->mutex); + if (condp->owner == thr) { + MUTEX_UNLOCK(&condp->mutex); + croak("Recursing in Thread::Cond::waituntil"); + } + condp->owner = thr; + MUTEX_UNLOCK(&condp->mutex); + /* We now own the condpair object */ + count = perl_call_sv(code, G_SCALAR|G_NOARGS|G_EVAL); + SPAGAIN; + /* Release condpair object */ + MUTEX_LOCK(&condp->mutex); + condp->owner = 0; + MUTEX_UNLOCK(&condp->mutex); + /* See if we need to go round again */ + if (count == 0) + croak(SvPV(GvSV(gv_fetchpv("@", TRUE, SVt_PV)), na)); + else if (count > 1) + croak("waituntil code returned more than one value"); + sv = POPs; + PUTBACK; + } while (!SvTRUE(sv)); + ST(0) = sv_mortalcopy(sv); + +void +signal(cond, code) + Thread::Cond cond + SV * code + CODE: + condpair_kick(cond, code, 0); + +void +broadcast(cond, code) + Thread::Cond cond + SV * code + CODE: + condpair_kick(cond, code, 1); + diff --git a/cond.t b/cond.t new file mode 100644 index 0000000..27ea9a9 --- /dev/null +++ b/cond.t @@ -0,0 +1,26 @@ +use Thread 'fast'; + +sub printstuff { + my $count = 2000; + while ($count--) { + $lock->waituntil(sub { $inuse ? 0 : ($inuse = 1) }); + print "A"; + $lock->signal(sub { $inuse = 0 }); + } + $lock->signal(sub { $inuse = 42 }); +} + +$| = 1; +$inuse = 0; +$lock = new Thread::Cond; +$t = new Thread \&printstuff; +PAUSE: while (!$done) { + sleep 3; + $lock->waituntil(sub { + $inuse != 42 ? $inuse ? 0 : ($inuse = 1) : ($done = 1, 0) + }); + last PAUSE if $done; + sleep 1; + $lock->signal(sub { $inuse = 0 }); +} +print "main exiting\n"; diff --git a/create.t b/create.t new file mode 100644 index 0000000..7d6d189 --- /dev/null +++ b/create.t @@ -0,0 +1,17 @@ +use Thread; +sub start_here { + my $i; + print "In start_here with args: @_\n"; + for ($i = 1; $i <= 5; $i++) { + print "start_here: $i\n"; + sleep 1; + } +} + +print "Starting new thread now\n"; +$t = new Thread \&start_here, qw(foo bar baz); +print "Started thread $t\n"; +for ($count = 1; $count <= 5; $count++) { + print "main: $count\n"; + sleep 1; +} diff --git a/io.t b/io.t new file mode 100644 index 0000000..8818318 --- /dev/null +++ b/io.t @@ -0,0 +1,19 @@ +use Thread; + +sub reader { + my $line; + while ($line = ) { + print "reader: $line"; + } + print "End of input in reader\n"; + return 0; +} + +$r = new Thread \&reader; +$count = 20; +while ($count--) { + sleep 1; + print "ping $count\n"; +} + + diff --git a/join.t b/join.t new file mode 100644 index 0000000..640256a --- /dev/null +++ b/join.t @@ -0,0 +1,11 @@ +use Thread; +sub foo { + print "In foo with args: @_\n"; + return (7, 8, 9); +} + +print "Starting thread\n"; +$t = new Thread \&foo, qw(foo bar baz); +print "Joining with $t\n"; +@results = $t->join(); +print "Joining returned @results\n"; diff --git a/sync.t b/sync.t new file mode 100644 index 0000000..3b7b1e4 --- /dev/null +++ b/sync.t @@ -0,0 +1,61 @@ +use Thread; + +$level = 0; + +sub single_file { + my $arg = shift; + $level++; + print "Level $level for $arg\n"; + print "(something is wrong)\n" if $level < 0 || $level > 1; + sleep 1; + $level--; + print "Back to level $level\n"; +} + +sub start_bar { + my $i; + print "start bar\n"; + for $i (1..3) { + print "bar $i\n"; + single_file("bar $i"); + sleep 1 if rand > 0.5; + } + print "end bar\n"; + return 1; +} + +sub start_foo { + my $i; + print "start foo\n"; + for $i (1..3) { + print "foo $i\n"; + single_file("foo $i"); + sleep 1 if rand > 0.5; + } + print "end foo\n"; + return 1; +} + +sub start_baz { + my $i; + print "start baz\n"; + for $i (1..3) { + print "baz $i\n"; + single_file("baz $i"); + sleep 1 if rand > 0.5; + } + print "end baz\n"; + return 1; +} + +$| = 1; +srand($$^$^T); +Thread::sync(\&single_file); + +$foo = new Thread \&start_foo; +$bar = new Thread \&start_bar; +$baz = new Thread \&start_baz; +$foo->join(); +$bar->join(); +$baz->join(); +print "main: threads finished, exiting\n"; diff --git a/sync2.t b/sync2.t new file mode 100644 index 0000000..9230d82 --- /dev/null +++ b/sync2.t @@ -0,0 +1,59 @@ +use Thread; + +$global = undef; + +sub single_file { + my $who = shift; + my $i; + + print "Uh oh: $who entered while locked by $global\n" if $global; + $global = $who; + print "["; + for ($i = 0; $i < int(50 * rand); $i++) { + print $who; + } + print "]"; + $global = undef; +} + +sub start_a { + my ($i, $j); + for ($j = 0; $j < 50; $j++) { + single_file("A"); + for ($i = 0; $i < int(50 * rand); $i++) { + print "a"; + } + } +} + +sub start_b { + my ($i, $j); + for ($j = 0; $j < 50; $j++) { + single_file("A"); + for ($i = 0; $i < int(50 * rand); $i++) { + print "b"; + } + } +} + +sub start_c { + my ($i, $j); + for ($j = 0; $j < 50; $j++) { + single_file("c"); + for ($i = 0; $i < int(50 * rand); $i++) { + print "C"; + } + } +} + +$| = 1; +srand($$^$^T); +Thread::sync(\&single_file); + +$foo = new Thread \&start_a; +$bar = new Thread \&start_b; +$baz = new Thread \&start_c; +print "\nmain: joining...\n"; +$foo->join; +$bar->join; +$baz->join; diff --git a/typemap b/typemap new file mode 100644 index 0000000..8d39ff1 --- /dev/null +++ b/typemap @@ -0,0 +1,13 @@ +Thread T_IVOBJ +Thread::Cond T_IVOBJ + +INPUT +T_IVOBJ + if (sv_isobject($arg)) + $var = ($type) SvIV((SV*)SvRV($arg)); + else + croak(\"$var is not an object\") + +OUTPUT +T_IVOBJ + sv_setref_iv($arg, \"${ntype}\", (IV)($var)); diff --git a/unsync.t b/unsync.t new file mode 100644 index 0000000..d2d97e9 --- /dev/null +++ b/unsync.t @@ -0,0 +1,35 @@ +use Thread; + +$| = 1; + +if (@ARGV) { + srand($ARGV[0]); +} else { + my $seed = $$ ^ $^T; + print "Randomising to $seed\n"; + srand($seed); +} + +sub whoami { + my ($depth, $a, $b, $c) = @_; + my $i; + print "whoami ($depth): $a $b $c\n"; + sleep 1; + whoami($depth - 1, $a, $b, $c) if $depth > 0; +} + +sub start_foo { + my $r = 3 + int(10 * rand); + print "start_foo: r is $r\n"; + whoami($r, "start_foo", "foo1", "foo2"); +} + +sub start_bar { + my $r = 3 + int(10 * rand); + print "start_bar: r is $r\n"; + whoami($r, "start_bar", "bar1", "bar2"); +} + +$foo = new Thread \&start_foo; +$bar = new Thread \&start_bar; +print "main: exiting\n"; diff --git a/unsync2.t b/unsync2.t new file mode 100644 index 0000000..fb955ac --- /dev/null +++ b/unsync2.t @@ -0,0 +1,36 @@ +use Thread; + +$| = 1; + +srand($$^$^T); + +sub printargs { + my $thread = shift; + my $arg; + my $i; + while ($arg = shift) { + my $delay = int(rand(500)); + $i++; + print "$thread arg $i is $arg\n"; + 1 while $delay--; + } +} + +sub start_thread { + my $thread = shift; + my $count = 10; + while ($count--) { + my(@args) = ($thread) x int(rand(10)); + print "$thread $count calling printargs @args\n"; + printargs($thread, @args); + } +} + +new Thread (\&start_thread, "A"); +new Thread (\&start_thread, "B"); +#new Thread (\&start_thread, "C"); +#new Thread (\&start_thread, "D"); +#new Thread (\&start_thread, "E"); +#new Thread (\&start_thread, "F"); + +print "main: exiting\n"; diff --git a/unsync3.t b/unsync3.t new file mode 100644 index 0000000..e03e9c8 --- /dev/null +++ b/unsync3.t @@ -0,0 +1,50 @@ +use Thread; + +$| = 1; + +srand($$^$^T); + +sub whoami { + my $thread = shift; + print $thread; +} + +sub uppercase { + my $count = 100; + while ($count--) { + my $i = int(rand(1000)); + 1 while $i--; + print "A"; + $i = int(rand(1000)); + 1 while $i--; + whoami("B"); + } +} + +sub lowercase { + my $count = 100; + while ($count--) { + my $i = int(rand(1000)); + 1 while $i--; + print "x"; + $i = int(rand(1000)); + 1 while $i--; + whoami("y"); + } +} + +sub numbers { + my $count = 100; + while ($count--) { + my $i = int(rand(1000)); + 1 while $i--; + print 1; + $i = int(rand(1000)); + 1 while $i--; + whoami(2); + } +} + +new Thread \&numbers; +new Thread \&uppercase; +new Thread \&lowercase; diff --git a/unsync4.t b/unsync4.t new file mode 100644 index 0000000..494ad2b --- /dev/null +++ b/unsync4.t @@ -0,0 +1,38 @@ +use Thread; + +$| = 1; + +srand($$^$^T); + +sub printargs { + my(@copyargs) = @_; + my $thread = shift @copyargs; + my $arg; + my $i; + while ($arg = shift @copyargs) { + my $delay = int(rand(500)); + $i++; + print "$thread arg $i is $arg\n"; + 1 while $delay--; + } +} + +sub start_thread { + my(@threadargs) = @_; + my $thread = $threadargs[0]; + my $count = 10; + while ($count--) { + my(@args) = ($thread) x int(rand(10)); + print "$thread $count calling printargs @args\n"; + printargs($thread, @args); + } +} + +new Thread (\&start_thread, "A"); +new Thread (\&start_thread, "B"); +new Thread (\&start_thread, "C"); +new Thread (\&start_thread, "D"); +new Thread (\&start_thread, "E"); +new Thread (\&start_thread, "F"); + +print "main: exiting\n";