Thread::Queue 2.07
Jerry D. Hedden [Thu, 8 May 2008 10:05:51 +0000 (06:05 -0400)]
From: "Jerry D. Hedden" <jdhedden@cpan.org>
Message-ID: <1ff86f510805080705p3cc8f657i7a1441da5b0a273b@mail.gmail.com>

p4raw-id: //depot/perl@33808

lib/Thread/Queue.pm
lib/Thread/Queue/t/02_refs.t

index 0d9eb10..dc2b1ed 100644 (file)
@@ -3,10 +3,10 @@ package Thread::Queue;
 use strict;
 use warnings;
 
-our $VERSION = '2.06';
+our $VERSION = '2.07';
 
 use threads::shared 0.96;
-use Scalar::Util 1.10 qw(looks_like_number);
+use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
 
 # Predeclarations for internal functions
 my ($make_shared, $validate_count, $validate_index);
@@ -15,7 +15,7 @@ my ($make_shared, $validate_count, $validate_index);
 sub new
 {
     my $class = shift;
-    my @queue :shared = map { $make_shared->($_) } @_;
+    my @queue :shared = map { $make_shared->($_, {}) } @_;
     return bless(\@queue, $class);
 }
 
@@ -24,7 +24,7 @@ sub enqueue
 {
     my $queue = shift;
     lock(@$queue);
-    push(@$queue, map { $make_shared->($_) } @_)
+    push(@$queue, map { $make_shared->($_, {}) } @_)
         and cond_signal(@$queue);
 }
 
@@ -111,7 +111,7 @@ sub insert
     }
 
     # Add new items to the queue
-    push(@$queue, map { $make_shared->($_) } @_);
+    push(@$queue, map { $make_shared->($_, {}) } @_);
 
     # Add previous items back onto the queue
     push(@$queue, @tmp);
