Finish thread state machine: fixes global destruction of threads,
Malcolm Beattie [Wed, 15 Oct 1997 16:57:45 +0000 (16:57 +0000)]
detaching, joining etc. Alter FAKE_THREADS-specific fields to use
new HAVE_THREAD_INTERN stuff. Updates docs. Various fixes to
Thread.xs.

p4raw-id: //depot/perlext/Thread@131

Thread.xs
queue.t

index ef1363f..a5382d9 100644 (file)
--- a/Thread.xs
+++ b/Thread.xs
@@ -32,7 +32,7 @@ void *arg;
     dSP;
     I32 oldscope = scopestack_ix;
     I32 retval;
-    AV *returnav = newAV();
+    AV *returnav;
     int i;
 
     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
@@ -71,7 +71,7 @@ void *arg;
     I32 oldmark = TOPMARK;
     I32 oldscope = scopestack_ix;
     I32 retval;
-    AV *returnav = newAV();
+    AV *returnav;
     int i, ret;
     dJMPENV;
 
@@ -83,8 +83,8 @@ void *arg;
      * if we went and created another thread which tried to pthread_join
      * with us, then we'd be in a mess.
      */
-    MUTEX_LOCK(thr->mutex);
-    MUTEX_UNLOCK(thr->mutex);
+    MUTEX_LOCK(&thr->mutex);
+    MUTEX_UNLOCK(&thr->mutex);
 
     /*
      * It's safe to wait until now to set the thread-specific pointer
@@ -132,6 +132,7 @@ void *arg;
                PerlIO_printf(PerlIO_stderr(),
                              "%p returnav[%d] = %s\n",
                              thr, i, SvPEEK(sp[i - 1]));)
+    returnav = newAV();
     av_store(returnav, 0, newSVpv("", 0));
     for (i = 1; i <= retval; i++, sp++)
        sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp));
@@ -150,6 +151,9 @@ void *arg;
     Safefree(tmps_stack);
 
     MUTEX_LOCK(&thr->mutex);
+    DEBUG_L(PerlIO_printf(PerlIO_stderr(),
+                         "%p: threadstart finishing: state is %u\n",
+                         thr, ThrSTATE(thr)));
     switch (ThrSTATE(thr)) {
     case THRf_R_JOINABLE:
        ThrSETSTATE(thr, THRf_ZOMBIE);
@@ -160,16 +164,17 @@ void *arg;
     case THRf_R_JOINED:
        ThrSETSTATE(thr, THRf_DEAD);
        MUTEX_UNLOCK(&thr->mutex);
+       remove_thread(thr);
        DEBUG_L(PerlIO_printf(PerlIO_stderr(),
                              "%p: R_JOINED thread finished\n", thr));
        break;
-    case THRf_DETACHED:
+    case THRf_R_DETACHED:
        ThrSETSTATE(thr, THRf_DEAD);
        MUTEX_UNLOCK(&thr->mutex);
-       remove_thread(thr);
        SvREFCNT_dec(returnav);
        DEBUG_L(PerlIO_printf(PerlIO_stderr(),
                              "%p: DETACHED thread finished\n", thr));
+       remove_thread(thr);     /* This might trigger main thread to finish */
        break;
     default:
        MUTEX_UNLOCK(&thr->mutex);
@@ -177,7 +182,6 @@ void *arg;
        /* NOTREACHED */
     }
     MUTEX_DESTROY(&thr->mutex);
-    DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p returning\n", thr));    
     return (void *) returnav;  /* Available for anyone to join with us */
                                /* unless we are detached in which case */
                                /* noone will see the value anyway. */
@@ -202,7 +206,7 @@ char *class;
     SvGROW(sv, sizeof(struct thread) + 1);
     SvCUR_set(sv, sizeof(struct thread));
     thr = (Thread) SvPVX(sv);
-    DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p\n",
+    DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p)\n",
                          savethread, SvPEEK(startsv), thr));
     oursv = sv; 
     /* If we don't zero these foostack pointers, init_stacks won't init them */
