use strict;
use warnings;
-our $VERSION = '2.06';
+our $VERSION = '2.07';
use threads::shared 0.96;
-use Scalar::Util 1.10 qw(looks_like_number);
+use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
# Predeclarations for internal functions
my ($make_shared, $validate_count, $validate_index);
sub new
{
my $class = shift;
- my @queue :shared = map { $make_shared->($_) } @_;
+ my @queue :shared = map { $make_shared->($_, {}) } @_;
return bless(\@queue, $class);
}
{
my $queue = shift;
lock(@$queue);
- push(@$queue, map { $make_shared->($_) } @_)
+ push(@$queue, map { $make_shared->($_, {}) } @_)
and cond_signal(@$queue);
}
}
# Add new items to the queue
- push(@$queue, map { $make_shared->($_) } @_);
+ push(@$queue, map { $make_shared->($_, {}) } @_);
# Add previous items back onto the queue
push(@$queue, @tmp);
# Create a thread-shared version of a complex data structure or object
$make_shared = sub {
- my $item = shift;
+ my ($item, $cloned) = @_;
# If not running 'threads' or already thread-shared,
# then just return the input item
# Make copies of array, hash and scalar refs
my $copy;
- if (my $ref_type = Scalar::Util::reftype($item)) {
+ if (my $ref_type = reftype($item)) {
+ # Check for previously cloned references
+ # (this takes care of circular refs as well)
+ my $addr = refaddr($item);
+ if (defined($addr) && exists($cloned->{$addr})) {
+ # Return the already existing clone
+ return $cloned->{$addr};
+ }
+
# Copy an array ref
if ($ref_type eq 'ARRAY') {
# Make empty shared array ref
$copy = &share([]);
+ # Add to clone checking hash
+ $cloned->{$addr} = $copy;
# Recursively copy and add contents
- push(@$copy, map { $make_shared->($_) } @$item);
+ push(@$copy, map { $make_shared->($_, $cloned) } @$item);
}
# Copy a hash ref
elsif ($ref_type eq 'HASH') {
# Make empty shared hash ref
$copy = &share({});
+ # Add to clone checking hash
+ $cloned->{$addr} = $copy;
# Recursively copy and add contents
foreach my $key (keys(%{$item})) {
- $copy->{$key} = $make_shared->($item->{$key});
+ $copy->{$key} = $make_shared->($item->{$key}, $cloned);
}
}
if (Internals::SvREADONLY($$item)) {
Internals::SvREADONLY($$copy, 1);
}
+ # Add to clone checking hash
+ $cloned->{$addr} = $copy;
}
# Copy of a ref of a ref
elsif ($ref_type eq 'REF') {
- my $tmp = $make_shared->($$item);
- $copy = \$tmp;
- share($copy);
+ # Special handling for $x = \$x
+ my $addr2 = refaddr($$item);
+ if ($addr2 == $addr) {
+ $copy = \$copy;
+ share($copy);
+ $cloned->{$addr} = $copy;
+ } else {
+ my $tmp;
+ $copy = \$tmp;
+ share($copy);
+ # Add to clone checking hash
+ $cloned->{$addr} = $copy;
+ # Recursively copy and add contents
+ $tmp = $make_shared->($$item, $cloned);
+ }
}
}
# other than an ordinary scalar
return $item if (! defined($copy));
+ # If input item is an object, then bless the copy into the same class
+ if (my $class = blessed($item)) {
+ bless($copy, $class);
+ }
+
# Clone READONLY flag
if (Internals::SvREADONLY($item)) {
Internals::SvREADONLY($copy, 1);
}
- # If input item is an object, then bless the copy into the same class
- if (my $class = Scalar::Util::blessed($item)) {
- bless($copy, $class);
- }
-
return $copy;
};
=head1 VERSION
-This document describes Thread::Queue version 2.06
+This document describes Thread::Queue version 2.07
=head1 SYNOPSIS
L<http://www.cpanforum.com/dist/Thread-Queue>
Annotated POD for Thread::Queue:
-L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.06/lib/Thread/Queue.pm>
+L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.07/lib/Thread/Queue.pm>
Source repository:
L<http://code.google.com/p/thread-queue/>
require Test::More;
}
Test::More->import();
-plan('tests' => 39);
+plan('tests' => 45);
# Regular array
my @ary1 = qw/foo bar baz/;
my $qux = \$baz;
is_deeply($$$$qux, $foo, 'Ref of ref');
+# Circular refs
+my $cir1;
+$cir1 = \$cir1;
+
+my $cir1s : shared;
+$cir1s = \$cir1s;
+
+my $cir2;
+$cir2 = [ \$cir2, { 'ref' => \$cir2 } ];
+
+my $cir3 :shared = &share({});
+$cir3->{'self'} = \$cir3;
+bless($cir3, 'Circular');
+
# Queue up items
my $q = Thread::Queue->new(\@ary1, \@ary2);
ok($q, 'New queue');
is($q->pending(), 4, 'Queue count');
$q->enqueue($sref1, $sref2, $qux);
is($q->pending(), 7, 'Queue count');
+$q->enqueue($cir1, $cir1s, $cir2, $cir3);
+is($q->pending(), 11, 'Queue count');
# Process items in thread
threads->create(sub {
- is($q->pending(), 7, 'Queue count in thread');
+ is($q->pending(), 11, 'Queue count in thread');
my $tary1 = $q->dequeue();
ok($tary1, 'Thread got item');
my $qux = $q->dequeue();
is_deeply($$$$qux, $foo, 'Ref of ref');
+ my ($c1, $c1s, $c2, $c3) = $q->dequeue(4);
+ SKIP: {
+ skip("Needs threads::shared >= 1.19", 5)
+ if ($threads::shared::VERSION < 1.19);
+
+ is(threads::shared::_id($$c1),
+ threads::shared::_id($c1),
+ 'Circular ref - scalar');
+
+ is(threads::shared::_id($$c1s),
+ threads::shared::_id($c1s),
+ 'Circular ref - shared scalar');
+
+ is(threads::shared::_id(${$c2->[0]}),
+ threads::shared::_id($c2),
+ 'Circular ref - array');
+
+ is(threads::shared::_id(${$c2->[1]->{'ref'}}),
+ threads::shared::_id($c2),
+ 'Circular ref - mixed');
+
+ is(threads::shared::_id(${$c3->{'self'}}),
+ threads::shared::_id($c3),
+ 'Circular ref - hash');
+ }
+
is($q->pending(), 0, 'Empty queue');
my $nothing = $q->dequeue_nb();
ok(! defined($nothing), 'Nothing on queue');