Rewrite thread destruction system using linked list of threads.
[p5sagit/p5-mst-13.2.git] / Thread.xs
index 6750505..68c37ff 100644 (file)
--- a/Thread.xs
+++ b/Thread.xs
@@ -2,9 +2,26 @@
 #include "perl.h"
 #include "XSUB.h"
 
-static I32 threadnum = 0;
+/* Magic signature for Thread's mg_private is "Th" */ 
+#define Thread_MAGIC_SIGNATURE 0x5468
+
+static U32 threadnum = 0;
 static int sig_pipe[2];
 
+static void
+remove_thread(t)
+Thread t;
+{
+    DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(),
+                                  "%p: remove_thread %p\n", thr, t)));
+    MUTEX_LOCK(&threads_mutex);
+    nthreads--;
+    t->prev->next = t->next;
+    t->next->prev = t->prev;
+    COND_BROADCAST(&nthreads_cond);
+    MUTEX_UNLOCK(&threads_mutex);
+}
+
 static void *
 threadstart(arg)
 void *arg;
@@ -140,6 +157,7 @@ void *arg;
                              "%p detached...zapping returnav\n", thr));
        SvREFCNT_dec(returnav);
        ThrSETSTATE(thr, THRf_DEAD);
+       remove_thread(thr);
     }
     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p returning\n", thr));    
     return (void *) returnav;  /* Available for anyone to join with us */
@@ -166,6 +184,8 @@ char *class;
     SvGROW(sv, sizeof(struct thread) + 1);
     SvCUR_set(sv, sizeof(struct thread));
     thr = (Thread) SvPVX(sv);
+    DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p\n",
+                         savethread, SvPEEK(startsv), thr));
     oursv = sv; 
     /* If we don't zero these foostack pointers, init_stacks won't init them */
     markstack = 0;
@@ -180,9 +200,19 @@ char *class;
     /* top_env? */
     /* runlevel */
     cvcache = newHV();
-    thrflags = 0;
-    ThrSETSTATE(thr, THRf_NORMAL);
+    thr->flags = THRf_NORMAL;
+    thr->tid = ++threadnum;
+    /* Insert new thread into the circular linked list and bump nthreads */
+    MUTEX_LOCK(&threads_mutex);
+    thr->next = savethread->next;
+    thr->prev = savethread;
+    savethread->next = thr;
+    thr->next->prev = thr;
+    nthreads++;
+    MUTEX_UNLOCK(&threads_mutex);
 
+    DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread preparing stack\n",
+                         savethread));
     /* The following pushes the arg list and startsv onto the *new* stack */
     PUSHMARK(sp);
     /* Could easily speed up the following greatly */
@@ -202,9 +232,6 @@ char *class;
      * Increment the global thread count. It is decremented
      * by the destructor for the thread specific key thr_key.
      */
-    MUTEX_LOCK(&nthreads_mutex);
-    nthreads++;
-    MUTEX_UNLOCK(&nthreads_mutex);
     sigfillset(&fullmask);
     if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1)
        croak("panic: sigprocmask");
@@ -215,9 +242,10 @@ char *class;
     if (sigprocmask(SIG_SETMASK, &oldmask, 0))
        croak("panic: sigprocmask");
 #endif
-    sv = newSViv(++threadnum);
+    sv = newSViv(thr->tid);
     sv_magic(sv, oursv, '~', 0, 0);
-    return sv_bless(newRV(sv), gv_stashpv(class, TRUE));
+    SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
+    return sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE));
 }
 
 static Signal_t
@@ -244,9 +272,8 @@ join(t)
        AV *    av = NO_INIT
        int     i = NO_INIT
     PPCODE:
-       DEBUG_L(PerlIO_printf(PerlIO_stderr(),
-                             "%p: joining %p (state 0x%lx)\n",
-                             thr, t, (unsigned long)ThrSTATE(t)););
+       DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n",
+                             thr, t, ThrSTATE(t)););
        if (ThrSTATE(t) == THRf_DETACHED)
            croak("tried to join a detached thread");
        else if (ThrSTATE(t) == THRf_JOINED)
@@ -257,6 +284,8 @@ join(t)
        if (pthread_join(t->Tself, (void **) &av))
            croak("pthread_join failed");
        ThrSETSTATE(t, THRf_JOINED);
+       remove_thread(t);
+
        /* Could easily speed up the following if necessary */
        for (i = 0; i <= AvFILL(av); i++)
            XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
