Thread::Queue 2.07
[p5sagit/p5-mst-13.2.git] / lib / Thread / Queue.pm
CommitLineData
d21067e0 1package Thread::Queue;
28b605d8 2
3d1f1caf 3use strict;
54c7876f 4use warnings;
28b605d8 5
ac9d3a9d 6our $VERSION = '2.07';
54c7876f 7
8use threads::shared 0.96;
ac9d3a9d 9use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
54c7876f 10
11# Predeclarations for internal functions
12my ($make_shared, $validate_count, $validate_index);
13
# Constructor: build a queue, optionally seeded with the given items.
# Every item is passed through $make_shared before being stored.
# Returns the shared array blessed into the invoking class.
sub new
{
    my $class = shift;

    my @items :shared;
    push(@items, $make_shared->($_, {})) foreach (@_);

    return bless(\@items, $class);
}
21
# Append items to the tail of the queue.
# Each item is passed through $make_shared first; a waiting thread is
# signalled only when the queue is non-empty after the push.
sub enqueue
{
    my $queue = shift;
    lock(@$queue);

    my @additions = map { $make_shared->($_, {}) } @_;
    push(@$queue, @additions) && cond_signal(@$queue);
}
30
# Report how many items are currently sitting on the queue.
# The queue is locked while its length is read.
sub pending
{
    my ($queue) = @_;
    lock(@$queue);

    my $size = @$queue;
    return $size;
}
38
# Remove and return items from the head of the queue, blocking as needed.
#   COUNT (optional) - number of items wanted; defaults to 1 and is checked
#                      by $validate_count when supplied
# Waits on the queue's condition until at least COUNT items are present.
# Returns the single item when COUNT is 1, otherwise the list of items.
sub dequeue
{
    my $queue = shift;
    lock(@$queue);

    my $count = 1;
    $count = $validate_count->(shift) if (@_);

    # Sleep until enough items have accumulated
    while (@$queue < $count) {
        cond_wait(@$queue);
    }

    # Items will be left over after this call, so wake another waiter
    if (@$queue > $count) {
        cond_signal(@$queue);
    }

    # Single item requested
    if ($count == 1) {
        return shift(@$queue);
    }

    # Several items requested: take them off the head one at a time
    my @items;
    foreach my $n (1..$count) {
        push(@items, shift(@$queue));
    }
    return @items;
}
59
# Remove and return items from the head of the queue without blocking.
#   COUNT (optional) - number of items wanted; defaults to 1 and is checked
#                      by $validate_count when supplied
# Returns the single head item when COUNT is 1 (undef if the queue is
# empty), otherwise as many items as are available, up to COUNT.
sub dequeue_nb
{
    my $queue = shift;
    lock(@$queue);

    my $count = 1;
    $count = $validate_count->(shift) if (@_);

    # Single item requested
    if ($count == 1) {
        return shift(@$queue);
    }

    # Several items requested: stop early once the queue runs dry
    my @items;
    foreach my $n (1..$count) {
        @$queue or last;
        push(@items, shift(@$queue));
    }
    return @items;
}
79
# Look at an item on the queue without removing it.
#   INDEX (optional) - position to inspect; defaults to 0 (the head) and is
#                      checked by $validate_index when supplied
# Returns whatever the underlying array holds at that position.
sub peek
{
    my $queue = shift;
    lock(@$queue);

    my $index = 0;
    $index = $validate_index->(shift) if (@_);

    return $queue->[$index];
}
88
# Insert items at an arbitrary position in the queue.
#   INDEX - position for the first new item (checked by $validate_index);
#           a negative value counts back from the tail and is clamped to
#           the head when it reaches past it
#   LIST  - items to insert, each passed through $make_shared
# Does nothing when LIST is empty.  A waiter is signalled after insertion.
sub insert
{
    my $queue = shift;
    lock(@$queue);

    my $index = $validate_index->(shift);

    # No items supplied
    return unless (@_);

    # Translate a negative index into a position from the head
    if ($index < 0) {
        $index += @$queue;
        $index = 0 if ($index < 0);
    }

    # Set aside everything from $index to the tail, preserving order
    my @tail;
    unshift(@tail, pop(@$queue)) while (@$queue > $index);

    # Append the new items, then restore the items set aside
    push(@$queue, (map { $make_shared->($_, {}) } @_), @tail);

    # Let a waiting thread know there is something to dequeue
    cond_signal(@$queue);
}
122
# Remove and return items from an arbitrary position, without blocking.
#   INDEX (optional) - position of the first item to remove; defaults to 0
#                      and is checked by $validate_index when supplied
#   COUNT (optional) - number of items to remove; defaults to 1 and is
#                      checked by $validate_count when supplied
# Returns a single item when COUNT is 1, otherwise the list of removed
# items (possibly fewer than COUNT).
sub extract
{
    my $queue = shift;
    lock(@$queue);

    my $index = 0;
    my $count = 1;
    $index = $validate_index->(shift) if (@_);
    $count = $validate_count->(shift) if (@_);

    # Negative index: count back from the tail
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            # The start lies before the head, so shorten the request by
            # the overshoot and take what remains from the head
            $count += $index;
            if ($count <= 0) {
                return;     # Entire range lies before the head
            }
            return $queue->dequeue_nb($count);
        }
    }

    # Set aside the items lying beyond the requested range
    my $end = $index + $count;
    my @tail;
    while (@$queue > $end) {
        unshift(@tail, pop(@$queue));
    }

    # Pull out the requested range, keeping queue order
    my @items;
    while (@$queue > $index) {
        unshift(@items, pop(@$queue));
    }

    # Restore the items that were set aside
    push(@$queue, @tail);

    if ($count == 1) {
        return $items[0];
    }
    return @items;
}
161
### Internal Functions ###

