Thread::Queue 2.06
[p5sagit/p5-mst-13.2.git] / lib / Thread / Queue.pm
CommitLineData
d21067e0 1package Thread::Queue;
28b605d8 2
3d1f1caf 3use strict;
54c7876f 4use warnings;
28b605d8 5
7fb1c73b 6our $VERSION = '2.06';
54c7876f 7
8use threads::shared 0.96;
9use Scalar::Util 1.10 qw(looks_like_number);
10
11# Predeclarations for internal functions
12my ($make_shared, $validate_count, $validate_index);
13
14# Create a new queue possibly pre-populated with items
15sub new
16{
17 my $class = shift;
18 my @queue :shared = map { $make_shared->($_) } @_;
19 return bless(\@queue, $class);
20}
21
22# Add items to the tail of a queue
23sub enqueue
24{
25 my $queue = shift;
26 lock(@$queue);
27 push(@$queue, map { $make_shared->($_) } @_)
28 and cond_signal(@$queue);
29}
30
31# Return a count of the number of items on a queue
32sub pending
33{
34 my $queue = shift;
35 lock(@$queue);
36 return scalar(@$queue);
37}
38
39# Return 1 or more items from the head of a queue, blocking if needed
40sub dequeue
41{
42 my $queue = shift;
43 lock(@$queue);
44
45 my $count = @_ ? $validate_count->(shift) : 1;
46
47 # Wait for requisite number of items
48 cond_wait(@$queue) until (@$queue >= $count);
49 cond_signal(@$queue) if (@$queue > $count);
50
51 # Return single item
52 return shift(@$queue) if ($count == 1);
53
54 # Return multiple items
55 my @items;
56 push(@items, shift(@$queue)) for (1..$count);
57 return @items;
58}
59
60# Return items from the head of a queue with no blocking
61sub dequeue_nb
62{
63 my $queue = shift;
64 lock(@$queue);
65
66 my $count = @_ ? $validate_count->(shift) : 1;
67
68 # Return single item
69 return shift(@$queue) if ($count == 1);
70
71 # Return multiple items
72 my @items;
73 for (1..$count) {
74 last if (! @$queue);
75 push(@items, shift(@$queue));
76 }
77 return @items;
78}
79
80# Return an item without removing it from a queue
81sub peek
82{
83 my $queue = shift;
84 lock(@$queue);
85 my $index = @_ ? $validate_index->(shift) : 0;
86 return $$queue[$index];
87}
88
89# Insert items anywhere into a queue
90sub insert
91{
92 my $queue = shift;
93 lock(@$queue);
94
95 my $index = $validate_index->(shift);
96
97 return if (! @_); # Nothing to insert
98
99 # Support negative indices
100 if ($index < 0) {
101 $index += @$queue;
102 if ($index < 0) {
103 $index = 0;
104 }
105 }
106
107 # Dequeue items from $index onward
108 my @tmp;
109 while (@$queue > $index) {
110 unshift(@tmp, pop(@$queue))
111 }
112
113 # Add new items to the queue
114 push(@$queue, map { $make_shared->($_) } @_);
115
116 # Add previous items back onto the queue
117 push(@$queue, @tmp);
118
119 # Soup's up
120 cond_signal(@$queue);
121}
122
123# Remove items from anywhere in a queue
124sub extract
125{
126 my $queue = shift;
127 lock(@$queue);
128
129 my $index = @_ ? $validate_index->(shift) : 0;
130 my $count = @_ ? $validate_count->(shift) : 1;
131
132 # Support negative indices
133 if ($index < 0) {
134 $index += @$queue;
135 if ($index < 0) {
136 $count += $index;
137 return if ($count <= 0); # Beyond the head of the queue
138 return $queue->dequeue_nb($count); # Extract from the head
139 }
140 }
141
142 # Dequeue items from $index+$count onward
143 my @tmp;
144 while (@$queue > ($index+$count)) {
145 unshift(@tmp, pop(@$queue))
146 }
147
148 # Extract desired items
149 my @items;
150 unshift(@items, pop(@$queue)) while (@$queue > $index);
151
152 # Add back any removed items
153 push(@$queue, @tmp);
154
155 # Return single item
156 return $items[0] if ($count == 1);
157
158 # Return multiple items
159 return @items;
160}
161
162### Internal Functions ###
163
164# Create a thread-shared version of a complex data structure or object
165$make_shared = sub {
166 my $item = shift;
167
7fb1c73b 168 # If not running 'threads' or already thread-shared,
169 # then just return the input item
170 return $item if (! $threads::threads ||
171 threads::shared::is_shared($item));
54c7876f 172
173 # Make copies of array, hash and scalar refs
174 my $copy;
175 if (my $ref_type = Scalar::Util::reftype($item)) {
176 # Copy an array ref
177 if ($ref_type eq 'ARRAY') {
178 # Make empty shared array ref
179 $copy = &share([]);
180 # Recursively copy and add contents
181 push(@$copy, map { $make_shared->($_) } @$item);
182 }
183
184 # Copy a hash ref
185 elsif ($ref_type eq 'HASH') {
186 # Make empty shared hash ref
187 $copy = &share({});
188 # Recursively copy and add contents
189 foreach my $key (keys(%{$item})) {
190 $copy->{$key} = $make_shared->($item->{$key});
191 }
192 }
193
194 # Copy a scalar ref
195 elsif ($ref_type eq 'SCALAR') {
196 $copy = \do{ my $scalar = $$item; };
197 share($copy);
198 # Clone READONLY flag
199 if (Internals::SvREADONLY($$item)) {
200 Internals::SvREADONLY($$copy, 1);
201 }
202 }
203
204 # Copy of a ref of a ref
205 elsif ($ref_type eq 'REF') {
206 my $tmp = $make_shared->($$item);
207 $copy = \$tmp;
208 share($copy);
209 }
210 }
211
212 # If no copy is created above, then just return the input item
213 # NOTE: This will end up generating an error for anything
214 # other than an ordinary scalar
215 return $item if (! defined($copy));
216
217 # Clone READONLY flag
218 if (Internals::SvREADONLY($item)) {
219 Internals::SvREADONLY($copy, 1);
220 }
221
222 # If input item is an object, then bless the copy into the same class
223 if (my $class = Scalar::Util::blessed($item)) {
224 bless($copy, $class);
225 }
226
227 return $copy;
228};
229
230# Check value of the requested index
231$validate_index = sub {
232 my $index = shift;
233
234 if (! looks_like_number($index) || (int($index) != $index)) {
235 require Carp;
236 my ($method) = (caller(1))[3];
237 $method =~ s/Thread::Queue:://;
238 $index = 'undef' if (! defined($index));
239 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
240 }
241
242 return $index;
243};
244
245# Check value of the requested count
246$validate_count = sub {
247 my $count = shift;
248
249 if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
250 require Carp;
251 my ($method) = (caller(1))[3];
252 $method =~ s/Thread::Queue:://;
253 $count = 'undef' if (! defined($count));
254 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
255 }
256
257 return $count;
258};
259
2601;
9c6f8578 261
d516a115 262=head1 NAME
263
54c7876f 264Thread::Queue - Thread-safe queues
265
266=head1 VERSION
267
7fb1c73b 268This document describes Thread::Queue version 2.06
d516a115 269
270=head1 SYNOPSIS
271
54c7876f 272 use strict;
273 use warnings;
274
275 use threads;
d516a115 276 use Thread::Queue;
54c7876f 277
278 my $q = Thread::Queue->new(); # A new empty queue
279
280 # Worker thread
281 my $thr = threads->create(sub {
282 while (my $item = $q->dequeue()) {
283 # Do work on $item
284 }
285 })->detach();
286
287 # Send work to the thread
288 $q->enqueue($item1, ...);
289
290
291 # Count of items in the queue
292 my $left = $q->pending();
293
294 # Non-blocking dequeue
295 if (defined(my $item = $q->dequeue_nb())) {
296 # Work on $item
297 }
298
299 # Get the second item in the queue without dequeuing anything
300 my $item = $q->peek(1);
301
302 # Insert two items into the queue just behind the head
303 $q->insert(1, $item1, $item2);
304
305 # Extract the last two items on the queue
306 my ($item1, $item2) = $q->extract(-2, 2);
d516a115 307
a99072da 308=head1 DESCRIPTION
309
54c7876f 310This module provides thread-safe FIFO queues that can be accessed safely by
311any number of threads.
312
313Any data types supported by L<threads::shared> can be passed via queues:
314
315=over
316
317=item Ordinary scalars
318
319=item Array refs
320
321=item Hash refs
322
323=item Scalar refs
324
325=item Objects based on the above
326
327=back
328
329Ordinary scalars are added to queues as they are.
330
331If not already thread-shared, the other complex data types will be cloned
332(recursively, if needed, and including any C<bless>ings and read-only
333settings) into thread-shared structures before being placed onto a queue.
a99072da 334
54c7876f 335For example, the following would cause L<Thread::Queue> to create a empty,
336shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
337and 'baz' from C<@ary> into it, and then place that shared reference onto
338the queue:
a99072da 339
54c7876f 340 my @ary = qw/foo bar baz/;
341 $q->enqueue(\@ary);
a99072da 342
54c7876f 343However, for the following, the items are already shared, so their references
344are added directly to the queue, and no cloning takes place:
a99072da 345
54c7876f 346 my @ary :shared = qw/foo bar baz/;
347 $q->enqueue(\@ary);
a99072da 348
54c7876f 349 my $obj = &shared({});
350 $$obj{'foo'} = 'bar';
351 $$obj{'qux'} = 99;
352 bless($obj, 'My::Class');
353 $q->enqueue($obj);
a99072da 354
54c7876f 355See L</"LIMITATIONS"> for caveats related to passing objects via queues.
a99072da 356
54c7876f 357=head1 QUEUE CREATION
a99072da 358
54c7876f 359=over
a99072da 360
54c7876f 361=item ->new()
a99072da 362
54c7876f 363Creates a new empty queue.
a99072da 364
54c7876f 365=item ->new(LIST)
a99072da 366
54c7876f 367Creates a new queue pre-populated with the provided list of items.
a99072da 368
369=back
370
54c7876f 371=head1 BASIC METHODS
a99072da 372
54c7876f 373The following methods deal with queues on a FIFO basis.
bbc7dcd2 374
54c7876f 375=over
d516a115 376
54c7876f 377=item ->enqueue(LIST)
d21067e0 378
54c7876f 379Adds a list of items onto the end of the queue.
d21067e0 380
54c7876f 381=item ->dequeue()
a99072da 382
54c7876f 383=item ->dequeue(COUNT)
d21067e0 384
54c7876f 385Removes the requested number of items (default is 1) from the head of the
386queue, and returns them. If the queue contains fewer than the requested
387number of items, then the thread will be blocked until the requisite number
388of items are available (i.e., until other threads <enqueue> more items).
a99072da 389
54c7876f 390=item ->dequeue_nb()
391
392=item ->dequeue_nb(COUNT)
393
394Removes the requested number of items (default is 1) from the head of the
395queue, and returns them. If the queue contains fewer than the requested
396number of items, then it immediately (i.e., non-blocking) returns whatever
397items there are on the queue. If the queue is empty, then C<undef> is
398returned.
399
400=item ->pending()
401
402Returns the number of items still in the queue.
403
404=back
405
406=head1 ADVANCED METHODS
407
408The following methods can be used to manipulate items anywhere in a queue.
409
410To prevent the contents of a queue from being modified by another thread
411while it is being examined and/or changed, L<lock|threads::shared/"lock
412VARIABLE"> the queue inside a local block:
413
414 {
415 lock($q); # Keep other threads from changing the queue's contents
416 my $item = $q->peek();
417 if ($item ...) {
418 ...
419 }
420 }
421 # Queue is now unlocked
422
423=over
424
425=item ->peek()
426
427=item ->peek(INDEX)
428
429Returns an item from the queue without dequeuing anything. Defaults to the
430the head of queue (at index position 0) if no index is specified. Negative
431index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
432is the end of the queue, -2 is next to last, and so on).
433
434If no items exists at the specified index (i.e., the queue is empty, or the
435index is beyond the number of items on the queue), then C<undef> is returned.
436
437Remember, the returned item is not removed from the queue, so manipulating a
438C<peek>ed at reference affects the item on the queue.
439
440=item ->insert(INDEX, LIST)
441
442Adds the list of items to the queue at the specified index position (0
443is the head of the list). Any existing items at and beyond that position are
444pushed back past the newly added items:
445
446 $q->enqueue(1, 2, 3, 4);
447 $q->insert(1, qw/foo bar/);
448 # Queue now contains: 1, foo, bar, 2, 3, 4
83272a45 449
54c7876f 450Specifying an index position greater than the number of items in the queue
451just adds the list to the end.
83272a45 452
54c7876f 453Negative index positions are supported:
454
455 $q->enqueue(1, 2, 3, 4);
456 $q->insert(-2, qw/foo bar/);
457 # Queue now contains: 1, 2, foo, bar, 3, 4
458
459Specifying a negative index position greater than the number of items in the
460queue adds the list to the head of the queue.
461
462=item ->extract()
463
464=item ->extract(INDEX)
465
466=item ->extract(INDEX, COUNT)
467
468Removes and returns the specified number of items (defaults to 1) from the
469specified index position in the queue (0 is the head of the queue). When
470called with no arguments, C<extract> operates the same as C<dequeue_nb>.
471
472This method is non-blocking, and will return only as many items as are
473available to fulfill the request:
474
475 $q->enqueue(1, 2, 3, 4);
476 my $item = $q->extract(2) # Returns 3
477 # Queue now contains: 1, 2, 4
478 my @items = $q->extract(1, 3) # Returns (2, 4)
479 # Queue now contains: 1
480
481Specifying an index position greater than the number of items in the
482queue results in C<undef> or an empty list being returned.
483
484 $q->enqueue('foo');
485 my $nada = $q->extract(3) # Returns undef
486 my @nada = $q->extract(1, 3) # Returns ()
487
488Negative index positions are supported. Specifying a negative index position
489greater than the number of items in the queue may return items from the head
490of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
491queue from the specified position (i.e. if queue size + index + count is
492greater than zero):
493
494 $q->enqueue(qw/foo bar baz/);
495 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
496 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
497 # Queue now contains: bar, baz
498 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
499
500=back
501
7fb1c73b 502=head1 NOTES
503
504Queues created by L<Thread::Queue> can be used in both threaded and
505non-threaded applications.
506
54c7876f 507=head1 LIMITATIONS
508
509Passing objects on queues may not work if the objects' classes do not support
510sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
511
512Passing array/hash refs that contain objects may not work for Perl prior to
5135.10.0.
514
515=head1 SEE ALSO
516
517Thread::Queue Discussion Forum on CPAN:
518L<http://www.cpanforum.com/dist/Thread-Queue>
519
520Annotated POD for Thread::Queue:
7fb1c73b 521L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.06/lib/Thread/Queue.pm>
522
523Source repository:
524L<http://code.google.com/p/thread-queue/>
54c7876f 525
526L<threads>, L<threads::shared>
527
528=head1 MAINTAINER
529
530Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
531
532=head1 LICENSE
533
534This program is free software; you can redistribute it and/or modify it under
535the same terms as Perl itself.
536
537=cut