@@ -265,31 +294,56 @@ void
 detach(t)
        Thread  t
     CODE:
-       DEBUG_L(PerlIO_printf(PerlIO_stderr(),
-                             "%p: detaching %p (state 0x%lx)\n",
-                             thr, t, (unsigned long)ThrSTATE(t)););
+       DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n",
+                             thr, t, ThrSTATE(t)););
        if (ThrSTATE(t) == THRf_DETACHED)
            croak("tried to detach an already detached thread");
        else if (ThrSTATE(t) == THRf_JOINED)
            croak("tried to detach an already joined thread");
        else if (ThrSTATE(t) == THRf_DEAD)
            croak("tried to detach a dead thread");
-       if (pthread_detach(t->Tself))
-           croak("panic: pthread_detach failed");
+       DETACH(t);
        ThrSETSTATE(t, THRf_DETACHED);
 
 void
 DESTROY(t)
        Thread  t
     CODE:
+       DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(),
+                                      "%p: DESTROY(%p), state %u\n",
+                                      thr, t, ThrSTATE(t))));
+                             
        if (ThrSTATE(t) == THRf_NORMAL) {
-           if (pthread_detach(t->Tself))
-               croak("panic: pthread_detach failed");
+           DETACH(t);
            ThrSETSTATE(t, THRf_DETACHED);
-           thrflags |= THRf_DIE_FATAL;
+           t->flags |= THRf_DIE_FATAL;
        }
 
 void
+equal(t1, t2)
+       Thread  t1
+       Thread  t2
+    PPCODE:
+       PUSHs((t1 == t2) ? &sv_yes : &sv_no);
+
+void
+flags(t)
+       Thread  t
+    PPCODE:
+       PUSHs(sv_2mortal(newSViv(t->flags)));
+
+void
+self(class)
+       char *  class
+    PREINIT:
+       SV *sv;
+    PPCODE:
+       sv = newSViv(thr->tid);
+       sv_magic(sv, oursv, '~', 0, 0);
+       SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
+       PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE))));
+
+void
 yield()
     CODE:
 #ifdef OLD_PTHREADS_API
@@ -361,6 +415,65 @@ CODE:
        COND_BROADCAST(MgCONDP(mg));
        MUTEX_UNLOCK(MgMUTEXP(mg));
 
+void
+list(class)
+       char *  class
+    PREINIT:
+       Thread  t;
+       AV *    av;
+       SV **   svp;
+       int     n = 0;
+    PPCODE:
+       av = newAV();
+       /*
+        * Iterate until we have enough dynamic storage for all threads.
+        * We mustn't do any allocation while holding threads_mutex though.
+        */
+       MUTEX_LOCK(&threads_mutex);
+       do {
+           n = nthreads;
+           MUTEX_UNLOCK(&threads_mutex);
+           if (AvFILL(av) < n - 1) {
+               int i = AvFILL(av);
+               for (i = AvFILL(av); i < n - 1; i++) {
+                   SV *sv = newSViv(0);        /* fill in tid later */
+                   sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */
+                   av_push(av, sv_bless(newRV_noinc(sv),
+                                        gv_stashpv(class, TRUE)));
+               }
+           }
+           MUTEX_LOCK(&threads_mutex);
+       } while (n < nthreads);
+
+       /*
+        * At this point, there's enough room to fill in av.
+        * Note that we are holding threads_mutex so the list
+        * won't change out from under us but all the remaining
+        * processing is "fast" (no blocking, malloc etc.)
+        */
+       t = thr;
+       svp = AvARRAY(av);
+       do {
+           SV *sv = SvRV(*svp++);
+           sv_setiv(sv, t->tid);
+           SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->Toursv);
+           SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED;
+           SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
+           t = t->next;
+       } while (t != thr);
+       /* Record the overflow */
+       n -= nthreads;
+       MUTEX_UNLOCK(&threads_mutex);
+       /* Truncate any unneeded slots in av */
+       if (n > 0)
+           av_fill(av, AvFILL(av) - n);
+       /* Finally, push all the new objects onto the stack and drop av */
+       EXTEND(sp, n);
+       for (svp = AvARRAY(av); n > 0; n--, svp++)
+           PUSHs(*svp);
+       (void)sv_2mortal((SV*)av);
+
+
 MODULE = Thread                PACKAGE = Thread::Signal
 
 void