env var
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 sub select {
21   my ($self, $table, $fields, $where, $order, @rest) = @_;
22   $table = $self->_quote($table) unless ref($table);
23   @rest = (-1) unless defined $rest[0];
24   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
25     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
26   local $self->{having_bind} = [];
27   my ($sql, @ret) = $self->SUPER::select(
28     $table, $self->_recurse_fields($fields), $where, $order, @rest
29   );
30   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
31 }
32
33 sub insert {
34   my $self = shift;
35   my $table = shift;
36   $table = $self->_quote($table) unless ref($table);
37   $self->SUPER::insert($table, @_);
38 }
39
40 sub update {
41   my $self = shift;
42   my $table = shift;
43   $table = $self->_quote($table) unless ref($table);
44   $self->SUPER::update($table, @_);
45 }
46
47 sub delete {
48   my $self = shift;
49   my $table = shift;
50   $table = $self->_quote($table) unless ref($table);
51   $self->SUPER::delete($table, @_);
52 }
53
54 sub _emulate_limit {
55   my $self = shift;
56   if ($_[3] == -1) {
57     return $_[1].$self->_order_by($_[2]);
58   } else {
59     return $self->SUPER::_emulate_limit(@_);
60   }
61 }
62
63 sub _recurse_fields {
64   my ($self, $fields) = @_;
65   my $ref = ref $fields;
66   return $self->_quote($fields) unless $ref;
67   return $$fields if $ref eq 'SCALAR';
68
69   if ($ref eq 'ARRAY') {
70     return join(', ', map { $self->_recurse_fields($_) } @$fields);
71   } elsif ($ref eq 'HASH') {
72     foreach my $func (keys %$fields) {
73       return $self->_sqlcase($func)
74         .'( '.$self->_recurse_fields($fields->{$func}).' )';
75     }
76   }
77 }
78
79 sub _order_by {
80   my $self = shift;
81   my $ret = '';
82   my @extra;
83   if (ref $_[0] eq 'HASH') {
84     if (defined $_[0]->{group_by}) {
85       $ret = $self->_sqlcase(' group by ')
86                .$self->_recurse_fields($_[0]->{group_by});
87     }
88     if (defined $_[0]->{having}) {
89       my $frag;
90       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
91       push(@{$self->{having_bind}}, @extra);
92       $ret .= $self->_sqlcase(' having ').$frag;
93     }
94     if (defined $_[0]->{order_by}) {
95       $ret .= $self->SUPER::_order_by($_[0]->{order_by});
96     }
97   } elsif(ref $_[0] eq 'SCALAR') {
98     $ret = $self->_sqlcase(' order by ').${ $_[0] };
99   } else {
100     $ret = $self->SUPER::_order_by(@_);
101   }
102   return $ret;
103 }
104
105 sub _order_directions {
106   my ($self, $order) = @_;
107   $order = $order->{order_by} if ref $order eq 'HASH';
108   return $self->SUPER::_order_directions($order);
109 }
110
111 sub _table {
112   my ($self, $from) = @_;
113   if (ref $from eq 'ARRAY') {
114     return $self->_recurse_from(@$from);
115   } elsif (ref $from eq 'HASH') {
116     return $self->_make_as($from);
117   } else {
118     return $from; # would love to quote here but _table ends up getting called
119                   # twice during an ->select without a limit clause due to
120                   # the way S::A::Limit->select works. should maybe consider
121                   # bypassing this and doing S::A::select($self, ...) in
122                   # our select method above. meantime, quoting shims have
123                   # been added to select/insert/update/delete here
124   }
125 }
126
127 sub _recurse_from {
128   my ($self, $from, @join) = @_;
129   my @sqlf;
130   push(@sqlf, $self->_make_as($from));
131   foreach my $j (@join) {
132     my ($to, $on) = @$j;
133
134     # check whether a join type exists
135     my $join_clause = '';
136     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
137     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
138       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
139     } else {
140       $join_clause = ' JOIN ';
141     }
142     push(@sqlf, $join_clause);
143
144     if (ref $to eq 'ARRAY') {
145       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
146     } else {
147       push(@sqlf, $self->_make_as($to));
148     }
149     push(@sqlf, ' ON ', $self->_join_condition($on));
150   }
151   return join('', @sqlf);
152 }
153
154 sub _make_as {
155   my ($self, $from) = @_;
156   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
157                      reverse each %{$self->_skip_options($from)});
158 }
159
160 sub _skip_options {
161   my ($self, $hash) = @_;
162   my $clean_hash = {};
163   $clean_hash->{$_} = $hash->{$_}
164     for grep {!/^-/} keys %$hash;
165   return $clean_hash;
166 }
167
168 sub _join_condition {
169   my ($self, $cond) = @_;
170   if (ref $cond eq 'HASH') {
171     my %j;
172     for (keys %$cond) {
173       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
174     };
175     return $self->_recurse_where(\%j);
176   } elsif (ref $cond eq 'ARRAY') {
177     return join(' OR ', map { $self->_join_condition($_) } @$cond);
178   } else {
179     die "Can't handle this yet!";
180   }
181 }
182
183 sub _quote {
184   my ($self, $label) = @_;
185   return '' unless defined $label;
186   return "*" if $label eq '*';
187   return $label unless $self->{quote_char};
188   if(ref $self->{quote_char} eq "ARRAY"){
189     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
190       if !defined $self->{name_sep};
191     my $sep = $self->{name_sep};
192     return join($self->{name_sep},
193         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
194        split(/\Q$sep\E/,$label));
195   }
196   return $self->SUPER::_quote($label);
197 }
198
199 sub _RowNum {
200    my $self = shift;
201    my $c;
202    $_[0] =~ s/SELECT (.*?) FROM/
203      'SELECT '.join(', ', map { $_.' AS col'.++$c } split(', ', $1)).' FROM'/e;
204    $self->SUPER::_RowNum(@_);
205 }
206
207 sub limit_dialect {
208     my $self = shift;
209     $self->{limit_dialect} = shift if @_;
210     return $self->{limit_dialect};
211 }
212
213 sub quote_char {
214     my $self = shift;
215     $self->{quote_char} = shift if @_;
216     return $self->{quote_char};
217 }
218
219 sub name_sep {
220     my $self = shift;
221     $self->{name_sep} = shift if @_;
222     return $self->{name_sep};
223 }
224
225 } # End of BEGIN block
226
227 use base qw/DBIx::Class/;
228
229 __PACKAGE__->load_components(qw/AccessorGroup/);
230
231 __PACKAGE__->mk_group_accessors('simple' =>
232   qw/_connect_info _dbh _sql_maker _conn_pid _conn_tid debug debugobj
233      cursor on_connect_do transaction_depth/);
234
235 =head1 NAME
236
237 DBIx::Class::Storage::DBI - DBI storage handler
238
239 =head1 SYNOPSIS
240
241 =head1 DESCRIPTION
242
243 This class represents the connection to the database
244
245 =head1 METHODS
246
247 =head2 new
248
249 =cut
250
251 sub new {
252   my $new = bless({}, ref $_[0] || $_[0]);
253   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
254   $new->transaction_depth(0);
255
256   $new->debugobj(new DBIx::Class::Storage::Statistics());
257
258   my $fh;
259
260   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
261                   || $ENV{DBIC_TRACE};
262
263   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
264     $fh = IO::File->new($1, 'w')
265       or $new->throw_exception("Cannot open trace file $1");
266   } else {
267     $fh = IO::File->new('>&STDERR');
268   }
269   $new->debugfh($fh);
270   $new->debug(1) if $debug_env;
271   return $new;
272 }
273
274 =head2 throw_exception
275
276 Throws an exception - croaks.
277
278 =cut
279
280 sub throw_exception {
281   my ($self, $msg) = @_;
282   croak($msg);
283 }
284
285 =head2 connect_info
286
287 The arguments of C<connect_info> are always a single array reference.
288
289 This is normally accessed via L<DBIx::Class::Schema/connection>, which
290 encapsulates its argument list in an arrayref before calling
291 C<connect_info> here.
292
293 The arrayref can either contain the same set of arguments one would
294 normally pass to L<DBI/connect>, or a lone code reference which returns
295 a connected database handle.
296
297 In either case, there is an optional final element within the arrayref
298 which can hold a hashref of connection-specific Storage::DBI options.
299 These include C<on_connect_do>, and the sql_maker options
300 C<limit_dialect>, C<quote_char>, and C<name_sep>.  Examples:
301
302   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
303
304   ->connect_info([ sub { DBI->connect(...) } ]);
305
306   ->connect_info(
307     [
308       'dbi:Pg:dbname=foo',
309       'postgres',
310       'my_pg_password',
311       { AutoCommit => 0 },
312       { quote_char => q{`}, name_sep => q{@} },
313     ]
314   );
315
316   ->connect_info(
317     [
318       sub { DBI->connect(...) },
319       { quote_char => q{`}, name_sep => q{@} },
320     ]
321   );
322
323 =head2 on_connect_do
324
325   $schema->storage->on_connect_do(['PRAGMA synchronous = OFF']);
326
327 Call this after C<< $schema->connect >> to have the sql statements
328 given executed on every db connect.
329
330 This option can also be set via L</connect_info>.
331
332 =head2 debug
333
334 Causes SQL trace information to be emitted on the C<debugobj> object.
335 (or C<STDERR> if C<debugobj> has not specifically been set).
336
337 =head2 debugfh
338
339 Set or retrieve the filehandle used for trace/debug output.  This should be
340 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
341 set to be STDERR - although see information on the
342 L<DBIC_TRACE> environment variable.
343
344 =cut
345
346 sub debugfh {
347     my $self = shift;
348
349     if ($self->debugobj->can('debugfh')) {
350         return $self->debugobj->debugfh(@_);
351     }
352 }
353
354 =head2 debugobj
355
356 Sets or retrieves the object used for metric collection. Defaults to an instance
357 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
358 method of using a coderef as a callback.  See the aforementioned Statistics
359 class for more information.
360
361 =head2 debugcb
362
363 Sets a callback to be executed each time a statement is run; takes a sub
364 reference.  Callback is executed as $sub->($op, $info) where $op is
365 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
366
367 See L<debugobj> for a better way.
368
369 =cut
370
371 sub debugcb {
372     my $self = shift;
373
374     if ($self->debugobj->can('callback')) {
375         return $self->debugobj->callback(@_);
376     }
377 }
378
379 =head2 disconnect
380
381 Disconnect the L<DBI> handle, performing a rollback first if the
382 database is not in C<AutoCommit> mode.
383
384 =cut
385
386 sub disconnect {
387   my ($self) = @_;
388
389   if( $self->connected ) {
390     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
391     $self->_dbh->disconnect;
392     $self->_dbh(undef);
393   }
394 }
395
396 =head2 connected
397
398 Check if the L<DBI> handle is connected.  Returns true if the handle
399 is connected.
400
401 =cut
402
403 sub connected { my ($self) = @_;
404
405   if(my $dbh = $self->_dbh) {
406       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
407           $self->_sql_maker(undef);
408           return $self->_dbh(undef);
409       }
410       elsif($self->_conn_pid != $$) {
411           $self->_dbh->{InactiveDestroy} = 1;
412           $self->_sql_maker(undef);
413           return $self->_dbh(undef)
414       }
415       return ($dbh->FETCH('Active') && $dbh->ping);
416   }
417
418   return 0;
419 }
420
421 =head2 ensure_connected
422
423 Check whether the database handle is connected - if not then make a
424 connection.
425
426 =cut
427
428 sub ensure_connected {
429   my ($self) = @_;
430
431   unless ($self->connected) {
432     $self->_populate_dbh;
433   }
434 }
435
436 =head2 dbh
437
438 Returns the dbh - a data base handle of class L<DBI>.
439
440 =cut
441
442 sub dbh {
443   my ($self) = @_;
444
445   $self->ensure_connected;
446   return $self->_dbh;
447 }
448
449 sub _sql_maker_args {
450     my ($self) = @_;
451     
452     return ( limit_dialect => $self->dbh );
453 }
454
455 =head2 sql_maker
456
457 Returns a C<sql_maker> object - normally an object of class
458 C<DBIC::SQL::Abstract>.
459
460 =cut
461
462 sub sql_maker {
463   my ($self) = @_;
464   unless ($self->_sql_maker) {
465     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
466   }
467   return $self->_sql_maker;
468 }
469
470 sub connect_info {
471   my ($self, $info_arg) = @_;
472
473   if($info_arg) {
474     my %sql_maker_opts;
475     my $info = [ @$info_arg ]; # copy because we can alter it
476     my $last_info = $info->[-1];
477     if(ref $last_info eq 'HASH') {
478       my $used;
479       if(my $on_connect_do = $last_info->{on_connect_do}) {
480         $used = 1;
481         $self->on_connect_do($on_connect_do);
482       }
483       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
484         if(my $opt_val = $last_info->{$sql_maker_opt}) {
485           $used = 1;
486           $sql_maker_opts{$sql_maker_opt} = $opt_val;
487         }
488       }
489
490       # remove our options hashref if it was there, to avoid confusing
491       #   DBI in the case the user didn't use all 4 DBI options, as in:
492       #   [ 'dbi:SQLite:foo.db', { quote_char => q{`} } ]
493       pop(@$info) if $used;
494     }
495
496     $self->_connect_info($info);
497     $self->sql_maker->$_($sql_maker_opts{$_}) for(keys %sql_maker_opts);
498   }
499
500   $self->_connect_info;
501 }
502
503 sub _populate_dbh {
504   my ($self) = @_;
505   my @info = @{$self->_connect_info || []};
506   $self->_dbh($self->_connect(@info));
507
508   if(ref $self eq 'DBIx::Class::Storage::DBI') {
509     my $driver = $self->_dbh->{Driver}->{Name};
510     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
511       bless $self, "DBIx::Class::Storage::DBI::${driver}";
512       $self->_rebless() if $self->can('_rebless');
513     }
514   }
515
516   # if on-connect sql statements are given execute them
517   foreach my $sql_statement (@{$self->on_connect_do || []}) {
518     $self->debugobj->query_start($sql_statement) if $self->debug();
519     $self->_dbh->do($sql_statement);
520     $self->debugobj->query_end($sql_statement) if $self->debug();
521   }
522
523   $self->_conn_pid($$);
524   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
525 }
526
527 sub _connect {
528   my ($self, @info) = @_;
529
530   $self->throw_exception("You failed to provide any connection info")
531       if !@info;
532
533   my ($old_connect_via, $dbh);
534
535   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
536       $old_connect_via = $DBI::connect_via;
537       $DBI::connect_via = 'connect';
538   }
539
540   eval {
541     $dbh = ref $info[0] eq 'CODE'
542          ? &{$info[0]}
543          : DBI->connect(@info);
544   };
545
546   $DBI::connect_via = $old_connect_via if $old_connect_via;
547
548   if (!$dbh || $@) {
549     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
550   }
551
552   $dbh;
553 }
554
555 =head2 txn_begin
556
557 Calls begin_work on the current dbh.
558
559 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
560 an entire code block to be executed transactionally.
561
562 =cut
563
564 sub txn_begin {
565   my $self = shift;
566   if ($self->{transaction_depth}++ == 0) {
567     my $dbh = $self->dbh;
568     if ($dbh->{AutoCommit}) {
569       $self->debugobj->txn_begin()
570         if ($self->debug);
571       $dbh->begin_work;
572     }
573   }
574 }
575
576 =head2 txn_commit
577
578 Issues a commit against the current dbh.
579
580 =cut
581
582 sub txn_commit {
583   my $self = shift;
584   my $dbh = $self->dbh;
585   if ($self->{transaction_depth} == 0) {
586     unless ($dbh->{AutoCommit}) {
587       $self->debugobj->txn_commit()
588         if ($self->debug);
589       $dbh->commit;
590     }
591   }
592   else {
593     if (--$self->{transaction_depth} == 0) {
594       $self->debugobj->txn_commit()
595         if ($self->debug);
596       $dbh->commit;
597     }
598   }
599 }
600
601 =head2 txn_rollback
602
603 Issues a rollback against the current dbh. A nested rollback will
604 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
605 which allows the rollback to propagate to the outermost transaction.
606
607 =cut
608
609 sub txn_rollback {
610   my $self = shift;
611
612   eval {
613     my $dbh = $self->dbh;
614     if ($self->{transaction_depth} == 0) {
615       unless ($dbh->{AutoCommit}) {
616         $self->debugobj->txn_rollback()
617           if ($self->debug);
618         $dbh->rollback;
619       }
620     }
621     else {
622       if (--$self->{transaction_depth} == 0) {
623         $self->debugobj->txn_rollback()
624           if ($self->debug);
625         $dbh->rollback;
626       }
627       else {
628         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
629       }
630     }
631   };
632
633   if ($@) {
634     my $error = $@;
635     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
636     $error =~ /$exception_class/ and $self->throw_exception($error);
637     $self->{transaction_depth} = 0;          # ensure that a failed rollback
638     $self->throw_exception($error);          # resets the transaction depth
639   }
640 }
641
642 sub _execute {
643   my ($self, $op, $extra_bind, $ident, @args) = @_;
644   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
645   unshift(@bind, @$extra_bind) if $extra_bind;
646   if ($self->debug) {
647       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
648       $self->debugobj->query_start($sql, @debug_bind);
649   }
650   my $sth = eval { $self->sth($sql,$op) };
651
652   if (!$sth || $@) {
653     $self->throw_exception(
654       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
655     );
656   }
657   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
658   my $rv;
659   if ($sth) {
660     my $time = time();
661     $rv = eval { $sth->execute(@bind) };
662
663     if ($@ || !$rv) {
664       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
665     }
666   } else {
667     $self->throw_exception("'$sql' did not generate a statement.");
668   }
669   if ($self->debug) {
670       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
671       $self->debugobj->query_end($sql, @debug_bind);
672   }
673   return (wantarray ? ($rv, $sth, @bind) : $rv);
674 }
675
676 sub insert {
677   my ($self, $ident, $to_insert) = @_;
678   $self->throw_exception(
679     "Couldn't insert ".join(', ',
680       map "$_ => $to_insert->{$_}", keys %$to_insert
681     )." into ${ident}"
682   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
683   return $to_insert;
684 }
685
686 sub update {
687   return shift->_execute('update' => [], @_);
688 }
689
690 sub delete {
691   return shift->_execute('delete' => [], @_);
692 }
693
694 sub _select {
695   my ($self, $ident, $select, $condition, $attrs) = @_;
696   my $order = $attrs->{order_by};
697   if (ref $condition eq 'SCALAR') {
698     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
699   }
700   if (exists $attrs->{group_by} || $attrs->{having}) {
701     $order = {
702       group_by => $attrs->{group_by},
703       having => $attrs->{having},
704       ($order ? (order_by => $order) : ())
705     };
706   }
707   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
708   if ($attrs->{software_limit} ||
709       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
710         $attrs->{software_limit} = 1;
711   } else {
712     $self->throw_exception("rows attribute must be positive if present")
713       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
714     push @args, $attrs->{rows}, $attrs->{offset};
715   }
716   return $self->_execute(@args);
717 }
718
719 =head2 select
720
721 Handle a SQL select statement.
722
723 =cut
724
725 sub select {
726   my $self = shift;
727   my ($ident, $select, $condition, $attrs) = @_;
728   return $self->cursor->new($self, \@_, $attrs);
729 }
730
731 =head2 select_single
732
733 Performs a select, fetch and return of data - handles a single row
734 only.
735
736 =cut
737
738 # Need to call finish() to work round broken DBDs
739
740 sub select_single {
741   my $self = shift;
742   my ($rv, $sth, @bind) = $self->_select(@_);
743   my @row = $sth->fetchrow_array;
744   $sth->finish();
745   return @row;
746 }
747
748 =head2 sth
749
750 Returns a L<DBI> sth (statement handle) for the supplied SQL.
751
752 =cut
753
754 sub sth {
755   my ($self, $sql) = @_;
756   # 3 is the if_active parameter which avoids active sth re-use
757   return $self->dbh->prepare_cached($sql, {}, 3);
758 }
759
760 =head2 columns_info_for
761
762 Returns database type info for a given table columns.
763
764 =cut
765
766 sub columns_info_for {
767   my ($self, $table) = @_;
768
769   my $dbh = $self->dbh;
770
771   if ($dbh->can('column_info')) {
772     my %result;
773     my $old_raise_err = $dbh->{RaiseError};
774     my $old_print_err = $dbh->{PrintError};
775     $dbh->{RaiseError} = 1;
776     $dbh->{PrintError} = 0;
777     eval {
778       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
779       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
780       $sth->execute();
781       while ( my $info = $sth->fetchrow_hashref() ){
782         my %column_info;
783         $column_info{data_type}   = $info->{TYPE_NAME};
784         $column_info{size}      = $info->{COLUMN_SIZE};
785         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
786         $column_info{default_value} = $info->{COLUMN_DEF};
787         my $col_name = $info->{COLUMN_NAME};
788         $col_name =~ s/^\"(.*)\"$/$1/;
789
790         $result{$col_name} = \%column_info;
791       }
792     };
793     $dbh->{RaiseError} = $old_raise_err;
794     $dbh->{PrintError} = $old_print_err;
795     return \%result if !$@;
796   }
797
798   my %result;
799   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
800   $sth->execute;
801   my @columns = @{$sth->{NAME_lc}};
802   for my $i ( 0 .. $#columns ){
803     my %column_info;
804     my $type_num = $sth->{TYPE}->[$i];
805     my $type_name;
806     if(defined $type_num && $dbh->can('type_info')) {
807       my $type_info = $dbh->type_info($type_num);
808       $type_name = $type_info->{TYPE_NAME} if $type_info;
809     }
810     $column_info{data_type} = $type_name ? $type_name : $type_num;
811     $column_info{size} = $sth->{PRECISION}->[$i];
812     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
813
814     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
815       $column_info{data_type} = $1;
816       $column_info{size}    = $2;
817     }
818
819     $result{$columns[$i]} = \%column_info;
820   }
821
822   return \%result;
823 }
824
825 =head2 last_insert_id
826
827 Return the row id of the last insert.
828
829 =cut
830
831 sub last_insert_id {
832   my ($self, $row) = @_;
833     
834   return $self->dbh->func('last_insert_rowid');
835
836 }
837
838 =head2 sqlt_type
839
840 Returns the database driver name.
841
842 =cut
843
844 sub sqlt_type { shift->dbh->{Driver}->{Name} }
845
846 =head2 create_ddl_dir (EXPERIMENTAL)
847
848 =over 4
849
850 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
851
852 =back
853
854 Creates an SQL file based on the Schema, for each of the specified
855 database types, in the given directory.
856
857 Note that this feature is currently EXPERIMENTAL and may not work correctly
858 across all databases, or fully handle complex relationships.
859
860 =cut
861
862 sub create_ddl_dir
863 {
864   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
865
866   if(!$dir || !-d $dir)
867   {
868     warn "No directory given, using ./\n";
869     $dir = "./";
870   }
871   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
872   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
873   $version ||= $schema->VERSION || '1.x';
874
875   eval "use SQL::Translator";
876   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
877
878   my $sqlt = SQL::Translator->new({
879 #      debug => 1,
880       add_drop_table => 1,
881   });
882   foreach my $db (@$databases)
883   {
884     $sqlt->reset();
885     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
886 #    $sqlt->parser_args({'DBIx::Class' => $schema);
887     $sqlt->data($schema);
888     $sqlt->producer($db);
889
890     my $file;
891     my $filename = $schema->ddl_filename($db, $dir, $version);
892     if(-e $filename)
893     {
894       $self->throw_exception("$filename already exists, skipping $db");
895       next;
896     }
897     open($file, ">$filename") 
898       or $self->throw_exception("Can't open $filename for writing ($!)");
899     my $output = $sqlt->translate;
900 #use Data::Dumper;
901 #    print join(":", keys %{$schema->source_registrations});
902 #    print Dumper($sqlt->schema);
903     if(!$output)
904     {
905       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
906       next;
907     }
908     print $file $output;
909     close($file);
910   }
911
912 }
913
914 =head2 deployment_statements
915
916 Create the statements for L</deploy> and
917 L<DBIx::Class::Schema/deploy>.
918
919 =cut
920
921 sub deployment_statements {
922   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
923   # Need to be connected to get the correct sqlt_type
924   $self->ensure_connected() unless $type;
925   $type ||= $self->sqlt_type;
926   $version ||= $schema->VERSION || '1.x';
927   $dir ||= './';
928   eval "use SQL::Translator";
929   if(!$@)
930   {
931     eval "use SQL::Translator::Parser::DBIx::Class;";
932     $self->throw_exception($@) if $@;
933     eval "use SQL::Translator::Producer::${type};";
934     $self->throw_exception($@) if $@;
935     my $tr = SQL::Translator->new(%$sqltargs);
936     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
937     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
938   }
939
940   my $filename = $schema->ddl_filename($type, $dir, $version);
941   if(!-f $filename)
942   {
943 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
944       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
945       return;
946   }
947   my $file;
948   open($file, "<$filename") 
949       or $self->throw_exception("Can't open $filename ($!)");
950   my @rows = <$file>;
951   close($file);
952
953   return join('', @rows);
954   
955 }
956
957 =head2 deploy
958
959 Sends the appropriate statements to create or modify tables to the
960 db. This would normally be called through
961 L<DBIx::Class::Schema/deploy>.
962
963 =cut
964
965 sub deploy {
966   my ($self, $schema, $type, $sqltargs) = @_;
967   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, $sqltargs) ) {
968     for ( split(";\n", $statement)) {
969       next if($_ =~ /^--/);
970       next if(!$_);
971 #      next if($_ =~ /^DROP/m);
972       next if($_ =~ /^BEGIN TRANSACTION/m);
973       next if($_ =~ /^COMMIT/m);
974       $self->debugobj->query_start($_) if $self->debug;
975       $self->dbh->do($_) or warn "SQL was:\n $_";
976       $self->debugobj->query_end($_) if $self->debug;
977     }
978   }
979 }
980
981 =head2 datetime_parser
982
983 Returns the datetime parser class
984
985 =cut
986
987 sub datetime_parser {
988   my $self = shift;
989   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
990 }
991
992 =head2 datetime_parser_type
993
994 Defines (returns) the datetime parser class - currently hardwired to
995 L<DateTime::Format::MySQL>
996
997 =cut
998
999 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1000
1001 =head2 build_datetime_parser
1002
1003 See L</datetime_parser>
1004
1005 =cut
1006
1007 sub build_datetime_parser {
1008   my $self = shift;
1009   my $type = $self->datetime_parser_type(@_);
1010   eval "use ${type}";
1011   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1012   return $type;
1013 }
1014
1015 sub DESTROY { shift->disconnect }
1016
1017 1;
1018
1019 =head1 SQL METHODS
1020
1021 The module defines a set of methods within the DBIC::SQL::Abstract
1022 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1023 SQL query functions.
1024
1025 The following methods are extended:-
1026
1027 =over 4
1028
1029 =item delete
1030
1031 =item insert
1032
1033 =item select
1034
1035 =item update
1036
1037 =item limit_dialect
1038
1039 Accessor for setting limit dialect. This is useful
1040 for JDBC-bridge among others where the remote SQL-dialect cannot
1041 be determined by the name of the driver alone.
1042
1043 This option can also be set via L</connect_info>.
1044
1045 =item quote_char
1046
1047 Specifies what characters to use to quote table and column names. If 
1048 you use this you will want to specify L<name_sep> as well.
1049
1050 quote_char expectes either a single character, in which case is it is placed
1051 on either side of the table/column, or an arrayref of length 2 in which case the
1052 table/column name is placed between the elements.
1053
1054 For example under MySQL you'd use C<quote_char('`')>, and user SQL Server you'd 
1055 use C<quote_char(qw/[ ]/)>.
1056
1057 This option can also be set via L</connect_info>.
1058
1059 =item name_sep
1060
1061 This only needs to be used in conjunction with L<quote_char>, and is used to 
1062 specify the charecter that seperates elements (schemas, tables, columns) from 
1063 each other. In most cases this is simply a C<.>.
1064
1065 This option can also be set via L</connect_info>.
1066
1067 =back
1068
1069 =head1 ENVIRONMENT VARIABLES
1070
1071 =head2 DBIC_TRACE
1072
1073 If C<DBIC_TRACE> is set then SQL trace information
1074 is produced (as when the L<debug> method is set).
1075
1076 If the value is of the form C<1=/path/name> then the trace output is
1077 written to the file C</path/name>.
1078
1079 This environment variable is checked when the storage object is first
1080 created (when you call connect on your schema).  So, run-time changes 
1081 to this environment variable will not take effect unless you also 
1082 re-connect on your schema.
1083
1084 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1085
1086 Old name for DBIC_TRACE
1087
1088 =head1 AUTHORS
1089
1090 Matt S. Trout <mst@shadowcatsystems.co.uk>
1091
1092 Andy Grundman <andy@hybridized.org>
1093
1094 =head1 LICENSE
1095
1096 You may distribute this code under the same terms as Perl itself.
1097
1098 =cut
1099