8 use threads::shared 1.21;
9 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
11 # Carp errors from threads::shared calls should complain about caller
12 our @CARP_NOT = ("threads::shared");
14 # Predeclarations for internal functions
15 my ($validate_count, $validate_index);
17 # Create a new queue possibly pre-populated with items
21 my @queue :shared = map { shared_clone($_) } @_;
22 return bless(\@queue, $class);
25 # Add items to the tail of a queue
30 push(@$queue, map { shared_clone($_) } @_)
31 and cond_signal(@$queue);
34 # Return a count of the number of items on a queue
39 return scalar(@$queue);
42 # Return 1 or more items from the head of a queue, blocking if needed
48 my $count = @_ ? $validate_count->(shift) : 1;
50 # Wait for requisite number of items
51 cond_wait(@$queue) until (@$queue >= $count);
52 cond_signal(@$queue) if (@$queue > $count);
55 return shift(@$queue) if ($count == 1);
57 # Return multiple items
59 push(@items, shift(@$queue)) for (1..$count);
63 # Return items from the head of a queue with no blocking
69 my $count = @_ ? $validate_count->(shift) : 1;
72 return shift(@$queue) if ($count == 1);
74 # Return multiple items
78 push(@items, shift(@$queue));
83 # Return an item without removing it from a queue
88 my $index = @_ ? $validate_index->(shift) : 0;
89 return $$queue[$index];
92 # Insert items anywhere into a queue
98 my $index = $validate_index->(shift);
100 return if (! @_); # Nothing to insert
102 # Support negative indices
110 # Dequeue items from $index onward
112 while (@$queue > $index) {
113 unshift(@tmp, pop(@$queue))
116 # Add new items to the queue
117 push(@$queue, map { shared_clone($_) } @_);
119 # Add previous items back onto the queue
123 cond_signal(@$queue);
126 # Remove items from anywhere in a queue
132 my $index = @_ ? $validate_index->(shift) : 0;
133 my $count = @_ ? $validate_count->(shift) : 1;
135 # Support negative indices
140 return if ($count <= 0); # Beyond the head of the queue
141 return $queue->dequeue_nb($count); # Extract from the head
145 # Dequeue items from $index+$count onward
147 while (@$queue > ($index+$count)) {
148 unshift(@tmp, pop(@$queue))
151 # Extract desired items
153 unshift(@items, pop(@$queue)) while (@$queue > $index);
155 # Add back any removed items
159 return $items[0] if ($count == 1);
161 # Return multiple items
165 ### Internal Functions ###
167 # Check value of the requested index
168 $validate_index = sub {
171 if (! defined($index) ||
172 ! looks_like_number($index) ||
173 (int($index) != $index))
176 my ($method) = (caller(1))[3];
177 $method =~ s/Thread::Queue:://;
178 $index = 'undef' if (! defined($index));
179 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
185 # Check value of the requested count
186 $validate_count = sub {
189 if (! defined($count) ||
190 ! looks_like_number($count) ||
191 (int($count) != $count) ||
195 my ($method) = (caller(1))[3];
196 $method =~ s/Thread::Queue:://;
197 $count = 'undef' if (! defined($count));
198 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
208 Thread::Queue - Thread-safe queues
212 This document describes Thread::Queue version 2.11
222 my $q = Thread::Queue->new(); # A new empty queue
225 my $thr = threads->create(sub {
226 while (my $item = $q->dequeue()) {
231 # Send work to the thread
232 $q->enqueue($item1, ...);
235 # Count of items in the queue
236 my $left = $q->pending();
238 # Non-blocking dequeue
239 if (defined(my $item = $q->dequeue_nb())) {
243 # Get the second item in the queue without dequeuing anything
244 my $item = $q->peek(1);
246 # Insert two items into the queue just behind the head
247 $q->insert(1, $item1, $item2);
249 # Extract the last two items on the queue
250 my ($item1, $item2) = $q->extract(-2, 2);
254 This module provides thread-safe FIFO queues that can be accessed safely by
255 any number of threads.
257 Any data types supported by L<threads::shared> can be passed via queues:
261 =item Ordinary scalars
269 =item Objects based on the above
273 Ordinary scalars are added to queues as they are.
275 If not already thread-shared, the other complex data types will be cloned
276 (recursively, if needed, and including any C<bless>ings and read-only
277 settings) into thread-shared structures before being placed onto a queue.
279 For example, the following would cause L<Thread::Queue> to create a empty,
280 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
281 and 'baz' from C<@ary> into it, and then place that shared reference onto
284 my @ary = qw/foo bar baz/;
287 However, for the following, the items are already shared, so their references
288 are added directly to the queue, and no cloning takes place:
290 my @ary :shared = qw/foo bar baz/;
293 my $obj = &shared({});
294 $$obj{'foo'} = 'bar';
296 bless($obj, 'My::Class');
299 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
301 =head1 QUEUE CREATION
307 Creates a new empty queue.
311 Creates a new queue pre-populated with the provided list of items.
317 The following methods deal with queues on a FIFO basis.
321 =item ->enqueue(LIST)
323 Adds a list of items onto the end of the queue.
327 =item ->dequeue(COUNT)
329 Removes the requested number of items (default is 1) from the head of the
330 queue, and returns them. If the queue contains fewer than the requested
331 number of items, then the thread will be blocked until the requisite number
332 of items are available (i.e., until other threads <enqueue> more items).
336 =item ->dequeue_nb(COUNT)
338 Removes the requested number of items (default is 1) from the head of the
339 queue, and returns them. If the queue contains fewer than the requested
340 number of items, then it immediately (i.e., non-blocking) returns whatever
341 items there are on the queue. If the queue is empty, then C<undef> is
346 Returns the number of items still in the queue.
350 =head1 ADVANCED METHODS
352 The following methods can be used to manipulate items anywhere in a queue.
354 To prevent the contents of a queue from being modified by another thread
355 while it is being examined and/or changed, L<lock|threads::shared/"lock
356 VARIABLE"> the queue inside a local block:
359 lock($q); # Keep other threads from changing the queue's contents
360 my $item = $q->peek();
365 # Queue is now unlocked
373 Returns an item from the queue without dequeuing anything. Defaults to the
374 the head of queue (at index position 0) if no index is specified. Negative
375 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
376 is the end of the queue, -2 is next to last, and so on).
378 If no items exists at the specified index (i.e., the queue is empty, or the
379 index is beyond the number of items on the queue), then C<undef> is returned.
381 Remember, the returned item is not removed from the queue, so manipulating a
382 C<peek>ed at reference affects the item on the queue.
384 =item ->insert(INDEX, LIST)
386 Adds the list of items to the queue at the specified index position (0
387 is the head of the list). Any existing items at and beyond that position are
388 pushed back past the newly added items:
390 $q->enqueue(1, 2, 3, 4);
391 $q->insert(1, qw/foo bar/);
392 # Queue now contains: 1, foo, bar, 2, 3, 4
394 Specifying an index position greater than the number of items in the queue
395 just adds the list to the end.
397 Negative index positions are supported:
399 $q->enqueue(1, 2, 3, 4);
400 $q->insert(-2, qw/foo bar/);
401 # Queue now contains: 1, 2, foo, bar, 3, 4
403 Specifying a negative index position greater than the number of items in the
404 queue adds the list to the head of the queue.
408 =item ->extract(INDEX)
410 =item ->extract(INDEX, COUNT)
412 Removes and returns the specified number of items (defaults to 1) from the
413 specified index position in the queue (0 is the head of the queue). When
414 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
416 This method is non-blocking, and will return only as many items as are
417 available to fulfill the request:
419 $q->enqueue(1, 2, 3, 4);
420 my $item = $q->extract(2) # Returns 3
421 # Queue now contains: 1, 2, 4
422 my @items = $q->extract(1, 3) # Returns (2, 4)
423 # Queue now contains: 1
425 Specifying an index position greater than the number of items in the
426 queue results in C<undef> or an empty list being returned.
429 my $nada = $q->extract(3) # Returns undef
430 my @nada = $q->extract(1, 3) # Returns ()
432 Negative index positions are supported. Specifying a negative index position
433 greater than the number of items in the queue may return items from the head
434 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
435 queue from the specified position (i.e. if queue size + index + count is
438 $q->enqueue(qw/foo bar baz/);
439 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
440 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
441 # Queue now contains: bar, baz
442 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
448 Queues created by L<Thread::Queue> can be used in both threaded and
449 non-threaded applications.
453 Passing objects on queues may not work if the objects' classes do not support
454 sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
456 Passing array/hash refs that contain objects may not work for Perl prior to
461 Thread::Queue Discussion Forum on CPAN:
462 L<http://www.cpanforum.com/dist/Thread-Queue>
464 Annotated POD for Thread::Queue:
465 L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.11/lib/Thread/Queue.pm>
468 L<http://code.google.com/p/thread-queue/>
470 L<threads>, L<threads::shared>
474 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
478 This program is free software; you can redistribute it and/or modify it under
479 the same terms as Perl itself.