From: Malcolm Beattie Date: Fri, 10 Oct 1997 17:22:46 +0000 (+0000) Subject: Rewrite thread destruction system using linked list of threads. X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=commitdiff_plain;h=7d901afa066fc56dbcf601e511c27b9ee7d1487b;p=p5sagit%2Fp5-mst-13.2.git Rewrite thread destruction system using linked list of threads. Still not completely done. Add methods self, equal, flags, list to Thread.xs. Add Thread_MAGIC_SIGNATURE check to typemap. p4raw-id: //depot/perlext/Thread@120 --- diff --git a/Thread.xs b/Thread.xs index 6750505..68c37ff 100644 --- a/Thread.xs +++ b/Thread.xs @@ -2,9 +2,26 @@ #include "perl.h" #include "XSUB.h" -static I32 threadnum = 0; +/* Magic signature for Thread's mg_private is "Th" */ +#define Thread_MAGIC_SIGNATURE 0x5468 + +static U32 threadnum = 0; static int sig_pipe[2]; +static void +remove_thread(t) +Thread t; +{ + DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(), + "%p: remove_thread %p\n", thr, t))); + MUTEX_LOCK(&threads_mutex); + nthreads--; + t->prev->next = t->next; + t->next->prev = t->prev; + COND_BROADCAST(&nthreads_cond); + MUTEX_UNLOCK(&threads_mutex); +} + static void * threadstart(arg) void *arg; @@ -140,6 +157,7 @@ void *arg; "%p detached...zapping returnav\n", thr)); SvREFCNT_dec(returnav); ThrSETSTATE(thr, THRf_DEAD); + remove_thread(thr); } DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p returning\n", thr)); return (void *) returnav; /* Available for anyone to join with us */ @@ -166,6 +184,8 @@ char *class; SvGROW(sv, sizeof(struct thread) + 1); SvCUR_set(sv, sizeof(struct thread)); thr = (Thread) SvPVX(sv); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p\n", + savethread, SvPEEK(startsv), thr)); oursv = sv; /* If we don't zero these foostack pointers, init_stacks won't init them */ markstack = 0; @@ -180,9 +200,19 @@ char *class; /* top_env? */ /* runlevel */ cvcache = newHV(); - thrflags = 0; - ThrSETSTATE(thr, THRf_NORMAL); + thr->flags = THRf_NORMAL; + thr->tid = ++threadnum; + /* Insert new thread into the circular linked list and bump nthreads */ + MUTEX_LOCK(&threads_mutex); + thr->next = savethread->next; + thr->prev = savethread; + savethread->next = thr; + thr->next->prev = thr; + nthreads++; + MUTEX_UNLOCK(&threads_mutex); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread preparing stack\n", + savethread)); /* The following pushes the arg list and startsv onto the *new* stack */ PUSHMARK(sp); /* Could easily speed up the following greatly */ @@ -202,9 +232,6 @@ char *class; * Increment the global thread count. It is decremented * by the destructor for the thread specific key thr_key. */ - MUTEX_LOCK(&nthreads_mutex); - nthreads++; - MUTEX_UNLOCK(&nthreads_mutex); sigfillset(&fullmask); if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1) croak("panic: sigprocmask"); @@ -215,9 +242,10 @@ char *class; if (sigprocmask(SIG_SETMASK, &oldmask, 0)) croak("panic: sigprocmask"); #endif - sv = newSViv(++threadnum); + sv = newSViv(thr->tid); sv_magic(sv, oursv, '~', 0, 0); - return sv_bless(newRV(sv), gv_stashpv(class, TRUE)); + SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE; + return sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE)); } static Signal_t @@ -244,9 +272,8 @@ join(t) AV * av = NO_INIT int i = NO_INIT PPCODE: - DEBUG_L(PerlIO_printf(PerlIO_stderr(), - "%p: joining %p (state 0x%lx)\n", - thr, t, (unsigned long)ThrSTATE(t));); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n", + thr, t, ThrSTATE(t));); if (ThrSTATE(t) == THRf_DETACHED) croak("tried to join a detached thread"); else if (ThrSTATE(t) == THRf_JOINED) @@ -257,6 +284,8 @@ join(t) if (pthread_join(t->Tself, (void **) &av)) croak("pthread_join failed"); ThrSETSTATE(t, THRf_JOINED); + remove_thread(t); + /* Could easily speed up the following if necessary */ for (i = 0; i <= AvFILL(av); i++) XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE))); @@ -265,31 +294,56 @@ void detach(t) Thread t CODE: - DEBUG_L(PerlIO_printf(PerlIO_stderr(), - "%p: detaching %p (state 0x%lx)\n", - thr, t, (unsigned long)ThrSTATE(t));); + DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n", + thr, t, ThrSTATE(t));); if (ThrSTATE(t) == THRf_DETACHED) croak("tried to detach an already detached thread"); else if (ThrSTATE(t) == THRf_JOINED) croak("tried to detach an already joined thread"); else if (ThrSTATE(t) == THRf_DEAD) croak("tried to detach a dead thread"); - if (pthread_detach(t->Tself)) - croak("panic: pthread_detach failed"); + DETACH(t); ThrSETSTATE(t, THRf_DETACHED); void DESTROY(t) Thread t CODE: + DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(), + "%p: DESTROY(%p), state %u\n", + thr, t, ThrSTATE(t)))); + if (ThrSTATE(t) == THRf_NORMAL) { - if (pthread_detach(t->Tself)) - croak("panic: pthread_detach failed"); + DETACH(t); ThrSETSTATE(t, THRf_DETACHED); - thrflags |= THRf_DIE_FATAL; + t->flags |= THRf_DIE_FATAL; } void +equal(t1, t2) + Thread t1 + Thread t2 + PPCODE: + PUSHs((t1 == t2) ? &sv_yes : &sv_no); + +void +flags(t) + Thread t + PPCODE: + PUSHs(sv_2mortal(newSViv(t->flags))); + +void +self(class) + char * class + PREINIT: + SV *sv; + PPCODE: + sv = newSViv(thr->tid); + sv_magic(sv, oursv, '~', 0, 0); + SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE; + PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE)))); + +void yield() CODE: #ifdef OLD_PTHREADS_API @@ -361,6 +415,65 @@ CODE: COND_BROADCAST(MgCONDP(mg)); MUTEX_UNLOCK(MgMUTEXP(mg)); +void +list(class) + char * class + PREINIT: + Thread t; + AV * av; + SV ** svp; + int n = 0; + PPCODE: + av = newAV(); + /* + * Iterate until we have enough dynamic storage for all threads. + * We mustn't do any allocation while holding threads_mutex though. + */ + MUTEX_LOCK(&threads_mutex); + do { + n = nthreads; + MUTEX_UNLOCK(&threads_mutex); + if (AvFILL(av) < n - 1) { + int i = AvFILL(av); + for (i = AvFILL(av); i < n - 1; i++) { + SV *sv = newSViv(0); /* fill in tid later */ + sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */ + av_push(av, sv_bless(newRV_noinc(sv), + gv_stashpv(class, TRUE))); + } + } + MUTEX_LOCK(&threads_mutex); + } while (n < nthreads); + + /* + * At this point, there's enough room to fill in av. + * Note that we are holding threads_mutex so the list + * won't change out from under us but all the remaining + * processing is "fast" (no blocking, malloc etc.) + */ + t = thr; + svp = AvARRAY(av); + do { + SV *sv = SvRV(*svp++); + sv_setiv(sv, t->tid); + SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->Toursv); + SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED; + SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE; + t = t->next; + } while (t != thr); + /* Record the overflow */ + n -= nthreads; + MUTEX_UNLOCK(&threads_mutex); + /* Truncate any unneeded slots in av */ + if (n > 0) + av_fill(av, AvFILL(av) - n); + /* Finally, push all the new objects onto the stack and drop av */ + EXTEND(sp, n); + for (svp = AvARRAY(av); n > 0; n--, svp++) + PUSHs(*svp); + (void)sv_2mortal((SV*)av); + + MODULE = Thread PACKAGE = Thread::Signal void diff --git a/typemap b/typemap index 9a79e40..fd6e99d 100644 --- a/typemap +++ b/typemap @@ -9,7 +9,8 @@ T_XSCPTR if (!sv_isobject(sv)) croak(\"$var is not an object\"); sv = (SV*)SvRV(sv); - if (!SvRMAGICAL(sv) || !(mg = mg_find(sv, '~'))) + if (!SvRMAGICAL(sv) || !(mg = mg_find(sv, '~')) + || mg->mg_private != ${ntype}_MAGIC_SIGNATURE) croak(\"XSUB ${func_name}: $var is a forged ${ntype} object\"); $var = ($type) SvPVX(mg->mg_obj); DEBUG_L(PerlIO_printf(PerlIO_stderr(),