package threads::shared;
-
use strict;
use warnings;
use Config;
-use Scalar::Util qw(weaken);
-use attributes qw(reftype);
+
+require Exporter;
+our @ISA = qw(Exporter);
+our @EXPORT = qw(share cond_wait cond_broadcast cond_signal unlock);
+our $VERSION = '0.90';
+
+use XSLoader;
+XSLoader::load('threads::shared',$VERSION);
BEGIN {
- if ($Config{'useithreads'} && $threads::threads) {
- *share = \&share_enabled;
+ if ($Config{'useithreads'}) {
*cond_wait = \&cond_wait_enabled;
*cond_signal = \&cond_signal_enabled;
*cond_broadcast = \&cond_broadcast_enabled;
}
}
-require Exporter;
-require DynaLoader;
-our @ISA = qw(Exporter DynaLoader);
-
-our @EXPORT = qw(share cond_wait cond_broadcast cond_signal unlock);
-our $VERSION = '0.90';
-
-our %shared;
sub cond_wait_disabled { return @_ };
sub cond_signal_disabled { return @_};
sub lock_disabled { 1 }
sub share_disabled { return @_}
-sub share_enabled (\[$@%]) { # \]
- my $value = $_[0];
- my $ref = reftype($value);
- if($ref eq 'SCALAR') {
- my $obj = \threads::shared::sv->new($$value);
- bless $obj, 'threads::shared::sv';
- $shared{$$obj} = $value;
- weaken($shared{$$obj});
- } elsif($ref eq "ARRAY") {
- tie @$value, 'threads::shared::av', $value;
- } elsif($ref eq "HASH") {
- tie %$value, "threads::shared::hv", $value;
- } else {
- die "You cannot share ref of type $_[0]\n";
- }
-}
-
-
-package threads::shared::sv;
-use base 'threads::shared';
-
-sub DESTROY {}
-
-package threads::shared::av;
-use base 'threads::shared';
-use Scalar::Util qw(weaken);
-sub TIEARRAY {
- my $class = shift;
- my $value = shift;
- my $self = bless \threads::shared::av->new($value),'threads::shared::av';
- $shared{$self->ptr} = $value;
- weaken($shared{$self->ptr});
- return $self;
-}
-
-package threads::shared::hv;
-use base 'threads::shared';
-use Scalar::Util qw(weaken);
-sub TIEHASH {
- my $class = shift;
- my $value = shift;
- my $self = bless \threads::shared::hv->new($value),'threads::shared::hv';
- $shared{$self->ptr} = $value;
- weaken($shared{$self->ptr});
- return $self;
-}
-
-package threads::shared;
-
$threads::shared::threads_shared = 1;
-bootstrap threads::shared $VERSION;
__END__
* Only one thread at a time is allowed to mess with shared space.
*/
perl_mutex PL_sharedsv_space_mutex; /* Mutex protecting the shared sv space */
+PerlInterpreter *PL_shared_owner; /* For locking assertions */
+
+#define SHARED_LOCK STMT_START { \
+ MUTEX_LOCK(&PL_sharedsv_space_mutex); \
+ PL_shared_owner = aTHX; \
+ } STMT_END
+
+#define SHARED_UNLOCK STMT_START { \
+ PL_shared_owner = NULL; \
+ MUTEX_UNLOCK(&PL_sharedsv_space_mutex); \
+ } STMT_END
-#define SHARED_LOCK MUTEX_LOCK(&PL_sharedsv_space_mutex)
-#define SHARED_UNLOCK MUTEX_UNLOCK(&PL_sharedsv_space_mutex)
/* A common idiom is to acquire access and switch in ... */
#define SHARED_EDIT STMT_START { \
{
shared_sv *shared = (shared_sv *) mg->mg_ptr;
if (shared) {
+ PerlIO_debug(__FUNCTION__ "Free %p\n",shared);
PerlMemShared_free(shared);
mg->mg_ptr = NULL;
}
Perl_sharedsv_find(pTHX_ SV *sv)
{
MAGIC *mg;
- switch(SvTYPE(sv)) {
- case SVt_PVAV:
- case SVt_PVHV:
- if ((mg = mg_find(sv, PERL_MAGIC_tied))
- && mg->mg_virtual == &sharedsv_array_vtbl) {
+ if (SvTYPE(sv) >= SVt_PVMG) {
+ switch(SvTYPE(sv)) {
+ case SVt_PVAV:
+ case SVt_PVHV:
+ if ((mg = mg_find(sv, PERL_MAGIC_tied))
+ && mg->mg_virtual == &sharedsv_array_vtbl) {
return (shared_sv *) mg->mg_ptr;
}
break;
- default:
- if ((mg = mg_find(sv, PERL_MAGIC_shared_scalar))
- && mg->mg_virtual == &sharedsv_scalar_vtbl) {
+ default:
+ if ((mg = mg_find(sv, PERL_MAGIC_shared_scalar))
+ && mg->mg_virtual == &sharedsv_scalar_vtbl) {
return (shared_sv *) mg->mg_ptr;
+ }
+ break;
}
}
return NULL;
{
/* First try and get global data structure */
dTHXc;
- MAGIC *mg;
+ MAGIC *mg = 0;
SV *sv;
- if (aTHX == PL_sharedsv_space) {
- croak("panic:Cannot associate from within shared space");
- }
- SHARED_LOCK;
+
+ /* If we are asked for an private ops we need a thread */
+ assert ( aTHX != PL_sharedsv_space );
+
+ /* To avoid need for recursive locks require caller to hold lock */
+ if ( PL_shared_owner != aTHX )
+ abort();
+ assert ( PL_shared_owner == aTHX );
/* Try shared SV as 1st choice */
- if (!data && ssv) {
+ if (!data && ssv && SvTYPE(ssv) >= SVt_PVMG) {
if (mg = mg_find(ssv, PERL_MAGIC_ext)) {
data = (shared_sv *) mg->mg_ptr;
}
}
/* Next try private SV */
if (!data && psv && *psv) {
- data = Perl_sharedsv_find(aTHX_ *psv);
+ data = Perl_sharedsv_find(aTHX,*psv);
}
/* If neither of those then create a new one */
if (!data) {
}
/* Finally if private SV exists check and add magic */
- if (psv && *psv) {
- SV *sv = *psv;
- MAGIC *mg;
+ if (psv && (sv = *psv)) {
+ MAGIC *mg = 0;
switch(SvTYPE(sv)) {
case SVt_PVAV:
case SVt_PVHV:
if (!(mg = mg_find(sv, PERL_MAGIC_tied))
|| mg->mg_virtual != &sharedsv_array_vtbl) {
+ SV *obj = newSV(0);
+ sv_setref_iv(obj, "threads::shared::tie",PTR2IV(data));
if (mg)
sv_unmagic(sv, PERL_MAGIC_tied);
- mg = sv_magicext(sv, sv, PERL_MAGIC_tied, &sharedsv_array_vtbl,
+ mg = sv_magicext(sv, obj, PERL_MAGIC_tied, &sharedsv_array_vtbl,
(char *) data, 0);
mg->mg_flags |= (MGf_COPY|MGf_DUP);
+ SvREFCNT_inc(SHAREDSvPTR(data));
+ PerlIO_debug(__FUNCTION__ " %p %d\n",data,SvREFCNT(SHAREDSvPTR(data)));
+ SvREFCNT_dec(obj);
}
break;
default:
- if (!(mg = mg_find(sv, PERL_MAGIC_shared_scalar)) ||
+ if (SvTYPE(sv) < SVt_PVMG || !(mg = mg_find(sv, PERL_MAGIC_shared_scalar)) ||
mg->mg_virtual != &sharedsv_scalar_vtbl) {
if (mg)
sv_unmagic(sv, PERL_MAGIC_shared_scalar);
mg = sv_magicext(sv, Nullsv, PERL_MAGIC_shared_scalar,
&sharedsv_scalar_vtbl, (char *)data, 0);
mg->mg_flags |= (MGf_COPY|MGf_DUP);
+ SvREFCNT_inc(SHAREDSvPTR(data));
+ PerlIO_debug(__FUNCTION__ " %p %d\n",data,SvREFCNT(SHAREDSvPTR(data)));
}
break;
}
}
- SHARED_UNLOCK;
return data;
}
break;
default:
+ SHARED_LOCK;
Perl_sharedsv_associate(aTHX_ &sv, 0, 0);
+ SHARED_UNLOCK;
+ SvSETMAGIC(sv);
+ break;
}
}
shared_sv *shared = (shared_sv *) mg->mg_ptr;
SHARED_LOCK;
- SvOK_off(sv);
if (SHAREDSvPTR(shared)) {
if (SvROK(SHAREDSvPTR(shared))) {
- SV *rv = newRV(Nullsv);
- Perl_sharedsv_associate(aTHX_ &SvRV(rv), SvRV(SHAREDSvPTR(shared)), NULL);
- sv_setsv(sv, rv);
+ SV *obj = Nullsv;
+ Perl_sharedsv_associate(aTHX_ &obj, SvRV(SHAREDSvPTR(shared)), NULL);
+ sv_setsv_nomg(sv, &PL_sv_undef);
+ SvRV(sv) = obj;
+ SvROK_on(sv);
}
else {
- sv_setsv(sv, SHAREDSvPTR(shared));
+ sv_setsv_nomg(sv, SHAREDSvPTR(shared));
}
}
SHARED_UNLOCK;
sharedsv_scalar_mg_set(pTHX_ SV *sv, MAGIC *mg)
{
dTHXc;
- shared_sv *shared = Perl_sharedsv_associate(aTHX_ &sv, Nullsv,
- (shared_sv *) mg->mg_ptr);
+ shared_sv *shared;
bool allowed = TRUE;
+ SHARED_LOCK;
+ shared = Perl_sharedsv_associate(aTHX_ &sv, Nullsv, (shared_sv *) mg->mg_ptr);
- SHARED_EDIT;
if (SvROK(sv)) {
shared_sv* target = Perl_sharedsv_find(aTHX_ SvRV(sv));
if (target) {
- SV *tmp = newRV(SHAREDSvPTR(target));
- sv_setsv(SHAREDSvPTR(shared), tmp);
+ SV *tmp;
+ SHARED_CONTEXT;
+ tmp = newRV(SHAREDSvPTR(target));
+ sv_setsv_nomg(SHAREDSvPTR(shared), tmp);
SvREFCNT_dec(tmp);
+ CALLER_CONTEXT;
}
else {
allowed = FALSE;
}
}
else {
- sv_setsv(SHAREDSvPTR(shared), sv);
+ SHARED_CONTEXT;
+ sv_setsv_nomg(SHAREDSvPTR(shared), sv);
+ CALLER_CONTEXT;
}
SHARED_RELEASE;
int
sharedsv_scalar_mg_free(pTHX_ SV *sv, MAGIC *mg)
{
- Perl_sharedsv_free(aTHX_ (shared_sv *) mg->mg_ptr);
+ shared_sv *shared = (shared_sv *) mg->mg_ptr;
+ PerlIO_debug(__FUNCTION__ " %p %d\n",shared,SvREFCNT(SHAREDSvPTR(shared))-1);
+ assert (SvREFCNT(SHAREDSvPTR(shared)) < 1000);
+ Perl_sharedsv_free(aTHX_ shared);
+ return 0;
+}
+
+int
+sharedsv_scalar_mg_clear(pTHX_ SV *sv, MAGIC *mg)
+{
+ shared_sv *shared = (shared_sv *) mg->mg_ptr;
+ PerlIO_debug(__FUNCTION__ " %p %d\n",shared,SvREFCNT(SHAREDSvPTR(shared)));
return 0;
}
if (shared) {
SvREFCNT_inc(SHAREDSvPTR(shared));
}
+ PerlIO_debug(__FUNCTION__ " %p %d\n",shared,SvREFCNT(SHAREDSvPTR(shared)));
return 0;
}
sharedsv_scalar_mg_get, /* get */
sharedsv_scalar_mg_set, /* set */
0, /* len */
- 0, /* clear */
+ sharedsv_scalar_mg_clear, /* clear */
sharedsv_scalar_mg_free, /* free */
0, /* copy */
sharedsv_scalar_mg_dup /* dup */
shared_sv *target = Perl_sharedsv_find(aTHX_ sv);
SV** svp;
+ assert ( shared );
+ assert ( SHAREDSvPTR(shared) );
+
SHARED_EDIT;
if (SvTYPE(SHAREDSvPTR(shared)) == SVt_PVAV) {
- svp = av_fetch((AV*) SHAREDSvPTR(shared), mg->mg_len, 0);
+ assert ( mg->mg_ptr == 0 );
+ svp = av_fetch((AV*) SHAREDSvPTR(shared), mg->mg_len, 0);
}
else {
+ assert ( mg->mg_ptr != 0 );
svp = hv_fetch((HV*) SHAREDSvPTR(shared), mg->mg_ptr, mg->mg_len, 0);
}
if (svp) {
- if (SHAREDSvPTR(target) != *svp) {
- if (SHAREDSvPTR(target)) {
- SvREFCNT_dec(SHAREDSvPTR(target));
+ if (target) {
+ if (SHAREDSvPTR(target) != *svp) {
+ if (SHAREDSvPTR(target)) {
+ PerlIO_debug(__FUNCTION__ " %p %d\n",shared,SvREFCNT(SHAREDSvPTR(shared)));
+ SvREFCNT_dec(SHAREDSvPTR(target));
+ }
+ SHAREDSvPTR(target) = SvREFCNT_inc(*svp);
}
- SHAREDSvPTR(target) = SvREFCNT_inc(*svp);
+ }
+ else {
+ CALLER_CONTEXT;
+ Perl_sharedsv_associate(aTHX_ &sv, *svp, 0);
+ SHARED_CONTEXT;
}
}
- else {
+ else if (target) {
if (SHAREDSvPTR(target)) {
SvREFCNT_dec(SHAREDSvPTR(target));
}
{
dTHXc;
shared_sv *shared = Perl_sharedsv_find(aTHX_ mg->mg_obj);
- shared_sv *target = Perl_sharedsv_associate(aTHX_ &sv, Nullsv, 0);
+ shared_sv *target;
+ SV *val;
/* Theory - SV itself is magically shared - and we have ordered the
magic such that by the time we get here it has been stored
to its shared counterpart
*/
- SHARED_EDIT;
+ SHARED_LOCK;
+ target = Perl_sharedsv_associate(aTHX_ &sv, Nullsv, 0);
+ SHARED_CONTEXT;
+ val = SHAREDSvPTR(target);
if (SvTYPE(SHAREDSvPTR(shared)) == SVt_PVAV) {
- av_store((AV*) SHAREDSvPTR(shared), mg->mg_len, SHAREDSvPTR(target));
+ av_store((AV*) SHAREDSvPTR(shared), mg->mg_len, SvREFCNT_inc(val));
}
else {
hv_store((HV*) SHAREDSvPTR(shared), mg->mg_ptr, mg->mg_len,
- SHAREDSvPTR(target), 0);
+ SvREFCNT_inc(val), 0);
}
SHARED_RELEASE;
return 0;
{
shared_sv *shared = Perl_sharedsv_find(aTHX_ mg->mg_obj);
SvREFCNT_inc(SHAREDSvPTR(shared));
+ PerlIO_debug(__FUNCTION__ " %p %d\n",shared,SvREFCNT(SHAREDSvPTR(shared)));
mg->mg_flags |= MGf_DUP;
return 0;
}
MAGIC *nmg = sv_magicext(nsv,mg->mg_obj,
toLOWER(mg->mg_type),&sharedsv_elem_vtbl,
name, namlen);
+ SvREFCNT_inc(SHAREDSvPTR(shared));
nmg->mg_flags |= MGf_DUP;
#if 0
/* Maybe do this to associate shared value immediately ? */
{
shared_sv *shared = (shared_sv *) mg->mg_ptr;
SvREFCNT_inc(SHAREDSvPTR(shared));
+ PerlIO_debug(__FUNCTION__ " %p %d\n",shared,SvREFCNT(SHAREDSvPTR(shared)));
mg->mg_flags |= MGf_DUP;
return 0;
}
CODE:
dTHXc;
int i;
- SHARED_LOCK;
for(i = 1; i < items; i++) {
SV* tmp = newSVsv(ST(i));
- shared_sv *target = Perl_sharedsv_associate(aTHX_ &tmp, Nullsv, 0);
+ shared_sv *target;
+ SHARED_LOCK;
+ target = Perl_sharedsv_associate(aTHX_ &tmp, Nullsv, 0);
SHARED_CONTEXT;
av_push((AV*) SHAREDSvPTR(shared), SHAREDSvPTR(target));
- CALLER_CONTEXT;
+ SHARED_RELEASE;
SvREFCNT_dec(tmp);
}
- SHARED_UNLOCK;
void
UNSHIFT(shared_sv *shared, ...)
PROTOTYPES: ENABLE
void
+_thrcnt(SV *ref)
+ PROTOTYPE: \[$@%]
+CODE:
+ shared_sv *shared;
+ if(SvROK(ref))
+ ref = SvRV(ref);
+ if (shared = Perl_sharedsv_find(aTHX_ ref)) {
+ if (SHAREDSvPTR(shared)) {
+ ST(0) = sv_2mortal(newSViv(SvREFCNT(SHAREDSvPTR(shared))));
+ XSRETURN(1);
+ }
+ else {
+ Perl_warn(aTHX_ "%_ s=%p has no shared SV",ST(0),shared);
+ }
+ }
+ else {
+ Perl_warn(aTHX_ "%_ is not shared",ST(0));
+ }
+ XSRETURN_UNDEF;
+
+void
+share(SV *ref)
+ PROTOTYPE: \[$@%]
+ CODE:
+ if(SvROK(ref))
+ ref = SvRV(ref);
+ Perl_sharedsv_share(aTHX, ref);
+
+void
lock_enabled(SV *ref)
PROTOTYPE: \[$@%]
CODE:
perl_mutex mutex; /* mutex for updating things in this struct */
I32 count; /* how many SVs have a reference to us */
signed char detached; /* are we detached ? */
+ int gimme; /* Context of create */
SV* init_function; /* Code to run */
SV* params; /* args to pass function */
#ifdef WIN32
XPUSHs(av_shift(params));
}
PUTBACK;
- call_sv(thread->init_function, G_DISCARD|G_EVAL);
+ len = call_sv(thread->init_function, thread->gimme|G_EVAL);
SPAGAIN;
+ for (i=len-1; i >= 0; i--) {
+ SV *sv = POPs;
+ av_store(params, i, SvREFCNT_inc(sv));
+ }
+ PUTBACK;
+ if (SvTRUE(ERRSV)) {
+ Perl_warn(aTHX_ "Died:%_",ERRSV);
+ }
FREETMPS;
LEAVE;
- SvREFCNT_dec(thread->params);
SvREFCNT_dec(thread->init_function);
}
PerlIO_flush((PerlIO*)NULL);
MUTEX_LOCK(&thread->mutex);
- if (thread->detached == 1) {
+ if (thread->detached & 1) {
MUTEX_UNLOCK(&thread->mutex);
+ SvREFCNT_dec(thread->params);
+ thread->params = Nullsv;
Perl_ithread_destruct(aTHX_ thread);
} else {
+ thread->detached |= 4;
MUTEX_UNLOCK(&thread->mutex);
}
#ifdef WIN32
thread->count = 1;
MUTEX_INIT(&thread->mutex);
thread->tid = tid_counter++;
- thread->detached = 0;
+ thread->gimme = GIMME_V;
+ thread->detached = (thread->gimme == G_VOID) ? 1 : 0;
/* "Clone" our interpreter into the thread's interpreter
* This gives thread access to "static data" and code.
#endif
/* perl_clone leaves us in new interpreter's context.
As it is tricky to spot implcit aTHX create a new scope
- with aTHX matching the context for the duration of
+ with aTHX matching the context for the duration of
our work for new interpreter.
*/
{
{
ithread *thread = SV_to_ithread(aTHX_ obj);
MUTEX_LOCK(&thread->mutex);
- if (!thread->detached) {
+ if (thread->detached & 1) {
+ MUTEX_UNLOCK(&thread->mutex);
+ Perl_croak(aTHX_ "Cannot join a detached thread");
+ }
+ else if (thread->detached & 2) {
+ MUTEX_UNLOCK(&thread->mutex);
+ Perl_croak(aTHX_ "Thread already joined");
+ }
+ else {
#ifdef WIN32
DWORD waitcode;
#else
#else
pthread_join(thread->thr,&retval);
#endif
- /* We have finished with it */
MUTEX_LOCK(&thread->mutex);
- thread->detached = 2;
+ /* sv_dup over the args */
+ /* We have finished with it */
+ thread->detached |= 2;
MUTEX_UNLOCK(&thread->mutex);
sv_unmagic(SvRV(obj),PERL_MAGIC_shared_scalar);
}
- else {
- MUTEX_UNLOCK(&thread->mutex);
- Perl_croak(aTHX_ "Cannot join a detached thread");
- }
}
void