8 use threads::shared 0.96;
9 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
11 # Predeclarations for internal functions
12 my ($make_shared, $validate_count, $validate_index);
14 # Create a new queue possibly pre-populated with items
18 my @queue :shared = map { $make_shared->($_, {}) } @_;
19 return bless(\@queue, $class);
22 # Add items to the tail of a queue
27 push(@$queue, map { $make_shared->($_, {}) } @_)
28 and cond_signal(@$queue);
31 # Return a count of the number of items on a queue
36 return scalar(@$queue);
39 # Return 1 or more items from the head of a queue, blocking if needed
45 my $count = @_ ? $validate_count->(shift) : 1;
47 # Wait for requisite number of items
48 cond_wait(@$queue) until (@$queue >= $count);
49 cond_signal(@$queue) if (@$queue > $count);
52 return shift(@$queue) if ($count == 1);
54 # Return multiple items
56 push(@items, shift(@$queue)) for (1..$count);
60 # Return items from the head of a queue with no blocking
66 my $count = @_ ? $validate_count->(shift) : 1;
69 return shift(@$queue) if ($count == 1);
71 # Return multiple items
75 push(@items, shift(@$queue));
80 # Return an item without removing it from a queue
85 my $index = @_ ? $validate_index->(shift) : 0;
86 return $$queue[$index];
89 # Insert items anywhere into a queue
95 my $index = $validate_index->(shift);
97 return if (! @_); # Nothing to insert
99 # Support negative indices
107 # Dequeue items from $index onward
109 while (@$queue > $index) {
110 unshift(@tmp, pop(@$queue))
113 # Add new items to the queue
114 push(@$queue, map { $make_shared->($_, {}) } @_);
116 # Add previous items back onto the queue
120 cond_signal(@$queue);
123 # Remove items from anywhere in a queue
129 my $index = @_ ? $validate_index->(shift) : 0;
130 my $count = @_ ? $validate_count->(shift) : 1;
132 # Support negative indices
137 return if ($count <= 0); # Beyond the head of the queue
138 return $queue->dequeue_nb($count); # Extract from the head
142 # Dequeue items from $index+$count onward
144 while (@$queue > ($index+$count)) {
145 unshift(@tmp, pop(@$queue))
148 # Extract desired items
150 unshift(@items, pop(@$queue)) while (@$queue > $index);
152 # Add back any removed items
156 return $items[0] if ($count == 1);
158 # Return multiple items
162 ### Internal Functions ###
164 # Create a thread-shared version of a complex data structure or object
166 my ($item, $cloned) = @_;
168 # If not running 'threads' or already thread-shared,
169 # then just return the input item
170 return $item if (! $threads::threads ||
171 threads::shared::is_shared($item));
173 # Make copies of array, hash and scalar refs
175 if (my $ref_type = reftype($item)) {
176 # Check for previously cloned references
177 # (this takes care of circular refs as well)
178 my $addr = refaddr($item);
179 if (defined($addr) && exists($cloned->{$addr})) {
180 # Return the already existing clone
181 return $cloned->{$addr};
185 if ($ref_type eq 'ARRAY') {
186 # Make empty shared array ref
188 # Add to clone checking hash
189 $cloned->{$addr} = $copy;
190 # Recursively copy and add contents
191 push(@$copy, map { $make_shared->($_, $cloned) } @$item);
195 elsif ($ref_type eq 'HASH') {
196 # Make empty shared hash ref
198 # Add to clone checking hash
199 $cloned->{$addr} = $copy;
200 # Recursively copy and add contents
201 foreach my $key (keys(%{$item})) {
202 $copy->{$key} = $make_shared->($item->{$key}, $cloned);
207 elsif ($ref_type eq 'SCALAR') {
208 $copy = \do{ my $scalar = $$item; };
210 # Clone READONLY flag
211 if (Internals::SvREADONLY($$item)) {
212 Internals::SvREADONLY($$copy, 1);
214 # Add to clone checking hash
215 $cloned->{$addr} = $copy;
218 # Copy of a ref of a ref
219 elsif ($ref_type eq 'REF') {
220 # Special handling for $x = \$x
221 my $addr2 = refaddr($$item);
222 if ($addr2 == $addr) {
225 $cloned->{$addr} = $copy;
230 # Add to clone checking hash
231 $cloned->{$addr} = $copy;
232 # Recursively copy and add contents
233 $tmp = $make_shared->($$item, $cloned);
238 # If no copy is created above, then just return the input item
239 # NOTE: This will end up generating an error for anything
240 # other than an ordinary scalar
241 return $item if (! defined($copy));
243 # If input item is an object, then bless the copy into the same class
244 if (my $class = blessed($item)) {
245 bless($copy, $class);
248 # Clone READONLY flag
249 if (Internals::SvREADONLY($item)) {
250 Internals::SvREADONLY($copy, 1);
256 # Check value of the requested index
257 $validate_index = sub {
260 if (! looks_like_number($index) || (int($index) != $index)) {
262 my ($method) = (caller(1))[3];
263 $method =~ s/Thread::Queue:://;
264 $index = 'undef' if (! defined($index));
265 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
271 # Check value of the requested count
272 $validate_count = sub {
275 if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
277 my ($method) = (caller(1))[3];
278 $method =~ s/Thread::Queue:://;
279 $count = 'undef' if (! defined($count));
280 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
290 Thread::Queue - Thread-safe queues
294 This document describes Thread::Queue version 2.07
304 my $q = Thread::Queue->new(); # A new empty queue
307 my $thr = threads->create(sub {
308 while (my $item = $q->dequeue()) {
313 # Send work to the thread
314 $q->enqueue($item1, ...);
317 # Count of items in the queue
318 my $left = $q->pending();
320 # Non-blocking dequeue
321 if (defined(my $item = $q->dequeue_nb())) {
325 # Get the second item in the queue without dequeuing anything
326 my $item = $q->peek(1);
328 # Insert two items into the queue just behind the head
329 $q->insert(1, $item1, $item2);
331 # Extract the last two items on the queue
332 my ($item1, $item2) = $q->extract(-2, 2);
336 This module provides thread-safe FIFO queues that can be accessed safely by
337 any number of threads.
339 Any data types supported by L<threads::shared> can be passed via queues:
343 =item Ordinary scalars
351 =item Objects based on the above
355 Ordinary scalars are added to queues as they are.
357 If not already thread-shared, the other complex data types will be cloned
358 (recursively, if needed, and including any C<bless>ings and read-only
359 settings) into thread-shared structures before being placed onto a queue.
361 For example, the following would cause L<Thread::Queue> to create a empty,
362 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
363 and 'baz' from C<@ary> into it, and then place that shared reference onto
366 my @ary = qw/foo bar baz/;
369 However, for the following, the items are already shared, so their references
370 are added directly to the queue, and no cloning takes place:
372 my @ary :shared = qw/foo bar baz/;
375 my $obj = &shared({});
376 $$obj{'foo'} = 'bar';
378 bless($obj, 'My::Class');
381 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
383 =head1 QUEUE CREATION
389 Creates a new empty queue.
393 Creates a new queue pre-populated with the provided list of items.
399 The following methods deal with queues on a FIFO basis.
403 =item ->enqueue(LIST)
405 Adds a list of items onto the end of the queue.
409 =item ->dequeue(COUNT)
411 Removes the requested number of items (default is 1) from the head of the
412 queue, and returns them. If the queue contains fewer than the requested
413 number of items, then the thread will be blocked until the requisite number
414 of items are available (i.e., until other threads <enqueue> more items).
418 =item ->dequeue_nb(COUNT)
420 Removes the requested number of items (default is 1) from the head of the
421 queue, and returns them. If the queue contains fewer than the requested
422 number of items, then it immediately (i.e., non-blocking) returns whatever
423 items there are on the queue. If the queue is empty, then C<undef> is
428 Returns the number of items still in the queue.
432 =head1 ADVANCED METHODS
434 The following methods can be used to manipulate items anywhere in a queue.
436 To prevent the contents of a queue from being modified by another thread
437 while it is being examined and/or changed, L<lock|threads::shared/"lock
438 VARIABLE"> the queue inside a local block:
441 lock($q); # Keep other threads from changing the queue's contents
442 my $item = $q->peek();
447 # Queue is now unlocked
455 Returns an item from the queue without dequeuing anything. Defaults to the
456 the head of queue (at index position 0) if no index is specified. Negative
457 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
458 is the end of the queue, -2 is next to last, and so on).
460 If no items exists at the specified index (i.e., the queue is empty, or the
461 index is beyond the number of items on the queue), then C<undef> is returned.
463 Remember, the returned item is not removed from the queue, so manipulating a
464 C<peek>ed at reference affects the item on the queue.
466 =item ->insert(INDEX, LIST)
468 Adds the list of items to the queue at the specified index position (0
469 is the head of the list). Any existing items at and beyond that position are
470 pushed back past the newly added items:
472 $q->enqueue(1, 2, 3, 4);
473 $q->insert(1, qw/foo bar/);
474 # Queue now contains: 1, foo, bar, 2, 3, 4
476 Specifying an index position greater than the number of items in the queue
477 just adds the list to the end.
479 Negative index positions are supported:
481 $q->enqueue(1, 2, 3, 4);
482 $q->insert(-2, qw/foo bar/);
483 # Queue now contains: 1, 2, foo, bar, 3, 4
485 Specifying a negative index position greater than the number of items in the
486 queue adds the list to the head of the queue.
490 =item ->extract(INDEX)
492 =item ->extract(INDEX, COUNT)
494 Removes and returns the specified number of items (defaults to 1) from the
495 specified index position in the queue (0 is the head of the queue). When
496 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
498 This method is non-blocking, and will return only as many items as are
499 available to fulfill the request:
501 $q->enqueue(1, 2, 3, 4);
502 my $item = $q->extract(2) # Returns 3
503 # Queue now contains: 1, 2, 4
504 my @items = $q->extract(1, 3) # Returns (2, 4)
505 # Queue now contains: 1
507 Specifying an index position greater than the number of items in the
508 queue results in C<undef> or an empty list being returned.
511 my $nada = $q->extract(3) # Returns undef
512 my @nada = $q->extract(1, 3) # Returns ()
514 Negative index positions are supported. Specifying a negative index position
515 greater than the number of items in the queue may return items from the head
516 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
517 queue from the specified position (i.e. if queue size + index + count is
520 $q->enqueue(qw/foo bar baz/);
521 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
522 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
523 # Queue now contains: bar, baz
524 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
530 Queues created by L<Thread::Queue> can be used in both threaded and
531 non-threaded applications.
535 Passing objects on queues may not work if the objects' classes do not support
536 sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
538 Passing array/hash refs that contain objects may not work for Perl prior to
543 Thread::Queue Discussion Forum on CPAN:
544 L<http://www.cpanforum.com/dist/Thread-Queue>
546 Annotated POD for Thread::Queue:
547 L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.07/lib/Thread/Queue.pm>
550 L<http://code.google.com/p/thread-queue/>
552 L<threads>, L<threads::shared>
556 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
560 This program is free software; you can redistribute it and/or modify it under
561 the same terms as Perl itself.