3dc25162a77c9383f85e9bc8d59971fc4153e66d
[p5sagit/p5-mst-13.2.git] / ext / Thread / Thread.xs
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 /* Magic signature for Thread's mg_private is "Th" */ 
6 #define Thread_MAGIC_SIGNATURE 0x5468
7
8 static U32 threadnum = 0;
9 static int sig_pipe[2];
10
11 static void
12 remove_thread(t)
13 Thread t;
14 {
15     DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(),
16                                    "%p: remove_thread %p\n", thr, t)));
17     MUTEX_LOCK(&threads_mutex);
18     MUTEX_DESTROY(&t->mutex);
19     nthreads--;
20     t->prev->next = t->next;
21     t->next->prev = t->prev;
22     COND_BROADCAST(&nthreads_cond);
23     MUTEX_UNLOCK(&threads_mutex);
24 }
25
26 static void *
27 threadstart(arg)
28 void *arg;
29 {
30 #ifdef FAKE_THREADS
31     Thread savethread = thr;
32     LOGOP myop;
33     dSP;
34     I32 oldscope = scopestack_ix;
35     I32 retval;
36     AV *returnav;
37     int i;
38
39     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
40                           thr, SvPEEK(TOPs)));
41     thr = (Thread) arg;
42     savemark = TOPMARK;
43     thr->prev = thr->prev_run = savethread;
44     thr->next = savethread->next;
45     thr->next_run = savethread->next_run;
46     savethread->next = savethread->next_run = thr;
47     thr->wait_queue = 0;
48     thr->private = 0;
49
50     /* Now duplicate most of perl_call_sv but with a few twists */
51     op = (OP*)&myop;
52     Zero(op, 1, LOGOP);
53     myop.op_flags = OPf_STACKED;
54     myop.op_next = Nullop;
55     myop.op_flags |= OPf_KNOW;
56     myop.op_flags |= OPf_WANT_LIST;
57     op = pp_entersub(ARGS);
58     DEBUG_L(if (!op)
59             PerlIO_printf(PerlIO_stderr(), "thread starts at Nullop\n"));
60     /*
61      * When this thread is next scheduled, we start in the right
62      * place. When the thread runs off the end of the sub, perl.c
63      * handles things, using savemark to figure out how much of the
64      * stack is the return value for any join.
65      */
66     thr = savethread;           /* back to the old thread */
67     return 0;
68 #else
69     Thread thr = (Thread) arg;
70     LOGOP myop;
71     dSP;
72     I32 oldmark = TOPMARK;
73     I32 oldscope = scopestack_ix;
74     I32 retval;
75     AV *returnav;
76     int i, ret;
77     dJMPENV;
78
79     /* Don't call *anything* requiring dTHR until after pthread_setspecific */
80     /*
81      * Wait until our creator releases us. If we didn't do this, then
82      * it would be potentially possible for out thread to carry on and
83      * do stuff before our creator fills in our "self" field. For example,
84      * if we went and created another thread which tried to pthread_join
85      * with us, then we'd be in a mess.
86      */
87     MUTEX_LOCK(&thr->mutex);
88     MUTEX_UNLOCK(&thr->mutex);
89
90     /*
91      * It's safe to wait until now to set the thread-specific pointer
92      * from our pthread_t structure to our struct thread, since we're
93      * the only thread who can get at it anyway.
94      */
95     if (pthread_setspecific(thr_key, (void *) thr))
96         croak("panic: pthread_setspecific");
97
98     /* Only now can we use SvPEEK (which calls sv_newmortal which does dTHR) */
99     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
100                           thr, SvPEEK(TOPs)));
101
102     JMPENV_PUSH(ret);
103     switch (ret) {
104     case 3:
105         PerlIO_printf(PerlIO_stderr(), "panic: threadstart\n");
106         /* fall through */
107     case 1:
108         STATUS_ALL_FAILURE;
109         /* fall through */
110     case 2:
111         /* my_exit() was called */
112         while (scopestack_ix > oldscope)
113             LEAVE;
114         JMPENV_POP;
115         av_store(returnav, 0, newSViv(statusvalue));
116         goto finishoff;
117     }
118
119     /* Now duplicate most of perl_call_sv but with a few twists */
120     op = (OP*)&myop;
121     Zero(op, 1, LOGOP);
122     myop.op_flags = OPf_STACKED;
123     myop.op_next = Nullop;
124     myop.op_flags |= OPf_KNOW;
125     myop.op_flags |= OPf_WANT_LIST;
126     op = pp_entersub(ARGS);
127     if (op)
128         runops();
129     SPAGAIN;
130     retval = sp - (stack_base + oldmark);
131     sp = stack_base + oldmark + 1;
132     DEBUG_L(for (i = 1; i <= retval; i++)
133                 PerlIO_printf(PerlIO_stderr(),
134                               "%p returnav[%d] = %s\n",
135                               thr, i, SvPEEK(sp[i - 1]));)
136     returnav = newAV();
137     av_store(returnav, 0, newSVpv("", 0));
138     for (i = 1; i <= retval; i++, sp++)
139         sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp));
140     
141   finishoff:
142 #if 0    
143     /* removed for debug */
144     SvREFCNT_dec(curstack);
145 #endif
146     SvREFCNT_dec(cvcache);
147     Safefree(markstack);
148     Safefree(scopestack);
149     Safefree(savestack);
150     Safefree(retstack);
151     Safefree(cxstack);
152     Safefree(tmps_stack);
153
154     MUTEX_LOCK(&thr->mutex);
155     DEBUG_L(PerlIO_printf(PerlIO_stderr(),
156                           "%p: threadstart finishing: state is %u\n",
157                           thr, ThrSTATE(thr)));
158     switch (ThrSTATE(thr)) {
159     case THRf_R_JOINABLE:
160         ThrSETSTATE(thr, THRf_ZOMBIE);
161         MUTEX_UNLOCK(&thr->mutex);
162         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
163                               "%p: R_JOINABLE thread finished\n", thr));
164         break;
165     case THRf_R_JOINED:
166         ThrSETSTATE(thr, THRf_DEAD);
167         MUTEX_UNLOCK(&thr->mutex);
168         remove_thread(thr);
169         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
170                               "%p: R_JOINED thread finished\n", thr));
171         break;
172     case THRf_R_DETACHED:
173         ThrSETSTATE(thr, THRf_DEAD);
174         MUTEX_UNLOCK(&thr->mutex);
175         SvREFCNT_dec(returnav);
176         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
177                               "%p: DETACHED thread finished\n", thr));
178         remove_thread(thr);     /* This might trigger main thread to finish */
179         break;
180     default:
181         MUTEX_UNLOCK(&thr->mutex);
182         croak("panic: illegal state %u at end of threadstart", ThrSTATE(thr));
183         /* NOTREACHED */
184     }
185     return (void *) returnav;   /* Available for anyone to join with us */
186                                 /* unless we are detached in which case */
187                                 /* noone will see the value anyway. */
188 #endif    
189 }
190
191 static SV *
192 newthread(startsv, initargs, class)
193 SV *startsv;
194 AV *initargs;
195 char *class;
196 {
197     dTHR;
198     dSP;
199     Thread savethread;
200     int i;
201     SV *sv;
202     sigset_t fullmask, oldmask;
203     
204     savethread = thr;
205     sv = newSVpv("", 0);
206     SvGROW(sv, sizeof(struct thread) + 1);
207     SvCUR_set(sv, sizeof(struct thread));
208     thr = (Thread) SvPVX(sv);
209     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p)\n",
210                           savethread, SvPEEK(startsv), thr));
211     oursv = sv; 
212     /* If we don't zero these foostack pointers, init_stacks won't init them */
213     markstack = 0;
214     scopestack = 0;
215     savestack = 0;
216     retstack = 0;
217     init_stacks(ARGS);
218     curcop = savethread->Tcurcop;       /* XXX As good a guess as any? */
219     SPAGAIN;
220     defstash = savethread->Tdefstash;   /* XXX maybe these should */
221     curstash = savethread->Tcurstash;   /* always be set to main? */
222     /* top_env? */
223     /* runlevel */
224     cvcache = newHV();
225     thr->flags = THRf_R_JOINABLE;
226     MUTEX_INIT(&thr->mutex);
227     thr->tid = ++threadnum;
228     /* Insert new thread into the circular linked list and bump nthreads */
229     MUTEX_LOCK(&threads_mutex);
230     thr->next = savethread->next;
231     thr->prev = savethread;
232     savethread->next = thr;
233     thr->next->prev = thr;
234     nthreads++;
235     MUTEX_UNLOCK(&threads_mutex);
236
237     DEBUG_L(PerlIO_printf(PerlIO_stderr(),
238                           "%p: newthread, tid is %u, preparing stack\n",
239                           savethread, thr->tid));
240     /* The following pushes the arg list and startsv onto the *new* stack */
241     PUSHMARK(sp);
242     /* Could easily speed up the following greatly */
243     for (i = 0; i <= AvFILL(initargs); i++)
244         XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE)));
245     XPUSHs(SvREFCNT_inc(startsv));
246     PUTBACK;
247
248 #ifdef FAKE_THREADS
249     threadstart(thr);
250 #else    
251     /* On your marks... */
252     MUTEX_LOCK(&thr->mutex);
253     /* Get set...
254      * Increment the global thread count.
255      */
256     sigfillset(&fullmask);
257     if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1)
258         croak("panic: sigprocmask");
259     if (pthread_create(&self, NULL, threadstart, (void*) thr))
260         return NULL;    /* XXX should clean up first */
261     /* Go */
262     MUTEX_UNLOCK(&thr->mutex);
263     if (sigprocmask(SIG_SETMASK, &oldmask, 0))
264         croak("panic: sigprocmask");
265 #endif
266     sv = newSViv(thr->tid);
267     sv_magic(sv, oursv, '~', 0, 0);
268     SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
269     return sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE));
270 }
271
272 static Signal_t
273 handle_thread_signal(sig)
274 int sig;
275 {
276     char c = (char) sig;
277     write(sig_pipe[0], &c, 1);
278 }
279
280 MODULE = Thread         PACKAGE = Thread
281
282 void
283 new(class, startsv, ...)
284         char *          class
285         SV *            startsv
286         AV *            av = av_make(items - 2, &ST(2));
287     PPCODE:
288         XPUSHs(sv_2mortal(newthread(startsv, av, class)));
289
290 void
291 join(t)
292         Thread  t
293         AV *    av = NO_INIT
294         int     i = NO_INIT
295     PPCODE:
296         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n",
297                               thr, t, ThrSTATE(t)););
298         MUTEX_LOCK(&t->mutex);
299         switch (ThrSTATE(t)) {
300         case THRf_R_JOINABLE:
301         case THRf_R_JOINED:
302             ThrSETSTATE(t, THRf_R_JOINED);
303             MUTEX_UNLOCK(&t->mutex);
304             break;
305         case THRf_ZOMBIE:
306             ThrSETSTATE(t, THRf_DEAD);
307             MUTEX_UNLOCK(&t->mutex);
308             remove_thread(t);
309             break;
310         default:
311             MUTEX_UNLOCK(&t->mutex);
312             croak("can't join with thread");
313             /* NOTREACHED */
314         }
315         if (pthread_join(t->Tself, (void **) &av))
316             croak("pthread_join failed");
317
318         /* Could easily speed up the following if necessary */
319         for (i = 0; i <= AvFILL(av); i++)
320             XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
321
322 void
323 detach(t)
324         Thread  t
325     CODE:
326         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n",
327                               thr, t, ThrSTATE(t)););
328         MUTEX_LOCK(&t->mutex);
329         switch (ThrSTATE(t)) {
330         case THRf_R_JOINABLE:
331             ThrSETSTATE(t, THRf_R_DETACHED);
332             /* fall through */
333         case THRf_R_DETACHED:
334             DETACH(t);
335             MUTEX_UNLOCK(&t->mutex);
336             break;
337         case THRf_ZOMBIE:
338             ThrSETSTATE(t, THRf_DEAD);
339             DETACH(t);
340             MUTEX_UNLOCK(&t->mutex);
341             remove_thread(t);
342             break;
343         default:
344             MUTEX_UNLOCK(&t->mutex);
345             croak("can't detach thread");
346             /* NOTREACHED */
347         }
348
349 void
350 equal(t1, t2)
351         Thread  t1
352         Thread  t2
353     PPCODE:
354         PUSHs((t1 == t2) ? &sv_yes : &sv_no);
355
356 void
357 flags(t)
358         Thread  t
359     PPCODE:
360         PUSHs(sv_2mortal(newSViv(t->flags)));
361
362 void
363 self(class)
364         char *  class
365     PREINIT:
366         SV *sv;
367     PPCODE:
368         sv = newSViv(thr->tid);
369         sv_magic(sv, oursv, '~', 0, 0);
370         SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
371         PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE))));
372
373 U32
374 tid(t)
375         Thread  t
376     CODE:
377         MUTEX_LOCK(&t->mutex);
378         RETVAL = t->tid;
379         MUTEX_UNLOCK(&t->mutex);
380     OUTPUT:
381         RETVAL
382
383 void
384 DESTROY(t)
385         SV *    t
386     PPCODE:
387         PUSHs(&sv_yes);
388
389 void
390 yield()
391     CODE:
392 #ifdef OLD_PTHREADS_API
393         pthread_yield();
394 #else
395 #ifndef NO_SCHED_YIELD
396         sched_yield();
397 #endif /* NO_SCHED_YIELD */
398 #endif /* OLD_PTHREADS_API */
399
400 void
401 cond_wait(sv)
402         SV *    sv
403         MAGIC * mg = NO_INIT
404 CODE:
405         if (SvROK(sv))
406             sv = SvRV(sv);
407
408         mg = condpair_magic(sv);
409         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_wait %p\n", thr, sv));
410         MUTEX_LOCK(MgMUTEXP(mg));
411         if (MgOWNER(mg) != thr) {
412             MUTEX_UNLOCK(MgMUTEXP(mg));
413             croak("cond_wait for lock that we don't own\n");
414         }
415         MgOWNER(mg) = 0;
416         COND_WAIT(MgCONDP(mg), MgMUTEXP(mg));
417         while (MgOWNER(mg))
418             COND_WAIT(MgOWNERCONDP(mg), MgMUTEXP(mg));
419         MgOWNER(mg) = thr;
420         MUTEX_UNLOCK(MgMUTEXP(mg));
421         
422 void
423 cond_signal(sv)
424         SV *    sv
425         MAGIC * mg = NO_INIT
426 CODE:
427         if (SvROK(sv))
428             sv = SvRV(sv);
429
430         mg = condpair_magic(sv);
431         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv));
432         MUTEX_LOCK(MgMUTEXP(mg));
433         if (MgOWNER(mg) != thr) {
434             MUTEX_UNLOCK(MgMUTEXP(mg));
435             croak("cond_signal for lock that we don't own\n");
436         }
437         COND_SIGNAL(MgCONDP(mg));
438         MUTEX_UNLOCK(MgMUTEXP(mg));
439
440 void
441 cond_broadcast(sv)
442         SV *    sv
443         MAGIC * mg = NO_INIT
444 CODE:
445         if (SvROK(sv))
446             sv = SvRV(sv);
447
448         mg = condpair_magic(sv);
449         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_broadcast %p\n",
450                               thr, sv));
451         MUTEX_LOCK(MgMUTEXP(mg));
452         if (MgOWNER(mg) != thr) {
453             MUTEX_UNLOCK(MgMUTEXP(mg));
454             croak("cond_broadcast for lock that we don't own\n");
455         }
456         COND_BROADCAST(MgCONDP(mg));
457         MUTEX_UNLOCK(MgMUTEXP(mg));
458
459 void
460 list(class)
461         char *  class
462     PREINIT:
463         Thread  t;
464         AV *    av;
465         SV **   svp;
466         int     n = 0;
467     PPCODE:
468         av = newAV();
469         /*
470          * Iterate until we have enough dynamic storage for all threads.
471          * We mustn't do any allocation while holding threads_mutex though.
472          */
473         MUTEX_LOCK(&threads_mutex);
474         do {
475             n = nthreads;
476             MUTEX_UNLOCK(&threads_mutex);
477             if (AvFILL(av) < n - 1) {
478                 int i = AvFILL(av);
479                 for (i = AvFILL(av); i < n - 1; i++) {
480                     SV *sv = newSViv(0);        /* fill in tid later */
481                     sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */
482                     av_push(av, sv_bless(newRV_noinc(sv),
483                                          gv_stashpv(class, TRUE)));
484         
485                 }
486             }
487             MUTEX_LOCK(&threads_mutex);
488         } while (n < nthreads);
489         n = nthreads;   /* Get the final correct value */
490
491         /*
492          * At this point, there's enough room to fill in av.
493          * Note that we are holding threads_mutex so the list
494          * won't change out from under us but all the remaining
495          * processing is "fast" (no blocking, malloc etc.)
496          */
497         t = thr;
498         svp = AvARRAY(av);
499         do {
500             SV *sv = (SV*)SvRV(*svp);
501             sv_setiv(sv, t->tid);
502             SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->Toursv);
503             SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED;
504             SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
505             t = t->next;
506             svp++;
507         } while (t != thr);
508         /*  */
509         MUTEX_UNLOCK(&threads_mutex);
510         /* Truncate any unneeded slots in av */
511         av_fill(av, n - 1);
512         /* Finally, push all the new objects onto the stack and drop av */
513         EXTEND(sp, n);
514         for (svp = AvARRAY(av); n > 0; n--, svp++)
515             PUSHs(*svp);
516         (void)sv_2mortal((SV*)av);
517
518
519 MODULE = Thread         PACKAGE = Thread::Signal
520
521 void
522 kill_sighandler_thread()
523     PPCODE:
524         write(sig_pipe[0], "\0", 1);
525         PUSHs(&sv_yes);
526
527 void
528 init_thread_signals()
529     PPCODE:
530         sighandlerp = handle_thread_signal;
531         if (pipe(sig_pipe) == -1)
532             XSRETURN_UNDEF;
533         PUSHs(&sv_yes);
534
535 SV *
536 await_signal()
537     PREINIT:
538         char c;
539         ssize_t ret;
540     CODE:
541         do {
542             ret = read(sig_pipe[1], &c, 1);
543         } while (ret == -1 && errno == EINTR);
544         if (ret == -1)
545             croak("panic: await_signal");
546         if (ret == 0)
547             XSRETURN_UNDEF;
548         RETVAL = c ? psig_ptr[c] : &sv_no;
549     OUTPUT:
550         RETVAL