404e480a78b5b9ec25a2335bf3852735510a57f6
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / BlockRunner.pm
1 package # hide from pause until we figure it all out
2   DBIx::Class::Storage::BlockRunner;
3
4 use Sub::Quote 'quote_sub';
5 use DBIx::Class::Exception;
6 use DBIx::Class::Carp;
7 use Context::Preserve 'preserve_context';
8 use Scalar::Util qw/weaken blessed/;
9 use Try::Tiny;
10 use Moo;
11 use namespace::clean;
12
13 =head1 NAME
14
15 DBIx::Class::Storage::BlockRunner - Try running a block of code until success with a configurable retry logic
16
17 =head1 DESCRIPTION
18
19 =head1 METHODS
20
21 =cut
22
23 has storage => (
24   is => 'ro',
25   required => 1,
26 );
27
28 has wrap_txn => (
29   is => 'ro',
30   required => 1,
31 );
32
33 # true - retry, false - rethrow, or you can throw your own (not catching)
34 has retry_handler => (
35   is => 'ro',
36   required => 1,
37   isa => quote_sub( q|
38     (ref $_[0]) eq 'CODE'
39       or DBIx::Class::Exception->throw('retry_handler must be a CODE reference')
40   |),
41 );
42
43 has run_code => (
44   is => 'ro',
45   required => 1,
46   isa => quote_sub( q|
47     (ref $_[0]) eq 'CODE'
48       or DBIx::Class::Exception->throw('run_code must be a CODE reference')
49   |),
50 );
51
52 has run_args => (
53   is => 'ro',
54   isa => quote_sub( q|
55     (ref $_[0]) eq 'ARRAY'
56       or DBIx::Class::Exception->throw('run_args must be an ARRAY reference')
57   |),
58   default => quote_sub( '[]' ),
59 );
60
61 has retry_debug => (
62   is => 'rw',
63   default => quote_sub( '$ENV{DBIC_STORAGE_RETRY_DEBUG}' ),
64 );
65
66 has max_retried_count => (
67   is => 'ro',
68   default => quote_sub( '20' ),
69 );
70
71 has retried_count => (
72   is => 'ro',
73   init_arg => undef,
74   writer => '_set_retried_count',
75   clearer => '_reset_retried_count',
76   default => quote_sub(q{ 0 }),
77   lazy => 1,
78   trigger => quote_sub(q{
79     $_[0]->throw_exception( sprintf (
80       'Exceeded max_retried_count amount of %d, latest exception: %s',
81       $_[0]->max_retried_count, $_[0]->last_exception
82     )) if $_[0]->max_retried_count < ($_[1]||0);
83   }),
84 );
85
86 has exception_stack => (
87   is => 'ro',
88   init_arg => undef,
89   clearer => '_reset_exception_stack',
90   default => quote_sub(q{ [] }),
91   lazy => 1,
92 );
93
94 sub last_exception { shift->exception_stack->[-1] }
95
96 sub throw_exception { shift->storage->throw_exception (@_) }
97
98 sub run {
99   my $self = shift;
100
101   $self->throw_exception('run() takes no arguments') if @_;
102
103   $self->_reset_exception_stack;
104   $self->_reset_retried_count;
105   my $storage = $self->storage;
106
107   return $self->run_code->( @{$self->run_args} )
108     if (! $self->wrap_txn and $storage->{_in_do_block});
109
110   local $storage->{_in_do_block} = 1 unless $storage->{_in_do_block};
111
112   return $self->_run;
113 }
114
115 # this is the actual recursing worker
116 sub _run {
117   # warnings here mean I did not anticipate some ueber-complex case
118   # fatal warnings are not warranted
119   no warnings;
120   use warnings;
121
122   my $self = shift;
123
124   # from this point on (defined $txn_init_depth) is an indicator for wrap_txn
125   # save a bit on method calls
126   my $txn_init_depth = $self->wrap_txn ? $self->storage->transaction_depth : undef;
127   my $txn_begin_ok;
128
129   my $run_err = '';
130
131   weaken (my $weakself = $self);
132
133   return preserve_context {
134     try {
135       if (defined $txn_init_depth) {
136         $weakself->storage->txn_begin;
137         $txn_begin_ok = 1;
138       }
139       $weakself->run_code->( @{$weakself->run_args} );
140     } catch {
141       $run_err = $_;
142       (); # important, affects @_ below
143     };
144   } replace => sub {
145     my @res = @_;
146
147     my $storage = $weakself->storage;
148     my $cur_depth = $storage->transaction_depth;
149
150     if (defined $txn_init_depth and $run_err eq '') {
151       my $delta_txn = (1 + $txn_init_depth) - $cur_depth;
152
153       if ($delta_txn) {
154         # a rollback in a top-level txn_do is valid-ish (seen in the wild and our own tests)
155         carp (sprintf
156           'Unexpected reduction of transaction depth by %d after execution of '
157         . '%s, skipping txn_commit()',
158           $delta_txn,
159           $weakself->run_code,
160         ) unless $delta_txn == 1 and $cur_depth == 0;
161       }
162       else {
163         $run_err = eval { $storage->txn_commit; 1 } ? '' : $@;
164       }
165     }
166
167     # something above threw an error (could be the begin, the code or the commit)
168     if ($run_err ne '') {
169
170       # attempt a rollback if we did begin in the first place
171       if ($txn_begin_ok) {
172         # some DBDs go crazy if there is nothing to roll back on, perform a soft-check
173         my $rollback_exception = $storage->_seems_connected
174           ? (! eval { $storage->txn_rollback; 1 }) ? $@ : ''
175           : 'lost connection to storage'
176         ;
177
178         if ( $rollback_exception and (
179           ! defined blessed $rollback_exception
180             or
181           ! $rollback_exception->isa('DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION')
182         ) ) {
183           $run_err = "Transaction aborted: $run_err. Rollback failed: $rollback_exception";
184         }
185       }
186
187       push @{ $weakself->exception_stack }, $run_err;
188
189       # init depth of > 0 ( > 1 with AC) implies nesting - no retry attempt queries
190       $storage->throw_exception($run_err) if (
191         (
192           defined $txn_init_depth
193             and
194           # FIXME - we assume that $storage->{_dbh_autocommit} is there if
195           # txn_init_depth is there, but this is a DBI-ism
196           $txn_init_depth > ( $storage->{_dbh_autocommit} ? 0 : 1 )
197         ) or ! $weakself->retry_handler->($weakself)
198       );
199
200       $weakself->_set_retried_count($weakself->retried_count + 1);
201
202       # we got that far - let's retry
203       carp( sprintf 'Retrying %s (run %d) after caught exception: %s',
204         $weakself->run_code,
205         $weakself->retried_count + 1,
206         $run_err,
207       ) if $weakself->retry_debug;
208
209       $storage->ensure_connected;
210       # if txn_depth is > 1 this means something was done to the
211       # original $dbh, otherwise we would not get past the preceeding if()
212       $storage->throw_exception(sprintf
213         'Unexpected transaction depth of %d on freshly connected handle',
214         $storage->transaction_depth,
215       ) if (defined $txn_init_depth and $storage->transaction_depth);
216
217       return $weakself->_run;
218     }
219
220     return wantarray ? @res : $res[0];
221   };
222 }
223
224 =head1 AUTHOR AND CONTRIBUTORS
225
226 See L<AUTHOR|DBIx::Class/AUTHOR> and L<CONTRIBUTORS|DBIx::Class/CONTRIBUTORS> in DBIx::Class
227
228 =head1 LICENSE
229
230 You may distribute this code under the same terms as Perl itself.
231
232 =cut
233
234 1;