Thread::Queue 2.08
[p5sagit/p5-mst-13.2.git] / lib / Thread / Queue.pm
1 package Thread::Queue;
2
3 use strict;
4 use warnings;
5
6 our $VERSION = '2.08';
7
8 use threads::shared 1.21;
9 use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
10
11 # Carp errors from threads::shared calls should complain about caller
12 our @CARP_NOT = ("threads::shared");
13
14 # Predeclarations for internal functions
15 my ($validate_count, $validate_index);
16
17 # Create a new queue possibly pre-populated with items
18 sub new
19 {
20     my $class = shift;
21     my @queue :shared = map { shared_clone($_) } @_;
22     return bless(\@queue, $class);
23 }
24
25 # Add items to the tail of a queue
26 sub enqueue
27 {
28     my $queue = shift;
29     lock(@$queue);
30     push(@$queue, map { shared_clone($_) } @_)
31         and cond_signal(@$queue);
32 }
33
34 # Return a count of the number of items on a queue
35 sub pending
36 {
37     my $queue = shift;
38     lock(@$queue);
39     return scalar(@$queue);
40 }
41
42 # Return 1 or more items from the head of a queue, blocking if needed
43 sub dequeue
44 {
45     my $queue = shift;
46     lock(@$queue);
47
48     my $count = @_ ? $validate_count->(shift) : 1;
49
50     # Wait for requisite number of items
51     cond_wait(@$queue) until (@$queue >= $count);
52     cond_signal(@$queue) if (@$queue > $count);
53
54     # Return single item
55     return shift(@$queue) if ($count == 1);
56
57     # Return multiple items
58     my @items;
59     push(@items, shift(@$queue)) for (1..$count);
60     return @items;
61 }
62
63 # Return items from the head of a queue with no blocking
64 sub dequeue_nb
65 {
66     my $queue = shift;
67     lock(@$queue);
68
69     my $count = @_ ? $validate_count->(shift) : 1;
70
71     # Return single item
72     return shift(@$queue) if ($count == 1);
73
74     # Return multiple items
75     my @items;
76     for (1..$count) {
77         last if (! @$queue);
78         push(@items, shift(@$queue));
79     }
80     return @items;
81 }
82
83 # Return an item without removing it from a queue
84 sub peek
85 {
86     my $queue = shift;
87     lock(@$queue);
88     my $index = @_ ? $validate_index->(shift) : 0;
89     return $$queue[$index];
90 }
91
92 # Insert items anywhere into a queue
93 sub insert
94 {
95     my $queue = shift;
96     lock(@$queue);
97
98     my $index = $validate_index->(shift);
99
100     return if (! @_);   # Nothing to insert
101
102     # Support negative indices
103     if ($index < 0) {
104         $index += @$queue;
105         if ($index < 0) {
106             $index = 0;
107         }
108     }
109
110     # Dequeue items from $index onward
111     my @tmp;
112     while (@$queue > $index) {
113         unshift(@tmp, pop(@$queue))
114     }
115
116     # Add new items to the queue
117     push(@$queue, map { shared_clone($_) } @_);
118
119     # Add previous items back onto the queue
120     push(@$queue, @tmp);
121
122     # Soup's up
123     cond_signal(@$queue);
124 }
125
126 # Remove items from anywhere in a queue
127 sub extract
128 {
129     my $queue = shift;
130     lock(@$queue);
131
132     my $index = @_ ? $validate_index->(shift) : 0;
133     my $count = @_ ? $validate_count->(shift) : 1;
134
135     # Support negative indices
136     if ($index < 0) {
137         $index += @$queue;
138         if ($index < 0) {
139             $count += $index;
140             return if ($count <= 0);            # Beyond the head of the queue
141             return $queue->dequeue_nb($count);  # Extract from the head
142         }
143     }
144
145     # Dequeue items from $index+$count onward
146     my @tmp;
147     while (@$queue > ($index+$count)) {
148         unshift(@tmp, pop(@$queue))
149     }
150
151     # Extract desired items
152     my @items;
153     unshift(@items, pop(@$queue)) while (@$queue > $index);
154
155     # Add back any removed items
156     push(@$queue, @tmp);
157
158     # Return single item
159     return $items[0] if ($count == 1);
160
161     # Return multiple items
162     return @items;
163 }
164
165 ### Internal Functions ###
166
167 # Check value of the requested index
168 $validate_index = sub {
169     my $index = shift;
170
171     if (! looks_like_number($index) || (int($index) != $index)) {
172         require Carp;
173         my ($method) = (caller(1))[3];
174         $method =~ s/Thread::Queue:://;
175         $index = 'undef' if (! defined($index));
176         Carp::croak("Invalid 'index' argument ($index) to '$method' method");
177     }
178
179     return $index;
180 };
181
182 # Check value of the requested count
183 $validate_count = sub {
184     my $count = shift;
185
186     if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
187         require Carp;
188         my ($method) = (caller(1))[3];
189         $method =~ s/Thread::Queue:://;
190         $count = 'undef' if (! defined($count));
191         Carp::croak("Invalid 'count' argument ($count) to '$method' method");
192     }
193
194     return $count;
195 };
196
197 1;
198
199 =head1 NAME
200
201 Thread::Queue - Thread-safe queues
202
203 =head1 VERSION
204
205 This document describes Thread::Queue version 2.08
206
207 =head1 SYNOPSIS
208
209     use strict;
210     use warnings;
211
212     use threads;
213     use Thread::Queue;
214
215     my $q = Thread::Queue->new();    # A new empty queue
216
217     # Worker thread
218     my $thr = threads->create(sub {
219                                 while (my $item = $q->dequeue()) {
220                                     # Do work on $item
221                                 }
222                              })->detach();
223
224     # Send work to the thread
225     $q->enqueue($item1, ...);
226
227
228     # Count of items in the queue
229     my $left = $q->pending();
230
231     # Non-blocking dequeue
232     if (defined(my $item = $q->dequeue_nb())) {
233         # Work on $item
234     }
235
236     # Get the second item in the queue without dequeuing anything
237     my $item = $q->peek(1);
238
239     # Insert two items into the queue just behind the head
240     $q->insert(1, $item1, $item2);
241
242     # Extract the last two items on the queue
243     my ($item1, $item2) = $q->extract(-2, 2);
244
245 =head1 DESCRIPTION
246
247 This module provides thread-safe FIFO queues that can be accessed safely by
248 any number of threads.
249
250 Any data types supported by L<threads::shared> can be passed via queues:
251
252 =over
253
254 =item Ordinary scalars
255
256 =item Array refs
257
258 =item Hash refs
259
260 =item Scalar refs
261
262 =item Objects based on the above
263
264 =back
265
266 Ordinary scalars are added to queues as they are.
267
268 If not already thread-shared, the other complex data types will be cloned
269 (recursively, if needed, and including any C<bless>ings and read-only
270 settings) into thread-shared structures before being placed onto a queue.
271
272 For example, the following would cause L<Thread::Queue> to create a empty,
273 shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
274 and 'baz' from C<@ary> into it, and then place that shared reference onto
275 the queue:
276
277     my @ary = qw/foo bar baz/;
278     $q->enqueue(\@ary);
279
280 However, for the following, the items are already shared, so their references
281 are added directly to the queue, and no cloning takes place:
282
283     my @ary :shared = qw/foo bar baz/;
284     $q->enqueue(\@ary);
285
286     my $obj = &shared({});
287     $$obj{'foo'} = 'bar';
288     $$obj{'qux'} = 99;
289     bless($obj, 'My::Class');
290     $q->enqueue($obj);
291
292 See L</"LIMITATIONS"> for caveats related to passing objects via queues.
293
294 =head1 QUEUE CREATION
295
296 =over
297
298 =item ->new()
299
300 Creates a new empty queue.
301
302 =item ->new(LIST)
303
304 Creates a new queue pre-populated with the provided list of items.
305
306 =back
307
308 =head1 BASIC METHODS
309
310 The following methods deal with queues on a FIFO basis.
311
312 =over
313
314 =item ->enqueue(LIST)
315
316 Adds a list of items onto the end of the queue.
317
318 =item ->dequeue()
319
320 =item ->dequeue(COUNT)
321
322 Removes the requested number of items (default is 1) from the head of the
323 queue, and returns them.  If the queue contains fewer than the requested
324 number of items, then the thread will be blocked until the requisite number
325 of items are available (i.e., until other threads <enqueue> more items).
326
327 =item ->dequeue_nb()
328
329 =item ->dequeue_nb(COUNT)
330
331 Removes the requested number of items (default is 1) from the head of the
332 queue, and returns them.  If the queue contains fewer than the requested
333 number of items, then it immediately (i.e., non-blocking) returns whatever
334 items there are on the queue.  If the queue is empty, then C<undef> is
335 returned.
336
337 =item ->pending()
338
339 Returns the number of items still in the queue.
340
341 =back
342
343 =head1 ADVANCED METHODS
344
345 The following methods can be used to manipulate items anywhere in a queue.
346
347 To prevent the contents of a queue from being modified by another thread
348 while it is being examined and/or changed, L<lock|threads::shared/"lock
349 VARIABLE"> the queue inside a local block:
350
351     {
352         lock($q);   # Keep other threads from changing the queue's contents
353         my $item = $q->peek();
354         if ($item ...) {
355             ...
356         }
357     }
358     # Queue is now unlocked
359
360 =over
361
362 =item ->peek()
363
364 =item ->peek(INDEX)
365
366 Returns an item from the queue without dequeuing anything.  Defaults to the
367 the head of queue (at index position 0) if no index is specified.  Negative
368 index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
369 is the end of the queue, -2 is next to last, and so on).
370
371 If no items exists at the specified index (i.e., the queue is empty, or the
372 index is beyond the number of items on the queue), then C<undef> is returned.
373
374 Remember, the returned item is not removed from the queue, so manipulating a
375 C<peek>ed at reference affects the item on the queue.
376
377 =item ->insert(INDEX, LIST)
378
379 Adds the list of items to the queue at the specified index position (0
380 is the head of the list).  Any existing items at and beyond that position are
381 pushed back past the newly added items:
382
383     $q->enqueue(1, 2, 3, 4);
384     $q->insert(1, qw/foo bar/);
385     # Queue now contains:  1, foo, bar, 2, 3, 4
386
387 Specifying an index position greater than the number of items in the queue
388 just adds the list to the end.
389
390 Negative index positions are supported:
391
392     $q->enqueue(1, 2, 3, 4);
393     $q->insert(-2, qw/foo bar/);
394     # Queue now contains:  1, 2, foo, bar, 3, 4
395
396 Specifying a negative index position greater than the number of items in the
397 queue adds the list to the head of the queue.
398
399 =item ->extract()
400
401 =item ->extract(INDEX)
402
403 =item ->extract(INDEX, COUNT)
404
405 Removes and returns the specified number of items (defaults to 1) from the
406 specified index position in the queue (0 is the head of the queue).  When
407 called with no arguments, C<extract> operates the same as C<dequeue_nb>.
408
409 This method is non-blocking, and will return only as many items as are
410 available to fulfill the request:
411
412     $q->enqueue(1, 2, 3, 4);
413     my $item  = $q->extract(2)     # Returns 3
414                                    # Queue now contains:  1, 2, 4
415     my @items = $q->extract(1, 3)  # Returns (2, 4)
416                                    # Queue now contains:  1
417
418 Specifying an index position greater than the number of items in the
419 queue results in C<undef> or an empty list being returned.
420
421     $q->enqueue('foo');
422     my $nada = $q->extract(3)      # Returns undef
423     my @nada = $q->extract(1, 3)   # Returns ()
424
425 Negative index positions are supported.  Specifying a negative index position
426 greater than the number of items in the queue may return items from the head
427 of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
428 queue from the specified position (i.e. if queue size + index + count is
429 greater than zero):
430
431     $q->enqueue(qw/foo bar baz/);
432     my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
433     my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
434                                      # Queue now contains:  bar, baz
435     my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
436
437 =back
438
439 =head1 NOTES
440
441 Queues created by L<Thread::Queue> can be used in both threaded and
442 non-threaded applications.
443
444 =head1 LIMITATIONS
445
446 Passing objects on queues may not work if the objects' classes do not support
447 sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
448
449 Passing array/hash refs that contain objects may not work for Perl prior to
450 5.10.0.
451
452 =head1 SEE ALSO
453
454 Thread::Queue Discussion Forum on CPAN:
455 L<http://www.cpanforum.com/dist/Thread-Queue>
456
457 Annotated POD for Thread::Queue:
458 L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.08/lib/Thread/Queue.pm>
459
460 Source repository:
461 L<http://code.google.com/p/thread-queue/>
462
463 L<threads>, L<threads::shared>
464
465 =head1 MAINTAINER
466
467 Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
468
469 =head1 LICENSE
470
471 This program is free software; you can redistribute it and/or modify it under
472 the same terms as Perl itself.
473
474 =cut