Move cp(1)-like permission changes from copy to cp,
[p5sagit/p5-mst-13.2.git] / lib / Thread / Queue.pm
CommitLineData
package Thread::Queue;

use strict;
use warnings;

our $VERSION = '2.11';

# Minimum versions are requested explicitly; shared_clone(), used throughout
# this module, comes from threads::shared.
use threads::shared 1.21;
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);

# Carp errors from threads::shared calls should complain about caller
our @CARP_NOT = ("threads::shared");

# Predeclarations for internal functions.  The code refs are assigned near the
# end of the file, after the public methods that call them.
my ($validate_count, $validate_index);
54c7876f 16
# Create a new queue possibly pre-populated with items
#
# Arguments: the class name, followed by zero or more initial items.
# Returns:   a reference to a shared array holding the items, blessed into
#            the given class.
sub new
{
    my $class = shift;
    # Every item is passed through shared_clone() before being stored
    my @queue :shared = map { shared_clone($_) } @_;
    return bless(\@queue, $class);
}
24
# Add items to the tail of a queue
#
# Arguments: the queue, followed by the items to append.
# Each item is passed through shared_clone() before being pushed.
sub enqueue
{
    my $queue = shift;
    lock(@$queue);
    # push() returns the new length of the queue, so the signal is only sent
    # when the queue is non-empty after the push
    push(@$queue, map { shared_clone($_) } @_)
        and cond_signal(@$queue);
}
33
# Return a count of the number of items on a queue
#
# Arguments: the queue.
# Returns:   the number of elements currently in the queue array, read while
#            holding the queue's lock.
sub pending
{
    my $queue = shift;
    lock(@$queue);
    return scalar(@$queue);
}
41
# Return 1 or more items from the head of a queue, blocking if needed
#
# Arguments: the queue, and an optional COUNT (default 1).  COUNT is checked
#            by $validate_count, which croaks unless it is an integer >= 1.
# Returns:   the head item when COUNT is 1; otherwise a list of COUNT items
#            in queue order.
sub dequeue
{
    my $queue = shift;
    lock(@$queue);

    my $count = @_ ? $validate_count->(shift) : 1;

    # Wait for requisite number of items
    cond_wait(@$queue) until (@$queue >= $count);
    # Items will remain after this call takes its share, so signal again
    # (presumably to wake another waiting consumer)
    cond_signal(@$queue) if (@$queue > $count);

    # Return single item
    return shift(@$queue) if ($count == 1);

    # Return multiple items
    my @items;
    push(@items, shift(@$queue)) for (1..$count);
    return @items;
}
62
# Return items from the head of a queue with no blocking
#
# Arguments: the queue, and an optional COUNT (default 1).  COUNT is checked
#            by $validate_count, which croaks unless it is an integer >= 1.
# Returns:   when COUNT is 1, the head item (undef if the queue is empty);
#            otherwise a list of up to COUNT items, fewer if the queue runs
#            out.
sub dequeue_nb
{
    my $queue = shift;
    lock(@$queue);

    my $count = @_ ? $validate_count->(shift) : 1;

    # Return single item
    return shift(@$queue) if ($count == 1);

    # Return multiple items
    my @items;
    for (1..$count) {
        last if (! @$queue);    # Queue exhausted - return what we have
        push(@items, shift(@$queue));
    }
    return @items;
}
82
# Return an item without removing it from a queue
#
# Arguments: the queue, and an optional INDEX (default 0, the head).  INDEX
#            is checked by $validate_index, which croaks unless it is an
#            integer; it is then used directly as an array subscript.
# Returns:   the element at that position.
sub peek
{
    my $queue = shift;
    lock(@$queue);
    my $index = @_ ? $validate_index->(shift) : 0;
    return $$queue[$index];
}
91
# Insert items anywhere into a queue
#
# Arguments: the queue, an INDEX (required; checked by $validate_index, which
#            croaks unless it is an integer), and the items to insert.
# Existing items at and beyond INDEX end up after the inserted items.  An
# INDEX past the tail appends; a negative INDEX counts back from the tail and
# is clamped to the head.
sub insert
{
    my $queue = shift;
    lock(@$queue);

    my $index = $validate_index->(shift);

    return if (! @_);   # Nothing to insert

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            $index = 0;
        }
    }

    # Dequeue items from $index onward
    # NOTE(review): items are moved with pop/unshift instead of splice(),
    # presumably because splice is unsupported on shared arrays - confirm
    # against the threads::shared documentation.
    my @tmp;
    while (@$queue > $index) {
        unshift(@tmp, pop(@$queue));
    }

    # Add new items to the queue
    push(@$queue, map { shared_clone($_) } @_);

    # Add previous items back onto the queue
    push(@$queue, @tmp);

    # Soup's up
    cond_signal(@$queue);
}
125
# Remove items from anywhere in a queue
#
# Arguments: the queue, an optional INDEX (default 0) and an optional COUNT
#            (default 1), checked by $validate_index and $validate_count.
# Returns:   when COUNT is 1, the item at INDEX (undef if there is none);
#            otherwise the list of items actually removed, which may be
#            shorter than COUNT.  Never blocks.
sub extract
{
    my $queue = shift;
    lock(@$queue);

    my $index = @_ ? $validate_index->(shift) : 0;
    my $count = @_ ? $validate_count->(shift) : 1;

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            # The start lies before the head: shrink the count by the
            # overshoot and take whatever still overlaps the queue
            $count += $index;
            return if ($count <= 0);            # Beyond the head of the queue
            return $queue->dequeue_nb($count);  # Extract from the head
        }
    }

    # Dequeue items from $index+$count onward
    my @tmp;
    while (@$queue > ($index+$count)) {
        unshift(@tmp, pop(@$queue));
    }

    # Extract desired items
    my @items;
    unshift(@items, pop(@$queue)) while (@$queue > $index);

    # Add back any removed items
    push(@$queue, @tmp);

    # Return single item
    return $items[0] if ($count == 1);

    # Return multiple items
    return @items;
}
164
### Internal Functions ###