@@ -230,8 +234,9 @@ char *class;
     nthreads++;
     MUTEX_UNLOCK(&threads_mutex);
 
-    DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread preparing stack\n",
-                         savethread));
+    DEBUG_L(PerlIO_printf(PerlIO_stderr(),
+                         "%p: newthread, tid is %u, preparing stack\n",
+                         savethread, thr->tid));
     /* The following pushes the arg list and startsv onto the *new* stack */
     PUSHMARK(sp);
     /* Could easily speed up the following greatly */
@@ -246,8 +251,7 @@ char *class;
     /* On your marks... */
     MUTEX_LOCK(&thr->mutex);
     /* Get set...
-     * Increment the global thread count. It is decremented
-     * by the destructor for the thread specific key thr_key.
+     * Increment the global thread count.
      */
     sigfillset(&fullmask);
     if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1)
@@ -291,20 +295,20 @@ join(t)
     PPCODE:
        DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n",
                              thr, t, ThrSTATE(t)););
-       MUTEX_LOCK(&thr->mutex);
-       switch (ThrSTATE(thr)) {
+       MUTEX_LOCK(&t->mutex);
+       switch (ThrSTATE(t)) {
        case THRf_R_JOINABLE:
        case THRf_R_JOINED:
-           ThrSETSTATE(thr, THRf_R_JOINED);
-           MUTEX_UNLOCK(&thr->mutex);
+           ThrSETSTATE(t, THRf_R_JOINED);
+           MUTEX_UNLOCK(&t->mutex);
            break;
        case THRf_ZOMBIE:
-           ThrSETSTATE(thr, THRf_DEAD);
-           MUTEX_UNLOCK(&thr->mutex);
-           remove_thread(thr);
+           ThrSETSTATE(t, THRf_DEAD);
+           MUTEX_UNLOCK(&t->mutex);
+           remove_thread(t);
            break;
        default:
-           MUTEX_UNLOCK(&thr->mutex);
+           MUTEX_UNLOCK(&t->mutex);
            croak("can't join with thread");
            /* NOTREACHED */
        }
@@ -321,22 +325,23 @@ detach(t)
     CODE:
        DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n",
                              thr, t, ThrSTATE(t)););
-       switch (ThrSTATE(thr)) {
+       MUTEX_LOCK(&t->mutex);
+       switch (ThrSTATE(t)) {
        case THRf_R_JOINABLE:
-           ThrSETSTATE(thr, THRf_DETACHED);
+           ThrSETSTATE(t, THRf_R_DETACHED);
            /* fall through */
-       case THRf_DETACHED:
-           MUTEX_UNLOCK(&thr->mutex);
+       case THRf_R_DETACHED:
            DETACH(t);
+           MUTEX_UNLOCK(&t->mutex);
            break;
        case THRf_ZOMBIE:
-           ThrSETSTATE(thr, THRf_DEAD);
-           MUTEX_UNLOCK(&thr->mutex);
-           remove_thread(thr);
+           ThrSETSTATE(t, THRf_DEAD);
            DETACH(t);
+           MUTEX_UNLOCK(&t->mutex);
+           remove_thread(t);
            break;
        default:
-           MUTEX_UNLOCK(&thr->mutex);
+           MUTEX_UNLOCK(&t->mutex);
            croak("can't detach thread");
            /* NOTREACHED */
        }
@@ -365,6 +370,22 @@ self(class)
        SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
        PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE))));
 
+U32
+tid(t)
+       Thread  t
+    CODE:
+       MUTEX_LOCK(&t->mutex);
+       RETVAL = t->tid;
+       MUTEX_UNLOCK(&t->mutex);
+    OUTPUT:
+       RETVAL
+
+void
+DESTROY(t)
+       SV *    t
+    PPCODE:
+       PUSHs(&sv_yes);
+
 void
 yield()
     CODE:
