Rewrite thread destruction system using linked list of threads.
[p5sagit/p5-mst-13.2.git] / Thread.xs
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4
5 /* Magic signature for Thread's mg_private is "Th" */ 
6 #define Thread_MAGIC_SIGNATURE 0x5468
7
8 static U32 threadnum = 0;
9 static int sig_pipe[2];
10
11 static void
12 remove_thread(t)
13 Thread t;
14 {
15     DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(),
16                                    "%p: remove_thread %p\n", thr, t)));
17     MUTEX_LOCK(&threads_mutex);
18     nthreads--;
19     t->prev->next = t->next;
20     t->next->prev = t->prev;
21     COND_BROADCAST(&nthreads_cond);
22     MUTEX_UNLOCK(&threads_mutex);
23 }
24
25 static void *
26 threadstart(arg)
27 void *arg;
28 {
29 #ifdef FAKE_THREADS
30     Thread savethread = thr;
31     LOGOP myop;
32     dSP;
33     I32 oldscope = scopestack_ix;
34     I32 retval;
35     AV *returnav = newAV();
36     int i;
37
38     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
39                           thr, SvPEEK(TOPs)));
40     thr = (Thread) arg;
41     savemark = TOPMARK;
42     thr->prev = thr->prev_run = savethread;
43     thr->next = savethread->next;
44     thr->next_run = savethread->next_run;
45     savethread->next = savethread->next_run = thr;
46     thr->wait_queue = 0;
47     thr->private = 0;
48
49     /* Now duplicate most of perl_call_sv but with a few twists */
50     op = (OP*)&myop;
51     Zero(op, 1, LOGOP);
52     myop.op_flags = OPf_STACKED;
53     myop.op_next = Nullop;
54     myop.op_flags |= OPf_KNOW;
55     myop.op_flags |= OPf_WANT_LIST;
56     op = pp_entersub(ARGS);
57     DEBUG_L(if (!op)
58             PerlIO_printf(PerlIO_stderr(), "thread starts at Nullop\n"));
59     /*
60      * When this thread is next scheduled, we start in the right
61      * place. When the thread runs off the end of the sub, perl.c
62      * handles things, using savemark to figure out how much of the
63      * stack is the return value for any join.
64      */
65     thr = savethread;           /* back to the old thread */
66     return 0;
67 #else
68     Thread thr = (Thread) arg;
69     LOGOP myop;
70     dSP;
71     I32 oldmark = TOPMARK;
72     I32 oldscope = scopestack_ix;
73     I32 retval;
74     AV *returnav = newAV();
75     int i;
76     dJMPENV;
77     int ret;
78
79     /* Don't call *anything* requiring dTHR until after pthread_setspecific */
80     /*
81      * Wait until our creator releases us. If we didn't do this, then
82      * it would be potentially possible for out thread to carry on and
83      * do stuff before our creator fills in our "self" field. For example,
84      * if we went and created another thread which tried to pthread_join
85      * with us, then we'd be in a mess.
86      */
87     MUTEX_LOCK(threadstart_mutexp);
88     MUTEX_UNLOCK(threadstart_mutexp);
89     MUTEX_DESTROY(threadstart_mutexp);  /* don't need it any more */
90     Safefree(threadstart_mutexp);
91
92     /*
93      * It's safe to wait until now to set the thread-specific pointer
94      * from our pthread_t structure to our struct thread, since we're
95      * the only thread who can get at it anyway.
96      */
97     if (pthread_setspecific(thr_key, (void *) thr))
98         croak("panic: pthread_setspecific");
99
100     /* Only now can we use SvPEEK (which calls sv_newmortal which does dTHR) */
101     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
102                           thr, SvPEEK(TOPs)));
103
104     JMPENV_PUSH(ret);
105     switch (ret) {
106     case 3:
107         PerlIO_printf(PerlIO_stderr(), "panic: threadstart\n");
108         /* fall through */
109     case 1:
110         STATUS_ALL_FAILURE;
111         /* fall through */
112     case 2:
113         /* my_exit() was called */
114         while (scopestack_ix > oldscope)
115             LEAVE;
116         JMPENV_POP;
117         av_store(returnav, 0, newSViv(statusvalue));
118         goto finishoff;
119     }
120
121     /* Now duplicate most of perl_call_sv but with a few twists */
122     op = (OP*)&myop;
123     Zero(op, 1, LOGOP);
124     myop.op_flags = OPf_STACKED;
125     myop.op_next = Nullop;
126     myop.op_flags |= OPf_KNOW;
127     myop.op_flags |= OPf_WANT_LIST;
128     op = pp_entersub(ARGS);
129     if (op)
130         runops();
131     SPAGAIN;
132     retval = sp - (stack_base + oldmark);
133     sp = stack_base + oldmark + 1;
134     DEBUG_L(for (i = 1; i <= retval; i++)
135                 PerlIO_printf(PerlIO_stderr(),
136                               "%p returnav[%d] = %s\n",
137                               thr, i, SvPEEK(sp[i - 1]));)
138     av_store(returnav, 0, newSVpv("", 0));
139     for (i = 1; i <= retval; i++, sp++)
140         sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp));
141     
142   finishoff:
143 #if 0    
144     /* removed for debug */
145     SvREFCNT_dec(curstack);
146 #endif
147     SvREFCNT_dec(cvcache);
148     Safefree(markstack);
149     Safefree(scopestack);
150     Safefree(savestack);
151     Safefree(retstack);
152     Safefree(cxstack);
153     Safefree(tmps_stack);
154
155     if (ThrSTATE(thr) == THRf_DETACHED) {
156         DEBUG_L(PerlIO_printf(PerlIO_stderr(),
157                               "%p detached...zapping returnav\n", thr));
158         SvREFCNT_dec(returnav);
159         ThrSETSTATE(thr, THRf_DEAD);
160         remove_thread(thr);
161     }
162     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p returning\n", thr));     
163     return (void *) returnav;   /* Available for anyone to join with us */
164                                 /* unless we are detached in which case */
165                                 /* noone will see the value anyway. */
166 #endif    
167 }
168
169 static SV *
170 newthread(startsv, initargs, class)
171 SV *startsv;
172 AV *initargs;
173 char *class;
174 {
175     dTHR;
176     dSP;
177     Thread savethread;
178     int i;
179     SV *sv;
180     sigset_t fullmask, oldmask;
181     
182     savethread = thr;
183     sv = newSVpv("", 0);
184     SvGROW(sv, sizeof(struct thread) + 1);
185     SvCUR_set(sv, sizeof(struct thread));
186     thr = (Thread) SvPVX(sv);
187     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p\n",
188                           savethread, SvPEEK(startsv), thr));
189     oursv = sv; 
190     /* If we don't zero these foostack pointers, init_stacks won't init them */
191     markstack = 0;
192     scopestack = 0;
193     savestack = 0;
194     retstack = 0;
195     init_stacks(ARGS);
196     curcop = savethread->Tcurcop;       /* XXX As good a guess as any? */
197     SPAGAIN;
198     defstash = savethread->Tdefstash;   /* XXX maybe these should */
199     curstash = savethread->Tcurstash;   /* always be set to main? */
200     /* top_env? */
201     /* runlevel */
202     cvcache = newHV();
203     thr->flags = THRf_NORMAL;
204     thr->tid = ++threadnum;
205     /* Insert new thread into the circular linked list and bump nthreads */
206     MUTEX_LOCK(&threads_mutex);
207     thr->next = savethread->next;
208     thr->prev = savethread;
209     savethread->next = thr;
210     thr->next->prev = thr;
211     nthreads++;
212     MUTEX_UNLOCK(&threads_mutex);
213
214     DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread preparing stack\n",
215                           savethread));
216     /* The following pushes the arg list and startsv onto the *new* stack */
217     PUSHMARK(sp);
218     /* Could easily speed up the following greatly */
219     for (i = 0; i <= AvFILL(initargs); i++)
220         XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE)));
221     XPUSHs(SvREFCNT_inc(startsv));
222     PUTBACK;
223
224 #ifdef FAKE_THREADS
225     threadstart(thr);
226 #else    
227     New(53, threadstart_mutexp, 1, perl_mutex);
228     /* On your marks... */
229     MUTEX_INIT(threadstart_mutexp);
230     MUTEX_LOCK(threadstart_mutexp);
231     /* Get set...
232      * Increment the global thread count. It is decremented
233      * by the destructor for the thread specific key thr_key.
234      */
235     sigfillset(&fullmask);
236     if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1)
237         croak("panic: sigprocmask");
238     if (pthread_create(&self, NULL, threadstart, (void*) thr))
239         return NULL;    /* XXX should clean up first */
240     /* Go */
241     MUTEX_UNLOCK(threadstart_mutexp);
242     if (sigprocmask(SIG_SETMASK, &oldmask, 0))
243         croak("panic: sigprocmask");
244 #endif
245     sv = newSViv(thr->tid);
246     sv_magic(sv, oursv, '~', 0, 0);
247     SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
248     return sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE));
249 }
250
251 static Signal_t
252 handle_thread_signal(sig)
253 int sig;
254 {
255     char c = (char) sig;
256     write(sig_pipe[0], &c, 1);
257 }
258
259 MODULE = Thread         PACKAGE = Thread
260
261 void
262 new(class, startsv, ...)
263         char *          class
264         SV *            startsv
265         AV *            av = av_make(items - 2, &ST(2));
266     PPCODE:
267         XPUSHs(sv_2mortal(newthread(startsv, av, class)));
268
269 void
270 join(t)
271         Thread  t
272         AV *    av = NO_INIT
273         int     i = NO_INIT
274     PPCODE:
275         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n",
276                               thr, t, ThrSTATE(t)););
277         if (ThrSTATE(t) == THRf_DETACHED)
278             croak("tried to join a detached thread");
279         else if (ThrSTATE(t) == THRf_JOINED)
280             croak("tried to rejoin an already joined thread");
281         else if (ThrSTATE(t) == THRf_DEAD)
282             croak("tried to join a dead thread");
283
284         if (pthread_join(t->Tself, (void **) &av))
285             croak("pthread_join failed");
286         ThrSETSTATE(t, THRf_JOINED);
287         remove_thread(t);
288
289         /* Could easily speed up the following if necessary */
290         for (i = 0; i <= AvFILL(av); i++)
291             XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
292
293 void
294 detach(t)
295         Thread  t
296     CODE:
297         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n",
298                               thr, t, ThrSTATE(t)););
299         if (ThrSTATE(t) == THRf_DETACHED)
300             croak("tried to detach an already detached thread");
301         else if (ThrSTATE(t) == THRf_JOINED)
302             croak("tried to detach an already joined thread");
303         else if (ThrSTATE(t) == THRf_DEAD)
304             croak("tried to detach a dead thread");
305         DETACH(t);
306         ThrSETSTATE(t, THRf_DETACHED);
307
308 void
309 DESTROY(t)
310         Thread  t
311     CODE:
312         DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(),
313                                        "%p: DESTROY(%p), state %u\n",
314                                        thr, t, ThrSTATE(t))));
315                               
316         if (ThrSTATE(t) == THRf_NORMAL) {
317             DETACH(t);
318             ThrSETSTATE(t, THRf_DETACHED);
319             t->flags |= THRf_DIE_FATAL;
320         }
321
322 void
323 equal(t1, t2)
324         Thread  t1
325         Thread  t2
326     PPCODE:
327         PUSHs((t1 == t2) ? &sv_yes : &sv_no);
328
329 void
330 flags(t)
331         Thread  t
332     PPCODE:
333         PUSHs(sv_2mortal(newSViv(t->flags)));
334
335 void
336 self(class)
337         char *  class
338     PREINIT:
339         SV *sv;
340     PPCODE:
341         sv = newSViv(thr->tid);
342         sv_magic(sv, oursv, '~', 0, 0);
343         SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
344         PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE))));
345
346 void
347 yield()
348     CODE:
349 #ifdef OLD_PTHREADS_API
350         pthread_yield();
351 #else
352 #ifndef NO_SCHED_YIELD
353         sched_yield();
354 #endif /* NO_SCHED_YIELD */
355 #endif /* OLD_PTHREADS_API */
356
357 void
358 cond_wait(sv)
359         SV *    sv
360         MAGIC * mg = NO_INIT
361 CODE:
362         if (SvROK(sv))
363             sv = SvRV(sv);
364
365         mg = condpair_magic(sv);
366         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_wait %p\n", thr, sv));
367         MUTEX_LOCK(MgMUTEXP(mg));
368         if (MgOWNER(mg) != thr) {
369             MUTEX_UNLOCK(MgMUTEXP(mg));
370             croak("cond_wait for lock that we don't own\n");
371         }
372         MgOWNER(mg) = 0;
373         COND_WAIT(MgCONDP(mg), MgMUTEXP(mg));
374         MgOWNER(mg) = thr;
375         MUTEX_UNLOCK(MgMUTEXP(mg));
376         
377 void
378 cond_signal(sv)
379         SV *    sv
380         MAGIC * mg = NO_INIT
381 CODE:
382         if (SvROK(sv)) {
383             /*
384              * Kludge to allow lock of real objects without requiring
385              * to pass in every type of argument by explicit reference.
386              */
387             sv = SvRV(sv);
388         }
389         mg = condpair_magic(sv);
390         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv));
391         MUTEX_LOCK(MgMUTEXP(mg));
392         if (MgOWNER(mg) != thr) {
393             MUTEX_UNLOCK(MgMUTEXP(mg));
394             croak("cond_signal for lock that we don't own\n");
395         }
396         COND_SIGNAL(MgCONDP(mg));
397         MUTEX_UNLOCK(MgMUTEXP(mg));
398
399 void
400 cond_broadcast(sv)
401         SV *    sv
402         MAGIC * mg = NO_INIT
403 CODE:
404         if (SvROK(sv))
405             sv = SvRV(sv);
406
407         mg = condpair_magic(sv);
408         DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_broadcast %p\n",
409                               thr, sv));
410         MUTEX_LOCK(MgMUTEXP(mg));
411         if (MgOWNER(mg) != thr) {
412             MUTEX_UNLOCK(MgMUTEXP(mg));
413             croak("cond_broadcast for lock that we don't own\n");
414         }
415         COND_BROADCAST(MgCONDP(mg));
416         MUTEX_UNLOCK(MgMUTEXP(mg));
417
418 void
419 list(class)
420         char *  class
421     PREINIT:
422         Thread  t;
423         AV *    av;
424         SV **   svp;
425         int     n = 0;
426     PPCODE:
427         av = newAV();
428         /*
429          * Iterate until we have enough dynamic storage for all threads.
430          * We mustn't do any allocation while holding threads_mutex though.
431          */
432         MUTEX_LOCK(&threads_mutex);
433         do {
434             n = nthreads;
435             MUTEX_UNLOCK(&threads_mutex);
436             if (AvFILL(av) < n - 1) {
437                 int i = AvFILL(av);
438                 for (i = AvFILL(av); i < n - 1; i++) {
439                     SV *sv = newSViv(0);        /* fill in tid later */
440                     sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */
441                     av_push(av, sv_bless(newRV_noinc(sv),
442                                          gv_stashpv(class, TRUE)));
443                 }
444             }
445             MUTEX_LOCK(&threads_mutex);
446         } while (n < nthreads);
447
448         /*
449          * At this point, there's enough room to fill in av.
450          * Note that we are holding threads_mutex so the list
451          * won't change out from under us but all the remaining
452          * processing is "fast" (no blocking, malloc etc.)
453          */
454         t = thr;
455         svp = AvARRAY(av);
456         do {
457             SV *sv = SvRV(*svp++);
458             sv_setiv(sv, t->tid);
459             SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->Toursv);
460             SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED;
461             SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
462             t = t->next;
463         } while (t != thr);
464         /* Record the overflow */
465         n -= nthreads;
466         MUTEX_UNLOCK(&threads_mutex);
467         /* Truncate any unneeded slots in av */
468         if (n > 0)
469             av_fill(av, AvFILL(av) - n);
470         /* Finally, push all the new objects onto the stack and drop av */
471         EXTEND(sp, n);
472         for (svp = AvARRAY(av); n > 0; n--, svp++)
473             PUSHs(*svp);
474         (void)sv_2mortal((SV*)av);
475
476
477 MODULE = Thread         PACKAGE = Thread::Signal
478
479 void
480 kill_sighandler_thread()
481     PPCODE:
482         write(sig_pipe[0], "\0", 1);
483         PUSHs(&sv_yes);
484
485 void
486 init_thread_signals()
487     PPCODE:
488         sighandlerp = handle_thread_signal;
489         if (pipe(sig_pipe) == -1)
490             XSRETURN_UNDEF;
491         PUSHs(&sv_yes);
492
493 SV *
494 await_signal()
495     PREINIT:
496         char c;
497         ssize_t ret;
498     CODE:
499         do {
500             ret = read(sig_pipe[1], &c, 1);
501         } while (ret == -1 && errno == EINTR);
502         if (ret == -1)
503             croak("panic: await_signal");
504         if (ret == 0)
505             XSRETURN_UNDEF;
506         RETVAL = c ? psig_ptr[c] : &sv_no;
507     OUTPUT:
508         RETVAL