Thread::Semaphore 2.07
[p5sagit/p5-mst-13.2.git] / lib / Thread / Queue.pm
1 package Thread::Queue;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = '2.03';
7
8 use threads::shared 0.96;
9 use Scalar::Util 1.10 qw(looks_like_number);
10
11 # Predeclarations for internal functions
12 my ($make_shared, $validate_count, $validate_index);
13
14 # Create a new queue possibly pre-populated with items
15 sub new
16 {
17     my $class = shift;
18     my @queue :shared = map { $make_shared->($_) } @_;
19     return bless(\@queue, $class);
20 }
21
22 # Add items to the tail of a queue
23 sub enqueue
24 {
25     my $queue = shift;
26     lock(@$queue);
27     push(@$queue, map { $make_shared->($_) } @_)
28         and cond_signal(@$queue);
29 }
30
31 # Return a count of the number of items on a queue
32 sub pending
33 {
34     my $queue = shift;
35     lock(@$queue);
36     return scalar(@$queue);
37 }
38
39 # Return 1 or more items from the head of a queue, blocking if needed
40 sub dequeue
41 {
42     my $queue = shift;
43     lock(@$queue);
44
45     my $count = @_ ? $validate_count->(shift) : 1;
46
47     # Wait for requisite number of items
48     cond_wait(@$queue) until (@$queue >= $count);
49     cond_signal(@$queue) if (@$queue > $count);
50
51     # Return single item
52     return shift(@$queue) if ($count == 1);
53
54     # Return multiple items
55     my @items;
56     push(@items, shift(@$queue)) for (1..$count);
57     return @items;
58 }
59
60 # Return items from the head of a queue with no blocking
61 sub dequeue_nb
62 {
63     my $queue = shift;
64     lock(@$queue);
65
66     my $count = @_ ? $validate_count->(shift) : 1;
67
68     # Return single item
69     return shift(@$queue) if ($count == 1);
70
71     # Return multiple items
72     my @items;
73     for (1..$count) {
74         last if (! @$queue);
75         push(@items, shift(@$queue));
76     }
77     return @items;
78 }
79
80 # Return an item without removing it from a queue
81 sub peek
82 {
83     my $queue = shift;
84     lock(@$queue);
85     my $index = @_ ? $validate_index->(shift) : 0;
86     return $$queue[$index];
87 }
88
89 # Insert items anywhere into a queue
90 sub insert
91 {
92     my $queue = shift;
93     lock(@$queue);
94
95     my $index = $validate_index->(shift);
96
97     return if (! @_);   # Nothing to insert
98
99     # Support negative indices
100     if ($index < 0) {
101         $index += @$queue;
102         if ($index < 0) {
103             $index = 0;
104         }
105     }
106
107     # Dequeue items from $index onward
108     my @tmp;
109     while (@$queue > $index) {
110         unshift(@tmp, pop(@$queue))
111     }
112
113     # Add new items to the queue
114     push(@$queue, map { $make_shared->($_) } @_);
115
116     # Add previous items back onto the queue
117     push(@$queue, @tmp);
118
119     # Soup's up
120     cond_signal(@$queue);
121 }
122
123 # Remove items from anywhere in a queue
124 sub extract
125 {
126     my $queue = shift;
127     lock(@$queue);
128
129     my $index = @_ ? $validate_index->(shift) : 0;
130     my $count = @_ ? $validate_count->(shift) : 1;
131
132     # Support negative indices
133     if ($index < 0) {
134         $index += @$queue;
135         if ($index < 0) {
136             $count += $index;
137             return if ($count <= 0);            # Beyond the head of the queue
138             return $queue->dequeue_nb($count);  # Extract from the head
139         }
140     }
141
142     # Dequeue items from $index+$count onward
143     my @tmp;
144     while (@$queue > ($index+$count)) {
145         unshift(@tmp, pop(@$queue))
146     }
147
148     # Extract desired items
149     my @items;
150     unshift(@items, pop(@$queue)) while (@$queue > $index);
151
152     # Add back any removed items
153     push(@$queue, @tmp);
154
155     # Return single item
156     return $items[0] if ($count == 1);
157
158     # Return multiple items
159     return @items;
160 }
161
162 ### Internal Functions ###
163
164 # Create a thread-shared version of a complex data structure or object
165 $make_shared = sub {
166     my $item = shift;
167
168     # If already thread-shared, then just return the input item
169     return $item if (threads::shared::is_shared($item));
170
171     # Make copies of array, hash and scalar refs
172     my $copy;
173     if (my $ref_type = Scalar::Util::reftype($item)) {
174         # Copy an array ref
175         if ($ref_type eq 'ARRAY') {
176             # Make empty shared array ref
177             $copy = &share([]);
178             # Recursively copy and add contents
179             push(@$copy, map { $make_shared->($_) } @$item);
180         }
181
182         # Copy a hash ref
183         elsif ($ref_type eq 'HASH') {
184             # Make empty shared hash ref
185             $copy = &share({});
186             # Recursively copy and add contents
187             foreach my $key (keys(%{$item})) {
188                 $copy->{$key} = $make_shared->($item->{$key});
189             }
190         }
191
192         # Copy a scalar ref
193         elsif ($ref_type eq 'SCALAR') {
194             $copy = \do{ my $scalar = $$item; };
195             share($copy);
196             # Clone READONLY flag
197             if (Internals::SvREADONLY($$item)) {
198                 Internals::SvREADONLY($$copy, 1);
199             }
200         }
201
202         # Copy of a ref of a ref
203         elsif ($ref_type eq 'REF') {
204             my $tmp = $make_shared->($$item);
205             $copy = \$tmp;
206             share($copy);
207         }
208     }
209
210     # If no copy is created above, then just return the input item
211     # NOTE:  This will end up generating an error for anything
212     #        other than an ordinary scalar
213     return $item if (! defined($copy));
214
215     # Clone READONLY flag
216     if (Internals::SvREADONLY($item)) {
217         Internals::SvREADONLY($copy, 1);
218     }
219
220     # If input item is an object, then bless the copy into the same class
221     if (my $class = Scalar::Util::blessed($item)) {
222         bless($copy, $class);
223     }
224
225     return $copy;
226 };
227
228 # Check value of the requested index
229 $validate_index = sub {
230     my $index = shift;
231
232     if (! looks_like_number($index) || (int($index) != $index)) {
233         require Carp;
234         my ($method) = (caller(1))[3];
235         $method =~ s/Thread::Queue:://;
236         $index = 'undef' if (! defined($index));
237         Carp::croak("Invalid 'index' argument ($index) to '$method' method");
238     }
239
240     return $index;
241 };
242
243 # Check value of the requested count
244 $validate_count = sub {
245     my $count = shift;
246
247     if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
248         require Carp;
249         my ($method) = (caller(1))[3];
250         $method =~ s/Thread::Queue:://;
251         $count = 'undef' if (! defined($count));
252         Carp::croak("Invalid 'count' argument ($count) to '$method' method");
253     }
254
255     return $count;
256 };
257
258 1;
259
260 =head1 NAME
261
262 Thread::Queue - Thread-safe queues
263
264 =head1 VERSION
265
266 This document describes Thread::Queue version 2.03
267
268 =head1 SYNOPSIS
269
270     use strict;
271     use warnings;
272
273     use threads;
274     use Thread::Queue;
275
276     my $q = Thread::Queue->new();    # A new empty queue
277
278     # Worker thread
279     my $thr = threads->create(sub {
280                                 while (my $item = $q->dequeue()) {
281                                     # Do work on $item
282                                 }
283                              })->detach();
284
285     # Send work to the thread
286     $q->enqueue($item1, ...);
287
288
289     # Count of items in the queue
290     my $left = $q->pending();
291
292     # Non-blocking dequeue
293     if (defined(my $item = $q->dequeue_nb())) {
294         # Work on $item
295     }
296
297     # Get the second item in the queue without dequeuing anything
298     my $item = $q->peek(1);
299
300     # Insert two items into the queue just behind the head
301     $q->insert(1, $item1, $item2);
302
303     # Extract the last two items on the queue
304     my ($item1, $item2) = $q->extract(-2, 2);
305
306 =head1 DESCRIPTION
307
308 This module provides thread-safe FIFO queues that can be accessed safely by
309 any number of threads.
310
311 Any data types supported by L<threads::shared> can be passed via queues:
312
313 =over
314
315 =item Ordinary scalars
316
317 =item Array refs
318
319 =item Hash refs
320
321 =item Scalar refs
322
323 =item Objects based on the above
324
325 =back
326
327 Ordinary scalars are added to queues as they are.
328
329 If not already thread-shared, the other complex data types will be cloned
330 (recursively, if needed, and including any C<bless>ings and read-only
331 settings) into thread-shared structures before being placed onto a queue.
332
333 For example, the following would cause L<Thread::Queue> to create a empty,
334 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
335 and 'baz' from C<@ary> into it, and then place that shared reference onto
336 the queue:
337
338     my @ary = qw/foo bar baz/;
339     $q->enqueue(\@ary);
340
341 However, for the following, the items are already shared, so their references
342 are added directly to the queue, and no cloning takes place:
343
344     my @ary :shared = qw/foo bar baz/;
345     $q->enqueue(\@ary);
346
347     my $obj = &shared({});
348     $$obj{'foo'} = 'bar';
349     $$obj{'qux'} = 99;
350     bless($obj, 'My::Class');
351     $q->enqueue($obj);
352
353 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
354
355 =head1 QUEUE CREATION
356
357 =over
358
359 =item ->new()
360
361 Creates a new empty queue.
362
363 =item ->new(LIST)
364
365 Creates a new queue pre-populated with the provided list of items.
366
367 =back
368
369 =head1 BASIC METHODS
370
371 The following methods deal with queues on a FIFO basis.
372
373 =over
374
375 =item ->enqueue(LIST)
376
377 Adds a list of items onto the end of the queue.
378
379 =item ->dequeue()
380
381 =item ->dequeue(COUNT)
382
383 Removes the requested number of items (default is 1) from the head of the
384 queue, and returns them.  If the queue contains fewer than the requested
385 number of items, then the thread will be blocked until the requisite number
386 of items are available (i.e., until other threads <enqueue> more items).
387
388 =item ->dequeue_nb()
389
390 =item ->dequeue_nb(COUNT)
391
392 Removes the requested number of items (default is 1) from the head of the
393 queue, and returns them.  If the queue contains fewer than the requested
394 number of items, then it immediately (i.e., non-blocking) returns whatever
395 items there are on the queue.  If the queue is empty, then C<undef> is
396 returned.
397
398 =item ->pending()
399
400 Returns the number of items still in the queue.
401
402 =back
403
404 =head1 ADVANCED METHODS
405
406 The following methods can be used to manipulate items anywhere in a queue.
407
408 To prevent the contents of a queue from being modified by another thread
409 while it is being examined and/or changed, L<lock|threads::shared/"lock
410 VARIABLE"> the queue inside a local block:
411
412     {
413         lock($q);   # Keep other threads from changing the queue's contents
414         my $item = $q->peek();
415         if ($item ...) {
416             ...
417         }
418     }
419     # Queue is now unlocked
420
421 =over
422
423 =item ->peek()
424
425 =item ->peek(INDEX)
426
427 Returns an item from the queue without dequeuing anything.  Defaults to the
428 the head of queue (at index position 0) if no index is specified.  Negative
429 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
430 is the end of the queue, -2 is next to last, and so on).
431
432 If no items exists at the specified index (i.e., the queue is empty, or the
433 index is beyond the number of items on the queue), then C<undef> is returned.
434
435 Remember, the returned item is not removed from the queue, so manipulating a
436 C<peek>ed at reference affects the item on the queue.
437
438 =item ->insert(INDEX, LIST)
439
440 Adds the list of items to the queue at the specified index position (0
441 is the head of the list).  Any existing items at and beyond that position are
442 pushed back past the newly added items:
443
444     $q->enqueue(1, 2, 3, 4);
445     $q->insert(1, qw/foo bar/);
446     # Queue now contains:  1, foo, bar, 2, 3, 4
447
448 Specifying an index position greater than the number of items in the queue
449 just adds the list to the end.
450
451 Negative index positions are supported:
452
453     $q->enqueue(1, 2, 3, 4);
454     $q->insert(-2, qw/foo bar/);
455     # Queue now contains:  1, 2, foo, bar, 3, 4
456
457 Specifying a negative index position greater than the number of items in the
458 queue adds the list to the head of the queue.
459
460 =item ->extract()
461
462 =item ->extract(INDEX)
463
464 =item ->extract(INDEX, COUNT)
465
466 Removes and returns the specified number of items (defaults to 1) from the
467 specified index position in the queue (0 is the head of the queue).  When
468 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
469
470 This method is non-blocking, and will return only as many items as are
471 available to fulfill the request:
472
473     $q->enqueue(1, 2, 3, 4);
474     my $item  = $q->extract(2)     # Returns 3
475                                    # Queue now contains:  1, 2, 4
476     my @items = $q->extract(1, 3)  # Returns (2, 4)
477                                    # Queue now contains:  1
478
479 Specifying an index position greater than the number of items in the
480 queue results in C<undef> or an empty list being returned.
481
482     $q->enqueue('foo');
483     my $nada = $q->extract(3)      # Returns undef
484     my @nada = $q->extract(1, 3)   # Returns ()
485
486 Negative index positions are supported.  Specifying a negative index position
487 greater than the number of items in the queue may return items from the head
488 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
489 queue from the specified position (i.e. if queue size + index + count is
490 greater than zero):
491
492     $q->enqueue(qw/foo bar baz/);
493     my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
494     my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
495                                      # Queue now contains:  bar, baz
496     my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
497
498 =back
499
500 =head1 LIMITATIONS
501
502 Passing objects on queues may not work if the objects' classes do not support
503 sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
504
505 Passing array/hash refs that contain objects may not work for Perl prior to
506 5.10.0.
507
508 =head1 SEE ALSO
509
510 Thread::Queue Discussion Forum on CPAN:
511 L<http://www.cpanforum.com/dist/Thread-Queue>
512
513 Annotated POD for Thread::Queue:
514 L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.03/lib/Thread/Queue.pm>
515
516 L<threads>, L<threads::shared>
517
518 =head1 MAINTAINER
519
520 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
521
522 =head1 LICENSE
523
524 This program is free software; you can redistribute it and/or modify it under
525 the same terms as Perl itself.
526
527 =cut