Added queue.pm and test case, still disabled because of discovered race (or am I...
Artur Bergman [Tue, 16 Apr 2002 21:26:53 +0000 (21:26 +0000)]
in the locking code. All threaded code seems to fail mysteriusly from the PL_utf8_idstart
stuff being 0xabababab on cleanup.

p4raw-id: //depot/perl@15956

MANIFEST
ext/threads/shared/queue.pm [new file with mode: 0644]
ext/threads/shared/t/queue.t [new file with mode: 0644]

index 6a4044e..4bdd765 100644 (file)
--- a/MANIFEST
+++ b/MANIFEST
@@ -647,7 +647,9 @@ ext/threads/shared/t/no_share.t     Tests for disabled share on variables.
 ext/threads/shared/t/shared_attr.t     Test :shared attribute
 ext/threads/shared/t/sv_refs.t thread shared variables
 ext/threads/shared/t/sv_simple.t       thread shared variables
+ext/threads/shared/t/queue.t   thread shared variables
 ext/threads/shared/typemap     thread::shared types
+ext/threads/shared/queue.pm    Threadsafe queue.
 ext/threads/t/basic.t          ithreads
 ext/threads/t/end.t            Test end functions
 ext/threads/t/join.t           Testing the join function
diff --git a/ext/threads/shared/queue.pm b/ext/threads/shared/queue.pm
new file mode 100644 (file)
index 0000000..e849c62
--- /dev/null
@@ -0,0 +1,46 @@
+
+package threads::shared::queue;
+
+use threads::shared;
+use strict;
+
+sub new {
+    my $class = shift;
+    my @q : shared = @_;
+    my $q = \@q;
+    return bless $q, $class;
+}
+
+sub dequeue  {
+    my $q = shift;
+    lock(@$q);
+    until(@$q) {
+       cond_wait(@$q);
+    }
+    return shift @$q;
+}
+
+sub dequeue_nb {
+  my $q = shift;
+  lock(@$q);
+  if (@$q) {
+    return shift @$q;
+  } else {
+    return undef;
+  }
+}
+
+sub enqueue {
+    my $q = shift;
+    lock(@$q);
+    push(@$q, @_) and cond_broadcast @$q;
+}
+
+sub pending  {
+  my $q = shift;
+  lock(@$q);
+  return scalar(@$q);
+}
+
+1;
+
diff --git a/ext/threads/shared/t/queue.t b/ext/threads/shared/t/queue.t
new file mode 100644 (file)
index 0000000..1b255b7
--- /dev/null
@@ -0,0 +1,59 @@
+
+
+BEGIN {
+#    chdir 't' if -d 't';
+#    push @INC ,'../lib';
+#    require Config; import Config;
+#    unless ($Config{'useithreads'}) {
+        print "1..0 # Skip: might still hang\n";
+        exit 0;
+#    }
+}
+
+
+use threads;
+use threads::queue;
+
+$q = new threads::shared::queue;
+
+sub reader {
+    my $tid = threads->self->tid;
+    my $i = 0;
+    while (1) {
+       $i++;
+#      print "reader (tid $tid): waiting for element $i...\n";
+       my $el = $q->dequeue;
+#      print "reader (tid $tid): dequeued element $i: value $el\n";
+       select(undef, undef, undef, rand(2));
+       if ($el == -1) {
+           # end marker
+#          print "reader (tid $tid) returning\n";
+           return;
+       }
+    }
+}
+
+my $nthreads = 1;
+my @threads;
+
+for (my $i = 0; $i < $nthreads; $i++) {
+    push @threads, threads->new(\&reader, $i);
+}
+
+for (my $i = 1; $i <= 10; $i++) {
+    my $el = int(rand(100));
+    select(undef, undef, undef, rand(2));
+#    print "writer: enqueuing value $el\n";
+    $q->enqueue($el);
+}
+
+$q->enqueue((-1) x $nthreads); # one end marker for each thread
+
+for(@threads) {
+       print "waiting for join\n";
+       $_->join();
+       
+}
+
+
+