8 use threads::shared 0.96;
9 use Scalar::Util 1.10 qw(looks_like_number);
11 # Predeclarations for internal functions
12 my ($make_shared, $validate_count, $validate_index);
14 # Create a new queue possibly pre-populated with items
18 my @queue :shared = map { $make_shared->($_) } @_;
19 return bless(\@queue, $class);
22 # Add items to the tail of a queue
27 push(@$queue, map { $make_shared->($_) } @_)
28 and cond_signal(@$queue);
31 # Return a count of the number of items on a queue
36 return scalar(@$queue);
39 # Return 1 or more items from the head of a queue, blocking if needed
45 my $count = @_ ? $validate_count->(shift) : 1;
47 # Wait for requisite number of items
48 cond_wait(@$queue) until (@$queue >= $count);
49 cond_signal(@$queue) if (@$queue > $count);
52 return shift(@$queue) if ($count == 1);
54 # Return multiple items
56 push(@items, shift(@$queue)) for (1..$count);
60 # Return items from the head of a queue with no blocking
66 my $count = @_ ? $validate_count->(shift) : 1;
69 return shift(@$queue) if ($count == 1);
71 # Return multiple items
75 push(@items, shift(@$queue));
80 # Return an item without removing it from a queue
85 my $index = @_ ? $validate_index->(shift) : 0;
86 return $$queue[$index];
89 # Insert items anywhere into a queue
95 my $index = $validate_index->(shift);
97 return if (! @_); # Nothing to insert
99 # Support negative indices
107 # Dequeue items from $index onward
109 while (@$queue > $index) {
110 unshift(@tmp, pop(@$queue))
113 # Add new items to the queue
114 push(@$queue, map { $make_shared->($_) } @_);
116 # Add previous items back onto the queue
120 cond_signal(@$queue);
123 # Remove items from anywhere in a queue
129 my $index = @_ ? $validate_index->(shift) : 0;
130 my $count = @_ ? $validate_count->(shift) : 1;
132 # Support negative indices
137 return if ($count <= 0); # Beyond the head of the queue
138 return $queue->dequeue_nb($count); # Extract from the head
142 # Dequeue items from $index+$count onward
144 while (@$queue > ($index+$count)) {
145 unshift(@tmp, pop(@$queue))
148 # Extract desired items
150 unshift(@items, pop(@$queue)) while (@$queue > $index);
152 # Add back any removed items
156 return $items[0] if ($count == 1);
158 # Return multiple items
162 ### Internal Functions ###
164 # Create a thread-shared version of a complex data structure or object
168 # If already thread-shared, then just return the input item
169 return $item if (threads::shared::is_shared($item));
171 # Make copies of array, hash and scalar refs
173 if (my $ref_type = Scalar::Util::reftype($item)) {
175 if ($ref_type eq 'ARRAY') {
176 # Make empty shared array ref
178 # Recursively copy and add contents
179 push(@$copy, map { $make_shared->($_) } @$item);
183 elsif ($ref_type eq 'HASH') {
184 # Make empty shared hash ref
186 # Recursively copy and add contents
187 foreach my $key (keys(%{$item})) {
188 $copy->{$key} = $make_shared->($item->{$key});
193 elsif ($ref_type eq 'SCALAR') {
194 $copy = \do{ my $scalar = $$item; };
196 # Clone READONLY flag
197 if (Internals::SvREADONLY($$item)) {
198 Internals::SvREADONLY($$copy, 1);
202 # Copy of a ref of a ref
203 elsif ($ref_type eq 'REF') {
204 my $tmp = $make_shared->($$item);
210 # If no copy is created above, then just return the input item
211 # NOTE: This will end up generating an error for anything
212 # other than an ordinary scalar
213 return $item if (! defined($copy));
215 # Clone READONLY flag
216 if (Internals::SvREADONLY($item)) {
217 Internals::SvREADONLY($copy, 1);
220 # If input item is an object, then bless the copy into the same class
221 if (my $class = Scalar::Util::blessed($item)) {
222 bless($copy, $class);
228 # Check value of the requested index
229 $validate_index = sub {
232 if (! looks_like_number($index) || (int($index) != $index)) {
234 my ($method) = (caller(1))[3];
235 $method =~ s/Thread::Queue:://;
236 $index = 'undef' if (! defined($index));
237 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
243 # Check value of the requested count
244 $validate_count = sub {
247 if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
249 my ($method) = (caller(1))[3];
250 $method =~ s/Thread::Queue:://;
251 $count = 'undef' if (! defined($count));
252 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
262 Thread::Queue - Thread-safe queues
266 This document describes Thread::Queue version 2.03
276 my $q = Thread::Queue->new(); # A new empty queue
279 my $thr = threads->create(sub {
280 while (defined(my $item = $q->dequeue())) {
285 # Send work to the thread
286 $q->enqueue($item1, ...);
289 # Count of items in the queue
290 my $left = $q->pending();
292 # Non-blocking dequeue
293 if (defined(my $item = $q->dequeue_nb())) {
297 # Get the second item in the queue without dequeuing anything
298 my $item = $q->peek(1);
300 # Insert two items into the queue just behind the head
301 $q->insert(1, $item1, $item2);
303 # Extract the last two items on the queue
304 my ($item1, $item2) = $q->extract(-2, 2);
308 This module provides thread-safe FIFO queues that can be accessed safely by
309 any number of threads.
311 Any data types supported by L<threads::shared> can be passed via queues:
315 =item Ordinary scalars
323 =item Objects based on the above
327 Ordinary scalars are added to queues as they are.
329 If not already thread-shared, the other complex data types will be cloned
330 (recursively, if needed, and including any C<bless>ings and read-only
331 settings) into thread-shared structures before being placed onto a queue.
333 For example, the following would cause L<Thread::Queue> to create an empty,
334 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
335 and 'baz' from C<@ary> into it, and then place that shared reference onto
338 my @ary = qw/foo bar baz/;
341 However, for the following, the items are already shared, so their references
342 are added directly to the queue, and no cloning takes place:
344 my @ary :shared = qw/foo bar baz/;
347 my $obj = &shared({});
348 $$obj{'foo'} = 'bar';
350 bless($obj, 'My::Class');
353 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
355 =head1 QUEUE CREATION
361 Creates a new empty queue.
365 Creates a new queue pre-populated with the provided list of items.
371 The following methods deal with queues on a FIFO basis.
375 =item ->enqueue(LIST)
377 Adds a list of items onto the end of the queue.
381 =item ->dequeue(COUNT)
383 Removes the requested number of items (default is 1) from the head of the
384 queue, and returns them. If the queue contains fewer than the requested
385 number of items, then the thread will be blocked until the requisite number
386 of items are available (i.e., until other threads C<enqueue> more items).
390 =item ->dequeue_nb(COUNT)
392 Removes the requested number of items (default is 1) from the head of the
393 queue, and returns them. If the queue contains fewer than the requested
394 number of items, then it immediately (i.e., non-blocking) returns whatever
395 items there are on the queue. If the queue is empty, then C<undef> is
400 Returns the number of items still in the queue.
404 =head1 ADVANCED METHODS
406 The following methods can be used to manipulate items anywhere in a queue.
408 To prevent the contents of a queue from being modified by another thread
409 while it is being examined and/or changed, L<lock|threads::shared/"lock
410 VARIABLE"> the queue inside a local block:
413 lock($q); # Keep other threads from changing the queue's contents
414 my $item = $q->peek();
419 # Queue is now unlocked
427 Returns an item from the queue without dequeuing anything. Defaults to the
428 head of the queue (at index position 0) if no index is specified. Negative
429 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
430 is the end of the queue, -2 is next to last, and so on).
432 If no item exists at the specified index (i.e., the queue is empty, or the
433 index is beyond the number of items on the queue), then C<undef> is returned.
435 Remember, the returned item is not removed from the queue, so manipulating a
436 C<peek>ed at reference affects the item on the queue.
438 =item ->insert(INDEX, LIST)
440 Adds the list of items to the queue at the specified index position (0
441 is the head of the queue). Any existing items at and beyond that position are
442 pushed back past the newly added items:
444 $q->enqueue(1, 2, 3, 4);
445 $q->insert(1, qw/foo bar/);
446 # Queue now contains: 1, foo, bar, 2, 3, 4
448 Specifying an index position greater than the number of items in the queue
449 just adds the list to the end.
451 Negative index positions are supported:
453 $q->enqueue(1, 2, 3, 4);
454 $q->insert(-2, qw/foo bar/);
455 # Queue now contains: 1, 2, foo, bar, 3, 4
457 Specifying a negative index position greater than the number of items in the
458 queue adds the list to the head of the queue.
462 =item ->extract(INDEX)
464 =item ->extract(INDEX, COUNT)
466 Removes and returns the specified number of items (defaults to 1) from the
467 specified index position in the queue (0 is the head of the queue). When
468 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
470 This method is non-blocking, and will return only as many items as are
471 available to fulfill the request:
473 $q->enqueue(1, 2, 3, 4);
474 my $item = $q->extract(2); # Returns 3
475 # Queue now contains: 1, 2, 4
476 my @items = $q->extract(1, 3); # Returns (2, 4)
477 # Queue now contains: 1
479 Specifying an index position greater than the number of items in the
480 queue results in C<undef> or an empty list being returned.
483 my $nada = $q->extract(3); # Returns undef
484 my @nada = $q->extract(1, 3); # Returns ()
486 Negative index positions are supported. Specifying a negative index position
487 greater than the number of items in the queue may return items from the head
488 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
489 queue from the specified position (i.e. if queue size + index + count is
492 $q->enqueue(qw/foo bar baz/);
493 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
494 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
495 # Queue now contains: bar, baz
496 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
502 Passing objects on queues may not work if the objects' classes do not support
503 sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
505 Passing array/hash refs that contain objects may not work for Perl prior to
510 Thread::Queue Discussion Forum on CPAN:
511 L<http://www.cpanforum.com/dist/Thread-Queue>
513 Annotated POD for Thread::Queue:
514 L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.03/lib/Thread/Queue.pm>
516 L<threads>, L<threads::shared>
520 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
524 This program is free software; you can redistribute it and/or modify it under
525 the same terms as Perl itself.