Bump version of PerlIO::via after last change
[p5sagit/p5-mst-13.2.git] / ext / threads / threads.xs
index 3a1ea14..f15e40e 100755 (executable)
@@ -47,10 +47,12 @@ typedef perl_os_thread pthread_t;
 /* Values for 'state' member */
 #define PERL_ITHR_DETACHED              1
 #define PERL_ITHR_JOINED                2
+#define PERL_ITHR_UNCALLABLE            (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)
 #define PERL_ITHR_FINISHED              4
 #define PERL_ITHR_THREAD_EXIT_ONLY      8
 #define PERL_ITHR_NONVIABLE             16
 #define PERL_ITHR_DESTROYED             32
+#define PERL_ITHR_DIED                  64
 
 typedef struct _ithread {
     struct _ithread *next;      /* Next thread in the list */
@@ -70,6 +72,8 @@ typedef struct _ithread {
     pthread_t thr;              /* OS's handle for the thread */
 #endif
     IV stack_size;
+    SV *err;                    /* Error from abnormally terminated thread */
+    char *err_class;            /* Error object's classname if applicable */
 } ithread;
 
 
@@ -135,7 +139,7 @@ S_ithread_clear(pTHX_ ithread *thread)
     PerlInterpreter *interp;
 
     assert(((thread->state & PERL_ITHR_FINISHED) &&
-            (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)))
+            (thread->state & PERL_ITHR_UNCALLABLE))
                 ||
            (thread->state & PERL_ITHR_NONVIABLE));
 
@@ -149,6 +153,11 @@ S_ithread_clear(pTHX_ ithread *thread)
         SvREFCNT_dec(thread->params);
         thread->params = Nullsv;
 
+        if (thread->err) {
+            SvREFCNT_dec(thread->err);
+            thread->err = Nullsv;
+        }
+
         perl_destruct(interp);
         perl_free(interp);
         thread->interp = NULL;
