Commit | Line | Data |
d21067e0 |
1 | package Thread::Queue; |
28b605d8 |
2 | |
3d1f1caf |
3 | use strict; |
54c7876f |
4 | use warnings; |
28b605d8 |
5 | |
54c7876f |
6 | our $VERSION = '2.03'; |
7 | |
8 | use threads::shared 0.96; |
9 | use Scalar::Util 1.10 qw(looks_like_number); |
10 | |
11 | # Predeclarations for internal functions |
12 | my ($make_shared, $validate_count, $validate_index); |
13 | |
# Constructor: build a queue, optionally seeded with an initial list of items.
# Each initial item is passed through $make_shared before being stored, and
# the queue itself is a shared array blessed into the requested class.
sub new
{
    my ($class, @initial) = @_;

    my @queue :shared;
    foreach my $item (@initial) {
        push(@queue, $make_shared->($item));
    }

    return bless(\@queue, $class);
}
21 | |
# Append the given items to the tail of the queue.
# Every item is passed through $make_shared first.  The queue's condition
# is signalled only when the queue is non-empty after the push.
sub enqueue
{
    my ($queue, @items) = @_;
    lock(@$queue);

    my $length = push(@$queue, map { $make_shared->($_) } @items);
    return $length && cond_signal(@$queue);
}
30 | |
# Report how many items are currently sitting on the queue.
# The queue is locked while the count is taken.
sub pending
{
    my ($queue) = @_;
    lock(@$queue);

    my $count = @$queue;
    return $count;
}
38 | |
# Remove and return items from the head of the queue, blocking until enough
# items are available.  An optional count argument (default 1) selects how
# many items to take; it is checked by $validate_count.
sub dequeue
{
    my ($queue, @args) = @_;
    lock(@$queue);

    my $wanted = 1;
    if (@args) {
        $wanted = $validate_count->($args[0]);
    }

    # Wait on the queue until it holds at least the requested number of items
    while (@$queue < $wanted) {
        cond_wait(@$queue);
    }

    # More items than we need: signal the queue again before taking ours
    if (@$queue > $wanted) {
        cond_signal(@$queue);
    }

    # A single item is handed back as a plain scalar
    if ($wanted == 1) {
        return shift(@$queue);
    }

    # Several items are handed back as a list, in queue order
    my @items = map { shift(@$queue) } (1 .. $wanted);
    return @items;
}
59 | |
# Remove and return items from the head of the queue without ever blocking.
# An optional count argument (default 1) is checked by $validate_count.
# When fewer items are present than requested, only those present are taken.
sub dequeue_nb
{
    my ($queue, @args) = @_;
    lock(@$queue);

    my $wanted = 1;
    if (@args) {
        $wanted = $validate_count->($args[0]);
    }

    # A single item is handed back as a plain scalar (undef if queue is empty)
    if ($wanted == 1) {
        return shift(@$queue);
    }

    # Take up to the requested number of items, stopping when the queue runs dry
    my @items;
    foreach (1 .. $wanted) {
        @$queue or last;
        push(@items, shift(@$queue));
    }
    return @items;
}
79 | |
# Look at an item on the queue without removing it.
# An optional index argument (default 0, the head) is checked by
# $validate_index and then used directly as an array subscript.
sub peek
{
    my ($queue, @args) = @_;
    lock(@$queue);

    my $index = 0;
    if (@args) {
        $index = $validate_index->($args[0]);
    }

    return $queue->[$index];
}
88 | |
# Insert items into the queue at an arbitrary position.
# The first argument is the index (checked by $validate_index); the remaining
# arguments are the items to insert.  Items already at or after that position
# end up behind the new ones.
sub insert
{
    my ($queue, $position, @new_items) = @_;
    lock(@$queue);

    my $index = $validate_index->($position);

    # Nothing to insert
    if (! @new_items) {
        return;
    }

    # A negative index counts back from the tail, clamped at the head
    if ($index < 0) {
        $index += @$queue;
        $index = 0 if ($index < 0);
    }

    # Peel off everything from $index onward, remembering the original order
    my @tail;
    push(@tail, pop(@$queue)) while (@$queue > $index);
    @tail = reverse(@tail);

    # Append the new items, then restore the items that were peeled off
    push(@$queue, map { $make_shared->($_) } @new_items);
    push(@$queue, @tail);

    # Let a waiting thread know the queue has changed
    return cond_signal(@$queue);
}
122 | |
# Remove and return items from an arbitrary position in the queue.
# Optional arguments are an index (default 0, checked by $validate_index) and
# a count (default 1, checked by $validate_count).  Never blocks.
sub extract
{
    my ($queue, @args) = @_;
    lock(@$queue);

    my ($index, $count) = (0, 1);
    if (@args) {
        $index = $validate_index->(shift(@args));
    }
    if (@args) {
        $count = $validate_count->(shift(@args));
    }

    # A negative index counts back from the tail.  If it still lands before
    # the head, only the part of the request overlapping the queue is served.
    if (($index < 0) && (($index += @$queue) < 0)) {
        $count += $index;
        if ($count <= 0) {
            return;                         # Entirely before the head
        }
        return $queue->dequeue_nb($count);  # Remainder comes off the head
    }

    # Set aside everything that lies beyond the requested span
    my $span_end = $index + $count;
    my @tail;
    push(@tail, pop(@$queue)) while (@$queue > $span_end);
    @tail = reverse(@tail);

    # Whatever is left from $index onward is what was asked for
    my @items;
    while (@$queue > $index) {
        unshift(@items, pop(@$queue));
    }

    # Put the set-aside items back where they were
    push(@$queue, @tail);

    # A single item is handed back as a plain scalar
    if ($count == 1) {
        return $items[0];
    }

    # Several items are handed back as a list
    return @items;
}
161 | |
162 | ### Internal Functions ### |
163 | |
# Create a thread-shared version of a complex data structure or object.
#
# Arguments:
#   $item   - the value to prepare for placement on a queue
#   $cloned - (optional, internal use) hash ref mapping the address of every
#             reference already copied during this call to its copy
#
# Returns:
#   - $item itself when it is already thread-shared, or when it is not an
#     array, hash, scalar or ref-of-ref reference;
#   - otherwise a copy made via threads::shared's share(), built recursively
#     and carrying over blessings and READONLY flags.
#
# A reference that is reachable more than once inside the same structure
# (including through a cycle) is copied exactly once and that single copy is
# reused, so circular structures no longer recurse without end.
$make_shared = sub {
    my ($item, $cloned) = @_;

    # If already thread-shared, then just return the input item
    return $item if (threads::shared::is_shared($item));

    # Make copies of array, hash and scalar refs
    my $copy;
    if (my $ref_type = Scalar::Util::reftype($item)) {
        # Return the existing copy if this reference was seen before
        #   (this takes care of circular refs as well)
        $cloned ||= {};
        my $addr = Scalar::Util::refaddr($item);
        return $cloned->{$addr} if (exists($cloned->{$addr}));

        # Copy an array ref
        if ($ref_type eq 'ARRAY') {
            # Make empty shared array ref
            $copy = &share([]);
            # Record the copy before recursing so that cycles resolve to it
            $cloned->{$addr} = $copy;
            # Recursively copy and add contents
            push(@$copy, map { $make_shared->($_, $cloned) } @$item);
        }

        # Copy a hash ref
        elsif ($ref_type eq 'HASH') {
            # Make empty shared hash ref
            $copy = &share({});
            # Record the copy before recursing so that cycles resolve to it
            $cloned->{$addr} = $copy;
            # Recursively copy and add contents
            foreach my $key (keys(%{$item})) {
                $copy->{$key} = $make_shared->($item->{$key}, $cloned);
            }
        }

        # Copy a scalar ref
        elsif ($ref_type eq 'SCALAR') {
            $copy = \do{ my $scalar = $$item; };
            share($copy);
            $cloned->{$addr} = $copy;
            # Clone READONLY flag
            if (Internals::SvREADONLY($$item)) {
                Internals::SvREADONLY($$copy, 1);
            }
        }

        # Copy of a ref of a ref
        elsif ($ref_type eq 'REF') {
            # Create and record the shared holder first, then fill it in, so
            # that a reference which (directly or indirectly) refers back to
            # itself finds this copy instead of recursing again
            my $tmp;
            $copy = \$tmp;
            share($copy);
            $cloned->{$addr} = $copy;
            $tmp = $make_shared->($$item, $cloned);
        }
    }

    # If no copy is created above, then just return the input item
    # NOTE:  This will end up generating an error for anything
    #        other than an ordinary scalar
    return $item if (! defined($copy));

    # Clone READONLY flag
    if (Internals::SvREADONLY($item)) {
        Internals::SvREADONLY($copy, 1);
    }

    # If input item is an object, then bless the copy into the same class
    if (my $class = Scalar::Util::blessed($item)) {
        bless($copy, $class);
    }

    return $copy;
};
227 | |
# Check value of the requested index.
#
# Accepts any defined, number-like value with no fractional part (negative
# values included) and returns it unchanged.  Anything else croaks, with the
# error reported from the perspective of the public method's caller and
# naming the public method that received the bad argument.
$validate_index = sub {
    my $index = shift;

    # The explicit defined() test comes first so that undef is rejected no
    # matter how looks_like_number() treats it.  NOTE(review): older
    # perl/Scalar::Util combinations are understood to report undef as
    # number-like, which would let undef through and be used as index 0.
    if (! defined($index) ||
        ! looks_like_number($index) ||
        (int($index) != $index))
    {
        require Carp;
        # caller(1) is the public method that invoked this validator
        my ($method) = (caller(1))[3];
        $method =~ s/Thread::Queue:://;
        $index = 'undef' if (! defined($index));
        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
    }

    return $index;
};
242 | |
# Check value of the requested count.
#
# A count is acceptable when it looks like a number, has no fractional part
# and is at least 1; such a value is returned unchanged.  Anything else
# croaks, naming the public method that received the bad argument.
$validate_count = sub {
    my ($count) = @_;

    my $is_valid = looks_like_number($count)
                && (int($count) == $count)
                && ($count >= 1);
    return $count if ($is_valid);

    require Carp;
    # caller(1) is the public method that invoked this validator
    my ($method) = (caller(1))[3];
    $method =~ s/Thread::Queue:://;
    $count = 'undef' if (! defined($count));
    Carp::croak("Invalid 'count' argument ($count) to '$method' method");
};
257 | |
258 | 1; |
9c6f8578 |
259 | |
d516a115 |
260 | =head1 NAME |
261 | |
54c7876f |
262 | Thread::Queue - Thread-safe queues |
263 | |
264 | =head1 VERSION |
265 | |
266 | This document describes Thread::Queue version 2.03 |
d516a115 |
267 | |
268 | =head1 SYNOPSIS |
269 | |
54c7876f |
270 | use strict; |
271 | use warnings; |
272 | |
273 | use threads; |
d516a115 |
274 | use Thread::Queue; |
54c7876f |
275 | |
276 | my $q = Thread::Queue->new(); # A new empty queue |
277 | |
278 | # Worker thread |
279 | my $thr = threads->create(sub { |
280 | while (defined(my $item = $q->dequeue())) { |
281 | # Do work on $item |
282 | } |
283 | })->detach(); |
284 | |
285 | # Send work to the thread |
286 | $q->enqueue($item1, ...); |
287 | |
288 | |
289 | # Count of items in the queue |
290 | my $left = $q->pending(); |
291 | |
292 | # Non-blocking dequeue |
293 | if (defined(my $item = $q->dequeue_nb())) { |
294 | # Work on $item |
295 | } |
296 | |
297 | # Get the second item in the queue without dequeuing anything |
298 | my $item = $q->peek(1); |
299 | |
300 | # Insert two items into the queue just behind the head |
301 | $q->insert(1, $item1, $item2); |
302 | |
303 | # Extract the last two items on the queue |
304 | my ($item1, $item2) = $q->extract(-2, 2); |
d516a115 |
305 | |
a99072da |
306 | =head1 DESCRIPTION |
307 | |
54c7876f |
308 | This module provides thread-safe FIFO queues that can be accessed safely by |
309 | any number of threads. |
310 | |
311 | Any data types supported by L<threads::shared> can be passed via queues: |
312 | |
313 | =over |
314 | |
315 | =item Ordinary scalars |
316 | |
317 | =item Array refs |
318 | |
319 | =item Hash refs |
320 | |
321 | =item Scalar refs |
322 | |
323 | =item Objects based on the above |
324 | |
325 | =back |
326 | |
327 | Ordinary scalars are added to queues as they are. |
328 | |
329 | If not already thread-shared, the other complex data types will be cloned |
330 | (recursively, if needed, and including any C<bless>ings and read-only |
331 | settings) into thread-shared structures before being placed onto a queue. |
a99072da |
332 | |
54c7876f |
333 | For example, the following would cause L<Thread::Queue> to create an empty, |
334 | shared array reference via C<&share([])>, copy the elements 'foo', 'bar' |
335 | and 'baz' from C<@ary> into it, and then place that shared reference onto |
336 | the queue: |
a99072da |
337 | |
54c7876f |
338 | my @ary = qw/foo bar baz/; |
339 | $q->enqueue(\@ary); |
a99072da |
340 | |
54c7876f |
341 | However, for the following, the items are already shared, so their references |
342 | are added directly to the queue, and no cloning takes place: |
a99072da |
343 | |
54c7876f |
344 | my @ary :shared = qw/foo bar baz/; |
345 | $q->enqueue(\@ary); |
a99072da |
346 | |
54c7876f |
347 | my $obj = &share({}); |
348 | $$obj{'foo'} = 'bar'; |
349 | $$obj{'qux'} = 99; |
350 | bless($obj, 'My::Class'); |
351 | $q->enqueue($obj); |
a99072da |
352 | |
54c7876f |
353 | See L</"LIMITATIONS"> for caveats related to passing objects via queues. |
a99072da |
354 | |
54c7876f |
355 | =head1 QUEUE CREATION |
a99072da |
356 | |
54c7876f |
357 | =over |
a99072da |
358 | |
54c7876f |
359 | =item ->new() |
a99072da |
360 | |
54c7876f |
361 | Creates a new empty queue. |
a99072da |
362 | |
54c7876f |
363 | =item ->new(LIST) |
a99072da |
364 | |
54c7876f |
365 | Creates a new queue pre-populated with the provided list of items. |
a99072da |
366 | |
367 | =back |
368 | |
54c7876f |
369 | =head1 BASIC METHODS |
a99072da |
370 | |
54c7876f |
371 | The following methods deal with queues on a FIFO basis. |
bbc7dcd2 |
372 | |
54c7876f |
373 | =over |
d516a115 |
374 | |
54c7876f |
375 | =item ->enqueue(LIST) |
d21067e0 |
376 | |
54c7876f |
377 | Adds a list of items onto the end of the queue. |
d21067e0 |
378 | |
54c7876f |
379 | =item ->dequeue() |
a99072da |
380 | |
54c7876f |
381 | =item ->dequeue(COUNT) |
d21067e0 |
382 | |
54c7876f |
383 | Removes the requested number of items (default is 1) from the head of the |
384 | queue, and returns them. If the queue contains fewer than the requested |
385 | number of items, then the thread will be blocked until the requisite number |
386 | of items are available (i.e., until other threads C<enqueue> more items). |
a99072da |
387 | |
54c7876f |
388 | =item ->dequeue_nb() |
389 | |
390 | =item ->dequeue_nb(COUNT) |
391 | |
392 | Removes the requested number of items (default is 1) from the head of the |
393 | queue, and returns them. If the queue contains fewer than the requested |
394 | number of items, then it immediately (i.e., non-blocking) returns whatever |
395 | items there are on the queue. If the queue is empty, then C<undef> is |
396 | returned. |
397 | |
398 | =item ->pending() |
399 | |
400 | Returns the number of items still in the queue. |
401 | |
402 | =back |
403 | |
404 | =head1 ADVANCED METHODS |
405 | |
406 | The following methods can be used to manipulate items anywhere in a queue. |
407 | |
408 | To prevent the contents of a queue from being modified by another thread |
409 | while it is being examined and/or changed, L<lock|threads::shared/"lock |
410 | VARIABLE"> the queue inside a local block: |
411 | |
412 | { |
413 | lock($q); # Keep other threads from changing the queue's contents |
414 | my $item = $q->peek(); |
415 | if ($item ...) { |
416 | ... |
417 | } |
418 | } |
419 | # Queue is now unlocked |
420 | |
421 | =over |
422 | |
423 | =item ->peek() |
424 | |
425 | =item ->peek(INDEX) |
426 | |
427 | Returns an item from the queue without dequeuing anything. Defaults to the |
428 | head of the queue (at index position 0) if no index is specified. Negative |
429 | index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1 |
430 | is the end of the queue, -2 is next to last, and so on). |
431 | |
432 | If no item exists at the specified index (i.e., the queue is empty, or the |
433 | index is beyond the number of items on the queue), then C<undef> is returned. |
434 | |
435 | Remember, the returned item is not removed from the queue, so manipulating a |
436 | C<peek>ed at reference affects the item on the queue. |
437 | |
438 | =item ->insert(INDEX, LIST) |
439 | |
440 | Adds the list of items to the queue at the specified index position (0 |
441 | is the head of the list). Any existing items at and beyond that position are |
442 | pushed back past the newly added items: |
443 | |
444 | $q->enqueue(1, 2, 3, 4); |
445 | $q->insert(1, qw/foo bar/); |
446 | # Queue now contains: 1, foo, bar, 2, 3, 4 |
83272a45 |
447 | |
54c7876f |
448 | Specifying an index position greater than the number of items in the queue |
449 | just adds the list to the end. |
83272a45 |
450 | |
54c7876f |
451 | Negative index positions are supported: |
452 | |
453 | $q->enqueue(1, 2, 3, 4); |
454 | $q->insert(-2, qw/foo bar/); |
455 | # Queue now contains: 1, 2, foo, bar, 3, 4 |
456 | |
457 | Specifying a negative index position greater than the number of items in the |
458 | queue adds the list to the head of the queue. |
459 | |
460 | =item ->extract() |
461 | |
462 | =item ->extract(INDEX) |
463 | |
464 | =item ->extract(INDEX, COUNT) |
465 | |
466 | Removes and returns the specified number of items (defaults to 1) from the |
467 | specified index position in the queue (0 is the head of the queue). When |
468 | called with no arguments, C<extract> operates the same as C<dequeue_nb>. |
469 | |
470 | This method is non-blocking, and will return only as many items as are |
471 | available to fulfill the request: |
472 | |
473 | $q->enqueue(1, 2, 3, 4); |
474 | my $item = $q->extract(2); # Returns 3 |
475 | # Queue now contains: 1, 2, 4 |
476 | my @items = $q->extract(1, 3); # Returns (2, 4) |
477 | # Queue now contains: 1 |
478 | |
479 | Specifying an index position greater than the number of items in the |
480 | queue results in C<undef> or an empty list being returned. |
481 | |
482 | $q->enqueue('foo'); |
483 | my $nada = $q->extract(3); # Returns undef |
484 | my @nada = $q->extract(1, 3); # Returns () |
485 | |
486 | Negative index positions are supported. Specifying a negative index position |
487 | greater than the number of items in the queue may return items from the head |
488 | of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the |
489 | queue from the specified position (i.e. if queue size + index + count is |
490 | greater than zero): |
491 | |
492 | $q->enqueue(qw/foo bar baz/); |
493 | my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0 |
494 | my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0 |
495 | # Queue now contains: bar, baz |
496 | my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0 |
497 | |
498 | =back |
499 | |
500 | =head1 LIMITATIONS |
501 | |
502 | Passing objects on queues may not work if the objects' classes do not support |
503 | sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more. |
504 | |
505 | Passing array/hash refs that contain objects may not work for Perl prior to |
506 | 5.10.0. |
507 | |
508 | =head1 SEE ALSO |
509 | |
510 | Thread::Queue Discussion Forum on CPAN: |
511 | L<http://www.cpanforum.com/dist/Thread-Queue> |
512 | |
513 | Annotated POD for Thread::Queue: |
514 | L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.03/lib/Thread/Queue.pm> |
515 | |
516 | L<threads>, L<threads::shared> |
517 | |
518 | =head1 MAINTAINER |
519 | |
520 | Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>> |
521 | |
522 | =head1 LICENSE |
523 | |
524 | This program is free software; you can redistribute it and/or modify it under |
525 | the same terms as Perl itself. |
526 | |
527 | =cut |