Commit | Line | Data |
d21067e0 |
1 | package Thread::Queue; |
28b605d8 |
2 | |
3d1f1caf |
3 | use strict; |
54c7876f |
4 | use warnings; |
28b605d8 |
5 | |
ac9d3a9d |
6 | our $VERSION = '2.07'; |
54c7876f |
7 | |
8 | use threads::shared 0.96; |
ac9d3a9d |
9 | use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); |
54c7876f |
10 | |
11 | # Predeclarations for internal functions |
12 | my ($make_shared, $validate_count, $validate_index); |
13 | |
# Constructor: build a shared array, optionally seeded with the supplied
# items, and bless a reference to it into the requested class.
# Each item is passed through $make_shared with its own clone-tracking hash.
sub new
{
    my ($class, @items) = @_;

    my @queue :shared;
    foreach my $item (@items) {
        push(@queue, $make_shared->($item, {}));
    }

    return bless(\@queue, $class);
}
21 | |
# Append items to the tail of the queue.
# The queue is locked for the duration of the call.  Each item is converted
# with $make_shared before being pushed; a waiting thread is signalled only
# when push() reports a non-zero queue length.
sub enqueue
{
    my $queue = shift;
    lock(@$queue);

    my @shared_items = map { $make_shared->($_, {}) } @_;

    push(@$queue, @shared_items)
        and cond_signal(@$queue);
}
30 | |
# Report how many items are currently sitting on the queue.
# The queue is locked while its length is read.
sub pending
{
    my ($queue) = @_;
    lock(@$queue);

    my $num_items = @$queue;
    return $num_items;
}
38 | |
# Remove and return COUNT items (default 1) from the head of the queue,
# blocking until that many items are available.
# With a count of 1 a single item is returned; otherwise a list is returned.
# An invalid count makes $validate_count croak.
sub dequeue
{
    my $queue = shift;
    lock(@$queue);

    my $count = 1;
    $count = $validate_count->(shift) if (@_);

    # Sleep until enough items have been queued
    while (@$queue < $count) {
        cond_wait(@$queue);
    }

    # Items will be left over after we take ours, so wake another waiter
    if (@$queue > $count) {
        cond_signal(@$queue);
    }

    # Single-item request
    if ($count == 1) {
        return shift(@$queue);
    }

    # Multi-item request
    my @items = map { shift(@$queue) } (1..$count);
    return @items;
}
59 | |
# Non-blocking dequeue: remove and return up to COUNT items (default 1)
# from the head of the queue without waiting for more to arrive.
# With a count of 1 the result of a plain shift is returned (undef when the
# queue is empty); otherwise whatever items are available are returned as a
# list.  An invalid count makes $validate_count croak.
sub dequeue_nb
{
    my $queue = shift;
    lock(@$queue);

    my $count = 1;
    $count = $validate_count->(shift) if (@_);

    # Single-item request
    if ($count == 1) {
        return shift(@$queue);
    }

    # Multi-item request: stop early once the queue runs dry
    my @items;
    foreach my $nth (1..$count) {
        if (@$queue) {
            push(@items, shift(@$queue));
        } else {
            last;
        }
    }
    return @items;
}
79 | |
# Look at the item stored at INDEX (default 0, the head) and return it
# while leaving the queue untouched.
# An invalid index makes $validate_index croak.
sub peek
{
    my $queue = shift;
    lock(@$queue);

    my $index = 0;
    $index = $validate_index->(shift) if (@_);

    return $queue->[$index];
}
88 | |
# Insert items into the queue so that the first new item lands at INDEX.
# INDEX is mandatory and is checked by $validate_index.  A negative INDEX
# counts back from the tail and is clamped to the head; an INDEX past the
# tail simply appends.  Nothing happens when no items are supplied.
sub insert
{
    my $queue = shift;
    lock(@$queue);

    my $index = $validate_index->(shift);

    # Nothing to insert
    return unless (@_);

    # Translate a negative index into an offset from the head
    if ($index < 0) {
        $index += @$queue;
        $index = 0 if ($index < 0);
    }

    # Set aside everything from $index onward, preserving its order
    my @tail;
    unshift(@tail, pop(@$queue)) while (@$queue > $index);

    # Append the new items, then put the set-aside items back behind them
    my @shared_items = map { $make_shared->($_, {}) } @_;
    push(@$queue, @shared_items);
    push(@$queue, @tail);

    # Soup's up
    cond_signal(@$queue);
}
122 | |
# Remove and return COUNT items (default 1) starting at INDEX (default 0).
# Never blocks: only the items actually present are returned.
# A negative INDEX counts back from the tail; when it reaches beyond the
# head, the part of the request that overlaps the queue (if any) is taken
# from the head via dequeue_nb().
# With a count of 1 a single item (possibly undef) is returned; otherwise
# a list is returned.
sub extract
{
    my $queue = shift;
    lock(@$queue);

    my $index = 0;
    $index = $validate_index->(shift) if (@_);

    my $count = 1;
    $count = $validate_count->(shift) if (@_);

    # Translate a negative index into an offset from the head
    if ($index < 0) {
        $index += @$queue;
    }

    # Still negative: the request starts before the head of the queue
    if ($index < 0) {
        $count += $index;
        return if ($count <= 0);            # Entirely beyond the head
        return $queue->dequeue_nb($count);  # Overlaps the head
    }

    # Set aside the items that follow the requested range
    my $range_end = $index + $count;
    my @tail;
    unshift(@tail, pop(@$queue)) while (@$queue > $range_end);

    # Pull out the requested items, keeping them in queue order
    my @items;
    while (@$queue > $index) {
        unshift(@items, pop(@$queue));
    }

    # Restore the set-aside items
    push(@$queue, @tail);

    # Single item for a count of 1, otherwise the whole list
    return ($count == 1) ? $items[0] : @items;
}
161 | |
162 | ### Internal Functions ### |
163 | |
# Create a thread-shared version of a complex data structure or object.
#
# Arguments:
#   $item   - the value about to be placed on a queue
#   $cloned - hash ref mapping refaddr() of references already copied during
#             this call chain to their shared copies; this is what lets
#             repeated and circular references be handled
#
# Returns $item unchanged when 'threads' is not loaded, when $item is already
# shared, or when no copy was made (ordinary scalars, and reference types
# other than ARRAY, HASH, SCALAR and REF).  Otherwise returns the shared
# copy, blessed into the same class as $item if $item is an object.
$make_shared = sub {
    my ($item, $cloned) = @_;

    # If not running 'threads' or already thread-shared,
    #   then just return the input item
    return $item if (! $threads::threads ||
                     threads::shared::is_shared($item));

    # Make copies of array, hash and scalar refs
    my $copy;
    my $ref_type = reftype($item);
    if ($ref_type) {
        # Check for previously cloned references
        #   (this takes care of circular refs as well)
        my $addr = refaddr($item);
        if (defined($addr) && exists($cloned->{$addr})) {
            # Return the already existing clone
            return $cloned->{$addr};
        }

        # Copy an array ref
        if ($ref_type eq 'ARRAY') {
            # Make empty shared array ref
            $copy = &share([]);
            # Add to clone checking hash (before recursing, so that a
            #   reference back to this array finds the clone)
            $cloned->{$addr} = $copy;
            # Recursively copy and add contents
            push(@$copy, map { $make_shared->($_, $cloned) } @$item);
        }

        # Copy a hash ref
        elsif ($ref_type eq 'HASH') {
            # Make empty shared hash ref
            $copy = &share({});
            # Add to clone checking hash (before recursing, as above)
            $cloned->{$addr} = $copy;
            # Recursively copy and add contents
            foreach my $key (keys(%{$item})) {
                $copy->{$key} = $make_shared->($item->{$key}, $cloned);
            }
        }

        # Copy a scalar ref
        elsif ($ref_type eq 'SCALAR') {
            $copy = \do{ my $scalar = $$item; };
            share($copy);
            # NOTE: The READONLY flag of the referenced scalar is cloned
            #   further down, after any blessing has been done
            # Add to clone checking hash
            $cloned->{$addr} = $copy;
        }

        # Copy of a ref of a ref
        elsif ($ref_type eq 'REF') {
            # Special handling for $x = \$x
            my $addr2 = refaddr($$item);
            if ($addr2 == $addr) {
                $copy = \$copy;
                share($copy);
                $cloned->{$addr} = $copy;
            } else {
                my $tmp;
                $copy = \$tmp;
                share($copy);
                # Add to clone checking hash
                $cloned->{$addr} = $copy;
                # Recursively copy and add contents
                $tmp = $make_shared->($$item, $cloned);
            }
        }
    }

    # If no copy is created above, then just return the input item
    # NOTE:  This will end up generating an error for anything
    #        other than an ordinary scalar
    return $item if (! defined($copy));

    # If input item is an object, then bless the copy into the same class
    if (my $class = blessed($item)) {
        bless($copy, $class);
    }

    # Clone READONLY flags
    # This must come after the bless() above:  blessing modifies the
    #   referent, so marking the copied scalar read-only first would make
    #   blessing a reference to a read-only scalar fail
    if (($ref_type eq 'SCALAR') && Internals::SvREADONLY($$item)) {
        Internals::SvREADONLY($$copy, 1);
    }
    if (Internals::SvREADONLY($item)) {
        Internals::SvREADONLY($copy, 1);
    }

    return $copy;
};
255 | |
# Validate an 'index' argument.
# Returns the index when it is an integral number (negative values are
# accepted).  Otherwise croaks, naming the calling method with its
# 'Thread::Queue::' prefix removed.
$validate_index = sub {
    my ($index) = @_;

    # Acceptable: numeric and without a fractional part
    if (looks_like_number($index) && (int($index) == $index)) {
        return $index;
    }

    # Unacceptable: report against the method that called us
    require Carp;
    my $method = (caller(1))[3];
    $method =~ s/Thread::Queue:://;
    $index = 'undef' if (! defined($index));
    Carp::croak("Invalid 'index' argument ($index) to '$method' method");
};
270 | |
# Validate a 'count' argument.
# Returns the count when it is an integral number of at least 1.
# Otherwise croaks, naming the calling method with its 'Thread::Queue::'
# prefix removed.
$validate_count = sub {
    my ($count) = @_;

    # Acceptable: numeric, without a fractional part, and positive
    if (looks_like_number($count) && (int($count) == $count) && ($count >= 1)) {
        return $count;
    }

    # Unacceptable: report against the method that called us
    require Carp;
    my $method = (caller(1))[3];
    $method =~ s/Thread::Queue:://;
    $count = 'undef' if (! defined($count));
    Carp::croak("Invalid 'count' argument ($count) to '$method' method");
};
285 | |
286 | 1; |
9c6f8578 |
287 | |
d516a115 |
288 | =head1 NAME |
289 | |
54c7876f |
290 | Thread::Queue - Thread-safe queues |
291 | |
292 | =head1 VERSION |
293 | |
ac9d3a9d |
294 | This document describes Thread::Queue version 2.07 |
d516a115 |
295 | |
296 | =head1 SYNOPSIS |
297 | |
54c7876f |
298 | use strict; |
299 | use warnings; |
300 | |
301 | use threads; |
d516a115 |
302 | use Thread::Queue; |
54c7876f |
303 | |
304 | my $q = Thread::Queue->new(); # A new empty queue |
305 | |
306 | # Worker thread |
307 | my $thr = threads->create(sub { |
308 | while (my $item = $q->dequeue()) { |
309 | # Do work on $item |
310 | } |
311 | })->detach(); |
312 | |
313 | # Send work to the thread |
314 | $q->enqueue($item1, ...); |
315 | |
316 | |
317 | # Count of items in the queue |
318 | my $left = $q->pending(); |
319 | |
320 | # Non-blocking dequeue |
321 | if (defined(my $item = $q->dequeue_nb())) { |
322 | # Work on $item |
323 | } |
324 | |
325 | # Get the second item in the queue without dequeuing anything |
326 | my $item = $q->peek(1); |
327 | |
328 | # Insert two items into the queue just behind the head |
329 | $q->insert(1, $item1, $item2); |
330 | |
331 | # Extract the last two items on the queue |
332 | my ($item1, $item2) = $q->extract(-2, 2); |
d516a115 |
333 | |
a99072da |
334 | =head1 DESCRIPTION |
335 | |
54c7876f |
336 | This module provides thread-safe FIFO queues that can be accessed safely by |
337 | any number of threads. |
338 | |
339 | Any data types supported by L<threads::shared> can be passed via queues: |
340 | |
341 | =over |
342 | |
343 | =item Ordinary scalars |
344 | |
345 | =item Array refs |
346 | |
347 | =item Hash refs |
348 | |
349 | =item Scalar refs |
350 | |
351 | =item Objects based on the above |
352 | |
353 | =back |
354 | |
355 | Ordinary scalars are added to queues as they are. |
356 | |
357 | If not already thread-shared, the other complex data types will be cloned |
358 | (recursively, if needed, and including any C<bless>ings and read-only |
359 | settings) into thread-shared structures before being placed onto a queue. |
a99072da |
360 | |
54c7876f |
361 | For example, the following would cause L<Thread::Queue> to create an empty, |
362 | shared array reference via C<&share([])>, copy the elements 'foo', 'bar' |
363 | and 'baz' from C<@ary> into it, and then place that shared reference onto |
364 | the queue: |
a99072da |
365 | |
54c7876f |
366 | my @ary = qw/foo bar baz/; |
367 | $q->enqueue(\@ary); |
a99072da |
368 | |
54c7876f |
369 | However, for the following, the items are already shared, so their references |
370 | are added directly to the queue, and no cloning takes place: |
a99072da |
371 | |
54c7876f |
372 | my @ary :shared = qw/foo bar baz/; |
373 | $q->enqueue(\@ary); |
a99072da |
374 | |
54c7876f |
375 | my $obj = &share({}); |
376 | $$obj{'foo'} = 'bar'; |
377 | $$obj{'qux'} = 99; |
378 | bless($obj, 'My::Class'); |
379 | $q->enqueue($obj); |
a99072da |
380 | |
54c7876f |
381 | See L</"LIMITATIONS"> for caveats related to passing objects via queues. |
a99072da |
382 | |
54c7876f |
383 | =head1 QUEUE CREATION |
a99072da |
384 | |
54c7876f |
385 | =over |
a99072da |
386 | |
54c7876f |
387 | =item ->new() |
a99072da |
388 | |
54c7876f |
389 | Creates a new empty queue. |
a99072da |
390 | |
54c7876f |
391 | =item ->new(LIST) |
a99072da |
392 | |
54c7876f |
393 | Creates a new queue pre-populated with the provided list of items. |
a99072da |
394 | |
395 | =back |
396 | |
54c7876f |
397 | =head1 BASIC METHODS |
a99072da |
398 | |
54c7876f |
399 | The following methods deal with queues on a FIFO basis. |
bbc7dcd2 |
400 | |
54c7876f |
401 | =over |
d516a115 |
402 | |
54c7876f |
403 | =item ->enqueue(LIST) |
d21067e0 |
404 | |
54c7876f |
405 | Adds a list of items onto the end of the queue. |
d21067e0 |
406 | |
54c7876f |
407 | =item ->dequeue() |
a99072da |
408 | |
54c7876f |
409 | =item ->dequeue(COUNT) |
d21067e0 |
410 | |
54c7876f |
411 | Removes the requested number of items (default is 1) from the head of the |
412 | queue, and returns them. If the queue contains fewer than the requested |
413 | number of items, then the thread will be blocked until the requisite number |
414 | of items are available (i.e., until other threads C<enqueue> more items). |
a99072da |
415 | |
54c7876f |
416 | =item ->dequeue_nb() |
417 | |
418 | =item ->dequeue_nb(COUNT) |
419 | |
420 | Removes the requested number of items (default is 1) from the head of the |
421 | queue, and returns them. If the queue contains fewer than the requested |
422 | number of items, then it immediately (i.e., non-blocking) returns whatever |
423 | items there are on the queue. If the queue is empty, then C<undef> is |
424 | returned. |
425 | |
426 | =item ->pending() |
427 | |
428 | Returns the number of items still in the queue. |
429 | |
430 | =back |
431 | |
432 | =head1 ADVANCED METHODS |
433 | |
434 | The following methods can be used to manipulate items anywhere in a queue. |
435 | |
436 | To prevent the contents of a queue from being modified by another thread |
437 | while it is being examined and/or changed, L<lock|threads::shared/"lock |
438 | VARIABLE"> the queue inside a local block: |
439 | |
440 | { |
441 | lock($q); # Keep other threads from changing the queue's contents |
442 | my $item = $q->peek(); |
443 | if ($item ...) { |
444 | ... |
445 | } |
446 | } |
447 | # Queue is now unlocked |
448 | |
449 | =over |
450 | |
451 | =item ->peek() |
452 | |
453 | =item ->peek(INDEX) |
454 | |
455 | Returns an item from the queue without dequeuing anything. Defaults to the |
456 | head of the queue (at index position 0) if no index is specified. Negative |
457 | index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1 |
458 | is the end of the queue, -2 is next to last, and so on). |
459 | |
460 | If no item exists at the specified index (i.e., the queue is empty, or the |
461 | index is beyond the number of items on the queue), then C<undef> is returned. |
462 | |
463 | Remember, the returned item is not removed from the queue, so manipulating a |
464 | C<peek>ed at reference affects the item on the queue. |
465 | |
466 | =item ->insert(INDEX, LIST) |
467 | |
468 | Adds the list of items to the queue at the specified index position (0 |
469 | is the head of the list). Any existing items at and beyond that position are |
470 | pushed back past the newly added items: |
471 | |
472 | $q->enqueue(1, 2, 3, 4); |
473 | $q->insert(1, qw/foo bar/); |
474 | # Queue now contains: 1, foo, bar, 2, 3, 4 |
83272a45 |
475 | |
54c7876f |
476 | Specifying an index position greater than the number of items in the queue |
477 | just adds the list to the end. |
83272a45 |
478 | |
54c7876f |
479 | Negative index positions are supported: |
480 | |
481 | $q->enqueue(1, 2, 3, 4); |
482 | $q->insert(-2, qw/foo bar/); |
483 | # Queue now contains: 1, 2, foo, bar, 3, 4 |
484 | |
485 | Specifying a negative index position greater than the number of items in the |
486 | queue adds the list to the head of the queue. |
487 | |
488 | =item ->extract() |
489 | |
490 | =item ->extract(INDEX) |
491 | |
492 | =item ->extract(INDEX, COUNT) |
493 | |
494 | Removes and returns the specified number of items (defaults to 1) from the |
495 | specified index position in the queue (0 is the head of the queue). When |
496 | called with no arguments, C<extract> operates the same as C<dequeue_nb>. |
497 | |
498 | This method is non-blocking, and will return only as many items as are |
499 | available to fulfill the request: |
500 | |
501 | $q->enqueue(1, 2, 3, 4); |
502 | my $item = $q->extract(2); # Returns 3 |
503 | # Queue now contains: 1, 2, 4 |
504 | my @items = $q->extract(1, 3); # Returns (2, 4) |
505 | # Queue now contains: 1 |
506 | |
507 | Specifying an index position greater than the number of items in the |
508 | queue results in C<undef> or an empty list being returned. |
509 | |
510 | $q->enqueue('foo'); |
511 | my $nada = $q->extract(3); # Returns undef |
512 | my @nada = $q->extract(1, 3); # Returns () |
513 | |
514 | Negative index positions are supported. Specifying a negative index position |
515 | greater than the number of items in the queue may return items from the head |
516 | of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the |
517 | queue from the specified position (i.e. if queue size + index + count is |
518 | greater than zero): |
519 | |
520 | $q->enqueue(qw/foo bar baz/); |
521 | my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0 |
522 | my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0 |
523 | # Queue now contains: bar, baz |
524 | my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0 |
525 | |
526 | =back |
527 | |
7fb1c73b |
528 | =head1 NOTES |
529 | |
530 | Queues created by L<Thread::Queue> can be used in both threaded and |
531 | non-threaded applications. |
532 | |
54c7876f |
533 | =head1 LIMITATIONS |
534 | |
535 | Passing objects on queues may not work if the objects' classes do not support |
536 | sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more. |
537 | |
538 | Passing array/hash refs that contain objects may not work for Perl prior to |
539 | 5.10.0. |
540 | |
541 | =head1 SEE ALSO |
542 | |
543 | Thread::Queue Discussion Forum on CPAN: |
544 | L<http://www.cpanforum.com/dist/Thread-Queue> |
545 | |
546 | Annotated POD for Thread::Queue: |
ac9d3a9d |
547 | L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.07/lib/Thread/Queue.pm> |
7fb1c73b |
548 | |
549 | Source repository: |
550 | L<http://code.google.com/p/thread-queue/> |
54c7876f |
551 | |
552 | L<threads>, L<threads::shared> |
553 | |
554 | =head1 MAINTAINER |
555 | |
556 | Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>> |
557 | |
558 | =head1 LICENSE |
559 | |
560 | This program is free software; you can redistribute it and/or modify it under |
561 | the same terms as Perl itself. |
562 | |
563 | =cut |