Thread::Semaphore 2.07
[p5sagit/p5-mst-13.2.git] / lib / Thread / Queue.pm
CommitLineData
d21067e0 1package Thread::Queue;
28b605d8 2
3d1f1caf 3use strict;
54c7876f 4use warnings;
28b605d8 5
54c7876f 6our $VERSION = '2.03';
7
8use threads::shared 0.96;
9use Scalar::Util 1.10 qw(looks_like_number);
10
11# Predeclarations for internal functions
12my ($make_shared, $validate_count, $validate_index);
13
14# Create a new queue possibly pre-populated with items
15sub new
16{
17 my $class = shift;
18 my @queue :shared = map { $make_shared->($_) } @_;
19 return bless(\@queue, $class);
20}
21
22# Add items to the tail of a queue
23sub enqueue
24{
25 my $queue = shift;
26 lock(@$queue);
27 push(@$queue, map { $make_shared->($_) } @_)
28 and cond_signal(@$queue);
29}
30
31# Return a count of the number of items on a queue
32sub pending
33{
34 my $queue = shift;
35 lock(@$queue);
36 return scalar(@$queue);
37}
38
39# Return 1 or more items from the head of a queue, blocking if needed
40sub dequeue
41{
42 my $queue = shift;
43 lock(@$queue);
44
45 my $count = @_ ? $validate_count->(shift) : 1;
46
47 # Wait for requisite number of items
48 cond_wait(@$queue) until (@$queue >= $count);
49 cond_signal(@$queue) if (@$queue > $count);
50
51 # Return single item
52 return shift(@$queue) if ($count == 1);
53
54 # Return multiple items
55 my @items;
56 push(@items, shift(@$queue)) for (1..$count);
57 return @items;
58}
59
60# Return items from the head of a queue with no blocking
61sub dequeue_nb
62{
63 my $queue = shift;
64 lock(@$queue);
65
66 my $count = @_ ? $validate_count->(shift) : 1;
67
68 # Return single item
69 return shift(@$queue) if ($count == 1);
70
71 # Return multiple items
72 my @items;
73 for (1..$count) {
74 last if (! @$queue);
75 push(@items, shift(@$queue));
76 }
77 return @items;
78}
79
80# Return an item without removing it from a queue
81sub peek
82{
83 my $queue = shift;
84 lock(@$queue);
85 my $index = @_ ? $validate_index->(shift) : 0;
86 return $$queue[$index];
87}
88
89# Insert items anywhere into a queue
90sub insert
91{
92 my $queue = shift;
93 lock(@$queue);
94
95 my $index = $validate_index->(shift);
96
97 return if (! @_); # Nothing to insert
98
99 # Support negative indices
100 if ($index < 0) {
101 $index += @$queue;
102 if ($index < 0) {
103 $index = 0;
104 }
105 }
106
107 # Dequeue items from $index onward
108 my @tmp;
109 while (@$queue > $index) {
110 unshift(@tmp, pop(@$queue))
111 }
112
113 # Add new items to the queue
114 push(@$queue, map { $make_shared->($_) } @_);
115
116 # Add previous items back onto the queue
117 push(@$queue, @tmp);
118
119 # Soup's up
120 cond_signal(@$queue);
121}
122
123# Remove items from anywhere in a queue
124sub extract
125{
126 my $queue = shift;
127 lock(@$queue);
128
129 my $index = @_ ? $validate_index->(shift) : 0;
130 my $count = @_ ? $validate_count->(shift) : 1;
131
132 # Support negative indices
133 if ($index < 0) {
134 $index += @$queue;
135 if ($index < 0) {
136 $count += $index;
137 return if ($count <= 0); # Beyond the head of the queue
138 return $queue->dequeue_nb($count); # Extract from the head
139 }
140 }
141
142 # Dequeue items from $index+$count onward
143 my @tmp;
144 while (@$queue > ($index+$count)) {
145 unshift(@tmp, pop(@$queue))
146 }
147
148 # Extract desired items
149 my @items;
150 unshift(@items, pop(@$queue)) while (@$queue > $index);
151
152 # Add back any removed items
153 push(@$queue, @tmp);
154
155 # Return single item
156 return $items[0] if ($count == 1);
157
158 # Return multiple items
159 return @items;
160}
161
162### Internal Functions ###
163
164# Create a thread-shared version of a complex data structure or object
165$make_shared = sub {
166 my $item = shift;
167
168 # If already thread-shared, then just return the input item
169 return $item if (threads::shared::is_shared($item));
170
171 # Make copies of array, hash and scalar refs
172 my $copy;
173 if (my $ref_type = Scalar::Util::reftype($item)) {
174 # Copy an array ref
175 if ($ref_type eq 'ARRAY') {
176 # Make empty shared array ref
177 $copy = &share([]);
178 # Recursively copy and add contents
179 push(@$copy, map { $make_shared->($_) } @$item);
180 }
181
182 # Copy a hash ref
183 elsif ($ref_type eq 'HASH') {
184 # Make empty shared hash ref
185 $copy = &share({});
186 # Recursively copy and add contents
187 foreach my $key (keys(%{$item})) {
188 $copy->{$key} = $make_shared->($item->{$key});
189 }
190 }
191
192 # Copy a scalar ref
193 elsif ($ref_type eq 'SCALAR') {
194 $copy = \do{ my $scalar = $$item; };
195 share($copy);
196 # Clone READONLY flag
197 if (Internals::SvREADONLY($$item)) {
198 Internals::SvREADONLY($$copy, 1);
199 }
200 }
201
202 # Copy of a ref of a ref
203 elsif ($ref_type eq 'REF') {
204 my $tmp = $make_shared->($$item);
205 $copy = \$tmp;
206 share($copy);
207 }
208 }
209
210 # If no copy is created above, then just return the input item
211 # NOTE: This will end up generating an error for anything
212 # other than an ordinary scalar
213 return $item if (! defined($copy));
214
215 # Clone READONLY flag
216 if (Internals::SvREADONLY($item)) {
217 Internals::SvREADONLY($copy, 1);
218 }
219
220 # If input item is an object, then bless the copy into the same class
221 if (my $class = Scalar::Util::blessed($item)) {
222 bless($copy, $class);
223 }
224
225 return $copy;
226};
227
228# Check value of the requested index
229$validate_index = sub {
230 my $index = shift;
231
232 if (! looks_like_number($index) || (int($index) != $index)) {
233 require Carp;
234 my ($method) = (caller(1))[3];
235 $method =~ s/Thread::Queue:://;
236 $index = 'undef' if (! defined($index));
237 Carp::croak("Invalid 'index' argument ($index) to '$method' method");
238 }
239
240 return $index;
241};
242
243# Check value of the requested count
244$validate_count = sub {
245 my $count = shift;
246
247 if ((! looks_like_number($count)) || (int($count) != $count) || ($count < 1)) {
248 require Carp;
249 my ($method) = (caller(1))[3];
250 $method =~ s/Thread::Queue:://;
251 $count = 'undef' if (! defined($count));
252 Carp::croak("Invalid 'count' argument ($count) to '$method' method");
253 }
254
255 return $count;
256};
257
2581;
9c6f8578 259
d516a115 260=head1 NAME
261
54c7876f 262Thread::Queue - Thread-safe queues
263
264=head1 VERSION
265
266This document describes Thread::Queue version 2.03
d516a115 267
268=head1 SYNOPSIS
269
54c7876f 270 use strict;
271 use warnings;
272
273 use threads;
d516a115 274 use Thread::Queue;
54c7876f 275
276 my $q = Thread::Queue->new(); # A new empty queue
277
278 # Worker thread
279 my $thr = threads->create(sub {
280 while (my $item = $q->dequeue()) {
281 # Do work on $item
282 }
283 })->detach();
284
285 # Send work to the thread
286 $q->enqueue($item1, ...);
287
288
289 # Count of items in the queue
290 my $left = $q->pending();
291
292 # Non-blocking dequeue
293 if (defined(my $item = $q->dequeue_nb())) {
294 # Work on $item
295 }
296
297 # Get the second item in the queue without dequeuing anything
298 my $item = $q->peek(1);
299
300 # Insert two items into the queue just behind the head
301 $q->insert(1, $item1, $item2);
302
303 # Extract the last two items on the queue
304 my ($item1, $item2) = $q->extract(-2, 2);
d516a115 305
a99072da 306=head1 DESCRIPTION
307
54c7876f 308This module provides thread-safe FIFO queues that can be accessed safely by
309any number of threads.
310
311Any data types supported by L<threads::shared> can be passed via queues:
312
313=over
314
315=item Ordinary scalars
316
317=item Array refs
318
319=item Hash refs
320
321=item Scalar refs
322
323=item Objects based on the above
324
325=back
326
327Ordinary scalars are added to queues as they are.
328
329If not already thread-shared, the other complex data types will be cloned
330(recursively, if needed, and including any C<bless>ings and read-only
331settings) into thread-shared structures before being placed onto a queue.
a99072da 332
54c7876f 333For example, the following would cause L<Thread::Queue> to create a empty,
334shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
335and 'baz' from C<@ary> into it, and then place that shared reference onto
336the queue:
a99072da 337
54c7876f 338 my @ary = qw/foo bar baz/;
339 $q->enqueue(\@ary);
a99072da 340
54c7876f 341However, for the following, the items are already shared, so their references
342are added directly to the queue, and no cloning takes place:
a99072da 343
54c7876f 344 my @ary :shared = qw/foo bar baz/;
345 $q->enqueue(\@ary);
a99072da 346
54c7876f 347 my $obj = &shared({});
348 $$obj{'foo'} = 'bar';
349 $$obj{'qux'} = 99;
350 bless($obj, 'My::Class');
351 $q->enqueue($obj);
a99072da 352
54c7876f 353See L</"LIMITATIONS"> for caveats related to passing objects via queues.
a99072da 354
54c7876f 355=head1 QUEUE CREATION
a99072da 356
54c7876f 357=over
a99072da 358
54c7876f 359=item ->new()
a99072da 360
54c7876f 361Creates a new empty queue.
a99072da 362
54c7876f 363=item ->new(LIST)
a99072da 364
54c7876f 365Creates a new queue pre-populated with the provided list of items.
a99072da 366
367=back
368
54c7876f 369=head1 BASIC METHODS
a99072da 370
54c7876f 371The following methods deal with queues on a FIFO basis.
bbc7dcd2 372
54c7876f 373=over
d516a115 374
54c7876f 375=item ->enqueue(LIST)
d21067e0 376
54c7876f 377Adds a list of items onto the end of the queue.
d21067e0 378
54c7876f 379=item ->dequeue()
a99072da 380
54c7876f 381=item ->dequeue(COUNT)
d21067e0 382
54c7876f 383Removes the requested number of items (default is 1) from the head of the
384queue, and returns them. If the queue contains fewer than the requested
385number of items, then the thread will be blocked until the requisite number
386of items are available (i.e., until other threads <enqueue> more items).
a99072da 387
54c7876f 388=item ->dequeue_nb()
389
390=item ->dequeue_nb(COUNT)
391
392Removes the requested number of items (default is 1) from the head of the
393queue, and returns them. If the queue contains fewer than the requested
394number of items, then it immediately (i.e., non-blocking) returns whatever
395items there are on the queue. If the queue is empty, then C<undef> is
396returned.
397
398=item ->pending()
399
400Returns the number of items still in the queue.
401
402=back
403
404=head1 ADVANCED METHODS
405
406The following methods can be used to manipulate items anywhere in a queue.
407
408To prevent the contents of a queue from being modified by another thread
409while it is being examined and/or changed, L<lock|threads::shared/"lock
410VARIABLE"> the queue inside a local block:
411
412 {
413 lock($q); # Keep other threads from changing the queue's contents
414 my $item = $q->peek();
415 if ($item ...) {
416 ...
417 }
418 }
419 # Queue is now unlocked
420
421=over
422
423=item ->peek()
424
425=item ->peek(INDEX)
426
427Returns an item from the queue without dequeuing anything. Defaults to the
428the head of queue (at index position 0) if no index is specified. Negative
429index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
430is the end of the queue, -2 is next to last, and so on).
431
432If no items exists at the specified index (i.e., the queue is empty, or the
433index is beyond the number of items on the queue), then C<undef> is returned.
434
435Remember, the returned item is not removed from the queue, so manipulating a
436C<peek>ed at reference affects the item on the queue.
437
438=item ->insert(INDEX, LIST)
439
440Adds the list of items to the queue at the specified index position (0
441is the head of the list). Any existing items at and beyond that position are
442pushed back past the newly added items:
443
444 $q->enqueue(1, 2, 3, 4);
445 $q->insert(1, qw/foo bar/);
446 # Queue now contains: 1, foo, bar, 2, 3, 4
83272a45 447
54c7876f 448Specifying an index position greater than the number of items in the queue
449just adds the list to the end.
83272a45 450
54c7876f 451Negative index positions are supported:
452
453 $q->enqueue(1, 2, 3, 4);
454 $q->insert(-2, qw/foo bar/);
455 # Queue now contains: 1, 2, foo, bar, 3, 4
456
457Specifying a negative index position greater than the number of items in the
458queue adds the list to the head of the queue.
459
460=item ->extract()
461
462=item ->extract(INDEX)
463
464=item ->extract(INDEX, COUNT)
465
466Removes and returns the specified number of items (defaults to 1) from the
467specified index position in the queue (0 is the head of the queue). When
468called with no arguments, C<extract> operates the same as C<dequeue_nb>.
469
470This method is non-blocking, and will return only as many items as are
471available to fulfill the request:
472
473 $q->enqueue(1, 2, 3, 4);
474 my $item = $q->extract(2) # Returns 3
475 # Queue now contains: 1, 2, 4
476 my @items = $q->extract(1, 3) # Returns (2, 4)
477 # Queue now contains: 1
478
479Specifying an index position greater than the number of items in the
480queue results in C<undef> or an empty list being returned.
481
482 $q->enqueue('foo');
483 my $nada = $q->extract(3) # Returns undef
484 my @nada = $q->extract(1, 3) # Returns ()
485
486Negative index positions are supported. Specifying a negative index position
487greater than the number of items in the queue may return items from the head
488of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
489queue from the specified position (i.e. if queue size + index + count is
490greater than zero):
491
492 $q->enqueue(qw/foo bar baz/);
493 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
494 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
495 # Queue now contains: bar, baz
496 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
497
498=back
499
500=head1 LIMITATIONS
501
502Passing objects on queues may not work if the objects' classes do not support
503sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
504
505Passing array/hash refs that contain objects may not work for Perl prior to
5065.10.0.
507
508=head1 SEE ALSO
509
510Thread::Queue Discussion Forum on CPAN:
511L<http://www.cpanforum.com/dist/Thread-Queue>
512
513Annotated POD for Thread::Queue:
514L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.03/lib/Thread/Queue.pm>
515
516L<threads>, L<threads::shared>
517
518=head1 MAINTAINER
519
520Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
521
522=head1 LICENSE
523
524This program is free software; you can redistribute it and/or modify it under
525the same terms as Perl itself.
526
527=cut