Rewrite thread return code to distinguish between ordinary return
Malcolm Beattie [Thu, 13 Nov 1997 18:01:27 +0000 (18:01 +0000)]
and die() and make join propagate the die. Add tiny method eval
which just does "return eval { shift->join; }". Add Thread::Specific
class for access to thread specific user data along with specific.t.
Rename Class to classname throughout Thread.xs for consistency.
Fix pp_specific to pp_threadsv in global.sym. Add support to
pp_entersub in pp_hot.c to lock stash for static locked methods.

p4raw-id: //depot/perl@248

MANIFEST
ext/Thread/Thread.pm
ext/Thread/Thread.xs
ext/Thread/Thread/Specific.pm [new file with mode: 0644]
ext/Thread/join.t
ext/Thread/specific.t [new file with mode: 0644]
global.sym
lib/fields.pm [new file with mode: 0644]
pp_hot.c
thread.h

index 53ffcab..09747e1 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -212,15 +212,19 @@ ext/Thread/Notes  Thread notes
 ext/Thread/README      Thread README
 ext/Thread/Thread/Queue.pm     Thread synchronised queue objects
 ext/Thread/Thread/Semaphore.pm Thread semaphore objects
+ext/Thread/Thread/Specific.pm  Thread specific data access
 ext/Thread/Thread.pm   Thread extension Perl module
 ext/Thread/Thread.xs   Thread extension external subroutines
 ext/Thread/create.t    Test thread creation
+ext/Thread/die.t       Test thread die()
+ext/Thread/die2.t      Test thread die() differently
 ext/Thread/io.t                Test threads doing simple I/O
 ext/Thread/join.t      Test thread joining
 ext/Thread/join2.t     Test thread joining differently
 ext/Thread/list.t      Test getting list of all threads
 ext/Thread/lock.t      Test lock primitive
 ext/Thread/queue.t     Test Thread::Queue module
+ext/Thread/specific.t  Test thread-specific user data
 ext/Thread/sync.t      Test thread synchronisation
 ext/Thread/sync2.t     Test thread synchronisation
 ext/Thread/typemap     Thread extension interface types
index 2ace5dd..1936142 100644 (file)
@@ -15,6 +15,10 @@ sub async (&) {
     return new Thread $_[0];
 }
 
+sub eval {
+    return eval { shift->join; };
+}
+
 bootstrap Thread;
 
 1;
index 841b569..ba256c9 100644 (file)
@@ -19,7 +19,7 @@ static int sig_pipe[2];
 typedef struct thread *Thread;
 #define THREAD_RET_TYPE void *
 #define THREAD_RET_CAST(x) ((THREAD_RET_TYPE) x)
-#endif;
+#endif
 
 static void
 remove_thread(struct thread *t)
@@ -47,7 +47,7 @@ threadstart(void *arg)
     dSP;
     I32 oldscope = scopestack_ix;
     I32 retval;
-    AV *returnav;
+    AV *av;
     int i;
 
     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
@@ -86,7 +86,8 @@ threadstart(void *arg)
     I32 oldmark = TOPMARK;
     I32 oldscope = scopestack_ix;
     I32 retval;
-    AV *returnav;
+    SV *sv;
+    AV *av = newAV();
     int i, ret;
     dJMPENV;
     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p waiting to start\n",
@@ -114,6 +115,7 @@ threadstart(void *arg)
     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
                          thr, SvPEEK(TOPs)));
 
+#ifdef OLD_WAY
     JMPENV_PUSH(ret);
     switch (ret) {
     case 3:
@@ -127,7 +129,10 @@ threadstart(void *arg)
        while (scopestack_ix > oldscope)
            LEAVE;
        JMPENV_POP;
-       av_store(returnav, 0, newSViv(statusvalue));
+       MUTEX_LOCK(&thr->mutex);
+       thr->flags |= THRf_DID_DIE;
+       MUTEX_UNLOCK(&thr->mutex);
+       av = newSVpvf("Thread called exit with value %d", statusvalue);
        goto finishoff;
     }
 
@@ -143,18 +148,34 @@ threadstart(void *arg)
     op = pp_entersub(ARGS);
     if (op)
        runops();
+#else
+    sv = POPs;
+    PUTBACK;
+    perl_call_sv(sv, G_ARRAY|G_EVAL);
+#endif
     SPAGAIN;
     retval = sp - (stack_base + oldmark);
     sp = stack_base + oldmark + 1;
