POD clarification and content bugfixing + a few code formatting fixes
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 sub select {
21   my ($self, $table, $fields, $where, $order, @rest) = @_;
22   $table = $self->_quote($table) unless ref($table);
23   @rest = (-1) unless defined $rest[0];
24   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
25     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
26   local $self->{having_bind} = [];
27   my ($sql, @ret) = $self->SUPER::select(
28     $table, $self->_recurse_fields($fields), $where, $order, @rest
29   );
30   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
31 }
32
33 sub insert {
34   my $self = shift;
35   my $table = shift;
36   $table = $self->_quote($table) unless ref($table);
37   $self->SUPER::insert($table, @_);
38 }
39
40 sub update {
41   my $self = shift;
42   my $table = shift;
43   $table = $self->_quote($table) unless ref($table);
44   $self->SUPER::update($table, @_);
45 }
46
47 sub delete {
48   my $self = shift;
49   my $table = shift;
50   $table = $self->_quote($table) unless ref($table);
51   $self->SUPER::delete($table, @_);
52 }
53
54 sub _emulate_limit {
55   my $self = shift;
56   if ($_[3] == -1) {
57     return $_[1].$self->_order_by($_[2]);
58   } else {
59     return $self->SUPER::_emulate_limit(@_);
60   }
61 }
62
63 sub _recurse_fields {
64   my ($self, $fields) = @_;
65   my $ref = ref $fields;
66   return $self->_quote($fields) unless $ref;
67   return $$fields if $ref eq 'SCALAR';
68
69   if ($ref eq 'ARRAY') {
70     return join(', ', map { $self->_recurse_fields($_) } @$fields);
71   } elsif ($ref eq 'HASH') {
72     foreach my $func (keys %$fields) {
73       return $self->_sqlcase($func)
74         .'( '.$self->_recurse_fields($fields->{$func}).' )';
75     }
76   }
77 }
78
79 sub _order_by {
80   my $self = shift;
81   my $ret = '';
82   my @extra;
83   if (ref $_[0] eq 'HASH') {
84     if (defined $_[0]->{group_by}) {
85       $ret = $self->_sqlcase(' group by ')
86                .$self->_recurse_fields($_[0]->{group_by});
87     }
88     if (defined $_[0]->{having}) {
89       my $frag;
90       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
91       push(@{$self->{having_bind}}, @extra);
92       $ret .= $self->_sqlcase(' having ').$frag;
93     }
94     if (defined $_[0]->{order_by}) {
95       $ret .= $self->SUPER::_order_by($_[0]->{order_by});
96     }
97   } elsif(ref $_[0] eq 'SCALAR') {
98     $ret = $self->_sqlcase(' order by ').${ $_[0] };
99   } else {
100     $ret = $self->SUPER::_order_by(@_);
101   }
102   return $ret;
103 }
104
105 sub _order_directions {
106   my ($self, $order) = @_;
107   $order = $order->{order_by} if ref $order eq 'HASH';
108   return $self->SUPER::_order_directions($order);
109 }
110
111 sub _table {
112   my ($self, $from) = @_;
113   if (ref $from eq 'ARRAY') {
114     return $self->_recurse_from(@$from);
115   } elsif (ref $from eq 'HASH') {
116     return $self->_make_as($from);
117   } else {
118     return $from; # would love to quote here but _table ends up getting called
119                   # twice during an ->select without a limit clause due to
120                   # the way S::A::Limit->select works. should maybe consider
121                   # bypassing this and doing S::A::select($self, ...) in
122                   # our select method above. meantime, quoting shims have
123                   # been added to select/insert/update/delete here
124   }
125 }
126
127 sub _recurse_from {
128   my ($self, $from, @join) = @_;
129   my @sqlf;
130   push(@sqlf, $self->_make_as($from));
131   foreach my $j (@join) {
132     my ($to, $on) = @$j;
133
134     # check whether a join type exists
135     my $join_clause = '';
136     if (ref($to) eq 'HASH' and exists($to->{-join_type})) {
137       $join_clause = ' '.uc($to->{-join_type}).' JOIN ';
138     } else {
139       $join_clause = ' JOIN ';
140     }
141     push(@sqlf, $join_clause);
142
143     if (ref $to eq 'ARRAY') {
144       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
145     } else {
146       push(@sqlf, $self->_make_as($to));
147     }
148     push(@sqlf, ' ON ', $self->_join_condition($on));
149   }
150   return join('', @sqlf);
151 }
152
153 sub _make_as {
154   my ($self, $from) = @_;
155   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
156                      reverse each %{$self->_skip_options($from)});
157 }
158
159 sub _skip_options {
160   my ($self, $hash) = @_;
161   my $clean_hash = {};
162   $clean_hash->{$_} = $hash->{$_}
163     for grep {!/^-/} keys %$hash;
164   return $clean_hash;
165 }
166
167 sub _join_condition {
168   my ($self, $cond) = @_;
169   if (ref $cond eq 'HASH') {
170     my %j;
171     for (keys %$cond) {
172       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
173     };
174     return $self->_recurse_where(\%j);
175   } elsif (ref $cond eq 'ARRAY') {
176     return join(' OR ', map { $self->_join_condition($_) } @$cond);
177   } else {
178     die "Can't handle this yet!";
179   }
180 }
181
182 sub _quote {
183   my ($self, $label) = @_;
184   return '' unless defined $label;
185   return "*" if $label eq '*';
186   return $label unless $self->{quote_char};
187   if(ref $self->{quote_char} eq "ARRAY"){
188     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
189       if !defined $self->{name_sep};
190     my $sep = $self->{name_sep};
191     return join($self->{name_sep},
192         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
193        split(/\Q$sep\E/,$label));
194   }
195   return $self->SUPER::_quote($label);
196 }
197
198 sub _RowNum {
199    my $self = shift;
200    my $c;
201    $_[0] =~ s/SELECT (.*?) FROM/
202      'SELECT '.join(', ', map { $_.' AS col'.++$c } split(', ', $1)).' FROM'/e;
203    $self->SUPER::_RowNum(@_);
204 }
205
206 sub limit_dialect {
207     my $self = shift;
208     $self->{limit_dialect} = shift if @_;
209     return $self->{limit_dialect};
210 }
211
212 sub quote_char {
213     my $self = shift;
214     $self->{quote_char} = shift if @_;
215     return $self->{quote_char};
216 }
217
218 sub name_sep {
219     my $self = shift;
220     $self->{name_sep} = shift if @_;
221     return $self->{name_sep};
222 }
223
224 } # End of BEGIN block
225
226 use base qw/DBIx::Class/;
227
228 __PACKAGE__->load_components(qw/AccessorGroup/);
229
230 __PACKAGE__->mk_group_accessors('simple' =>
231   qw/_connect_info _dbh _sql_maker _conn_pid _conn_tid debug debugobj
232      cursor on_connect_do transaction_depth/);
233
234 =head2 new
235
236 =cut
237
238 sub new {
239   my $new = bless({}, ref $_[0] || $_[0]);
240   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
241   $new->transaction_depth(0);
242
243   $new->debugobj(new DBIx::Class::Storage::Statistics());
244
245   my $fh;
246   if (defined($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}) &&
247      ($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG} =~ /=(.+)$/)) {
248     $fh = IO::File->new($1, 'w')
249       or $new->throw_exception("Cannot open trace file $1");
250   } else {
251     $fh = IO::File->new('>&STDERR');
252   }
253   $new->debugfh($fh);
254   $new->debug(1) if $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG};
255   return $new;
256 }
257
258 =head2 throw_exception
259
260 Throws an exception - croaks.
261
262 =cut
263
264 sub throw_exception {
265   my ($self, $msg) = @_;
266   croak($msg);
267 }
268
269 =head1 NAME
270
271 DBIx::Class::Storage::DBI - DBI storage handler
272
273 =head1 SYNOPSIS
274
275 =head1 DESCRIPTION
276
277 This class represents the connection to the database
278
279 =head1 METHODS
280
281 =cut
282
283 =head2 connect_info
284
285 The arguments of C<connect_info> are always a single array reference.
286
287 This is normally accessed via L<DBIx::Class::Schema/connection>, which
288 encapsulates its argument list in an arrayref before calling
289 C<connect_info> here.
290
291 The arrayref can either contain the same set of arguments one would
292 normally pass to L<DBI/connect>, or a lone code reference which returns
293 a connected database handle.
294
295 In either case, there is an optional final element within the arrayref
296 which can hold a hashref of connection-specific Storage::DBI options.
297 These include C<on_connect_do>, and the sql_maker options
298 C<limit_dialect>, C<quote_char>, and C<name_sep>.  Examples:
299
300   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
301
302   ->connect_info([ sub { DBI->connect(...) } ]);
303
304   ->connect_info(
305     [
306       'dbi:Pg:dbname=foo',
307       'postgres',
308       'my_pg_password',
309       { AutoCommit => 0 },
310       { quote_char => q{`}, name_sep => q{@} },
311     ]
312   );
313
314   ->connect_info(
315     [
316       sub { DBI->connect(...) },
317       { quote_char => q{`}, name_sep => q{@} },
318     ]
319   );
320
321 =head2 on_connect_do
322
323 Executes the sql statements given as a listref on every db connect.
324
325 This option can also be set via L</connect_info>.
326
327 =head2 debug
328
329 Causes SQL trace information to be emitted on the C<debugobj> object.
330 (or C<STDERR> if C<debugobj> has not specifically been set).
331
332 =head2 debugfh
333
334 Set or retrieve the filehandle used for trace/debug output.  This should be
335 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
336 set to be STDERR - although see information on the
337 L<DBIX_CLASS_STORAGE_DBI_DEBUG> environment variable.
338
339 =cut
340
341 sub debugfh {
342     my $self = shift;
343
344     if ($self->debugobj->can('debugfh')) {
345         return $self->debugobj->debugfh(@_);
346     }
347 }
348
349 =head2 debugobj
350
351 Sets or retrieves the object used for metric collection. Defaults to an instance
352 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
353 method of using a coderef as a callback.  See the aforementioned Statistics
354 class for more information.
355
356 =head2 debugcb
357
358 Sets a callback to be executed each time a statement is run; takes a sub
359 reference.  Callback is executed as $sub->($op, $info) where $op is
360 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
361
362 See L<debugobj> for a better way.
363
364 =cut
365
366 sub debugcb {
367     my $self = shift;
368
369     if ($self->debugobj->can('callback')) {
370         return $self->debugobj->callback(@_);
371     }
372 }
373
374 =head2 disconnect
375
376 Disconnect the L<DBI> handle, performing a rollback first if the
377 database is not in C<AutoCommit> mode.
378
379 =cut
380
381 sub disconnect {
382   my ($self) = @_;
383
384   if( $self->connected ) {
385     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
386     $self->_dbh->disconnect;
387     $self->_dbh(undef);
388   }
389 }
390
391 =head2 connected
392
393 Check if the L<DBI> handle is connected.  Returns true if the handle
394 is connected.
395
396 =cut
397
398 sub connected { my ($self) = @_;
399
400   if(my $dbh = $self->_dbh) {
401       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
402           $self->_sql_maker(undef);
403           return $self->_dbh(undef);
404       }
405       elsif($self->_conn_pid != $$) {
406           $self->_dbh->{InactiveDestroy} = 1;
407           $self->_sql_maker(undef);
408           return $self->_dbh(undef)
409       }
410       return ($dbh->FETCH('Active') && $dbh->ping);
411   }
412
413   return 0;
414 }
415
416 =head2 ensure_connected
417
418 Check whether the database handle is connected - if not then make a
419 connection.
420
421 =cut
422
423 sub ensure_connected {
424   my ($self) = @_;
425
426   unless ($self->connected) {
427     $self->_populate_dbh;
428   }
429 }
430
431 =head2 dbh
432
433 Returns the dbh - a data base handle of class L<DBI>.
434
435 =cut
436
437 sub dbh {
438   my ($self) = @_;
439
440   $self->ensure_connected;
441   return $self->_dbh;
442 }
443
444 sub _sql_maker_args {
445     my ($self) = @_;
446     
447     return ( limit_dialect => $self->dbh );
448 }
449
450 =head2 sql_maker
451
452 Returns a C<sql_maker> object - normally an object of class
453 C<DBIC::SQL::Abstract>.
454
455 =cut
456
457 sub sql_maker {
458   my ($self) = @_;
459   unless ($self->_sql_maker) {
460     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
461   }
462   return $self->_sql_maker;
463 }
464
465 sub connect_info {
466   my ($self, $info_arg) = @_;
467
468   if($info_arg) {
469     my $info = [ @$info_arg ]; # copy because we can alter it
470     my $last_info = $info->[-1];
471     if(ref $last_info eq 'HASH') {
472       my $used;
473       if(my $on_connect_do = $last_info->{on_connect_do}) {
474         $used = 1;
475         $self->on_connect_do($on_connect_do);
476       }
477       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
478         if(my $opt_val = $last_info->{$sql_maker_opt}) {
479           $used = 1;
480           $self->sql_maker->$sql_maker_opt($opt_val);
481         }
482       }
483
484       # remove our options hashref if it was there, to avoid confusing
485       #   DBI in the case the user didn't use all 4 DBI options, as in:
486       #   [ 'dbi:SQLite:foo.db', { quote_char => q{`} } ]
487       pop(@$info) if $used;
488     }
489
490     $self->_connect_info($info);
491   }
492
493   $self->_connect_info;
494 }
495
496 sub _populate_dbh {
497   my ($self) = @_;
498   my @info = @{$self->_connect_info || []};
499   $self->_dbh($self->_connect(@info));
500   my $driver = $self->_dbh->{Driver}->{Name};
501   eval "require DBIx::Class::Storage::DBI::${driver}";
502   unless ($@) {
503     bless $self, "DBIx::Class::Storage::DBI::${driver}";
504     $self->_rebless() if $self->can('_rebless');
505   }
506   # if on-connect sql statements are given execute them
507   foreach my $sql_statement (@{$self->on_connect_do || []}) {
508     $self->debugobj->query_start($sql_statement) if $self->debug();
509     $self->_dbh->do($sql_statement);
510     $self->debugobj->query_end($sql_statement) if $self->debug();
511   }
512
513   $self->_conn_pid($$);
514   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
515 }
516
517 sub _connect {
518   my ($self, @info) = @_;
519
520   $self->throw_exception("You failed to provide any connection info")
521       if !@info;
522
523   my ($old_connect_via, $dbh);
524
525   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
526       $old_connect_via = $DBI::connect_via;
527       $DBI::connect_via = 'connect';
528   }
529
530   eval {
531     $dbh = ref $info[0] eq 'CODE'
532          ? &{$info[0]}
533          : DBI->connect(@info);
534   };
535
536   $DBI::connect_via = $old_connect_via if $old_connect_via;
537
538   if (!$dbh || $@) {
539     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
540   }
541
542   $dbh;
543 }
544
545 =head2 txn_begin
546
547 Calls begin_work on the current dbh.
548
549 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
550 an entire code block to be executed transactionally.
551
552 =cut
553
554 sub txn_begin {
555   my $self = shift;
556   if ($self->{transaction_depth}++ == 0) {
557     my $dbh = $self->dbh;
558     if ($dbh->{AutoCommit}) {
559       $self->debugobj->txn_begin()
560         if ($self->debug);
561       $dbh->begin_work;
562     }
563   }
564 }
565
566 =head2 txn_commit
567
568 Issues a commit against the current dbh.
569
570 =cut
571
572 sub txn_commit {
573   my $self = shift;
574   my $dbh = $self->dbh;
575   if ($self->{transaction_depth} == 0) {
576     unless ($dbh->{AutoCommit}) {
577       $self->debugobj->txn_commit()
578         if ($self->debug);
579       $dbh->commit;
580     }
581   }
582   else {
583     if (--$self->{transaction_depth} == 0) {
584       $self->debugobj->txn_commit()
585         if ($self->debug);
586       $dbh->commit;
587     }
588   }
589 }
590
591 =head2 txn_rollback
592
593 Issues a rollback against the current dbh. A nested rollback will
594 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
595 which allows the rollback to propagate to the outermost transaction.
596
597 =cut
598
599 sub txn_rollback {
600   my $self = shift;
601
602   eval {
603     my $dbh = $self->dbh;
604     if ($self->{transaction_depth} == 0) {
605       unless ($dbh->{AutoCommit}) {
606         $self->debugobj->txn_rollback()
607           if ($self->debug);
608         $dbh->rollback;
609       }
610     }
611     else {
612       if (--$self->{transaction_depth} == 0) {
613         $self->debugobj->txn_rollback()
614           if ($self->debug);
615         $dbh->rollback;
616       }
617       else {
618         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
619       }
620     }
621   };
622
623   if ($@) {
624     my $error = $@;
625     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
626     $error =~ /$exception_class/ and $self->throw_exception($error);
627     $self->{transaction_depth} = 0;          # ensure that a failed rollback
628     $self->throw_exception($error);          # resets the transaction depth
629   }
630 }
631
632 sub _execute {
633   my ($self, $op, $extra_bind, $ident, @args) = @_;
634   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
635   unshift(@bind, @$extra_bind) if $extra_bind;
636   if ($self->debug) {
637       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
638       $self->debugobj->query_start($sql, @debug_bind);
639   }
640   my $sth = eval { $self->sth($sql,$op) };
641
642   if (!$sth || $@) {
643     $self->throw_exception(
644       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
645     );
646   }
647   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
648   my $rv;
649   if ($sth) {
650     my $time = time();
651     $rv = eval { $sth->execute(@bind) };
652
653     if ($@ || !$rv) {
654       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
655     }
656   } else {
657     $self->throw_exception("'$sql' did not generate a statement.");
658   }
659   if ($self->debug) {
660       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
661       $self->debugobj->query_end($sql, @debug_bind);
662   }
663   return (wantarray ? ($rv, $sth, @bind) : $rv);
664 }
665
666 sub insert {
667   my ($self, $ident, $to_insert) = @_;
668   $self->throw_exception(
669     "Couldn't insert ".join(', ',
670       map "$_ => $to_insert->{$_}", keys %$to_insert
671     )." into ${ident}"
672   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
673   return $to_insert;
674 }
675
676 sub update {
677   return shift->_execute('update' => [], @_);
678 }
679
680 sub delete {
681   return shift->_execute('delete' => [], @_);
682 }
683
684 sub _select {
685   my ($self, $ident, $select, $condition, $attrs) = @_;
686   my $order = $attrs->{order_by};
687   if (ref $condition eq 'SCALAR') {
688     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
689   }
690   if (exists $attrs->{group_by} || $attrs->{having}) {
691     $order = {
692       group_by => $attrs->{group_by},
693       having => $attrs->{having},
694       ($order ? (order_by => $order) : ())
695     };
696   }
697   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
698   if ($attrs->{software_limit} ||
699       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
700         $attrs->{software_limit} = 1;
701   } else {
702     $self->throw_exception("rows attribute must be positive if present")
703       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
704     push @args, $attrs->{rows}, $attrs->{offset};
705   }
706   return $self->_execute(@args);
707 }
708
709 =head2 select
710
711 Handle a SQL select statement.
712
713 =cut
714
715 sub select {
716   my $self = shift;
717   my ($ident, $select, $condition, $attrs) = @_;
718   return $self->cursor->new($self, \@_, $attrs);
719 }
720
721 =head2 select_single
722
723 Performs a select, fetch and return of data - handles a single row
724 only.
725
726 =cut
727
728 # Need to call finish() to work round broken DBDs
729
730 sub select_single {
731   my $self = shift;
732   my ($rv, $sth, @bind) = $self->_select(@_);
733   my @row = $sth->fetchrow_array;
734   $sth->finish();
735   return @row;
736 }
737
738 =head2 sth
739
740 Returns a L<DBI> sth (statement handle) for the supplied SQL.
741
742 =cut
743
744 sub sth {
745   my ($self, $sql) = @_;
746   # 3 is the if_active parameter which avoids active sth re-use
747   return $self->dbh->prepare_cached($sql, {}, 3);
748 }
749
750 =head2 columns_info_for
751
752 Returns database type info for a given table columns.
753
754 =cut
755
756 sub columns_info_for {
757   my ($self, $table) = @_;
758
759   my $dbh = $self->dbh;
760
761   if ($dbh->can('column_info')) {
762     my %result;
763     my $old_raise_err = $dbh->{RaiseError};
764     my $old_print_err = $dbh->{PrintError};
765     $dbh->{RaiseError} = 1;
766     $dbh->{PrintError} = 0;
767     eval {
768       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
769       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
770       $sth->execute();
771       while ( my $info = $sth->fetchrow_hashref() ){
772         my %column_info;
773         $column_info{data_type}   = $info->{TYPE_NAME};
774         $column_info{size}      = $info->{COLUMN_SIZE};
775         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
776         $column_info{default_value} = $info->{COLUMN_DEF};
777         my $col_name = $info->{COLUMN_NAME};
778         $col_name =~ s/^\"(.*)\"$/$1/;
779
780         $result{$col_name} = \%column_info;
781       }
782     };
783     $dbh->{RaiseError} = $old_raise_err;
784     $dbh->{PrintError} = $old_print_err;
785     return \%result if !$@;
786   }
787
788   my %result;
789   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
790   $sth->execute;
791   my @columns = @{$sth->{NAME_lc}};
792   for my $i ( 0 .. $#columns ){
793     my %column_info;
794     my $type_num = $sth->{TYPE}->[$i];
795     my $type_name;
796     if(defined $type_num && $dbh->can('type_info')) {
797       my $type_info = $dbh->type_info($type_num);
798       $type_name = $type_info->{TYPE_NAME} if $type_info;
799     }
800     $column_info{data_type} = $type_name ? $type_name : $type_num;
801     $column_info{size} = $sth->{PRECISION}->[$i];
802     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
803
804     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
805       $column_info{data_type} = $1;
806       $column_info{size}    = $2;
807     }
808
809     $result{$columns[$i]} = \%column_info;
810   }
811
812   return \%result;
813 }
814
815 =head2 last_insert_id
816
817 Return the row id of the last insert.
818
819 =cut
820
821 sub last_insert_id {
822   my ($self, $row) = @_;
823     
824   return $self->dbh->func('last_insert_rowid');
825
826 }
827
828 =head2 sqlt_type
829
830 Returns the database driver name.
831
832 =cut
833
834 sub sqlt_type { shift->dbh->{Driver}->{Name} }
835
836 =head2 create_ddl_dir (EXPERIMENTAL)
837
838 =over 4
839
840 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
841
842 =back
843
844 Creates an SQL file based on the Schema, for each of the specified
845 database types, in the given directory.
846
847 Note that this feature is currently EXPERIMENTAL and may not work correctly
848 across all databases, or fully handle complex relationships.
849
850 =cut
851
852 sub create_ddl_dir
853 {
854   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
855
856   if(!$dir || !-d $dir)
857   {
858     warn "No directory given, using ./\n";
859     $dir = "./";
860   }
861   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
862   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
863   $version ||= $schema->VERSION || '1.x';
864
865   eval "use SQL::Translator";
866   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
867
868   my $sqlt = SQL::Translator->new({
869 #      debug => 1,
870       add_drop_table => 1,
871   });
872   foreach my $db (@$databases)
873   {
874     $sqlt->reset();
875     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
876 #    $sqlt->parser_args({'DBIx::Class' => $schema);
877     $sqlt->data($schema);
878     $sqlt->producer($db);
879
880     my $file;
881     my $filename = $schema->ddl_filename($db, $dir, $version);
882     if(-e $filename)
883     {
884       $self->throw_exception("$filename already exists, skipping $db");
885       next;
886     }
887     open($file, ">$filename") 
888       or $self->throw_exception("Can't open $filename for writing ($!)");
889     my $output = $sqlt->translate;
890 #use Data::Dumper;
891 #    print join(":", keys %{$schema->source_registrations});
892 #    print Dumper($sqlt->schema);
893     if(!$output)
894     {
895       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
896       next;
897     }
898     print $file $output;
899     close($file);
900   }
901
902 }
903
904 =head2 deployment_statements
905
906 Create the statements for L</deploy> and
907 L<DBIx::Class::Schema/deploy>.
908
909 =cut
910
911 sub deployment_statements {
912   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
913   $type ||= $self->sqlt_type;
914   $version ||= $schema->VERSION || '1.x';
915   $dir ||= './';
916   eval "use SQL::Translator";
917   if(!$@)
918   {
919     eval "use SQL::Translator::Parser::DBIx::Class;";
920     $self->throw_exception($@) if $@;
921     eval "use SQL::Translator::Producer::${type};";
922     $self->throw_exception($@) if $@;
923     my $tr = SQL::Translator->new(%$sqltargs);
924     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
925     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
926   }
927
928   my $filename = $schema->ddl_filename($type, $dir, $version);
929   if(!-f $filename)
930   {
931 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
932       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
933       return;
934   }
935   my $file;
936   open($file, "<$filename") 
937       or $self->throw_exception("Can't open $filename ($!)");
938   my @rows = <$file>;
939   close($file);
940
941   return join('', @rows);
942   
943 }
944
945 =head2 deploy
946
947 Sends the appropriate statements to create or modify tables to the
948 db. This would normally be called through
949 L<DBIx::Class::Schema/deploy>.
950
951 =cut
952
953 sub deploy {
954   my ($self, $schema, $type, $sqltargs) = @_;
955   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, $sqltargs) ) {
956     for ( split(";\n", $statement)) {
957       next if($_ =~ /^--/);
958       next if(!$_);
959 #      next if($_ =~ /^DROP/m);
960       next if($_ =~ /^BEGIN TRANSACTION/m);
961       next if($_ =~ /^COMMIT/m);
962       $self->debugobj->query_begin($_) if $self->debug;
963       $self->dbh->do($_) or warn "SQL was:\n $_";
964       $self->debugobj->query_end($_) if $self->debug;
965     }
966   }
967 }
968
969 =head2 datetime_parser
970
971 Returns the datetime parser class
972
973 =cut
974
975 sub datetime_parser {
976   my $self = shift;
977   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
978 }
979
980 =head2 datetime_parser_type
981
982 Defines (returns) the datetime parser class - currently hardwired to
983 L<DateTime::Format::MySQL>
984
985 =cut
986
987 sub datetime_parser_type { "DateTime::Format::MySQL"; }
988
989 =head2 build_datetime_parser
990
991 See L</datetime_parser>
992
993 =cut
994
995 sub build_datetime_parser {
996   my $self = shift;
997   my $type = $self->datetime_parser_type(@_);
998   eval "use ${type}";
999   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1000   return $type;
1001 }
1002
1003 sub DESTROY { shift->disconnect }
1004
1005 1;
1006
1007 =head1 SQL METHODS
1008
1009 The module defines a set of methods within the DBIC::SQL::Abstract
1010 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1011 SQL query functions.
1012
1013 The following methods are extended:-
1014
1015 =over 4
1016
1017 =item delete
1018
1019 =item insert
1020
1021 =item select
1022
1023 =item update
1024
1025 =item limit_dialect
1026
1027 Accessor for setting limit dialect. This is useful
1028 for JDBC-bridge among others where the remote SQL-dialect cannot
1029 be determined by the name of the driver alone.
1030
1031 This option can also be set via L</connect_info>.
1032
1033 =item quote_char
1034
1035 Specifies what characters to use to quote table and column names. If 
1036 you use this you will want to specify L<name_sep> as well.
1037
1038 quote_char expectes either a single character, in which case is it is placed
1039 on either side of the table/column, or an arrayref of length 2 in which case the
1040 table/column name is placed between the elements.
1041
1042 For example under MySQL you'd use C<quote_char('`')>, and user SQL Server you'd 
1043 use C<quote_char(qw/[ ]/)>.
1044
1045 This option can also be set via L</connect_info>.
1046
1047 =item name_sep
1048
1049 This only needs to be used in conjunction with L<quote_char>, and is used to 
1050 specify the charecter that seperates elements (schemas, tables, columns) from 
1051 each other. In most cases this is simply a C<.>.
1052
1053 This option can also be set via L</connect_info>.
1054
1055 =back
1056
1057 =head1 ENVIRONMENT VARIABLES
1058
1059 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1060
1061 If C<DBIX_CLASS_STORAGE_DBI_DEBUG> is set then SQL trace information
1062 is produced (as when the L<debug> method is set).
1063
1064 If the value is of the form C<1=/path/name> then the trace output is
1065 written to the file C</path/name>.
1066
1067 This environment variable is checked when the storage object is first
1068 created (when you call connect on your schema).  So, run-time changes 
1069 to this environment variable will not take effect unless you also 
1070 re-connect on your schema.
1071
1072 =head1 AUTHORS
1073
1074 Matt S. Trout <mst@shadowcatsystems.co.uk>
1075
1076 Andy Grundman <andy@hybridized.org>
1077
1078 =head1 LICENSE
1079
1080 You may distribute this code under the same terms as Perl itself.
1081
1082 =cut
1083