# Produce a thread-shared equivalent of an item before it goes on a queue.
#   $item - value about to be queued
#   $seen - hash ref mapping the refaddr of each reference already cloned
#           during this call chain to its clone
# Returns $item itself when no copy is made, otherwise the shared clone.
$make_shared = sub {
    my ($item, $seen) = @_;

    # Pass the item through untouched when 'threads' is not loaded or the
    # item is already thread-shared
    if (! $threads::threads || threads::shared::is_shared($item)) {
        return $item;
    }

    # Clone array, hash, scalar and ref-of-ref references
    my $clone;
    if (my $kind = reftype($item)) {
        # Reuse the clone of a reference met earlier
        # (this is also what terminates circular structures)
        my $addr = refaddr($item);
        return $seen->{$addr}
            if (defined($addr) && exists($seen->{$addr}));

        # Array ref
        if ($kind eq 'ARRAY') {
            # Start from an empty shared array ref
            $clone = &share([]);
            # Register it before descending into the contents
            $seen->{$addr} = $clone;
            # Clone the elements recursively
            push(@$clone, map { $make_shared->($_, $seen) } @$item);
        }

        # Hash ref
        elsif ($kind eq 'HASH') {
            # Start from an empty shared hash ref
            $clone = &share({});
            # Register it before descending into the contents
            $seen->{$addr} = $clone;
            # Clone the values recursively
            foreach my $key (keys(%{$item})) {
                $clone->{$key} = $make_shared->($item->{$key}, $seen);
            }
        }

        # Scalar ref
        elsif ($kind eq 'SCALAR') {
            $clone = \do{ my $scalar = $$item; };
            share($clone);
            # Carry over the READONLY flag of the referenced scalar
            if (Internals::SvREADONLY($$item)) {
                Internals::SvREADONLY($$clone, 1);
            }
            # Register the clone
            $seen->{$addr} = $clone;
        }

        # Ref of a ref
        elsif ($kind eq 'REF') {
            my $target_addr = refaddr($$item);
            if ($target_addr == $addr) {
                # Self-reference ($x = \$x)
                $clone = \$clone;
                share($clone);
                $seen->{$addr} = $clone;
            } else {
                my $inner;
                $clone = \$inner;
                share($clone);
                # Register the clone before descending
                $seen->{$addr} = $clone;
                # Clone the referenced reference recursively
                $inner = $make_shared->($$item, $seen);
            }
        }
    }

    # Nothing was cloned above, so hand back the original item
    # NOTE:  This will end up generating an error for anything
    #        other than an ordinary scalar
    return $item if (! defined($clone));

    # Objects: bless the clone into the class of the original
    if (my $class = blessed($item)) {
        bless($clone, $class);
    }

    # Carry over the READONLY flag
    if (Internals::SvREADONLY($item)) {
        Internals::SvREADONLY($clone, 1);
    }

    return $clone;
};
255
# Validate the 'index' argument supplied to a queue method.
#   $index - value to check; must be defined and integer-valued
#            (negative values are accepted)
# Returns the index unchanged when it is valid.  Otherwise croaks with a
# message naming the offending value and the calling method.
$validate_index = sub {
    my $index = shift;

    # Test for undef explicitly instead of relying on looks_like_number()
    # to reject it, and before the numeric comparison below touches it
    if (! defined($index) ||
        ! looks_like_number($index) ||
        (int($index) != $index))
    {
        require Carp;
        # Frame 1 is the sub that called this validator
        my ($method) = (caller(1))[3];
        $method =~ s/Thread::Queue:://;
        $index = 'undef' if (! defined($index));
        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
    }

    return $index;
};
270
# Validate the 'count' argument supplied to a queue method.
#   $count - value to check; must be defined, integer-valued and at least 1
# Returns the count unchanged when it is valid.  Otherwise croaks with a
# message naming the offending value and the calling method.
$validate_count = sub {
    my $count = shift;

    # Test for undef explicitly instead of relying on looks_like_number()
    # to reject it, and before the numeric comparisons below touch it
    if (! defined($count) ||
        ! looks_like_number($count) ||
        (int($count) != $count) ||
        ($count < 1))
    {
        require Carp;
        # Frame 1 is the sub that called this validator
        my ($method) = (caller(1))[3];
        $method =~ s/Thread::Queue:://;
        $count = 'undef' if (! defined($count));
        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
    }

    return $count;
};
285
2861;
9c6f8578 287
d516a115 288=head1 NAME
289
54c7876f 290Thread::Queue - Thread-safe queues
291
292=head1 VERSION
293
ac9d3a9d 294This document describes Thread::Queue version 2.07
d516a115 295
296=head1 SYNOPSIS
297
54c7876f 298 use strict;
299 use warnings;
300
301 use threads;
d516a115 302 use Thread::Queue;
54c7876f 303
304 my $q = Thread::Queue->new(); # A new empty queue
305
    # Worker thread
    my $thr = threads->create(sub {
        while (defined(my $item = $q->dequeue())) {
            # Do work on $item
        }
    })->detach();
