Thread::Queue 2.06
[p5sagit/p5-mst-13.2.git] / lib / Thread / Queue.pm
1 package Thread::Queue;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = '2.06';
7
8 use threads::shared 0.96;
9 use Scalar::Util 1.10 qw(looks_like_number);
10
11 # Predeclarations for internal functions
12 my ($make_shared, $validate_count, $validate_index);
13
14 # Create a new queue possibly pre-populated with items
15 sub new
16 {
17     my $class = shift;
18     my @queue :shared = map { $make_shared->($_) } @_;
19     return bless(\@queue, $class);
20 }
21
22 # Add items to the tail of a queue
23 sub enqueue
24 {
25     my $queue = shift;
26     lock(@$queue);
27     push(@$queue, map { $make_shared->($_) } @_)
28         and cond_signal(@$queue);
29 }
30
31 # Return a count of the number of items on a queue
32 sub pending
33 {
34     my $queue = shift;
35     lock(@$queue);
36     return scalar(@$queue);
37 }
38
39 # Return 1 or more items from the head of a queue, blocking if needed
40 sub dequeue
41 {
42     my $queue = shift;
43     lock(@$queue);
44
45     my $count = @_ ? $validate_count->(shift) : 1;
46
47     # Wait for requisite number of items
48     cond_wait(@$queue) until (@$queue >= $count);
49     cond_signal(@$queue) if (@$queue > $count);
50
51     # Return single item
52     return shift(@$queue) if ($count == 1);
53
54     # Return multiple items
55     my @items;
56     push(@items, shift(@$queue)) for (1..$count);
57     return @items;
58 }
59
60 # Return items from the head of a queue with no blocking
61 sub dequeue_nb
62 {
63     my $queue = shift;
64     lock(@$queue);
65
66     my $count = @_ ? $validate_count->(shift) : 1;
67
68     # Return single item
69     return shift(@$queue) if ($count == 1);
70
71     # Return multiple items
72     my @items;
73     for (1..$count) {
74         last if (! @$queue);
75         push(@items, shift(@$queue));
76     }
77     return @items;
78 }
79
80 # Return an item without removing it from a queue
81 sub peek
82 {
83     my $queue = shift;
84     lock(@$queue);
85     my $index = @_ ? $validate_index->(shift) : 0;
86     return $$queue[$index];
87 }
88
89 # Insert items anywhere into a queue
90 sub insert
91 {
92     my $queue = shift;
93     lock(@$queue);
94
95     my $index = $validate_index->(shift);
96
97     return if (! @_);   # Nothing to insert
98
99     # Support negative indices
100     if ($index < 0) {
101         $index += @$queue;
102         if ($index < 0) {
103             $index = 0;
104         }
105     }
106
107     # Dequeue items from $index onward
108     my @tmp;
109     while (@$queue > $index) {
110         unshift(@tmp, pop(@$queue))
111     }
112
113     # Add new items to the queue
114     push(@$queue, map { $make_shared->($_) } @_);
115
116     # Add previous items back onto the queue
117     push(@$queue, @tmp);
118
119     # Soup's up
120     cond_signal(@$queue);
121 }
122
123 # Remove items from anywhere in a queue
124 sub extract
125 {
126     my $queue = shift;
127     lock(@$queue);
128
129     my $index = @_ ? $validate_index->(shift) : 0;
130     my $count = @_ ? $validate_count->(shift) : 1;
131
132     # Support negative indices
133     if ($index < 0) {
134         $index += @$queue;
135         if ($index < 0) {
136             $count += $index;
137             return if ($count <= 0);            # Beyond the head of the queue
138             return $queue->dequeue_nb($count);  # Extract from the head
139         }
140     }
141
142     # Dequeue items from $index+$count onward
143     my @tmp;
144     while (@$queue > ($index+$count)) {
145         unshift(@tmp, pop(@$queue))
146     }
147
148     # Extract desired items
149     my @items;
150     unshift(@items, pop(@$queue)) while (@$queue > $index);
151
152     # Add back any removed items
153     push(@$queue, @tmp);
154
155     # Return single item
156     return $items[0] if ($count == 1);
157
158     # Return multiple items
159     return @items;
160 }
161
162 ### Internal Functions ###
163
164 # Create a thread-shared version of a complex data structure or object
165 $make_shared = sub {
166     my $item = shift;
167
168     # If not running 'threads' or already thread-shared,
169     #   then just return the input item
170     return $item if (! $threads::threads ||
171                      threads::shared::is_shared($item));
172
173     # Make copies of array, hash and scalar refs
174     my $copy;
175     if (my $ref_type = Scalar::Util::reftype($item)) {
176         # Copy an array ref
177         if ($ref_type eq 'ARRAY') {
178             # Make empty shared array ref
179             $copy = &share([]);
180             # Recursively copy and add contents
181             push(@$copy, map { $make_shared->($_) } @$item);
182         }
183
184         # Copy a hash ref
185         elsif ($ref_type eq 'HASH') {
186             # Make empty shared hash ref
187             $copy = &share({});
188             # Recursively copy and add contents
189             foreach my $key (keys(%{$item})) {
190                 $copy->{$key} = $make_shared->($item->{$key});
191             }
192         }
193
194         # Copy a scalar ref
195         elsif ($ref_type eq 'SCALAR') {
196             $copy = \do{ my $scalar = $$item; };
197             share($copy);
198             # Clone READONLY flag
199             if (Internals::SvREADONLY($$item)) {
200                 Internals::SvREADONLY($$copy, 1);
201             }
202         }
203
204         # Copy of a ref of a ref
205         elsif ($ref_type eq 'REF') {
206             my $tmp = $make_shared->($$item);
207             $copy = \$tmp;
208             share($copy);
209         }
210     }
211
212     # If no copy is created above, then just return the input item
213     # NOTE:  This will end up generating an error for anything
214     #        other than an ordinary scalar
215     return $item if (! defined($copy));
216
217     # Clone READONLY flag
218     if (Internals::SvREADONLY($item)) {
219         Internals::SvREADONLY($copy, 1);
220     }
221
222     # If input item is an object, then bless the copy into the same class
223     if (my $class = Scalar::Util::blessed($item)) {
224         bless($copy, $class);
225     }
226
227     return $copy;
228 };
229
230 # Check value of the requested index
231 $validate_index = sub {
232     my $index = shift;
233
234     if (! looks_like_number($index) || (int($index) != $index)) {
235         require Carp;
236         my ($method) = (caller(1))[3];
237         $method =~ s/Thread::Queue:://;
238         $index = 'undef' if (! defined($index));
239         Carp::croak("Invalid 'index' argument ($index) to '$method' method");
240     }
241
242     return $index;
243 };
244
245 # Check value of the requested count
246 $validate_count = sub {
247     my $count = shift;
248
249     if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
250         require Carp;
251         my ($method) = (caller(1))[3];
252         $method =~ s/Thread::Queue:://;
253         $count = 'undef' if (! defined($count));
254         Carp::croak("Invalid 'count' argument ($count) to '$method' method");
255     }
256
257     return $count;
258 };
259
260 1;
261
262 =head1 NAME
263
264 Thread::Queue - Thread-safe queues
265
266 =head1 VERSION
267
268 This document describes Thread::Queue version 2.06
269
270 =head1 SYNOPSIS
271
272     use strict;
273     use warnings;
274
275     use threads;
276     use Thread::Queue;
277
278     my $q = Thread::Queue->new();    # A new empty queue
279
280     # Worker thread
281     my $thr = threads->create(sub {
282                                 while (my $item = $q->dequeue()) {
283                                     # Do work on $item
284                                 }
285                              })->detach();
286
287     # Send work to the thread
288     $q->enqueue($item1, ...);
289
290
291     # Count of items in the queue
292     my $left = $q->pending();
293
294     # Non-blocking dequeue
295     if (defined(my $item = $q->dequeue_nb())) {
296         # Work on $item
297     }
298
299     # Get the second item in the queue without dequeuing anything
300     my $item = $q->peek(1);
301
302     # Insert two items into the queue just behind the head
303     $q->insert(1, $item1, $item2);
304
305     # Extract the last two items on the queue
306     my ($item1, $item2) = $q->extract(-2, 2);
307
308 =head1 DESCRIPTION
309
310 This module provides thread-safe FIFO queues that can be accessed safely by
311 any number of threads.
312
313 Any data types supported by L<threads::shared> can be passed via queues:
314
315 =over
316
317 =item Ordinary scalars
318
319 =item Array refs
320
321 =item Hash refs
322
323 =item Scalar refs
324
325 =item Objects based on the above
326
327 =back
328
329 Ordinary scalars are added to queues as they are.
330
331 If not already thread-shared, the other complex data types will be cloned
332 (recursively, if needed, and including any C<bless>ings and read-only
333 settings) into thread-shared structures before being placed onto a queue.
334
335 For example, the following would cause L<Thread::Queue> to create a empty,
336 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
337 and 'baz' from C<@ary> into it, and then place that shared reference onto
338 the queue:
339
340     my @ary = qw/foo bar baz/;
341     $q->enqueue(\@ary);
342
343 However, for the following, the items are already shared, so their references
344 are added directly to the queue, and no cloning takes place:
345
346     my @ary :shared = qw/foo bar baz/;
347     $q->enqueue(\@ary);
348
349     my $obj = &shared({});
350     $$obj{'foo'} = 'bar';
351     $$obj{'qux'} = 99;
352     bless($obj, 'My::Class');
353     $q->enqueue($obj);
354
355 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
356
357 =head1 QUEUE CREATION
358
359 =over
360
361 =item ->new()
362
363 Creates a new empty queue.
364
365 =item ->new(LIST)
366
367 Creates a new queue pre-populated with the provided list of items.
368
369 =back
370
371 =head1 BASIC METHODS
372
373 The following methods deal with queues on a FIFO basis.
374
375 =over
376
377 =item ->enqueue(LIST)
378
379 Adds a list of items onto the end of the queue.
380
381 =item ->dequeue()
382
383 =item ->dequeue(COUNT)
384
385 Removes the requested number of items (default is 1) from the head of the
386 queue, and returns them.  If the queue contains fewer than the requested
387 number of items, then the thread will be blocked until the requisite number
388 of items are available (i.e., until other threads <enqueue> more items).
389
390 =item ->dequeue_nb()
391
392 =item ->dequeue_nb(COUNT)
393
394 Removes the requested number of items (default is 1) from the head of the
395 queue, and returns them.  If the queue contains fewer than the requested
396 number of items, then it immediately (i.e., non-blocking) returns whatever
397 items there are on the queue.  If the queue is empty, then C<undef> is
398 returned.
399
400 =item ->pending()
401
402 Returns the number of items still in the queue.
403
404 =back
405
406 =head1 ADVANCED METHODS
407
408 The following methods can be used to manipulate items anywhere in a queue.
409
410 To prevent the contents of a queue from being modified by another thread
411 while it is being examined and/or changed, L<lock|threads::shared/"lock
412 VARIABLE"> the queue inside a local block:
413
414     {
415         lock($q);   # Keep other threads from changing the queue's contents
416         my $item = $q->peek();
417         if ($item ...) {
418             ...
419         }
420     }
421     # Queue is now unlocked
422
423 =over
424
425 =item ->peek()
426
427 =item ->peek(INDEX)
428
429 Returns an item from the queue without dequeuing anything.  Defaults to the
430 the head of queue (at index position 0) if no index is specified.  Negative
431 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
432 is the end of the queue, -2 is next to last, and so on).
433
434 If no items exists at the specified index (i.e., the queue is empty, or the
435 index is beyond the number of items on the queue), then C<undef> is returned.
436
437 Remember, the returned item is not removed from the queue, so manipulating a
438 C<peek>ed at reference affects the item on the queue.
439
440 =item ->insert(INDEX, LIST)
441
442 Adds the list of items to the queue at the specified index position (0
443 is the head of the list).  Any existing items at and beyond that position are
444 pushed back past the newly added items:
445
446     $q->enqueue(1, 2, 3, 4);
447     $q->insert(1, qw/foo bar/);
448     # Queue now contains:  1, foo, bar, 2, 3, 4
449
450 Specifying an index position greater than the number of items in the queue
451 just adds the list to the end.
452
453 Negative index positions are supported:
454
455     $q->enqueue(1, 2, 3, 4);
456     $q->insert(-2, qw/foo bar/);
457     # Queue now contains:  1, 2, foo, bar, 3, 4
458
459 Specifying a negative index position greater than the number of items in the
460 queue adds the list to the head of the queue.
461
462 =item ->extract()
463
464 =item ->extract(INDEX)
465
466 =item ->extract(INDEX, COUNT)
467
468 Removes and returns the specified number of items (defaults to 1) from the
469 specified index position in the queue (0 is the head of the queue).  When
470 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
471
472 This method is non-blocking, and will return only as many items as are
473 available to fulfill the request:
474
475     $q->enqueue(1, 2, 3, 4);
476     my $item  = $q->extract(2)     # Returns 3
477                                    # Queue now contains:  1, 2, 4
478     my @items = $q->extract(1, 3)  # Returns (2, 4)
479                                    # Queue now contains:  1
480
481 Specifying an index position greater than the number of items in the
482 queue results in C<undef> or an empty list being returned.
483
484     $q->enqueue('foo');
485     my $nada = $q->extract(3)      # Returns undef
486     my @nada = $q->extract(1, 3)   # Returns ()
487
488 Negative index positions are supported.  Specifying a negative index position
489 greater than the number of items in the queue may return items from the head
490 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
491 queue from the specified position (i.e. if queue size + index + count is
492 greater than zero):
493
494     $q->enqueue(qw/foo bar baz/);
495     my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
496     my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
497                                      # Queue now contains:  bar, baz
498     my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
499
500 =back
501
502 =head1 NOTES
503
504 Queues created by L<Thread::Queue> can be used in both threaded and
505 non-threaded applications.
506
507 =head1 LIMITATIONS
508
509 Passing objects on queues may not work if the objects' classes do not support
510 sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
511
512 Passing array/hash refs that contain objects may not work for Perl prior to
513 5.10.0.
514
515 =head1 SEE ALSO
516
517 Thread::Queue Discussion Forum on CPAN:
518 L<http://www.cpanforum.com/dist/Thread-Queue>
519
520 Annotated POD for Thread::Queue:
521 L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.06/lib/Thread/Queue.pm>
522
523 Source repository:
524 L<http://code.google.com/p/thread-queue/>
525
526 L<threads>, L<threads::shared>
527
528 =head1 MAINTAINER
529
530 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
531
532 =head1 LICENSE
533
534 This program is free software; you can redistribute it and/or modify it under
535 the same terms as Perl itself.
536
537 =cut