RE: [PATCH] threads 1.33
[p5sagit/p5-mst-13.2.git] / ext / threads / threads.xs
index 9593781..5e6d16c 100755 (executable)
@@ -2,7 +2,13 @@
 #include "EXTERN.h"
 #include "perl.h"
 #include "XSUB.h"
+/* Workaround for XSUB.h bug under WIN32 */
+#ifdef WIN32
+#  undef setjmp
+#  define setjmp(x) _setjmp(x)
+#endif
 #ifdef HAS_PPPORT_H
+#  define NEED_PL_signals
 #  define NEED_newRV_noinc
 #  define NEED_sv_2pv_nolen
 #  include "ppport.h"
@@ -32,6 +38,9 @@ typedef perl_os_thread pthread_t;
 #    define PERL_THREAD_DETACH(t) pthread_detach((t))
 #  endif
 #endif
+#if !defined(HAS_GETPAGESIZE) && defined(I_SYS_PARAM)
+#  include <sys/param.h>
+#endif
 
 /* Values for 'state' member */
 #define PERL_ITHR_JOINABLE      0
@@ -77,7 +86,9 @@ static ithread *threads;
 static perl_mutex create_destruct_mutex;
 
 static UV tid_counter = 0;
-static IV active_threads = 0;
+static IV joinable_threads = 0;
+static IV running_threads = 0;
+static IV detached_threads = 0;
 #ifdef THREAD_CREATE_NEEDS_STACK
 static IV default_stack_size = THREAD_CREATE_NEEDS_STACK;
 #else
@@ -150,11 +161,11 @@ S_ithread_destruct(pTHX_ ithread *thread)
         return;
     }
 
-    MUTEX_LOCK(&create_destruct_mutex);
     /* Main thread (0) is immortal and should never get here */
     assert(thread->tid != 0);
 
     /* Remove from circular list of threads */
+    MUTEX_LOCK(&create_destruct_mutex);
     thread->next->prev = thread->prev;
     thread->prev->next = thread->next;
     thread->next = NULL;
