Rewrite thread destruction system using linked list of threads.
[p5sagit/p5-mst-13.2.git] / Thread.xs
CommitLineData
d9bb3666 1#include "EXTERN.h"
2#include "perl.h"
3#include "XSUB.h"
4
7d901afa 5/* Magic signature for Thread's mg_private is "Th" */
6#define Thread_MAGIC_SIGNATURE 0x5468
7
8static U32 threadnum = 0;
85ced67f 9static int sig_pipe[2];
683929b4 10
7d901afa 11static void
12remove_thread(t)
13Thread t;
14{
15 DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(),
16 "%p: remove_thread %p\n", thr, t)));
17 MUTEX_LOCK(&threads_mutex);
18 nthreads--;
19 t->prev->next = t->next;
20 t->next->prev = t->prev;
21 COND_BROADCAST(&nthreads_cond);
22 MUTEX_UNLOCK(&threads_mutex);
23}
24
d9bb3666 25static void *
26threadstart(arg)
27void *arg;
28{
783070da 29#ifdef FAKE_THREADS
30 Thread savethread = thr;
31 LOGOP myop;
32 dSP;
33 I32 oldscope = scopestack_ix;
34 I32 retval;
35 AV *returnav = newAV();
36 int i;
37
683929b4 38 DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
39 thr, SvPEEK(TOPs)));
783070da 40 thr = (Thread) arg;
41 savemark = TOPMARK;
42 thr->prev = thr->prev_run = savethread;
43 thr->next = savethread->next;
44 thr->next_run = savethread->next_run;
45 savethread->next = savethread->next_run = thr;
46 thr->wait_queue = 0;
47 thr->private = 0;
48
49 /* Now duplicate most of perl_call_sv but with a few twists */
50 op = (OP*)&myop;
51 Zero(op, 1, LOGOP);
52 myop.op_flags = OPf_STACKED;
53 myop.op_next = Nullop;
54 myop.op_flags |= OPf_KNOW;
55 myop.op_flags |= OPf_WANT_LIST;
56 op = pp_entersub(ARGS);
57 DEBUG_L(if (!op)
58 PerlIO_printf(PerlIO_stderr(), "thread starts at Nullop\n"));
59 /*
60 * When this thread is next scheduled, we start in the right
61 * place. When the thread runs off the end of the sub, perl.c
62 * handles things, using savemark to figure out how much of the
63 * stack is the return value for any join.
64 */
65 thr = savethread; /* back to the old thread */
66 return 0;
67#else
d9bb3666 68 Thread thr = (Thread) arg;
69 LOGOP myop;
70 dSP;
71 I32 oldmark = TOPMARK;
72 I32 oldscope = scopestack_ix;
73 I32 retval;
74 AV *returnav = newAV();
75 int i;
783070da 76 dJMPENV;
77 int ret;
78
79 /* Don't call *anything* requiring dTHR until after pthread_setspecific */
d9bb3666 80 /*
81 * Wait until our creator releases us. If we didn't do this, then
82 * it would be potentially possible for out thread to carry on and
83 * do stuff before our creator fills in our "self" field. For example,
84 * if we went and created another thread which tried to pthread_join
85 * with us, then we'd be in a mess.
86 */
87 MUTEX_LOCK(threadstart_mutexp);
88 MUTEX_UNLOCK(threadstart_mutexp);
89 MUTEX_DESTROY(threadstart_mutexp); /* don't need it any more */
90 Safefree(threadstart_mutexp);
91
d9bb3666 92 /*
93 * It's safe to wait until now to set the thread-specific pointer
94 * from our pthread_t structure to our struct thread, since we're
95 * the only thread who can get at it anyway.
96 */
97 if (pthread_setspecific(thr_key, (void *) thr))
98 croak("panic: pthread_setspecific");
99
783070da 100 /* Only now can we use SvPEEK (which calls sv_newmortal which does dTHR) */
683929b4 101 DEBUG_L(PerlIO_printf(PerlIO_stderr(), "new thread %p starting at %s\n",
102 thr, SvPEEK(TOPs)));
783070da 103
104 JMPENV_PUSH(ret);
105 switch (ret) {
106 case 3:
107 PerlIO_printf(PerlIO_stderr(), "panic: threadstart\n");
d9bb3666 108 /* fall through */
783070da 109 case 1:
110 STATUS_ALL_FAILURE;
d9bb3666 111 /* fall through */
783070da 112 case 2:
113 /* my_exit() was called */
114 while (scopestack_ix > oldscope)
115 LEAVE;
116 JMPENV_POP;
d9bb3666 117 av_store(returnav, 0, newSViv(statusvalue));
118 goto finishoff;
119 }
120
121 /* Now duplicate most of perl_call_sv but with a few twists */
122 op = (OP*)&myop;
123 Zero(op, 1, LOGOP);
124 myop.op_flags = OPf_STACKED;
125 myop.op_next = Nullop;
126 myop.op_flags |= OPf_KNOW;
783070da 127 myop.op_flags |= OPf_WANT_LIST;
d9bb3666 128 op = pp_entersub(ARGS);
129 if (op)
130 runops();
734689b1 131 SPAGAIN;
132 retval = sp - (stack_base + oldmark);
133 sp = stack_base + oldmark + 1;
783070da 134 DEBUG_L(for (i = 1; i <= retval; i++)
135 PerlIO_printf(PerlIO_stderr(),
136 "%p returnav[%d] = %s\n",
137 thr, i, SvPEEK(sp[i - 1]));)
d9bb3666 138 av_store(returnav, 0, newSVpv("", 0));
734689b1 139 for (i = 1; i <= retval; i++, sp++)
140 sv_setsv(*av_fetch(returnav, i, TRUE), SvREFCNT_inc(*sp));
141
d9bb3666 142 finishoff:
783070da 143#if 0
144 /* removed for debug */
145 SvREFCNT_dec(curstack);
146#endif
d9bb3666 147 SvREFCNT_dec(cvcache);
148 Safefree(markstack);
149 Safefree(scopestack);
150 Safefree(savestack);
151 Safefree(retstack);
152 Safefree(cxstack);
153 Safefree(tmps_stack);
154
683929b4 155 if (ThrSTATE(thr) == THRf_DETACHED) {
783070da 156 DEBUG_L(PerlIO_printf(PerlIO_stderr(),
157 "%p detached...zapping returnav\n", thr));
734689b1 158 SvREFCNT_dec(returnav);
683929b4 159 ThrSETSTATE(thr, THRf_DEAD);
7d901afa 160 remove_thread(thr);
734689b1 161 }
783070da 162 DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p returning\n", thr));
d9bb3666 163 return (void *) returnav; /* Available for anyone to join with us */
734689b1 164 /* unless we are detached in which case */
165 /* noone will see the value anyway. */
783070da 166#endif
d9bb3666 167}
168
683929b4 169static SV *
170newthread(startsv, initargs, class)
d9bb3666 171SV *startsv;
172AV *initargs;
683929b4 173char *class;
d9bb3666 174{
175 dTHR;
176 dSP;
177 Thread savethread;
178 int i;
683929b4 179 SV *sv;
f152979c 180 sigset_t fullmask, oldmask;
d9bb3666 181
182 savethread = thr;
683929b4 183 sv = newSVpv("", 0);
184 SvGROW(sv, sizeof(struct thread) + 1);
185 SvCUR_set(sv, sizeof(struct thread));
186 thr = (Thread) SvPVX(sv);
7d901afa 187 DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread(%s) = %p\n",
188 savethread, SvPEEK(startsv), thr));
683929b4 189 oursv = sv;
783070da 190 /* If we don't zero these foostack pointers, init_stacks won't init them */
191 markstack = 0;
192 scopestack = 0;
193 savestack = 0;
194 retstack = 0;
d9bb3666 195 init_stacks(ARGS);
783070da 196 curcop = savethread->Tcurcop; /* XXX As good a guess as any? */
d9bb3666 197 SPAGAIN;
198 defstash = savethread->Tdefstash; /* XXX maybe these should */
199 curstash = savethread->Tcurstash; /* always be set to main? */
d9bb3666 200 /* top_env? */
201 /* runlevel */
202 cvcache = newHV();
7d901afa 203 thr->flags = THRf_NORMAL;
204 thr->tid = ++threadnum;
205 /* Insert new thread into the circular linked list and bump nthreads */
206 MUTEX_LOCK(&threads_mutex);
207 thr->next = savethread->next;
208 thr->prev = savethread;
209 savethread->next = thr;
210 thr->next->prev = thr;
211 nthreads++;
212 MUTEX_UNLOCK(&threads_mutex);
d9bb3666 213
7d901afa 214 DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: newthread preparing stack\n",
215 savethread));
d9bb3666 216 /* The following pushes the arg list and startsv onto the *new* stack */
217 PUSHMARK(sp);
218 /* Could easily speed up the following greatly */
734689b1 219 for (i = 0; i <= AvFILL(initargs); i++)
d9bb3666 220 XPUSHs(SvREFCNT_inc(*av_fetch(initargs, i, FALSE)));
221 XPUSHs(SvREFCNT_inc(startsv));
222 PUTBACK;
223
783070da 224#ifdef FAKE_THREADS
225 threadstart(thr);
226#else
2c127b02 227 New(53, threadstart_mutexp, 1, perl_mutex);
d9bb3666 228 /* On your marks... */
229 MUTEX_INIT(threadstart_mutexp);
230 MUTEX_LOCK(threadstart_mutexp);
231 /* Get set...
232 * Increment the global thread count. It is decremented
233 * by the destructor for the thread specific key thr_key.
234 */
f152979c 235 sigfillset(&fullmask);
236 if (sigprocmask(SIG_SETMASK, &fullmask, &oldmask) == -1)
237 croak("panic: sigprocmask");
d9bb3666 238 if (pthread_create(&self, NULL, threadstart, (void*) thr))
239 return NULL; /* XXX should clean up first */
240 /* Go */
241 MUTEX_UNLOCK(threadstart_mutexp);
f152979c 242 if (sigprocmask(SIG_SETMASK, &oldmask, 0))
243 croak("panic: sigprocmask");
783070da 244#endif
7d901afa 245 sv = newSViv(thr->tid);
683929b4 246 sv_magic(sv, oursv, '~', 0, 0);
7d901afa 247 SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
248 return sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE));
d9bb3666 249}
250
f152979c 251static Signal_t
252handle_thread_signal(sig)
253int sig;
254{
255 char c = (char) sig;
256 write(sig_pipe[0], &c, 1);
257}
258
d9bb3666 259MODULE = Thread PACKAGE = Thread
260
683929b4 261void
d9bb3666 262new(class, startsv, ...)
683929b4 263 char * class
d9bb3666 264 SV * startsv
734689b1 265 AV * av = av_make(items - 2, &ST(2));
683929b4 266 PPCODE:
267 XPUSHs(sv_2mortal(newthread(startsv, av, class)));
d9bb3666 268
269void
d9bb3666 270join(t)
271 Thread t
272 AV * av = NO_INIT
273 int i = NO_INIT
274 PPCODE:
7d901afa 275 DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: joining %p (state %u)\n",
276 thr, t, ThrSTATE(t)););
683929b4 277 if (ThrSTATE(t) == THRf_DETACHED)
734689b1 278 croak("tried to join a detached thread");
683929b4 279 else if (ThrSTATE(t) == THRf_JOINED)
734689b1 280 croak("tried to rejoin an already joined thread");
683929b4 281 else if (ThrSTATE(t) == THRf_DEAD)
734689b1 282 croak("tried to join a dead thread");
283
d9bb3666 284 if (pthread_join(t->Tself, (void **) &av))
285 croak("pthread_join failed");
683929b4 286 ThrSETSTATE(t, THRf_JOINED);
7d901afa 287 remove_thread(t);
288
d9bb3666 289 /* Could easily speed up the following if necessary */
290 for (i = 0; i <= AvFILL(av); i++)
291 XPUSHs(sv_2mortal(*av_fetch(av, i, FALSE)));
292
293void
734689b1 294detach(t)
d9bb3666 295 Thread t
296 CODE:
7d901afa 297 DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: detaching %p (state %u)\n",
298 thr, t, ThrSTATE(t)););
683929b4 299 if (ThrSTATE(t) == THRf_DETACHED)
734689b1 300 croak("tried to detach an already detached thread");
683929b4 301 else if (ThrSTATE(t) == THRf_JOINED)
734689b1 302 croak("tried to detach an already joined thread");
683929b4 303 else if (ThrSTATE(t) == THRf_DEAD)
734689b1 304 croak("tried to detach a dead thread");
7d901afa 305 DETACH(t);
683929b4 306 ThrSETSTATE(t, THRf_DETACHED);
d9bb3666 307
308void
734689b1 309DESTROY(t)
310 Thread t
d9bb3666 311 CODE:
7d901afa 312 DEBUG_L(WITH_THR(PerlIO_printf(PerlIO_stderr(),
313 "%p: DESTROY(%p), state %u\n",
314 thr, t, ThrSTATE(t))));
315
683929b4 316 if (ThrSTATE(t) == THRf_NORMAL) {
7d901afa 317 DETACH(t);
683929b4 318 ThrSETSTATE(t, THRf_DETACHED);
7d901afa 319 t->flags |= THRf_DIE_FATAL;
734689b1 320 }
d9bb3666 321
322void
7d901afa 323equal(t1, t2)
324 Thread t1
325 Thread t2
326 PPCODE:
327 PUSHs((t1 == t2) ? &sv_yes : &sv_no);
328
329void
330flags(t)
331 Thread t
332 PPCODE:
333 PUSHs(sv_2mortal(newSViv(t->flags)));
334
335void
336self(class)
337 char * class
338 PREINIT:
339 SV *sv;
340 PPCODE:
341 sv = newSViv(thr->tid);
342 sv_magic(sv, oursv, '~', 0, 0);
343 SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
344 PUSHs(sv_2mortal(sv_bless(newRV_noinc(sv), gv_stashpv(class, TRUE))));
345
346void
734689b1 347yield()
d9bb3666 348 CODE:
734689b1 349#ifdef OLD_PTHREADS_API
350 pthread_yield();
351#else
352#ifndef NO_SCHED_YIELD
353 sched_yield();
354#endif /* NO_SCHED_YIELD */
355#endif /* OLD_PTHREADS_API */
d9bb3666 356
357void
734689b1 358cond_wait(sv)
359 SV * sv
360 MAGIC * mg = NO_INIT
361CODE:
2c127b02 362 if (SvROK(sv))
734689b1 363 sv = SvRV(sv);
2c127b02 364
734689b1 365 mg = condpair_magic(sv);
683929b4 366 DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_wait %p\n", thr, sv));
734689b1 367 MUTEX_LOCK(MgMUTEXP(mg));
368 if (MgOWNER(mg) != thr) {
369 MUTEX_UNLOCK(MgMUTEXP(mg));
370 croak("cond_wait for lock that we don't own\n");
371 }
372 MgOWNER(mg) = 0;
373 COND_WAIT(MgCONDP(mg), MgMUTEXP(mg));
374 MgOWNER(mg) = thr;
375 MUTEX_UNLOCK(MgMUTEXP(mg));
376
377void
378cond_signal(sv)
379 SV * sv
380 MAGIC * mg = NO_INIT
381CODE:
382 if (SvROK(sv)) {
383 /*
384 * Kludge to allow lock of real objects without requiring
385 * to pass in every type of argument by explicit reference.
386 */
387 sv = SvRV(sv);
388 }
389 mg = condpair_magic(sv);
683929b4 390 DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_signal %p\n",thr,sv));
734689b1 391 MUTEX_LOCK(MgMUTEXP(mg));
392 if (MgOWNER(mg) != thr) {
393 MUTEX_UNLOCK(MgMUTEXP(mg));
394 croak("cond_signal for lock that we don't own\n");
395 }
396 COND_SIGNAL(MgCONDP(mg));
397 MUTEX_UNLOCK(MgMUTEXP(mg));
d9bb3666 398
734689b1 399void
400cond_broadcast(sv)
401 SV * sv
402 MAGIC * mg = NO_INIT
403CODE:
783070da 404 if (SvROK(sv))
734689b1 405 sv = SvRV(sv);
783070da 406
734689b1 407 mg = condpair_magic(sv);
683929b4 408 DEBUG_L(PerlIO_printf(PerlIO_stderr(), "%p: cond_broadcast %p\n",
409 thr, sv));
734689b1 410 MUTEX_LOCK(MgMUTEXP(mg));
411 if (MgOWNER(mg) != thr) {
412 MUTEX_UNLOCK(MgMUTEXP(mg));
413 croak("cond_broadcast for lock that we don't own\n");
414 }
415 COND_BROADCAST(MgCONDP(mg));
416 MUTEX_UNLOCK(MgMUTEXP(mg));
f152979c 417
7d901afa 418void
419list(class)
420 char * class
421 PREINIT:
422 Thread t;
423 AV * av;
424 SV ** svp;
425 int n = 0;
426 PPCODE:
427 av = newAV();
428 /*
429 * Iterate until we have enough dynamic storage for all threads.
430 * We mustn't do any allocation while holding threads_mutex though.
431 */
432 MUTEX_LOCK(&threads_mutex);
433 do {
434 n = nthreads;
435 MUTEX_UNLOCK(&threads_mutex);
436 if (AvFILL(av) < n - 1) {
437 int i = AvFILL(av);
438 for (i = AvFILL(av); i < n - 1; i++) {
439 SV *sv = newSViv(0); /* fill in tid later */
440 sv_magic(sv, 0, '~', 0, 0); /* fill in other magic later */
441 av_push(av, sv_bless(newRV_noinc(sv),
442 gv_stashpv(class, TRUE)));
443 }
444 }
445 MUTEX_LOCK(&threads_mutex);
446 } while (n < nthreads);
447
448 /*
449 * At this point, there's enough room to fill in av.
450 * Note that we are holding threads_mutex so the list
451 * won't change out from under us but all the remaining
452 * processing is "fast" (no blocking, malloc etc.)
453 */
454 t = thr;
455 svp = AvARRAY(av);
456 do {
457 SV *sv = SvRV(*svp++);
458 sv_setiv(sv, t->tid);
459 SvMAGIC(sv)->mg_obj = SvREFCNT_inc(t->Toursv);
460 SvMAGIC(sv)->mg_flags |= MGf_REFCOUNTED;
461 SvMAGIC(sv)->mg_private = Thread_MAGIC_SIGNATURE;
462 t = t->next;
463 } while (t != thr);
464 /* Record the overflow */
465 n -= nthreads;
466 MUTEX_UNLOCK(&threads_mutex);
467 /* Truncate any unneeded slots in av */
468 if (n > 0)
469 av_fill(av, AvFILL(av) - n);
470 /* Finally, push all the new objects onto the stack and drop av */
471 EXTEND(sp, n);
472 for (svp = AvARRAY(av); n > 0; n--, svp++)
473 PUSHs(*svp);
474 (void)sv_2mortal((SV*)av);
475
476
f152979c 477MODULE = Thread PACKAGE = Thread::Signal
478
479void
480kill_sighandler_thread()
481 PPCODE:
482 write(sig_pipe[0], "\0", 1);
483 PUSHs(&sv_yes);
484
485void
486init_thread_signals()
487 PPCODE:
488 sighandlerp = handle_thread_signal;
489 if (pipe(sig_pipe) == -1)
490 XSRETURN_UNDEF;
491 PUSHs(&sv_yes);
492
493SV *
494await_signal()
495 PREINIT:
496 char c;
497 ssize_t ret;
498 CODE:
499 do {
500 ret = read(sig_pipe[1], &c, 1);
501 } while (ret == -1 && errno == EINTR);
502 if (ret == -1)
503 croak("panic: await_signal");
504 if (ret == 0)
505 XSRETURN_UNDEF;
506 RETVAL = c ? psig_ptr[c] : &sv_no;
507 OUTPUT:
508 RETVAL