@@ -163,7 +163,7 @@ sub extract
 
 # Create a thread-shared version of a complex data structure or object
 $make_shared = sub {
-    my $item = shift;
+    my ($item, $cloned) = @_;
 
     # If not running 'threads' or already thread-shared,
     #   then just return the input item
@@ -172,22 +172,34 @@ $make_shared = sub {
 
     # Make copies of array, hash and scalar refs
     my $copy;
-    if (my $ref_type = Scalar::Util::reftype($item)) {
+    if (my $ref_type = reftype($item)) {
+        # Check for previously cloned references
+        #   (this takes care of circular refs as well)
+        my $addr = refaddr($item);
+        if (defined($addr) && exists($cloned->{$addr})) {
+            # Return the already existing clone
+            return $cloned->{$addr};
+        }
+
         # Copy an array ref
         if ($ref_type eq 'ARRAY') {
             # Make empty shared array ref
             $copy = &share([]);
+            # Add to clone checking hash
+            $cloned->{$addr} = $copy;
             # Recursively copy and add contents
-            push(@$copy, map { $make_shared->($_) } @$item);
+            push(@$copy, map { $make_shared->($_, $cloned) } @$item);
         }
 
         # Copy a hash ref
         elsif ($ref_type eq 'HASH') {
             # Make empty shared hash ref
             $copy = &share({});
+            # Add to clone checking hash
+            $cloned->{$addr} = $copy;
             # Recursively copy and add contents
             foreach my $key (keys(%{$item})) {
-                $copy->{$key} = $make_shared->($item->{$key});
+                $copy->{$key} = $make_shared->($item->{$key}, $cloned);
             }
         }
 
@@ -199,13 +211,27 @@ $make_shared = sub {
             if (Internals::SvREADONLY($$item)) {
                 Internals::SvREADONLY($$copy, 1);
             }
+            # Add to clone checking hash
+            $cloned->{$addr} = $copy;
         }
 
         # Copy of a ref of a ref
         elsif ($ref_type eq 'REF') {
-            my $tmp = $make_shared->($$item);
-            $copy = \$tmp;
-            share($copy);
+            # Special handling for $x = \$x
+            my $addr2 = refaddr($$item);
+            if ($addr2 == $addr) {
+                $copy = \$copy;
+                share($copy);
+                $cloned->{$addr} = $copy;
+            } else {
+                my $tmp;
+                $copy = \$tmp;
+                share($copy);
+                # Add to clone checking hash
+                $cloned->{$addr} = $copy;
+                # Recursively copy and add contents
+                $tmp = $make_shared->($$item, $cloned);
+            }
         }
     }
 
@@ -214,16 +240,16 @@ $make_shared = sub {
     #        other than an ordinary scalar
     return $item if (! defined($copy));
 
+    # If input item is an object, then bless the copy into the same class
+    if (my $class = blessed($item)) {
+        bless($copy, $class);
+    }
+
     # Clone READONLY flag
     if (Internals::SvREADONLY($item)) {
         Internals::SvREADONLY($copy, 1);
     }
 
-    # If input item is an object, then bless the copy into the same class
-    if (my $class = Scalar::Util::blessed($item)) {
-        bless($copy, $class);
-    }
-
     return $copy;
 };
 
@@ -265,7 +291,7 @@ Thread::Queue - Thread-safe queues
 
 =head1 VERSION
 
-This document describes Thread::Queue version 2.06
+This document describes Thread::Queue version 2.07
 
 =head1 SYNOPSIS
 
@@ -518,7 +544,7 @@ Thread::Queue Discussion Forum on CPAN:
 L<http://www.cpanforum.com/dist/Thread-Queue>
 
 Annotated POD for Thread::Queue:
-L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.06/lib/Thread/Queue.pm>
+L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.07/lib/Thread/Queue.pm>
 
 Source repository:
 L<http://code.google.com/p/thread-queue/>
index 388cc6d..b09eca2 100644 (file)
@@ -23,7 +23,7 @@ if ($] == 5.008) {
     require Test::More;
 }
 Test::More->import();
-plan('tests' => 39);
+plan('tests' => 45);
 
 # Regular array
 my @ary1 = qw/foo bar baz/;
@@ -62,6 +62,20 @@ my $baz = \$bar;
 my $qux = \$baz;
 is_deeply($$$$qux, $foo, 'Ref of ref');
 
+# Circular refs
+my $cir1;
+$cir1 = \$cir1;
+
+my $cir1s : shared;
+$cir1s = \$cir1s;
+
+my $cir2;
+$cir2 = [ \$cir2, { 'ref' => \$cir2 } ];
+
+my $cir3 :shared = &share({});
+$cir3->{'self'} = \$cir3;
+bless($cir3, 'Circular');
+
 # Queue up items
 my $q = Thread::Queue->new(\@ary1, \@ary2);
 ok($q, 'New queue');
@@ -70,10 +84,12 @@ $q->enqueue($obj1, $obj2);
 is($q->pending(), 4, 'Queue count');
 $q->enqueue($sref1, $sref2, $qux);
 is($q->pending(), 7, 'Queue count');
+$q->enqueue($cir1, $cir1s, $cir2, $cir3);
+is($q->pending(), 11, 'Queue count');
 
 # Process items in thread
 threads->create(sub {
-    is($q->pending(), 7, 'Queue count in thread');
+    is($q->pending(), 11, 'Queue count in thread');
 
     my $tary1 = $q->dequeue();
     ok($tary1, 'Thread got item');
@@ -119,6 +135,32 @@ threads->create(sub {
     my $qux = $q->dequeue();
     is_deeply($$$$qux, $foo, 'Ref of ref');
 
+    my ($c1, $c1s, $c2, $c3) = $q->dequeue(4);
+    SKIP: {
+        skip("Needs threads::shared >= 1.19", 5)
+            if ($threads::shared::VERSION < 1.19);
+
+        is(threads::shared::_id($$c1),
+           threads::shared::_id($c1),
+                'Circular ref - scalar');
+
+        is(threads::shared::_id($$c1s),
+           threads::shared::_id($c1s),
+                'Circular ref - shared scalar');
+
+        is(threads::shared::_id(${$c2->[0]}),
+           threads::shared::_id($c2),
+                'Circular ref - array');
+
+        is(threads::shared::_id(${$c2->[1]->{'ref'}}),
+           threads::shared::_id($c2),
+                'Circular ref - mixed');
+
+        is(threads::shared::_id(${$c3->{'self'}}),
+           threads::shared::_id($c3),
+                'Circular ref - hash');
+    }
+
     is($q->pending(), 0, 'Empty queue');
     my $nothing = $q->dequeue_nb();
     ok(! defined($nothing), 'Nothing on queue');