#ifdef HAS_PPPORT_H
# define NEED_PL_signals
# define NEED_newRV_noinc
-# define NEED_sv_2pv_nolen
+# define NEED_sv_2pv_flags
# include "ppport.h"
# include "threads.h"
#endif
#endif
/* Values for 'state' member */
-#define PERL_ITHR_DETACHED 1
-#define PERL_ITHR_JOINED 2
-#define PERL_ITHR_UNCALLABLE (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)
-#define PERL_ITHR_FINISHED 4
-#define PERL_ITHR_THREAD_EXIT_ONLY 8
-#define PERL_ITHR_NONVIABLE 16
-#define PERL_ITHR_DESTROYED 32
-#define PERL_ITHR_DIED 64
+#define PERL_ITHR_DETACHED 1 /* Thread has been detached */
+#define PERL_ITHR_JOINED 2 /* Thread has been joined */
+#define PERL_ITHR_FINISHED 4 /* Thread has finished execution */
+#define PERL_ITHR_THREAD_EXIT_ONLY 8 /* exit() only exits current thread */
+#define PERL_ITHR_NONVIABLE 16 /* Thread creation failed */
+#define PERL_ITHR_DIED 32 /* Thread finished by dying */
+
+#define PERL_ITHR_UNCALLABLE (PERL_ITHR_DETACHED|PERL_ITHR_JOINED)
+
typedef struct _ithread {
struct _ithread *next; /* Next thread in the list */
PerlInterpreter *interp; /* The threads interpreter */
UV tid; /* Threads module's thread id */
perl_mutex mutex; /* Mutex for updating things in this struct */
- int count; /* How many SVs have a reference to us */
+ int count; /* Reference count. See S_ithread_create. */
int state; /* Detached, joined, finished, etc. */
int gimme; /* Context of create */
SV *init_function; /* Code to run */
IV joinable_threads;
IV running_threads;
IV detached_threads;
+ IV total_threads;
IV default_stack_size;
IV page_size;
} my_pool_t;
/* Free any data (such as the Perl interpreter) attached to an ithread
* structure. This is a bit like undef on SVs, where the SV isn't freed,
- * but the PVX is. Must be called with thread->mutex already held.
+ * but the PVX is. Must be called with thread->mutex already locked. Also,
+ * must be called with MY_POOL.create_destruct_mutex unlocked as destruction
+ * of the interpreter can lead to recursive destruction calls that could
+ * lead to a deadlock on that mutex.
*/
STATIC void
S_ithread_clear(pTHX_ ithread *thread)
}
-/* Free an ithread structure and any attached data if its count == 0 */
+/* Decrement the refcount of an ithread, and if it reaches zero, free it.
+ * Must be called with the mutex held.
+ * On return, mutex is released (or destroyed).
+ */
STATIC void
-S_ithread_destruct(pTHX_ ithread *thread)
+S_ithread_free(pTHX_ ithread *thread)
{
- int destroy = 0;
#ifdef WIN32
HANDLE handle;
#endif
dMY_POOL;
- /* Determine if thread can be destroyed now */
- MUTEX_LOCK(&thread->mutex);
- if (thread->count != 0) {
- destroy = 0;
- } else if (thread->state & PERL_ITHR_DESTROYED) {
- destroy = 0;
- } else if (thread->state & PERL_ITHR_NONVIABLE) {
- thread->state |= PERL_ITHR_DESTROYED;
- destroy = 1;
- } else if (! (thread->state & PERL_ITHR_FINISHED)) {
- destroy = 0;
- } else if (! (thread->state & PERL_ITHR_UNCALLABLE)) {
- destroy = 0;
- } else {
- thread->state |= PERL_ITHR_DESTROYED;
- destroy = 1;
+ if (! (thread->state & PERL_ITHR_NONVIABLE)) {
+ assert(thread->count > 0);
+ if (--thread->count > 0) {
+ MUTEX_UNLOCK(&thread->mutex);
+ return;
+ }
+ assert((thread->state & PERL_ITHR_FINISHED) &&
+ (thread->state & PERL_ITHR_UNCALLABLE));
}
MUTEX_UNLOCK(&thread->mutex);
- if (! destroy) return;
/* Main thread (0) is immortal and should never get here */
assert(thread->tid != 0);
}
#endif
- /* Call PerlMemShared_free() in the context of the "first" interpreter
- * per http://www.nntp.perl.org/group/perl.perl5.porters/110772
- */
- aTHX = MY_POOL.main_thread.interp;
PerlMemShared_free(thread);
+
+ /* total_threads >= 1 is used to veto cleanup by the main thread,
+ * should it happen to exit while other threads still exist.
+ * Decrement this as the very last thing in the thread's existence.
+ * Otherwise, MY_POOL and global state such as PL_op_mutex may get
+ * freed while we're still using it.
+ */
+ MUTEX_LOCK(&MY_POOL.create_destruct_mutex);
+ MY_POOL.total_threads--;
+ MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex);
+}
+
+
+static void
+S_ithread_count_inc(pTHX_ ithread *thread)
+{
+ MUTEX_LOCK(&thread->mutex);
+ thread->count++;
+ MUTEX_UNLOCK(&thread->mutex);
}
STATIC int
S_exit_warning(pTHX)
{
- int veto_cleanup;
+ int veto_cleanup, warn;
dMY_POOL;
MUTEX_LOCK(&MY_POOL.create_destruct_mutex);
- veto_cleanup = (MY_POOL.running_threads || MY_POOL.joinable_threads);
+ veto_cleanup = (MY_POOL.total_threads > 0);
+ warn = (MY_POOL.running_threads || MY_POOL.joinable_threads);
MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex);
- if (veto_cleanup) {
+ if (warn) {
if (ckWARN_d(WARN_THREADS)) {
Perl_warn(aTHX_ "Perl exited with active threads:\n\t%"
IVdf " running and unjoined\n\t%"
return (veto_cleanup);
}
-/* Called on exit from main thread */
+
+/* Called from perl_destruct() in each thread. If it's the main thread,
+ * stop it from freeing everything if there are other threads still running.
+ */
int
Perl_ithread_hook(pTHX)
{
ithread_mg_free(pTHX_ SV *sv, MAGIC *mg)
{
ithread *thread = (ithread *)mg->mg_ptr;
-
MUTEX_LOCK(&thread->mutex);
- thread->count--;
- MUTEX_UNLOCK(&thread->mutex);
-
- /* Try to clean up thread */
- S_ithread_destruct(aTHX_ thread);
-
+ S_ithread_free(aTHX_ thread); /* Releases MUTEX */
return (0);
}
int
ithread_mg_dup(pTHX_ MAGIC *mg, CLONE_PARAMS *param)
{
- ithread *thread = (ithread *)mg->mg_ptr;
- MUTEX_LOCK(&thread->mutex);
- thread->count++;
- MUTEX_UNLOCK(&thread->mutex);
+ S_ithread_count_inc(aTHX_ (ithread *)mg->mg_ptr);
return (0);
}
SPAGAIN;
for (ii=len-1; ii >= 0; ii--) {
SV *sv = POPs;
- if (jmp_rc == 0) {
+ if (jmp_rc == 0 && (thread->gimme & G_WANT) != G_VOID) {
av_store(params, ii, SvREFCNT_inc(sv));
}
}
my_exit(exit_code);
}
- /* Try to clean up thread */
- S_ithread_destruct(aTHX_ thread);
+ /* At this point, the interpreter may have been freed, so call
+ * free in the the context of of the 'main' interpreter which
+ * can't have been freed due to the veto_cleanup mechanism.
+ */
+ aTHX = MY_POOL.main_thread.interp;
+
+ MUTEX_LOCK(&thread->mutex);
+ S_ithread_free(aTHX_ thread); /* Releases MUTEX */
#ifdef WIN32
return ((DWORD)0);
SV *sv;
MAGIC *mg;
- /* If incrementing thread ref count, then call within mutex lock */
- if (inc) {
- MUTEX_LOCK(&thread->mutex);
- thread->count++;
- MUTEX_UNLOCK(&thread->mutex);
- }
+ if (inc)
+ S_ithread_count_inc(aTHX_ thread);
if (! obj) {
obj = newSV(0);
thread->prev = MY_POOL.main_thread.prev;
MY_POOL.main_thread.prev = thread;
thread->prev->next = thread;
-
- /* Set count to 1 immediately in case thread exits before
- * we return to caller!
+ MY_POOL.total_threads++;
+
+ /* 1 ref to be held by the local var 'thread' in S_ithread_run().
+ * 1 ref to be held by the threads object that we assume we will
+ * be embedded in upon our return.
+ * 1 ref to be the responsibility of join/detach, so we don't get
+ * freed until join/detach, even if no thread objects remain.
+ * This allows the following to work:
+ * { threads->create(sub{...}); } threads->object(1)->join;
*/
- thread->count = 1;
+ thread->count = 3;
/* Block new thread until ->create() call finishes */
MUTEX_INIT(&thread->mutex);
thread->init_function = newSV(0);
sv_copypv(thread->init_function, init_function);
} else {
- thread->init_function = sv_dup(init_function, &clone_param);
- if (SvREFCNT(thread->init_function) == 0) {
- SvREFCNT_inc_void(thread->init_function);
- }
+ thread->init_function =
+ SvREFCNT_inc(sv_dup(init_function, &clone_param));
}
thread->params = sv_dup(params, &clone_param);
MUTEX_UNLOCK(&MY_POOL.create_destruct_mutex);
sv_2mortal(params);
thread->state |= PERL_ITHR_NONVIABLE;
- S_ithread_destruct(aTHX_ thread);
+ S_ithread_free(aTHX_ thread); /* Releases MUTEX */
#ifndef WIN32
if (ckWARN_d(WARN_THREADS)) {
if (rc_stack_size) {
CODE:
if ((items >= 2) && SvROK(ST(1)) && SvTYPE(SvRV(ST(1)))==SVt_PVHV) {
if (--items < 2) {
- Perl_croak(aTHX_ "Usage: threads->create(\\%specs, function, ...)");
+ Perl_croak(aTHX_ "Usage: threads->create(\\%%specs, function, ...)");
}
specs = (HV*)SvRV(ST(1));
idx = 1;
switch (*str) {
case 'a':
case 'A':
+ case 'l':
+ case 'L':
context = G_ARRAY;
break;
case 's':
if (SvTRUE(*hv_fetch(specs, "array", 5, 0))) {
context = G_ARRAY;
}
+ } else if (hv_exists(specs, "list", 4)) {
+ if (SvTRUE(*hv_fetch(specs, "list", 4, 0))) {
+ context = G_ARRAY;
+ }
} else if (hv_exists(specs, "scalar", 6)) {
if (SvTRUE(*hv_fetch(specs, "scalar", 6, 0))) {
context = G_SCALAR;
ithread *thread;
ithread *current_thread;
int join_err;
- AV *params;
+ AV *params = NULL;
int len;
int ii;
-#ifdef WIN32
- DWORD waitcode;
-#else
+#ifndef WIN32
int rc_join;
void *retval;
#endif
MUTEX_LOCK(&thread->mutex);
/* Get the return value from the call_sv */
/* Objects do not survive this process - FIXME */
- {
+ if ((thread->gimme & G_WANT) != G_VOID) {
AV *params_copy;
PerlInterpreter *other_perl;
CLONE_PARAMS clone_params;
if (! (thread->state & PERL_ITHR_DIED)) {
S_ithread_clear(aTHX_ thread);
}
- MUTEX_UNLOCK(&thread->mutex);
-
- /* Try to cleanup thread */
- S_ithread_destruct(aTHX_ thread);
+ S_ithread_free(aTHX_ thread); /* Releases MUTEX */
/* If no return values, then just return */
if (! params) {
{
S_ithread_clear(aTHX_ thread);
}
- MUTEX_UNLOCK(&thread->mutex);
-
- /* Try to cleanup thread */
- S_ithread_destruct(aTHX_ thread);
+ S_ithread_free(aTHX_ thread); /* Releases MUTEX */
void
if (sv_isobject(ST(0))) {
Perl_croak(aTHX_ "Cannot change stack size of an existing thread");
}
+ if (! looks_like_number(ST(1))) {
+ Perl_croak(aTHX_ "Stack size must be numeric");
+ }
old_size = MY_POOL.default_stack_size;
MY_POOL.default_stack_size = S_good_stack_size(aTHX_ SvIV(ST(1)));
CODE:
PERL_UNUSED_VAR(items);
thread = S_SV_to_ithread(aTHX_ ST(0));
- ST(0) = (thread->gimme & G_ARRAY) ? &PL_sv_yes :
- (thread->gimme & G_VOID) ? &PL_sv_undef
- /* G_SCALAR */ : &PL_sv_no;
+ ST(0) = ((thread->gimme & G_WANT) == G_ARRAY) ? &PL_sv_yes :
+ ((thread->gimme & G_WANT) == G_VOID) ? &PL_sv_undef
+ /* G_SCALAR */ : &PL_sv_no;
/* XSRETURN(1); - implied */