Thread::Queue 2.08
[p5sagit/p5-mst-13.2.git] / lib / Thread / Queue.pm
CommitLineData
d21067e0 1package Thread::Queue;
28b605d8 2
3d1f1caf 3use strict;
54c7876f 4use warnings;
28b605d8 5
09782346 6our $VERSION = '2.08';
54c7876f 7
09782346 8use threads::shared 1.21;
ac9d3a9d 9use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
54c7876f 10
09782346 11# Carp errors from threads::shared calls should complain about caller
12our @CARP_NOT = ("threads::shared");
13
54c7876f 14# Predeclarations for internal functions
09782346 15my ($validate_count, $validate_index);
54c7876f 16
17# Create a new queue possibly pre-populated with items
18sub new
19{
20 my $class = shift;
09782346 21 my @queue :shared = map { shared_clone($_) } @_;
54c7876f 22 return bless(\@queue, $class);
23}
24
25# Add items to the tail of a queue
26sub enqueue
27{
28 my $queue = shift;
29 lock(@$queue);
09782346 30 push(@$queue, map { shared_clone($_) } @_)
54c7876f 31 and cond_signal(@$queue);
32}
33
34# Return a count of the number of items on a queue
35sub pending
36{
37 my $queue = shift;
38 lock(@$queue);
39 return scalar(@$queue);
40}
41
42# Return 1 or more items from the head of a queue, blocking if needed
43sub dequeue
44{
45 my $queue = shift;
46 lock(@$queue);
47
48 my $count = @_ ? $validate_count->(shift) : 1;
49
50 # Wait for requisite number of items
51 cond_wait(@$queue) until (@$queue >= $count);
52 cond_signal(@$queue) if (@$queue > $count);
53
54 # Return single item
55 return shift(@$queue) if ($count == 1);
56
57 # Return multiple items
58 my @items;
59 push(@items, shift(@$queue)) for (1..$count);
60 return @items;
61}
62
63# Return items from the head of a queue with no blocking
64sub dequeue_nb
65{
66 my $queue = shift;
67 lock(@$queue);
68
69 my $count = @_ ? $validate_count->(shift) : 1;
70
71 # Return single item
72 return shift(@$queue) if ($count == 1);
73
74 # Return multiple items
75 my @items;
76 for (1..$count) {
77 last if (! @$queue);
78 push(@items, shift(@$queue));
79 }
80 return @items;
81}
82
83# Return an item without removing it from a queue
84sub peek
85{
86 my $queue = shift;
87 lock(@$queue);
88 my $index = @_ ? $validate_index->(shift) : 0;
89 return $$queue[$index];
90}
91
92# Insert items anywhere into a queue
93sub insert
94{
95 my $queue = shift;
96 lock(@$queue);
97
98 my $index = $validate_index->(shift);
99
100 return if (! @_); # Nothing to insert
101
102 # Support negative indices
103 if ($index < 0) {
104 $index += @$queue;
105 if ($index < 0) {
106 $index = 0;
107 }
108 }
109
110 # Dequeue items from $index onward
111 my @tmp;
112 while (@$queue > $index) {
113 unshift(@tmp, pop(@$queue))
114 }
115
116 # Add new items to the queue
09782346 117 push(@$queue, map { shared_clone($_) } @_);
54c7876f 118
119 # Add previous items back onto the queue
120 push(@$queue, @tmp);
121
122 # Soup's up
123 cond_signal(@$queue);
124}
125
126# Remove items from anywhere in a queue
127sub extract
128{
129 my $queue = shift;
130 lock(@$queue);
131
132 my $index = @_ ? $validate_index->(shift) : 0;
133 my $count = @_ ? $validate_count->(shift) : 1;
134
135 # Support negative indices
136 if ($index < 0) {
137 $index += @$queue;
138 if ($index < 0) {
139 $count += $index;
140 return if ($count <= 0); # Beyond the head of the queue
141 return $queue->dequeue_nb($count); # Extract from the head
142 }
143 }
144
145 # Dequeue items from $index+$count onward
146 my @tmp;
147 while (@$queue > ($index+$count)) {
148 unshift(@tmp, pop(@$queue))
149 }
150
151 # Extract desired items
152 my @items;
153 unshift(@items, pop(@$queue)) while (@$queue > $index);
154
155 # Add back any removed items
156 push(@$queue, @tmp);
157
158 # Return single item
159 return $items[0] if ($count == 1);
160
161 # Return multiple items
162 return @items;
163}
164
165### Internal Functions ###
166
54c7876f 167# Check value of the requested index
168$validate_index = sub {
169 my $index = shift;
170
171 if (! looks_like_number($index) || (int($index) != $index)) {
172 require Carp;
173 my ($method) = (caller(1))[3];
174 $method =~ s/Thread::Queue:://;
175 $index = 'undef' if (! defined($index));
176 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
177 }
178
179 return $index;
180};
181
182# Check value of the requested count
183$validate_count = sub {
184 my $count = shift;
185
186 if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
187 require Carp;
188 my ($method) = (caller(1))[3];
189 $method =~ s/Thread::Queue:://;
190 $count = 'undef' if (! defined($count));
191 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
192 }
193
194 return $count;
195};
196
1971;
9c6f8578 198
d516a115 199=head1 NAME
200
54c7876f 201Thread::Queue - Thread-safe queues
202
203=head1 VERSION
204
09782346 205This document describes Thread::Queue version 2.08
d516a115 206
207=head1 SYNOPSIS
208
54c7876f 209 use strict;
210 use warnings;
211
212 use threads;
d516a115 213 use Thread::Queue;
54c7876f 214
215 my $q = Thread::Queue->new(); # A new empty queue
216
217 # Worker thread
218 my $thr = threads->create(sub {
219 while (my $item = $q->dequeue()) {
220 # Do work on $item
221 }
222 })->detach();
223
224 # Send work to the thread
225 $q->enqueue($item1, ...);
226
227
228 # Count of items in the queue
229 my $left = $q->pending();
230
231 # Non-blocking dequeue
232 if (defined(my $item = $q->dequeue_nb())) {
233 # Work on $item
234 }
235
236 # Get the second item in the queue without dequeuing anything
237 my $item = $q->peek(1);
238
239 # Insert two items into the queue just behind the head
240 $q->insert(1, $item1, $item2);
241
242 # Extract the last two items on the queue
243 my ($item1, $item2) = $q->extract(-2, 2);
d516a115 244
a99072da 245=head1 DESCRIPTION
246
54c7876f 247This module provides thread-safe FIFO queues that can be accessed safely by
248any number of threads.
249
250Any data types supported by L<threads::shared> can be passed via queues:
251
252=over
253
254=item Ordinary scalars
255
256=item Array refs
257
258=item Hash refs
259
260=item Scalar refs
261
262=item Objects based on the above
263
264=back
265
266Ordinary scalars are added to queues as they are.
267
268If not already thread-shared, the other complex data types will be cloned
269(recursively, if needed, and including any C<bless>ings and read-only
270settings) into thread-shared structures before being placed onto a queue.
a99072da 271
54c7876f 272For example, the following would cause L<Thread::Queue> to create a empty,
273shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
274and 'baz' from C<@ary> into it, and then place that shared reference onto
275the queue:
a99072da 276
54c7876f 277 my @ary = qw/foo bar baz/;
278 $q->enqueue(\@ary);
a99072da 279
54c7876f 280However, for the following, the items are already shared, so their references
281are added directly to the queue, and no cloning takes place:
a99072da 282
54c7876f 283 my @ary :shared = qw/foo bar baz/;
284 $q->enqueue(\@ary);
a99072da 285
54c7876f 286 my $obj = &shared({});
287 $$obj{'foo'} = 'bar';
288 $$obj{'qux'} = 99;
289 bless($obj, 'My::Class');
290 $q->enqueue($obj);
a99072da 291
54c7876f 292See L</"LIMITATIONS"> for caveats related to passing objects via queues.
a99072da 293
54c7876f 294=head1 QUEUE CREATION
a99072da 295
54c7876f 296=over
a99072da 297
54c7876f 298=item ->new()
a99072da 299
54c7876f 300Creates a new empty queue.
a99072da 301
54c7876f 302=item ->new(LIST)
a99072da 303
54c7876f 304Creates a new queue pre-populated with the provided list of items.
a99072da 305
306=back
307
54c7876f 308=head1 BASIC METHODS
a99072da 309
54c7876f 310The following methods deal with queues on a FIFO basis.
bbc7dcd2 311
54c7876f 312=over
d516a115 313
54c7876f 314=item ->enqueue(LIST)
d21067e0 315
54c7876f 316Adds a list of items onto the end of the queue.
d21067e0 317
54c7876f 318=item ->dequeue()
a99072da 319
54c7876f 320=item ->dequeue(COUNT)
d21067e0 321
54c7876f 322Removes the requested number of items (default is 1) from the head of the
323queue, and returns them. If the queue contains fewer than the requested
324number of items, then the thread will be blocked until the requisite number
325of items are available (i.e., until other threads <enqueue> more items).
a99072da 326
54c7876f 327=item ->dequeue_nb()
328
329=item ->dequeue_nb(COUNT)
330
331Removes the requested number of items (default is 1) from the head of the
332queue, and returns them. If the queue contains fewer than the requested
333number of items, then it immediately (i.e., non-blocking) returns whatever
334items there are on the queue. If the queue is empty, then C<undef> is
335returned.
336
337=item ->pending()
338
339Returns the number of items still in the queue.
340
341=back
342
343=head1 ADVANCED METHODS
344
345The following methods can be used to manipulate items anywhere in a queue.
346
347To prevent the contents of a queue from being modified by another thread
348while it is being examined and/or changed, L<lock|threads::shared/"lock
349VARIABLE"> the queue inside a local block:
350
351 {
352 lock($q); # Keep other threads from changing the queue's contents
353 my $item = $q->peek();
354 if ($item ...) {
355 ...
356 }
357 }
358 # Queue is now unlocked
359
360=over
361
362=item ->peek()
363
364=item ->peek(INDEX)
365
366Returns an item from the queue without dequeuing anything. Defaults to the
367the head of queue (at index position 0) if no index is specified. Negative
368index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
369is the end of the queue, -2 is next to last, and so on).
370
371If no items exists at the specified index (i.e., the queue is empty, or the
372index is beyond the number of items on the queue), then C<undef> is returned.
373
374Remember, the returned item is not removed from the queue, so manipulating a
375C<peek>ed at reference affects the item on the queue.
376
377=item ->insert(INDEX, LIST)
378
379Adds the list of items to the queue at the specified index position (0
380is the head of the list). Any existing items at and beyond that position are
381pushed back past the newly added items:
382
383 $q->enqueue(1, 2, 3, 4);
384 $q->insert(1, qw/foo bar/);
385 # Queue now contains: 1, foo, bar, 2, 3, 4
83272a45 386
54c7876f 387Specifying an index position greater than the number of items in the queue
388just adds the list to the end.
83272a45 389
54c7876f 390Negative index positions are supported:
391
392 $q->enqueue(1, 2, 3, 4);
393 $q->insert(-2, qw/foo bar/);
394 # Queue now contains: 1, 2, foo, bar, 3, 4
395
396Specifying a negative index position greater than the number of items in the
397queue adds the list to the head of the queue.
398
399=item ->extract()
400
401=item ->extract(INDEX)
402
403=item ->extract(INDEX, COUNT)
404
405Removes and returns the specified number of items (defaults to 1) from the
406specified index position in the queue (0 is the head of the queue). When
407called with no arguments, C<extract> operates the same as C<dequeue_nb>.
408
409This method is non-blocking, and will return only as many items as are
410available to fulfill the request:
411
412 $q->enqueue(1, 2, 3, 4);
413 my $item = $q->extract(2) # Returns 3
414 # Queue now contains: 1, 2, 4
415 my @items = $q->extract(1, 3) # Returns (2, 4)
416 # Queue now contains: 1
417
418Specifying an index position greater than the number of items in the
419queue results in C<undef> or an empty list being returned.
420
421 $q->enqueue('foo');
422 my $nada = $q->extract(3) # Returns undef
423 my @nada = $q->extract(1, 3) # Returns ()
424
425Negative index positions are supported. Specifying a negative index position
426greater than the number of items in the queue may return items from the head
427of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
428queue from the specified position (i.e. if queue size + index + count is
429greater than zero):
430
431 $q->enqueue(qw/foo bar baz/);
432 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
433 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
434 # Queue now contains: bar, baz
435 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
436
437=back
438
7fb1c73b 439=head1 NOTES
440
441Queues created by L<Thread::Queue> can be used in both threaded and
442non-threaded applications.
443
54c7876f 444=head1 LIMITATIONS
445
446Passing objects on queues may not work if the objects' classes do not support
447sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
448
449Passing array/hash refs that contain objects may not work for Perl prior to
4505.10.0.
451
452=head1 SEE ALSO
453
454Thread::Queue Discussion Forum on CPAN:
455L<http://www.cpanforum.com/dist/Thread-Queue>
456
457Annotated POD for Thread::Queue:
09782346 458L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.08/lib/Thread/Queue.pm>
7fb1c73b 459
460Source repository:
461L<http://code.google.com/p/thread-queue/>
54c7876f 462
463L<threads>, L<threads::shared>
464
465=head1 MAINTAINER
466
467Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
468
469=head1 LICENSE
470
471This program is free software; you can redistribute it and/or modify it under
472the same terms as Perl itself.
473
474=cut