@@ -393,6 +414,8 @@ CODE:
        }
        MgOWNER(mg) = 0;
        COND_WAIT(MgCONDP(mg), MgMUTEXP(mg));
+       while (MgOWNER(mg))
+           COND_WAIT(MgOWNERCONDP(mg), MgMUTEXP(mg));
        MgOWNER(mg) = thr;
        MUTEX_UNLOCK(MgMUTEXP(mg));
        
@@ -401,13 +424,9 @@ cond_signal(sv)
        SV *    sv
        MAGIC * mg = NO_INIT
 CODE:
-       if (SvROK(sv)) {
-           /*
-            * Kludge to allow lock of real objects without requiring
-            * to pass in every type of argument by explicit reference.
-            */
+       if (SvROK(sv))
            sv = SvRV(sv);
-       }
+
        mg = condpair_magic(sv);
        DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv));
        MUTEX_LOCK(MgMUTEXP(mg));
@@ -455,6 +474,7 @@ list(class)
        do {
            n = nthreads;
            MUTEX_UNLOCK(&threads_mutex);
+           DEBUG_L(PerlIO_printf(PerlIO_stderr(), "list: n = %d\n", n));
            if (AvFILL(av) < n - 1) {
                int i = AvFILL(av);
                for (i = AvFILL(av); i < n - 1; i++) {
@@ -462,10 +482,12 @@ list(class)
                    sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */
                    av_push(av, sv_bless(newRV_noinc(sv),
                                         gv_stashpv(class, TRUE)));
+       
                }
            }
            MUTEX_LOCK(&threads_mutex);
        } while (n < nthreads);
+       n = nthreads;   /* Get the final correct value */
 
        /*
         * At this point, there's enough room to fill in av.
@@ -477,18 +499,18 @@ list(class)
        svp = AvARRAY(av);
        do {
            SV *sv = SvRV(*svp++);
+           DEBUG_L(PerlIO_printf(PerlIO_stderr(),
+                                 "list: filling in thread %p\n", t));
            sv_setiv(sv, t->tid);
            SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->Toursv);
            SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED;
            SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
            t = t->next;
        } while (t != thr);
-       /* Record the overflow */
-       n -= nthreads;
+       /*  */
        MUTEX_UNLOCK(&threads_mutex);
        /* Truncate any unneeded slots in av */
-       if (n > 0)
-           av_fill(av, AvFILL(av) - n);
+       av_fill(av, n - 1);
        /* Finally, push all the new objects onto the stack and drop av */
        EXTEND(sp, n);
        for (svp = AvARRAY(av); n > 0; n--, svp++)
diff --git a/queue.t b/queue.t
index c87dcee..4672ba6 100644 (file)
--- a/queue.t
+++ b/queue.t
@@ -4,19 +4,33 @@ use Thread::Queue;
 $q = new Thread::Queue;
 
 sub reader {
-    my $i;
-    for ($i = 1; $i <= 10; $i++) {
-       print "reader: waiting for element $i...\n";
+    my $tid = Thread->self->tid;
+    my $i = 0;
+    while (1) {
+       $i++;
+       print "reader (tid $tid): waiting for element $i...\n";
        my $el = $q->dequeue;
-       print "reader: dequeued element $i: value $el\n";
+       print "reader (tid $tid): dequeued element $i: value $el\n";
+       select(undef, undef, undef, rand(2));
+       if ($el == -1) {
+           # end marker
+           print "reader (tid $tid) returning\n";
+           return;
+       }
     }
 }
 
-new Thread \&reader;
-my $i;
-for ($i = 1; $i <= 10; $i++) {
+my $nthreads = 3;
+
+for (my $i = 0; $i < $nthreads; $i++) {
+    Thread->new(\&reader, $i);
+}
+
+for (my $i = 1; $i <= 10; $i++) {
     my $el = int(rand(100));
     select(undef, undef, undef, rand(2));
     print "writer: enqueuing value $el\n";
     $q->enqueue($el);
 }
+
+$q->enqueue((-1) x $nthreads); # one end marker for each thread