@@ -179,7 +188,7 @@ S_ithread_destruct(pTHX_ ithread *thread)
         destroy = 1;
     } else if (! (thread->state & PERL_ITHR_FINISHED)) {
         destroy = 0;
-    } else if (! (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED))) {
+    } else if (! (thread->state & PERL_ITHR_UNCALLABLE)) {
         destroy = 0;
     } else {
         thread->state |= PERL_ITHR_DESTROYED;
@@ -381,8 +390,9 @@ S_ithread_run(void * arg)
     ithread *thread = (ithread *)arg;
     int jmp_rc = 0;
     I32 oldscope;
-    int exit_app = 0;
+    int exit_app = 0;   /* Thread terminated using 'exit' */
     int exit_code = 0;
+    int died = 0;       /* Thread terminated abnormally */
 
     dJMPENV;
 
@@ -442,22 +452,34 @@ S_ithread_run(void * arg)
         FREETMPS;
         LEAVE;
 
-        /* Check for failure */
-        if (SvTRUE(ERRSV) && ckWARN_d(WARN_THREADS)) {
-            oldscope = PL_scopestack_ix;
-            JMPENV_PUSH(jmp_rc);
-            if (jmp_rc == 0) {
-                /* Warn that thread died */
-                Perl_warn(aTHX_ "Thread %" UVuf " terminated abnormally: %" SVf, thread->tid, ERRSV);
-            } else if (jmp_rc == 2) {
-                /* Warn handler exited */
-                exit_app = 1;
-                exit_code = STATUS_CURRENT;
-                while (PL_scopestack_ix > oldscope) {
-                    LEAVE;
+        /* Check for abnormal termination */
+        if (SvTRUE(ERRSV)) {
+            died = PERL_ITHR_DIED;
+            thread->err = newSVsv(ERRSV);
+            /* If ERRSV is an object, remember the classname and then
+             * rebless into 'main' so it will survive 'cloning'
+             */
+            if (sv_isobject(thread->err)) {
+                thread->err_class = HvNAME(SvSTASH(SvRV(thread->err)));
+                sv_bless(thread->err, gv_stashpv("main", 0));
+            }
+
+            if (ckWARN_d(WARN_THREADS)) {
+                oldscope = PL_scopestack_ix;
+                JMPENV_PUSH(jmp_rc);
+                if (jmp_rc == 0) {
+                    /* Warn that thread died */
+                    Perl_warn(aTHX_ "Thread %" UVuf " terminated abnormally: %" SVf, thread->tid, ERRSV);
+                } else if (jmp_rc == 2) {
+                    /* Warn handler exited */
+                    exit_app = 1;
+                    exit_code = STATUS_CURRENT;
+                    while (PL_scopestack_ix > oldscope) {
+                        LEAVE;
+                    }
                 }
+                JMPENV_POP;
             }
-            JMPENV_POP;
         }
 
         /* Release function ref */
@@ -470,7 +492,7 @@ S_ithread_run(void * arg)
     MUTEX_LOCK(&MY_POOL.create_destruct_mutex);
     MUTEX_LOCK(&thread->mutex);
     /* Mark as finished */
-    thread->state |= PERL_ITHR_FINISHED;
+    thread->state |= (PERL_ITHR_FINISHED | died);
     /* Clear exit flag if required */
     if (thread->state & PERL_ITHR_THREAD_EXIT_ONLY) {
         exit_app = 0;
@@ -569,7 +591,6 @@ S_ithread_create(
         SV       *params)
 {
     ithread     *thread;
-    CLONE_PARAMS clone_param;
     ithread     *current_thread = S_ithread_get(aTHX);
 
     SV         **tmps_tmp = PL_tmps_stack;
@@ -634,6 +655,8 @@ S_ithread_create(
      * context for the duration of our work for new interpreter.
      */
     {
+        CLONE_PARAMS clone_param;
+
         dTHXa(thread->interp);
 
         MY_CXT_CLONE;
@@ -644,11 +667,11 @@ S_ithread_create(
         SvREFCNT_dec(PL_endav);
         PL_endav = newAV();
 
+        clone_param.flags = 0;
         if (SvPOK(init_function)) {
             thread->init_function = newSV(0);
             sv_copypv(thread->init_function, init_function);
         } else {
-            clone_param.flags = 0;
             thread->init_function = sv_dup(init_function, &clone_param);
             if (SvREFCNT(thread->init_function) == 0) {
                 SvREFCNT_inc_void(thread->init_function);
@@ -825,8 +848,10 @@ ithread_create(...)
             /* $thr->create() */
             classname = HvNAME(SvSTASH(SvRV(ST(0))));
             thread = INT2PTR(ithread *, SvIV(SvRV(ST(0))));
+            MUTEX_LOCK(&thread->mutex);
             stack_size = thread->stack_size;
             exit_opt = thread->state & PERL_ITHR_THREAD_EXIT_ONLY;
+            MUTEX_UNLOCK(&thread->mutex);
         } else {
             /* threads->create() */
             classname = (char *)SvPV_nolen(ST(0));
@@ -929,7 +954,8 @@ ithread_list(...)
         ithread *thread;
         int list_context;
         IV count = 0;
-        int want_running;
+        int want_running = 0;
+        int state;
         dMY_POOL;
     PPCODE:
         /* Class method only */
@@ -952,19 +978,23 @@ ithread_list(...)
              thread != &MY_POOL.main_thread;
              thread = thread->next)
         {
+            MUTEX_LOCK(&thread->mutex);
+            state = thread->state;
+            MUTEX_UNLOCK(&thread->mutex);
+
             /* Ignore detached or joined threads */
-            if (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)) {
+            if (state & PERL_ITHR_UNCALLABLE) {
                 continue;
             }
 
             /* Filter per parameter */
             if (items > 1) {
                 if (want_running) {
-                    if (thread->state & PERL_ITHR_FINISHED) {
+                    if (state & PERL_ITHR_FINISHED) {
                         continue;   /* Not running */
                     }
                 } else {
-                    if (! (thread->state & PERL_ITHR_FINISHED)) {
+                    if (! (state & PERL_ITHR_FINISHED)) {
                         continue;   /* Still running - not joinable yet */
                     }
                 }
@@ -990,7 +1020,7 @@ ithread_self(...)
         ithread *thread;
     CODE:
         /* Class method only */
-        if (SvROK(ST(0))) {
+        if ((items != 1) || SvROK(ST(0))) {
             Perl_croak(aTHX_ "Usage: threads->self()");
         }
         classname = (char *)SvPV_nolen(ST(0));
@@ -1006,6 +1036,7 @@ ithread_tid(...)
     PREINIT:
         ithread *thread;
     CODE:
+        PERL_UNUSED_VAR(items);
         thread = S_SV_to_ithread(aTHX_ ST(0));
         XST_mUV(0, thread->tid);
         /* XSRETURN(1); - implied */
@@ -1015,6 +1046,7 @@ void
 ithread_join(...)
     PREINIT:
         ithread *thread;
+        ithread *current_thread;
         int join_err;
         AV *params;
         int len;
@@ -1022,50 +1054,66 @@ ithread_join(...)
 #ifdef WIN32
         DWORD waitcode;
 #else
+        int rc_join;
         void *retval;
 #endif
         dMY_POOL;
     PPCODE:
         /* Object method only */
-        if (! sv_isobject(ST(0))) {
+        if ((items != 1) || ! sv_isobject(ST(0))) {
             Perl_croak(aTHX_ "Usage: $thr->join()");
         }
 
-        /* Check if the thread is joinable */
+        /* Check if the thread is joinable and not ourselves */
         thread = S_SV_to_ithread(aTHX_ ST(0));
-        join_err = (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED));
-        if (join_err) {
-            if (join_err & PERL_ITHR_DETACHED) {
-                Perl_croak(aTHX_ "Cannot join a detached thread");
-            } else {
-                Perl_croak(aTHX_ "Thread already joined");
-            }
+        current_thread = S_ithread_get(aTHX);
+
+        MUTEX_LOCK(&thread->mutex);
+        if ((join_err = (thread->state & PERL_ITHR_UNCALLABLE))) {
+            MUTEX_UNLOCK(&thread->mutex);
+            Perl_croak(aTHX_ (join_err & PERL_ITHR_DETACHED)
+                                ? "Cannot join a detached thread"
+                                : "Thread already joined");
+        } else if (thread->tid == current_thread->tid) {
+            MUTEX_UNLOCK(&thread->mutex);
+            Perl_croak(aTHX_ "Cannot join self");
         }
 
+        /* Mark as joined */
+        thread->state |= PERL_ITHR_JOINED;
+        MUTEX_UNLOCK(&thread->mutex);
+
+        MUTEX_LOCK(&MY_POOL.create_destruct_mutex);
+        MY_POOL.joinable_threads--;
+        MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex);
+
         /* Join the thread */
 #ifdef WIN32
-        waitcode = WaitForSingleObject(thread->handle, INFINITE);
+        if (WaitForSingleObject(thread->handle, INFINITE) != WAIT_OBJECT_0) {
+            /* Timeout/abandonment unexpected here; check $^E */
+            Perl_croak(aTHX_ "PANIC: underlying join failed");
+        };
 #else
-        pthread_join(thread->thr, &retval);
+        if ((rc_join = pthread_join(thread->thr, &retval)) != 0) {
+            /* In progress/deadlock/unknown unexpected here; check $! */
+            errno = rc_join;
+            Perl_croak(aTHX_ "PANIC: underlying join failed");
+        };
 #endif
 
         MUTEX_LOCK(&thread->mutex);
-        /* Mark as joined */
-        thread->state |= PERL_ITHR_JOINED;
-
         /* Get the return value from the call_sv */
+        /* Objects do not survive this process - FIXME */
         {
             AV *params_copy;
             PerlInterpreter *other_perl;
             CLONE_PARAMS clone_params;
-            ithread *current_thread;
 
             params_copy = (AV *)SvRV(thread->params);
             other_perl = thread->interp;
             clone_params.stashes = newAV();
             clone_params.flags = CLONEf_JOIN_IN;
             PL_ptr_table = ptr_table_new();
-            current_thread = S_ithread_get(aTHX);
             S_ithread_set(aTHX_ thread);
             /* Ensure 'meaningful' addresses retain their meaning */
             ptr_table_store(PL_ptr_table, &other_perl->Isv_undef, &PL_sv_undef);
@@ -1079,15 +1127,14 @@ ithread_join(...)
             PL_ptr_table = NULL;
         }
 
-        /* We are finished with the thread */
-        S_ithread_clear(aTHX_ thread);
+        /* If thread didn't die, then we can free its interpreter */
+        if (! (thread->state & PERL_ITHR_DIED)) {
+            S_ithread_clear(aTHX_ thread);
+        }
         MUTEX_UNLOCK(&thread->mutex);
 
-        MUTEX_LOCK(&MY_POOL.create_destruct_mutex);
-        if (! (thread->state & PERL_ITHR_DETACHED)) {
-            MY_POOL.joinable_threads--;
-        }
-        MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex);
+        /* Try to cleanup thread */
+        S_ithread_destruct(aTHX_ thread);
 
         /* If no return values, then just return */
         if (! params) {
@@ -1108,6 +1155,7 @@ ithread_join(...)
 void
 ithread_yield(...)
     CODE:
+        PERL_UNUSED_VAR(items);
         YIELD;
 
 
@@ -1116,38 +1164,48 @@ ithread_detach(...)
     PREINIT:
         ithread *thread;
         int detach_err;
-        int cleanup = 0;
         dMY_POOL;
     CODE:
-        /* Check if the thread is detachable */
-        thread = S_SV_to_ithread(aTHX_ ST(0));
-        if ((detach_err = (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)))) {
-            if (detach_err & PERL_ITHR_DETACHED) {
-                Perl_croak(aTHX_ "Thread already detached");
-            } else {
-                Perl_croak(aTHX_ "Cannot detach a joined thread");
-            }
-        }
+        PERL_UNUSED_VAR(items);
 
         /* Detach the thread */
+        thread = S_SV_to_ithread(aTHX_ ST(0));
         MUTEX_LOCK(&MY_POOL.create_destruct_mutex);
         MUTEX_LOCK(&thread->mutex);
-        thread->state |= PERL_ITHR_DETACHED;
+        if (! (detach_err = (thread->state & PERL_ITHR_UNCALLABLE))) {
+            /* Thread is detachable */
+            thread->state |= PERL_ITHR_DETACHED;
 #ifdef WIN32
-        /* Windows has no 'detach thread' function */
+            /* Windows has no 'detach thread' function */
 #else
-        PERL_THREAD_DETACH(thread->thr);
+            PERL_THREAD_DETACH(thread->thr);
 #endif
-
-        if (thread->state & PERL_ITHR_FINISHED) {
-            MY_POOL.joinable_threads--;
-        } else {
-            MY_POOL.running_threads--;
-            MY_POOL.detached_threads++;
+            if (thread->state & PERL_ITHR_FINISHED) {
+                MY_POOL.joinable_threads--;
+            } else {
+                MY_POOL.running_threads--;
+                MY_POOL.detached_threads++;
+            }
         }
         MUTEX_UNLOCK(&thread->mutex);
         MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex);
 
+        if (detach_err) {
+            Perl_croak(aTHX_ (detach_err & PERL_ITHR_DETACHED)
+                                ? "Thread already detached"
+                                : "Cannot detach a joined thread");
+        }
+
+        /* If thread is finished and didn't die,
+         * then we can free its interpreter */
+        MUTEX_LOCK(&thread->mutex);
+        if ((thread->state & PERL_ITHR_FINISHED) &&
+            ! (thread->state & PERL_ITHR_DIED))
+        {
+            S_ithread_clear(aTHX_ thread);
+        }
+        MUTEX_UNLOCK(&thread->mutex);
+
         /* Try to cleanup thread */
         S_ithread_destruct(aTHX_ thread);
 
@@ -1165,7 +1223,7 @@ ithread_kill(...)
         }
 
         /* Object method only */
-        if (! sv_isobject(ST(0))) {
+        if ((items != 2) || ! sv_isobject(ST(0))) {
             Perl_croak(aTHX_ "Usage: $thr->kill('SIG...')");
         }
 
@@ -1200,6 +1258,7 @@ ithread_kill(...)
 void
 ithread_DESTROY(...)
     CODE:
+        PERL_UNUSED_VAR(items);
         sv_unmagic(SvRV(ST(0)), PERL_MAGIC_shared_scalar);
 
 
@@ -1208,6 +1267,8 @@ ithread_equal(...)
     PREINIT:
         int are_equal = 0;
     CODE:
+        PERL_UNUSED_VAR(items);
+
         /* Compares TIDs to determine thread equality */
         if (sv_isobject(ST(0)) && sv_isobject(ST(1))) {
             ithread *thr1 = INT2PTR(ithread *, SvIV(SvRV(ST(0))));
@@ -1229,6 +1290,7 @@ ithread_object(...)
         char *classname;
         UV tid;
         ithread *thread;
+        int state;
         int have_obj = 0;
         dMY_POOL;
     CODE:
@@ -1254,7 +1316,10 @@ ithread_object(...)
             /* Look for TID */
             if (thread->tid == tid) {
                 /* Ignore if detached or joined */
-                if (! (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED))) {
+                MUTEX_LOCK(&thread->mutex);
+                state = thread->state;
+                MUTEX_UNLOCK(&thread->mutex);
+                if (! (state & PERL_ITHR_UNCALLABLE)) {
                     /* Put object on stack */
                     ST(0) = sv_2mortal(S_ithread_to_SV(aTHX_ Nullsv, thread, classname, TRUE));
                     have_obj = 1;
@@ -1275,6 +1340,7 @@ ithread__handle(...);
     PREINIT:
         ithread *thread;
     CODE:
+        PERL_UNUSED_VAR(items);
         thread = S_SV_to_ithread(aTHX_ ST(0));
 #ifdef WIN32
         XST_mUV(0, PTR2UV(&thread->handle));
@@ -1290,6 +1356,7 @@ ithread_get_stack_size(...)
         IV stack_size;
         dMY_POOL;
     CODE:
+        PERL_UNUSED_VAR(items);
         if (sv_isobject(ST(0))) {
             /* $thr->get_stack_size() */
             ithread *thread = INT2PTR(ithread *, SvIV(SvRV(ST(0))));
@@ -1327,12 +1394,14 @@ ithread_is_running(...)
         ithread *thread;
     CODE:
         /* Object method only */
-        if (! sv_isobject(ST(0))) {
+        if ((items != 1) || ! sv_isobject(ST(0))) {
             Perl_croak(aTHX_ "Usage: $thr->is_running()");
         }
 
         thread = INT2PTR(ithread *, SvIV(SvRV(ST(0))));
+        MUTEX_LOCK(&thread->mutex);
         ST(0) = (thread->state & PERL_ITHR_FINISHED) ? &PL_sv_no : &PL_sv_yes;
+        MUTEX_UNLOCK(&thread->mutex);
         /* XSRETURN(1); - implied */
 
 
@@ -1341,8 +1410,11 @@ ithread_is_detached(...)
     PREINIT:
         ithread *thread;
     CODE:
+        PERL_UNUSED_VAR(items);
         thread = S_SV_to_ithread(aTHX_ ST(0));
+        MUTEX_LOCK(&thread->mutex);
         ST(0) = (thread->state & PERL_ITHR_DETACHED) ? &PL_sv_yes : &PL_sv_no;
+        MUTEX_UNLOCK(&thread->mutex);
         /* XSRETURN(1); - implied */
 
 
@@ -1352,14 +1424,14 @@ ithread_is_joinable(...)
         ithread *thread;
     CODE:
         /* Object method only */
-        if (! sv_isobject(ST(0))) {
+        if ((items != 1) || ! sv_isobject(ST(0))) {
             Perl_croak(aTHX_ "Usage: $thr->is_joinable()");
         }
 
         thread = INT2PTR(ithread *, SvIV(SvRV(ST(0))));
         MUTEX_LOCK(&thread->mutex);
         ST(0) = ((thread->state & PERL_ITHR_FINISHED) &&
-                 ! (thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)))
+                 ! (thread->state & PERL_ITHR_UNCALLABLE))
             ? &PL_sv_yes : &PL_sv_no;
         MUTEX_UNLOCK(&thread->mutex);
         /* XSRETURN(1); - implied */
@@ -1370,6 +1442,7 @@ ithread_wantarray(...)
     PREINIT:
         ithread *thread;
     CODE:
+        PERL_UNUSED_VAR(items);
         thread = S_SV_to_ithread(aTHX_ ST(0));
         ST(0) = (thread->gimme & G_ARRAY) ? &PL_sv_yes :
                 (thread->gimme & G_VOID)  ? &PL_sv_undef
@@ -1394,6 +1467,59 @@ ithread_set_thread_exit_only(...)
         }
         MUTEX_UNLOCK(&thread->mutex);
 
+
+void
+ithread_error(...)
+    PREINIT:
+        ithread *thread;
+        SV *err = NULL;
+    CODE:
+        /* Object method only */
+        if ((items != 1) || ! sv_isobject(ST(0))) {
+            Perl_croak(aTHX_ "Usage: $thr->err()");
+        }
+
+        thread = INT2PTR(ithread *, SvIV(SvRV(ST(0))));
+        MUTEX_LOCK(&thread->mutex);
+
+        /* If thread died, then clone the error into the calling thread */
+        if (thread->state & PERL_ITHR_DIED) {
+            PerlInterpreter *other_perl;
+            CLONE_PARAMS clone_params;
+            ithread *current_thread;
+
+            other_perl = thread->interp;
+            clone_params.stashes = newAV();
+            clone_params.flags = CLONEf_JOIN_IN;
+            PL_ptr_table = ptr_table_new();
+            current_thread = S_ithread_get(aTHX);
+            S_ithread_set(aTHX_ thread);
+            /* Ensure 'meaningful' addresses retain their meaning */
+            ptr_table_store(PL_ptr_table, &other_perl->Isv_undef, &PL_sv_undef);
+            ptr_table_store(PL_ptr_table, &other_perl->Isv_no, &PL_sv_no);
+            ptr_table_store(PL_ptr_table, &other_perl->Isv_yes, &PL_sv_yes);
+            err = sv_dup(thread->err, &clone_params);
+            S_ithread_set(aTHX_ current_thread);
+            SvREFCNT_dec(clone_params.stashes);
+            SvREFCNT_inc_void(err);
+            /* If error was an object, bless it into the correct class */
+            if (thread->err_class) {
+                sv_bless(err, gv_stashpv(thread->err_class, 1));
+            }
+            ptr_table_free(PL_ptr_table);
+            PL_ptr_table = NULL;
+        }
+
+        MUTEX_UNLOCK(&thread->mutex);
+
+        if (! err) {
+            XSRETURN_UNDEF;
+        }
+
+        ST(0) = sv_2mortal(err);
+        /* XSRETURN(1); - implied */
+
+
 #endif /* USE_ITHREADS */