312
313 # Send work to the thread
314 $q->enqueue($item1, ...);
315
316
317 # Count of items in the queue
318 my $left = $q->pending();
319
320 # Non-blocking dequeue
321 if (defined(my $item = $q->dequeue_nb())) {
322 # Work on $item
323 }
324
325 # Get the second item in the queue without dequeuing anything
326 my $item = $q->peek(1);
327
328 # Insert two items into the queue just behind the head
329 $q->insert(1, $item1, $item2);
330
331 # Extract the last two items on the queue
332 my ($item1, $item2) = $q->extract(-2, 2);
d516a115 333
a99072da 334=head1 DESCRIPTION
335
54c7876f 336This module provides thread-safe FIFO queues that can be accessed safely by
337any number of threads.
338
339Any data types supported by L<threads::shared> can be passed via queues:
340
341=over
342
343=item Ordinary scalars
344
345=item Array refs
346
347=item Hash refs
348
349=item Scalar refs
350
351=item Objects based on the above
352
353=back
354
355Ordinary scalars are added to queues as they are.
356
357If not already thread-shared, the other complex data types will be cloned
358(recursively, if needed, and including any C<bless>ings and read-only
359settings) into thread-shared structures before being placed onto a queue.
a99072da 360
For example, the following would cause L<Thread::Queue> to create an empty,
shared array reference via C<&share([])>, copy the elements 'foo', 'bar'
and 'baz' from C<@ary> into it, and then place that shared reference onto
the queue:
a99072da 365
54c7876f 366 my @ary = qw/foo bar baz/;
367 $q->enqueue(\@ary);
a99072da 368
54c7876f 369However, for the following, the items are already shared, so their references
370are added directly to the queue, and no cloning takes place:
a99072da 371
54c7876f 372 my @ary :shared = qw/foo bar baz/;
373 $q->enqueue(\@ary);
a99072da 374
    my $obj = &share({});
    $$obj{'foo'} = 'bar';
    $$obj{'qux'} = 99;
    bless($obj, 'My::Class');
    $q->enqueue($obj);
