Improve on_connect_do docs
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 sub select {
21   my ($self, $table, $fields, $where, $order, @rest) = @_;
22   $table = $self->_quote($table) unless ref($table);
23   @rest = (-1) unless defined $rest[0];
24   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
25     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
26   local $self->{having_bind} = [];
27   my ($sql, @ret) = $self->SUPER::select(
28     $table, $self->_recurse_fields($fields), $where, $order, @rest
29   );
30   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
31 }
32
33 sub insert {
34   my $self = shift;
35   my $table = shift;
36   $table = $self->_quote($table) unless ref($table);
37   $self->SUPER::insert($table, @_);
38 }
39
40 sub update {
41   my $self = shift;
42   my $table = shift;
43   $table = $self->_quote($table) unless ref($table);
44   $self->SUPER::update($table, @_);
45 }
46
47 sub delete {
48   my $self = shift;
49   my $table = shift;
50   $table = $self->_quote($table) unless ref($table);
51   $self->SUPER::delete($table, @_);
52 }
53
54 sub _emulate_limit {
55   my $self = shift;
56   if ($_[3] == -1) {
57     return $_[1].$self->_order_by($_[2]);
58   } else {
59     return $self->SUPER::_emulate_limit(@_);
60   }
61 }
62
63 sub _recurse_fields {
64   my ($self, $fields) = @_;
65   my $ref = ref $fields;
66   return $self->_quote($fields) unless $ref;
67   return $$fields if $ref eq 'SCALAR';
68
69   if ($ref eq 'ARRAY') {
70     return join(', ', map { $self->_recurse_fields($_) } @$fields);
71   } elsif ($ref eq 'HASH') {
72     foreach my $func (keys %$fields) {
73       return $self->_sqlcase($func)
74         .'( '.$self->_recurse_fields($fields->{$func}).' )';
75     }
76   }
77 }
78
79 sub _order_by {
80   my $self = shift;
81   my $ret = '';
82   my @extra;
83   if (ref $_[0] eq 'HASH') {
84     if (defined $_[0]->{group_by}) {
85       $ret = $self->_sqlcase(' group by ')
86                .$self->_recurse_fields($_[0]->{group_by});
87     }
88     if (defined $_[0]->{having}) {
89       my $frag;
90       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
91       push(@{$self->{having_bind}}, @extra);
92       $ret .= $self->_sqlcase(' having ').$frag;
93     }
94     if (defined $_[0]->{order_by}) {
95       $ret .= $self->SUPER::_order_by($_[0]->{order_by});
96     }
97   } elsif(ref $_[0] eq 'SCALAR') {
98     $ret = $self->_sqlcase(' order by ').${ $_[0] };
99   } else {
100     $ret = $self->SUPER::_order_by(@_);
101   }
102   return $ret;
103 }
104
105 sub _order_directions {
106   my ($self, $order) = @_;
107   $order = $order->{order_by} if ref $order eq 'HASH';
108   return $self->SUPER::_order_directions($order);
109 }
110
111 sub _table {
112   my ($self, $from) = @_;
113   if (ref $from eq 'ARRAY') {
114     return $self->_recurse_from(@$from);
115   } elsif (ref $from eq 'HASH') {
116     return $self->_make_as($from);
117   } else {
118     return $from; # would love to quote here but _table ends up getting called
119                   # twice during an ->select without a limit clause due to
120                   # the way S::A::Limit->select works. should maybe consider
121                   # bypassing this and doing S::A::select($self, ...) in
122                   # our select method above. meantime, quoting shims have
123                   # been added to select/insert/update/delete here
124   }
125 }
126
127 sub _recurse_from {
128   my ($self, $from, @join) = @_;
129   my @sqlf;
130   push(@sqlf, $self->_make_as($from));
131   foreach my $j (@join) {
132     my ($to, $on) = @$j;
133
134     # check whether a join type exists
135     my $join_clause = '';
136     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
137     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
138       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
139     } else {
140       $join_clause = ' JOIN ';
141     }
142     push(@sqlf, $join_clause);
143
144     if (ref $to eq 'ARRAY') {
145       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
146     } else {
147       push(@sqlf, $self->_make_as($to));
148     }
149     push(@sqlf, ' ON ', $self->_join_condition($on));
150   }
151   return join('', @sqlf);
152 }
153
154 sub _make_as {
155   my ($self, $from) = @_;
156   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
157                      reverse each %{$self->_skip_options($from)});
158 }
159
160 sub _skip_options {
161   my ($self, $hash) = @_;
162   my $clean_hash = {};
163   $clean_hash->{$_} = $hash->{$_}
164     for grep {!/^-/} keys %$hash;
165   return $clean_hash;
166 }
167
168 sub _join_condition {
169   my ($self, $cond) = @_;
170   if (ref $cond eq 'HASH') {
171     my %j;
172     for (keys %$cond) {
173       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
174     };
175     return $self->_recurse_where(\%j);
176   } elsif (ref $cond eq 'ARRAY') {
177     return join(' OR ', map { $self->_join_condition($_) } @$cond);
178   } else {
179     die "Can't handle this yet!";
180   }
181 }
182
183 sub _quote {
184   my ($self, $label) = @_;
185   return '' unless defined $label;
186   return "*" if $label eq '*';
187   return $label unless $self->{quote_char};
188   if(ref $self->{quote_char} eq "ARRAY"){
189     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
190       if !defined $self->{name_sep};
191     my $sep = $self->{name_sep};
192     return join($self->{name_sep},
193         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
194        split(/\Q$sep\E/,$label));
195   }
196   return $self->SUPER::_quote($label);
197 }
198
199 sub _RowNum {
200    my $self = shift;
201    my $c;
202    $_[0] =~ s/SELECT (.*?) FROM/
203      'SELECT '.join(', ', map { $_.' AS col'.++$c } split(', ', $1)).' FROM'/e;
204    $self->SUPER::_RowNum(@_);
205 }
206
207 sub limit_dialect {
208     my $self = shift;
209     $self->{limit_dialect} = shift if @_;
210     return $self->{limit_dialect};
211 }
212
213 sub quote_char {
214     my $self = shift;
215     $self->{quote_char} = shift if @_;
216     return $self->{quote_char};
217 }
218
219 sub name_sep {
220     my $self = shift;
221     $self->{name_sep} = shift if @_;
222     return $self->{name_sep};
223 }
224
225 } # End of BEGIN block
226
227 use base qw/DBIx::Class/;
228
229 __PACKAGE__->load_components(qw/AccessorGroup/);
230
231 __PACKAGE__->mk_group_accessors('simple' =>
232   qw/_connect_info _dbh _sql_maker _conn_pid _conn_tid debug debugobj
233      cursor on_connect_do transaction_depth/);
234
235 =head1 NAME
236
237 DBIx::Class::Storage::DBI - DBI storage handler
238
239 =head1 SYNOPSIS
240
241 =head1 DESCRIPTION
242
243 This class represents the connection to the database
244
245 =head1 METHODS
246
247 =head2 new
248
249 =cut
250
251 sub new {
252   my $new = bless({}, ref $_[0] || $_[0]);
253   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
254   $new->transaction_depth(0);
255
256   $new->debugobj(new DBIx::Class::Storage::Statistics());
257
258   my $fh;
259   if (defined($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}) &&
260      ($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG} =~ /=(.+)$/)) {
261     $fh = IO::File->new($1, 'w')
262       or $new->throw_exception("Cannot open trace file $1");
263   } else {
264     $fh = IO::File->new('>&STDERR');
265   }
266   $new->debugfh($fh);
267   $new->debug(1) if $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG};
268   return $new;
269 }
270
271 =head2 throw_exception
272
273 Throws an exception - croaks.
274
275 =cut
276
277 sub throw_exception {
278   my ($self, $msg) = @_;
279   croak($msg);
280 }
281
282 =head2 connect_info
283
284 The arguments of C<connect_info> are always a single array reference.
285
286 This is normally accessed via L<DBIx::Class::Schema/connection>, which
287 encapsulates its argument list in an arrayref before calling
288 C<connect_info> here.
289
290 The arrayref can either contain the same set of arguments one would
291 normally pass to L<DBI/connect>, or a lone code reference which returns
292 a connected database handle.
293
294 In either case, there is an optional final element within the arrayref
295 which can hold a hashref of connection-specific Storage::DBI options.
296 These include C<on_connect_do>, and the sql_maker options
297 C<limit_dialect>, C<quote_char>, and C<name_sep>.  Examples:
298
299   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
300
301   ->connect_info([ sub { DBI->connect(...) } ]);
302
303   ->connect_info(
304     [
305       'dbi:Pg:dbname=foo',
306       'postgres',
307       'my_pg_password',
308       { AutoCommit => 0 },
309       { quote_char => q{`}, name_sep => q{@} },
310     ]
311   );
312
313   ->connect_info(
314     [
315       sub { DBI->connect(...) },
316       { quote_char => q{`}, name_sep => q{@} },
317     ]
318   );
319
320 =head2 on_connect_do
321
322   $schema->storage->on_connect_do(['PRAGMA synchronous = OFF']);
323
324 Call this after C<< $schema->connect >> to have the sql statements
325 given executed on every db connect.
326
327 This option can also be set via L</connect_info>.
328
329 =head2 debug
330
331 Causes SQL trace information to be emitted on the C<debugobj> object.
332 (or C<STDERR> if C<debugobj> has not specifically been set).
333
334 =head2 debugfh
335
336 Set or retrieve the filehandle used for trace/debug output.  This should be
337 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
338 set to be STDERR - although see information on the
339 L<DBIX_CLASS_STORAGE_DBI_DEBUG> environment variable.
340
341 =cut
342
343 sub debugfh {
344     my $self = shift;
345
346     if ($self->debugobj->can('debugfh')) {
347         return $self->debugobj->debugfh(@_);
348     }
349 }
350
351 =head2 debugobj
352
353 Sets or retrieves the object used for metric collection. Defaults to an instance
354 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
355 method of using a coderef as a callback.  See the aforementioned Statistics
356 class for more information.
357
358 =head2 debugcb
359
360 Sets a callback to be executed each time a statement is run; takes a sub
361 reference.  Callback is executed as $sub->($op, $info) where $op is
362 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
363
364 See L<debugobj> for a better way.
365
366 =cut
367
368 sub debugcb {
369     my $self = shift;
370
371     if ($self->debugobj->can('callback')) {
372         return $self->debugobj->callback(@_);
373     }
374 }
375
376 =head2 disconnect
377
378 Disconnect the L<DBI> handle, performing a rollback first if the
379 database is not in C<AutoCommit> mode.
380
381 =cut
382
383 sub disconnect {
384   my ($self) = @_;
385
386   if( $self->connected ) {
387     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
388     $self->_dbh->disconnect;
389     $self->_dbh(undef);
390   }
391 }
392
393 =head2 connected
394
395 Check if the L<DBI> handle is connected.  Returns true if the handle
396 is connected.
397
398 =cut
399
400 sub connected { my ($self) = @_;
401
402   if(my $dbh = $self->_dbh) {
403       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
404           $self->_sql_maker(undef);
405           return $self->_dbh(undef);
406       }
407       elsif($self->_conn_pid != $$) {
408           $self->_dbh->{InactiveDestroy} = 1;
409           $self->_sql_maker(undef);
410           return $self->_dbh(undef)
411       }
412       return ($dbh->FETCH('Active') && $dbh->ping);
413   }
414
415   return 0;
416 }
417
418 =head2 ensure_connected
419
420 Check whether the database handle is connected - if not then make a
421 connection.
422
423 =cut
424
425 sub ensure_connected {
426   my ($self) = @_;
427
428   unless ($self->connected) {
429     $self->_populate_dbh;
430   }
431 }
432
433 =head2 dbh
434
435 Returns the dbh - a data base handle of class L<DBI>.
436
437 =cut
438
439 sub dbh {
440   my ($self) = @_;
441
442   $self->ensure_connected;
443   return $self->_dbh;
444 }
445
446 sub _sql_maker_args {
447     my ($self) = @_;
448     
449     return ( limit_dialect => $self->dbh );
450 }
451
452 =head2 sql_maker
453
454 Returns a C<sql_maker> object - normally an object of class
455 C<DBIC::SQL::Abstract>.
456
457 =cut
458
459 sub sql_maker {
460   my ($self) = @_;
461   unless ($self->_sql_maker) {
462     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
463   }
464   return $self->_sql_maker;
465 }
466
467 sub connect_info {
468   my ($self, $info_arg) = @_;
469
470   if($info_arg) {
471     my %sql_maker_opts;
472     my $info = [ @$info_arg ]; # copy because we can alter it
473     my $last_info = $info->[-1];
474     if(ref $last_info eq 'HASH') {
475       my $used;
476       if(my $on_connect_do = $last_info->{on_connect_do}) {
477         $used = 1;
478         $self->on_connect_do($on_connect_do);
479       }
480       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
481         if(my $opt_val = $last_info->{$sql_maker_opt}) {
482           $used = 1;
483           $sql_maker_opts{$sql_maker_opt} = $opt_val;
484         }
485       }
486
487       # remove our options hashref if it was there, to avoid confusing
488       #   DBI in the case the user didn't use all 4 DBI options, as in:
489       #   [ 'dbi:SQLite:foo.db', { quote_char => q{`} } ]
490       pop(@$info) if $used;
491     }
492
493     $self->_connect_info($info);
494     $self->sql_maker->$_($sql_maker_opts{$_}) for(keys %sql_maker_opts);
495   }
496
497   $self->_connect_info;
498 }
499
500 sub _populate_dbh {
501   my ($self) = @_;
502   my @info = @{$self->_connect_info || []};
503   $self->_dbh($self->_connect(@info));
504
505   if(ref $self eq 'DBIx::Class::Storage::DBI') {
506     my $driver = $self->_dbh->{Driver}->{Name};
507     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
508       bless $self, "DBIx::Class::Storage::DBI::${driver}";
509       $self->_rebless() if $self->can('_rebless');
510     }
511   }
512
513   # if on-connect sql statements are given execute them
514   foreach my $sql_statement (@{$self->on_connect_do || []}) {
515     $self->debugobj->query_start($sql_statement) if $self->debug();
516     $self->_dbh->do($sql_statement);
517     $self->debugobj->query_end($sql_statement) if $self->debug();
518   }
519
520   $self->_conn_pid($$);
521   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
522 }
523
524 sub _connect {
525   my ($self, @info) = @_;
526
527   $self->throw_exception("You failed to provide any connection info")
528       if !@info;
529
530   my ($old_connect_via, $dbh);
531
532   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
533       $old_connect_via = $DBI::connect_via;
534       $DBI::connect_via = 'connect';
535   }
536
537   eval {
538     $dbh = ref $info[0] eq 'CODE'
539          ? &{$info[0]}
540          : DBI->connect(@info);
541   };
542
543   $DBI::connect_via = $old_connect_via if $old_connect_via;
544
545   if (!$dbh || $@) {
546     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
547   }
548
549   $dbh;
550 }
551
552 =head2 txn_begin
553
554 Calls begin_work on the current dbh.
555
556 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
557 an entire code block to be executed transactionally.
558
559 =cut
560
561 sub txn_begin {
562   my $self = shift;
563   if ($self->{transaction_depth}++ == 0) {
564     my $dbh = $self->dbh;
565     if ($dbh->{AutoCommit}) {
566       $self->debugobj->txn_begin()
567         if ($self->debug);
568       $dbh->begin_work;
569     }
570   }
571 }
572
573 =head2 txn_commit
574
575 Issues a commit against the current dbh.
576
577 =cut
578
579 sub txn_commit {
580   my $self = shift;
581   my $dbh = $self->dbh;
582   if ($self->{transaction_depth} == 0) {
583     unless ($dbh->{AutoCommit}) {
584       $self->debugobj->txn_commit()
585         if ($self->debug);
586       $dbh->commit;
587     }
588   }
589   else {
590     if (--$self->{transaction_depth} == 0) {
591       $self->debugobj->txn_commit()
592         if ($self->debug);
593       $dbh->commit;
594     }
595   }
596 }
597
598 =head2 txn_rollback
599
600 Issues a rollback against the current dbh. A nested rollback will
601 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
602 which allows the rollback to propagate to the outermost transaction.
603
604 =cut
605
606 sub txn_rollback {
607   my $self = shift;
608
609   eval {
610     my $dbh = $self->dbh;
611     if ($self->{transaction_depth} == 0) {
612       unless ($dbh->{AutoCommit}) {
613         $self->debugobj->txn_rollback()
614           if ($self->debug);
615         $dbh->rollback;
616       }
617     }
618     else {
619       if (--$self->{transaction_depth} == 0) {
620         $self->debugobj->txn_rollback()
621           if ($self->debug);
622         $dbh->rollback;
623       }
624       else {
625         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
626       }
627     }
628   };
629
630   if ($@) {
631     my $error = $@;
632     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
633     $error =~ /$exception_class/ and $self->throw_exception($error);
634     $self->{transaction_depth} = 0;          # ensure that a failed rollback
635     $self->throw_exception($error);          # resets the transaction depth
636   }
637 }
638
639 sub _execute {
640   my ($self, $op, $extra_bind, $ident, @args) = @_;
641   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
642   unshift(@bind, @$extra_bind) if $extra_bind;
643   if ($self->debug) {
644       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
645       $self->debugobj->query_start($sql, @debug_bind);
646   }
647   my $sth = eval { $self->sth($sql,$op) };
648
649   if (!$sth || $@) {
650     $self->throw_exception(
651       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
652     );
653   }
654   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
655   my $rv;
656   if ($sth) {
657     my $time = time();
658     $rv = eval { $sth->execute(@bind) };
659
660     if ($@ || !$rv) {
661       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
662     }
663   } else {
664     $self->throw_exception("'$sql' did not generate a statement.");
665   }
666   if ($self->debug) {
667       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
668       $self->debugobj->query_end($sql, @debug_bind);
669   }
670   return (wantarray ? ($rv, $sth, @bind) : $rv);
671 }
672
673 sub insert {
674   my ($self, $ident, $to_insert) = @_;
675   $self->throw_exception(
676     "Couldn't insert ".join(', ',
677       map "$_ => $to_insert->{$_}", keys %$to_insert
678     )." into ${ident}"
679   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
680   return $to_insert;
681 }
682
683 sub update {
684   return shift->_execute('update' => [], @_);
685 }
686
687 sub delete {
688   return shift->_execute('delete' => [], @_);
689 }
690
691 sub _select {
692   my ($self, $ident, $select, $condition, $attrs) = @_;
693   my $order = $attrs->{order_by};
694   if (ref $condition eq 'SCALAR') {
695     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
696   }
697   if (exists $attrs->{group_by} || $attrs->{having}) {
698     $order = {
699       group_by => $attrs->{group_by},
700       having => $attrs->{having},
701       ($order ? (order_by => $order) : ())
702     };
703   }
704   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
705   if ($attrs->{software_limit} ||
706       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
707         $attrs->{software_limit} = 1;
708   } else {
709     $self->throw_exception("rows attribute must be positive if present")
710       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
711     push @args, $attrs->{rows}, $attrs->{offset};
712   }
713   return $self->_execute(@args);
714 }
715
716 =head2 select
717
718 Handle a SQL select statement.
719
720 =cut
721
722 sub select {
723   my $self = shift;
724   my ($ident, $select, $condition, $attrs) = @_;
725   return $self->cursor->new($self, \@_, $attrs);
726 }
727
728 =head2 select_single
729
730 Performs a select, fetch and return of data - handles a single row
731 only.
732
733 =cut
734
735 # Need to call finish() to work round broken DBDs
736
737 sub select_single {
738   my $self = shift;
739   my ($rv, $sth, @bind) = $self->_select(@_);
740   my @row = $sth->fetchrow_array;
741   $sth->finish();
742   return @row;
743 }
744
745 =head2 sth
746
747 Returns a L<DBI> sth (statement handle) for the supplied SQL.
748
749 =cut
750
751 sub sth {
752   my ($self, $sql) = @_;
753   # 3 is the if_active parameter which avoids active sth re-use
754   return $self->dbh->prepare_cached($sql, {}, 3);
755 }
756
757 =head2 columns_info_for
758
759 Returns database type info for a given table columns.
760
761 =cut
762
763 sub columns_info_for {
764   my ($self, $table) = @_;
765
766   my $dbh = $self->dbh;
767
768   if ($dbh->can('column_info')) {
769     my %result;
770     my $old_raise_err = $dbh->{RaiseError};
771     my $old_print_err = $dbh->{PrintError};
772     $dbh->{RaiseError} = 1;
773     $dbh->{PrintError} = 0;
774     eval {
775       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
776       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
777       $sth->execute();
778       while ( my $info = $sth->fetchrow_hashref() ){
779         my %column_info;
780         $column_info{data_type}   = $info->{TYPE_NAME};
781         $column_info{size}      = $info->{COLUMN_SIZE};
782         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
783         $column_info{default_value} = $info->{COLUMN_DEF};
784         my $col_name = $info->{COLUMN_NAME};
785         $col_name =~ s/^\"(.*)\"$/$1/;
786
787         $result{$col_name} = \%column_info;
788       }
789     };
790     $dbh->{RaiseError} = $old_raise_err;
791     $dbh->{PrintError} = $old_print_err;
792     return \%result if !$@;
793   }
794
795   my %result;
796   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
797   $sth->execute;
798   my @columns = @{$sth->{NAME_lc}};
799   for my $i ( 0 .. $#columns ){
800     my %column_info;
801     my $type_num = $sth->{TYPE}->[$i];
802     my $type_name;
803     if(defined $type_num && $dbh->can('type_info')) {
804       my $type_info = $dbh->type_info($type_num);
805       $type_name = $type_info->{TYPE_NAME} if $type_info;
806     }
807     $column_info{data_type} = $type_name ? $type_name : $type_num;
808     $column_info{size} = $sth->{PRECISION}->[$i];
809     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
810
811     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
812       $column_info{data_type} = $1;
813       $column_info{size}    = $2;
814     }
815
816     $result{$columns[$i]} = \%column_info;
817   }
818
819   return \%result;
820 }
821
822 =head2 last_insert_id
823
824 Return the row id of the last insert.
825
826 =cut
827
828 sub last_insert_id {
829   my ($self, $row) = @_;
830     
831   return $self->dbh->func('last_insert_rowid');
832
833 }
834
835 =head2 sqlt_type
836
837 Returns the database driver name.
838
839 =cut
840
841 sub sqlt_type { shift->dbh->{Driver}->{Name} }
842
843 =head2 create_ddl_dir (EXPERIMENTAL)
844
845 =over 4
846
847 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
848
849 =back
850
851 Creates an SQL file based on the Schema, for each of the specified
852 database types, in the given directory.
853
854 Note that this feature is currently EXPERIMENTAL and may not work correctly
855 across all databases, or fully handle complex relationships.
856
857 =cut
858
859 sub create_ddl_dir
860 {
861   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
862
863   if(!$dir || !-d $dir)
864   {
865     warn "No directory given, using ./\n";
866     $dir = "./";
867   }
868   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
869   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
870   $version ||= $schema->VERSION || '1.x';
871
872   eval "use SQL::Translator";
873   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
874
875   my $sqlt = SQL::Translator->new({
876 #      debug => 1,
877       add_drop_table => 1,
878   });
879   foreach my $db (@$databases)
880   {
881     $sqlt->reset();
882     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
883 #    $sqlt->parser_args({'DBIx::Class' => $schema);
884     $sqlt->data($schema);
885     $sqlt->producer($db);
886
887     my $file;
888     my $filename = $schema->ddl_filename($db, $dir, $version);
889     if(-e $filename)
890     {
891       $self->throw_exception("$filename already exists, skipping $db");
892       next;
893     }
894     open($file, ">$filename") 
895       or $self->throw_exception("Can't open $filename for writing ($!)");
896     my $output = $sqlt->translate;
897 #use Data::Dumper;
898 #    print join(":", keys %{$schema->source_registrations});
899 #    print Dumper($sqlt->schema);
900     if(!$output)
901     {
902       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
903       next;
904     }
905     print $file $output;
906     close($file);
907   }
908
909 }
910
911 =head2 deployment_statements
912
913 Create the statements for L</deploy> and
914 L<DBIx::Class::Schema/deploy>.
915
916 =cut
917
918 sub deployment_statements {
919   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
920   # Need to be connected to get the correct sqlt_type
921   $self->ensure_connected() unless $type;
922   $type ||= $self->sqlt_type;
923   $version ||= $schema->VERSION || '1.x';
924   $dir ||= './';
925   eval "use SQL::Translator";
926   if(!$@)
927   {
928     eval "use SQL::Translator::Parser::DBIx::Class;";
929     $self->throw_exception($@) if $@;
930     eval "use SQL::Translator::Producer::${type};";
931     $self->throw_exception($@) if $@;
932     my $tr = SQL::Translator->new(%$sqltargs);
933     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
934     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
935   }
936
937   my $filename = $schema->ddl_filename($type, $dir, $version);
938   if(!-f $filename)
939   {
940 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
941       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
942       return;
943   }
944   my $file;
945   open($file, "<$filename") 
946       or $self->throw_exception("Can't open $filename ($!)");
947   my @rows = <$file>;
948   close($file);
949
950   return join('', @rows);
951   
952 }
953
954 =head2 deploy
955
956 Sends the appropriate statements to create or modify tables to the
957 db. This would normally be called through
958 L<DBIx::Class::Schema/deploy>.
959
960 =cut
961
962 sub deploy {
963   my ($self, $schema, $type, $sqltargs) = @_;
964   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, $sqltargs) ) {
965     for ( split(";\n", $statement)) {
966       next if($_ =~ /^--/);
967       next if(!$_);
968 #      next if($_ =~ /^DROP/m);
969       next if($_ =~ /^BEGIN TRANSACTION/m);
970       next if($_ =~ /^COMMIT/m);
971       $self->debugobj->query_start($_) if $self->debug;
972       $self->dbh->do($_) or warn "SQL was:\n $_";
973       $self->debugobj->query_end($_) if $self->debug;
974     }
975   }
976 }
977
978 =head2 datetime_parser
979
980 Returns the datetime parser class
981
982 =cut
983
984 sub datetime_parser {
985   my $self = shift;
986   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
987 }
988
989 =head2 datetime_parser_type
990
991 Defines (returns) the datetime parser class - currently hardwired to
992 L<DateTime::Format::MySQL>
993
994 =cut
995
996 sub datetime_parser_type { "DateTime::Format::MySQL"; }
997
998 =head2 build_datetime_parser
999
1000 See L</datetime_parser>
1001
1002 =cut
1003
1004 sub build_datetime_parser {
1005   my $self = shift;
1006   my $type = $self->datetime_parser_type(@_);
1007   eval "use ${type}";
1008   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1009   return $type;
1010 }
1011
1012 sub DESTROY { shift->disconnect }
1013
1014 1;
1015
1016 =head1 SQL METHODS
1017
1018 The module defines a set of methods within the DBIC::SQL::Abstract
1019 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1020 SQL query functions.
1021
1022 The following methods are extended:-
1023
1024 =over 4
1025
1026 =item delete
1027
1028 =item insert
1029
1030 =item select
1031
1032 =item update
1033
1034 =item limit_dialect
1035
1036 Accessor for setting limit dialect. This is useful
1037 for JDBC-bridge among others where the remote SQL-dialect cannot
1038 be determined by the name of the driver alone.
1039
1040 This option can also be set via L</connect_info>.
1041
1042 =item quote_char
1043
1044 Specifies what characters to use to quote table and column names. If 
1045 you use this you will want to specify L<name_sep> as well.
1046
1047 quote_char expectes either a single character, in which case is it is placed
1048 on either side of the table/column, or an arrayref of length 2 in which case the
1049 table/column name is placed between the elements.
1050
1051 For example under MySQL you'd use C<quote_char('`')>, and user SQL Server you'd 
1052 use C<quote_char(qw/[ ]/)>.
1053
1054 This option can also be set via L</connect_info>.
1055
1056 =item name_sep
1057
1058 This only needs to be used in conjunction with L<quote_char>, and is used to 
1059 specify the charecter that seperates elements (schemas, tables, columns) from 
1060 each other. In most cases this is simply a C<.>.
1061
1062 This option can also be set via L</connect_info>.
1063
1064 =back
1065
1066 =head1 ENVIRONMENT VARIABLES
1067
1068 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1069
1070 If C<DBIX_CLASS_STORAGE_DBI_DEBUG> is set then SQL trace information
1071 is produced (as when the L<debug> method is set).
1072
1073 If the value is of the form C<1=/path/name> then the trace output is
1074 written to the file C</path/name>.
1075
1076 This environment variable is checked when the storage object is first
1077 created (when you call connect on your schema).  So, run-time changes 
1078 to this environment variable will not take effect unless you also 
1079 re-connect on your schema.
1080
1081 =head1 AUTHORS
1082
1083 Matt S. Trout <mst@shadowcatsystems.co.uk>
1084
1085 Andy Grundman <andy@hybridized.org>
1086
1087 =head1 LICENSE
1088
1089 You may distribute this code under the same terms as Perl itself.
1090
1091 =cut
1092