-    DEBUG_L(for (i = 1; i <= retval; i++)
-               PerlIO_printf(PerlIO_stderr(),
-                             "%p returnav[%d] = %s\n",
-                             thr, i, SvPEEK(sp[i - 1]));)
-    returnav = newAV();
-    av_store(returnav, 0, newSVpv("", 0));
-    for (i = 1; i <= retval; i++, sp++)
-       sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp));
-    
+    if (SvCUR(thr->errsv)) {
+       MUTEX_LOCK(&thr->mutex);
+       thr->flags |= THRf_DID_DIE;
+       MUTEX_UNLOCK(&thr->mutex);
+       av_store(av, 0, &sv_no);
+       av_store(av, 1, newSVsv(thr->errsv));
+       DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p died: %s\n",
+                             SvPV(thr->errsv, na));
+    } else {
+       DEBUG_L(STMT_START {
+           for (i = 1; i <= retval; i++) {
+               PerlIO_printf(PerlIO_stderr(), "%p return[%d] = %s\n",
+                               thr, i, SvPEEK(sp[i - 1]));)
+           }
+       } STMT_END);
+       av_store(av, 0, &sv_yes);
+       for (i = 1; i <= retval; i++, sp++)
+           sv_setsv(*av_fetch(av, i, TRUE), SvREFCNT_inc(*sp));
+    }
+
   finishoff:
 #if 0    
     /* removed for debug */
@@ -194,7 +215,7 @@ threadstart(void *arg)
     case THRf_R_DETACHED:
        ThrSETSTATE(thr, THRf_DEAD);
        MUTEX_UNLOCK(&thr->mutex);
-       SvREFCNT_dec(returnav);
+       SvREFCNT_dec(av);
        DEBUG_L(PerlIO_printf(PerlIO_stderr(),
                              "%p: DETACHED thread finished\n", thr));
        remove_thread(thr);     /* This might trigger main thread to finish */
@@ -204,7 +225,7 @@ threadstart(void *arg)
        croak("panic: illegal state %u at end of threadstart", ThrSTATE(thr));
        /* NOTREACHED */
     }
-    return THREAD_RET_CAST(returnav);  /* Available for anyone to join with */
+    return THREAD_RET_CAST(av);        /* Available for anyone to join with */
                                        /* us unless we're detached, in which */
                                        /* case noone sees the value anyway. */
 #endif    
@@ -214,7 +235,7 @@ threadstart(void *arg)
 }
 
 static SV *
-newthread (SV *startsv, AV *initargs, char *Class)
+newthread (SV *startsv, AV *initargs, char *classname)
 {
 #ifdef USE_THREADS
     dSP;
@@ -274,7 +295,7 @@ newthread (SV *startsv, AV *initargs, char *Class)
     sv = newSViv(thr->tid);
     sv_magic(sv, thr->oursv, '~', 0, 0);
     SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
-    return sv_bless(newRV_noinc(sv), gv_stashpv(Class, TRUE));
+    return sv_bless(newRV_noinc(sv), gv_stashpv(classname, TRUE));
 #else
     croak("No threads in this perl");
     return &sv_undef;
@@ -294,12 +315,12 @@ MODULE = Thread           PACKAGE = Thread
 PROTOTYPES: DISABLE
 
 void
-new(Class, startsv, ...)
-       char *          Class
+new(classname, startsv, ...)
+       char *          classname
        SV *            startsv
        AV *            av = av_make(items - 2, &ST(2));
     PPCODE:
-       XPUSHs(sv_2mortal(newthread(startsv, av, Class)));
+       XPUSHs(sv_2mortal(newthread(startsv, av, classname)));
 
 void
 join(t)
@@ -329,9 +350,17 @@ join(t)
        }
        JOIN(t, &av);
 
-       /* Could easily speed up the following if necessary */
-       for (i = 0; i <= AvFILL(av); i++)
-           XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
+       if (SvTRUE(*av_fetch(av, 0, FALSE))) {
+           /* Could easily speed up the following if necessary */
+           for (i = 1; i <= AvFILL(av); i++)
+               XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
+       } else {
+           char *mess = SvPV(*av_fetch(av, 1, FALSE), na);
+           DEBUG_L(PerlIO_printf(PerlIO_stderr(),
+                                 "%p: join propagating die message: %s\n",
+                                 thr, mess));
+           croak(mess);
+       }
 #endif
 
 void
@@ -379,8 +408,8 @@ flags(t)
 #endif
 
 void
-self(Class)
-       char *  Class
+self(classname)
+       char *  classname
     PREINIT:
        SV *sv;
     PPCODE:        
@@ -388,7 +417,8 @@ self(Class)
        sv = newSViv(thr->tid);
        sv_magic(sv, thr->oursv, '~', 0, 0);
        SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
-       PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(Class, TRUE))));
+       PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv),
+                                 gv_stashpv(classname, TRUE))));
 #endif
 
 U32
@@ -486,8 +516,8 @@ CODE:
 #endif
 
 void
-list(Class)
-       char *  Class
+list(classname)
+       char *  classname
     PREINIT:
        Thread  t;
        AV *    av;
@@ -510,7 +540,7 @@ list(Class)
                    SV *sv = newSViv(0);        /* fill in tid later */
                    sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */
                    av_push(av, sv_bless(newRV_noinc(sv),
-                                        gv_stashpv(Class, TRUE)));
+                                        gv_stashpv(classname, TRUE)));
        
                }
            }
@@ -580,3 +610,14 @@ await_signal()
     OUTPUT:
        RETVAL
 