@@ -190,9 +201,17 @@ Perl_ithread_hook(pTHX)
 {
     int veto_cleanup = 0;
     MUTEX_LOCK(&create_destruct_mutex);
-    if ((aTHX == PL_curinterp) && (active_threads != 1)) {
+    if ((aTHX == PL_curinterp) &&
+        (running_threads || joinable_threads))
+    {
         if (ckWARN_d(WARN_THREADS)) {
-            Perl_warn(aTHX_ "A thread exited while %" IVdf " threads were running", active_threads);
+            Perl_warn(aTHX_ "Perl exited with active threads:\n\t%"
+                            IVdf " running and unjoined\n\t%"
+                            IVdf " finished and unjoined\n\t%"
+                            IVdf " running and detached\n",
+                            running_threads,
+                            joinable_threads,
+                            detached_threads);
         }
         veto_cleanup = 1;
     }
@@ -261,7 +280,7 @@ good_stack_size(pTHX_ IV stack_size)
 #ifdef PTHREAD_STACK_MIN
     /* Can't use less than minimum */
     if (stack_size < PTHREAD_STACK_MIN) {
-        if (ckWARN_d(WARN_THREADS)) {
+        if (ckWARN(WARN_THREADS)) {
             Perl_warn(aTHX_ "Using minimum thread stack size of %" IVdf, (IV)PTHREAD_STACK_MIN);
         }
         return (PTHREAD_STACK_MIN);
@@ -270,17 +289,13 @@ good_stack_size(pTHX_ IV stack_size)
 
     /* Round up to page size boundary */
     if (page_size <= 0) {
-#ifdef PL_mmap_page_size
-        page_size = PL_mmap_page_size;
-#else
-#  ifdef HAS_MMAP
-#    if defined(HAS_SYSCONF) && (defined(_SC_PAGESIZE) || defined(_SC_MMAP_PAGE_SIZE))
+#if defined(HAS_SYSCONF) && (defined(_SC_PAGESIZE) || defined(_SC_MMAP_PAGE_SIZE))
         SETERRNO(0, SS_NORMAL);
-#      ifdef _SC_PAGESIZE
+#  ifdef _SC_PAGESIZE
         page_size = sysconf(_SC_PAGESIZE);
-#      else
+#  else
         page_size = sysconf(_SC_MMAP_PAGE_SIZE);
-#      endif
+#  endif
         if ((long)page_size < 0) {
             if (errno) {
                 SV * const error = get_sv("@", FALSE);
@@ -290,20 +305,18 @@ good_stack_size(pTHX_ IV stack_size)
                 Perl_croak(aTHX_ "PANIC: sysconf: pagesize unknown");
             }
         }
-#    else
-#      ifdef HAS_GETPAGESIZE
+#else
+#  ifdef HAS_GETPAGESIZE
         page_size = getpagesize();
-#      else
-#        if defined(I_SYS_PARAM) && defined(PAGESIZE)
-        page_size = PAGESIZE;
-#        endif
-#      endif
-        if (page_size <= 0)
-            Perl_croak(aTHX_ "PANIC: bad pagesize %" IVdf, (IV)page_size);
-#    endif
 #  else
+#    if defined(I_SYS_PARAM) && defined(PAGESIZE)
+        page_size = PAGESIZE;
+#    else
         page_size = 8192;   /* A conservative default */
+#    endif
 #  endif
+        if (page_size <= 0)
+            Perl_croak(aTHX_ "PANIC: bad pagesize %" IVdf, (IV)page_size);
 #endif
     }
     stack_size = ((stack_size + (page_size - 1)) / page_size) * page_size;
@@ -347,6 +360,10 @@ S_ithread_run(void * arg)
         AV *params = (AV *)SvRV(thread->params);
         int len = (int)av_len(params)+1;
         int ii;
+        int jmp_rc = 0;
+        I32 oldscope;
+
+        dJMPENV;
 
         dSP;
         ENTER;
@@ -359,24 +376,44 @@ S_ithread_run(void * arg)
         }
         PUTBACK;
 
-        /* Run the specified function */
-        len = (int)call_sv(thread->init_function, thread->gimme|G_EVAL);
+        oldscope = PL_scopestack_ix;
+        JMPENV_PUSH(jmp_rc);
+        if (jmp_rc == 0) {
+            /* Run the specified function */
+            len = (int)call_sv(thread->init_function, thread->gimme|G_EVAL);
+        } else if (jmp_rc == 2) {
+            while (PL_scopestack_ix > oldscope) {
+                LEAVE;
+            }
+        }
+        JMPENV_POP;
 
         /* Remove args from stack and put back in params array */
         SPAGAIN;
         for (ii=len-1; ii >= 0; ii--) {
             SV *sv = POPs;
-            av_store(params, ii, SvREFCNT_inc(sv));
+            if (jmp_rc == 0) {
+                av_store(params, ii, SvREFCNT_inc(sv));
+            }
         }
 
+        FREETMPS;
+        LEAVE;
+
         /* Check for failure */
         if (SvTRUE(ERRSV) && ckWARN_d(WARN_THREADS)) {
-            Perl_warn(aTHX_ "Thread failed to start: %" SVf, ERRSV);
+            oldscope = PL_scopestack_ix;
+            JMPENV_PUSH(jmp_rc);
+            if (jmp_rc == 0) {
+                Perl_warn(aTHX_ "Thread %" UVuf " terminated abnormally: %" SVf, thread->tid, ERRSV);
+            } else if (jmp_rc == 2) {
+                while (PL_scopestack_ix > oldscope) {
+                    LEAVE;
+                }
+            }
+            JMPENV_POP;
         }
 
-        FREETMPS;
-        LEAVE;
-
         /* Release function ref */
         SvREFCNT_dec(thread->init_function);
         thread->init_function = Nullsv;
@@ -391,12 +428,17 @@ S_ithread_run(void * arg)
     cleanup = (thread->state & PERL_ITHR_DETACHED);
     MUTEX_UNLOCK(&thread->mutex);
 
-    if (cleanup)
+    if (cleanup) {
+        MUTEX_LOCK(&create_destruct_mutex);
+        detached_threads--;
+        MUTEX_UNLOCK(&create_destruct_mutex);
         S_ithread_destruct(aTHX_ thread);
-
-    MUTEX_LOCK(&create_destruct_mutex);
-    active_threads--;
-    MUTEX_UNLOCK(&create_destruct_mutex);
+    } else {
+        MUTEX_LOCK(&create_destruct_mutex);
+        running_threads--;
+        joinable_threads++;
+        MUTEX_UNLOCK(&create_destruct_mutex);
+    }
 
 #ifdef WIN32
     return ((DWORD)0);
@@ -453,6 +495,7 @@ S_ithread_create(
         char     *classname,
         SV       *init_function,
         IV        stack_size,
+        int       gimme,
         SV       *params)
 {
     ithread     *thread;
@@ -491,7 +534,7 @@ S_ithread_create(
     MUTEX_INIT(&thread->mutex);
     thread->tid = tid_counter++;
     thread->stack_size = good_stack_size(aTHX_ stack_size);
-    thread->gimme = GIMME_V;
+    thread->gimme = gimme;
 
     /* "Clone" our interpreter into the thread's interpreter.
      * This gives thread access to "static data" and code.
@@ -523,10 +566,16 @@ S_ithread_create(
          */
         SvREFCNT_dec(PL_endav);
         PL_endav = newAV();
-        clone_param.flags = 0;
-        thread->init_function = sv_dup(init_function, &clone_param);
-        if (SvREFCNT(thread->init_function) == 0) {
-            SvREFCNT_inc(thread->init_function);
+
+        if (SvPOK(init_function)) {
+            thread->init_function = newSV(0);
+            sv_copypv(thread->init_function, init_function);
+        } else {
+            clone_param.flags = 0;
+            thread->init_function = sv_dup(init_function, &clone_param);
+            if (SvREFCNT(thread->init_function) == 0) {
+                SvREFCNT_inc(thread->init_function);
+            }
         }
 
         thread->params = sv_dup(params, &clone_param);
@@ -615,11 +664,14 @@ S_ithread_create(
         /* Try to get thread's actual stack size */
         {
             size_t stacksize;
-            if (! pthread_attr_getstacksize(&attr, &stacksize)) {
-                if (stacksize) {
+#ifdef HPUX1020
+            stacksize = pthread_attr_getstacksize(attr);
+#else
+            if (! pthread_attr_getstacksize(&attr, &stacksize))
+#endif
+                if (stacksize > 0) {
                     thread->stack_size = (IV)stacksize;
                 }
-            }
         }
 #  endif
     }
@@ -645,7 +697,7 @@ S_ithread_create(
         return (&PL_sv_undef);
     }
 
-    active_threads++;
+    running_threads++;
     MUTEX_UNLOCK(&create_destruct_mutex);
 
     sv_2mortal(params);
@@ -670,6 +722,8 @@ ithread_create(...)
         AV *params;
         HV *specs;
         IV stack_size;
+        int context;
+        char *str;
         int idx;
         int ii;
     CODE:
@@ -698,6 +752,7 @@ ithread_create(...)
 
         function_to_call = ST(idx+1);
 
+        context = -1;
         if (specs) {
             /* stack_size */
             if (hv_exists(specs, "stack", 5)) {
@@ -707,6 +762,44 @@ ithread_create(...)
             } else if (hv_exists(specs, "stack_size", 10)) {
                 stack_size = SvIV(*hv_fetch(specs, "stack_size", 10, 0));
             }
+
+            /* context */
+            if (hv_exists(specs, "context", 7)) {
+                str = (char *)SvPV_nolen(*hv_fetch(specs, "context", 7, 0));
+                switch (*str) {
+                    case 'a':
+                    case 'A':
+                        context = G_ARRAY;
+                        break;
+                    case 's':
+                    case 'S':
+                        context = G_SCALAR;
+                        break;
+                    case 'v':
+                    case 'V':
+                        context = G_VOID;
+                        break;
+                    default:
+                        Perl_croak(aTHX_ "Invalid context: %s", str);
+                }
+            } else if (hv_exists(specs, "array", 5)) {
+                if (SvTRUE(*hv_fetch(specs, "array", 5, 0))) {
+                    context = G_ARRAY;
+                }
+            } else if (hv_exists(specs, "scalar", 6)) {
+                if (SvTRUE(*hv_fetch(specs, "scalar", 6, 0))) {
+                    context = G_SCALAR;
+                }
+            } else if (hv_exists(specs, "void", 4)) {
+                if (SvTRUE(*hv_fetch(specs, "void", 4, 0))) {
+                    context = G_VOID;
+                }
+            }
+        }
+        if (context == -1) {
+            context = GIMME_V;  /* Implicit context */
+        } else {
+            context |= (GIMME_V & (~(G_ARRAY|G_SCALAR|G_VOID)));
         }
 
         /* Function args */
@@ -722,6 +815,7 @@ ithread_create(...)
                                             classname,
                                             function_to_call,
                                             stack_size,
+                                            context,
                                             newRV_noinc((SV*)params)));
         /* XSRETURN(1); - implied */
 
@@ -864,6 +958,10 @@ ithread_join(...)
         S_ithread_clear(aTHX_ thread);
         MUTEX_UNLOCK(&thread->mutex);
 
+        MUTEX_LOCK(&create_destruct_mutex);
+        joinable_threads--;
+        MUTEX_UNLOCK(&create_destruct_mutex);
+
         /* If no return values, then just return */
         if (! params) {
             XSRETURN_UNDEF;
@@ -917,8 +1015,58 @@ ithread_detach(...)
         cleanup = (thread->state & PERL_ITHR_FINISHED);
         MUTEX_UNLOCK(&thread->mutex);
 
-        if (cleanup)
+        MUTEX_LOCK(&create_destruct_mutex);
+        if (cleanup) {
+            joinable_threads--;
+        } else {
+            running_threads--;
+            detached_threads++;
+        }
+        MUTEX_UNLOCK(&create_destruct_mutex);
+
+        if (cleanup) {
             S_ithread_destruct(aTHX_ thread);
+        }
+
+
+void
+ithread_kill(...)
+    PREINIT:
+        ithread *thread;
+        char *sig_name;
+        IV signal;
+    CODE:
+        /* Must have safe signals */
+        if (PL_signals & PERL_SIGNALS_UNSAFE_FLAG)
+            Perl_croak(aTHX_ "Cannot signal threads without safe signals");
+
+        /* Object method only */
+        if (! sv_isobject(ST(0)))
+            Perl_croak(aTHX_ "Usage: $thr->kill('SIG...')");
+
+        /* Get signal */
+        sig_name = SvPV_nolen(ST(1));
+        if (isALPHA(*sig_name)) {
+            if (*sig_name == 'S' && sig_name[1] == 'I' && sig_name[2] == 'G')
+                sig_name += 3;
+            if ((signal = whichsig(sig_name)) < 0)
+                Perl_croak(aTHX_ "Unrecognized signal name: %s", sig_name);
+        } else
+            signal = SvIV(ST(1));
+
+        /* Set the signal for the thread */
+        thread = SV_to_ithread(aTHX_ ST(0));
+        MUTEX_LOCK(&thread->mutex);
+        {
+            dTHXa(thread->interp);
+            PL_psig_pend[signal]++;
+            PL_sig_pending = 1;
+        }
+        MUTEX_UNLOCK(&thread->mutex);
+
+        /* Return the thread to allow for method chaining */
+        ST(0) = ST(0);
+        /* XSRETURN(1); - implied */
 
 
 void
@@ -1063,7 +1211,6 @@ BOOT:
     }
     Zero(thread, 1, ithread);
 
-    PL_perl_destruct_level = 2;
     MUTEX_INIT(&thread->mutex);
 
     thread->tid = tid_counter++;        /* Thread 0 */
@@ -1084,8 +1231,6 @@ BOOT:
     thread->thr = pthread_self();
 #  endif
 
-    active_threads++;
-
     S_ithread_set(aTHX_ thread);
     MUTEX_UNLOCK(&create_destruct_mutex);
 #endif /* USE_ITHREADS */