8 use threads::shared 1.21;
9 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
11 # Carp errors from threads::shared calls should complain about caller
12 our @CARP_NOT = ("threads::shared");
14 # Predeclarations for internal functions
15 my ($validate_count, $validate_index);
17 # Create a new queue possibly pre-populated with items
21 my @queue :shared = map { shared_clone($_) } @_;
22 return bless(\@queue, $class);
25 # Add items to the tail of a queue
30 push(@$queue, map { shared_clone($_) } @_)
31 and cond_signal(@$queue);
34 # Return a count of the number of items on a queue
39 return scalar(@$queue);
42 # Return 1 or more items from the head of a queue, blocking if needed
48 my $count = @_ ? $validate_count->(shift) : 1;
50 # Wait for requisite number of items
51 cond_wait(@$queue) until (@$queue >= $count);
52 cond_signal(@$queue) if (@$queue > $count);
55 return shift(@$queue) if ($count == 1);
57 # Return multiple items
59 push(@items, shift(@$queue)) for (1..$count);
63 # Return items from the head of a queue with no blocking
69 my $count = @_ ? $validate_count->(shift) : 1;
72 return shift(@$queue) if ($count == 1);
74 # Return multiple items
78 push(@items, shift(@$queue));
83 # Return an item without removing it from a queue
88 my $index = @_ ? $validate_index->(shift) : 0;
89 return $$queue[$index];
92 # Insert items anywhere into a queue
98 my $index = $validate_index->(shift);
100 return if (! @_); # Nothing to insert
102 # Support negative indices
110 # Dequeue items from $index onward
112 while (@$queue > $index) {
113 unshift(@tmp, pop(@$queue))
116 # Add new items to the queue
117 push(@$queue, map { shared_clone($_) } @_);
119 # Add previous items back onto the queue
123 cond_signal(@$queue);
126 # Remove items from anywhere in a queue
132 my $index = @_ ? $validate_index->(shift) : 0;
133 my $count = @_ ? $validate_count->(shift) : 1;
135 # Support negative indices
140 return if ($count <= 0); # Beyond the head of the queue
141 return $queue->dequeue_nb($count); # Extract from the head
145 # Dequeue items from $index+$count onward
147 while (@$queue > ($index+$count)) {
148 unshift(@tmp, pop(@$queue))
151 # Extract desired items
153 unshift(@items, pop(@$queue)) while (@$queue > $index);
155 # Add back any removed items
159 return $items[0] if ($count == 1);
161 # Return multiple items
165 ### Internal Functions ###
167 # Check value of the requested index
168 $validate_index = sub {
171 if (! looks_like_number($index) || (int($index) != $index)) {
173 my ($method) = (caller(1))[3];
174 $method =~ s/Thread::Queue:://;
175 $index = 'undef' if (! defined($index));
176 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
182 # Check value of the requested count
183 $validate_count = sub {
186 if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
188 my ($method) = (caller(1))[3];
189 $method =~ s/Thread::Queue:://;
190 $count = 'undef' if (! defined($count));
191 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
201 Thread::Queue - Thread-safe queues
205 This document describes Thread::Queue version 2.08
215 my $q = Thread::Queue->new(); # A new empty queue
218 my $thr = threads->create(sub {
219 while (my $item = $q->dequeue()) {
224 # Send work to the thread
225 $q->enqueue($item1, ...);
228 # Count of items in the queue
229 my $left = $q->pending();
231 # Non-blocking dequeue
232 if (defined(my $item = $q->dequeue_nb())) {
236 # Get the second item in the queue without dequeuing anything
237 my $item = $q->peek(1);
239 # Insert two items into the queue just behind the head
240 $q->insert(1, $item1, $item2);
242 # Extract the last two items on the queue
243 my ($item1, $item2) = $q->extract(-2, 2);
247 This module provides thread-safe FIFO queues that can be accessed safely by
248 any number of threads.
250 Any data types supported by L<threads::shared> can be passed via queues:
254 =item Ordinary scalars
262 =item Objects based on the above
266 Ordinary scalars are added to queues as they are.
268 If not already thread-shared, the other complex data types will be cloned
269 (recursively, if needed, and including any C<bless>ings and read-only
270 settings) into thread-shared structures before being placed onto a queue.
272 For example, the following would cause L<Thread::Queue> to create a empty,
273 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
274 and 'baz' from C<@ary> into it, and then place that shared reference onto
277 my @ary = qw/foo bar baz/;
280 However, for the following, the items are already shared, so their references
281 are added directly to the queue, and no cloning takes place:
283 my @ary :shared = qw/foo bar baz/;
286 my $obj = &shared({});
287 $$obj{'foo'} = 'bar';
289 bless($obj, 'My::Class');
292 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
294 =head1 QUEUE CREATION
300 Creates a new empty queue.
304 Creates a new queue pre-populated with the provided list of items.
310 The following methods deal with queues on a FIFO basis.
314 =item ->enqueue(LIST)
316 Adds a list of items onto the end of the queue.
320 =item ->dequeue(COUNT)
322 Removes the requested number of items (default is 1) from the head of the
323 queue, and returns them. If the queue contains fewer than the requested
324 number of items, then the thread will be blocked until the requisite number
325 of items are available (i.e., until other threads <enqueue> more items).
329 =item ->dequeue_nb(COUNT)
331 Removes the requested number of items (default is 1) from the head of the
332 queue, and returns them. If the queue contains fewer than the requested
333 number of items, then it immediately (i.e., non-blocking) returns whatever
334 items there are on the queue. If the queue is empty, then C<undef> is
339 Returns the number of items still in the queue.
343 =head1 ADVANCED METHODS
345 The following methods can be used to manipulate items anywhere in a queue.
347 To prevent the contents of a queue from being modified by another thread
348 while it is being examined and/or changed, L<lock|threads::shared/"lock
349 VARIABLE"> the queue inside a local block:
352 lock($q); # Keep other threads from changing the queue's contents
353 my $item = $q->peek();
358 # Queue is now unlocked
366 Returns an item from the queue without dequeuing anything. Defaults to the
367 the head of queue (at index position 0) if no index is specified. Negative
368 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
369 is the end of the queue, -2 is next to last, and so on).
371 If no items exists at the specified index (i.e., the queue is empty, or the
372 index is beyond the number of items on the queue), then C<undef> is returned.
374 Remember, the returned item is not removed from the queue, so manipulating a
375 C<peek>ed at reference affects the item on the queue.
377 =item ->insert(INDEX, LIST)
379 Adds the list of items to the queue at the specified index position (0
380 is the head of the list). Any existing items at and beyond that position are
381 pushed back past the newly added items:
383 $q->enqueue(1, 2, 3, 4);
384 $q->insert(1, qw/foo bar/);
385 # Queue now contains: 1, foo, bar, 2, 3, 4
387 Specifying an index position greater than the number of items in the queue
388 just adds the list to the end.
390 Negative index positions are supported:
392 $q->enqueue(1, 2, 3, 4);
393 $q->insert(-2, qw/foo bar/);
394 # Queue now contains: 1, 2, foo, bar, 3, 4
396 Specifying a negative index position greater than the number of items in the
397 queue adds the list to the head of the queue.
401 =item ->extract(INDEX)
403 =item ->extract(INDEX, COUNT)
405 Removes and returns the specified number of items (defaults to 1) from the
406 specified index position in the queue (0 is the head of the queue). When
407 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
409 This method is non-blocking, and will return only as many items as are
410 available to fulfill the request:
412 $q->enqueue(1, 2, 3, 4);
413 my $item = $q->extract(2) # Returns 3
414 # Queue now contains: 1, 2, 4
415 my @items = $q->extract(1, 3) # Returns (2, 4)
416 # Queue now contains: 1
418 Specifying an index position greater than the number of items in the
419 queue results in C<undef> or an empty list being returned.
422 my $nada = $q->extract(3) # Returns undef
423 my @nada = $q->extract(1, 3) # Returns ()
425 Negative index positions are supported. Specifying a negative index position
426 greater than the number of items in the queue may return items from the head
427 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
428 queue from the specified position (i.e. if queue size + index + count is
431 $q->enqueue(qw/foo bar baz/);
432 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
433 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
434 # Queue now contains: bar, baz
435 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
441 Queues created by L<Thread::Queue> can be used in both threaded and
442 non-threaded applications.
446 Passing objects on queues may not work if the objects' classes do not support
447 sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
449 Passing array/hash refs that contain objects may not work for Perl prior to
454 Thread::Queue Discussion Forum on CPAN:
455 L<http://www.cpanforum.com/dist/Thread-Queue>
457 Annotated POD for Thread::Queue:
458 L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.08/lib/Thread/Queue.pm>
461 L<http://code.google.com/p/thread-queue/>
463 L<threads>, L<threads::shared>
467 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
471 This program is free software; you can redistribute it and/or modify it under
472 the same terms as Perl itself.