Finish thread state machine: fixes global destruction of threads,
[p5sagit/p5-mst-13.2.git] / Thread.xs
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 /* Magic signature for Thread's mg_private is "Th" */ 
6 #define Thread_MAGIC_SIGNATURE 0x5468
7
8 static U32 threadnum = 0;
9 static int sig_pipe[2];
10
11 static void
12 remove_thread(t)
13 Thread t;
14 {
15     DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(),
16                                    "%p: remove_thread %p\n", thr, t)));
17     MUTEX_LOCK(&threads_mutex);
18     nthreads--;
19     t->prev->next = t->next;
20     t->next->prev = t->prev;
21     COND_BROADCAST(&nthreads_cond);
22     MUTEX_UNLOCK(&threads_mutex);
23 }
24
25 static void *
26 threadstart(arg)
27 void *arg;
28 {
29 #ifdef FAKE_THREADS
30     Thread savethread = thr;
31     LOGOP myop;
32     dSP;
33     I32 oldscope = scopestack_ix;
34     I32 retval;
35     AV *returnav;
36     int i;
37
38     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
39                           thr, SvPEEK(TOPs)));
40     thr = (Thread) arg;
41     savemark = TOPMARK;
42     thr->prev = thr->prev_run = savethread;
43     thr->next = savethread->next;
44     thr->next_run = savethread->next_run;
45     savethread->next = savethread->next_run = thr;
46     thr->wait_queue = 0;
47     thr->private = 0;
48
49     /* Now duplicate most of perl_call_sv but with a few twists */
50     op = (OP*)&myop;
51     Zero(op, 1, LOGOP);
52     myop.op_flags = OPf_STACKED;
53     myop.op_next = Nullop;
54     myop.op_flags |= OPf_KNOW;
55     myop.op_flags |= OPf_WANT_LIST;
56     op = pp_entersub(ARGS);
57     DEBUG_L(if (!op)
58             PerlIO_printf(PerlIO_stderr(), "thread starts at Nullop\n"));
59     /*
60      * When this thread is next scheduled, we start in the right
61      * place. When the thread runs off the end of the sub, perl.c
62      * handles things, using savemark to figure out how much of the
63      * stack is the return value for any join.
64      */
65     thr = savethread;           /* back to the old thread */
66     return 0;
67 #else
68     Thread thr = (Thread) arg;
69     LOGOP myop;
70     dSP;
71     I32 oldmark = TOPMARK;
72     I32 oldscope = scopestack_ix;
73     I32 retval;
74     AV *returnav;
75     int i, ret;
76     dJMPENV;
77
78     /* Don't call *anything* requiring dTHR until after pthread_setspecific */
79     /*
80      * Wait until our creator releases us. If we didn't do this, then
81      * it would be potentially possible for out thread to carry on and
82      * do stuff before our creator fills in our "self" field. For example,
83      * if we went and created another thread which tried to pthread_join
84      * with us, then we'd be in a mess.
85      */
86     MUTEX_LOCK(&thr->mutex);
87     MUTEX_UNLOCK(&thr->mutex);
88
89     /*
90      * It's safe to wait until now to set the thread-specific pointer
91      * from our pthread_t structure to our struct thread, since we're
92      * the only thread who can get at it anyway.
93      */
94     if (pthread_setspecific(thr_key, (void *) thr))
95         croak("panic: pthread_setspecific");
96
97     /* Only now can we use SvPEEK (which calls sv_newmortal which does dTHR) */
98     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
99                           thr, SvPEEK(TOPs)));
100
101     JMPENV_PUSH(ret);
102     switch (ret) {
103     case 3:
104         PerlIO_printf(PerlIO_stderr(), "panic: threadstart\n");
105         /* fall through */
106     case 1:
107         STATUS_ALL_FAILURE;
108         /* fall through */
109     case 2:
110         /* my_exit() was called */
111         while (scopestack_ix > oldscope)
112             LEAVE;
113         JMPENV_POP;
114         av_store(returnav, 0, newSViv(statusvalue));
115         goto finishoff;
116     }
117
118     /* Now duplicate most of perl_call_sv but with a few twists */
119     op = (OP*)&myop;
120     Zero(op, 1, LOGOP);
121     myop.op_flags = OPf_STACKED;
122     myop.op_next = Nullop;
123     myop.op_flags |= OPf_KNOW;
124     myop.op_flags |= OPf_WANT_LIST;
125     op = pp_entersub(ARGS);
126     if (op)
127         runops();
128     SPAGAIN;
129     retval = sp - (stack_base + oldmark);
130     sp = stack_base + oldmark + 1;
131     DEBUG_L(for (i = 1; i <= retval; i++)
132                 PerlIO_printf(PerlIO_stderr(),
133                               "%p returnav[%d] = %s\n",
134                               thr, i, SvPEEK(sp[i - 1]));)
135     returnav = newAV();
136     av_store(returnav, 0, newSVpv("", 0));
137     for (i = 1; i <= retval; i++, sp++)
138         sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp));
139     
140   finishoff:
141 #if 0    
142     /* removed for debug */
143     SvREFCNT_dec(curstack);
144 #endif
145     SvREFCNT_dec(cvcache);
146     Safefree(markstack);
147     Safefree(scopestack);
148     Safefree(savestack);
149     Safefree(retstack);
150     Safefree(cxstack);
151     Safefree(tmps_stack);
152
153     MUTEX_LOCK(&thr->mutex);
154     DEBUG_L(PerlIO_printf(PerlIO_stderr(),
155                           "%p: threadstart finishing: state is %u\n",
156                           thr, ThrSTATE(thr)));
157     switch (ThrSTATE(thr)) {
158     case THRf_R_JOINABLE:
159         ThrSETSTATE(thr, THRf_ZOMBIE);
160         MUTEX_UNLOCK(&thr->mutex);
161         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
162                               "%p: R_JOINABLE thread finished\n", thr));
163         break;
164     case THRf_R_JOINED:
165         ThrSETSTATE(thr, THRf_DEAD);
166         MUTEX_UNLOCK(&thr->mutex);
167         remove_thread(thr);
168         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
169                               "%p: R_JOINED thread finished\n", thr));
170         break;
171     case THRf_R_DETACHED:
172         ThrSETSTATE(thr, THRf_DEAD);
173         MUTEX_UNLOCK(&thr->mutex);
174         SvREFCNT_dec(returnav);
175         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
176                               "%p: DETACHED thread finished\n", thr));
177         remove_thread(thr);     /* This might trigger main thread to finish */
178         break;
179     default:
180         MUTEX_UNLOCK(&thr->mutex);
181         croak("panic: illegal state %u at end of threadstart", ThrSTATE(thr));
182         /* NOTREACHED */
183     }
184     MUTEX_DESTROY(&thr->mutex);
185     return (void *) returnav;   /* Available for anyone to join with us */
186                                 /* unless we are detached in which case */
187                                 /* noone will see the value anyway. */
188 #endif    
189 }
190
191 static SV *
192 newthread(startsv, initargs, class)
193 SV *startsv;
194 AV *initargs;
195 char *class;
196 {
197     dTHR;
198     dSP;
199     Thread savethread;
200     int i;
201     SV *sv;
202     sigset_t fullmask, oldmask;
203     
204     savethread = thr;
205     sv = newSVpv("", 0);
206     SvGROW(sv, sizeof(struct thread) + 1);
207     SvCUR_set(sv, sizeof(struct thread));
208     thr = (Thread) SvPVX(sv);
209     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p)\n",
210                           savethread, SvPEEK(startsv), thr));
211     oursv = sv; 
212     /* If we don't zero these foostack pointers, init_stacks won't init them */
213     markstack = 0;
214     scopestack = 0;
215     savestack = 0;
216     retstack = 0;
217     init_stacks(ARGS);
218     curcop = savethread->Tcurcop;       /* XXX As good a guess as any? */
219     SPAGAIN;
220     defstash = savethread->Tdefstash;   /* XXX maybe these should */
221     curstash = savethread->Tcurstash;   /* always be set to main? */
222     /* top_env? */
223     /* runlevel */
224     cvcache = newHV();
225     thr->flags = THRf_R_JOINABLE;
226     MUTEX_INIT(&thr->mutex);
227     thr->tid = ++threadnum;
228     /* Insert new thread into the circular linked list and bump nthreads */
229     MUTEX_LOCK(&threads_mutex);
230     thr->next = savethread->next;
231     thr->prev = savethread;
232     savethread->next = thr;
233     thr->next->prev = thr;
234     nthreads++;
235     MUTEX_UNLOCK(&threads_mutex);
236
237     DEBUG_L(PerlIO_printf(PerlIO_stderr(),
238                           "%p: newthread, tid is %u, preparing stack\n",
239                           savethread, thr->tid));
240     /* The following pushes the arg list and startsv onto the *new* stack */
241     PUSHMARK(sp);
242     /* Could easily speed up the following greatly */
243     for (i = 0; i <= AvFILL(initargs); i++)
244         XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE)));
245     XPUSHs(SvREFCNT_inc(startsv));
246     PUTBACK;
247
248 #ifdef FAKE_THREADS
249     threadstart(thr);
250 #else    
251     /* On your marks... */
252     MUTEX_LOCK(&thr->mutex);
253     /* Get set...
254      * Increment the global thread count.
255      */
256     sigfillset(&fullmask);
257     if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1)
258         croak("panic: sigprocmask");
259     if (pthread_create(&self, NULL, threadstart, (void*) thr))
260         return NULL;    /* XXX should clean up first */
261     /* Go */
262     MUTEX_UNLOCK(&thr->mutex);
263     if (sigprocmask(SIG_SETMASK, &oldmask, 0))
264         croak("panic: sigprocmask");
265 #endif
266     sv = newSViv(thr->tid);
267     sv_magic(sv, oursv, '~', 0, 0);
268     SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
269     return sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE));
270 }
271
272 static Signal_t
273 handle_thread_signal(sig)
274 int sig;
275 {
276     char c = (char) sig;
277     write(sig_pipe[0], &c, 1);
278 }
279
280 MODULE = Thread         PACKAGE = Thread
281
282 void
283 new(class, startsv, ...)
284         char *          class
285         SV *            startsv
286         AV *            av = av_make(items - 2, &ST(2));
287     PPCODE:
288         XPUSHs(sv_2mortal(newthread(startsv, av, class)));
289
290 void
291 join(t)
292         Thread  t
293         AV *    av = NO_INIT
294         int     i = NO_INIT
295     PPCODE:
296         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n",
297                               thr, t, ThrSTATE(t)););
298         MUTEX_LOCK(&t->mutex);
299         switch (ThrSTATE(t)) {
300         case THRf_R_JOINABLE:
301         case THRf_R_JOINED:
302             ThrSETSTATE(t, THRf_R_JOINED);
303             MUTEX_UNLOCK(&t->mutex);
304             break;
305         case THRf_ZOMBIE:
306             ThrSETSTATE(t, THRf_DEAD);
307             MUTEX_UNLOCK(&t->mutex);
308             remove_thread(t);
309             break;
310         default:
311             MUTEX_UNLOCK(&t->mutex);
312             croak("can't join with thread");
313             /* NOTREACHED */
314         }
315         if (pthread_join(t->Tself, (void **) &av))
316             croak("pthread_join failed");
317
318         /* Could easily speed up the following if necessary */
319         for (i = 0; i <= AvFILL(av); i++)
320             XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
321
322 void
323 detach(t)
324         Thread  t
325     CODE:
326         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n",
327                               thr, t, ThrSTATE(t)););
328         MUTEX_LOCK(&t->mutex);
329         switch (ThrSTATE(t)) {
330         case THRf_R_JOINABLE:
331             ThrSETSTATE(t, THRf_R_DETACHED);
332             /* fall through */
333         case THRf_R_DETACHED:
334             DETACH(t);
335             MUTEX_UNLOCK(&t->mutex);
336             break;
337         case THRf_ZOMBIE:
338             ThrSETSTATE(t, THRf_DEAD);
339             DETACH(t);
340             MUTEX_UNLOCK(&t->mutex);
341             remove_thread(t);
342             break;
343         default:
344             MUTEX_UNLOCK(&t->mutex);
345             croak("can't detach thread");
346             /* NOTREACHED */
347         }
348
349 void
350 equal(t1, t2)
351         Thread  t1
352         Thread  t2
353     PPCODE:
354         PUSHs((t1 == t2) ? &sv_yes : &sv_no);
355
356 void
357 flags(t)
358         Thread  t
359     PPCODE:
360         PUSHs(sv_2mortal(newSViv(t->flags)));
361
362 void
363 self(class)
364         char *  class
365     PREINIT:
366         SV *sv;
367     PPCODE:
368         sv = newSViv(thr->tid);
369         sv_magic(sv, oursv, '~', 0, 0);
370         SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
371         PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE))));
372
373 U32
374 tid(t)
375         Thread  t
376     CODE:
377         MUTEX_LOCK(&t->mutex);
378         RETVAL = t->tid;
379         MUTEX_UNLOCK(&t->mutex);
380     OUTPUT:
381         RETVAL
382
383 void
384 DESTROY(t)
385         SV *    t
386     PPCODE:
387         PUSHs(&sv_yes);
388
389 void
390 yield()
391     CODE:
392 #ifdef OLD_PTHREADS_API
393         pthread_yield();
394 #else
395 #ifndef NO_SCHED_YIELD
396         sched_yield();
397 #endif /* NO_SCHED_YIELD */
398 #endif /* OLD_PTHREADS_API */
399
400 void
401 cond_wait(sv)
402         SV *    sv
403         MAGIC * mg = NO_INIT
404 CODE:
405         if (SvROK(sv))
406             sv = SvRV(sv);
407
408         mg = condpair_magic(sv);
409         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_wait %p\n", thr, sv));
410         MUTEX_LOCK(MgMUTEXP(mg));
411         if (MgOWNER(mg) != thr) {
412             MUTEX_UNLOCK(MgMUTEXP(mg));
413             croak("cond_wait for lock that we don't own\n");
414         }
415         MgOWNER(mg) = 0;
416         COND_WAIT(MgCONDP(mg), MgMUTEXP(mg));
417         while (MgOWNER(mg))
418             COND_WAIT(MgOWNERCONDP(mg), MgMUTEXP(mg));
419         MgOWNER(mg) = thr;
420         MUTEX_UNLOCK(MgMUTEXP(mg));
421         
422 void
423 cond_signal(sv)
424         SV *    sv
425         MAGIC * mg = NO_INIT
426 CODE:
427         if (SvROK(sv))
428             sv = SvRV(sv);
429
430         mg = condpair_magic(sv);
431         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv));
432         MUTEX_LOCK(MgMUTEXP(mg));
433         if (MgOWNER(mg) != thr) {
434             MUTEX_UNLOCK(MgMUTEXP(mg));
435             croak("cond_signal for lock that we don't own\n");
436         }
437         COND_SIGNAL(MgCONDP(mg));
438         MUTEX_UNLOCK(MgMUTEXP(mg));
439
440 void
441 cond_broadcast(sv)
442         SV *    sv
443         MAGIC * mg = NO_INIT
444 CODE:
445         if (SvROK(sv))
446             sv = SvRV(sv);
447
448         mg = condpair_magic(sv);
449         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_broadcast %p\n",
450                               thr, sv));
451         MUTEX_LOCK(MgMUTEXP(mg));
452         if (MgOWNER(mg) != thr) {
453             MUTEX_UNLOCK(MgMUTEXP(mg));
454             croak("cond_broadcast for lock that we don't own\n");
455         }
456         COND_BROADCAST(MgCONDP(mg));
457         MUTEX_UNLOCK(MgMUTEXP(mg));
458
459 void
460 list(class)
461         char *  class
462     PREINIT:
463         Thread  t;
464         AV *    av;
465         SV **   svp;
466         int     n = 0;
467     PPCODE:
468         av = newAV();
469         /*
470          * Iterate until we have enough dynamic storage for all threads.
471          * We mustn't do any allocation while holding threads_mutex though.
472          */
473         MUTEX_LOCK(&threads_mutex);
474         do {
475             n = nthreads;
476             MUTEX_UNLOCK(&threads_mutex);
477             DEBUG_L(PerlIO_printf(PerlIO_stderr(), "list: n = %d\n", n));
478             if (AvFILL(av) < n - 1) {
479                 int i = AvFILL(av);
480                 for (i = AvFILL(av); i < n - 1; i++) {
481                     SV *sv = newSViv(0);        /* fill in tid later */
482                     sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */
483                     av_push(av, sv_bless(newRV_noinc(sv),
484                                          gv_stashpv(class, TRUE)));
485         
486                 }
487             }
488             MUTEX_LOCK(&threads_mutex);
489         } while (n < nthreads);
490         n = nthreads;   /* Get the final correct value */
491
492         /*
493          * At this point, there's enough room to fill in av.
494          * Note that we are holding threads_mutex so the list
495          * won't change out from under us but all the remaining
496          * processing is "fast" (no blocking, malloc etc.)
497          */
498         t = thr;
499         svp = AvARRAY(av);
500         do {
501             SV *sv = SvRV(*svp++);
502             DEBUG_L(PerlIO_printf(PerlIO_stderr(),
503                                   "list: filling in thread %p\n", t));
504             sv_setiv(sv, t->tid);
505             SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->Toursv);
506             SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED;
507             SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
508             t = t->next;
509         } while (t != thr);
510         /*  */
511         MUTEX_UNLOCK(&threads_mutex);
512         /* Truncate any unneeded slots in av */
513         av_fill(av, n - 1);
514         /* Finally, push all the new objects onto the stack and drop av */
515         EXTEND(sp, n);
516         for (svp = AvARRAY(av); n > 0; n--, svp++)
517             PUSHs(*svp);
518         (void)sv_2mortal((SV*)av);
519
520
521 MODULE = Thread         PACKAGE = Thread::Signal
522
523 void
524 kill_sighandler_thread()
525     PPCODE:
526         write(sig_pipe[0], "\0", 1);
527         PUSHs(&sv_yes);
528
529 void
530 init_thread_signals()
531     PPCODE:
532         sighandlerp = handle_thread_signal;
533         if (pipe(sig_pipe) == -1)
534             XSRETURN_UNDEF;
535         PUSHs(&sv_yes);
536
537 SV *
538 await_signal()
539     PREINIT:
540         char c;
541         ssize_t ret;
542     CODE:
543         do {
544             ret = read(sig_pipe[1], &c, 1);
545         } while (ret == -1 && errno == EINTR);
546         if (ret == -1)
547             croak("panic: await_signal");
548         if (ret == 0)
549             XSRETURN_UNDEF;
550         RETVAL = c ? psig_ptr[c] : &sv_no;
551     OUTPUT:
552         RETVAL