# Check value of the requested index
#
# Accepts any defined, numeric, integer-valued argument (negative values are
# allowed) and returns it unchanged.  Anything else croaks with a message
# naming the calling method.
$validate_index = sub {
    my $index = shift;

    if (! defined($index) ||
        ! looks_like_number($index) ||
        (int($index) != $index))
    {
        require Carp;
        # Name of the sub that called this validator, minus the package prefix
        my ($method) = (caller(1))[3];
        $method =~ s/Thread::Queue:://;
        $index = 'undef' if (! defined($index));
        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
    }

    return $index;
};
184
# Check value of the requested count
#
# Accepts any defined, numeric, integer-valued argument that is at least 1
# and returns it unchanged.  Anything else croaks with a message naming the
# calling method.
$validate_count = sub {
    my $count = shift;

    if (! defined($count) ||
        ! looks_like_number($count) ||
        (int($count) != $count) ||
        ($count < 1))
    {
        require Carp;
        # Name of the sub that called this validator, minus the package prefix
        my ($method) = (caller(1))[3];
        $method =~ s/Thread::Queue:://;
        $count = 'undef' if (! defined($count));
        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
    }

    return $count;
};

1;
9c6f8578 205
d516a115 206=head1 NAME
207
54c7876f 208Thread::Queue - Thread-safe queues
209
210=head1 VERSION
211
3d4f2f89 212This document describes Thread::Queue version 2.11
d516a115 213
214=head1 SYNOPSIS
215
54c7876f 216 use strict;
217 use warnings;
218
219 use threads;
d516a115 220 use Thread::Queue;
54c7876f 221
222 my $q = Thread::Queue->new(); # A new empty queue
223
224 # Worker thread
225 my $thr = threads->create(sub {
226 while (my $item = $q->dequeue()) {
227 # Do work on $item
228 }
229 })->detach();
230
231 # Send work to the thread
232 $q->enqueue($item1, ...);
233
234
235 # Count of items in the queue
236 my $left = $q->pending();
237
238 # Non-blocking dequeue
239 if (defined(my $item = $q->dequeue_nb())) {
240 # Work on $item
241 }
242
243 # Get the second item in the queue without dequeuing anything
244 my $item = $q->peek(1);
245
246 # Insert two items into the queue just behind the head
247 $q->insert(1, $item1, $item2);
248
249 # Extract the last two items on the queue
250 my ($item1, $item2) = $q->extract(-2, 2);
d516a115 251
a99072da 252=head1 DESCRIPTION
253
54c7876f 254This module provides thread-safe FIFO queues that can be accessed safely by
255any number of threads.
256
257Any data types supported by L<threads::shared> can be passed via queues:
258
259=over
260
261=item Ordinary scalars
262
263=item Array refs
264
265=item Hash refs
266
267=item Scalar refs
268
269=item Objects based on the above
270
271=back
272
273Ordinary scalars are added to queues as they are.
274
275If not already thread-shared, the other complex data types will be cloned
276(recursively, if needed, and including any C<bless>ings and read-only
277settings) into thread-shared structures before being placed onto a queue.
a99072da 278
For example, the following would cause L<Thread::Queue> to create an empty,
shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
and 'baz' from C<@ary> into it, and then place that shared reference onto
the queue:
a99072da 283
54c7876f 284 my @ary = qw/foo bar baz/;
285 $q->enqueue(\@ary);
a99072da 286
54c7876f 287However, for the following, the items are already shared, so their references
288are added directly to the queue, and no cloning takes place:
a99072da 289
54c7876f 290 my @ary :shared = qw/foo bar baz/;
291 $q->enqueue(\@ary);
a99072da 292
54c7876f 293 my $obj = &shared({});
294 $$obj{'foo'} = 'bar';
295 $$obj{'qux'} = 99;
296 bless($obj, 'My::Class');
297 $q->enqueue($obj);
a99072da 298
54c7876f 299See L</"LIMITATIONS"> for caveats related to passing objects via queues.
a99072da 300
54c7876f 301=head1 QUEUE CREATION
a99072da 302
54c7876f 303=over
a99072da 304
54c7876f 305=item ->new()
a99072da 306
54c7876f 307Creates a new empty queue.
a99072da 308
54c7876f 309=item ->new(LIST)
a99072da 310
54c7876f 311Creates a new queue pre-populated with the provided list of items.
a99072da 312
313=back
314
54c7876f 315=head1 BASIC METHODS
a99072da 316
54c7876f 317The following methods deal with queues on a FIFO basis.
bbc7dcd2 318
54c7876f 319=over
d516a115 320
54c7876f 321=item ->enqueue(LIST)
d21067e0 322
54c7876f 323Adds a list of items onto the end of the queue.
d21067e0 324
54c7876f 325=item ->dequeue()
a99072da 326
54c7876f 327=item ->dequeue(COUNT)
d21067e0 328
Removes the requested number of items (default is 1) from the head of the
queue, and returns them.  If the queue contains fewer than the requested
number of items, then the thread will be blocked until the requisite number
of items are available (i.e., until other threads C<enqueue> more items).
a99072da 333
54c7876f 334=item ->dequeue_nb()
335
336=item ->dequeue_nb(COUNT)
337
338Removes the requested number of items (default is 1) from the head of the
339queue, and returns them. If the queue contains fewer than the requested
340number of items, then it immediately (i.e., non-blocking) returns whatever
341items there are on the queue. If the queue is empty, then C<undef> is
342returned.
343
344=item ->pending()
345
346Returns the number of items still in the queue.
347
348=back
349
350=head1 ADVANCED METHODS
351
352The following methods can be used to manipulate items anywhere in a queue.
353
354To prevent the contents of a queue from being modified by another thread
355while it is being examined and/or changed, L<lock|threads::shared/"lock
356VARIABLE"> the queue inside a local block:
357
358 {
359 lock($q); # Keep other threads from changing the queue's contents
360 my $item = $q->peek();
361 if ($item ...) {
362 ...
363 }
364 }
365 # Queue is now unlocked
366
367=over
368
369=item ->peek()
370
371=item ->peek(INDEX)
372
Returns an item from the queue without dequeuing anything.  Defaults to the
head of the queue (at index position 0) if no index is specified.  Negative
index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
is the end of the queue, -2 is next to last, and so on).
377
If no item exists at the specified index (i.e., the queue is empty, or the
index is beyond the number of items on the queue), then C<undef> is returned.
380
381Remember, the returned item is not removed from the queue, so manipulating a
382C<peek>ed at reference affects the item on the queue.
383
384=item ->insert(INDEX, LIST)
385
386Adds the list of items to the queue at the specified index position (0
387is the head of the list). Any existing items at and beyond that position are
388pushed back past the newly added items:
389
390 $q->enqueue(1, 2, 3, 4);
391 $q->insert(1, qw/foo bar/);
392 # Queue now contains: 1, foo, bar, 2, 3, 4
83272a45 393
54c7876f 394Specifying an index position greater than the number of items in the queue
395just adds the list to the end.
83272a45 396
54c7876f 397Negative index positions are supported:
398
399 $q->enqueue(1, 2, 3, 4);
400 $q->insert(-2, qw/foo bar/);
401 # Queue now contains: 1, 2, foo, bar, 3, 4
402
403Specifying a negative index position greater than the number of items in the
404queue adds the list to the head of the queue.
405
406=item ->extract()
407
408=item ->extract(INDEX)
409
410=item ->extract(INDEX, COUNT)
411
412Removes and returns the specified number of items (defaults to 1) from the
413specified index position in the queue (0 is the head of the queue). When
414called with no arguments, C<extract> operates the same as C<dequeue_nb>.
415
416This method is non-blocking, and will return only as many items as are
417available to fulfill the request:
418
    $q->enqueue(1, 2, 3, 4);
    my $item  = $q->extract(2);     # Returns 3
                                    # Queue now contains:  1, 2, 4
    my @items = $q->extract(1, 3);  # Returns (2, 4)
                                    # Queue now contains:  1
