From: Malcolm Beattie Date: Wed, 15 Oct 1997 16:57:45 +0000 (+0000) Subject: Finish thread state machine: fixes global destruction of threads, X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=50112d625f45627e1da4e48d0e830677a15d383b;p=p5sagit%2Fp5-mst-13.2.git Finish thread state machine: fixes global destruction of threads, detaching, joining etc. Alter FAKE_THREADS-specific fields to use new HAVE_THREAD_INTERN stuff. Updates docs. Various fixes to Thread.xs. p4raw-id: //depot/perlext/Thread@131 --- diff --git a/Thread.xs b/Thread.xs index ef1363f..a5382d9 100644 --- a/Thread.xs +++ b/Thread.xs @@ -32,7 +32,7 @@ void *arg; dSP; I32 oldscope = scopestack_ix; I32 retval; - AV *returnav = newAV(); + AV *returnav; int i; DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n", @@ -71,7 +71,7 @@ void *arg; I32 oldmark = TOPMARK; I32 oldscope = scopestack_ix; I32 retval; - AV *returnav = newAV(); + AV *returnav; int i, ret; dJMPENV; @@ -83,8 +83,8 @@ void *arg; * if we went and created another thread which tried to pthread_join * with us, then we'd be in a mess. */ - MUTEX_LOCK(thr->mutex); - MUTEX_UNLOCK(thr->mutex); + MUTEX_LOCK(&thr->mutex); + MUTEX_UNLOCK(&thr->mutex); /* * It's safe to wait until now to set the thread-specific pointer @@ -132,6 +132,7 @@ void *arg; PerlIO_printf(PerlIO_stderr(), "%p returnav[%d] = %s\n", thr, i, SvPEEK(sp[i - 1]));) + returnav = newAV(); av_store(returnav, 0, newSVpv("", 0)); for (i = 1; i <= retval; i++, sp++) sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp)); @@ -150,6 +151,9 @@ void *arg; Safefree(tmps_stack); MUTEX_LOCK(&thr->mutex); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), + "%p: threadstart finishing: state is %u\n", + thr, ThrSTATE(thr))); switch (ThrSTATE(thr)) { case THRf_R_JOINABLE: ThrSETSTATE(thr, THRf_ZOMBIE); @@ -160,16 +164,17 @@ void *arg; case THRf_R_JOINED: ThrSETSTATE(thr, THRf_DEAD); MUTEX_UNLOCK(&thr->mutex); + remove_thread(thr); DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: R_JOINED thread finished\n", thr)); break; - case THRf_DETACHED: + case THRf_R_DETACHED: ThrSETSTATE(thr, THRf_DEAD); MUTEX_UNLOCK(&thr->mutex); - remove_thread(thr); SvREFCNT_dec(returnav); DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: DETACHED thread finished\n", thr)); + remove_thread(thr); /* This might trigger main thread to finish */ break; default: MUTEX_UNLOCK(&thr->mutex); @@ -177,7 +182,6 @@ void *arg; /* NOTREACHED */ } MUTEX_DESTROY(&thr->mutex); - DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p returning\n", thr)); return (void *) returnav; /* Available for anyone to join with us */ /* unless we are detached in which case */ /* noone will see the value anyway. */ @@ -202,7 +206,7 @@ char *class; SvGROW(sv, sizeof(struct thread) + 1); SvCUR_set(sv, sizeof(struct thread)); thr = (Thread) SvPVX(sv); - DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p\n", + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p)\n", savethread, SvPEEK(startsv), thr)); oursv = sv; /* If we don't zero these foostack pointers, init_stacks won't init them */ @@ -230,8 +234,9 @@ char *class; nthreads++; MUTEX_UNLOCK(&threads_mutex); - DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread preparing stack\n", - savethread)); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), + "%p: newthread, tid is %u, preparing stack\n", + savethread, thr->tid)); /* The following pushes the arg list and startsv onto the *new* stack */ PUSHMARK(sp); /* Could easily speed up the following greatly */ @@ -246,8 +251,7 @@ char *class; /* On your marks... */ MUTEX_LOCK(&thr->mutex); /* Get set... - * Increment the global thread count. It is decremented - * by the destructor for the thread specific key thr_key. + * Increment the global thread count. */ sigfillset(&fullmask); if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1) @@ -291,20 +295,20 @@ join(t) PPCODE: DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n", thr, t, ThrSTATE(t));); - MUTEX_LOCK(&thr->mutex); - switch (ThrSTATE(thr)) { + MUTEX_LOCK(&t->mutex); + switch (ThrSTATE(t)) { case THRf_R_JOINABLE: case THRf_R_JOINED: - ThrSETSTATE(thr, THRf_R_JOINED); - MUTEX_UNLOCK(&thr->mutex); + ThrSETSTATE(t, THRf_R_JOINED); + MUTEX_UNLOCK(&t->mutex); break; case THRf_ZOMBIE: - ThrSETSTATE(thr, THRf_DEAD); - MUTEX_UNLOCK(&thr->mutex); - remove_thread(thr); + ThrSETSTATE(t, THRf_DEAD); + MUTEX_UNLOCK(&t->mutex); + remove_thread(t); break; default: - MUTEX_UNLOCK(&thr->mutex); + MUTEX_UNLOCK(&t->mutex); croak("can't join with thread"); /* NOTREACHED */ } @@ -321,22 +325,23 @@ detach(t) CODE: DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n", thr, t, ThrSTATE(t));); - switch (ThrSTATE(thr)) { + MUTEX_LOCK(&t->mutex); + switch (ThrSTATE(t)) { case THRf_R_JOINABLE: - ThrSETSTATE(thr, THRf_DETACHED); + ThrSETSTATE(t, THRf_R_DETACHED); /* fall through */ - case THRf_DETACHED: - MUTEX_UNLOCK(&thr->mutex); + case THRf_R_DETACHED: DETACH(t); + MUTEX_UNLOCK(&t->mutex); break; case THRf_ZOMBIE: - ThrSETSTATE(thr, THRf_DEAD); - MUTEX_UNLOCK(&thr->mutex); - remove_thread(thr); + ThrSETSTATE(t, THRf_DEAD); DETACH(t); + MUTEX_UNLOCK(&t->mutex); + remove_thread(t); break; default: - MUTEX_UNLOCK(&thr->mutex); + MUTEX_UNLOCK(&t->mutex); croak("can't detach thread"); /* NOTREACHED */ } @@ -365,6 +370,22 @@ self(class) SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE; PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE)))); +U32 +tid(t) + Thread t + CODE: + MUTEX_LOCK(&t->mutex); + RETVAL = t->tid; + MUTEX_UNLOCK(&t->mutex); + OUTPUT: + RETVAL + +void +DESTROY(t) + SV * t + PPCODE: + PUSHs(&sv_yes); + void yield() CODE: @@ -393,6 +414,8 @@ CODE: } MgOWNER(mg) = 0; COND_WAIT(MgCONDP(mg), MgMUTEXP(mg)); + while (MgOWNER(mg)) + COND_WAIT(MgOWNERCONDP(mg), MgMUTEXP(mg)); MgOWNER(mg) = thr; MUTEX_UNLOCK(MgMUTEXP(mg)); @@ -401,13 +424,9 @@ cond_signal(sv) SV * sv MAGIC * mg = NO_INIT CODE: - if (SvROK(sv)) { - /* - * Kludge to allow lock of real objects without requiring - * to pass in every type of argument by explicit reference. - */ + if (SvROK(sv)) sv = SvRV(sv); - } + mg = condpair_magic(sv); DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv)); MUTEX_LOCK(MgMUTEXP(mg)); @@ -455,6 +474,7 @@ list(class) do { n = nthreads; MUTEX_UNLOCK(&threads_mutex); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "list: n = %d\n", n)); if (AvFILL(av) < n - 1) { int i = AvFILL(av); for (i = AvFILL(av); i < n - 1; i++) { @@ -462,10 +482,12 @@ list(class) sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */ av_push(av, sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE))); + } } MUTEX_LOCK(&threads_mutex); } while (n < nthreads); + n = nthreads; /* Get the final correct value */ /* * At this point, there's enough room to fill in av. @@ -477,18 +499,18 @@ list(class) svp = AvARRAY(av); do { SV *sv = SvRV(*svp++); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), + "list: filling in thread %p\n", t)); sv_setiv(sv, t->tid); SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->Toursv); SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED; SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE; t = t->next; } while (t != thr); - /* Record the overflow */ - n -= nthreads; + /* */ MUTEX_UNLOCK(&threads_mutex); /* Truncate any unneeded slots in av */ - if (n > 0) - av_fill(av, AvFILL(av) - n); + av_fill(av, n - 1); /* Finally, push all the new objects onto the stack and drop av */ EXTEND(sp, n); for (svp = AvARRAY(av); n > 0; n--, svp++) diff --git a/queue.t b/queue.t index c87dcee..4672ba6 100644 --- a/queue.t +++ b/queue.t @@ -4,19 +4,33 @@ use Thread::Queue; $q = new Thread::Queue; sub reader { - my $i; - for ($i = 1; $i <= 10; $i++) { - print "reader: waiting for element $i...\n"; + my $tid = Thread->self->tid; + my $i = 0; + while (1) { + $i++; + print "reader (tid $tid): waiting for element $i...\n"; my $el = $q->dequeue; - print "reader: dequeued element $i: value $el\n"; + print "reader (tid $tid): dequeued element $i: value $el\n"; + select(undef, undef, undef, rand(2)); + if ($el == -1) { + # end marker + print "reader (tid $tid) returning\n"; + return; + } } } -new Thread \&reader; -my $i; -for ($i = 1; $i <= 10; $i++) { +my $nthreads = 3; + +for (my $i = 0; $i < $nthreads; $i++) { + Thread->new(\&reader, $i); +} + +for (my $i = 1; $i <= 10; $i++) { my $el = int(rand(100)); select(undef, undef, undef, rand(2)); print "writer: enqueuing value $el\n"; $q->enqueue($el); } + +$q->enqueue((-1) x $nthreads); # one end marker for each thread