9c0325e07ddc10288273285b894fc692c63fc486
[p5sagit/p5-mst-13.2.git] / ext / Thread / Thread.xs
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 /* Magic signature for Thread's mg_private is "Th" */ 
6 #define Thread_MAGIC_SIGNATURE 0x5468
7
8 static U32 threadnum = 0;
9 static int sig_pipe[2];
10
11 static void
12 remove_thread(t)
13 Thread t;
14 {
15     DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(),
16                                    "%p: remove_thread %p\n", thr, t)));
17     MUTEX_LOCK(&threads_mutex);
18     MUTEX_DESTROY(&t->mutex);
19     nthreads--;
20     t->prev->next = t->next;
21     t->next->prev = t->prev;
22     COND_BROADCAST(&nthreads_cond);
23     MUTEX_UNLOCK(&threads_mutex);
24 }
25
26 static THREAD_RET_TYPE
27 threadstart(arg)
28 void *arg;
29 {
30 #ifdef FAKE_THREADS
31     Thread savethread = thr;
32     LOGOP myop;
33     dSP;
34     I32 oldscope = scopestack_ix;
35     I32 retval;
36     AV *returnav;
37     int i;
38
39     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
40                           thr, SvPEEK(TOPs)));
41     thr = (Thread) arg;
42     savemark = TOPMARK;
43     thr->prev = thr->prev_run = savethread;
44     thr->next = savethread->next;
45     thr->next_run = savethread->next_run;
46     savethread->next = savethread->next_run = thr;
47     thr->wait_queue = 0;
48     thr->private = 0;
49
50     /* Now duplicate most of perl_call_sv but with a few twists */
51     op = (OP*)&myop;
52     Zero(op, 1, LOGOP);
53     myop.op_flags = OPf_STACKED;
54     myop.op_next = Nullop;
55     myop.op_flags |= OPf_KNOW;
56     myop.op_flags |= OPf_WANT_LIST;
57     op = pp_entersub(ARGS);
58     DEBUG_L(if (!op)
59             PerlIO_printf(PerlIO_stderr(), "thread starts at Nullop\n"));
60     /*
61      * When this thread is next scheduled, we start in the right
62      * place. When the thread runs off the end of the sub, perl.c
63      * handles things, using savemark to figure out how much of the
64      * stack is the return value for any join.
65      */
66     thr = savethread;           /* back to the old thread */
67     return 0;
68 #else
69     Thread thr = (Thread) arg;
70     LOGOP myop;
71     dSP;
72     I32 oldmark = TOPMARK;
73     I32 oldscope = scopestack_ix;
74     I32 retval;
75     AV *returnav;
76     int i, ret;
77     dJMPENV;
78
79     /* Don't call *anything* requiring dTHR until after pthread_setspecific */
80     /*
81      * Wait until our creator releases us. If we didn't do this, then
82      * it would be potentially possible for out thread to carry on and
83      * do stuff before our creator fills in our "self" field. For example,
84      * if we went and created another thread which tried to JOIN with us,
85      * then we'd be in a mess.
86      */
87     MUTEX_LOCK(&thr->mutex);
88     MUTEX_UNLOCK(&thr->mutex);
89
90     /*
91      * It's safe to wait until now to set the thread-specific pointer
92      * from our pthread_t structure to our struct thread, since we're
93      * the only thread who can get at it anyway.
94      */
95     SET_THR(thr);
96
97     /* Only now can we use SvPEEK (which calls sv_newmortal which does dTHR) */
98     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
99                           thr, SvPEEK(TOPs)));
100
101     JMPENV_PUSH(ret);
102     switch (ret) {
103     case 3:
104         PerlIO_printf(PerlIO_stderr(), "panic: threadstart\n");
105         /* fall through */
106     case 1:
107         STATUS_ALL_FAILURE;
108         /* fall through */
109     case 2:
110         /* my_exit() was called */
111         while (scopestack_ix > oldscope)
112             LEAVE;
113         JMPENV_POP;
114         av_store(returnav, 0, newSViv(statusvalue));
115         goto finishoff;
116     }
117
118     CATCH_SET(TRUE);
119
120     /* Now duplicate most of perl_call_sv but with a few twists */
121     op = (OP*)&myop;
122     Zero(op, 1, LOGOP);
123     myop.op_flags = OPf_STACKED;
124     myop.op_next = Nullop;
125     myop.op_flags |= OPf_KNOW;
126     myop.op_flags |= OPf_WANT_LIST;
127     op = pp_entersub(ARGS);
128     if (op)
129         runops();
130     SPAGAIN;
131     retval = sp - (stack_base + oldmark);
132     sp = stack_base + oldmark + 1;
133     DEBUG_L(for (i = 1; i <= retval; i++)
134                 PerlIO_printf(PerlIO_stderr(),
135                               "%p returnav[%d] = %s\n",
136                               thr, i, SvPEEK(sp[i - 1]));)
137     returnav = newAV();
138     av_store(returnav, 0, newSVpv("", 0));
139     for (i = 1; i <= retval; i++, sp++)
140         sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp));
141     
142   finishoff:
143 #if 0    
144     /* removed for debug */
145     SvREFCNT_dec(curstack);
146 #endif
147     SvREFCNT_dec(thr->cvcache);
148     SvREFCNT_dec(thr->magicals);
149     SvREFCNT_dec(thr->specific);
150     Safefree(markstack);
151     Safefree(scopestack);
152     Safefree(savestack);
153     Safefree(retstack);
154     Safefree(cxstack);
155     Safefree(tmps_stack);
156     Safefree(ofs);
157
158     MUTEX_LOCK(&thr->mutex);
159     DEBUG_L(PerlIO_printf(PerlIO_stderr(),
160                           "%p: threadstart finishing: state is %u\n",
161                           thr, ThrSTATE(thr)));
162     switch (ThrSTATE(thr)) {
163     case THRf_R_JOINABLE:
164         ThrSETSTATE(thr, THRf_ZOMBIE);
165         MUTEX_UNLOCK(&thr->mutex);
166         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
167                               "%p: R_JOINABLE thread finished\n", thr));
168         break;
169     case THRf_R_JOINED:
170         ThrSETSTATE(thr, THRf_DEAD);
171         MUTEX_UNLOCK(&thr->mutex);
172         remove_thread(thr);
173         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
174                               "%p: R_JOINED thread finished\n", thr));
175         break;
176     case THRf_R_DETACHED:
177         ThrSETSTATE(thr, THRf_DEAD);
178         MUTEX_UNLOCK(&thr->mutex);
179         SvREFCNT_dec(returnav);
180         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
181                               "%p: DETACHED thread finished\n", thr));
182         remove_thread(thr);     /* This might trigger main thread to finish */
183         break;
184     default:
185         MUTEX_UNLOCK(&thr->mutex);
186         croak("panic: illegal state %u at end of threadstart", ThrSTATE(thr));
187         /* NOTREACHED */
188     }
189     return THREAD_RET_CAST(returnav);   /* Available for anyone to join with */
190                                         /* us unless we're detached, in which */
191                                         /* case noone sees the value anyway. */
192 #endif    
193 }
194
195 static SV *
196 newthread(startsv, initargs, class)
197 SV *startsv;
198 AV *initargs;
199 char *class;
200 {
201     dTHR;
202     dSP;
203     Thread savethread;
204     int i;
205     SV *sv;
206     int err;
207 #ifndef THREAD_CREATE
208     sigset_t fullmask, oldmask;
209 #endif
210     
211     savethread = thr;
212     thr = new_struct_thread(thr);
213     SPAGAIN;
214     DEBUG_L(PerlIO_printf(PerlIO_stderr(),
215                           "%p: newthread, tid is %u, preparing stack\n",
216                           savethread, thr->tid));
217     /* The following pushes the arg list and startsv onto the *new* stack */
218     PUSHMARK(sp);
219     /* Could easily speed up the following greatly */
220     for (i = 0; i <= AvFILL(initargs); i++)
221         XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE)));
222     XPUSHs(SvREFCNT_inc(startsv));
223     PUTBACK;
224
225 #ifdef THREAD_CREATE
226     THREAD_CREATE(thr, threadstart);
227 #else    
228     /* On your marks... */
229     MUTEX_LOCK(&thr->mutex);
230     /* Get set...  */
231     sigfillset(&fullmask);
232     if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1)
233         croak("panic: sigprocmask");
234     err = pthread_create(&thr->self, pthread_attr_default,
235                          threadstart, (void*) thr);
236     /* Go */
237     MUTEX_UNLOCK(&thr->mutex);
238 #endif
239     if (err) {
240         /* Thread creation failed--clean up */
241         SvREFCNT_dec(thr->cvcache);
242         remove_thread(thr);
243         MUTEX_DESTROY(&thr->mutex);
244         for (i = 0; i <= AvFILL(initargs); i++)
245             SvREFCNT_dec(*av_fetch(initargs, i, FALSE));
246         SvREFCNT_dec(startsv);
247         return NULL;
248     }
249 #ifdef THREAD_POST_CREATE
250     THREAD_POST_CREATE(thr);
251 #else
252     if (sigprocmask(SIG_SETMASK, &oldmask, 0))
253         croak("panic: sigprocmask");
254 #endif
255     sv = newSViv(thr->tid);
256     sv_magic(sv, thr->oursv, '~', 0, 0);
257     SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
258     return sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE));
259 }
260
261 static Signal_t
262 handle_thread_signal(sig)
263 int sig;
264 {
265     char c = (char) sig;
266     write(sig_pipe[0], &c, 1);
267 }
268
269 MODULE = Thread         PACKAGE = Thread
270
271 void
272 new(class, startsv, ...)
273         char *          class
274         SV *            startsv
275         AV *            av = av_make(items - 2, &ST(2));
276     PPCODE:
277         XPUSHs(sv_2mortal(newthread(startsv, av, class)));
278
279 void
280 join(t)
281         Thread  t
282         AV *    av = NO_INIT
283         int     i = NO_INIT
284     PPCODE:
285         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n",
286                               thr, t, ThrSTATE(t)););
287         MUTEX_LOCK(&t->mutex);
288         switch (ThrSTATE(t)) {
289         case THRf_R_JOINABLE:
290         case THRf_R_JOINED:
291             ThrSETSTATE(t, THRf_R_JOINED);
292             MUTEX_UNLOCK(&t->mutex);
293             break;
294         case THRf_ZOMBIE:
295             ThrSETSTATE(t, THRf_DEAD);
296             MUTEX_UNLOCK(&t->mutex);
297             remove_thread(t);
298             break;
299         default:
300             MUTEX_UNLOCK(&t->mutex);
301             croak("can't join with thread");
302             /* NOTREACHED */
303         }
304         JOIN(t, &av);
305
306         /* Could easily speed up the following if necessary */
307         for (i = 0; i <= AvFILL(av); i++)
308             XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
309
310 void
311 detach(t)
312         Thread  t
313     CODE:
314         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n",
315                               thr, t, ThrSTATE(t)););
316         MUTEX_LOCK(&t->mutex);
317         switch (ThrSTATE(t)) {
318         case THRf_R_JOINABLE:
319             ThrSETSTATE(t, THRf_R_DETACHED);
320             /* fall through */
321         case THRf_R_DETACHED:
322             DETACH(t);
323             MUTEX_UNLOCK(&t->mutex);
324             break;
325         case THRf_ZOMBIE:
326             ThrSETSTATE(t, THRf_DEAD);
327             DETACH(t);
328             MUTEX_UNLOCK(&t->mutex);
329             remove_thread(t);
330             break;
331         default:
332             MUTEX_UNLOCK(&t->mutex);
333             croak("can't detach thread");
334             /* NOTREACHED */
335         }
336
337 void
338 equal(t1, t2)
339         Thread  t1
340         Thread  t2
341     PPCODE:
342         PUSHs((t1 == t2) ? &sv_yes : &sv_no);
343
344 void
345 flags(t)
346         Thread  t
347     PPCODE:
348         PUSHs(sv_2mortal(newSViv(t->flags)));
349
350 void
351 self(class)
352         char *  class
353     PREINIT:
354         SV *sv;
355     PPCODE:
356         sv = newSViv(thr->tid);
357         sv_magic(sv, thr->oursv, '~', 0, 0);
358         SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
359         PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE))));
360
361 U32
362 tid(t)
363         Thread  t
364     CODE:
365         MUTEX_LOCK(&t->mutex);
366         RETVAL = t->tid;
367         MUTEX_UNLOCK(&t->mutex);
368     OUTPUT:
369         RETVAL
370
371 void
372 DESTROY(t)
373         SV *    t
374     PPCODE:
375         PUSHs(&sv_yes);
376
377 void
378 yield()
379     CODE:
380         YIELD;
381
382 void
383 cond_wait(sv)
384         SV *    sv
385         MAGIC * mg = NO_INIT
386 CODE:
387         if (SvROK(sv))
388             sv = SvRV(sv);
389
390         mg = condpair_magic(sv);
391         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_wait %p\n", thr, sv));
392         MUTEX_LOCK(MgMUTEXP(mg));
393         if (MgOWNER(mg) != thr) {
394             MUTEX_UNLOCK(MgMUTEXP(mg));
395             croak("cond_wait for lock that we don't own\n");
396         }
397         MgOWNER(mg) = 0;
398         COND_WAIT(MgCONDP(mg), MgMUTEXP(mg));
399         while (MgOWNER(mg))
400             COND_WAIT(MgOWNERCONDP(mg), MgMUTEXP(mg));
401         MgOWNER(mg) = thr;
402         MUTEX_UNLOCK(MgMUTEXP(mg));
403         
404 void
405 cond_signal(sv)
406         SV *    sv
407         MAGIC * mg = NO_INIT
408 CODE:
409         if (SvROK(sv))
410             sv = SvRV(sv);
411
412         mg = condpair_magic(sv);
413         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv));
414         MUTEX_LOCK(MgMUTEXP(mg));
415         if (MgOWNER(mg) != thr) {
416             MUTEX_UNLOCK(MgMUTEXP(mg));
417             croak("cond_signal for lock that we don't own\n");
418         }
419         COND_SIGNAL(MgCONDP(mg));
420         MUTEX_UNLOCK(MgMUTEXP(mg));
421
422 void
423 cond_broadcast(sv)
424         SV *    sv
425         MAGIC * mg = NO_INIT
426 CODE:
427         if (SvROK(sv))
428             sv = SvRV(sv);
429
430         mg = condpair_magic(sv);
431         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_broadcast %p\n",
432                               thr, sv));
433         MUTEX_LOCK(MgMUTEXP(mg));
434         if (MgOWNER(mg) != thr) {
435             MUTEX_UNLOCK(MgMUTEXP(mg));
436             croak("cond_broadcast for lock that we don't own\n");
437         }
438         COND_BROADCAST(MgCONDP(mg));
439         MUTEX_UNLOCK(MgMUTEXP(mg));
440
441 void
442 list(class)
443         char *  class
444     PREINIT:
445         Thread  t;
446         AV *    av;
447         SV **   svp;
448         int     n = 0;
449     PPCODE:
450         av = newAV();
451         /*
452          * Iterate until we have enough dynamic storage for all threads.
453          * We mustn't do any allocation while holding threads_mutex though.
454          */
455         MUTEX_LOCK(&threads_mutex);
456         do {
457             n = nthreads;
458             MUTEX_UNLOCK(&threads_mutex);
459             if (AvFILL(av) < n - 1) {
460                 int i = AvFILL(av);
461                 for (i = AvFILL(av); i < n - 1; i++) {
462                     SV *sv = newSViv(0);        /* fill in tid later */
463                     sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */
464                     av_push(av, sv_bless(newRV_noinc(sv),
465                                          gv_stashpv(class, TRUE)));
466         
467                 }
468             }
469             MUTEX_LOCK(&threads_mutex);
470         } while (n < nthreads);
471         n = nthreads;   /* Get the final correct value */
472
473         /*
474          * At this point, there's enough room to fill in av.
475          * Note that we are holding threads_mutex so the list
476          * won't change out from under us but all the remaining
477          * processing is "fast" (no blocking, malloc etc.)
478          */
479         t = thr;
480         svp = AvARRAY(av);
481         do {
482             SV *sv = (SV*)SvRV(*svp);
483             sv_setiv(sv, t->tid);
484             SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->oursv);
485             SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED;
486             SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
487             t = t->next;
488             svp++;
489         } while (t != thr);
490         /*  */
491         MUTEX_UNLOCK(&threads_mutex);
492         /* Truncate any unneeded slots in av */
493         av_fill(av, n - 1);
494         /* Finally, push all the new objects onto the stack and drop av */
495         EXTEND(sp, n);
496         for (svp = AvARRAY(av); n > 0; n--, svp++)
497             PUSHs(*svp);
498         (void)sv_2mortal((SV*)av);
499
500
501 MODULE = Thread         PACKAGE = Thread::Signal
502
503 void
504 kill_sighandler_thread()
505     PPCODE:
506         write(sig_pipe[0], "\0", 1);
507         PUSHs(&sv_yes);
508
509 void
510 init_thread_signals()
511     PPCODE:
512         sighandlerp = handle_thread_signal;
513         if (pipe(sig_pipe) == -1)
514             XSRETURN_UNDEF;
515         PUSHs(&sv_yes);
516
517 SV *
518 await_signal()
519     PREINIT:
520         char c;
521         SSize_t ret;
522     CODE:
523         do {
524             ret = read(sig_pipe[1], &c, 1);
525         } while (ret == -1 && errno == EINTR);
526         if (ret == -1)
527             croak("panic: await_signal");
528         if (ret == 0)
529             XSRETURN_UNDEF;
530         RETVAL = c ? psig_ptr[c] : &sv_no;
531     OUTPUT:
532         RETVAL