Commit | Line | Data |
d21067e0 |
1 | package Thread::Queue; |
28b605d8 |
2 | |
3d1f1caf |
3 | use strict; |
54c7876f |
4 | use warnings; |
28b605d8 |
5 | |
7fb1c73b |
6 | our $VERSION = '2.06'; |
54c7876f |
7 | |
8 | use threads::shared 0.96; |
9 | use Scalar::Util 1.10 qw(looks_like_number); |
10 | |
11 | # Predeclarations for internal functions |
12 | my ($make_shared, $validate_count, $validate_index); |
13 | |
# Constructor.  Builds a queue whose storage is a thread-shared array and
# seeds it with any items supplied after the class name.  Every item is
# passed through $make_shared on its way in.
# Returns the new queue object (the shared array blessed into the class).
sub new
{
    my ($class, @items) = @_;

    my @queue :shared = map { $make_shared->($_) } @items;

    my $self = \@queue;
    return bless($self, $class);
}
21 | |
# Append items to the tail of a queue.
# The queue is locked for the duration of the call; each item is passed
# through $make_shared before it is stored.  A waiter is signalled whenever
# the queue is non-empty after the push.
sub enqueue
{
    my ($queue, @items) = @_;
    lock(@$queue);

    # push() yields the new length, so the signal is skipped only when the
    # queue is still empty afterwards
    return push(@$queue, map { $make_shared->($_) } @items)
                && cond_signal(@$queue);
}
30 | |
# Report how many items are currently sitting on a queue.
# The queue is locked while the size is read.
sub pending
{
    my ($queue) = @_;
    lock(@$queue);

    my $size = @$queue;
    return $size;
}
38 | |
# Remove and return items from the head of a queue, waiting until enough
# items are present.
#   COUNT (optional) - number of items wanted; checked by $validate_count,
#                      defaults to 1
# With a count of 1 the single item is returned; otherwise a list of COUNT
# items is returned.
sub dequeue
{
    my $queue = shift;
    lock(@$queue);

    my $count = 1;
    if (@_) {
        $count = $validate_count->(shift);
    }

    # Sleep on the queue until it holds at least $count items
    while (@$queue < $count) {
        cond_wait(@$queue);
    }

    # Items will be left over after we take ours, so pass the signal on
    if (@$queue > $count) {
        cond_signal(@$queue);
    }

    # Single item requested
    if ($count == 1) {
        return shift(@$queue);
    }

    # Several items requested
    my @items = map { shift(@$queue) } (1..$count);
    return @items;
}
59 | |
# Remove and return items from the head of a queue without ever waiting.
#   COUNT (optional) - number of items wanted; checked by $validate_count,
#                      defaults to 1
# With a count of 1 the result of shift() is returned (undef when the queue
# is empty); otherwise a list of up to COUNT items is returned.
sub dequeue_nb
{
    my $queue = shift;
    lock(@$queue);

    my $count = 1;
    if (@_) {
        $count = $validate_count->(shift);
    }

    # Single item requested
    if ($count == 1) {
        return shift(@$queue);
    }

    # Several items requested: take what is there, up to $count
    my @items;
    while (@$queue && (@items < $count)) {
        push(@items, shift(@$queue));
    }
    return @items;
}
79 | |
# Look at an item on a queue without removing it.
#   INDEX (optional) - position to inspect; checked by $validate_index,
#                      defaults to 0 (the head)
# Returns the array element at that position.
sub peek
{
    my $queue = shift;
    lock(@$queue);

    my $index = 0;
    if (@_) {
        $index = $validate_index->(shift);
    }

    return $queue->[$index];
}
88 | |
# Insert items at an arbitrary position in a queue.
#   INDEX - position at which the first new item will sit; checked by
#           $validate_index.  Negative values count back from the tail and
#           are clamped to the head.
#   LIST  - items to insert; each is passed through $make_shared
# Items at and after INDEX end up behind the new items.  Nothing happens
# (and nothing is signalled) when LIST is empty.
sub insert
{
    my $queue = shift;
    lock(@$queue);

    my $index = $validate_index->(shift);

    # Nothing to insert
    return unless (@_);

    # Translate a negative index into a position from the head
    if ($index < 0) {
        $index += @$queue;
        $index = 0 if ($index < 0);
    }

    # Set aside everything from $index onward, one item at a time.
    # NOTE(review): presumably done with pop() rather than splice() because
    # splice is not available on shared arrays - confirm against the
    # threads::shared documentation before changing.
    my @tail;
    unshift(@tail, pop(@$queue)) while (@$queue > $index);

    # New items first, then the items that were set aside
    push(@$queue, (map { $make_shared->($_) } @_), @tail);

    # Let a waiter know there is something to dequeue
    cond_signal(@$queue);
}
122 | |
# Remove and return items from an arbitrary position in a queue, without
# blocking.
#   INDEX (optional) - position of the first item to remove; checked by
#                      $validate_index, defaults to 0.  Negative values
#                      count back from the tail.
#   COUNT (optional) - number of items to remove; checked by
#                      $validate_count, defaults to 1
# Returns a single item (undef if there is none) when the effective count
# is 1, otherwise the list of items removed.  Only items actually present
# are returned.
sub extract
{
    my $queue = shift;
    lock(@$queue);

    my $index = @_ ? $validate_index->(shift) : 0;
    my $count = @_ ? $validate_count->(shift) : 1;

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            # The requested range starts before the head of the queue:
            # discard the part of the count that falls off the front
            $count += $index;
            return if ($count <= 0);    # Entirely beyond the head

            # Getting here means the caller asked for two or more items
            # ($validate_count enforces at least 1, and $index is negative).
            # If the trimmed count is 1 and the queue is empty,
            # dequeue_nb(1) would hand back the result of shift(), i.e. a
            # one-element list holding undef in list context.  Return
            # nothing instead, so that only available items are returned.
            return if (($count == 1) && ! @$queue);

            return $queue->dequeue_nb($count);  # Extract from the head
        }
    }

    # Set aside the items that lie beyond the requested range
    my @tmp;
    while (@$queue > ($index+$count)) {
        unshift(@tmp, pop(@$queue));
    }

    # Pull out the requested items (whatever remains from $index onward)
    my @items;
    unshift(@items, pop(@$queue)) while (@$queue > $index);

    # Put the set-aside items back
    push(@$queue, @tmp);

    # Return single item
    return $items[0] if ($count == 1);

    # Return multiple items
    return @items;
}
161 | |
162 | ### Internal Functions ### |
163 | |
# Create a thread-shared version of a complex data structure or object.
#   $item   - the value to convert
#   $cloned - (optional, used by the recursive calls) hash mapping the
#             address of each reference already copied to its shared copy
# Returns $item itself when 'threads' is not loaded, when it is already
# shared, or when it is not an array/hash/scalar/ref reference; otherwise
# returns a shared copy, blessed into the same class as the input if the
# input was an object.
#
# Tracking the copies already made lets a structure that refers back to
# itself be copied (without the map the recursion below would never
# terminate) and keeps a sub-structure that is referenced several times
# inside one item as a single shared copy.
$make_shared = sub {
    my ($item, $cloned) = @_;

    # If not running 'threads' or already thread-shared,
    #   then just return the input item
    return $item if (! $threads::threads ||
                     threads::shared::is_shared($item));

    # Callers outside this function pass only the item
    $cloned ||= {};

    # Make copies of array, hash and scalar refs
    my $copy;
    if (my $ref_type = Scalar::Util::reftype($item)) {
        # Reuse the copy if this reference has been seen before
        #   (this is what breaks circular references)
        my $addr = Scalar::Util::refaddr($item);
        return $cloned->{$addr} if (exists($cloned->{$addr}));

        # Copy an array ref
        if ($ref_type eq 'ARRAY') {
            # Make empty shared array ref
            $copy = &share([]);
            # Register the copy before descending into the contents
            $cloned->{$addr} = $copy;
            # Recursively copy and add contents
            push(@$copy, map { $make_shared->($_, $cloned) } @$item);
        }

        # Copy a hash ref
        elsif ($ref_type eq 'HASH') {
            # Make empty shared hash ref
            $copy = &share({});
            # Register the copy before descending into the contents
            $cloned->{$addr} = $copy;
            # Recursively copy and add contents
            foreach my $key (keys(%{$item})) {
                $copy->{$key} = $make_shared->($item->{$key}, $cloned);
            }
        }

        # Copy a scalar ref
        elsif ($ref_type eq 'SCALAR') {
            $copy = \do{ my $scalar = $$item; };
            share($copy);
            # Clone READONLY flag
            if (Internals::SvREADONLY($$item)) {
                Internals::SvREADONLY($$copy, 1);
            }
            $cloned->{$addr} = $copy;
        }

        # Copy of a ref of a ref
        elsif ($ref_type eq 'REF') {
            # Share the holder first and register it, so that a chain of
            #   refs leading back to this one finds the copy
            my $tmp;
            $copy = \$tmp;
            share($copy);
            $cloned->{$addr} = $copy;
            # Recursively copy the referenced ref
            $tmp = $make_shared->($$item, $cloned);
        }
    }

    # If no copy is created above, then just return the input item
    # NOTE:  This will end up generating an error for anything
    #        other than an ordinary scalar
    return $item if (! defined($copy));

    # Clone READONLY flag
    if (Internals::SvREADONLY($item)) {
        Internals::SvREADONLY($copy, 1);
    }

    # If input item is an object, then bless the copy into the same class
    if (my $class = Scalar::Util::blessed($item)) {
        bless($copy, $class);
    }

    return $copy;
};
229 | |
# Check value of the requested index.
#   $index - value supplied by the caller of a queue method
# Returns $index unchanged when it is a defined, integer-valued number
# (negative values are allowed).  Otherwise croaks, naming the queue method
# that was called.
$validate_index = sub {
    my $index = shift;

    # undef is tested for explicitly rather than leaving it to
    #   looks_like_number(): were that call ever to accept undef, the integer
    #   test below (int(undef) != undef) would be false and undef would be
    #   passed through as a valid index
    if (! defined($index) ||
        ! looks_like_number($index) ||
        (int($index) != $index))
    {
        require Carp;
        # Report the public method our caller was invoked as
        my ($method) = (caller(1))[3];
        $method =~ s/Thread::Queue:://;
        $index = 'undef' if (! defined($index));
        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
    }

    return $index;
};
244 | |
# Check value of the requested count.
#   $count - value supplied by the caller of a queue method
# Returns $count unchanged when it is a defined, integer-valued number that
# is at least 1.  Otherwise croaks, naming the queue method that was called.
$validate_count = sub {
    my $count = shift;

    # undef is tested for explicitly rather than leaving it to
    #   looks_like_number(), so that it is always rejected here and never
    #   reaches the numeric comparisons below
    if (! defined($count) ||
        ! looks_like_number($count) ||
        (int($count) != $count) ||
        ($count < 1))
    {
        require Carp;
        # Report the public method our caller was invoked as
        my ($method) = (caller(1))[3];
        $method =~ s/Thread::Queue:://;
        $count = 'undef' if (! defined($count));
        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
    }

    return $count;
};
259 | |
260 | 1; |
9c6f8578 |
261 | |
d516a115 |
262 | =head1 NAME |
263 | |
54c7876f |
264 | Thread::Queue - Thread-safe queues |
265 | |
266 | =head1 VERSION |
267 | |
7fb1c73b |
268 | This document describes Thread::Queue version 2.06 |
d516a115 |
269 | |
270 | =head1 SYNOPSIS |
271 | |
54c7876f |
272 | use strict; |
273 | use warnings; |
274 | |
275 | use threads; |
d516a115 |
276 | use Thread::Queue; |
54c7876f |
277 | |
278 | my $q = Thread::Queue->new(); # A new empty queue |
279 | |
280 | # Worker thread |
281 | my $thr = threads->create(sub { |
282 | while (my $item = $q->dequeue()) { |
283 | # Do work on $item |
284 | } |
285 | })->detach(); |
286 | |
287 | # Send work to the thread |
288 | $q->enqueue($item1, ...); |
289 | |
290 | |
291 | # Count of items in the queue |
292 | my $left = $q->pending(); |
293 | |
294 | # Non-blocking dequeue |
295 | if (defined(my $item = $q->dequeue_nb())) { |
296 | # Work on $item |
297 | } |
298 | |
299 | # Get the second item in the queue without dequeuing anything |
300 | my $item = $q->peek(1); |
301 | |
302 | # Insert two items into the queue just behind the head |
303 | $q->insert(1, $item1, $item2); |
304 | |
305 | # Extract the last two items on the queue |
306 | my ($item1, $item2) = $q->extract(-2, 2); |
d516a115 |
307 | |
a99072da |
308 | =head1 DESCRIPTION |
309 | |
54c7876f |
310 | This module provides thread-safe FIFO queues that can be accessed safely by |
311 | any number of threads. |
312 | |
313 | Any data types supported by L<threads::shared> can be passed via queues: |
314 | |
315 | =over |
316 | |
317 | =item Ordinary scalars |
318 | |
319 | =item Array refs |
320 | |
321 | =item Hash refs |
322 | |
323 | =item Scalar refs |
324 | |
325 | =item Objects based on the above |
326 | |
327 | =back |
328 | |
329 | Ordinary scalars are added to queues as they are. |
330 | |
331 | If not already thread-shared, the other complex data types will be cloned |
332 | (recursively, if needed, and including any C<bless>ings and read-only |
333 | settings) into thread-shared structures before being placed onto a queue. |
a99072da |
334 | |
54c7876f |
335 | For example, the following would cause L<Thread::Queue> to create an empty, |
336 | shared array reference via C<&share([])>, copy the elements 'foo', 'bar' |
337 | and 'baz' from C<@ary> into it, and then place that shared reference onto |
338 | the queue: |
a99072da |
339 | |
54c7876f |
340 | my @ary = qw/foo bar baz/; |
341 | $q->enqueue(\@ary); |
a99072da |
342 | |
54c7876f |
343 | However, for the following, the items are already shared, so their references |
344 | are added directly to the queue, and no cloning takes place: |
a99072da |
345 | |
54c7876f |
346 | my @ary :shared = qw/foo bar baz/; |
347 | $q->enqueue(\@ary); |
a99072da |
348 | |
54c7876f |
349 | my $obj = &share({}); |
350 | $$obj{'foo'} = 'bar'; |
351 | $$obj{'qux'} = 99; |
352 | bless($obj, 'My::Class'); |
353 | $q->enqueue($obj); |
a99072da |
354 | |
54c7876f |
355 | See L</"LIMITATIONS"> for caveats related to passing objects via queues. |
a99072da |
356 | |
54c7876f |
357 | =head1 QUEUE CREATION |
a99072da |
358 | |
54c7876f |
359 | =over |
a99072da |
360 | |
54c7876f |
361 | =item ->new() |
a99072da |
362 | |
54c7876f |
363 | Creates a new empty queue. |
a99072da |
364 | |
54c7876f |
365 | =item ->new(LIST) |
a99072da |
366 | |
54c7876f |
367 | Creates a new queue pre-populated with the provided list of items. |
a99072da |
368 | |
369 | =back |
370 | |
54c7876f |
371 | =head1 BASIC METHODS |
a99072da |
372 | |
54c7876f |
373 | The following methods deal with queues on a FIFO basis. |
bbc7dcd2 |
374 | |
54c7876f |
375 | =over |
d516a115 |
376 | |
54c7876f |
377 | =item ->enqueue(LIST) |
d21067e0 |
378 | |
54c7876f |
379 | Adds a list of items onto the end of the queue. |
d21067e0 |
380 | |
54c7876f |
381 | =item ->dequeue() |
a99072da |
382 | |
54c7876f |
383 | =item ->dequeue(COUNT) |
d21067e0 |
384 | |
54c7876f |
385 | Removes the requested number of items (default is 1) from the head of the |
386 | queue, and returns them. If the queue contains fewer than the requested |
387 | number of items, then the thread will be blocked until the requisite number |
388 | of items are available (i.e., until other threads C<enqueue> more items). |
a99072da |
389 | |
54c7876f |
390 | =item ->dequeue_nb() |
391 | |
392 | =item ->dequeue_nb(COUNT) |
393 | |
394 | Removes the requested number of items (default is 1) from the head of the |
395 | queue, and returns them. If the queue contains fewer than the requested |
396 | number of items, then it immediately (i.e., non-blocking) returns whatever |
397 | items there are on the queue. If the queue is empty, then C<undef> is |
398 | returned. |
399 | |
400 | =item ->pending() |
401 | |
402 | Returns the number of items still in the queue. |
403 | |
404 | =back |
405 | |
406 | =head1 ADVANCED METHODS |
407 | |
408 | The following methods can be used to manipulate items anywhere in a queue. |
409 | |
410 | To prevent the contents of a queue from being modified by another thread |
411 | while it is being examined and/or changed, L<lock|threads::shared/"lock |
412 | VARIABLE"> the queue inside a local block: |
413 | |
414 | { |
415 | lock($q); # Keep other threads from changing the queue's contents |
416 | my $item = $q->peek(); |
417 | if ($item ...) { |
418 | ... |
419 | } |
420 | } |
421 | # Queue is now unlocked |
422 | |
423 | =over |
424 | |
425 | =item ->peek() |
426 | |
427 | =item ->peek(INDEX) |
428 | |
429 | Returns an item from the queue without dequeuing anything. Defaults to the |
430 | head of the queue (at index position 0) if no index is specified. Negative |
431 | index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1 |
432 | is the end of the queue, -2 is next to last, and so on). |
433 | |
434 | If no item exists at the specified index (i.e., the queue is empty, or the |
435 | index is beyond the number of items on the queue), then C<undef> is returned. |
436 | |
437 | Remember, the returned item is not removed from the queue, so manipulating a |
438 | C<peek>ed at reference affects the item on the queue. |
439 | |
440 | =item ->insert(INDEX, LIST) |
441 | |
442 | Adds the list of items to the queue at the specified index position (0 |
443 | is the head of the list). Any existing items at and beyond that position are |
444 | pushed back past the newly added items: |
445 | |
446 | $q->enqueue(1, 2, 3, 4); |
447 | $q->insert(1, qw/foo bar/); |
448 | # Queue now contains: 1, foo, bar, 2, 3, 4 |
83272a45 |
449 | |
54c7876f |
450 | Specifying an index position greater than the number of items in the queue |
451 | just adds the list to the end. |
83272a45 |
452 | |
54c7876f |
453 | Negative index positions are supported: |
454 | |
455 | $q->enqueue(1, 2, 3, 4); |
456 | $q->insert(-2, qw/foo bar/); |
457 | # Queue now contains: 1, 2, foo, bar, 3, 4 |
458 | |
459 | Specifying a negative index position greater than the number of items in the |
460 | queue adds the list to the head of the queue. |
461 | |
462 | =item ->extract() |
463 | |
464 | =item ->extract(INDEX) |
465 | |
466 | =item ->extract(INDEX, COUNT) |
467 | |
468 | Removes and returns the specified number of items (defaults to 1) from the |
469 | specified index position in the queue (0 is the head of the queue). When |
470 | called with no arguments, C<extract> operates the same as C<dequeue_nb>. |
471 | |
472 | This method is non-blocking, and will return only as many items as are |
473 | available to fulfill the request: |
474 | |
475 | $q->enqueue(1, 2, 3, 4); |
476 | my $item = $q->extract(2); # Returns 3 |
477 | # Queue now contains: 1, 2, 4 |
478 | my @items = $q->extract(1, 3); # Returns (2, 4) |
479 | # Queue now contains: 1 |
480 | |
481 | Specifying an index position greater than the number of items in the |
482 | queue results in C<undef> or an empty list being returned. |
483 | |
484 | $q->enqueue('foo'); |
485 | my $nada = $q->extract(3); # Returns undef |
486 | my @nada = $q->extract(1, 3); # Returns () |
487 | |
488 | Negative index positions are supported. Specifying a negative index position |
489 | greater than the number of items in the queue may return items from the head |
490 | of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the |
491 | queue from the specified position (i.e. if queue size + index + count is |
492 | greater than zero): |
493 | |
494 | $q->enqueue(qw/foo bar baz/); |
495 | my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0 |
496 | my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0 |
497 | # Queue now contains: bar, baz |
498 | my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0 |
499 | |
500 | =back |
501 | |
7fb1c73b |
502 | =head1 NOTES |
503 | |
504 | Queues created by L<Thread::Queue> can be used in both threaded and |
505 | non-threaded applications. |
506 | |
54c7876f |
507 | =head1 LIMITATIONS |
508 | |
509 | Passing objects on queues may not work if the objects' classes do not support |
510 | sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more. |
511 | |
512 | Passing array/hash refs that contain objects may not work for Perl prior to |
513 | 5.10.0. |
514 | |
515 | =head1 SEE ALSO |
516 | |
517 | Thread::Queue Discussion Forum on CPAN: |
518 | L<http://www.cpanforum.com/dist/Thread-Queue> |
519 | |
520 | Annotated POD for Thread::Queue: |
7fb1c73b |
521 | L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.06/lib/Thread/Queue.pm> |
522 | |
523 | Source repository: |
524 | L<http://code.google.com/p/thread-queue/> |
54c7876f |
525 | |
526 | L<threads>, L<threads::shared> |
527 | |
528 | =head1 MAINTAINER |
529 | |
530 | Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>> |
531 | |
532 | =head1 LICENSE |
533 | |
534 | This program is free software; you can redistribute it and/or modify it under |
535 | the same terms as Perl itself. |
536 | |
537 | =cut |