cleanup cursor class handling
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;    
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use Scalar::Util qw/blessed weaken/;
13
14 __PACKAGE__->mk_group_accessors('simple' =>
15     qw/_connect_info _dbi_connect_info _dbh _sql_maker _sql_maker_opts
16        _conn_pid _conn_tid disable_sth_caching on_connect_do
17        transaction_depth unsafe _dbh_autocommit/
18 );
19
20 __PACKAGE__->cursor_class('DBIx::Class::Storage::DBI::Cursor');
21
22 BEGIN {
23
24 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
25
26 use base qw/SQL::Abstract::Limit/;
27
28 # This prevents the caching of $dbh in S::A::L, I believe
29 sub new {
30   my $self = shift->SUPER::new(@_);
31
32   # If limit_dialect is a ref (like a $dbh), go ahead and replace
33   #   it with what it resolves to:
34   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
35     if ref $self->{limit_dialect};
36
37   $self;
38 }
39
40 sub _RowNumberOver {
41   my ($self, $sql, $order, $rows, $offset ) = @_;
42
43   $offset += 1;
44   my $last = $rows + $offset;
45   my ( $order_by ) = $self->_order_by( $order );
46
47   $sql = <<"";
48 SELECT * FROM
49 (
50    SELECT Q1.*, ROW_NUMBER() OVER( ) AS ROW_NUM FROM (
51       $sql
52       $order_by
53    ) Q1
54 ) Q2
55 WHERE ROW_NUM BETWEEN $offset AND $last
56
57   return $sql;
58 }
59
60
61 # While we're at it, this should make LIMIT queries more efficient,
62 #  without digging into things too deeply
63 use Scalar::Util 'blessed';
64 sub _find_syntax {
65   my ($self, $syntax) = @_;
66   my $dbhname = blessed($syntax) ?  $syntax->{Driver}{Name} : $syntax;
67   if(ref($self) && $dbhname && $dbhname eq 'DB2') {
68     return 'RowNumberOver';
69   }
70
71   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
72 }
73
74 sub select {
75   my ($self, $table, $fields, $where, $order, @rest) = @_;
76   $table = $self->_quote($table) unless ref($table);
77   local $self->{rownum_hack_count} = 1
78     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
79   @rest = (-1) unless defined $rest[0];
80   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
81     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
82   local $self->{having_bind} = [];
83   my ($sql, @ret) = $self->SUPER::select(
84     $table, $self->_recurse_fields($fields), $where, $order, @rest
85   );
86   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
87 }
88
89 sub insert {
90   my $self = shift;
91   my $table = shift;
92   $table = $self->_quote($table) unless ref($table);
93   $self->SUPER::insert($table, @_);
94 }
95
96 sub update {
97   my $self = shift;
98   my $table = shift;
99   $table = $self->_quote($table) unless ref($table);
100   $self->SUPER::update($table, @_);
101 }
102
103 sub delete {
104   my $self = shift;
105   my $table = shift;
106   $table = $self->_quote($table) unless ref($table);
107   $self->SUPER::delete($table, @_);
108 }
109
110 sub _emulate_limit {
111   my $self = shift;
112   if ($_[3] == -1) {
113     return $_[1].$self->_order_by($_[2]);
114   } else {
115     return $self->SUPER::_emulate_limit(@_);
116   }
117 }
118
119 sub _recurse_fields {
120   my ($self, $fields, $params) = @_;
121   my $ref = ref $fields;
122   return $self->_quote($fields) unless $ref;
123   return $$fields if $ref eq 'SCALAR';
124
125   if ($ref eq 'ARRAY') {
126     return join(', ', map {
127       $self->_recurse_fields($_)
128         .(exists $self->{rownum_hack_count} && !($params && $params->{no_rownum_hack})
129           ? ' AS col'.$self->{rownum_hack_count}++
130           : '')
131       } @$fields);
132   } elsif ($ref eq 'HASH') {
133     foreach my $func (keys %$fields) {
134       return $self->_sqlcase($func)
135         .'( '.$self->_recurse_fields($fields->{$func}).' )';
136     }
137   }
138 }
139
140 sub _order_by {
141   my $self = shift;
142   my $ret = '';
143   my @extra;
144   if (ref $_[0] eq 'HASH') {
145     if (defined $_[0]->{group_by}) {
146       $ret = $self->_sqlcase(' group by ')
147         .$self->_recurse_fields($_[0]->{group_by}, { no_rownum_hack => 1 });
148     }
149     if (defined $_[0]->{having}) {
150       my $frag;
151       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
152       push(@{$self->{having_bind}}, @extra);
153       $ret .= $self->_sqlcase(' having ').$frag;
154     }
155     if (defined $_[0]->{order_by}) {
156       $ret .= $self->_order_by($_[0]->{order_by});
157     }
158   } elsif (ref $_[0] eq 'SCALAR') {
159     $ret = $self->_sqlcase(' order by ').${ $_[0] };
160   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
161     my @order = @{+shift};
162     $ret = $self->_sqlcase(' order by ')
163           .join(', ', map {
164                         my $r = $self->_order_by($_, @_);
165                         $r =~ s/^ ?ORDER BY //i;
166                         $r;
167                       } @order);
168   } else {
169     $ret = $self->SUPER::_order_by(@_);
170   }
171   return $ret;
172 }
173
174 sub _order_directions {
175   my ($self, $order) = @_;
176   $order = $order->{order_by} if ref $order eq 'HASH';
177   return $self->SUPER::_order_directions($order);
178 }
179
180 sub _table {
181   my ($self, $from) = @_;
182   if (ref $from eq 'ARRAY') {
183     return $self->_recurse_from(@$from);
184   } elsif (ref $from eq 'HASH') {
185     return $self->_make_as($from);
186   } else {
187     return $from; # would love to quote here but _table ends up getting called
188                   # twice during an ->select without a limit clause due to
189                   # the way S::A::Limit->select works. should maybe consider
190                   # bypassing this and doing S::A::select($self, ...) in
191                   # our select method above. meantime, quoting shims have
192                   # been added to select/insert/update/delete here
193   }
194 }
195
196 sub _recurse_from {
197   my ($self, $from, @join) = @_;
198   my @sqlf;
199   push(@sqlf, $self->_make_as($from));
200   foreach my $j (@join) {
201     my ($to, $on) = @$j;
202
203     # check whether a join type exists
204     my $join_clause = '';
205     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
206     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
207       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
208     } else {
209       $join_clause = ' JOIN ';
210     }
211     push(@sqlf, $join_clause);
212
213     if (ref $to eq 'ARRAY') {
214       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
215     } else {
216       push(@sqlf, $self->_make_as($to));
217     }
218     push(@sqlf, ' ON ', $self->_join_condition($on));
219   }
220   return join('', @sqlf);
221 }
222
223 sub _make_as {
224   my ($self, $from) = @_;
225   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
226                      reverse each %{$self->_skip_options($from)});
227 }
228
229 sub _skip_options {
230   my ($self, $hash) = @_;
231   my $clean_hash = {};
232   $clean_hash->{$_} = $hash->{$_}
233     for grep {!/^-/} keys %$hash;
234   return $clean_hash;
235 }
236
237 sub _join_condition {
238   my ($self, $cond) = @_;
239   if (ref $cond eq 'HASH') {
240     my %j;
241     for (keys %$cond) {
242       my $v = $cond->{$_};
243       if (ref $v) {
244         # XXX no throw_exception() in this package and croak() fails with strange results
245         Carp::croak(ref($v) . qq{ reference arguments are not supported in JOINS - try using \"..." instead'})
246             if ref($v) ne 'SCALAR';
247         $j{$_} = $v;
248       }
249       else {
250         my $x = '= '.$self->_quote($v); $j{$_} = \$x;
251       }
252     };
253     return scalar($self->_recurse_where(\%j));
254   } elsif (ref $cond eq 'ARRAY') {
255     return join(' OR ', map { $self->_join_condition($_) } @$cond);
256   } else {
257     die "Can't handle this yet!";
258   }
259 }
260
261 sub _quote {
262   my ($self, $label) = @_;
263   return '' unless defined $label;
264   return "*" if $label eq '*';
265   return $label unless $self->{quote_char};
266   if(ref $self->{quote_char} eq "ARRAY"){
267     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
268       if !defined $self->{name_sep};
269     my $sep = $self->{name_sep};
270     return join($self->{name_sep},
271         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
272        split(/\Q$sep\E/,$label));
273   }
274   return $self->SUPER::_quote($label);
275 }
276
277 sub limit_dialect {
278     my $self = shift;
279     $self->{limit_dialect} = shift if @_;
280     return $self->{limit_dialect};
281 }
282
283 sub quote_char {
284     my $self = shift;
285     $self->{quote_char} = shift if @_;
286     return $self->{quote_char};
287 }
288
289 sub name_sep {
290     my $self = shift;
291     $self->{name_sep} = shift if @_;
292     return $self->{name_sep};
293 }
294
295 } # End of BEGIN block
296
297 =head1 NAME
298
299 DBIx::Class::Storage::DBI - DBI storage handler
300
301 =head1 SYNOPSIS
302
303 =head1 DESCRIPTION
304
305 This class represents the connection to an RDBMS via L<DBI>.  See
306 L<DBIx::Class::Storage> for general information.  This pod only
307 documents DBI-specific methods and behaviors.
308
309 =head1 METHODS
310
311 =cut
312
313 sub new {
314   my $new = shift->next::method(@_);
315
316   $new->transaction_depth(0);
317   $new->_sql_maker_opts({});
318   $new->{_in_dbh_do} = 0;
319   $new->{_dbh_gen} = 0;
320
321   $new;
322 }
323
324 =head2 connect_info
325
326 The arguments of C<connect_info> are always a single array reference.
327
328 This is normally accessed via L<DBIx::Class::Schema/connection>, which
329 encapsulates its argument list in an arrayref before calling
330 C<connect_info> here.
331
332 The arrayref can either contain the same set of arguments one would
333 normally pass to L<DBI/connect>, or a lone code reference which returns
334 a connected database handle.  Please note that the L<DBI> docs
335 recommend that you always explicitly set C<AutoCommit> to either
336 C<0> or C<1>.   L<DBIx::Class> further recommends that it be set
337 to C<1>, and that you perform transactions via our L</txn_do>
338 method.  L<DBIx::Class> will set it to C<1> if you do not do explicitly
339 set it to zero.  This is the default for most DBDs.  See below for more
340 details.
341
342 In either case, if the final argument in your connect_info happens
343 to be a hashref, C<connect_info> will look there for several
344 connection-specific options:
345
346 =over 4
347
348 =item on_connect_do
349
350 This can be set to an arrayref of literal sql statements, which will
351 be executed immediately after making the connection to the database
352 every time we [re-]connect.
353
354 =item disable_sth_caching
355
356 If set to a true value, this option will disable the caching of
357 statement handles via L<DBI/prepare_cached>.
358
359 =item limit_dialect 
360
361 Sets the limit dialect. This is useful for JDBC-bridge among others
362 where the remote SQL-dialect cannot be determined by the name of the
363 driver alone.
364
365 =item quote_char
366
367 Specifies what characters to use to quote table and column names. If 
368 you use this you will want to specify L<name_sep> as well.
369
370 quote_char expects either a single character, in which case is it is placed
371 on either side of the table/column, or an arrayref of length 2 in which case the
372 table/column name is placed between the elements.
373
374 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
375 use C<quote_char =E<gt> [qw/[ ]/]>.
376
377 =item name_sep
378
379 This only needs to be used in conjunction with L<quote_char>, and is used to 
380 specify the charecter that seperates elements (schemas, tables, columns) from 
381 each other. In most cases this is simply a C<.>.
382
383 =item unsafe
384
385 This Storage driver normally installs its own C<HandleError>, sets
386 C<RaiseError> and C<ShowErrorStatement> on, and sets C<PrintError> off on
387 all database handles, including those supplied by a coderef.  It does this
388 so that it can have consistent and useful error behavior.
389
390 If you set this option to a true value, Storage will not do its usual
391 modifications to the database handle's attributes, and instead relies on
392 the settings in your connect_info DBI options (or the values you set in
393 your connection coderef, in the case that you are connecting via coderef).
394
395 Note that your custom settings can cause Storage to malfunction,
396 especially if you set a C<HandleError> handler that suppresses exceptions
397 and/or disable C<RaiseError>.
398
399 =back
400
401 These options can be mixed in with your other L<DBI> connection attributes,
402 or placed in a seperate hashref after all other normal L<DBI> connection
403 arguments.
404
405 Every time C<connect_info> is invoked, any previous settings for
406 these options will be cleared before setting the new ones, regardless of
407 whether any options are specified in the new C<connect_info>.
408
409 Another Important Note:
410
411 DBIC can do some wonderful magic with handling exceptions,
412 disconnections, and transactions when you use C<AutoCommit =&gt; 1>
413 combined with C<txn_do> for transaction support.
414
415 If you set C<AutoCommit =&gt; 0> in your connect info, then you are always
416 in an assumed transaction between commits, and you're telling us you'd
417 like to manage that manually.  A lot of DBIC's magic protections
418 go away.  We can't protect you from exceptions due to database
419 disconnects because we don't know anything about how to restart your
420 transactions.  You're on your own for handling all sorts of exceptional
421 cases if you choose the C<AutoCommit =&gt 0> path, just as you would
422 be with raw DBI.
423
424 Examples:
425
426   # Simple SQLite connection
427   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
428
429   # Connect via subref
430   ->connect_info([ sub { DBI->connect(...) } ]);
431
432   # A bit more complicated
433   ->connect_info(
434     [
435       'dbi:Pg:dbname=foo',
436       'postgres',
437       'my_pg_password',
438       { AutoCommit => 1 },
439       { quote_char => q{"}, name_sep => q{.} },
440     ]
441   );
442
443   # Equivalent to the previous example
444   ->connect_info(
445     [
446       'dbi:Pg:dbname=foo',
447       'postgres',
448       'my_pg_password',
449       { AutoCommit => 1, quote_char => q{"}, name_sep => q{.} },
450     ]
451   );
452
453   # Subref + DBIC-specific connection options
454   ->connect_info(
455     [
456       sub { DBI->connect(...) },
457       {
458           quote_char => q{`},
459           name_sep => q{@},
460           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
461           disable_sth_caching => 1,
462       },
463     ]
464   );
465
466 =cut
467
468 sub connect_info {
469   my ($self, $info_arg) = @_;
470
471   return $self->_connect_info if !$info_arg;
472
473   # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
474   #  the new set of options
475   $self->_sql_maker(undef);
476   $self->_sql_maker_opts({});
477   $self->_connect_info([@$info_arg]); # copy for _connect_info
478
479   my $dbi_info = [@$info_arg]; # copy for _dbi_connect_info
480
481   my $last_info = $dbi_info->[-1];
482   if(ref $last_info eq 'HASH') {
483     $last_info = { %$last_info }; # so delete is non-destructive
484     for my $storage_opt (qw/on_connect_do disable_sth_caching unsafe/) {
485       if(my $value = delete $last_info->{$storage_opt}) {
486         $self->$storage_opt($value);
487       }
488     }
489     for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
490       if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
491         $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
492       }
493     }
494     # re-insert modified hashref
495     $dbi_info->[-1] = $last_info;
496
497     # Get rid of any trailing empty hashref
498     pop(@$dbi_info) if !keys %$last_info;
499   }
500   $self->_dbi_connect_info($dbi_info);
501
502   $self->_connect_info;
503 }
504
505 =head2 on_connect_do
506
507 This method is deprecated in favor of setting via L</connect_info>.
508
509 =head2 dbh_do
510
511 Arguments: $subref, @extra_coderef_args?
512
513 Execute the given subref using the new exception-based connection management.
514
515 The first two arguments will be the storage object that C<dbh_do> was called
516 on and a database handle to use.  Any additional arguments will be passed
517 verbatim to the called subref as arguments 2 and onwards.
518
519 Using this (instead of $self->_dbh or $self->dbh) ensures correct
520 exception handling and reconnection (or failover in future subclasses).
521
522 Your subref should have no side-effects outside of the database, as
523 there is the potential for your subref to be partially double-executed
524 if the database connection was stale/dysfunctional.
525
526 Example:
527
528   my @stuff = $schema->storage->dbh_do(
529     sub {
530       my ($storage, $dbh, @cols) = @_;
531       my $cols = join(q{, }, @cols);
532       $dbh->selectrow_array("SELECT $cols FROM foo");
533     },
534     @column_list
535   );
536
537 =cut
538
539 sub dbh_do {
540   my $self = shift;
541   my $coderef = shift;
542
543   ref $coderef eq 'CODE' or $self->throw_exception
544     ('$coderef must be a CODE reference');
545
546   return $coderef->($self, $self->_dbh, @_) if $self->{_in_dbh_do}
547       || $self->{transaction_depth};
548
549   local $self->{_in_dbh_do} = 1;
550
551   my @result;
552   my $want_array = wantarray;
553
554   eval {
555     $self->_verify_pid if $self->_dbh;
556     $self->_populate_dbh if !$self->_dbh;
557     if($want_array) {
558         @result = $coderef->($self, $self->_dbh, @_);
559     }
560     elsif(defined $want_array) {
561         $result[0] = $coderef->($self, $self->_dbh, @_);
562     }
563     else {
564         $coderef->($self, $self->_dbh, @_);
565     }
566   };
567
568   my $exception = $@;
569   if(!$exception) { return $want_array ? @result : $result[0] }
570
571   $self->throw_exception($exception) if $self->connected;
572
573   # We were not connected - reconnect and retry, but let any
574   #  exception fall right through this time
575   $self->_populate_dbh;
576   $coderef->($self, $self->_dbh, @_);
577 }
578
579 # This is basically a blend of dbh_do above and DBIx::Class::Storage::txn_do.
580 # It also informs dbh_do to bypass itself while under the direction of txn_do,
581 #  via $self->{_in_dbh_do} (this saves some redundant eval and errorcheck, etc)
582 sub txn_do {
583   my $self = shift;
584   my $coderef = shift;
585
586   ref $coderef eq 'CODE' or $self->throw_exception
587     ('$coderef must be a CODE reference');
588
589   return $coderef->(@_) if $self->{transaction_depth};
590
591   local $self->{_in_dbh_do} = 1;
592
593   my @result;
594   my $want_array = wantarray;
595
596   my $tried = 0;
597   while(1) {
598     eval {
599       $self->_verify_pid if $self->_dbh;
600       $self->_populate_dbh if !$self->_dbh;
601
602       $self->txn_begin;
603       if($want_array) {
604           @result = $coderef->(@_);
605       }
606       elsif(defined $want_array) {
607           $result[0] = $coderef->(@_);
608       }
609       else {
610           $coderef->(@_);
611       }
612       $self->txn_commit;
613     };
614
615     my $exception = $@;
616     if(!$exception) { return $want_array ? @result : $result[0] }
617
618     if($tried++ > 0 || $self->connected) {
619       eval { $self->txn_rollback };
620       my $rollback_exception = $@;
621       if($rollback_exception) {
622         my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
623         $self->throw_exception($exception)  # propagate nested rollback
624           if $rollback_exception =~ /$exception_class/;
625
626         $self->throw_exception(
627           "Transaction aborted: ${exception}. "
628           . "Rollback failed: ${rollback_exception}"
629         );
630       }
631       $self->throw_exception($exception)
632     }
633
634     # We were not connected, and was first try - reconnect and retry
635     # via the while loop
636     $self->_populate_dbh;
637   }
638 }
639
640 =head2 disconnect
641
642 Our C<disconnect> method also performs a rollback first if the
643 database is not in C<AutoCommit> mode.
644
645 =cut
646
647 sub disconnect {
648   my ($self) = @_;
649
650   if( $self->connected ) {
651     $self->_dbh->rollback unless $self->_dbh_autocommit;
652     $self->_dbh->disconnect;
653     $self->_dbh(undef);
654     $self->{_dbh_gen}++;
655   }
656 }
657
658 sub connected {
659   my ($self) = @_;
660
661   if(my $dbh = $self->_dbh) {
662       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
663           $self->_dbh(undef);
664           $self->{_dbh_gen}++;
665           return;
666       }
667       else {
668           $self->_verify_pid;
669       }
670       return ($dbh->FETCH('Active') && $dbh->ping);
671   }
672
673   return 0;
674 }
675
676 # handle pid changes correctly
677 #  NOTE: assumes $self->_dbh is a valid $dbh
678 sub _verify_pid {
679   my ($self) = @_;
680
681   return if $self->_conn_pid == $$;
682
683   $self->_dbh->{InactiveDestroy} = 1;
684   $self->_dbh(undef);
685   $self->{_dbh_gen}++;
686
687   return;
688 }
689
690 sub ensure_connected {
691   my ($self) = @_;
692
693   unless ($self->connected) {
694     $self->_populate_dbh;
695   }
696 }
697
698 =head2 dbh
699
700 Returns the dbh - a data base handle of class L<DBI>.
701
702 =cut
703
704 sub dbh {
705   my ($self) = @_;
706
707   $self->ensure_connected;
708   return $self->_dbh;
709 }
710
711 sub _sql_maker_args {
712     my ($self) = @_;
713     
714     return ( bindtype=>'columns', limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
715 }
716
717 sub sql_maker {
718   my ($self) = @_;
719   unless ($self->_sql_maker) {
720     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
721   }
722   return $self->_sql_maker;
723 }
724
725 sub _populate_dbh {
726   my ($self) = @_;
727   my @info = @{$self->_dbi_connect_info || []};
728   $self->_dbh($self->_connect(@info));
729
730   # Always set the transaction depth on connect, since
731   #  there is no transaction in progress by definition
732   $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
733
734   if(ref $self eq 'DBIx::Class::Storage::DBI') {
735     my $driver = $self->_dbh->{Driver}->{Name};
736     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
737       bless $self, "DBIx::Class::Storage::DBI::${driver}";
738       $self->_rebless() if $self->can('_rebless');
739     }
740   }
741
742   # if on-connect sql statements are given execute them
743   foreach my $sql_statement (@{$self->on_connect_do || []}) {
744     $self->debugobj->query_start($sql_statement) if $self->debug();
745     $self->_dbh->do($sql_statement);
746     $self->debugobj->query_end($sql_statement) if $self->debug();
747   }
748
749   $self->_conn_pid($$);
750   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
751 }
752
753 sub _connect {
754   my ($self, @info) = @_;
755
756   $self->throw_exception("You failed to provide any connection info")
757     if !@info;
758
759   my ($old_connect_via, $dbh);
760
761   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
762     $old_connect_via = $DBI::connect_via;
763     $DBI::connect_via = 'connect';
764   }
765
766   eval {
767     if(ref $info[0] eq 'CODE') {
768        $dbh = &{$info[0]}
769     }
770     else {
771        $dbh = DBI->connect(@info);
772     }
773
774     if(!$self->unsafe) {
775       my $weak_self = $self;
776       weaken($weak_self);
777       $dbh->{HandleError} = sub {
778           $weak_self->throw_exception("DBI Exception: $_[0]")
779       };
780       $dbh->{ShowErrorStatement} = 1;
781       $dbh->{RaiseError} = 1;
782       $dbh->{PrintError} = 0;
783     }
784   };
785
786   $DBI::connect_via = $old_connect_via if $old_connect_via;
787
788   $self->throw_exception("DBI Connection failed: " . ($@||$DBI::errstr))
789     if !$dbh || $@;
790
791   $self->_dbh_autocommit($dbh->{AutoCommit});
792
793   $dbh;
794 }
795
796
797 sub txn_begin {
798   my $self = shift;
799   $self->ensure_connected();
800   if($self->{transaction_depth} == 0) {
801     $self->debugobj->txn_begin()
802       if $self->debug;
803     # this isn't ->_dbh-> because
804     #  we should reconnect on begin_work
805     #  for AutoCommit users
806     $self->dbh->begin_work;
807   }
808   $self->{transaction_depth}++;
809 }
810
811 sub txn_commit {
812   my $self = shift;
813   if ($self->{transaction_depth} == 1) {
814     my $dbh = $self->_dbh;
815     $self->debugobj->txn_commit()
816       if ($self->debug);
817     $dbh->commit;
818     $self->{transaction_depth} = 0
819       if $self->_dbh_autocommit;
820   }
821   elsif($self->{transaction_depth} > 1) {
822     $self->{transaction_depth}--
823   }
824 }
825
826 sub txn_rollback {
827   my $self = shift;
828   my $dbh = $self->_dbh;
829   eval {
830     if ($self->{transaction_depth} == 1) {
831       $self->debugobj->txn_rollback()
832         if ($self->debug);
833       $self->{transaction_depth} = 0
834         if $self->_dbh_autocommit;
835       $dbh->rollback;
836     }
837     elsif($self->{transaction_depth} > 1) {
838       $self->{transaction_depth}--;
839     }
840     else {
841       die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
842     }
843   };
844   if ($@) {
845     my $error = $@;
846     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
847     $error =~ /$exception_class/ and $self->throw_exception($error);
848     # ensure that a failed rollback resets the transaction depth
849     $self->{transaction_depth} = $self->_dbh_autocommit ? 0 : 1;
850     $self->throw_exception($error);
851   }
852 }
853
854 # This used to be the top-half of _execute.  It was split out to make it
855 #  easier to override in NoBindVars without duping the rest.  It takes up
856 #  all of _execute's args, and emits $sql, @bind.
857 sub _prep_for_execute {
858   my ($self, $op, $extra_bind, $ident, $args) = @_;
859
860   my ($sql, @bind) = $self->sql_maker->$op($ident, @$args);
861   unshift(@bind,
862     map { ref $_ eq 'ARRAY' ? $_ : [ '!!dummy', $_ ] } @$extra_bind)
863       if $extra_bind;
864
865   return ($sql, \@bind);
866 }
867
868 sub _dbh_execute {
869   my ($self, $dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
870   
871   if( blessed($ident) && $ident->isa("DBIx::Class::ResultSource") ) {
872     $ident = $ident->from();
873   }
874
875   my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
876
877   if ($self->debug) {
878       my @debug_bind =
879         map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @$bind;
880       $self->debugobj->query_start($sql, @debug_bind);
881   }
882
883   my $sth = $self->sth($sql,$op);
884
885   my $placeholder_index = 1; 
886
887   foreach my $bound (@$bind) {
888     my $attributes = {};
889     my($column_name, @data) = @$bound;
890
891     if ($bind_attributes) {
892       $attributes = $bind_attributes->{$column_name}
893       if defined $bind_attributes->{$column_name};
894     }
895
896     foreach my $data (@data) {
897       $data = ref $data ? ''.$data : $data; # stringify args
898
899       $sth->bind_param($placeholder_index, $data, $attributes);
900       $placeholder_index++;
901     }
902   }
903
904   # Can this fail without throwing an exception anyways???
905   my $rv = $sth->execute();
906   $self->throw_exception($sth->errstr) if !$rv;
907
908   if ($self->debug) {
909      my @debug_bind =
910        map { defined ($_ && $_->[1]) ? qq{'$_->[1]'} : q{'NULL'} } @$bind; 
911      $self->debugobj->query_end($sql, @debug_bind);
912   }
913
914   return (wantarray ? ($rv, $sth, @$bind) : $rv);
915 }
916
917 sub _execute {
918     my $self = shift;
919     $self->dbh_do($self->can('_dbh_execute'), @_)
920 }
921
922 sub insert {
923   my ($self, $source, $to_insert) = @_;
924   
925   my $ident = $source->from; 
926   my $bind_attributes = $self->source_bind_attributes($source);
927
928   $self->_execute('insert' => [], $source, $bind_attributes, $to_insert);
929
930   return $to_insert;
931 }
932
933 ## Still not quite perfect, and EXPERIMENTAL
934 ## Currently it is assumed that all values passed will be "normal", i.e. not 
935 ## scalar refs, or at least, all the same type as the first set, the statement is
936 ## only prepped once.
937 sub insert_bulk {
938   my ($self, $source, $cols, $data) = @_;
939   my %colvalues;
940   my $table = $source->from;
941   @colvalues{@$cols} = (0..$#$cols);
942   my ($sql, @bind) = $self->sql_maker->insert($table, \%colvalues);
943   
944   if ($self->debug) {
945       my @debug_bind = map { defined $_->[1] ? qq{$_->[1]} : q{'NULL'} } @bind;
946       $self->debugobj->query_start($sql, @debug_bind);
947   }
948   my $sth = $self->sth($sql);
949
950 #  @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
951
952   ## This must be an arrayref, else nothing works!
953   
954   my $tuple_status = [];
955   
956   ##use Data::Dumper;
957   ##print STDERR Dumper( $data, $sql, [@bind] );
958
959   my $time = time();
960
961   ## Get the bind_attributes, if any exist
962   my $bind_attributes = $self->source_bind_attributes($source);
963
964   ## Bind the values and execute
965   my $placeholder_index = 1; 
966
967   foreach my $bound (@bind) {
968
969     my $attributes = {};
970     my ($column_name, $data_index) = @$bound;
971
972     if( $bind_attributes ) {
973       $attributes = $bind_attributes->{$column_name}
974       if defined $bind_attributes->{$column_name};
975     }
976
977     my @data = map { $_->[$data_index] } @$data;
978
979     $sth->bind_param_array( $placeholder_index, [@data], $attributes );
980     $placeholder_index++;
981   }
982   my $rv = $sth->execute_array({ArrayTupleStatus => $tuple_status});
983   $self->throw_exception($sth->errstr) if !$rv;
984
985   if ($self->debug) {
986       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
987       $self->debugobj->query_end($sql, @debug_bind);
988   }
989   return (wantarray ? ($rv, $sth, @bind) : $rv);
990 }
991
992 sub update {
993   my $self = shift @_;
994   my $source = shift @_;
995   my $bind_attributes = $self->source_bind_attributes($source);
996   
997   return $self->_execute('update' => [], $source, $bind_attributes, @_);
998 }
999
1000
1001 sub delete {
1002   my $self = shift @_;
1003   my $source = shift @_;
1004   
1005   my $bind_attrs = {}; ## If ever it's needed...
1006   
1007   return $self->_execute('delete' => [], $source, $bind_attrs, @_);
1008 }
1009
1010 sub _select {
1011   my ($self, $ident, $select, $condition, $attrs) = @_;
1012   my $order = $attrs->{order_by};
1013   if (ref $condition eq 'SCALAR') {
1014     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
1015   }
1016   if (exists $attrs->{group_by} || $attrs->{having}) {
1017     $order = {
1018       group_by => $attrs->{group_by},
1019       having => $attrs->{having},
1020       ($order ? (order_by => $order) : ())
1021     };
1022   }
1023   my $bind_attrs = {}; ## Future support
1024   my @args = ('select', $attrs->{bind}, $ident, $bind_attrs, $select, $condition, $order);
1025   if ($attrs->{software_limit} ||
1026       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
1027         $attrs->{software_limit} = 1;
1028   } else {
1029     $self->throw_exception("rows attribute must be positive if present")
1030       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
1031     push @args, $attrs->{rows}, $attrs->{offset};
1032   }
1033   return $self->_execute(@args);
1034 }
1035
1036 sub source_bind_attributes {
1037   my ($self, $source) = @_;
1038   
1039   my $bind_attributes;
1040   foreach my $column ($source->columns) {
1041   
1042     my $data_type = $source->column_info($column)->{data_type} || '';
1043     $bind_attributes->{$column} = $self->bind_attribute_by_data_type($data_type)
1044      if $data_type;
1045   }
1046
1047   return $bind_attributes;
1048 }
1049
1050 =head2 select
1051
1052 =over 4
1053
1054 =item Arguments: $ident, $select, $condition, $attrs
1055
1056 =back
1057
1058 Handle a SQL select statement.
1059
1060 =cut
1061
1062 sub select {
1063   my $self = shift;
1064   my ($ident, $select, $condition, $attrs) = @_;
1065   return $self->cursor_class->new($self, \@_, $attrs);
1066 }
1067
1068 sub select_single {
1069   my $self = shift;
1070   my ($rv, $sth, @bind) = $self->_select(@_);
1071   my @row = $sth->fetchrow_array;
1072   # Need to call finish() to work round broken DBDs
1073   $sth->finish();
1074   return @row;
1075 }
1076
1077 =head2 sth
1078
1079 =over 4
1080
1081 =item Arguments: $sql
1082
1083 =back
1084
1085 Returns a L<DBI> sth (statement handle) for the supplied SQL.
1086
1087 =cut
1088
1089 sub _dbh_sth {
1090   my ($self, $dbh, $sql) = @_;
1091
1092   # 3 is the if_active parameter which avoids active sth re-use
1093   my $sth = $self->disable_sth_caching
1094     ? $dbh->prepare($sql)
1095     : $dbh->prepare_cached($sql, {}, 3);
1096
1097   # XXX You would think RaiseError would make this impossible,
1098   #  but apparently that's not true :(
1099   $self->throw_exception($dbh->errstr) if !$sth;
1100
1101   $sth;
1102 }
1103
1104 sub sth {
1105   my ($self, $sql) = @_;
1106   $self->dbh_do($self->can('_dbh_sth'), $sql);
1107 }
1108
1109 sub _dbh_columns_info_for {
1110   my ($self, $dbh, $table) = @_;
1111
1112   if ($dbh->can('column_info')) {
1113     my %result;
1114     eval {
1115       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
1116       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
1117       $sth->execute();
1118       while ( my $info = $sth->fetchrow_hashref() ){
1119         my %column_info;
1120         $column_info{data_type}   = $info->{TYPE_NAME};
1121         $column_info{size}      = $info->{COLUMN_SIZE};
1122         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
1123         $column_info{default_value} = $info->{COLUMN_DEF};
1124         my $col_name = $info->{COLUMN_NAME};
1125         $col_name =~ s/^\"(.*)\"$/$1/;
1126
1127         $result{$col_name} = \%column_info;
1128       }
1129     };
1130     return \%result if !$@ && scalar keys %result;
1131   }
1132
1133   my %result;
1134   my $sth = $dbh->prepare($self->sql_maker->select($table, undef, \'1 = 0'));
1135   $sth->execute;
1136   my @columns = @{$sth->{NAME_lc}};
1137   for my $i ( 0 .. $#columns ){
1138     my %column_info;
1139     $column_info{data_type} = $sth->{TYPE}->[$i];
1140     $column_info{size} = $sth->{PRECISION}->[$i];
1141     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
1142
1143     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
1144       $column_info{data_type} = $1;
1145       $column_info{size}    = $2;
1146     }
1147
1148     $result{$columns[$i]} = \%column_info;
1149   }
1150   $sth->finish;
1151
1152   foreach my $col (keys %result) {
1153     my $colinfo = $result{$col};
1154     my $type_num = $colinfo->{data_type};
1155     my $type_name;
1156     if(defined $type_num && $dbh->can('type_info')) {
1157       my $type_info = $dbh->type_info($type_num);
1158       $type_name = $type_info->{TYPE_NAME} if $type_info;
1159       $colinfo->{data_type} = $type_name if $type_name;
1160     }
1161   }
1162
1163   return \%result;
1164 }
1165
1166 sub columns_info_for {
1167   my ($self, $table) = @_;
1168   $self->dbh_do($self->can('_dbh_columns_info_for'), $table);
1169 }
1170
1171 =head2 last_insert_id
1172
1173 Return the row id of the last insert.
1174
1175 =cut
1176
1177 sub _dbh_last_insert_id {
1178     my ($self, $dbh, $source, $col) = @_;
1179     # XXX This is a SQLite-ism as a default... is there a DBI-generic way?
1180     $dbh->func('last_insert_rowid');
1181 }
1182
1183 sub last_insert_id {
1184   my $self = shift;
1185   $self->dbh_do($self->can('_dbh_last_insert_id'), @_);
1186 }
1187
1188 =head2 sqlt_type
1189
1190 Returns the database driver name.
1191
1192 =cut
1193
1194 sub sqlt_type { shift->dbh->{Driver}->{Name} }
1195
1196 =head2 bind_attribute_by_data_type
1197
1198 Given a datatype from column info, returns a database specific bind attribute for
1199 $dbh->bind_param($val,$attribute) or nothing if we will let the database planner
1200 just handle it.
1201
1202 Generally only needed for special case column types, like bytea in postgres.
1203
1204 =cut
1205
1206 sub bind_attribute_by_data_type {
1207     return;
1208 }
1209
1210 =head2 create_ddl_dir
1211
1212 =over 4
1213
1214 =item Arguments: $schema \@databases, $version, $directory, $preversion, $sqlt_args
1215
1216 =back
1217
1218 Creates a SQL file based on the Schema, for each of the specified
1219 database types, in the given directory.
1220
1221 =cut
1222
1223 sub create_ddl_dir
1224 {
1225   my ($self, $schema, $databases, $version, $dir, $preversion, $sqltargs) = @_;
1226
1227   if(!$dir || !-d $dir)
1228   {
1229     warn "No directory given, using ./\n";
1230     $dir = "./";
1231   }
1232   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
1233   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
1234   $version ||= $schema->VERSION || '1.x';
1235   $sqltargs = { ( add_drop_table => 1 ), %{$sqltargs || {}} };
1236
1237   $self->throw_exception(q{Can't create a ddl file without SQL::Translator 0.08: '}
1238       . $self->_check_sqlt_message . q{'})
1239           if !$self->_check_sqlt_version;
1240
1241   my $sqlt = SQL::Translator->new({
1242 #      debug => 1,
1243       add_drop_table => 1,
1244   });
1245   foreach my $db (@$databases)
1246   {
1247     $sqlt->reset();
1248     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
1249 #    $sqlt->parser_args({'DBIx::Class' => $schema);
1250     $sqlt = $self->configure_sqlt($sqlt, $db);
1251     $sqlt->data($schema);
1252     $sqlt->producer($db);
1253
1254     my $file;
1255     my $filename = $schema->ddl_filename($db, $dir, $version);
1256     if(-e $filename)
1257     {
1258       warn("$filename already exists, skipping $db");
1259       next;
1260     }
1261
1262     my $output = $sqlt->translate;
1263     if(!$output)
1264     {
1265       warn("Failed to translate to $db, skipping. (" . $sqlt->error . ")");
1266       next;
1267     }
1268     if(!open($file, ">$filename"))
1269     {
1270         $self->throw_exception("Can't open $filename for writing ($!)");
1271         next;
1272     }
1273     print $file $output;
1274     close($file);
1275
1276     if($preversion)
1277     {
1278       require SQL::Translator::Diff;
1279
1280       my $prefilename = $schema->ddl_filename($db, $dir, $preversion);
1281 #      print "Previous version $prefilename\n";
1282       if(!-e $prefilename)
1283       {
1284         warn("No previous schema file found ($prefilename)");
1285         next;
1286       }
1287       #### We need to reparse the SQLite file we just wrote, so that 
1288       ##   Diff doesnt get all confoosed, and Diff is *very* confused.
1289       ##   FIXME: rip Diff to pieces!
1290 #      my $target_schema = $sqlt->schema;
1291 #      unless ( $target_schema->name ) {
1292 #        $target_schema->name( $filename );
1293 #      }
1294       my @input;
1295       push @input, {file => $prefilename, parser => $db};
1296       push @input, {file => $filename, parser => $db};
1297       my ( $source_schema, $source_db, $target_schema, $target_db ) = map {
1298         my $file   = $_->{'file'};
1299         my $parser = $_->{'parser'};
1300
1301         my $t = SQL::Translator->new;
1302         $t->debug( 0 );
1303         $t->trace( 0 );
1304         $t->parser( $parser )            or die $t->error;
1305         my $out = $t->translate( $file ) or die $t->error;
1306         my $schema = $t->schema;
1307         unless ( $schema->name ) {
1308           $schema->name( $file );
1309         }
1310         ($schema, $parser);
1311       } @input;
1312
1313       my $diff = SQL::Translator::Diff::schema_diff($source_schema, $db,
1314                                                     $target_schema, $db,
1315                                                     {}
1316                                                    );
1317       my $difffile = $schema->ddl_filename($db, $dir, $version, $preversion);
1318       print STDERR "Diff: $difffile: $db, $dir, $version, $preversion \n";
1319       if(-e $difffile)
1320       {
1321         warn("$difffile already exists, skipping");
1322         next;
1323       }
1324       if(!open $file, ">$difffile")
1325       { 
1326         $self->throw_exception("Can't write to $difffile ($!)");
1327         next;
1328       }
1329       print $file $diff;
1330       close($file);
1331     }
1332   }
1333 }
1334
1335 sub configure_sqlt() {
1336   my $self = shift;
1337   my $tr = shift;
1338   my $db = shift || $self->sqlt_type;
1339   if ($db eq 'PostgreSQL') {
1340     $tr->quote_table_names(0);
1341     $tr->quote_field_names(0);
1342   }
1343   return $tr;
1344 }
1345
1346 =head2 deployment_statements
1347
1348 =over 4
1349
1350 =item Arguments: $schema, $type, $version, $directory, $sqlt_args
1351
1352 =back
1353
1354 Returns the statements used by L</deploy> and L<DBIx::Class::Schema/deploy>.
1355 The database driver name is given by C<$type>, though the value from
1356 L</sqlt_type> is used if it is not specified.
1357
1358 C<$directory> is used to return statements from files in a previously created
1359 L</create_ddl_dir> directory and is optional. The filenames are constructed
1360 from L<DBIx::Class::Schema/ddl_filename>, the schema name and the C<$version>.
1361
1362 If no C<$directory> is specified then the statements are constructed on the
1363 fly using L<SQL::Translator> and C<$version> is ignored.
1364
1365 See L<SQL::Translator/METHODS> for a list of values for C<$sqlt_args>.
1366
1367 =cut
1368
1369 sub deployment_statements {
1370   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1371   # Need to be connected to get the correct sqlt_type
1372   $self->ensure_connected() unless $type;
1373   $type ||= $self->sqlt_type;
1374   $version ||= $schema->VERSION || '1.x';
1375   $dir ||= './';
1376   my $filename = $schema->ddl_filename($type, $dir, $version);
1377   if(-f $filename)
1378   {
1379       my $file;
1380       open($file, "<$filename") 
1381         or $self->throw_exception("Can't open $filename ($!)");
1382       my @rows = <$file>;
1383       close($file);
1384       return join('', @rows);
1385   }
1386
1387   $self->throw_exception(q{Can't deploy without SQL::Translator 0.08: '}
1388       . $self->_check_sqlt_message . q{'})
1389           if !$self->_check_sqlt_version;
1390
1391   require SQL::Translator::Parser::DBIx::Class;
1392   eval qq{use SQL::Translator::Producer::${type}};
1393   $self->throw_exception($@) if $@;
1394
1395   # sources needs to be a parser arg, but for simplicty allow at top level 
1396   # coming in
1397   $sqltargs->{parser_args}{sources} = delete $sqltargs->{sources}
1398       if exists $sqltargs->{sources};
1399
1400   my $tr = SQL::Translator->new(%$sqltargs);
1401   SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1402   return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1403
1404   return;
1405
1406 }
1407
1408 sub deploy {
1409   my ($self, $schema, $type, $sqltargs, $dir) = @_;
1410   foreach my $statement ( $self->deployment_statements($schema, $type, undef, $dir, { no_comments => 1, %{ $sqltargs || {} } } ) ) {
1411     foreach my $line ( split(";\n", $statement)) {
1412       next if($line =~ /^--/);
1413       next if(!$line);
1414 #      next if($line =~ /^DROP/m);
1415       next if($line =~ /^BEGIN TRANSACTION/m);
1416       next if($line =~ /^COMMIT/m);
1417       next if $line =~ /^\s+$/; # skip whitespace only
1418       $self->debugobj->query_start($line) if $self->debug;
1419       eval {
1420         $self->dbh->do($line); # shouldn't be using ->dbh ?
1421       };
1422       if ($@) {
1423         warn qq{$@ (running "${line}")};
1424       }
1425       $self->debugobj->query_end($line) if $self->debug;
1426     }
1427   }
1428 }
1429
1430 =head2 datetime_parser
1431
1432 Returns the datetime parser class
1433
1434 =cut
1435
1436 sub datetime_parser {
1437   my $self = shift;
1438   return $self->{datetime_parser} ||= do {
1439     $self->ensure_connected;
1440     $self->build_datetime_parser(@_);
1441   };
1442 }
1443
1444 =head2 datetime_parser_type
1445
1446 Defines (returns) the datetime parser class - currently hardwired to
1447 L<DateTime::Format::MySQL>
1448
1449 =cut
1450
1451 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1452
1453 =head2 build_datetime_parser
1454
1455 See L</datetime_parser>
1456
1457 =cut
1458
1459 sub build_datetime_parser {
1460   my $self = shift;
1461   my $type = $self->datetime_parser_type(@_);
1462   eval "use ${type}";
1463   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1464   return $type;
1465 }
1466
1467 {
1468     my $_check_sqlt_version; # private
1469     my $_check_sqlt_message; # private
1470     sub _check_sqlt_version {
1471         return $_check_sqlt_version if defined $_check_sqlt_version;
1472         eval 'use SQL::Translator 0.08';
1473         $_check_sqlt_message = $@ ? $@ : '';
1474         $_check_sqlt_version = $@ ? 0 : 1;
1475     }
1476
1477     sub _check_sqlt_message {
1478         _check_sqlt_version if !defined $_check_sqlt_message;
1479         $_check_sqlt_message;
1480     }
1481 }
1482
1483 sub DESTROY {
1484   my $self = shift;
1485   return if !$self->_dbh;
1486   $self->_verify_pid;
1487   $self->_dbh(undef);
1488 }
1489
1490 1;
1491
1492 =head1 SQL METHODS
1493
1494 The module defines a set of methods within the DBIC::SQL::Abstract
1495 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1496 SQL query functions.
1497
1498 The following methods are extended:-
1499
1500 =over 4
1501
1502 =item delete
1503
1504 =item insert
1505
1506 =item select
1507
1508 =item update
1509
1510 =item limit_dialect
1511
1512 See L</connect_info> for details.
1513 For setting, this method is deprecated in favor of L</connect_info>.
1514
1515 =item quote_char
1516
1517 See L</connect_info> for details.
1518 For setting, this method is deprecated in favor of L</connect_info>.
1519
1520 =item name_sep
1521
1522 See L</connect_info> for details.
1523 For setting, this method is deprecated in favor of L</connect_info>.
1524
1525 =back
1526
1527 =head1 AUTHORS
1528
1529 Matt S. Trout <mst@shadowcatsystems.co.uk>
1530
1531 Andy Grundman <andy@hybridized.org>
1532
1533 =head1 LICENSE
1534
1535 You may distribute this code under the same terms as Perl itself.
1536
1537 =cut