a99072da 380
54c7876f 381See L</"LIMITATIONS"> for caveats related to passing objects via queues.
a99072da 382
54c7876f 383=head1 QUEUE CREATION
a99072da 384
54c7876f 385=over
a99072da 386
54c7876f 387=item ->new()
a99072da 388
54c7876f 389Creates a new empty queue.
a99072da 390
54c7876f 391=item ->new(LIST)
a99072da 392
54c7876f 393Creates a new queue pre-populated with the provided list of items.
a99072da 394
395=back
396
54c7876f 397=head1 BASIC METHODS
a99072da 398
54c7876f 399The following methods deal with queues on a FIFO basis.
bbc7dcd2 400
54c7876f 401=over
d516a115 402
54c7876f 403=item ->enqueue(LIST)
d21067e0 404
54c7876f 405Adds a list of items onto the end of the queue.
d21067e0 406
54c7876f 407=item ->dequeue()
a99072da 408
54c7876f 409=item ->dequeue(COUNT)
d21067e0 410
Removes the requested number of items (default is 1) from the head of the
queue, and returns them.  If the queue contains fewer than the requested
number of items, then the thread will be blocked until the requisite number
of items are available (i.e., until other threads C<enqueue> more items).
a99072da 415
54c7876f 416=item ->dequeue_nb()
417
418=item ->dequeue_nb(COUNT)
419
420Removes the requested number of items (default is 1) from the head of the
421queue, and returns them. If the queue contains fewer than the requested
422number of items, then it immediately (i.e., non-blocking) returns whatever
423items there are on the queue. If the queue is empty, then C<undef> is
424returned.
425
426=item ->pending()
427
428Returns the number of items still in the queue.
429
430=back
431
432=head1 ADVANCED METHODS
433
434The following methods can be used to manipulate items anywhere in a queue.
435
436To prevent the contents of a queue from being modified by another thread
437while it is being examined and/or changed, L<lock|threads::shared/"lock
438VARIABLE"> the queue inside a local block:
439
440 {
441 lock($q); # Keep other threads from changing the queue's contents
442 my $item = $q->peek();
443 if ($item ...) {
444 ...
445 }
446 }
447 # Queue is now unlocked
448
449=over
450
451=item ->peek()
452
453=item ->peek(INDEX)
454
Returns an item from the queue without dequeuing anything.  Defaults to the
head of the queue (at index position 0) if no index is specified.  Negative
index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
is the end of the queue, -2 is next to last, and so on).
459
If no item exists at the specified index (i.e., the queue is empty, or the
index is beyond the number of items on the queue), then C<undef> is returned.
462
463Remember, the returned item is not removed from the queue, so manipulating a
464C<peek>ed at reference affects the item on the queue.
465
466=item ->insert(INDEX, LIST)
467
468Adds the list of items to the queue at the specified index position (0
469is the head of the list). Any existing items at and beyond that position are
470pushed back past the newly added items:
471
472 $q->enqueue(1, 2, 3, 4);
473 $q->insert(1, qw/foo bar/);
474 # Queue now contains: 1, foo, bar, 2, 3, 4
83272a45 475
54c7876f 476Specifying an index position greater than the number of items in the queue
477just adds the list to the end.
83272a45 478
54c7876f 479Negative index positions are supported:
480
481 $q->enqueue(1, 2, 3, 4);
482 $q->insert(-2, qw/foo bar/);
483 # Queue now contains: 1, 2, foo, bar, 3, 4
484
485Specifying a negative index position greater than the number of items in the
486queue adds the list to the head of the queue.
487
488=item ->extract()
489
490=item ->extract(INDEX)
491
492=item ->extract(INDEX, COUNT)
493
494Removes and returns the specified number of items (defaults to 1) from the
495specified index position in the queue (0 is the head of the queue). When
496called with no arguments, C<extract> operates the same as C<dequeue_nb>.
497
498This method is non-blocking, and will return only as many items as are
499available to fulfill the request:
500
    $q->enqueue(1, 2, 3, 4);
    my $item  = $q->extract(2);     # Returns 3
                                    # Queue now contains:  1, 2, 4
    my @items = $q->extract(1, 3);  # Returns (2, 4)
                                    # Queue now contains:  1
506
507Specifying an index position greater than the number of items in the
508queue results in C<undef> or an empty list being returned.
509
    $q->enqueue('foo');
    my $nada = $q->extract(3);      # Returns undef
    my @nada = $q->extract(1, 3);   # Returns ()
513
514Negative index positions are supported. Specifying a negative index position
515greater than the number of items in the queue may return items from the head
516of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
517queue from the specified position (i.e. if queue size + index + count is
518greater than zero):
519
520 $q->enqueue(qw/foo bar baz/);
521 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
522 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
523 # Queue now contains: bar, baz
524 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
525
526=back
527
7fb1c73b 528=head1 NOTES
529
530Queues created by L<Thread::Queue> can be used in both threaded and
531non-threaded applications.
532
54c7876f 533=head1 LIMITATIONS
534
535Passing objects on queues may not work if the objects' classes do not support
536sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
537
538Passing array/hash refs that contain objects may not work for Perl prior to
5395.10.0.
540
541=head1 SEE ALSO
542
543Thread::Queue Discussion Forum on CPAN:
544L<http://www.cpanforum.com/dist/Thread-Queue>
545
546Annotated POD for Thread::Queue:
ac9d3a9d 547L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.07/lib/Thread/Queue.pm>
7fb1c73b 548
549Source repository:
550L<http://code.google.com/p/thread-queue/>
54c7876f 551
552L<threads>, L<threads::shared>
553
554=head1 MAINTAINER
555
556Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
557
558=head1 LICENSE
559
560This program is free software; you can redistribute it and/or modify it under
561the same terms as Perl itself.
562
563=cut