Commit | Line | Data |
d21067e0 |
1 | package Thread::Queue; |
28b605d8 |
2 | |
3d1f1caf |
3 | use strict; |
54c7876f |
4 | use warnings; |
28b605d8 |
5 | |
09782346 |
6 | our $VERSION = '2.08'; |
54c7876f |
7 | |
09782346 |
8 | use threads::shared 1.21; |
ac9d3a9d |
9 | use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); |
54c7876f |
10 | |
09782346 |
11 | # Carp errors from threads::shared calls should complain about caller |
12 | our @CARP_NOT = ("threads::shared"); |
13 | |
54c7876f |
14 | # Predeclarations for internal functions |
09782346 |
15 | my ($validate_count, $validate_index); |
54c7876f |
16 | |
17 | # Create a new queue possibly pre-populated with items |
18 | sub new |
19 | { |
20 | my $class = shift; |
09782346 |
21 | my @queue :shared = map { shared_clone($_) } @_; |
54c7876f |
22 | return bless(\@queue, $class); |
23 | } |
24 | |
25 | # Add items to the tail of a queue |
26 | sub enqueue |
27 | { |
28 | my $queue = shift; |
29 | lock(@$queue); |
09782346 |
30 | push(@$queue, map { shared_clone($_) } @_) |
54c7876f |
31 | and cond_signal(@$queue); |
32 | } |
33 | |
34 | # Return a count of the number of items on a queue |
35 | sub pending |
36 | { |
37 | my $queue = shift; |
38 | lock(@$queue); |
39 | return scalar(@$queue); |
40 | } |
41 | |
42 | # Return 1 or more items from the head of a queue, blocking if needed |
43 | sub dequeue |
44 | { |
45 | my $queue = shift; |
46 | lock(@$queue); |
47 | |
48 | my $count = @_ ? $validate_count->(shift) : 1; |
49 | |
50 | # Wait for requisite number of items |
51 | cond_wait(@$queue) until (@$queue >= $count); |
52 | cond_signal(@$queue) if (@$queue > $count); |
53 | |
54 | # Return single item |
55 | return shift(@$queue) if ($count == 1); |
56 | |
57 | # Return multiple items |
58 | my @items; |
59 | push(@items, shift(@$queue)) for (1..$count); |
60 | return @items; |
61 | } |
62 | |
63 | # Return items from the head of a queue with no blocking |
64 | sub dequeue_nb |
65 | { |
66 | my $queue = shift; |
67 | lock(@$queue); |
68 | |
69 | my $count = @_ ? $validate_count->(shift) : 1; |
70 | |
71 | # Return single item |
72 | return shift(@$queue) if ($count == 1); |
73 | |
74 | # Return multiple items |
75 | my @items; |
76 | for (1..$count) { |
77 | last if (! @$queue); |
78 | push(@items, shift(@$queue)); |
79 | } |
80 | return @items; |
81 | } |
82 | |
83 | # Return an item without removing it from a queue |
84 | sub peek |
85 | { |
86 | my $queue = shift; |
87 | lock(@$queue); |
88 | my $index = @_ ? $validate_index->(shift) : 0; |
89 | return $$queue[$index]; |
90 | } |
91 | |
92 | # Insert items anywhere into a queue |
93 | sub insert |
94 | { |
95 | my $queue = shift; |
96 | lock(@$queue); |
97 | |
98 | my $index = $validate_index->(shift); |
99 | |
100 | return if (! @_); # Nothing to insert |
101 | |
102 | # Support negative indices |
103 | if ($index < 0) { |
104 | $index += @$queue; |
105 | if ($index < 0) { |
106 | $index = 0; |
107 | } |
108 | } |
109 | |
110 | # Dequeue items from $index onward |
111 | my @tmp; |
112 | while (@$queue > $index) { |
113 | unshift(@tmp, pop(@$queue)) |
114 | } |
115 | |
116 | # Add new items to the queue |
09782346 |
117 | push(@$queue, map { shared_clone($_) } @_); |
54c7876f |
118 | |
119 | # Add previous items back onto the queue |
120 | push(@$queue, @tmp); |
121 | |
122 | # Soup's up |
123 | cond_signal(@$queue); |
124 | } |
125 | |
126 | # Remove items from anywhere in a queue |
127 | sub extract |
128 | { |
129 | my $queue = shift; |
130 | lock(@$queue); |
131 | |
132 | my $index = @_ ? $validate_index->(shift) : 0; |
133 | my $count = @_ ? $validate_count->(shift) : 1; |
134 | |
135 | # Support negative indices |
136 | if ($index < 0) { |
137 | $index += @$queue; |
138 | if ($index < 0) { |
139 | $count += $index; |
140 | return if ($count <= 0); # Beyond the head of the queue |
141 | return $queue->dequeue_nb($count); # Extract from the head |
142 | } |
143 | } |
144 | |
145 | # Dequeue items from $index+$count onward |
146 | my @tmp; |
147 | while (@$queue > ($index+$count)) { |
148 | unshift(@tmp, pop(@$queue)) |
149 | } |
150 | |
151 | # Extract desired items |
152 | my @items; |
153 | unshift(@items, pop(@$queue)) while (@$queue > $index); |
154 | |
155 | # Add back any removed items |
156 | push(@$queue, @tmp); |
157 | |
158 | # Return single item |
159 | return $items[0] if ($count == 1); |
160 | |
161 | # Return multiple items |
162 | return @items; |
163 | } |
164 | |
165 | ### Internal Functions ### |
166 | |
54c7876f |
167 | # Check value of the requested index |
168 | $validate_index = sub { |
169 | my $index = shift; |
170 | |
171 | if (! looks_like_number($index) || (int($index) != $index)) { |
172 | require Carp; |
173 | my ($method) = (caller(1))[3]; |
174 | $method =~ s/Thread::Queue:://; |
175 | $index = 'undef' if (! defined($index)); |
176 | Carp::croak("Invalid 'index' argument ($index) to '$method' method"); |
177 | } |
178 | |
179 | return $index; |
180 | }; |
181 | |
182 | # Check value of the requested count |
183 | $validate_count = sub { |
184 | my $count = shift; |
185 | |
186 | if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) { |
187 | require Carp; |
188 | my ($method) = (caller(1))[3]; |
189 | $method =~ s/Thread::Queue:://; |
190 | $count = 'undef' if (! defined($count)); |
191 | Carp::croak("Invalid 'count' argument ($count) to '$method' method"); |
192 | } |
193 | |
194 | return $count; |
195 | }; |
196 | |
197 | 1; |
9c6f8578 |
198 | |
d516a115 |
199 | =head1 NAME |
200 | |
54c7876f |
201 | Thread::Queue - Thread-safe queues |
202 | |
203 | =head1 VERSION |
204 | |
09782346 |
205 | This document describes Thread::Queue version 2.08 |
d516a115 |
206 | |
207 | =head1 SYNOPSIS |
208 | |
54c7876f |
209 | use strict; |
210 | use warnings; |
211 | |
212 | use threads; |
d516a115 |
213 | use Thread::Queue; |
54c7876f |
214 | |
215 | my $q = Thread::Queue->new(); # A new empty queue |
216 | |
217 | # Worker thread |
218 | my $thr = threads->create(sub { |
219 | while (my $item = $q->dequeue()) { |
220 | # Do work on $item |
221 | } |
222 | })->detach(); |
223 | |
224 | # Send work to the thread |
225 | $q->enqueue($item1, ...); |
226 | |
227 | |
228 | # Count of items in the queue |
229 | my $left = $q->pending(); |
230 | |
231 | # Non-blocking dequeue |
232 | if (defined(my $item = $q->dequeue_nb())) { |
233 | # Work on $item |
234 | } |
235 | |
236 | # Get the second item in the queue without dequeuing anything |
237 | my $item = $q->peek(1); |
238 | |
239 | # Insert two items into the queue just behind the head |
240 | $q->insert(1, $item1, $item2); |
241 | |
242 | # Extract the last two items on the queue |
243 | my ($item1, $item2) = $q->extract(-2, 2); |
d516a115 |
244 | |
a99072da |
245 | =head1 DESCRIPTION |
246 | |
54c7876f |
247 | This module provides thread-safe FIFO queues that can be accessed safely by |
248 | any number of threads. |
249 | |
250 | Any data types supported by L<threads::shared> can be passed via queues: |
251 | |
252 | =over |
253 | |
254 | =item Ordinary scalars |
255 | |
256 | =item Array refs |
257 | |
258 | =item Hash refs |
259 | |
260 | =item Scalar refs |
261 | |
262 | =item Objects based on the above |
263 | |
264 | =back |
265 | |
266 | Ordinary scalars are added to queues as they are. |
267 | |
268 | If not already thread-shared, the other complex data types will be cloned |
269 | (recursively, if needed, and including any C<bless>ings and read-only |
270 | settings) into thread-shared structures before being placed onto a queue. |
a99072da |
271 | |
54c7876f |
272 | For example, the following would cause L<Thread::Queue> to create a empty, |
273 | shared array reference via C<&shared([])>, copy the elements 'foo', 'bar' |
274 | and 'baz' from C<@ary> into it, and then place that shared reference onto |
275 | the queue: |
a99072da |
276 | |
54c7876f |
277 | my @ary = qw/foo bar baz/; |
278 | $q->enqueue(\@ary); |
a99072da |
279 | |
54c7876f |
280 | However, for the following, the items are already shared, so their references |
281 | are added directly to the queue, and no cloning takes place: |
a99072da |
282 | |
54c7876f |
283 | my @ary :shared = qw/foo bar baz/; |
284 | $q->enqueue(\@ary); |
a99072da |
285 | |
54c7876f |
286 | my $obj = &shared({}); |
287 | $$obj{'foo'} = 'bar'; |
288 | $$obj{'qux'} = 99; |
289 | bless($obj, 'My::Class'); |
290 | $q->enqueue($obj); |
a99072da |
291 | |
54c7876f |
292 | See L</"LIMITATIONS"> for caveats related to passing objects via queues. |
a99072da |
293 | |
54c7876f |
294 | =head1 QUEUE CREATION |
a99072da |
295 | |
54c7876f |
296 | =over |
a99072da |
297 | |
54c7876f |
298 | =item ->new() |
a99072da |
299 | |
54c7876f |
300 | Creates a new empty queue. |
a99072da |
301 | |
54c7876f |
302 | =item ->new(LIST) |
a99072da |
303 | |
54c7876f |
304 | Creates a new queue pre-populated with the provided list of items. |
a99072da |
305 | |
306 | =back |
307 | |
54c7876f |
308 | =head1 BASIC METHODS |
a99072da |
309 | |
54c7876f |
310 | The following methods deal with queues on a FIFO basis. |
bbc7dcd2 |
311 | |
54c7876f |
312 | =over |
d516a115 |
313 | |
54c7876f |
314 | =item ->enqueue(LIST) |
d21067e0 |
315 | |
54c7876f |
316 | Adds a list of items onto the end of the queue. |
d21067e0 |
317 | |
54c7876f |
318 | =item ->dequeue() |
a99072da |
319 | |
54c7876f |
320 | =item ->dequeue(COUNT) |
d21067e0 |
321 | |
54c7876f |
322 | Removes the requested number of items (default is 1) from the head of the |
323 | queue, and returns them. If the queue contains fewer than the requested |
324 | number of items, then the thread will be blocked until the requisite number |
325 | of items are available (i.e., until other threads <enqueue> more items). |
a99072da |
326 | |
54c7876f |
327 | =item ->dequeue_nb() |
328 | |
329 | =item ->dequeue_nb(COUNT) |
330 | |
331 | Removes the requested number of items (default is 1) from the head of the |
332 | queue, and returns them. If the queue contains fewer than the requested |
333 | number of items, then it immediately (i.e., non-blocking) returns whatever |
334 | items there are on the queue. If the queue is empty, then C<undef> is |
335 | returned. |
336 | |
337 | =item ->pending() |
338 | |
339 | Returns the number of items still in the queue. |
340 | |
341 | =back |
342 | |
343 | =head1 ADVANCED METHODS |
344 | |
345 | The following methods can be used to manipulate items anywhere in a queue. |
346 | |
347 | To prevent the contents of a queue from being modified by another thread |
348 | while it is being examined and/or changed, L<lock|threads::shared/"lock |
349 | VARIABLE"> the queue inside a local block: |
350 | |
351 | { |
352 | lock($q); # Keep other threads from changing the queue's contents |
353 | my $item = $q->peek(); |
354 | if ($item ...) { |
355 | ... |
356 | } |
357 | } |
358 | # Queue is now unlocked |
359 | |
360 | =over |
361 | |
362 | =item ->peek() |
363 | |
364 | =item ->peek(INDEX) |
365 | |
366 | Returns an item from the queue without dequeuing anything. Defaults to the |
367 | the head of queue (at index position 0) if no index is specified. Negative |
368 | index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1 |
369 | is the end of the queue, -2 is next to last, and so on). |
370 | |
371 | If no items exists at the specified index (i.e., the queue is empty, or the |
372 | index is beyond the number of items on the queue), then C<undef> is returned. |
373 | |
374 | Remember, the returned item is not removed from the queue, so manipulating a |
375 | C<peek>ed at reference affects the item on the queue. |
376 | |
377 | =item ->insert(INDEX, LIST) |
378 | |
379 | Adds the list of items to the queue at the specified index position (0 |
380 | is the head of the list). Any existing items at and beyond that position are |
381 | pushed back past the newly added items: |
382 | |
383 | $q->enqueue(1, 2, 3, 4); |
384 | $q->insert(1, qw/foo bar/); |
385 | # Queue now contains: 1, foo, bar, 2, 3, 4 |
83272a45 |
386 | |
54c7876f |
387 | Specifying an index position greater than the number of items in the queue |
388 | just adds the list to the end. |
83272a45 |
389 | |
54c7876f |
390 | Negative index positions are supported: |
391 | |
392 | $q->enqueue(1, 2, 3, 4); |
393 | $q->insert(-2, qw/foo bar/); |
394 | # Queue now contains: 1, 2, foo, bar, 3, 4 |
395 | |
396 | Specifying a negative index position greater than the number of items in the |
397 | queue adds the list to the head of the queue. |
398 | |
399 | =item ->extract() |
400 | |
401 | =item ->extract(INDEX) |
402 | |
403 | =item ->extract(INDEX, COUNT) |
404 | |
405 | Removes and returns the specified number of items (defaults to 1) from the |
406 | specified index position in the queue (0 is the head of the queue). When |
407 | called with no arguments, C<extract> operates the same as C<dequeue_nb>. |
408 | |
409 | This method is non-blocking, and will return only as many items as are |
410 | available to fulfill the request: |
411 | |
412 | $q->enqueue(1, 2, 3, 4); |
413 | my $item = $q->extract(2) # Returns 3 |
414 | # Queue now contains: 1, 2, 4 |
415 | my @items = $q->extract(1, 3) # Returns (2, 4) |
416 | # Queue now contains: 1 |
417 | |
418 | Specifying an index position greater than the number of items in the |
419 | queue results in C<undef> or an empty list being returned. |
420 | |
421 | $q->enqueue('foo'); |
422 | my $nada = $q->extract(3) # Returns undef |
423 | my @nada = $q->extract(1, 3) # Returns () |
424 | |
425 | Negative index positions are supported. Specifying a negative index position |
426 | greater than the number of items in the queue may return items from the head |
427 | of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the |
428 | queue from the specified position (i.e. if queue size + index + count is |
429 | greater than zero): |
430 | |
431 | $q->enqueue(qw/foo bar baz/); |
432 | my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0 |
433 | my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0 |
434 | # Queue now contains: bar, baz |
435 | my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0 |
436 | |
437 | =back |
438 | |
7fb1c73b |
439 | =head1 NOTES |
440 | |
441 | Queues created by L<Thread::Queue> can be used in both threaded and |
442 | non-threaded applications. |
443 | |
54c7876f |
444 | =head1 LIMITATIONS |
445 | |
446 | Passing objects on queues may not work if the objects' classes do not support |
447 | sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more. |
448 | |
449 | Passing array/hash refs that contain objects may not work for Perl prior to |
450 | 5.10.0. |
451 | |
452 | =head1 SEE ALSO |
453 | |
454 | Thread::Queue Discussion Forum on CPAN: |
455 | L<http://www.cpanforum.com/dist/Thread-Queue> |
456 | |
457 | Annotated POD for Thread::Queue: |
09782346 |
458 | L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.08/lib/Thread/Queue.pm> |
7fb1c73b |
459 | |
460 | Source repository: |
461 | L<http://code.google.com/p/thread-queue/> |
54c7876f |
462 | |
463 | L<threads>, L<threads::shared> |
464 | |
465 | =head1 MAINTAINER |
466 | |
467 | Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>> |
468 | |
469 | =head1 LICENSE |
470 | |
471 | This program is free software; you can redistribute it and/or modify it under |
472 | the same terms as Perl itself. |
473 | |
474 | =cut |