ad99e2c409b09320eb22bfdc4b0909c460ffbe8e
[p5sagit/p5-mst-13.2.git] / ext / Thread / Thread.xs
1 #define PERL_NO_GET_CONTEXT
2 #include "EXTERN.h"
3 #include "perl.h"
4 #include "XSUB.h"
5
6 /* Magic signature for Thread's mg_private is "Th" */ 
7 #define Thread_MAGIC_SIGNATURE 0x5468
8
9 #ifdef __cplusplus
10 #ifdef I_UNISTD
11 #include <unistd.h>
12 #endif
13 #endif
14 #include <fcntl.h>
15                         
16 static int sig_pipe[2];
17             
18 #ifndef THREAD_RET_TYPE
19 #define THREAD_RET_TYPE void *
20 #define THREAD_RET_CAST(x) ((THREAD_RET_TYPE) x)
21 #endif
22
23 static void
24 remove_thread(pTHX_ struct perl_thread *t)
25 {
26 #ifdef USE_THREADS
27     DEBUG_S(WITH_THR(PerlIO_printf(PerlIO_stderr(),
28                                    "%p: remove_thread %p\n", thr, t)));
29     MUTEX_LOCK(&PL_threads_mutex);
30     MUTEX_DESTROY(&t->mutex);
31     PL_nthreads--;
32     t->prev->next = t->next;
33     t->next->prev = t->prev;
34     COND_BROADCAST(&PL_nthreads_cond);
35     MUTEX_UNLOCK(&PL_threads_mutex);
36 #endif
37 }
38
39 static THREAD_RET_TYPE
40 threadstart(void *arg)
41 {
42 #ifdef USE_THREADS
43 #ifdef FAKE_THREADS
44     Thread savethread = thr;
45     LOGOP myop;
46     dSP;
47     I32 oldscope = PL_scopestack_ix;
48     I32 retval;
49     AV *av;
50     int i;
51
52     DEBUG_S(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
53                           thr, SvPEEK(TOPs)));
54     thr = (Thread) arg;
55     savemark = TOPMARK;
56     thr->prev = thr->prev_run = savethread;
57     thr->next = savethread->next;
58     thr->next_run = savethread->next_run;
59     savethread->next = savethread->next_run = thr;
60     thr->wait_queue = 0;
61     thr->private = 0;
62
63     /* Now duplicate most of perl_call_sv but with a few twists */
64     PL_op = (OP*)&myop;
65     Zero(PL_op, 1, LOGOP);
66     myop.op_flags = OPf_STACKED;
67     myop.op_next = Nullop;
68     myop.op_flags |= OPf_KNOW;
69     myop.op_flags |= OPf_WANT_LIST;
70     PL_op = pp_entersub(ARGS);
71     DEBUG_S(if (!PL_op)
72             PerlIO_printf(PerlIO_stderr(), "thread starts at Nullop\n"));
73     /*
74      * When this thread is next scheduled, we start in the right
75      * place. When the thread runs off the end of the sub, perl.c
76      * handles things, using savemark to figure out how much of the
77      * stack is the return value for any join.
78      */
79     thr = savethread;           /* back to the old thread */
80     return 0;
81 #else
82     Thread thr = (Thread) arg;
83     LOGOP myop;
84     djSP;
85     I32 oldmark = TOPMARK;
86     I32 oldscope = PL_scopestack_ix;
87     I32 retval;
88     SV *sv;
89     AV *av = newAV();
90     int i, ret;
91     dJMPENV;
92     DEBUG_S(PerlIO_printf(PerlIO_stderr(), "new thread %p waiting to start\n",
93                           thr));
94
95     /* Don't call *anything* requiring dTHR until after SET_THR() */
96     /*
97      * Wait until our creator releases us. If we didn't do this, then
98      * it would be potentially possible for out thread to carry on and
99      * do stuff before our creator fills in our "self" field. For example,
100      * if we went and created another thread which tried to JOIN with us,
101      * then we'd be in a mess.
102      */
103     MUTEX_LOCK(&thr->mutex);
104     MUTEX_UNLOCK(&thr->mutex);
105
106     /*
107      * It's safe to wait until now to set the thread-specific pointer
108      * from our pthread_t structure to our struct perl_thread, since
109      * we're the only thread who can get at it anyway.
110      */
111     SET_THR(thr);
112
113     /* Only now can we use SvPEEK (which calls sv_newmortal which does dTHR) */
114     DEBUG_S(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
115                           thr, SvPEEK(TOPs)));
116
117     sv = POPs;
118     PUTBACK;
119     ENTER;
120     SAVETMPS;
121     perl_call_sv(sv, G_ARRAY|G_EVAL);
122     SPAGAIN;
123     retval = SP - (PL_stack_base + oldmark);
124     SP = PL_stack_base + oldmark + 1;
125     if (SvCUR(thr->errsv)) {
126         MUTEX_LOCK(&thr->mutex);
127         thr->flags |= THRf_DID_DIE;
128         MUTEX_UNLOCK(&thr->mutex);
129         av_store(av, 0, &PL_sv_no);
130         av_store(av, 1, newSVsv(thr->errsv));
131         DEBUG_S(PerlIO_printf(PerlIO_stderr(), "%p died: %s\n",
132                               thr, SvPV(thr->errsv, PL_na)));
133     } else {
134         DEBUG_S(STMT_START {
135             for (i = 1; i <= retval; i++) {
136                 PerlIO_printf(PerlIO_stderr(), "%p return[%d] = %s\n",
137                                 thr, i, SvPEEK(SP[i - 1]));
138             }
139         } STMT_END);
140         av_store(av, 0, &PL_sv_yes);
141         for (i = 1; i <= retval; i++, SP++)
142             sv_setsv(*av_fetch(av, i, TRUE), SvREFCNT_inc(*SP));
143     }
144     FREETMPS;
145     LEAVE;
146
147   finishoff:
148 #if 0    
149     /* removed for debug */
150     SvREFCNT_dec(PL_curstack);
151 #endif
152     SvREFCNT_dec(thr->cvcache);
153     SvREFCNT_dec(thr->threadsv);
154     SvREFCNT_dec(thr->specific);
155     SvREFCNT_dec(thr->errsv);
156     SvREFCNT_dec(thr->errhv);
157
158     /*Safefree(cxstack);*/
159     while (PL_curstackinfo->si_next)
160         PL_curstackinfo = PL_curstackinfo->si_next;
161     while (PL_curstackinfo) {
162         PERL_SI *p = PL_curstackinfo->si_prev;
163         SvREFCNT_dec(PL_curstackinfo->si_stack);
164         Safefree(PL_curstackinfo->si_cxstack);
165         Safefree(PL_curstackinfo);
166         PL_curstackinfo = p;
167     }    
168     Safefree(PL_markstack);
169     Safefree(PL_scopestack);
170     Safefree(PL_savestack);
171     Safefree(PL_retstack);
172     Safefree(PL_tmps_stack);
173     Safefree(PL_ofs);
174
175     SvREFCNT_dec(PL_rs);
176     SvREFCNT_dec(PL_nrs);
177     SvREFCNT_dec(PL_statname);
178     Safefree(PL_screamfirst);
179     Safefree(PL_screamnext);
180     Safefree(PL_reg_start_tmp);
181     SvREFCNT_dec(PL_lastscream);
182     SvREFCNT_dec(PL_defoutgv);
183     Safefree(PL_reg_poscache);
184
185     MUTEX_LOCK(&thr->mutex);
186     DEBUG_S(PerlIO_printf(PerlIO_stderr(),
187                           "%p: threadstart finishing: state is %u\n",
188                           thr, ThrSTATE(thr)));
189     switch (ThrSTATE(thr)) {
190     case THRf_R_JOINABLE:
191         ThrSETSTATE(thr, THRf_ZOMBIE);
192         MUTEX_UNLOCK(&thr->mutex);
193         DEBUG_S(PerlIO_printf(PerlIO_stderr(),
194                               "%p: R_JOINABLE thread finished\n", thr));
195         break;
196     case THRf_R_JOINED:
197         ThrSETSTATE(thr, THRf_DEAD);
198         MUTEX_UNLOCK(&thr->mutex);
199         remove_thread(aTHX_ thr);
200         DEBUG_S(PerlIO_printf(PerlIO_stderr(),
201                               "%p: R_JOINED thread finished\n", thr));
202         break;
203     case THRf_R_DETACHED:
204         ThrSETSTATE(thr, THRf_DEAD);
205         MUTEX_UNLOCK(&thr->mutex);
206         SvREFCNT_dec(av);
207         DEBUG_S(PerlIO_printf(PerlIO_stderr(),
208                               "%p: DETACHED thread finished\n", thr));
209         remove_thread(aTHX_ thr);       /* This might trigger main thread to finish */
210         break;
211     default:
212         MUTEX_UNLOCK(&thr->mutex);
213         croak("panic: illegal state %u at end of threadstart", ThrSTATE(thr));
214         /* NOTREACHED */
215     }
216     return THREAD_RET_CAST(av); /* Available for anyone to join with */
217                                         /* us unless we're detached, in which */
218                                         /* case noone sees the value anyway. */
219 #endif    
220 #else
221     return THREAD_RET_CAST(NULL);
222 #endif
223 }
224
225 static SV *
226 newthread (pTHX_ SV *startsv, AV *initargs, char *classname)
227 {
228 #ifdef USE_THREADS
229     dSP;
230     Thread savethread;
231     int i;
232     SV *sv;
233     int err;
234 #ifndef THREAD_CREATE
235     static pthread_attr_t attr;
236     static int attr_inited = 0;
237     sigset_t fullmask, oldmask;
238     static int attr_joinable = PTHREAD_CREATE_JOINABLE;
239 #endif
240
241     savethread = thr;
242     thr = new_struct_thread(thr);
243     /* temporarily pretend to be the child thread in case the
244      * XPUSHs() below want to grow the child's stack.  This is
245      * safe, since the other thread is not yet created, and we
246      * are the only ones who know about it */
247     SET_THR(thr);
248     SPAGAIN;
249     DEBUG_S(PerlIO_printf(PerlIO_stderr(),
250                           "%p: newthread (%p), tid is %u, preparing stack\n",
251                           savethread, thr, thr->tid));
252     /* The following pushes the arg list and startsv onto the *new* stack */
253     PUSHMARK(SP);
254     /* Could easily speed up the following greatly */
255     for (i = 0; i <= AvFILL(initargs); i++)
256         XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE)));
257     XPUSHs(SvREFCNT_inc(startsv));
258     PUTBACK;
259
260     /* On your marks... */
261     SET_THR(savethread);
262     MUTEX_LOCK(&thr->mutex);
263
264 #ifdef THREAD_CREATE
265     err = THREAD_CREATE(thr, threadstart);
266 #else    
267     /* Get set...  */
268     sigfillset(&fullmask);
269     if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1)
270         croak("panic: sigprocmask");
271     err = 0;
272     if (!attr_inited) {
273         attr_inited = 1;
274         err = pthread_attr_init(&attr);
275 #  ifdef PTHREAD_ATTR_SETDETACHSTATE
276         if (err == 0)
277             err = PTHREAD_ATTR_SETDETACHSTATE(&attr, attr_joinable);
278
279 #  else
280         croak("panic: can't pthread_attr_setdetachstate");
281 #  endif
282     }
283     if (err == 0)
284         err = PTHREAD_CREATE(&thr->self, attr, threadstart, (void*) thr);
285 #endif
286
287     if (err) {
288         MUTEX_UNLOCK(&thr->mutex);
289         DEBUG_S(PerlIO_printf(PerlIO_stderr(),
290                               "%p: create of %p failed %d\n",
291                               savethread, thr, err));
292         /* Thread creation failed--clean up */
293         SvREFCNT_dec(thr->cvcache);
294         remove_thread(aTHX_ thr);
295         MUTEX_DESTROY(&thr->mutex);
296         for (i = 0; i <= AvFILL(initargs); i++)
297             SvREFCNT_dec(*av_fetch(initargs, i, FALSE));
298         SvREFCNT_dec(startsv);
299         return NULL;
300     }
301
302 #ifdef THREAD_POST_CREATE
303     THREAD_POST_CREATE(thr);
304 #else
305     if (sigprocmask(SIG_SETMASK, &oldmask, 0))
306         croak("panic: sigprocmask");
307 #endif
308
309     sv = newSViv(thr->tid);
310     sv_magic(sv, thr->oursv, '~', 0, 0);
311     SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
312     sv = sv_bless(newRV_noinc(sv), gv_stashpv(classname, TRUE));
313
314     /* Go */
315     MUTEX_UNLOCK(&thr->mutex);
316
317     return sv;
318 #else
319     croak("No threads in this perl");
320     return &PL_sv_undef;
321 #endif
322 }
323
324 static Signal_t handle_thread_signal (int sig);
325
326 static Signal_t
327 handle_thread_signal(int sig)
328 {
329     dTHXo;
330     unsigned char c = (unsigned char) sig;
331     /*
332      * We're not really allowed to call fprintf in a signal handler
333      * so don't be surprised if this isn't robust while debugging
334      * with -DL.
335      */
336     DEBUG_S(PerlIO_printf(PerlIO_stderr(),
337             "handle_thread_signal: got signal %d\n", sig););
338     write(sig_pipe[1], &c, 1);
339 }
340
341 MODULE = Thread         PACKAGE = Thread
342 PROTOTYPES: DISABLE
343
344 void
345 new(classname, startsv, ...)
346         char *          classname
347         SV *            startsv
348         AV *            av = av_make(items - 2, &ST(2));
349     PPCODE:
350         XPUSHs(sv_2mortal(newthread(aTHX_ startsv, av, classname)));
351
352 void
353 join(t)
354         Thread  t
355         AV *    av = NO_INIT
356         int     i = NO_INIT
357     PPCODE:
358 #ifdef USE_THREADS
359         if (t == thr)
360             croak("Attempt to join self");
361         DEBUG_S(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n",
362                               thr, t, ThrSTATE(t)););
363         MUTEX_LOCK(&t->mutex);
364         switch (ThrSTATE(t)) {
365         case THRf_R_JOINABLE:
366         case THRf_R_JOINED:
367             ThrSETSTATE(t, THRf_R_JOINED);
368             MUTEX_UNLOCK(&t->mutex);
369             break;
370         case THRf_ZOMBIE:
371             ThrSETSTATE(t, THRf_DEAD);
372             MUTEX_UNLOCK(&t->mutex);
373             remove_thread(aTHX_ t);
374             break;
375         default:
376             MUTEX_UNLOCK(&t->mutex);
377             croak("can't join with thread");
378             /* NOTREACHED */
379         }
380         JOIN(t, &av);
381
382         if (SvTRUE(*av_fetch(av, 0, FALSE))) {
383             /* Could easily speed up the following if necessary */
384             for (i = 1; i <= AvFILL(av); i++)
385                 XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
386         } else {
387             STRLEN n_a;
388             char *mess = SvPV(*av_fetch(av, 1, FALSE), n_a);
389             DEBUG_S(PerlIO_printf(PerlIO_stderr(),
390                                   "%p: join propagating die message: %s\n",
391                                   thr, mess));
392             croak(mess);
393         }
394 #endif
395
396 void
397 detach(t)
398         Thread  t
399     CODE:
400 #ifdef USE_THREADS
401         DEBUG_S(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n",
402                               thr, t, ThrSTATE(t)););
403         MUTEX_LOCK(&t->mutex);
404         switch (ThrSTATE(t)) {
405         case THRf_R_JOINABLE:
406             ThrSETSTATE(t, THRf_R_DETACHED);
407             /* fall through */
408         case THRf_R_DETACHED:
409             DETACH(t);
410             MUTEX_UNLOCK(&t->mutex);
411             break;
412         case THRf_ZOMBIE:
413             ThrSETSTATE(t, THRf_DEAD);
414             DETACH(t);
415             MUTEX_UNLOCK(&t->mutex);
416             remove_thread(aTHX_ t);
417             break;
418         default:
419             MUTEX_UNLOCK(&t->mutex);
420             croak("can't detach thread");
421             /* NOTREACHED */
422         }
423 #endif
424
425 void
426 equal(t1, t2)
427         Thread  t1
428         Thread  t2
429     PPCODE:
430         PUSHs((t1 == t2) ? &PL_sv_yes : &PL_sv_no);
431
432 void
433 flags(t)
434         Thread  t
435     PPCODE:
436 #ifdef USE_THREADS
437         PUSHs(sv_2mortal(newSViv(t->flags)));
438 #endif
439
440 void
441 self(classname)
442         char *  classname
443     PREINIT:
444         SV *sv;
445     PPCODE:        
446 #ifdef USE_THREADS
447         sv = newSViv(thr->tid);
448         sv_magic(sv, thr->oursv, '~', 0, 0);
449         SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
450         PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv),
451                                   gv_stashpv(classname, TRUE))));
452 #endif
453
454 U32
455 tid(t)
456         Thread  t
457     CODE:
458 #ifdef USE_THREADS
459         MUTEX_LOCK(&t->mutex);
460         RETVAL = t->tid;
461         MUTEX_UNLOCK(&t->mutex);
462 #else 
463         RETVAL = 0;
464 #endif
465     OUTPUT:
466         RETVAL
467
468 void
469 DESTROY(t)
470         SV *    t
471     PPCODE:
472         PUSHs(&PL_sv_yes);
473
474 void
475 yield()
476     CODE:
477 {
478 #ifdef USE_THREADS
479         YIELD;
480 #endif
481 }
482
483 void
484 cond_wait(sv)
485         SV *    sv
486         MAGIC * mg = NO_INIT
487 CODE:                       
488 #ifdef USE_THREADS
489         if (SvROK(sv))
490             sv = SvRV(sv);
491
492         mg = condpair_magic(sv);
493         DEBUG_S(PerlIO_printf(PerlIO_stderr(), "%p: cond_wait %p\n", thr, sv));
494         MUTEX_LOCK(MgMUTEXP(mg));
495         if (MgOWNER(mg) != thr) {
496             MUTEX_UNLOCK(MgMUTEXP(mg));
497             croak("cond_wait for lock that we don't own\n");
498         }
499         MgOWNER(mg) = 0;
500         COND_SIGNAL(MgOWNERCONDP(mg));
501         COND_WAIT(MgCONDP(mg), MgMUTEXP(mg));
502         while (MgOWNER(mg))
503             COND_WAIT(MgOWNERCONDP(mg), MgMUTEXP(mg));
504         MgOWNER(mg) = thr;
505         MUTEX_UNLOCK(MgMUTEXP(mg));
506 #endif
507
508 void
509 cond_signal(sv)
510         SV *    sv
511         MAGIC * mg = NO_INIT
512 CODE:
513 #ifdef USE_THREADS
514         if (SvROK(sv))
515             sv = SvRV(sv);
516
517         mg = condpair_magic(sv);
518         DEBUG_S(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv));
519         MUTEX_LOCK(MgMUTEXP(mg));
520         if (MgOWNER(mg) != thr) {
521             MUTEX_UNLOCK(MgMUTEXP(mg));
522             croak("cond_signal for lock that we don't own\n");
523         }
524         COND_SIGNAL(MgCONDP(mg));
525         MUTEX_UNLOCK(MgMUTEXP(mg));
526 #endif
527
528 void
529 cond_broadcast(sv)
530         SV *    sv
531         MAGIC * mg = NO_INIT
532 CODE: 
533 #ifdef USE_THREADS
534         if (SvROK(sv))
535             sv = SvRV(sv);
536
537         mg = condpair_magic(sv);
538         DEBUG_S(PerlIO_printf(PerlIO_stderr(), "%p: cond_broadcast %p\n",
539                               thr, sv));
540         MUTEX_LOCK(MgMUTEXP(mg));
541         if (MgOWNER(mg) != thr) {
542             MUTEX_UNLOCK(MgMUTEXP(mg));
543             croak("cond_broadcast for lock that we don't own\n");
544         }
545         COND_BROADCAST(MgCONDP(mg));
546         MUTEX_UNLOCK(MgMUTEXP(mg));
547 #endif
548
549 void
550 list(classname)
551         char *  classname
552     PREINIT:
553         Thread  t;
554         AV *    av;
555         SV **   svp;
556         int     n = 0;
557     PPCODE:
558 #ifdef USE_THREADS
559         av = newAV();
560         /*
561          * Iterate until we have enough dynamic storage for all threads.
562          * We mustn't do any allocation while holding threads_mutex though.
563          */
564         MUTEX_LOCK(&PL_threads_mutex);
565         do {
566             n = PL_nthreads;
567             MUTEX_UNLOCK(&PL_threads_mutex);
568             if (AvFILL(av) < n - 1) {
569                 int i = AvFILL(av);
570                 for (i = AvFILL(av); i < n - 1; i++) {
571                     SV *sv = newSViv(0);        /* fill in tid later */
572                     sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */
573                     av_push(av, sv_bless(newRV_noinc(sv),
574                                          gv_stashpv(classname, TRUE)));
575         
576                 }
577             }
578             MUTEX_LOCK(&PL_threads_mutex);
579         } while (n < PL_nthreads);
580         n = PL_nthreads;        /* Get the final correct value */
581
582         /*
583          * At this point, there's enough room to fill in av.
584          * Note that we are holding threads_mutex so the list
585          * won't change out from under us but all the remaining
586          * processing is "fast" (no blocking, malloc etc.)
587          */
588         t = thr;
589         svp = AvARRAY(av);
590         do {
591             SV *sv = (SV*)SvRV(*svp);
592             sv_setiv(sv, t->tid);
593             SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->oursv);
594             SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED;
595             SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
596             t = t->next;
597             svp++;
598         } while (t != thr);
599         /*  */
600         MUTEX_UNLOCK(&PL_threads_mutex);
601         /* Truncate any unneeded slots in av */
602         av_fill(av, n - 1);
603         /* Finally, push all the new objects onto the stack and drop av */
604         EXTEND(SP, n);
605         for (svp = AvARRAY(av); n > 0; n--, svp++)
606             PUSHs(*svp);
607         (void)sv_2mortal((SV*)av);
608 #endif
609
610
611 MODULE = Thread         PACKAGE = Thread::Signal
612
613 void
614 kill_sighandler_thread()
615     PPCODE:
616         write(sig_pipe[1], "\0", 1);
617         PUSHs(&PL_sv_yes);
618
619 void
620 init_thread_signals()
621     PPCODE:
622         PL_sighandlerp = handle_thread_signal;
623         if (pipe(sig_pipe) == -1)
624             XSRETURN_UNDEF;
625         PUSHs(&PL_sv_yes);
626
627 void
628 await_signal()
629     PREINIT:
630         unsigned char c;
631         SSize_t ret;
632     CODE:
633         do {
634             ret = read(sig_pipe[0], &c, 1);
635         } while (ret == -1 && errno == EINTR);
636         if (ret == -1)
637             croak("panic: await_signal");
638         ST(0) = sv_newmortal();
639         if (ret)
640             sv_setsv(ST(0), c ? PL_psig_ptr[c] : &PL_sv_no);
641         DEBUG_S(PerlIO_printf(PerlIO_stderr(),
642                               "await_signal returning %s\n", SvPEEK(ST(0))););
643
644 MODULE = Thread         PACKAGE = Thread::Specific
645
646 void
647 data(classname = "Thread::Specific")
648         char *  classname
649     PPCODE:
650 #ifdef USE_THREADS
651         if (AvFILL(thr->specific) == -1) {
652             GV *gv = gv_fetchpv("Thread::Specific::FIELDS", TRUE, SVt_PVHV);
653             av_store(thr->specific, 0, newRV((SV*)GvHV(gv)));
654         }
655         XPUSHs(sv_bless(newRV((SV*)thr->specific),gv_stashpv(classname,TRUE)));
656 #endif