8 use threads::shared 0.96;
9 use Scalar::Util 1.10 qw(looks_like_number);
11 # Predeclarations for internal functions
12 my ($make_shared, $validate_count, $validate_index);
14 # Create a new queue possibly pre-populated with items
18 my @queue :shared = map { $make_shared->($_) } @_;
19 return bless(\@queue, $class);
22 # Add items to the tail of a queue
27 push(@$queue, map { $make_shared->($_) } @_)
28 and cond_signal(@$queue);
31 # Return a count of the number of items on a queue
36 return scalar(@$queue);
39 # Return 1 or more items from the head of a queue, blocking if needed
45 my $count = @_ ? $validate_count->(shift) : 1;
47 # Wait for requisite number of items
48 cond_wait(@$queue) until (@$queue >= $count);
49 cond_signal(@$queue) if (@$queue > $count);
52 return shift(@$queue) if ($count == 1);
54 # Return multiple items
56 push(@items, shift(@$queue)) for (1..$count);
60 # Return items from the head of a queue with no blocking
66 my $count = @_ ? $validate_count->(shift) : 1;
69 return shift(@$queue) if ($count == 1);
71 # Return multiple items
75 push(@items, shift(@$queue));
80 # Return an item without removing it from a queue
85 my $index = @_ ? $validate_index->(shift) : 0;
86 return $$queue[$index];
89 # Insert items anywhere into a queue
95 my $index = $validate_index->(shift);
97 return if (! @_); # Nothing to insert
99 # Support negative indices
107 # Dequeue items from $index onward
109 while (@$queue > $index) {
110 unshift(@tmp, pop(@$queue))
113 # Add new items to the queue
114 push(@$queue, map { $make_shared->($_) } @_);
116 # Add previous items back onto the queue
120 cond_signal(@$queue);
123 # Remove items from anywhere in a queue
129 my $index = @_ ? $validate_index->(shift) : 0;
130 my $count = @_ ? $validate_count->(shift) : 1;
132 # Support negative indices
137 return if ($count <= 0); # Beyond the head of the queue
138 return $queue->dequeue_nb($count); # Extract from the head
142 # Dequeue items from $index+$count onward
144 while (@$queue > ($index+$count)) {
145 unshift(@tmp, pop(@$queue))
148 # Extract desired items
150 unshift(@items, pop(@$queue)) while (@$queue > $index);
152 # Add back any removed items
156 return $items[0] if ($count == 1);
158 # Return multiple items
162 ### Internal Functions ###
164 # Create a thread-shared version of a complex data structure or object
168 # If not running 'threads' or already thread-shared,
169 # then just return the input item
170 return $item if (! $threads::threads ||
171 threads::shared::is_shared($item));
173 # Make copies of array, hash and scalar refs
175 if (my $ref_type = Scalar::Util::reftype($item)) {
177 if ($ref_type eq 'ARRAY') {
178 # Make empty shared array ref
180 # Recursively copy and add contents
181 push(@$copy, map { $make_shared->($_) } @$item);
185 elsif ($ref_type eq 'HASH') {
186 # Make empty shared hash ref
188 # Recursively copy and add contents
189 foreach my $key (keys(%{$item})) {
190 $copy->{$key} = $make_shared->($item->{$key});
195 elsif ($ref_type eq 'SCALAR') {
196 $copy = \do{ my $scalar = $$item; };
198 # Clone READONLY flag
199 if (Internals::SvREADONLY($$item)) {
200 Internals::SvREADONLY($$copy, 1);
204 # Copy of a ref of a ref
205 elsif ($ref_type eq 'REF') {
206 my $tmp = $make_shared->($$item);
212 # If no copy is created above, then just return the input item
213 # NOTE: This will end up generating an error for anything
214 # other than an ordinary scalar
215 return $item if (! defined($copy));
217 # Clone READONLY flag
218 if (Internals::SvREADONLY($item)) {
219 Internals::SvREADONLY($copy, 1);
222 # If input item is an object, then bless the copy into the same class
223 if (my $class = Scalar::Util::blessed($item)) {
224 bless($copy, $class);
230 # Check value of the requested index
231 $validate_index = sub {
234 if (! looks_like_number($index) || (int($index) != $index)) {
236 my ($method) = (caller(1))[3];
237 $method =~ s/Thread::Queue:://;
238 $index = 'undef' if (! defined($index));
239 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
245 # Check value of the requested count
246 $validate_count = sub {
249 if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
251 my ($method) = (caller(1))[3];
252 $method =~ s/Thread::Queue:://;
253 $count = 'undef' if (! defined($count));
254 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
264 Thread::Queue - Thread-safe queues
268 This document describes Thread::Queue version 2.06
278 my $q = Thread::Queue->new(); # A new empty queue
281 my $thr = threads->create(sub {
282 while (defined(my $item = $q->dequeue())) {
287 # Send work to the thread
288 $q->enqueue($item1, ...);
291 # Count of items in the queue
292 my $left = $q->pending();
294 # Non-blocking dequeue
295 if (defined(my $item = $q->dequeue_nb())) {
299 # Get the second item in the queue without dequeuing anything
300 my $item = $q->peek(1);
302 # Insert two items into the queue just behind the head
303 $q->insert(1, $item1, $item2);
305 # Extract the last two items on the queue
306 my ($item1, $item2) = $q->extract(-2, 2);
310 This module provides thread-safe FIFO queues that can be accessed safely by
311 any number of threads.
313 Any data types supported by L<threads::shared> can be passed via queues:
317 =item Ordinary scalars
325 =item Objects based on the above
329 Ordinary scalars are added to queues as they are.
331 If not already thread-shared, the other complex data types will be cloned
332 (recursively, if needed, and including any C<bless>ings and read-only
333 settings) into thread-shared structures before being placed onto a queue.
335 For example, the following would cause L<Thread::Queue> to create an empty,
336 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
337 and 'baz' from C<@ary> into it, and then place that shared reference onto
340 my @ary = qw/foo bar baz/;
343 However, for the following, the items are already shared, so their references
344 are added directly to the queue, and no cloning takes place:
346 my @ary :shared = qw/foo bar baz/;
349 my $obj = &shared({});
350 $$obj{'foo'} = 'bar';
352 bless($obj, 'My::Class');
355 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
357 =head1 QUEUE CREATION
363 Creates a new empty queue.
367 Creates a new queue pre-populated with the provided list of items.
373 The following methods deal with queues on a FIFO basis.
377 =item ->enqueue(LIST)
379 Adds a list of items onto the end of the queue.
383 =item ->dequeue(COUNT)
385 Removes the requested number of items (default is 1) from the head of the
386 queue, and returns them. If the queue contains fewer than the requested
387 number of items, then the thread will be blocked until the requisite number
388 of items are available (i.e., until other threads C<enqueue> more items).
392 =item ->dequeue_nb(COUNT)
394 Removes the requested number of items (default is 1) from the head of the
395 queue, and returns them. If the queue contains fewer than the requested
396 number of items, then it immediately (i.e., non-blocking) returns whatever
397 items there are on the queue. If the queue is empty, then C<undef> is
402 Returns the number of items still in the queue.
406 =head1 ADVANCED METHODS
408 The following methods can be used to manipulate items anywhere in a queue.
410 To prevent the contents of a queue from being modified by another thread
411 while it is being examined and/or changed, L<lock|threads::shared/"lock
412 VARIABLE"> the queue inside a local block:
415 lock($q); # Keep other threads from changing the queue's contents
416 my $item = $q->peek();
421 # Queue is now unlocked
429 Returns an item from the queue without dequeuing anything. Defaults to the
430 head of the queue (at index position 0) if no index is specified. Negative
431 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
432 is the end of the queue, -2 is next to last, and so on).
434 If no item exists at the specified index (i.e., the queue is empty, or the
435 index is beyond the number of items on the queue), then C<undef> is returned.
437 Remember, the returned item is not removed from the queue, so manipulating a
438 C<peek>ed at reference affects the item on the queue.
440 =item ->insert(INDEX, LIST)
442 Adds the list of items to the queue at the specified index position (0
443 is the head of the list). Any existing items at and beyond that position are
444 pushed back past the newly added items:
446 $q->enqueue(1, 2, 3, 4);
447 $q->insert(1, qw/foo bar/);
448 # Queue now contains: 1, foo, bar, 2, 3, 4
450 Specifying an index position greater than the number of items in the queue
451 just adds the list to the end.
453 Negative index positions are supported:
455 $q->enqueue(1, 2, 3, 4);
456 $q->insert(-2, qw/foo bar/);
457 # Queue now contains: 1, 2, foo, bar, 3, 4
459 Specifying a negative index position greater than the number of items in the
460 queue adds the list to the head of the queue.
464 =item ->extract(INDEX)
466 =item ->extract(INDEX, COUNT)
468 Removes and returns the specified number of items (defaults to 1) from the
469 specified index position in the queue (0 is the head of the queue). When
470 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
472 This method is non-blocking, and will return only as many items as are
473 available to fulfill the request:
475 $q->enqueue(1, 2, 3, 4);
476 my $item = $q->extract(2); # Returns 3
477 # Queue now contains: 1, 2, 4
478 my @items = $q->extract(1, 3); # Returns (2, 4)
479 # Queue now contains: 1
481 Specifying an index position greater than the number of items in the
482 queue results in C<undef> or an empty list being returned.
485 my $nada = $q->extract(3); # Returns undef
486 my @nada = $q->extract(1, 3); # Returns ()
488 Negative index positions are supported. Specifying a negative index position
489 greater than the number of items in the queue may return items from the head
490 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
491 queue from the specified position (i.e. if queue size + index + count is
494 $q->enqueue(qw/foo bar baz/);
495 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
496 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
497 # Queue now contains: bar, baz
498 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
504 Queues created by L<Thread::Queue> can be used in both threaded and
505 non-threaded applications.
509 Passing objects on queues may not work if the objects' classes do not support
510 sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
512 Passing array/hash refs that contain objects may not work for Perl prior to
517 Thread::Queue Discussion Forum on CPAN:
518 L<http://www.cpanforum.com/dist/Thread-Queue>
520 Annotated POD for Thread::Queue:
521 L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.06/lib/Thread/Queue.pm>
524 L<http://code.google.com/p/thread-queue/>
526 L<threads>, L<threads::shared>
530 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
534 This program is free software; you can redistribute it and/or modify it under
535 the same terms as Perl itself.