424
425Specifying an index position greater than the number of items in the
426queue results in C<undef> or an empty list being returned.
427
    $q->enqueue('foo');
    my $nada = $q->extract(3);      # Returns undef
    my @nada = $q->extract(1, 3);   # Returns ()
431
432Negative index positions are supported. Specifying a negative index position
433greater than the number of items in the queue may return items from the head
434of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
435queue from the specified position (i.e. if queue size + index + count is
436greater than zero):
437
438 $q->enqueue(qw/foo bar baz/);
439 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
440 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
441 # Queue now contains: bar, baz
442 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
443
444=back
445
7fb1c73b 446=head1 NOTES
447
448Queues created by L<Thread::Queue> can be used in both threaded and
449non-threaded applications.
450
54c7876f 451=head1 LIMITATIONS
452
453Passing objects on queues may not work if the objects' classes do not support
454sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
455
Passing array/hash refs that contain objects may not work for Perl prior to
5.10.0.
458
459=head1 SEE ALSO
460
461Thread::Queue Discussion Forum on CPAN:
462L<http://www.cpanforum.com/dist/Thread-Queue>
463
464Annotated POD for Thread::Queue:
3d4f2f89 465L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.11/lib/Thread/Queue.pm>
7fb1c73b 466
467Source repository:
468L<http://code.google.com/p/thread-queue/>
54c7876f 469
470L<threads>, L<threads::shared>
471
472=head1 MAINTAINER
473
474Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
475
476=head1 LICENSE
477
478This program is free software; you can redistribute it and/or modify it under
479the same terms as Perl itself.
480
481=cut