+MODULE = Thread                PACKAGE = Thread::Specific
+
+void
+data(classname = "Thread::Specific")
+       char *  classname
+    PPCODE:
+       if (AvFILL(thr->specific) == -1) {
+           GV *gv = gv_fetchpv("Thread::Specific::FIELDS", TRUE, SVt_PVHV);
+           av_store(thr->specific, 0, newRV((SV*)GvHV(gv)));
+       }
+       XPUSHs(sv_bless(newRV((SV*)thr->specific),gv_stashpv(classname,TRUE)));
diff --git a/ext/Thread/Thread/Specific.pm b/ext/Thread/Thread/Specific.pm
new file mode 100644 (file)
index 0000000..ec56539
--- /dev/null
@@ -0,0 +1,14 @@
+package Thread::Specific;
+
+sub import {
+    use attrs qw(locked method);
+    require fields;
+    fields->import(@_);
+}      
+
+sub key_create {
+    use attrs qw(locked method);
+    return ++$FIELDS{__MAX__};
+}
+
+1;
index 640256a..cba2c1c 100644 (file)
@@ -8,4 +8,4 @@ print "Starting thread\n";
 $t = new Thread \&foo, qw(foo bar baz);
 print "Joining with $t\n";
 @results = $t->join();
-print "Joining returned @results\n";
+print "Joining returned ", scalar(@results), " values: @results\n";
diff --git a/ext/Thread/specific.t b/ext/Thread/specific.t
new file mode 100644 (file)
index 0000000..da130b1
--- /dev/null
@@ -0,0 +1,17 @@
+use Thread;
+
+use Thread::Specific qw(foo);
+
+sub count {
+    my $tid = Thread->self->tid;
+    my Thread::Specific $tsd = Thread::Specific::data;
+    for (my $i = 0; $i < 5; $i++) {
+       $tsd->{foo} = $i;
+       print "thread $tid count: $tsd->{foo}\n";
+       select(undef, undef, undef, rand(2));
+    }
+};
+
+for(my $t = 0; $t < 5; $t++) {
+    new Thread \&count;
+}
index c2c8b0b..2806ac6 100644 (file)
@@ -958,7 +958,6 @@ pp_snetent
 pp_socket
 pp_sockpair
 pp_sort
-pp_specific
 pp_splice
 pp_split
 pp_sprintf
@@ -987,6 +986,7 @@ pp_system
 pp_syswrite
 pp_tell
 pp_telldir
+pp_threadsv
 pp_tie
 pp_tied
 pp_time
diff --git a/lib/fields.pm b/lib/fields.pm
new file mode 100644 (file)
index 0000000..8e2d639
--- /dev/null
@@ -0,0 +1,18 @@
+package fields;
+
+sub import {
+    my $class = shift;
+    my ($package) = caller;
+    my $fields = \%{"$package\::FIELDS"};
+    my $i = $fields->{__MAX__};
+    foreach my $f (@_) {
+       if (defined($fields->{$f})) {
+           require Carp;
+           Carp::croak("Field name $f already in use");
+       }
+       $fields->{$f} = ++$i;
+    }
+    $fields->{__MAX__} = $i;
+}
+
+1;
index b354540..6dbc259 100644 (file)
--- a/pp_hot.c
+++ b/pp_hot.c
@@ -1834,9 +1834,10 @@ PP(pp_entersub)
 #ifdef USE_THREADS
     /*
      * First we need to check if the sub or method requires locking.
-     * If so, we gain a lock on the CV or the first argument, as
-     * appropriate. This has to be inline because for FAKE_THREADS,
-     * COND_WAIT inlines code to reschedule by returning a new op.
+     * If so, we gain a lock on the CV, the first argument or the
+     * stash (for static methods), as appropriate. This has to be
+     * inline because for FAKE_THREADS, COND_WAIT inlines code to
+     * reschedule by returning a new op.
      */
     MUTEX_LOCK(CvMUTEXP(cv));
     if (CvFLAGS(cv) & CVf_LOCKED) {
@@ -1850,6 +1851,11 @@ PP(pp_entersub)
            }
            if (SvROK(sv))
                sv = SvRV(sv);
+           else {              
+               STRLEN len;
+               char *stashname = SvPV(sv, len);
+               sv = (SV*)gv_stashpvn(stashname, len, TRUE);
+           }
        }
        else {
            sv = (SV*)cv;
index d2455dc..cda4754 100644 (file)
--- a/thread.h
+++ b/thread.h
@@ -243,10 +243,10 @@ typedef struct thread *Thread;
 #define THRf_ZOMBIE    3
 #define THRf_DEAD      4
 
-#define THRf_DIE_FATAL 8
+#define THRf_DID_DIE   8
 
 /* ThrSTATE(t) and ThrSETSTATE(t) must only be called while holding t->mutex */
-#define ThrSTATE(t) ((t)->flags)
+#define ThrSTATE(t) ((t)->flags & THRf_STATE_MASK)
 #define ThrSETSTATE(t, s) STMT_START {         \
        (t)->flags &= ~THRf_STATE_MASK;         \
        (t)->flags |= (s);                      \