deployment_statements ensures_connected, this to stop the confusion etc over incorrec...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 sub select {
21   my ($self, $table, $fields, $where, $order, @rest) = @_;
22   $table = $self->_quote($table) unless ref($table);
23   @rest = (-1) unless defined $rest[0];
24   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
25     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
26   local $self->{having_bind} = [];
27   my ($sql, @ret) = $self->SUPER::select(
28     $table, $self->_recurse_fields($fields), $where, $order, @rest
29   );
30   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
31 }
32
33 sub insert {
34   my $self = shift;
35   my $table = shift;
36   $table = $self->_quote($table) unless ref($table);
37   $self->SUPER::insert($table, @_);
38 }
39
40 sub update {
41   my $self = shift;
42   my $table = shift;
43   $table = $self->_quote($table) unless ref($table);
44   $self->SUPER::update($table, @_);
45 }
46
47 sub delete {
48   my $self = shift;
49   my $table = shift;
50   $table = $self->_quote($table) unless ref($table);
51   $self->SUPER::delete($table, @_);
52 }
53
54 sub _emulate_limit {
55   my $self = shift;
56   if ($_[3] == -1) {
57     return $_[1].$self->_order_by($_[2]);
58   } else {
59     return $self->SUPER::_emulate_limit(@_);
60   }
61 }
62
63 sub _recurse_fields {
64   my ($self, $fields) = @_;
65   my $ref = ref $fields;
66   return $self->_quote($fields) unless $ref;
67   return $$fields if $ref eq 'SCALAR';
68
69   if ($ref eq 'ARRAY') {
70     return join(', ', map { $self->_recurse_fields($_) } @$fields);
71   } elsif ($ref eq 'HASH') {
72     foreach my $func (keys %$fields) {
73       return $self->_sqlcase($func)
74         .'( '.$self->_recurse_fields($fields->{$func}).' )';
75     }
76   }
77 }
78
79 sub _order_by {
80   my $self = shift;
81   my $ret = '';
82   my @extra;
83   if (ref $_[0] eq 'HASH') {
84     if (defined $_[0]->{group_by}) {
85       $ret = $self->_sqlcase(' group by ')
86                .$self->_recurse_fields($_[0]->{group_by});
87     }
88     if (defined $_[0]->{having}) {
89       my $frag;
90       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
91       push(@{$self->{having_bind}}, @extra);
92       $ret .= $self->_sqlcase(' having ').$frag;
93     }
94     if (defined $_[0]->{order_by}) {
95       $ret .= $self->SUPER::_order_by($_[0]->{order_by});
96     }
97   } elsif(ref $_[0] eq 'SCALAR') {
98     $ret = $self->_sqlcase(' order by ').${ $_[0] };
99   } else {
100     $ret = $self->SUPER::_order_by(@_);
101   }
102   return $ret;
103 }
104
105 sub _order_directions {
106   my ($self, $order) = @_;
107   $order = $order->{order_by} if ref $order eq 'HASH';
108   return $self->SUPER::_order_directions($order);
109 }
110
111 sub _table {
112   my ($self, $from) = @_;
113   if (ref $from eq 'ARRAY') {
114     return $self->_recurse_from(@$from);
115   } elsif (ref $from eq 'HASH') {
116     return $self->_make_as($from);
117   } else {
118     return $from; # would love to quote here but _table ends up getting called
119                   # twice during an ->select without a limit clause due to
120                   # the way S::A::Limit->select works. should maybe consider
121                   # bypassing this and doing S::A::select($self, ...) in
122                   # our select method above. meantime, quoting shims have
123                   # been added to select/insert/update/delete here
124   }
125 }
126
127 sub _recurse_from {
128   my ($self, $from, @join) = @_;
129   my @sqlf;
130   push(@sqlf, $self->_make_as($from));
131   foreach my $j (@join) {
132     my ($to, $on) = @$j;
133
134     # check whether a join type exists
135     my $join_clause = '';
136     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
137     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
138       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
139     } else {
140       $join_clause = ' JOIN ';
141     }
142     push(@sqlf, $join_clause);
143
144     if (ref $to eq 'ARRAY') {
145       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
146     } else {
147       push(@sqlf, $self->_make_as($to));
148     }
149     push(@sqlf, ' ON ', $self->_join_condition($on));
150   }
151   return join('', @sqlf);
152 }
153
154 sub _make_as {
155   my ($self, $from) = @_;
156   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
157                      reverse each %{$self->_skip_options($from)});
158 }
159
160 sub _skip_options {
161   my ($self, $hash) = @_;
162   my $clean_hash = {};
163   $clean_hash->{$_} = $hash->{$_}
164     for grep {!/^-/} keys %$hash;
165   return $clean_hash;
166 }
167
168 sub _join_condition {
169   my ($self, $cond) = @_;
170   if (ref $cond eq 'HASH') {
171     my %j;
172     for (keys %$cond) {
173       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
174     };
175     return $self->_recurse_where(\%j);
176   } elsif (ref $cond eq 'ARRAY') {
177     return join(' OR ', map { $self->_join_condition($_) } @$cond);
178   } else {
179     die "Can't handle this yet!";
180   }
181 }
182
183 sub _quote {
184   my ($self, $label) = @_;
185   return '' unless defined $label;
186   return "*" if $label eq '*';
187   return $label unless $self->{quote_char};
188   if(ref $self->{quote_char} eq "ARRAY"){
189     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
190       if !defined $self->{name_sep};
191     my $sep = $self->{name_sep};
192     return join($self->{name_sep},
193         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
194        split(/\Q$sep\E/,$label));
195   }
196   return $self->SUPER::_quote($label);
197 }
198
199 sub _RowNum {
200    my $self = shift;
201    my $c;
202    $_[0] =~ s/SELECT (.*?) FROM/
203      'SELECT '.join(', ', map { $_.' AS col'.++$c } split(', ', $1)).' FROM'/e;
204    $self->SUPER::_RowNum(@_);
205 }
206
207 sub limit_dialect {
208     my $self = shift;
209     $self->{limit_dialect} = shift if @_;
210     return $self->{limit_dialect};
211 }
212
213 sub quote_char {
214     my $self = shift;
215     $self->{quote_char} = shift if @_;
216     return $self->{quote_char};
217 }
218
219 sub name_sep {
220     my $self = shift;
221     $self->{name_sep} = shift if @_;
222     return $self->{name_sep};
223 }
224
225 } # End of BEGIN block
226
227 use base qw/DBIx::Class/;
228
229 __PACKAGE__->load_components(qw/AccessorGroup/);
230
231 __PACKAGE__->mk_group_accessors('simple' =>
232   qw/_connect_info _dbh _sql_maker _conn_pid _conn_tid debug debugobj
233      cursor on_connect_do transaction_depth/);
234
235 =head1 NAME
236
237 DBIx::Class::Storage::DBI - DBI storage handler
238
239 =head1 SYNOPSIS
240
241 =head1 DESCRIPTION
242
243 This class represents the connection to the database
244
245 =head1 METHODS
246
247 =head2 new
248
249 =cut
250
251 sub new {
252   my $new = bless({}, ref $_[0] || $_[0]);
253   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
254   $new->transaction_depth(0);
255
256   $new->debugobj(new DBIx::Class::Storage::Statistics());
257
258   my $fh;
259   if (defined($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}) &&
260      ($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG} =~ /=(.+)$/)) {
261     $fh = IO::File->new($1, 'w')
262       or $new->throw_exception("Cannot open trace file $1");
263   } else {
264     $fh = IO::File->new('>&STDERR');
265   }
266   $new->debugfh($fh);
267   $new->debug(1) if $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG};
268   return $new;
269 }
270
271 =head2 throw_exception
272
273 Throws an exception - croaks.
274
275 =cut
276
277 sub throw_exception {
278   my ($self, $msg) = @_;
279   croak($msg);
280 }
281
282 =head2 connect_info
283
284 The arguments of C<connect_info> are always a single array reference.
285
286 This is normally accessed via L<DBIx::Class::Schema/connection>, which
287 encapsulates its argument list in an arrayref before calling
288 C<connect_info> here.
289
290 The arrayref can either contain the same set of arguments one would
291 normally pass to L<DBI/connect>, or a lone code reference which returns
292 a connected database handle.
293
294 In either case, there is an optional final element within the arrayref
295 which can hold a hashref of connection-specific Storage::DBI options.
296 These include C<on_connect_do>, and the sql_maker options
297 C<limit_dialect>, C<quote_char>, and C<name_sep>.  Examples:
298
299   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
300
301   ->connect_info([ sub { DBI->connect(...) } ]);
302
303   ->connect_info(
304     [
305       'dbi:Pg:dbname=foo',
306       'postgres',
307       'my_pg_password',
308       { AutoCommit => 0 },
309       { quote_char => q{`}, name_sep => q{@} },
310     ]
311   );
312
313   ->connect_info(
314     [
315       sub { DBI->connect(...) },
316       { quote_char => q{`}, name_sep => q{@} },
317     ]
318   );
319
320 =head2 on_connect_do
321
322 Executes the sql statements given as a listref on every db connect.
323
324 This option can also be set via L</connect_info>.
325
326 =head2 debug
327
328 Causes SQL trace information to be emitted on the C<debugobj> object.
329 (or C<STDERR> if C<debugobj> has not specifically been set).
330
331 =head2 debugfh
332
333 Set or retrieve the filehandle used for trace/debug output.  This should be
334 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
335 set to be STDERR - although see information on the
336 L<DBIX_CLASS_STORAGE_DBI_DEBUG> environment variable.
337
338 =cut
339
340 sub debugfh {
341     my $self = shift;
342
343     if ($self->debugobj->can('debugfh')) {
344         return $self->debugobj->debugfh(@_);
345     }
346 }
347
348 =head2 debugobj
349
350 Sets or retrieves the object used for metric collection. Defaults to an instance
351 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
352 method of using a coderef as a callback.  See the aforementioned Statistics
353 class for more information.
354
355 =head2 debugcb
356
357 Sets a callback to be executed each time a statement is run; takes a sub
358 reference.  Callback is executed as $sub->($op, $info) where $op is
359 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
360
361 See L<debugobj> for a better way.
362
363 =cut
364
365 sub debugcb {
366     my $self = shift;
367
368     if ($self->debugobj->can('callback')) {
369         return $self->debugobj->callback(@_);
370     }
371 }
372
373 =head2 disconnect
374
375 Disconnect the L<DBI> handle, performing a rollback first if the
376 database is not in C<AutoCommit> mode.
377
378 =cut
379
380 sub disconnect {
381   my ($self) = @_;
382
383   if( $self->connected ) {
384     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
385     $self->_dbh->disconnect;
386     $self->_dbh(undef);
387   }
388 }
389
390 =head2 connected
391
392 Check if the L<DBI> handle is connected.  Returns true if the handle
393 is connected.
394
395 =cut
396
397 sub connected { my ($self) = @_;
398
399   if(my $dbh = $self->_dbh) {
400       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
401           $self->_sql_maker(undef);
402           return $self->_dbh(undef);
403       }
404       elsif($self->_conn_pid != $$) {
405           $self->_dbh->{InactiveDestroy} = 1;
406           $self->_sql_maker(undef);
407           return $self->_dbh(undef)
408       }
409       return ($dbh->FETCH('Active') && $dbh->ping);
410   }
411
412   return 0;
413 }
414
415 =head2 ensure_connected
416
417 Check whether the database handle is connected - if not then make a
418 connection.
419
420 =cut
421
422 sub ensure_connected {
423   my ($self) = @_;
424
425   unless ($self->connected) {
426     $self->_populate_dbh;
427   }
428 }
429
430 =head2 dbh
431
432 Returns the dbh - a data base handle of class L<DBI>.
433
434 =cut
435
436 sub dbh {
437   my ($self) = @_;
438
439   $self->ensure_connected;
440   return $self->_dbh;
441 }
442
443 sub _sql_maker_args {
444     my ($self) = @_;
445     
446     return ( limit_dialect => $self->dbh );
447 }
448
449 =head2 sql_maker
450
451 Returns a C<sql_maker> object - normally an object of class
452 C<DBIC::SQL::Abstract>.
453
454 =cut
455
456 sub sql_maker {
457   my ($self) = @_;
458   unless ($self->_sql_maker) {
459     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
460   }
461   return $self->_sql_maker;
462 }
463
464 sub connect_info {
465   my ($self, $info_arg) = @_;
466
467   if($info_arg) {
468     my %sql_maker_opts;
469     my $info = [ @$info_arg ]; # copy because we can alter it
470     my $last_info = $info->[-1];
471     if(ref $last_info eq 'HASH') {
472       my $used;
473       if(my $on_connect_do = $last_info->{on_connect_do}) {
474         $used = 1;
475         $self->on_connect_do($on_connect_do);
476       }
477       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
478         if(my $opt_val = $last_info->{$sql_maker_opt}) {
479           $used = 1;
480           $sql_maker_opts{$sql_maker_opt} = $opt_val;
481         }
482       }
483
484       # remove our options hashref if it was there, to avoid confusing
485       #   DBI in the case the user didn't use all 4 DBI options, as in:
486       #   [ 'dbi:SQLite:foo.db', { quote_char => q{`} } ]
487       pop(@$info) if $used;
488     }
489
490     $self->_connect_info($info);
491     $self->sql_maker->$_($sql_maker_opts{$_}) for(keys %sql_maker_opts);
492   }
493
494   $self->_connect_info;
495 }
496
497 sub _populate_dbh {
498   my ($self) = @_;
499   my @info = @{$self->_connect_info || []};
500   $self->_dbh($self->_connect(@info));
501
502   if(ref $self eq 'DBIx::Class::Storage::DBI') {
503     my $driver = $self->_dbh->{Driver}->{Name};
504     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
505       bless $self, "DBIx::Class::Storage::DBI::${driver}";
506       $self->_rebless() if $self->can('_rebless');
507     }
508   }
509
510   # if on-connect sql statements are given execute them
511   foreach my $sql_statement (@{$self->on_connect_do || []}) {
512     $self->debugobj->query_start($sql_statement) if $self->debug();
513     $self->_dbh->do($sql_statement);
514     $self->debugobj->query_end($sql_statement) if $self->debug();
515   }
516
517   $self->_conn_pid($$);
518   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
519 }
520
521 sub _connect {
522   my ($self, @info) = @_;
523
524   $self->throw_exception("You failed to provide any connection info")
525       if !@info;
526
527   my ($old_connect_via, $dbh);
528
529   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
530       $old_connect_via = $DBI::connect_via;
531       $DBI::connect_via = 'connect';
532   }
533
534   eval {
535     $dbh = ref $info[0] eq 'CODE'
536          ? &{$info[0]}
537          : DBI->connect(@info);
538   };
539
540   $DBI::connect_via = $old_connect_via if $old_connect_via;
541
542   if (!$dbh || $@) {
543     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
544   }
545
546   $dbh;
547 }
548
549 =head2 txn_begin
550
551 Calls begin_work on the current dbh.
552
553 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
554 an entire code block to be executed transactionally.
555
556 =cut
557
558 sub txn_begin {
559   my $self = shift;
560   if ($self->{transaction_depth}++ == 0) {
561     my $dbh = $self->dbh;
562     if ($dbh->{AutoCommit}) {
563       $self->debugobj->txn_begin()
564         if ($self->debug);
565       $dbh->begin_work;
566     }
567   }
568 }
569
570 =head2 txn_commit
571
572 Issues a commit against the current dbh.
573
574 =cut
575
576 sub txn_commit {
577   my $self = shift;
578   my $dbh = $self->dbh;
579   if ($self->{transaction_depth} == 0) {
580     unless ($dbh->{AutoCommit}) {
581       $self->debugobj->txn_commit()
582         if ($self->debug);
583       $dbh->commit;
584     }
585   }
586   else {
587     if (--$self->{transaction_depth} == 0) {
588       $self->debugobj->txn_commit()
589         if ($self->debug);
590       $dbh->commit;
591     }
592   }
593 }
594
595 =head2 txn_rollback
596
597 Issues a rollback against the current dbh. A nested rollback will
598 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
599 which allows the rollback to propagate to the outermost transaction.
600
601 =cut
602
603 sub txn_rollback {
604   my $self = shift;
605
606   eval {
607     my $dbh = $self->dbh;
608     if ($self->{transaction_depth} == 0) {
609       unless ($dbh->{AutoCommit}) {
610         $self->debugobj->txn_rollback()
611           if ($self->debug);
612         $dbh->rollback;
613       }
614     }
615     else {
616       if (--$self->{transaction_depth} == 0) {
617         $self->debugobj->txn_rollback()
618           if ($self->debug);
619         $dbh->rollback;
620       }
621       else {
622         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
623       }
624     }
625   };
626
627   if ($@) {
628     my $error = $@;
629     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
630     $error =~ /$exception_class/ and $self->throw_exception($error);
631     $self->{transaction_depth} = 0;          # ensure that a failed rollback
632     $self->throw_exception($error);          # resets the transaction depth
633   }
634 }
635
636 sub _execute {
637   my ($self, $op, $extra_bind, $ident, @args) = @_;
638   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
639   unshift(@bind, @$extra_bind) if $extra_bind;
640   if ($self->debug) {
641       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
642       $self->debugobj->query_start($sql, @debug_bind);
643   }
644   my $sth = eval { $self->sth($sql,$op) };
645
646   if (!$sth || $@) {
647     $self->throw_exception(
648       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
649     );
650   }
651   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
652   my $rv;
653   if ($sth) {
654     my $time = time();
655     $rv = eval { $sth->execute(@bind) };
656
657     if ($@ || !$rv) {
658       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
659     }
660   } else {
661     $self->throw_exception("'$sql' did not generate a statement.");
662   }
663   if ($self->debug) {
664       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
665       $self->debugobj->query_end($sql, @debug_bind);
666   }
667   return (wantarray ? ($rv, $sth, @bind) : $rv);
668 }
669
670 sub insert {
671   my ($self, $ident, $to_insert) = @_;
672   $self->throw_exception(
673     "Couldn't insert ".join(', ',
674       map "$_ => $to_insert->{$_}", keys %$to_insert
675     )." into ${ident}"
676   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
677   return $to_insert;
678 }
679
680 sub update {
681   return shift->_execute('update' => [], @_);
682 }
683
684 sub delete {
685   return shift->_execute('delete' => [], @_);
686 }
687
688 sub _select {
689   my ($self, $ident, $select, $condition, $attrs) = @_;
690   my $order = $attrs->{order_by};
691   if (ref $condition eq 'SCALAR') {
692     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
693   }
694   if (exists $attrs->{group_by} || $attrs->{having}) {
695     $order = {
696       group_by => $attrs->{group_by},
697       having => $attrs->{having},
698       ($order ? (order_by => $order) : ())
699     };
700   }
701   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
702   if ($attrs->{software_limit} ||
703       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
704         $attrs->{software_limit} = 1;
705   } else {
706     $self->throw_exception("rows attribute must be positive if present")
707       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
708     push @args, $attrs->{rows}, $attrs->{offset};
709   }
710   return $self->_execute(@args);
711 }
712
713 =head2 select
714
715 Handle a SQL select statement.
716
717 =cut
718
719 sub select {
720   my $self = shift;
721   my ($ident, $select, $condition, $attrs) = @_;
722   return $self->cursor->new($self, \@_, $attrs);
723 }
724
725 =head2 select_single
726
727 Performs a select, fetch and return of data - handles a single row
728 only.
729
730 =cut
731
732 # Need to call finish() to work round broken DBDs
733
734 sub select_single {
735   my $self = shift;
736   my ($rv, $sth, @bind) = $self->_select(@_);
737   my @row = $sth->fetchrow_array;
738   $sth->finish();
739   return @row;
740 }
741
742 =head2 sth
743
744 Returns a L<DBI> sth (statement handle) for the supplied SQL.
745
746 =cut
747
748 sub sth {
749   my ($self, $sql) = @_;
750   # 3 is the if_active parameter which avoids active sth re-use
751   return $self->dbh->prepare_cached($sql, {}, 3);
752 }
753
754 =head2 columns_info_for
755
756 Returns database type info for a given table columns.
757
758 =cut
759
760 sub columns_info_for {
761   my ($self, $table) = @_;
762
763   my $dbh = $self->dbh;
764
765   if ($dbh->can('column_info')) {
766     my %result;
767     my $old_raise_err = $dbh->{RaiseError};
768     my $old_print_err = $dbh->{PrintError};
769     $dbh->{RaiseError} = 1;
770     $dbh->{PrintError} = 0;
771     eval {
772       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
773       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
774       $sth->execute();
775       while ( my $info = $sth->fetchrow_hashref() ){
776         my %column_info;
777         $column_info{data_type}   = $info->{TYPE_NAME};
778         $column_info{size}      = $info->{COLUMN_SIZE};
779         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
780         $column_info{default_value} = $info->{COLUMN_DEF};
781         my $col_name = $info->{COLUMN_NAME};
782         $col_name =~ s/^\"(.*)\"$/$1/;
783
784         $result{$col_name} = \%column_info;
785       }
786     };
787     $dbh->{RaiseError} = $old_raise_err;
788     $dbh->{PrintError} = $old_print_err;
789     return \%result if !$@;
790   }
791
792   my %result;
793   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
794   $sth->execute;
795   my @columns = @{$sth->{NAME_lc}};
796   for my $i ( 0 .. $#columns ){
797     my %column_info;
798     my $type_num = $sth->{TYPE}->[$i];
799     my $type_name;
800     if(defined $type_num && $dbh->can('type_info')) {
801       my $type_info = $dbh->type_info($type_num);
802       $type_name = $type_info->{TYPE_NAME} if $type_info;
803     }
804     $column_info{data_type} = $type_name ? $type_name : $type_num;
805     $column_info{size} = $sth->{PRECISION}->[$i];
806     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
807
808     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
809       $column_info{data_type} = $1;
810       $column_info{size}    = $2;
811     }
812
813     $result{$columns[$i]} = \%column_info;
814   }
815
816   return \%result;
817 }
818
819 =head2 last_insert_id
820
821 Return the row id of the last insert.
822
823 =cut
824
825 sub last_insert_id {
826   my ($self, $row) = @_;
827     
828   return $self->dbh->func('last_insert_rowid');
829
830 }
831
832 =head2 sqlt_type
833
834 Returns the database driver name.
835
836 =cut
837
838 sub sqlt_type { shift->dbh->{Driver}->{Name} }
839
840 =head2 create_ddl_dir (EXPERIMENTAL)
841
842 =over 4
843
844 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
845
846 =back
847
848 Creates an SQL file based on the Schema, for each of the specified
849 database types, in the given directory.
850
851 Note that this feature is currently EXPERIMENTAL and may not work correctly
852 across all databases, or fully handle complex relationships.
853
854 =cut
855
856 sub create_ddl_dir
857 {
858   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
859
860   if(!$dir || !-d $dir)
861   {
862     warn "No directory given, using ./\n";
863     $dir = "./";
864   }
865   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
866   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
867   $version ||= $schema->VERSION || '1.x';
868
869   eval "use SQL::Translator";
870   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
871
872   my $sqlt = SQL::Translator->new({
873 #      debug => 1,
874       add_drop_table => 1,
875   });
876   foreach my $db (@$databases)
877   {
878     $sqlt->reset();
879     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
880 #    $sqlt->parser_args({'DBIx::Class' => $schema);
881     $sqlt->data($schema);
882     $sqlt->producer($db);
883
884     my $file;
885     my $filename = $schema->ddl_filename($db, $dir, $version);
886     if(-e $filename)
887     {
888       $self->throw_exception("$filename already exists, skipping $db");
889       next;
890     }
891     open($file, ">$filename") 
892       or $self->throw_exception("Can't open $filename for writing ($!)");
893     my $output = $sqlt->translate;
894 #use Data::Dumper;
895 #    print join(":", keys %{$schema->source_registrations});
896 #    print Dumper($sqlt->schema);
897     if(!$output)
898     {
899       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
900       next;
901     }
902     print $file $output;
903     close($file);
904   }
905
906 }
907
908 =head2 deployment_statements
909
910 Create the statements for L</deploy> and
911 L<DBIx::Class::Schema/deploy>.
912
913 =cut
914
915 sub deployment_statements {
916   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
917   # Need to be connected to get the correct sqlt_type
918   $elf->ensure_connected();
919   $type ||= $self->sqlt_type;
920   $version ||= $schema->VERSION || '1.x';
921   $dir ||= './';
922   eval "use SQL::Translator";
923   if(!$@)
924   {
925     eval "use SQL::Translator::Parser::DBIx::Class;";
926     $self->throw_exception($@) if $@;
927     eval "use SQL::Translator::Producer::${type};";
928     $self->throw_exception($@) if $@;
929     my $tr = SQL::Translator->new(%$sqltargs);
930     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
931     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
932   }
933
934   my $filename = $schema->ddl_filename($type, $dir, $version);
935   if(!-f $filename)
936   {
937 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
938       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
939       return;
940   }
941   my $file;
942   open($file, "<$filename") 
943       or $self->throw_exception("Can't open $filename ($!)");
944   my @rows = <$file>;
945   close($file);
946
947   return join('', @rows);
948   
949 }
950
951 =head2 deploy
952
953 Sends the appropriate statements to create or modify tables to the
954 db. This would normally be called through
955 L<DBIx::Class::Schema/deploy>.
956
957 =cut
958
959 sub deploy {
960   my ($self, $schema, $type, $sqltargs) = @_;
961   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, $sqltargs) ) {
962     for ( split(";\n", $statement)) {
963       next if($_ =~ /^--/);
964       next if(!$_);
965 #      next if($_ =~ /^DROP/m);
966       next if($_ =~ /^BEGIN TRANSACTION/m);
967       next if($_ =~ /^COMMIT/m);
968       $self->debugobj->query_start($_) if $self->debug;
969       $self->dbh->do($_) or warn "SQL was:\n $_";
970       $self->debugobj->query_end($_) if $self->debug;
971     }
972   }
973 }
974
975 =head2 datetime_parser
976
977 Returns the datetime parser class
978
979 =cut
980
981 sub datetime_parser {
982   my $self = shift;
983   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
984 }
985
986 =head2 datetime_parser_type
987
988 Defines (returns) the datetime parser class - currently hardwired to
989 L<DateTime::Format::MySQL>
990
991 =cut
992
993 sub datetime_parser_type { "DateTime::Format::MySQL"; }
994
995 =head2 build_datetime_parser
996
997 See L</datetime_parser>
998
999 =cut
1000
1001 sub build_datetime_parser {
1002   my $self = shift;
1003   my $type = $self->datetime_parser_type(@_);
1004   eval "use ${type}";
1005   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1006   return $type;
1007 }
1008
1009 sub DESTROY { shift->disconnect }
1010
1011 1;
1012
1013 =head1 SQL METHODS
1014
1015 The module defines a set of methods within the DBIC::SQL::Abstract
1016 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1017 SQL query functions.
1018
1019 The following methods are extended:-
1020
1021 =over 4
1022
1023 =item delete
1024
1025 =item insert
1026
1027 =item select
1028
1029 =item update
1030
1031 =item limit_dialect
1032
1033 Accessor for setting limit dialect. This is useful
1034 for JDBC-bridge among others where the remote SQL-dialect cannot
1035 be determined by the name of the driver alone.
1036
1037 This option can also be set via L</connect_info>.
1038
1039 =item quote_char
1040
1041 Specifies what characters to use to quote table and column names. If 
1042 you use this you will want to specify L<name_sep> as well.
1043
1044 quote_char expectes either a single character, in which case is it is placed
1045 on either side of the table/column, or an arrayref of length 2 in which case the
1046 table/column name is placed between the elements.
1047
1048 For example under MySQL you'd use C<quote_char('`')>, and user SQL Server you'd 
1049 use C<quote_char(qw/[ ]/)>.
1050
1051 This option can also be set via L</connect_info>.
1052
1053 =item name_sep
1054
1055 This only needs to be used in conjunction with L<quote_char>, and is used to 
1056 specify the charecter that seperates elements (schemas, tables, columns) from 
1057 each other. In most cases this is simply a C<.>.
1058
1059 This option can also be set via L</connect_info>.
1060
1061 =back
1062
1063 =head1 ENVIRONMENT VARIABLES
1064
1065 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1066
1067 If C<DBIX_CLASS_STORAGE_DBI_DEBUG> is set then SQL trace information
1068 is produced (as when the L<debug> method is set).
1069
1070 If the value is of the form C<1=/path/name> then the trace output is
1071 written to the file C</path/name>.
1072
1073 This environment variable is checked when the storage object is first
1074 created (when you call connect on your schema).  So, run-time changes 
1075 to this environment variable will not take effect unless you also 
1076 re-connect on your schema.
1077
1078 =head1 AUTHORS
1079
1080 Matt S. Trout <mst@shadowcatsystems.co.uk>
1081
1082 Andy Grundman <andy@hybridized.org>
1083
1084 =head1 LICENSE
1085
1086 You may distribute this code under the same terms as Perl itself.
1087
1088 =cut
1089