dc2b1edc1e1f7f62be19bb4e74327d0b62d4cb19
[p5sagit/p5-mst-13.2.git] / lib / Thread / Queue.pm
1 package Thread::Queue;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = '2.07';
7
8 use threads::shared 0.96;
9 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
10
11 # Predeclarations for internal functions
12 my ($make_shared, $validate_count, $validate_index);
13
14 # Create a new queue possibly pre-populated with items
15 sub new
16 {
17     my $class = shift;
18     my @queue :shared = map { $make_shared->($_, {}) } @_;
19     return bless(\@queue, $class);
20 }
21
22 # Add items to the tail of a queue
23 sub enqueue
24 {
25     my $queue = shift;
26     lock(@$queue);
27     push(@$queue, map { $make_shared->($_, {}) } @_)
28         and cond_signal(@$queue);
29 }
30
31 # Return a count of the number of items on a queue
32 sub pending
33 {
34     my $queue = shift;
35     lock(@$queue);
36     return scalar(@$queue);
37 }
38
39 # Return 1 or more items from the head of a queue, blocking if needed
40 sub dequeue
41 {
42     my $queue = shift;
43     lock(@$queue);
44
45     my $count = @_ ? $validate_count->(shift) : 1;
46
47     # Wait for requisite number of items
48     cond_wait(@$queue) until (@$queue >= $count);
49     cond_signal(@$queue) if (@$queue > $count);
50
51     # Return single item
52     return shift(@$queue) if ($count == 1);
53
54     # Return multiple items
55     my @items;
56     push(@items, shift(@$queue)) for (1..$count);
57     return @items;
58 }
59
60 # Return items from the head of a queue with no blocking
61 sub dequeue_nb
62 {
63     my $queue = shift;
64     lock(@$queue);
65
66     my $count = @_ ? $validate_count->(shift) : 1;
67
68     # Return single item
69     return shift(@$queue) if ($count == 1);
70
71     # Return multiple items
72     my @items;
73     for (1..$count) {
74         last if (! @$queue);
75         push(@items, shift(@$queue));
76     }
77     return @items;
78 }
79
80 # Return an item without removing it from a queue
81 sub peek
82 {
83     my $queue = shift;
84     lock(@$queue);
85     my $index = @_ ? $validate_index->(shift) : 0;
86     return $$queue[$index];
87 }
88
89 # Insert items anywhere into a queue
90 sub insert
91 {
92     my $queue = shift;
93     lock(@$queue);
94
95     my $index = $validate_index->(shift);
96
97     return if (! @_);   # Nothing to insert
98
99     # Support negative indices
100     if ($index < 0) {
101         $index += @$queue;
102         if ($index < 0) {
103             $index = 0;
104         }
105     }
106
107     # Dequeue items from $index onward
108     my @tmp;
109     while (@$queue > $index) {
110         unshift(@tmp, pop(@$queue))
111     }
112
113     # Add new items to the queue
114     push(@$queue, map { $make_shared->($_, {}) } @_);
115
116     # Add previous items back onto the queue
117     push(@$queue, @tmp);
118
119     # Soup's up
120     cond_signal(@$queue);
121 }
122
123 # Remove items from anywhere in a queue
124 sub extract
125 {
126     my $queue = shift;
127     lock(@$queue);
128
129     my $index = @_ ? $validate_index->(shift) : 0;
130     my $count = @_ ? $validate_count->(shift) : 1;
131
132     # Support negative indices
133     if ($index < 0) {
134         $index += @$queue;
135         if ($index < 0) {
136             $count += $index;
137             return if ($count <= 0);            # Beyond the head of the queue
138             return $queue->dequeue_nb($count);  # Extract from the head
139         }
140     }
141
142     # Dequeue items from $index+$count onward
143     my @tmp;
144     while (@$queue > ($index+$count)) {
145         unshift(@tmp, pop(@$queue))
146     }
147
148     # Extract desired items
149     my @items;
150     unshift(@items, pop(@$queue)) while (@$queue > $index);
151
152     # Add back any removed items
153     push(@$queue, @tmp);
154
155     # Return single item
156     return $items[0] if ($count == 1);
157
158     # Return multiple items
159     return @items;
160 }
161
162 ### Internal Functions ###
163
164 # Create a thread-shared version of a complex data structure or object
165 $make_shared = sub {
166     my ($item, $cloned) = @_;
167
168     # If not running 'threads' or already thread-shared,
169     #   then just return the input item
170     return $item if (! $threads::threads ||
171                      threads::shared::is_shared($item));
172
173     # Make copies of array, hash and scalar refs
174     my $copy;
175     if (my $ref_type = reftype($item)) {
176         # Check for previously cloned references
177         #   (this takes care of circular refs as well)
178         my $addr = refaddr($item);
179         if (defined($addr) && exists($cloned->{$addr})) {
180             # Return the already existing clone
181             return $cloned->{$addr};
182         }
183
184         # Copy an array ref
185         if ($ref_type eq 'ARRAY') {
186             # Make empty shared array ref
187             $copy = &share([]);
188             # Add to clone checking hash
189             $cloned->{$addr} = $copy;
190             # Recursively copy and add contents
191             push(@$copy, map { $make_shared->($_, $cloned) } @$item);
192         }
193
194         # Copy a hash ref
195         elsif ($ref_type eq 'HASH') {
196             # Make empty shared hash ref
197             $copy = &share({});
198             # Add to clone checking hash
199             $cloned->{$addr} = $copy;
200             # Recursively copy and add contents
201             foreach my $key (keys(%{$item})) {
202                 $copy->{$key} = $make_shared->($item->{$key}, $cloned);
203             }
204         }
205
206         # Copy a scalar ref
207         elsif ($ref_type eq 'SCALAR') {
208             $copy = \do{ my $scalar = $$item; };
209             share($copy);
210             # Clone READONLY flag
211             if (Internals::SvREADONLY($$item)) {
212                 Internals::SvREADONLY($$copy, 1);
213             }
214             # Add to clone checking hash
215             $cloned->{$addr} = $copy;
216         }
217
218         # Copy of a ref of a ref
219         elsif ($ref_type eq 'REF') {
220             # Special handling for $x = \$x
221             my $addr2 = refaddr($$item);
222             if ($addr2 == $addr) {
223                 $copy = \$copy;
224                 share($copy);
225                 $cloned->{$addr} = $copy;
226             } else {
227                 my $tmp;
228                 $copy = \$tmp;
229                 share($copy);
230                 # Add to clone checking hash
231                 $cloned->{$addr} = $copy;
232                 # Recursively copy and add contents
233                 $tmp = $make_shared->($$item, $cloned);
234             }
235         }
236     }
237
238     # If no copy is created above, then just return the input item
239     # NOTE:  This will end up generating an error for anything
240     #        other than an ordinary scalar
241     return $item if (! defined($copy));
242
243     # If input item is an object, then bless the copy into the same class
244     if (my $class = blessed($item)) {
245         bless($copy, $class);
246     }
247
248     # Clone READONLY flag
249     if (Internals::SvREADONLY($item)) {
250         Internals::SvREADONLY($copy, 1);
251     }
252
253     return $copy;
254 };
255
256 # Check value of the requested index
257 $validate_index = sub {
258     my $index = shift;
259
260     if (! looks_like_number($index) || (int($index) != $index)) {
261         require Carp;
262         my ($method) = (caller(1))[3];
263         $method =~ s/Thread::Queue:://;
264         $index = 'undef' if (! defined($index));
265         Carp::croak("Invalid 'index' argument ($index) to '$method' method");
266     }
267
268     return $index;
269 };
270
271 # Check value of the requested count
272 $validate_count = sub {
273     my $count = shift;
274
275     if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
276         require Carp;
277         my ($method) = (caller(1))[3];
278         $method =~ s/Thread::Queue:://;
279         $count = 'undef' if (! defined($count));
280         Carp::croak("Invalid 'count' argument ($count) to '$method' method");
281     }
282
283     return $count;
284 };
285
286 1;
287
288 =head1 NAME
289
290 Thread::Queue - Thread-safe queues
291
292 =head1 VERSION
293
294 This document describes Thread::Queue version 2.07
295
296 =head1 SYNOPSIS
297
298     use strict;
299     use warnings;
300
301     use threads;
302     use Thread::Queue;
303
304     my $q = Thread::Queue->new();    # A new empty queue
305
306     # Worker thread
307     my $thr = threads->create(sub {
308                                 while (my $item = $q->dequeue()) {
309                                     # Do work on $item
310                                 }
311                              })->detach();
312
313     # Send work to the thread
314     $q->enqueue($item1, ...);
315
316
317     # Count of items in the queue
318     my $left = $q->pending();
319
320     # Non-blocking dequeue
321     if (defined(my $item = $q->dequeue_nb())) {
322         # Work on $item
323     }
324
325     # Get the second item in the queue without dequeuing anything
326     my $item = $q->peek(1);
327
328     # Insert two items into the queue just behind the head
329     $q->insert(1, $item1, $item2);
330
331     # Extract the last two items on the queue
332     my ($item1, $item2) = $q->extract(-2, 2);
333
334 =head1 DESCRIPTION
335
336 This module provides thread-safe FIFO queues that can be accessed safely by
337 any number of threads.
338
339 Any data types supported by L<threads::shared> can be passed via queues:
340
341 =over
342
343 =item Ordinary scalars
344
345 =item Array refs
346
347 =item Hash refs
348
349 =item Scalar refs
350
351 =item Objects based on the above
352
353 =back
354
355 Ordinary scalars are added to queues as they are.
356
357 If not already thread-shared, the other complex data types will be cloned
358 (recursively, if needed, and including any C<bless>ings and read-only
359 settings) into thread-shared structures before being placed onto a queue.
360
361 For example, the following would cause L<Thread::Queue> to create a empty,
362 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
363 and 'baz' from C<@ary> into it, and then place that shared reference onto
364 the queue:
365
366     my @ary = qw/foo bar baz/;
367     $q->enqueue(\@ary);
368
369 However, for the following, the items are already shared, so their references
370 are added directly to the queue, and no cloning takes place:
371
372     my @ary :shared = qw/foo bar baz/;
373     $q->enqueue(\@ary);
374
375     my $obj = &shared({});
376     $$obj{'foo'} = 'bar';
377     $$obj{'qux'} = 99;
378     bless($obj, 'My::Class');
379     $q->enqueue($obj);
380
381 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
382
383 =head1 QUEUE CREATION
384
385 =over
386
387 =item ->new()
388
389 Creates a new empty queue.
390
391 =item ->new(LIST)
392
393 Creates a new queue pre-populated with the provided list of items.
394
395 =back
396
397 =head1 BASIC METHODS
398
399 The following methods deal with queues on a FIFO basis.
400
401 =over
402
403 =item ->enqueue(LIST)
404
405 Adds a list of items onto the end of the queue.
406
407 =item ->dequeue()
408
409 =item ->dequeue(COUNT)
410
411 Removes the requested number of items (default is 1) from the head of the
412 queue, and returns them.  If the queue contains fewer than the requested
413 number of items, then the thread will be blocked until the requisite number
414 of items are available (i.e., until other threads <enqueue> more items).
415
416 =item ->dequeue_nb()
417
418 =item ->dequeue_nb(COUNT)
419
420 Removes the requested number of items (default is 1) from the head of the
421 queue, and returns them.  If the queue contains fewer than the requested
422 number of items, then it immediately (i.e., non-blocking) returns whatever
423 items there are on the queue.  If the queue is empty, then C<undef> is
424 returned.
425
426 =item ->pending()
427
428 Returns the number of items still in the queue.
429
430 =back
431
432 =head1 ADVANCED METHODS
433
434 The following methods can be used to manipulate items anywhere in a queue.
435
436 To prevent the contents of a queue from being modified by another thread
437 while it is being examined and/or changed, L<lock|threads::shared/"lock
438 VARIABLE"> the queue inside a local block:
439
440     {
441         lock($q);   # Keep other threads from changing the queue's contents
442         my $item = $q->peek();
443         if ($item ...) {
444             ...
445         }
446     }
447     # Queue is now unlocked
448
449 =over
450
451 =item ->peek()
452
453 =item ->peek(INDEX)
454
455 Returns an item from the queue without dequeuing anything.  Defaults to the
456 the head of queue (at index position 0) if no index is specified.  Negative
457 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
458 is the end of the queue, -2 is next to last, and so on).
459
460 If no items exists at the specified index (i.e., the queue is empty, or the
461 index is beyond the number of items on the queue), then C<undef> is returned.
462
463 Remember, the returned item is not removed from the queue, so manipulating a
464 C<peek>ed at reference affects the item on the queue.
465
466 =item ->insert(INDEX, LIST)
467
468 Adds the list of items to the queue at the specified index position (0
469 is the head of the list).  Any existing items at and beyond that position are
470 pushed back past the newly added items:
471
472     $q->enqueue(1, 2, 3, 4);
473     $q->insert(1, qw/foo bar/);
474     # Queue now contains:  1, foo, bar, 2, 3, 4
475
476 Specifying an index position greater than the number of items in the queue
477 just adds the list to the end.
478
479 Negative index positions are supported:
480
481     $q->enqueue(1, 2, 3, 4);
482     $q->insert(-2, qw/foo bar/);
483     # Queue now contains:  1, 2, foo, bar, 3, 4
484
485 Specifying a negative index position greater than the number of items in the
486 queue adds the list to the head of the queue.
487
488 =item ->extract()
489
490 =item ->extract(INDEX)
491
492 =item ->extract(INDEX, COUNT)
493
494 Removes and returns the specified number of items (defaults to 1) from the
495 specified index position in the queue (0 is the head of the queue).  When
496 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
497
498 This method is non-blocking, and will return only as many items as are
499 available to fulfill the request:
500
501     $q->enqueue(1, 2, 3, 4);
502     my $item  = $q->extract(2)     # Returns 3
503                                    # Queue now contains:  1, 2, 4
504     my @items = $q->extract(1, 3)  # Returns (2, 4)
505                                    # Queue now contains:  1
506
507 Specifying an index position greater than the number of items in the
508 queue results in C<undef> or an empty list being returned.
509
510     $q->enqueue('foo');
511     my $nada = $q->extract(3)      # Returns undef
512     my @nada = $q->extract(1, 3)   # Returns ()
513
514 Negative index positions are supported.  Specifying a negative index position
515 greater than the number of items in the queue may return items from the head
516 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
517 queue from the specified position (i.e. if queue size + index + count is
518 greater than zero):
519
520     $q->enqueue(qw/foo bar baz/);
521     my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
522     my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
523                                      # Queue now contains:  bar, baz
524     my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
525
526 =back
527
528 =head1 NOTES
529
530 Queues created by L<Thread::Queue> can be used in both threaded and
531 non-threaded applications.
532
533 =head1 LIMITATIONS
534
535 Passing objects on queues may not work if the objects' classes do not support
536 sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
537
538 Passing array/hash refs that contain objects may not work for Perl prior to
539 5.10.0.
540
541 =head1 SEE ALSO
542
543 Thread::Queue Discussion Forum on CPAN:
544 L<http://www.cpanforum.com/dist/Thread-Queue>
545
546 Annotated POD for Thread::Queue:
547 L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.07/lib/Thread/Queue.pm>
548
549 Source repository:
550 L<http://code.google.com/p/thread-queue/>
551
552 L<threads>, L<threads::shared>
553
554 =head1 MAINTAINER
555
556 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
557
558 =head1 LICENSE
559
560 This program is free software; you can redistribute it and/or modify it under
561 the same terms as Perl itself.
562
563 =cut