Commit | Line | Data |
9345b14c |
1 | package # hide from pause until we figure it all out |
2 | DBIx::Class::Storage::BlockRunner; |
3 | |
4 | use Sub::Quote 'quote_sub'; |
5 | use DBIx::Class::Exception; |
6 | use DBIx::Class::Carp; |
7 | use Context::Preserve 'preserve_context'; |
8 | use Scalar::Util qw/weaken blessed/; |
9 | use Try::Tiny; |
10 | use Moo; |
11 | use namespace::clean; |
12 | |
13 | =head1 NAME |
14 | |
15 | DBIx::Class::Storage::BlockRunner - Try running a block of code until success with a configurable retry logic |
16 | |
17 | =head1 DESCRIPTION |
18 | |
19 | =head1 METHODS |
20 | |
21 | =cut |
22 | |
# Storage object the block is executed against. run() inspects and localizes
# its {_in_do_block} slot; _run() queries its transaction depth and drives
# txn_begin / txn_commit / txn_rollback through it.
has storage => ( required => 1, is => 'ro' );
27 | |
# When true, _run() wraps every attempt of run_code in a
# txn_begin / txn_commit pair issued on the storage.
has wrap_txn => ( required => 1, is => 'ro' );
32 | |
# Callback invoked with the BlockRunner instance after a failed attempt:
#   true return  - the block is retried
#   false return - the caught exception is rethrown
# The handler may also throw an exception of its own; it is invoked outside
# of the try/catch in _run(), so such an exception is not caught here.
has retry_handler => (
  is => 'ro',
  required => 1,
  # reftype() instead of ref(), so that blessed code references are accepted
  # too; the "|| ''" guards against reftype() returning undef for non-refs
  isa => quote_sub( q{
    ( Scalar::Util::reftype($_[0]) || '' ) eq 'CODE'
      or DBIx::Class::Exception->throw('retry_handler must be a CODE reference')
  }),
);
42 | |
# The code reference executed (and possibly re-executed) by run(). It is
# called with the contents of run_args as its argument list.
has run_code => (
  is => 'ro',
  required => 1,
  # reftype() instead of ref(), so that blessed code references are accepted
  # too; the "|| ''" guards against reftype() returning undef for non-refs
  isa => quote_sub( q{
    ( Scalar::Util::reftype($_[0]) || '' ) eq 'CODE'
      or DBIx::Class::Exception->throw('run_code must be a CODE reference')
  }),
);
51 | |
# Array reference whose contents are passed to run_code on every attempt.
# Defaults to an empty array reference.
has run_args => (
  is      => 'ro',
  default => quote_sub( q{[]} ),
  isa     => quote_sub( q{
    DBIx::Class::Exception->throw('run_args must be an ARRAY reference')
      unless (ref $_[0]) eq 'ARRAY'
  }),
);
60 | |
# Debug switch: when true, _run() carps before each retry. The default is
# taken from the DBIC_STORAGE_RETRY_DEBUG environment variable; the attribute
# remains writable afterwards.
has retry_debug => (
  default => quote_sub( q{$ENV{DBIC_STORAGE_RETRY_DEBUG}} ),
  is      => 'rw',
);
65 | |
# Upper bound for retried_count (20 unless overridden); exceeding it makes
# the retried_count trigger throw.
has max_retried_count => (
  default => quote_sub( q{20} ),
  is      => 'ro',
);
70 | |
# Number of retries performed so far. Not settable through the constructor;
# starts at 0, is bumped via _set_retried_count and cleared via
# _reset_retried_count. Every write is checked against max_retried_count.
has retried_count => (
  is       => 'ro',
  lazy     => 1,
  init_arg => undef,
  default  => quote_sub( q{0} ),
  writer   => '_set_retried_count',
  clearer  => '_reset_retried_count',
  trigger  => quote_sub( q{
    my ($self, $new_count) = @_;

    # refuse to go past the configured ceiling, reporting the last failure
    if ( $self->max_retried_count < ($new_count||0) ) {
      DBIx::Class::Exception->throw(sprintf (
        'Exceeded max_retried_count amount of %d, latest exception: %s',
        $self->max_retried_count, $self->last_exception
      ));
    }
  }),
);
85 | |
# Array reference collecting the exception of every failed attempt, oldest
# first. Not settable through the constructor; lazily starts out empty and is
# cleared via _reset_exception_stack.
has exception_stack => (
  is       => 'ro',
  lazy     => 1,
  init_arg => undef,
  default  => quote_sub( q{[]} ),
  clearer  => '_reset_exception_stack',
);
93 | |
# Returns the most recently recorded entry of exception_stack, or undef when
# nothing has been recorded yet.
sub last_exception {
  my $self  = shift;
  my $stack = $self->exception_stack;

  return $stack->[-1];
}
95 | |
# Public entry point: executes run_code (with run_args), retrying as decided
# by retry_handler. Takes no arguments and throws if any are supplied.
# Returns whatever run_code returns.
sub run {
  my ($self, @unexpected) = @_;

  DBIx::Class::Exception->throw('run() takes no arguments') if @unexpected;

  # every run() starts with a clean failure history
  $self->_reset_exception_stack;
  $self->_reset_retried_count;

  my $storage = $self->storage;

  # already inside another do-block and no transaction requested:
  # invoke the code directly, without any retry machinery
  if ( ! $self->wrap_txn and $storage->{_in_do_block} ) {
    return $self->run_code->( @{ $self->run_args } );
  }

  # flag the storage as being inside a do-block for the duration of this
  # call; the statement-modifier form keeps the local() scoped to the sub
  local $storage->{_in_do_block} = 1 unless $storage->{_in_do_block};

  return $self->_run;
}
112 | |
# This is the actual recursing worker behind run(). It performs a single
# attempt of run_code (wrapped in txn_begin/txn_commit when wrap_txn is set)
# and, on failure, rolls back if needed, records the exception on
# exception_stack and asks retry_handler whether to rethrow or to recurse
# for another attempt.
#
# Returns the result of run_code in the calling context. Throws through
# $storage->throw_exception when the failure is not to be retried, when the
# failure happened inside a nested transaction, or when a reconnected handle
# reports an unexpected transaction depth.
sub _run {
  # warnings here mean I did not anticipate some ueber-complex case
  # fatal warnings are not warranted
  no warnings;
  use warnings;

  my $self = shift;

  # from this point on (defined $txn_init_depth) is an indicator for wrap_txn
  # save a bit on method calls
  my $txn_init_depth = $self->wrap_txn ? $self->storage->transaction_depth : undef;
  my $txn_begin_ok;

  # Failure is tracked by an explicit flag and never by looking at the
  # exception value itself: an exception that stringifies to '' (or an eval
  # whose $@ got clobbered) would otherwise be mistaken for success
  my $run_failed;
  my $run_err = '';

  # the closures below must not keep the object alive on their own
  weaken (my $weakself = $self);

  return preserve_context {
    try {
      if (defined $txn_init_depth) {
        $weakself->storage->txn_begin;
        $txn_begin_ok = 1;
      }
      $weakself->run_code->( @{$weakself->run_args} );
    } catch {
      $run_failed = 1;
      $run_err = $_;
      (); # important, affects @_ below
    };
  } replace => sub {
    my @res = @_;

    my $storage = $weakself->storage;
    my $cur_depth = $storage->transaction_depth;

    # the code ran fine within a transaction we started - try to commit
    if (defined $txn_init_depth and ! $run_failed) {
      my $delta_txn = (1 + $txn_init_depth) - $cur_depth;

      if ($delta_txn) {
        # a rollback in a top-level txn_do is valid-ish (seen in the wild and our own tests)
        carp (sprintf
          'Unexpected reduction of transaction depth by %d after execution of '
        . '%s, skipping txn_commit()',
          $delta_txn,
          $weakself->run_code,
        ) unless $delta_txn == 1 and $cur_depth == 0;
      }
      else {
        # rely on the return value of eval, not on the contents of $@
        unless ( eval { $storage->txn_commit; 1 } ) {
          $run_failed = 1;
          $run_err = $@;
        }
      }
    }

    # something above threw an error (could be the begin, the code or the commit)
    if ($run_failed) {

      # attempt a rollback if we did begin in the first place
      if ($txn_begin_ok) {
        my ($rollback_failed, $rollback_exception);

        # some DBDs go crazy if there is nothing to roll back on, perform a soft-check
        if ($storage->_seems_connected) {
          unless ( eval { $storage->txn_rollback; 1 } ) {
            $rollback_failed = 1;
            $rollback_exception = $@;
          }
        }
        else {
          $rollback_failed = 1;
          $rollback_exception = 'lost connection to storage';
        }

        # a nested-rollback exception object is expected and not reported
        if ( $rollback_failed and (
          ! defined( blessed $rollback_exception )
            or
          ! $rollback_exception->isa('DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION')
        ) ) {
          $run_err = "Transaction aborted: $run_err. Rollback failed: $rollback_exception";
        }
      }

      push @{ $weakself->exception_stack }, $run_err;

      # init depth of > 0 ( > 1 with AC) implies nesting - no retry attempt queries
      $storage->throw_exception($run_err) if (
        (
          defined $txn_init_depth
            and
          # FIXME - we assume that $storage->{_dbh_autocommit} is there if
          # txn_init_depth is there, but this is a DBI-ism
          $txn_init_depth > ( $storage->{_dbh_autocommit} ? 0 : 1 )
        ) or ! $weakself->retry_handler->($weakself)
      );

      # the writer fires the trigger enforcing max_retried_count
      $weakself->_set_retried_count($weakself->retried_count + 1);

      # we got that far - let's retry
      carp( sprintf 'Retrying %s (run %d) after caught exception: %s',
        $weakself->run_code,
        $weakself->retried_count + 1,
        $run_err,
      ) if $weakself->retry_debug;

      $storage->ensure_connected;
      # if txn_depth is > 1 this means something was done to the
      # original $dbh, otherwise we would not get past the preceding if()
      $storage->throw_exception(sprintf
        'Unexpected transaction depth of %d on freshly connected handle',
        $storage->transaction_depth,
      ) if (defined $txn_init_depth and $storage->transaction_depth);

      return $weakself->_run;
    }

    return wantarray ? @res : $res[0];
  };
}
221 | |
222 | =head1 AUTHORS |
223 | |
224 | see L<DBIx::Class> |
225 | |
226 | =head1 LICENSE |
227 | |
228 | You may distribute this code under the same terms as Perl itself. |
229 | |
230 | =cut |
231 | |
232 | 1; |