Commit | Line | Data |
d21067e0 |
1 | package Thread::Queue; |
28b605d8 |
2 | |
3d1f1caf |
3 | use strict; |
54c7876f |
4 | use warnings; |
28b605d8 |
5 | |
3d4f2f89 |
6 | our $VERSION = '2.11'; |
54c7876f |
7 | |
09782346 |
8 | use threads::shared 1.21; |
ac9d3a9d |
9 | use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); |
54c7876f |
10 | |
09782346 |
11 | # Carp errors from threads::shared calls should complain about caller |
12 | our @CARP_NOT = ("threads::shared"); |
13 | |
54c7876f |
14 | # Predeclarations for internal functions |
09782346 |
15 | my ($validate_count, $validate_index); |
54c7876f |
16 | |
# Constructor: build a queue, optionally seeded with an initial list of items.
#   Thread::Queue->new()       - empty queue
#   Thread::Queue->new(LIST)   - queue holding LIST, in order
# Each item is passed through shared_clone() before being stored.  The
# object returned is a reference to the ':shared' array that holds the
# items, blessed into the invoking class.
sub new
{
    my ($class, @items) = @_;

    my @queue :shared;
    push(@queue, shared_clone($_)) foreach (@items);

    return bless(\@queue, $class);
}
24 | |
# Append items to the tail of the queue.
#   $q->enqueue(LIST)
# The queue is locked for the duration of the call, and every item is
# passed through shared_clone() before it is pushed.  cond_signal() is
# issued only when push() reports a non-zero queue length afterwards, so
# an empty call on an empty queue signals nobody.
sub enqueue
{
    my ($queue, @items) = @_;
    lock(@$queue);

    my @shared = map { shared_clone($_) } @items;

    # Same value the caller has always received: push()'s count when it is
    # zero, otherwise whatever cond_signal() returns.
    return (push(@$queue, @shared) && cond_signal(@$queue));
}
33 | |
# Report how many items are currently on the queue.
#   my $n = $q->pending();
# The queue is locked while the element count is read.
sub pending
{
    my ($queue) = @_;
    lock(@$queue);

    my $count = @$queue;
    return $count;
}
41 | |
# Remove and return items from the head of the queue, blocking if needed.
#   $q->dequeue()        - one item
#   $q->dequeue(COUNT)   - COUNT items (COUNT is checked by $validate_count)
# The call waits on the queue (cond_wait) until it holds at least COUNT
# items.  A COUNT of 1 returns the item itself; any other COUNT returns
# the items as a list.
sub dequeue
{
    my $queue = shift;
    lock(@$queue);

    my $count = 1;
    $count = $validate_count->(shift) if (@_);

    # Sleep until enough items have arrived
    while (@$queue < $count) {
        cond_wait(@$queue);
    }

    # Items will remain after this call takes its share, so pass the
    # signal along to another waiter
    if (@$queue > $count) {
        cond_signal(@$queue);
    }

    # Single item: hand back the scalar itself
    if ($count == 1) {
        return shift(@$queue);
    }

    # Several items: pull them off the head in order
    my @items = map { shift(@$queue) } (1..$count);
    return @items;
}
62 | |
# Remove and return items from the head of the queue without blocking.
#   $q->dequeue_nb()        - one item, or undef if the queue is empty
#   $q->dequeue_nb(COUNT)   - up to COUNT items (COUNT is checked by
#                             $validate_count); fewer if the queue runs out
# A COUNT of 1 returns the item itself; any other COUNT returns a list.
sub dequeue_nb
{
    my $queue = shift;
    lock(@$queue);

    my $count = @_ ? $validate_count->(shift) : 1;

    # Return single item
    return shift(@$queue) if ($count == 1);

    # Return multiple items.
    # Loop on the queue itself instead of iterating over (1..$count): the
    # range operator dies with "Range iterator outside integer range" for
    # a COUNT that passes validation but does not fit in an integer (e.g.
    # 1e30 or 'inf').  This form simply drains what is available.
    my @items;
    while (@$queue && (@items < $count)) {
        push(@items, shift(@$queue));
    }
    return @items;
}
82 | |
# Look at an item without taking it off the queue.
#   $q->peek()        - item at the head (index 0)
#   $q->peek(INDEX)   - item at INDEX (INDEX is checked by $validate_index)
# The element is read straight from the underlying array, so a slot that
# does not exist yields undef.
sub peek
{
    my ($queue, @args) = @_;
    lock(@$queue);

    my $index = 0;
    $index = $validate_index->(shift(@args)) if (@args);

    return $queue->[$index];
}
91 | |
# Insert items anywhere into a queue.
#   $q->insert(INDEX, LIST)
# INDEX is checked by $validate_index (so a missing INDEX croaks).  A
# negative INDEX counts back from the tail and is clamped to the head; an
# INDEX past the tail appends.  Returns early, leaving the queue untouched,
# when LIST is empty.  Otherwise the last statement is cond_signal(), as
# before.
sub insert
{
    my $queue = shift;
    lock(@$queue);

    my $index = $validate_index->(shift);

    return if (! @_);   # Nothing to insert

    # Make the shared copies BEFORE touching the queue.  If shared_clone()
    # dies on one of the items, the queue must be left exactly as it was.
    # Cloning after the tail had been popped off would lose those items.
    my @new_items = map { shared_clone($_) } @_;

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        $index = 0 if ($index < 0);
    }

    # Set aside the items from $index onward.  This is done with
    # pop/unshift rather than splice; per the threads::shared
    # documentation, splice is not supported on shared arrays.
    my @tmp;
    while (@$queue > $index) {
        unshift(@tmp, pop(@$queue));
    }

    # New items go in first, then the items that were set aside
    push(@$queue, @new_items, @tmp);

    # Soup's up
    cond_signal(@$queue);
}
125 | |
# Remove and return items from anywhere in a queue, without blocking.
#   $q->extract()               - one item from the head
#   $q->extract(INDEX)          - one item at INDEX
#   $q->extract(INDEX, COUNT)   - up to COUNT items starting at INDEX
# INDEX and COUNT are checked by $validate_index / $validate_count.  A
# COUNT of 1 returns the item itself (undef if there is none); any other
# COUNT returns a list.
sub extract
{
    my $queue = shift;
    lock(@$queue);

    my $index = 0;
    my $count = 1;
    $index = $validate_index->(shift) if (@_);
    $count = $validate_count->(shift) if (@_);

    # A negative index counts back from the tail
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            # The request starts before the head: drop the part that falls
            # off the front, and take whatever is left from the head
            $count += $index;
            if ($count <= 0) {
                return;     # Entirely beyond the head of the queue
            }
            return $queue->dequeue_nb($count);
        }
    }

    # Set aside everything that lies past the requested range
    my $stop = $index + $count;
    my @tail;
    unshift(@tail, pop(@$queue)) while (@$queue > $stop);

    # Whatever now sits at $index and beyond is what was asked for
    my @items;
    while (@$queue > $index) {
        unshift(@items, pop(@$queue));
    }

    # Put the set-aside items back
    push(@$queue, @tail);

    # One item comes back as a scalar, several as a list
    return ($count == 1) ? $items[0] : @items;
}
164 | |
165 | ### Internal Functions ### |
166 | |
54c7876f |
# Validate an 'index' argument on behalf of a public method.
# Returns the index unchanged when it is defined, looks like a number and
# has no fractional part.  Otherwise croaks, naming the calling method
# (taken from caller(1), with the package prefix stripped).
$validate_index = sub {
    my ($index) = @_;

    my $valid = defined($index)
             && looks_like_number($index)
             && (int($index) == $index);
    return $index if ($valid);

    # Carp is only loaded when there is actually something to report
    require Carp;
    my $method = (caller(1))[3];
    $method =~ s/Thread::Queue:://;
    $index = 'undef' unless (defined($index));
    Carp::croak("Invalid 'index' argument ($index) to '$method' method");
};
184 | |
# Validate a 'count' argument on behalf of a public method.
# Returns the count unchanged when it is defined, looks like a number, has
# no fractional part and is at least 1.  Otherwise croaks, naming the
# calling method (taken from caller(1), with the package prefix stripped).
$validate_count = sub {
    my ($count) = @_;

    my $valid = defined($count)
             && looks_like_number($count)
             && (int($count) == $count)
             && ($count >= 1);
    return $count if ($valid);

    # Carp is only loaded when there is actually something to report
    require Carp;
    my $method = (caller(1))[3];
    $method =~ s/Thread::Queue:://;
    $count = 'undef' unless (defined($count));
    Carp::croak("Invalid 'count' argument ($count) to '$method' method");
};
203 | |
204 | 1; |
9c6f8578 |
205 | |
d516a115 |
206 | =head1 NAME |
207 | |
54c7876f |
208 | Thread::Queue - Thread-safe queues |
209 | |
210 | =head1 VERSION |
211 | |
3d4f2f89 |
212 | This document describes Thread::Queue version 2.11 |
d516a115 |
213 | |
214 | =head1 SYNOPSIS |
215 | |
54c7876f |
216 | use strict; |
217 | use warnings; |
218 | |
219 | use threads; |
d516a115 |
220 | use Thread::Queue; |
54c7876f |
221 | |
222 | my $q = Thread::Queue->new(); # A new empty queue |
223 | |
224 | # Worker thread |
225 | my $thr = threads->create(sub { |
226 | while (defined(my $item = $q->dequeue())) { |
227 | # Do work on $item |
228 | } |
229 | })->detach(); |
230 | |
231 | # Send work to the thread |
232 | $q->enqueue($item1, ...); |
233 | |
234 | |
235 | # Count of items in the queue |
236 | my $left = $q->pending(); |
237 | |
238 | # Non-blocking dequeue |
239 | if (defined(my $item = $q->dequeue_nb())) { |
240 | # Work on $item |
241 | } |
242 | |
243 | # Get the second item in the queue without dequeuing anything |
244 | my $item = $q->peek(1); |
245 | |
246 | # Insert two items into the queue just behind the head |
247 | $q->insert(1, $item1, $item2); |
248 | |
249 | # Extract the last two items on the queue |
250 | my ($item1, $item2) = $q->extract(-2, 2); |
d516a115 |
251 | |
a99072da |
252 | =head1 DESCRIPTION |
253 | |
54c7876f |
254 | This module provides thread-safe FIFO queues that can be accessed safely by |
255 | any number of threads. |
256 | |
257 | Any data types supported by L<threads::shared> can be passed via queues: |
258 | |
259 | =over |
260 | |
261 | =item Ordinary scalars |
262 | |
263 | =item Array refs |
264 | |
265 | =item Hash refs |
266 | |
267 | =item Scalar refs |
268 | |
269 | =item Objects based on the above |
270 | |
271 | =back |
272 | |
273 | Ordinary scalars are added to queues as they are. |
274 | |
275 | If not already thread-shared, the other complex data types will be cloned |
276 | (recursively, if needed, and including any C<bless>ings and read-only |
277 | settings) into thread-shared structures before being placed onto a queue. |
a99072da |
278 | |
54c7876f |
279 | For example, the following would cause L<Thread::Queue> to create an empty, |
280 | shared array reference via C<&shared([])>, copy the elements 'foo', 'bar' |
281 | and 'baz' from C<@ary> into it, and then place that shared reference onto |
282 | the queue: |
a99072da |
283 | |
54c7876f |
284 | my @ary = qw/foo bar baz/; |
285 | $q->enqueue(\@ary); |
a99072da |
286 | |
54c7876f |
287 | However, for the following, the items are already shared, so their references |
288 | are added directly to the queue, and no cloning takes place: |
a99072da |
289 | |
54c7876f |
290 | my @ary :shared = qw/foo bar baz/; |
291 | $q->enqueue(\@ary); |
a99072da |
292 | |
54c7876f |
293 | my $obj = &shared({}); |
294 | $$obj{'foo'} = 'bar'; |
295 | $$obj{'qux'} = 99; |
296 | bless($obj, 'My::Class'); |
297 | $q->enqueue($obj); |
a99072da |
298 | |
54c7876f |
299 | See L</"LIMITATIONS"> for caveats related to passing objects via queues. |
a99072da |
300 | |
54c7876f |
301 | =head1 QUEUE CREATION |
a99072da |
302 | |
54c7876f |
303 | =over |
a99072da |
304 | |
54c7876f |
305 | =item ->new() |
a99072da |
306 | |
54c7876f |
307 | Creates a new empty queue. |
a99072da |
308 | |
54c7876f |
309 | =item ->new(LIST) |
a99072da |
310 | |
54c7876f |
311 | Creates a new queue pre-populated with the provided list of items. |
a99072da |
312 | |
313 | =back |
314 | |
54c7876f |
315 | =head1 BASIC METHODS |
a99072da |
316 | |
54c7876f |
317 | The following methods deal with queues on a FIFO basis. |
bbc7dcd2 |
318 | |
54c7876f |
319 | =over |
d516a115 |
320 | |
54c7876f |
321 | =item ->enqueue(LIST) |
d21067e0 |
322 | |
54c7876f |
323 | Adds a list of items onto the end of the queue. |
d21067e0 |
324 | |
54c7876f |
325 | =item ->dequeue() |
a99072da |
326 | |
54c7876f |
327 | =item ->dequeue(COUNT) |
d21067e0 |
328 | |
54c7876f |
329 | Removes the requested number of items (default is 1) from the head of the |
330 | queue, and returns them. If the queue contains fewer than the requested |
331 | number of items, then the thread will be blocked until the requisite number |
332 | of items are available (i.e., until other threads C<enqueue> more items). |
a99072da |
333 | |
54c7876f |
334 | =item ->dequeue_nb() |
335 | |
336 | =item ->dequeue_nb(COUNT) |
337 | |
338 | Removes the requested number of items (default is 1) from the head of the |
339 | queue, and returns them. If the queue contains fewer than the requested |
340 | number of items, then it immediately (i.e., non-blocking) returns whatever |
341 | items there are on the queue. If the queue is empty, then C<undef> is |
342 | returned. |
343 | |
344 | =item ->pending() |
345 | |
346 | Returns the number of items still in the queue. |
347 | |
348 | =back |
349 | |
350 | =head1 ADVANCED METHODS |
351 | |
352 | The following methods can be used to manipulate items anywhere in a queue. |
353 | |
354 | To prevent the contents of a queue from being modified by another thread |
355 | while it is being examined and/or changed, L<lock|threads::shared/"lock |
356 | VARIABLE"> the queue inside a local block: |
357 | |
358 | { |
359 | lock($q); # Keep other threads from changing the queue's contents |
360 | my $item = $q->peek(); |
361 | if ($item ...) { |
362 | ... |
363 | } |
364 | } |
365 | # Queue is now unlocked |
366 | |
367 | =over |
368 | |
369 | =item ->peek() |
370 | |
371 | =item ->peek(INDEX) |
372 | |
373 | Returns an item from the queue without dequeuing anything. Defaults to the |
374 | head of the queue (at index position 0) if no index is specified. Negative |
375 | index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1 |
376 | is the end of the queue, -2 is next to last, and so on). |
377 | |
378 | If no item exists at the specified index (i.e., the queue is empty, or the |
379 | index is beyond the number of items on the queue), then C<undef> is returned. |
380 | |
381 | Remember, the returned item is not removed from the queue, so manipulating a |
382 | C<peek>ed at reference affects the item on the queue. |
383 | |
384 | =item ->insert(INDEX, LIST) |
385 | |
386 | Adds the list of items to the queue at the specified index position (0 |
387 | is the head of the list). Any existing items at and beyond that position are |
388 | pushed back past the newly added items: |
389 | |
390 | $q->enqueue(1, 2, 3, 4); |
391 | $q->insert(1, qw/foo bar/); |
392 | # Queue now contains: 1, foo, bar, 2, 3, 4 |
83272a45 |
393 | |
54c7876f |
394 | Specifying an index position greater than the number of items in the queue |
395 | just adds the list to the end. |
83272a45 |
396 | |
54c7876f |
397 | Negative index positions are supported: |
398 | |
399 | $q->enqueue(1, 2, 3, 4); |
400 | $q->insert(-2, qw/foo bar/); |
401 | # Queue now contains: 1, 2, foo, bar, 3, 4 |
402 | |
403 | Specifying a negative index position greater than the number of items in the |
404 | queue adds the list to the head of the queue. |
405 | |
406 | =item ->extract() |
407 | |
408 | =item ->extract(INDEX) |
409 | |
410 | =item ->extract(INDEX, COUNT) |
411 | |
412 | Removes and returns the specified number of items (defaults to 1) from the |
413 | specified index position in the queue (0 is the head of the queue). When |
414 | called with no arguments, C<extract> operates the same as C<dequeue_nb>. |
415 | |
416 | This method is non-blocking, and will return only as many items as are |
417 | available to fulfill the request: |
418 | |
419 | $q->enqueue(1, 2, 3, 4); |
420 | my $item = $q->extract(2); # Returns 3 |
421 | # Queue now contains: 1, 2, 4 |
422 | my @items = $q->extract(1, 3); # Returns (2, 4) |
423 | # Queue now contains: 1 |
424 | |
425 | Specifying an index position greater than the number of items in the |
426 | queue results in C<undef> or an empty list being returned. |
427 | |
428 | $q->enqueue('foo'); |
429 | my $nada = $q->extract(3); # Returns undef |
430 | my @nada = $q->extract(1, 3); # Returns () |
431 | |
432 | Negative index positions are supported. Specifying a negative index position |
433 | greater than the number of items in the queue may return items from the head |
434 | of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the |
435 | queue from the specified position (i.e. if queue size + index + count is |
436 | greater than zero): |
437 | |
438 | $q->enqueue(qw/foo bar baz/); |
439 | my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0 |
440 | my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0 |
441 | # Queue now contains: bar, baz |
442 | my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0 |
443 | |
444 | =back |
445 | |
7fb1c73b |
446 | =head1 NOTES |
447 | |
448 | Queues created by L<Thread::Queue> can be used in both threaded and |
449 | non-threaded applications. |
450 | |
54c7876f |
451 | =head1 LIMITATIONS |
452 | |
453 | Passing objects on queues may not work if the objects' classes do not support |
454 | sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more. |
455 | |
456 | Passing array/hash refs that contain objects may not work for Perl prior to |
457 | 5.10.0. |
458 | |
459 | =head1 SEE ALSO |
460 | |
461 | Thread::Queue Discussion Forum on CPAN: |
462 | L<http://www.cpanforum.com/dist/Thread-Queue> |
463 | |
464 | Annotated POD for Thread::Queue: |
3d4f2f89 |
465 | L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.11/lib/Thread/Queue.pm> |
7fb1c73b |
466 | |
467 | Source repository: |
468 | L<http://code.google.com/p/thread-queue/> |
54c7876f |
469 | |
470 | L<threads>, L<threads::shared> |
471 | |
472 | =head1 MAINTAINER |
473 | |
474 | Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>> |
475 | |
476 | =head1 LICENSE |
477 | |
478 | This program is free software; you can redistribute it and/or modify it under |
479 | the same terms as Perl itself. |
480 | |
481 | =cut |