Reliable thread signal handling.
[p5sagit/p5-mst-13.2.git] / Thread.xs
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 static I32 threadnum = 0;
6
7 static void *
8 threadstart(arg)
9 void *arg;
10 {
11 #ifdef FAKE_THREADS
12     Thread savethread = thr;
13     LOGOP myop;
14     dSP;
15     I32 oldscope = scopestack_ix;
16     I32 retval;
17     AV *returnav = newAV();
18     int i;
19
20     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
21                           thr, SvPEEK(TOPs)));
22     thr = (Thread) arg;
23     savemark = TOPMARK;
24     thr->prev = thr->prev_run = savethread;
25     thr->next = savethread->next;
26     thr->next_run = savethread->next_run;
27     savethread->next = savethread->next_run = thr;
28     thr->wait_queue = 0;
29     thr->private = 0;
30
31     /* Now duplicate most of perl_call_sv but with a few twists */
32     op = (OP*)&myop;
33     Zero(op, 1, LOGOP);
34     myop.op_flags = OPf_STACKED;
35     myop.op_next = Nullop;
36     myop.op_flags |= OPf_KNOW;
37     myop.op_flags |= OPf_WANT_LIST;
38     op = pp_entersub(ARGS);
39     DEBUG_L(if (!op)
40             PerlIO_printf(PerlIO_stderr(), "thread starts at Nullop\n"));
41     /*
42      * When this thread is next scheduled, we start in the right
43      * place. When the thread runs off the end of the sub, perl.c
44      * handles things, using savemark to figure out how much of the
45      * stack is the return value for any join.
46      */
47     thr = savethread;           /* back to the old thread */
48     return 0;
49 #else
50     Thread thr = (Thread) arg;
51     LOGOP myop;
52     dSP;
53     I32 oldmark = TOPMARK;
54     I32 oldscope = scopestack_ix;
55     I32 retval;
56     AV *returnav = newAV();
57     int i;
58     dJMPENV;
59     int ret;
60
61     /* Don't call *anything* requiring dTHR until after pthread_setspecific */
62     /*
63      * Wait until our creator releases us. If we didn't do this, then
64      * it would be potentially possible for out thread to carry on and
65      * do stuff before our creator fills in our "self" field. For example,
66      * if we went and created another thread which tried to pthread_join
67      * with us, then we'd be in a mess.
68      */
69     MUTEX_LOCK(threadstart_mutexp);
70     MUTEX_UNLOCK(threadstart_mutexp);
71     MUTEX_DESTROY(threadstart_mutexp);  /* don't need it any more */
72     Safefree(threadstart_mutexp);
73
74     /*
75      * It's safe to wait until now to set the thread-specific pointer
76      * from our pthread_t structure to our struct thread, since we're
77      * the only thread who can get at it anyway.
78      */
79     if (pthread_setspecific(thr_key, (void *) thr))
80         croak("panic: pthread_setspecific");
81
82     /* Only now can we use SvPEEK (which calls sv_newmortal which does dTHR) */
83     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
84                           thr, SvPEEK(TOPs)));
85
86     JMPENV_PUSH(ret);
87     switch (ret) {
88     case 3:
89         PerlIO_printf(PerlIO_stderr(), "panic: threadstart\n");
90         /* fall through */
91     case 1:
92         STATUS_ALL_FAILURE;
93         /* fall through */
94     case 2:
95         /* my_exit() was called */
96         while (scopestack_ix > oldscope)
97             LEAVE;
98         JMPENV_POP;
99         av_store(returnav, 0, newSViv(statusvalue));
100         goto finishoff;
101     }
102
103     /* Now duplicate most of perl_call_sv but with a few twists */
104     op = (OP*)&myop;
105     Zero(op, 1, LOGOP);
106     myop.op_flags = OPf_STACKED;
107     myop.op_next = Nullop;
108     myop.op_flags |= OPf_KNOW;
109     myop.op_flags |= OPf_WANT_LIST;
110     op = pp_entersub(ARGS);
111     if (op)
112         runops();
113     SPAGAIN;
114     retval = sp - (stack_base + oldmark);
115     sp = stack_base + oldmark + 1;
116     DEBUG_L(for (i = 1; i <= retval; i++)
117                 PerlIO_printf(PerlIO_stderr(),
118                               "%p returnav[%d] = %s\n",
119                               thr, i, SvPEEK(sp[i - 1]));)
120     av_store(returnav, 0, newSVpv("", 0));
121     for (i = 1; i <= retval; i++, sp++)
122         sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp));
123     
124   finishoff:
125 #if 0    
126     /* removed for debug */
127     SvREFCNT_dec(curstack);
128 #endif
129     SvREFCNT_dec(cvcache);
130     Safefree(markstack);
131     Safefree(scopestack);
132     Safefree(savestack);
133     Safefree(retstack);
134     Safefree(cxstack);
135     Safefree(tmps_stack);
136
137     if (ThrSTATE(thr) == THRf_DETACHED) {
138         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
139                               "%p detached...zapping returnav\n", thr));
140         SvREFCNT_dec(returnav);
141         ThrSETSTATE(thr, THRf_DEAD);
142     }
143     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p returning\n", thr));     
144     return (void *) returnav;   /* Available for anyone to join with us */
145                                 /* unless we are detached in which case */
146                                 /* noone will see the value anyway. */
147 #endif    
148 }
149
150 static SV *
151 newthread(startsv, initargs, class)
152 SV *startsv;
153 AV *initargs;
154 char *class;
155 {
156     dTHR;
157     dSP;
158     Thread savethread;
159     int i;
160     SV *sv;
161     sigset_t fullmask, oldmask;
162     
163     savethread = thr;
164     sv = newSVpv("", 0);
165     SvGROW(sv, sizeof(struct thread) + 1);
166     SvCUR_set(sv, sizeof(struct thread));
167     thr = (Thread) SvPVX(sv);
168     oursv = sv; 
169     /* If we don't zero these foostack pointers, init_stacks won't init them */
170     markstack = 0;
171     scopestack = 0;
172     savestack = 0;
173     retstack = 0;
174     init_stacks(ARGS);
175     curcop = savethread->Tcurcop;       /* XXX As good a guess as any? */
176     SPAGAIN;
177     defstash = savethread->Tdefstash;   /* XXX maybe these should */
178     curstash = savethread->Tcurstash;   /* always be set to main? */
179     /* top_env? */
180     /* runlevel */
181     cvcache = newHV();
182     thrflags = 0;
183     ThrSETSTATE(thr, THRf_NORMAL);
184
185     /* The following pushes the arg list and startsv onto the *new* stack */
186     PUSHMARK(sp);
187     /* Could easily speed up the following greatly */
188     for (i = 0; i <= AvFILL(initargs); i++)
189         XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE)));
190     XPUSHs(SvREFCNT_inc(startsv));
191     PUTBACK;
192
193 #ifdef FAKE_THREADS
194     threadstart(thr);
195 #else    
196     New(53, threadstart_mutexp, 1, perl_mutex);
197     /* On your marks... */
198     MUTEX_INIT(threadstart_mutexp);
199     MUTEX_LOCK(threadstart_mutexp);
200     /* Get set...
201      * Increment the global thread count. It is decremented
202      * by the destructor for the thread specific key thr_key.
203      */
204     MUTEX_LOCK(&nthreads_mutex);
205     nthreads++;
206     MUTEX_UNLOCK(&nthreads_mutex);
207     sigfillset(&fullmask);
208     if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1)
209         croak("panic: sigprocmask");
210     if (pthread_create(&self, NULL, threadstart, (void*) thr))
211         return NULL;    /* XXX should clean up first */
212     /* Go */
213     MUTEX_UNLOCK(threadstart_mutexp);
214     if (sigprocmask(SIG_SETMASK, &oldmask, 0))
215         croak("panic: sigprocmask");
216 #endif
217     sv = newSViv(++threadnum);
218     sv_magic(sv, oursv, '~', 0, 0);
219     return sv_bless(newRV(sv), gv_stashpv(class, TRUE));
220 }
221
222 static Signal_t
223 handle_thread_signal(sig)
224 int sig;
225 {
226     char c = (char) sig;
227     write(sig_pipe[0], &c, 1);
228 }
229
230 MODULE = Thread         PACKAGE = Thread
231
232 void
233 new(class, startsv, ...)
234         char *          class
235         SV *            startsv
236         AV *            av = av_make(items - 2, &ST(2));
237     PPCODE:
238         XPUSHs(sv_2mortal(newthread(startsv, av, class)));
239
240 void
241 join(t)
242         Thread  t
243         AV *    av = NO_INIT
244         int     i = NO_INIT
245     PPCODE:
246         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
247                               "%p: joining %p (state 0x%lx)\n",
248                               thr, t, (unsigned long)ThrSTATE(t)););
249         if (ThrSTATE(t) == THRf_DETACHED)
250             croak("tried to join a detached thread");
251         else if (ThrSTATE(t) == THRf_JOINED)
252             croak("tried to rejoin an already joined thread");
253         else if (ThrSTATE(t) == THRf_DEAD)
254             croak("tried to join a dead thread");
255
256         if (pthread_join(t->Tself, (void **) &av))
257             croak("pthread_join failed");
258         ThrSETSTATE(t, THRf_JOINED);
259         /* Could easily speed up the following if necessary */
260         for (i = 0; i <= AvFILL(av); i++)
261             XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
262
263 void
264 detach(t)
265         Thread  t
266     CODE:
267         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
268                               "%p: detaching %p (state 0x%lx)\n",
269                               thr, t, (unsigned long)ThrSTATE(t)););
270         if (ThrSTATE(t) == THRf_DETACHED)
271             croak("tried to detach an already detached thread");
272         else if (ThrSTATE(t) == THRf_JOINED)
273             croak("tried to detach an already joined thread");
274         else if (ThrSTATE(t) == THRf_DEAD)
275             croak("tried to detach a dead thread");
276         if (pthread_detach(t->Tself))
277             croak("panic: pthread_detach failed");
278         ThrSETSTATE(t, THRf_DETACHED);
279
280 void
281 DESTROY(t)
282         Thread  t
283     CODE:
284         if (ThrSTATE(t) == THRf_NORMAL) {
285             if (pthread_detach(t->Tself))
286                 croak("panic: pthread_detach failed");
287             ThrSETSTATE(t, THRf_DETACHED);
288             thrflags |= THRf_DIE_FATAL;
289         }
290
291 void
292 yield()
293     CODE:
294 #ifdef OLD_PTHREADS_API
295         pthread_yield();
296 #else
297 #ifndef NO_SCHED_YIELD
298         sched_yield();
299 #endif /* NO_SCHED_YIELD */
300 #endif /* OLD_PTHREADS_API */
301
302 void
303 cond_wait(sv)
304         SV *    sv
305         MAGIC * mg = NO_INIT
306 CODE:
307         if (SvROK(sv))
308             sv = SvRV(sv);
309
310         mg = condpair_magic(sv);
311         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_wait %p\n", thr, sv));
312         MUTEX_LOCK(MgMUTEXP(mg));
313         if (MgOWNER(mg) != thr) {
314             MUTEX_UNLOCK(MgMUTEXP(mg));
315             croak("cond_wait for lock that we don't own\n");
316         }
317         MgOWNER(mg) = 0;
318         COND_WAIT(MgCONDP(mg), MgMUTEXP(mg));
319         MgOWNER(mg) = thr;
320         MUTEX_UNLOCK(MgMUTEXP(mg));
321         
322 void
323 cond_signal(sv)
324         SV *    sv
325         MAGIC * mg = NO_INIT
326 CODE:
327         if (SvROK(sv)) {
328             /*
329              * Kludge to allow lock of real objects without requiring
330              * to pass in every type of argument by explicit reference.
331              */
332             sv = SvRV(sv);
333         }
334         mg = condpair_magic(sv);
335         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv));
336         MUTEX_LOCK(MgMUTEXP(mg));
337         if (MgOWNER(mg) != thr) {
338             MUTEX_UNLOCK(MgMUTEXP(mg));
339             croak("cond_signal for lock that we don't own\n");
340         }
341         COND_SIGNAL(MgCONDP(mg));
342         MUTEX_UNLOCK(MgMUTEXP(mg));
343
344 void
345 cond_broadcast(sv)
346         SV *    sv
347         MAGIC * mg = NO_INIT
348 CODE:
349         if (SvROK(sv))
350             sv = SvRV(sv);
351
352         mg = condpair_magic(sv);
353         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_broadcast %p\n",
354                               thr, sv));
355         MUTEX_LOCK(MgMUTEXP(mg));
356         if (MgOWNER(mg) != thr) {
357             MUTEX_UNLOCK(MgMUTEXP(mg));
358             croak("cond_broadcast for lock that we don't own\n");
359         }
360         COND_BROADCAST(MgCONDP(mg));
361         MUTEX_UNLOCK(MgMUTEXP(mg));
362
363 MODULE = Thread         PACKAGE = Thread::Signal
364
365 void
366 kill_sighandler_thread()
367     PPCODE:
368         write(sig_pipe[0], "\0", 1);
369         PUSHs(&sv_yes);
370
371 void
372 init_thread_signals()
373     PPCODE:
374         sighandlerp = handle_thread_signal;
375         if (pipe(sig_pipe) == -1)
376             XSRETURN_UNDEF;
377         PUSHs(&sv_yes);
378
379 SV *
380 await_signal()
381     PREINIT:
382         char c;
383         ssize_t ret;
384     CODE:
385         do {
386             ret = read(sig_pipe[1], &c, 1);
387         } while (ret == -1 && errno == EINTR);
388         if (ret == -1)
389             croak("panic: await_signal");
390         if (ret == 0)
391             XSRETURN_UNDEF;
392         RETVAL = c ? psig_ptr[c] : &sv_no;
393     OUTPUT:
394         RETVAL