X-Git-Url: http://git.shadowcat.co.uk/gitweb/gitweb.cgi?a=blobdiff_plain;f=ext%2Fthreads%2Fthreads.xs;h=48530fea617e346fda46a87b9e539e019c9870fa;hb=87c9b3a674e8d668946befbd197e1e7dcbafd7e6;hp=006e55252c8600d83f5925e4ec4557690c335a51;hpb=58c2ef1935bc22d76403b75989b56de9eecb6730;p=p5sagit%2Fp5-mst-13.2.git diff --git a/ext/threads/threads.xs b/ext/threads/threads.xs index 006e552..48530fe 100755 --- a/ext/threads/threads.xs +++ b/ext/threads/threads.xs @@ -3,55 +3,47 @@ #include "perl.h" #include "XSUB.h" +#ifdef USE_ITHREADS + + #ifdef WIN32 #include #include -#define PERL_THREAD_SETSPECIFIC(k,v) TlsSetValue(k,v) -#define PERL_THREAD_GETSPECIFIC(k,v) v = TlsGetValue(k) -#define PERL_THREAD_ALLOC_SPECIFIC(k) \ -STMT_START {\ - if((k = TlsAlloc()) == TLS_OUT_OF_INDEXES) {\ - PerlIO_printf(PerlIO_stderr(),"panic threads.h: TlsAlloc");\ - exit(1);\ - }\ -} STMT_END +#else +#ifdef OS2 +typedef perl_os_thread pthread_t; #else #include +#endif #include - #define PERL_THREAD_SETSPECIFIC(k,v) pthread_setspecific(k,v) #ifdef OLD_PTHREADS_API #define PERL_THREAD_DETACH(t) pthread_detach(&(t)) -#define PERL_THREAD_GETSPECIFIC(k,v) pthread_getspecific(k,&v) -#define PERL_THREAD_ALLOC_SPECIFIC(k) STMT_START {\ - if(pthread_keycreate(&(k),0)) {\ - PerlIO_printf(PerlIO_stderr(), "panic threads.h: pthread_key_create");\ - exit(1);\ - }\ -} STMT_END #else #define PERL_THREAD_DETACH(t) pthread_detach((t)) -#define PERL_THREAD_GETSPECIFIC(k,v) v = pthread_getspecific(k) -#define PERL_THREAD_ALLOC_SPECIFIC(k) STMT_START {\ - if(pthread_key_create(&(k),0)) {\ - PerlIO_printf(PerlIO_stderr(), "panic threads.h: pthread_key_create");\ - exit(1);\ - }\ -} STMT_END -#endif +#endif /* OLD_PTHREADS_API */ #endif + + + +/* Values for 'state' member */ +#define PERL_ITHR_JOINABLE 0 +#define PERL_ITHR_DETACHED 1 +#define PERL_ITHR_FINISHED 4 +#define PERL_ITHR_JOINED 2 + typedef struct ithread_s { - struct ithread_s *next; /* next thread in the list */ - struct ithread_s *prev; /* prev thread in the list */ + struct ithread_s *next; /* Next thread in the list */ + struct ithread_s *prev; /* Prev thread in the list */ PerlInterpreter *interp; /* The threads interpreter */ - I32 tid; /* threads module's thread id */ - perl_mutex mutex; /* mutex for updating things in this struct */ - I32 count; /* how many SVs have a reference to us */ - signed char detached; /* are we detached ? */ + I32 tid; /* Threads module's thread id */ + perl_mutex mutex; /* Mutex for updating things in this struct */ + I32 count; /* How many SVs have a reference to us */ + signed char state; /* Are we detached ? */ int gimme; /* Context of create */ SV* init_function; /* Code to run */ - SV* params; /* args to pass function */ + SV* params; /* Args to pass function */ #ifdef WIN32 DWORD thr; /* OS's idea if thread id */ HANDLE handle; /* OS's waitable handle */ @@ -68,20 +60,43 @@ ithread *threads; #define ithread_CLONE(thread) Perl_ithread_CLONE(aTHX_ thread) #define ithread_detach(thread) Perl_ithread_detach(aTHX_ thread) #define ithread_tid(thread) ((thread)->tid) +#define ithread_yield(thread) (YIELD); static perl_mutex create_destruct_mutex; /* protects the creation and destruction of threads*/ I32 tid_counter = 0; +I32 known_threads = 0; I32 active_threads = 0; -perl_key self_key; + + +void Perl_ithread_set (pTHX_ ithread* thread) +{ + SV* thread_sv = newSViv(PTR2IV(thread)); + if(!hv_store(PL_modglobal, "threads::self", 12, thread_sv,0)) { + croak("%s\n","Internal error, couldn't set TLS"); + } +} + +ithread* Perl_ithread_get (pTHX) { + SV** thread_sv = hv_fetch(PL_modglobal, "threads::self",12,0); + if(!thread_sv) { + croak("%s\n","Internal error, couldn't get TLS"); + } + return INT2PTR(ithread*,SvIV(*thread_sv)); +} + + /* * Clear up after thread is done with */ void -Perl_ithread_destruct (pTHX_ ithread* thread) +Perl_ithread_destruct (pTHX_ ithread* thread, const char *why) { MUTEX_LOCK(&thread->mutex); + if (!thread->next) { + Perl_croak(aTHX_ "panic: destruct destroyed thread %p (%s)",thread, why); + } if (thread->count != 0) { MUTEX_UNLOCK(&thread->mutex); return; @@ -93,30 +108,88 @@ Perl_ithread_destruct (pTHX_ ithread* thread) threads = NULL; } else { - thread->next->prev = thread->prev->next; - thread->prev->next = thread->next->prev; + thread->next->prev = thread->prev; + thread->prev->next = thread->next; if (threads == thread) { threads = thread->next; } + thread->next = NULL; + thread->prev = NULL; } - active_threads--; - MUTEX_UNLOCK(&create_destruct_mutex); - /* Thread is now disowned */ + known_threads--; + assert( known_threads >= 0 ); #if 0 - Perl_warn(aTHX_ "destruct %d @ %p by %p", - thread->tid,thread->interp,aTHX); + Perl_warn(aTHX_ "destruct %d @ %p by %p now %d", + thread->tid,thread->interp,aTHX, known_threads); #endif - if (thread->interp) { + MUTEX_UNLOCK(&create_destruct_mutex); + /* Thread is now disowned */ + + if(thread->interp) { dTHXa(thread->interp); + ithread* current_thread; +#ifdef OEMVS + void *ptr; +#endif PERL_SET_CONTEXT(thread->interp); + current_thread = Perl_ithread_get(aTHX); + Perl_ithread_set(aTHX_ thread); + + + + + SvREFCNT_dec(thread->params); + + + + thread->params = Nullsv; perl_destruct(thread->interp); - perl_free(thread->interp); + perl_free(thread->interp); thread->interp = NULL; } - PERL_SET_CONTEXT(aTHX); MUTEX_UNLOCK(&thread->mutex); + MUTEX_DESTROY(&thread->mutex); + PerlMemShared_free(thread); + + PERL_SET_CONTEXT(aTHX); } +int +Perl_ithread_hook(pTHX) +{ + int veto_cleanup = 0; + MUTEX_LOCK(&create_destruct_mutex); + if (aTHX == PL_curinterp && active_threads != 1) { + Perl_warn(aTHX_ "A thread exited while %" IVdf " threads were running", + (IV)active_threads); + veto_cleanup = 1; + } + MUTEX_UNLOCK(&create_destruct_mutex); + return veto_cleanup; +} + +void +Perl_ithread_detach(pTHX_ ithread *thread) +{ + MUTEX_LOCK(&thread->mutex); + if (!(thread->state & (PERL_ITHR_DETACHED|PERL_ITHR_JOINED))) { + thread->state |= PERL_ITHR_DETACHED; +#ifdef WIN32 + CloseHandle(thread->handle); + thread->handle = 0; +#else + PERL_THREAD_DETACH(thread->thr); +#endif + } + if ((thread->state & PERL_ITHR_FINISHED) && + (thread->state & PERL_ITHR_DETACHED)) { + MUTEX_UNLOCK(&thread->mutex); + Perl_ithread_destruct(aTHX_ thread, "detach"); + } + else { + MUTEX_UNLOCK(&thread->mutex); + } +} /* MAGIC (in mg.h sense) hooks */ @@ -135,9 +208,21 @@ ithread_mg_free(pTHX_ SV *sv, MAGIC *mg) ithread *thread = (ithread *) mg->mg_ptr; MUTEX_LOCK(&thread->mutex); thread->count--; - MUTEX_UNLOCK(&thread->mutex); - /* This is safe as it re-checks count */ - Perl_ithread_destruct(aTHX_ thread); + if (thread->count == 0) { + if(thread->state & PERL_ITHR_FINISHED && + (thread->state & PERL_ITHR_DETACHED || + thread->state & PERL_ITHR_JOINED)) + { + MUTEX_UNLOCK(&thread->mutex); + Perl_ithread_destruct(aTHX_ thread, "no reference"); + } + else { + MUTEX_UNLOCK(&thread->mutex); + } + } + else { + MUTEX_UNLOCK(&thread->mutex); + } return 0; } @@ -177,7 +262,7 @@ Perl_ithread_run(void * arg) { ithread* thread = (ithread*) arg; dTHXa(thread->interp); PERL_SET_CONTEXT(thread->interp); - PERL_THREAD_SETSPECIFIC(self_key,thread); + Perl_ithread_set(aTHX_ thread); #if 0 /* Far from clear messing with ->thr child-side is a good idea */ @@ -205,14 +290,14 @@ Perl_ithread_run(void * arg) { } PUTBACK; len = call_sv(thread->init_function, thread->gimme|G_EVAL); + SPAGAIN; for (i=len-1; i >= 0; i--) { SV *sv = POPs; av_store(params, i, SvREFCNT_inc(sv)); } - PUTBACK; if (SvTRUE(ERRSV)) { - Perl_warn(aTHX_ "Died:%_",ERRSV); + Perl_warn(aTHX_ "thread failed to start: %" SVf, ERRSV); } FREETMPS; LEAVE; @@ -221,15 +306,19 @@ Perl_ithread_run(void * arg) { PerlIO_flush((PerlIO*)NULL); MUTEX_LOCK(&thread->mutex); - if (thread->detached & 1) { + thread->state |= PERL_ITHR_FINISHED; + + if (thread->state & PERL_ITHR_DETACHED) { MUTEX_UNLOCK(&thread->mutex); - SvREFCNT_dec(thread->params); - thread->params = Nullsv; - Perl_ithread_destruct(aTHX_ thread); + Perl_ithread_destruct(aTHX_ thread, "detached finish"); } else { - thread->detached |= 4; - MUTEX_UNLOCK(&thread->mutex); - } + MUTEX_UNLOCK(&thread->mutex); + } + MUTEX_LOCK(&create_destruct_mutex); + active_threads--; + assert( active_threads >= 0 ); + MUTEX_UNLOCK(&create_destruct_mutex); + #ifdef WIN32 return (DWORD)0; #else @@ -260,20 +349,18 @@ ithread_to_SV(pTHX_ SV *obj, ithread *thread, char *classname, bool inc) ithread * SV_to_ithread(pTHX_ SV *sv) { - ithread *thread; if (SvROK(sv)) { - thread = INT2PTR(ithread*, SvIV(SvRV(sv))); + return INT2PTR(ithread*, SvIV(SvRV(sv))); } else { - PERL_THREAD_GETSPECIFIC(self_key,thread); + return Perl_ithread_get(aTHX); } - return thread; } /* - * iThread->create(); ( aka iThread->new() ) + * ithread->create(); ( aka ithread->new() ) * Called in context of parent thread */ @@ -282,12 +369,18 @@ Perl_ithread_create(pTHX_ SV *obj, char* classname, SV* init_function, SV* param { ithread* thread; CLONE_PARAMS clone_param; + ithread* current_thread = Perl_ithread_get(aTHX); + + SV** tmps_tmp = PL_tmps_stack; + I32 tmps_ix = PL_tmps_ix; + MUTEX_LOCK(&create_destruct_mutex); thread = PerlMemShared_malloc(sizeof(ithread)); Zero(thread,1,ithread); thread->next = threads; thread->prev = threads->prev; + threads->prev = thread; thread->prev->next = thread; /* Set count to 1 immediately in case thread exits before * we return to caller ! @@ -296,13 +389,18 @@ Perl_ithread_create(pTHX_ SV *obj, char* classname, SV* init_function, SV* param MUTEX_INIT(&thread->mutex); thread->tid = tid_counter++; thread->gimme = GIMME_V; - thread->detached = (thread->gimme == G_VOID) ? 1 : 0; /* "Clone" our interpreter into the thread's interpreter * This gives thread access to "static data" and code. */ PerlIO_flush((PerlIO*)NULL); + Perl_ithread_set(aTHX_ thread); + + SAVEBOOL(PL_srand_called); /* Save this so it becomes the correct + value */ + PL_srand_called = FALSE; /* Set it to false so we can detect + if it gets set during the clone */ #ifdef WIN32 thread->interp = perl_clone(aTHX, CLONEf_KEEP_PTR_TABLE | CLONEf_CLONE_HOST); @@ -310,14 +408,15 @@ Perl_ithread_create(pTHX_ SV *obj, char* classname, SV* init_function, SV* param thread->interp = perl_clone(aTHX, CLONEf_KEEP_PTR_TABLE); #endif /* perl_clone leaves us in new interpreter's context. - As it is tricky to spot implcit aTHX create a new scope + As it is tricky to spot an implicit aTHX, create a new scope with aTHX matching the context for the duration of our work for new interpreter. */ { dTHXa(thread->interp); + /* Here we remove END blocks since they should only run - in the thread they are created + in the thread they are created */ SvREFCNT_dec(PL_endav); PL_endav = newAV(); @@ -326,14 +425,48 @@ Perl_ithread_create(pTHX_ SV *obj, char* classname, SV* init_function, SV* param if (SvREFCNT(thread->init_function) == 0) { SvREFCNT_inc(thread->init_function); } + + thread->params = sv_dup(params, &clone_param); SvREFCNT_inc(thread->params); + + + /* The code below checks that anything living on + the tmps stack and has been cloned (so it lives in the + ptr_table) has a refcount higher than 0 + + If the refcount is 0 it means that a something on the + stack/context was holding a reference to it and + since we init_stacks() in perl_clone that won't get + cleaned and we will get a leaked scalar. + The reason it was cloned was that it lived on the + @_ stack. + + Example of this can be found in bugreport 15837 + where calls in the parameter list end up as a temp + + One could argue that this fix should be in perl_clone + */ + + + while (tmps_ix > 0) { + SV* sv = (SV*)ptr_table_fetch(PL_ptr_table, tmps_tmp[tmps_ix]); + tmps_ix--; + if (sv && SvREFCNT(sv) == 0) { + SvREFCNT_inc(sv); + SvREFCNT_dec(sv); + } + } + + + SvTEMP_off(thread->init_function); ptr_table_free(PL_ptr_table); PL_ptr_table = NULL; + PL_exit_flags |= PERL_EXIT_DESTRUCT_END; } - + Perl_ithread_set(aTHX_ current_thread); PERL_SET_CONTEXT(aTHX); /* Start the thread */ @@ -347,7 +480,6 @@ Perl_ithread_create(pTHX_ SV *obj, char* classname, SV* init_function, SV* param { static pthread_attr_t attr; static int attr_inited = 0; - sigset_t fullmask, oldmask; static int attr_joinable = PTHREAD_CREATE_JOINABLE; if (!attr_inited) { attr_inited = 1; @@ -358,27 +490,36 @@ Perl_ithread_create(pTHX_ SV *obj, char* classname, SV* init_function, SV* param # endif # ifdef THREAD_CREATE_NEEDS_STACK if(pthread_attr_setstacksize(&attr, THREAD_CREATE_NEEDS_STACK)) - croak("panic: pthread_attr_setstacksize failed"); + Perl_croak(aTHX_ "panic: pthread_attr_setstacksize failed"); # endif #ifdef OLD_PTHREADS_API pthread_create( &thread->thr, attr, Perl_ithread_run, (void *)thread); #else +# if defined(HAS_PTHREAD_ATTR_SETSCOPE) && defined(PTHREAD_SCOPE_SYSTEM) + pthread_attr_setscope( &attr, PTHREAD_SCOPE_SYSTEM ); +# endif pthread_create( &thread->thr, &attr, Perl_ithread_run, (void *)thread); #endif } #endif + known_threads++; active_threads++; MUTEX_UNLOCK(&create_destruct_mutex); + sv_2mortal(params); + return ithread_to_SV(aTHX_ obj, thread, classname, FALSE); } SV* Perl_ithread_self (pTHX_ SV *obj, char* Class) { - ithread *thread; - PERL_THREAD_GETSPECIFIC(self_key,thread); - return ithread_to_SV(aTHX_ obj, thread, Class, TRUE); + ithread *thread = Perl_ithread_get(aTHX); + if (thread) + return ithread_to_SV(aTHX_ obj, thread, Class, TRUE); + else + Perl_croak(aTHX_ "panic: cannot find thread data"); + return NULL; /* silence compiler warning */ } /* @@ -395,20 +536,20 @@ Perl_ithread_CLONE(pTHX_ SV *obj) } else { - Perl_warn(aTHX_ "CLONE %_",obj); + Perl_warn(aTHX_ "CLONE %" SVf,obj); } } -AV* +AV* Perl_ithread_join(pTHX_ SV *obj) { ithread *thread = SV_to_ithread(aTHX_ obj); MUTEX_LOCK(&thread->mutex); - if (thread->detached & 1) { + if (thread->state & PERL_ITHR_DETACHED) { MUTEX_UNLOCK(&thread->mutex); Perl_croak(aTHX_ "Cannot join a detached thread"); } - else if (thread->detached & 2) { + else if (thread->state & PERL_ITHR_JOINED) { MUTEX_UNLOCK(&thread->mutex); Perl_croak(aTHX_ "Thread already joined"); } @@ -427,58 +568,71 @@ Perl_ithread_join(pTHX_ SV *obj) #endif MUTEX_LOCK(&thread->mutex); + /* sv_dup over the args */ { - AV* params = (AV*) SvRV(thread->params); + ithread* current_thread; + AV* params = (AV*) SvRV(thread->params); + PerlInterpreter *other_perl = thread->interp; CLONE_PARAMS clone_params; clone_params.stashes = newAV(); + clone_params.flags |= CLONEf_JOIN_IN; PL_ptr_table = ptr_table_new(); + current_thread = Perl_ithread_get(aTHX); + Perl_ithread_set(aTHX_ thread); + /* ensure 'meaningful' addresses retain their meaning */ + ptr_table_store(PL_ptr_table, &other_perl->Isv_undef, &PL_sv_undef); + ptr_table_store(PL_ptr_table, &other_perl->Isv_no, &PL_sv_no); + ptr_table_store(PL_ptr_table, &other_perl->Isv_yes, &PL_sv_yes); + +#if 0 + { + I32 len = av_len(params)+1; + I32 i; + for(i = 0; i < len; i++) { + sv_dump(SvRV(AvARRAY(params)[i])); + } + } +#endif retparam = (AV*) sv_dup((SV*)params, &clone_params); +#if 0 + { + I32 len = av_len(retparam)+1; + I32 i; + for(i = 0; i < len; i++) { + sv_dump(SvRV(AvARRAY(retparam)[i])); + } + } +#endif + Perl_ithread_set(aTHX_ current_thread); SvREFCNT_dec(clone_params.stashes); SvREFCNT_inc(retparam); ptr_table_free(PL_ptr_table); PL_ptr_table = NULL; } - /* sv_dup over the args */ - /* We have finished with it */ - thread->detached |= 2; + /* We are finished with it */ + thread->state |= PERL_ITHR_JOINED; MUTEX_UNLOCK(&thread->mutex); - sv_unmagic(SvRV(obj),PERL_MAGIC_shared_scalar); - Perl_ithread_destruct(aTHX_ thread); + return retparam; } return (AV*)NULL; } void -Perl_ithread_detach(pTHX_ ithread *thread) -{ - MUTEX_LOCK(&thread->mutex); - if (!thread->detached) { - thread->detached = 1; -#ifdef WIN32 - CloseHandle(thread->handle); - thread->handle = 0; -#else - PERL_THREAD_DETACH(thread->thr); -#endif - } - MUTEX_UNLOCK(&thread->mutex); -} - - -void Perl_ithread_DESTROY(pTHX_ SV *sv) { ithread *thread = SV_to_ithread(aTHX_ sv); sv_unmagic(SvRV(sv),PERL_MAGIC_shared_scalar); } - +#endif /* USE_ITHREADS */ MODULE = threads PACKAGE = threads PREFIX = ithread_ PROTOTYPES: DISABLE +#ifdef USE_ITHREADS + void ithread_new (classname, function_to_call, ...) char * classname @@ -489,7 +643,7 @@ CODE: if (items > 2) { int i; for(i = 2; i < items ; i++) { - av_push(params, ST(i)); + av_push(params, SvREFCNT_inc(ST(i))); } } ST(0) = sv_2mortal(Perl_ithread_create(aTHX_ Nullsv, classname, function_to_call, newRV_noinc((SV*) params))); @@ -497,6 +651,28 @@ CODE: } void +ithread_list(char *classname) +PPCODE: +{ + ithread *curr_thread; + MUTEX_LOCK(&create_destruct_mutex); + curr_thread = threads; + if(curr_thread->tid != 0) + XPUSHs( sv_2mortal(ithread_to_SV(aTHX_ NULL, curr_thread, classname, TRUE))); + while(curr_thread) { + curr_thread = curr_thread->next; + if(curr_thread == threads) + break; + if(curr_thread->state & PERL_ITHR_DETACHED || + curr_thread->state & PERL_ITHR_JOINED) + continue; + XPUSHs( sv_2mortal(ithread_to_SV(aTHX_ NULL, curr_thread, classname, TRUE))); + } + MUTEX_UNLOCK(&create_destruct_mutex); +} + + +void ithread_self(char *classname) CODE: { @@ -515,11 +691,20 @@ PPCODE: int i; I32 len = AvFILL(params); for (i = 0; i <= len; i++) { - XPUSHs(av_shift(params)); + SV* tmp = av_shift(params); + XPUSHs(tmp); + sv_2mortal(tmp); } SvREFCNT_dec(params); } +void +yield(...) +CODE: +{ + YIELD; +} + void ithread_detach(ithread *thread) @@ -527,13 +712,16 @@ ithread_detach(ithread *thread) void ithread_DESTROY(SV *thread) +#endif /* USE_ITHREADS */ + BOOT: { +#ifdef USE_ITHREADS ithread* thread; PL_perl_destruct_level = 2; - PERL_THREAD_ALLOC_SPECIFIC(self_key); MUTEX_INIT(&create_destruct_mutex); MUTEX_LOCK(&create_destruct_mutex); + PL_threadhook = &Perl_ithread_hook; thread = PerlMemShared_malloc(sizeof(ithread)); Zero(thread,1,ithread); PL_perl_destruct_level = 2; @@ -542,16 +730,19 @@ BOOT: thread->next = thread; thread->prev = thread; thread->interp = aTHX; - thread->count = 1; /* imortal */ + thread->count = 1; /* Immortal. */ thread->tid = tid_counter++; + known_threads++; active_threads++; - thread->detached = 1; + thread->state = PERL_ITHR_DETACHED; #ifdef WIN32 thread->thr = GetCurrentThreadId(); #else thread->thr = pthread_self(); #endif - PERL_THREAD_SETSPECIFIC(self_key,thread); + + Perl_ithread_set(aTHX_ thread); MUTEX_UNLOCK(&create_destruct_mutex); +#endif /* USE_ITHREADS */ }