POD::Coverage additions
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 sub select {
21   my ($self, $table, $fields, $where, $order, @rest) = @_;
22   $table = $self->_quote($table) unless ref($table);
23   @rest = (-1) unless defined $rest[0];
24   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
25     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
26   local $self->{having_bind} = [];
27   my ($sql, @ret) = $self->SUPER::select(
28     $table, $self->_recurse_fields($fields), $where, $order, @rest
29   );
30   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
31 }
32
33 sub insert {
34   my $self = shift;
35   my $table = shift;
36   $table = $self->_quote($table) unless ref($table);
37   $self->SUPER::insert($table, @_);
38 }
39
40 sub update {
41   my $self = shift;
42   my $table = shift;
43   $table = $self->_quote($table) unless ref($table);
44   $self->SUPER::update($table, @_);
45 }
46
47 sub delete {
48   my $self = shift;
49   my $table = shift;
50   $table = $self->_quote($table) unless ref($table);
51   $self->SUPER::delete($table, @_);
52 }
53
54 sub _emulate_limit {
55   my $self = shift;
56   if ($_[3] == -1) {
57     return $_[1].$self->_order_by($_[2]);
58   } else {
59     return $self->SUPER::_emulate_limit(@_);
60   }
61 }
62
63 sub _recurse_fields {
64   my ($self, $fields) = @_;
65   my $ref = ref $fields;
66   return $self->_quote($fields) unless $ref;
67   return $$fields if $ref eq 'SCALAR';
68
69   if ($ref eq 'ARRAY') {
70     return join(', ', map { $self->_recurse_fields($_) } @$fields);
71   } elsif ($ref eq 'HASH') {
72     foreach my $func (keys %$fields) {
73       return $self->_sqlcase($func)
74         .'( '.$self->_recurse_fields($fields->{$func}).' )';
75     }
76   }
77 }
78
79 sub _order_by {
80   my $self = shift;
81   my $ret = '';
82   my @extra;
83   if (ref $_[0] eq 'HASH') {
84     if (defined $_[0]->{group_by}) {
85       $ret = $self->_sqlcase(' group by ')
86                .$self->_recurse_fields($_[0]->{group_by});
87     }
88     if (defined $_[0]->{having}) {
89       my $frag;
90       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
91       push(@{$self->{having_bind}}, @extra);
92       $ret .= $self->_sqlcase(' having ').$frag;
93     }
94     if (defined $_[0]->{order_by}) {
95       $ret .= $self->SUPER::_order_by($_[0]->{order_by});
96     }
97   } elsif(ref $_[0] eq 'SCALAR') {
98     $ret = $self->_sqlcase(' order by ').${ $_[0] };
99   } else {
100     $ret = $self->SUPER::_order_by(@_);
101   }
102   return $ret;
103 }
104
105 sub _order_directions {
106   my ($self, $order) = @_;
107   $order = $order->{order_by} if ref $order eq 'HASH';
108   return $self->SUPER::_order_directions($order);
109 }
110
111 sub _table {
112   my ($self, $from) = @_;
113   if (ref $from eq 'ARRAY') {
114     return $self->_recurse_from(@$from);
115   } elsif (ref $from eq 'HASH') {
116     return $self->_make_as($from);
117   } else {
118     return $from; # would love to quote here but _table ends up getting called
119                   # twice during an ->select without a limit clause due to
120                   # the way S::A::Limit->select works. should maybe consider
121                   # bypassing this and doing S::A::select($self, ...) in
122                   # our select method above. meantime, quoting shims have
123                   # been added to select/insert/update/delete here
124   }
125 }
126
127 sub _recurse_from {
128   my ($self, $from, @join) = @_;
129   my @sqlf;
130   push(@sqlf, $self->_make_as($from));
131   foreach my $j (@join) {
132     my ($to, $on) = @$j;
133
134     # check whether a join type exists
135     my $join_clause = '';
136     if (ref($to) eq 'HASH' and exists($to->{-join_type})) {
137       $join_clause = ' '.uc($to->{-join_type}).' JOIN ';
138     } else {
139       $join_clause = ' JOIN ';
140     }
141     push(@sqlf, $join_clause);
142
143     if (ref $to eq 'ARRAY') {
144       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
145     } else {
146       push(@sqlf, $self->_make_as($to));
147     }
148     push(@sqlf, ' ON ', $self->_join_condition($on));
149   }
150   return join('', @sqlf);
151 }
152
153 sub _make_as {
154   my ($self, $from) = @_;
155   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
156                      reverse each %{$self->_skip_options($from)});
157 }
158
159 sub _skip_options {
160   my ($self, $hash) = @_;
161   my $clean_hash = {};
162   $clean_hash->{$_} = $hash->{$_}
163     for grep {!/^-/} keys %$hash;
164   return $clean_hash;
165 }
166
167 sub _join_condition {
168   my ($self, $cond) = @_;
169   if (ref $cond eq 'HASH') {
170     my %j;
171     for (keys %$cond) {
172       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
173     };
174     return $self->_recurse_where(\%j);
175   } elsif (ref $cond eq 'ARRAY') {
176     return join(' OR ', map { $self->_join_condition($_) } @$cond);
177   } else {
178     die "Can't handle this yet!";
179   }
180 }
181
182 sub _quote {
183   my ($self, $label) = @_;
184   return '' unless defined $label;
185   return "*" if $label eq '*';
186   return $label unless $self->{quote_char};
187   if(ref $self->{quote_char} eq "ARRAY"){
188     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
189       if !defined $self->{name_sep};
190     my $sep = $self->{name_sep};
191     return join($self->{name_sep},
192         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
193        split(/\Q$sep\E/,$label));
194   }
195   return $self->SUPER::_quote($label);
196 }
197
198 sub _RowNum {
199    my $self = shift;
200    my $c;
201    $_[0] =~ s/SELECT (.*?) FROM/
202      'SELECT '.join(', ', map { $_.' AS col'.++$c } split(', ', $1)).' FROM'/e;
203    $self->SUPER::_RowNum(@_);
204 }
205
206 # Accessor for setting limit dialect. This is useful
207 # for JDBC-bridge among others where the remote SQL-dialect cannot
208 # be determined by the name of the driver alone.
209 #
210 sub limit_dialect {
211     my $self = shift;
212     $self->{limit_dialect} = shift if @_;
213     return $self->{limit_dialect};
214 }
215
216 sub quote_char {
217     my $self = shift;
218     $self->{quote_char} = shift if @_;
219     return $self->{quote_char};
220 }
221
222 sub name_sep {
223     my $self = shift;
224     $self->{name_sep} = shift if @_;
225     return $self->{name_sep};
226 }
227
228 } # End of BEGIN block
229
230 use base qw/DBIx::Class/;
231
232 __PACKAGE__->load_components(qw/AccessorGroup/);
233
234 __PACKAGE__->mk_group_accessors('simple' =>
235   qw/_connect_info _dbh _sql_maker _conn_pid _conn_tid debug debugobj
236      cursor on_connect_do transaction_depth/);
237
238 =head2 new
239
240 =cut
241
242 sub new {
243   my $new = bless({}, ref $_[0] || $_[0]);
244   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
245   $new->transaction_depth(0);
246
247   $new->debugobj(new DBIx::Class::Storage::Statistics());
248
249   my $fh;
250   if (defined($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}) &&
251      ($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG} =~ /=(.+)$/)) {
252     $fh = IO::File->new($1, 'w')
253       or $new->throw_exception("Cannot open trace file $1");
254   } else {
255     $fh = IO::File->new('>&STDERR');
256   }
257   $new->debugfh($fh);
258   $new->debug(1) if $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG};
259   return $new;
260 }
261
262 =head2 throw_exception
263
264 Throws an exception - croaks.
265
266 =cut
267
268 sub throw_exception {
269   my ($self, $msg) = @_;
270   croak($msg);
271 }
272
273 =head1 NAME
274
275 DBIx::Class::Storage::DBI - DBI storage handler
276
277 =head1 SYNOPSIS
278
279 =head1 DESCRIPTION
280
281 This class represents the connection to the database
282
283 =head1 METHODS
284
285 =cut
286
287 =head2 connect_info
288
289 Connection information arrayref.  Can either be the same arguments
290 one would pass to DBI->connect, or a code-reference which returns
291 a connected database handle.  In either case, there is an optional
292 final element in the arrayref, which can hold a hashref of
293 connection-specific Storage::DBI options.  These include
294 C<on_connect_do>, and the sql_maker options C<limit_dialect>,
295 C<quote_char>, and C<name_sep>.  Examples:
296
297   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
298   ->connect_info(sub { DBI->connect(...) });
299   ->connect_info([ 'dbi:Pg:dbname=foo',
300                    'postgres',
301                    '',
302                    { AutoCommit => 0 },
303                    { quote_char => q{`}, name_sep => q{@} },
304                  ]);
305
306 =head2 on_connect_do
307
308 Executes the sql statements given as a listref on every db connect.
309
310 =head2 quote_char
311
312 Specifies what characters to use to quote table and column names. If 
313 you use this you will want to specify L<name_sep> as well.
314
315 quote_char expectes either a single character, in which case is it is placed
316 on either side of the table/column, or an array of length 2 in which case the
317 table/column name is placed between the elements.
318
319 For example under MySQL you'd use C<quote_char('`')>, and user SQL Server you'd 
320 use C<quote_char(qw/[ ]/)>.
321
322 =head2 name_sep
323
324 This only needs to be used in conjunction with L<quote_char>, and is used to 
325 specify the charecter that seperates elements (schemas, tables, columns) from 
326 each other. In most cases this is simply a C<.>.
327
328 =head2 debug
329
330 Causes SQL trace information to be emitted on the C<debugobj> object.
331 (or C<STDERR> if C<debugobj> has not specifically been set).
332
333 =head2 debugfh
334
335 Set or retrieve the filehandle used for trace/debug output.  This should be
336 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
337 set to be STDERR - although see information on the
338 L<DBIX_CLASS_STORAGE_DBI_DEBUG> environment variable.
339
340 =cut
341
342 sub debugfh {
343     my $self = shift;
344
345     if ($self->debugobj->can('debugfh')) {
346         return $self->debugobj->debugfh(@_);
347     }
348 }
349
350 =head2 debugobj
351
352 Sets or retrieves the object used for metric collection. Defaults to an instance
353 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
354 method of using a coderef as a callback.  See the aforementioned Statistics
355 class for more information.
356
357 =head2 debugcb
358
359 Sets a callback to be executed each time a statement is run; takes a sub
360 reference.  Callback is executed as $sub->($op, $info) where $op is
361 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
362
363 See L<debugobj> for a better way.
364
365 =cut
366
367 sub debugcb {
368     my $self = shift;
369
370     if ($self->debugobj->can('callback')) {
371         return $self->debugobj->callback(@_);
372     }
373 }
374
375 =head2 disconnect
376
377 Disconnect the L<DBI> handle, performing a rollback first if the
378 database is not in C<AutoCommit> mode.
379
380 =cut
381
382 sub disconnect {
383   my ($self) = @_;
384
385   if( $self->connected ) {
386     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
387     $self->_dbh->disconnect;
388     $self->_dbh(undef);
389   }
390 }
391
392 =head2 connected
393
394 Check if the L<DBI> handle is connected.  Returns true if the handle
395 is connected.
396
397 =cut
398
399 sub connected { my ($self) = @_;
400
401   if(my $dbh = $self->_dbh) {
402       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
403           $self->_sql_maker(undef);
404           return $self->_dbh(undef);
405       }
406       elsif($self->_conn_pid != $$) {
407           $self->_dbh->{InactiveDestroy} = 1;
408           $self->_sql_maker(undef);
409           return $self->_dbh(undef)
410       }
411       return ($dbh->FETCH('Active') && $dbh->ping);
412   }
413
414   return 0;
415 }
416
417 =head2 ensure_connected
418
419 Check whether the database handle is connected - if not then make a
420 connection.
421
422 =cut
423
424 sub ensure_connected {
425   my ($self) = @_;
426
427   unless ($self->connected) {
428     $self->_populate_dbh;
429   }
430 }
431
432 =head2 dbh
433
434 Returns the dbh - a data base handle of class L<DBI>.
435
436 =cut
437
438 sub dbh {
439   my ($self) = @_;
440
441   $self->ensure_connected;
442   return $self->_dbh;
443 }
444
445 sub _sql_maker_args {
446     my ($self) = @_;
447     
448     return ( limit_dialect => $self->dbh );
449 }
450
451 =head2 sql_maker
452
453 Returns a C<sql_maker> object - normally an object of class
454 C<DBIC::SQL::Abstract>.
455
456 =cut
457
458 sub sql_maker {
459   my ($self) = @_;
460   unless ($self->_sql_maker) {
461     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
462   }
463   return $self->_sql_maker;
464 }
465
466 sub connect_info {
467     my ($self, $info_arg) = @_;
468
469     if($info_arg) {
470         my $info = [ @$info_arg ]; # copy because we can alter it
471         my $last_info = $info->[-1];
472         if(ref $last_info eq 'HASH') {
473             my $used;
474             if(my $on_connect_do = $last_info->{on_connect_do}) {
475                $used = 1;
476                $self->on_connect_do($on_connect_do);
477             }
478             for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
479                 if(my $opt_val = $last_info->{$sql_maker_opt}) {
480                     $used = 1;
481                     $self->sql_maker->$sql_maker_opt($opt_val);
482                 }
483             }
484
485             # remove our options hashref if it was there, to avoid confusing
486             #   DBI in the case the user didn't use all 4 DBI options, as in:
487             #   [ 'dbi:SQLite:foo.db', { quote_char => q{`} } ]
488             pop(@$info) if $used;
489         }
490
491         $self->_connect_info($info);
492     }
493
494     $self->_connect_info;
495 }
496
497 sub _populate_dbh {
498   my ($self) = @_;
499   my @info = @{$self->_connect_info || []};
500   $self->_dbh($self->_connect(@info));
501   my $driver = $self->_dbh->{Driver}->{Name};
502   eval "require DBIx::Class::Storage::DBI::${driver}";
503   unless ($@) {
504     bless $self, "DBIx::Class::Storage::DBI::${driver}";
505     $self->_rebless() if $self->can('_rebless');
506   }
507   # if on-connect sql statements are given execute them
508   foreach my $sql_statement (@{$self->on_connect_do || []}) {
509     $self->debugobj->query_start($sql_statement) if $self->debug();
510     $self->_dbh->do($sql_statement);
511     $self->debugobj->query_end($sql_statement) if $self->debug();
512   }
513
514   $self->_conn_pid($$);
515   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
516 }
517
518 sub _connect {
519   my ($self, @info) = @_;
520
521   $self->throw_exception("You failed to provide any connection info")
522       if !@info;
523
524   my ($old_connect_via, $dbh);
525
526   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
527       $old_connect_via = $DBI::connect_via;
528       $DBI::connect_via = 'connect';
529   }
530
531   eval {
532     if(ref $info[0] eq 'CODE') {
533         $dbh = &{$info[0]};
534     }
535     else {
536         $dbh = DBI->connect(@info);
537     }
538   };
539
540   $DBI::connect_via = $old_connect_via if $old_connect_via;
541
542   if (!$dbh || $@) {
543     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
544   }
545
546   $dbh;
547 }
548
549 =head2 txn_begin
550
551 Calls begin_work on the current dbh.
552
553 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
554 an entire code block to be executed transactionally.
555
556 =cut
557
558 sub txn_begin {
559   my $self = shift;
560   if ($self->{transaction_depth}++ == 0) {
561     my $dbh = $self->dbh;
562     if ($dbh->{AutoCommit}) {
563       $self->debugobj->txn_begin()
564         if ($self->debug);
565       $dbh->begin_work;
566     }
567   }
568 }
569
570 =head2 txn_commit
571
572 Issues a commit against the current dbh.
573
574 =cut
575
576 sub txn_commit {
577   my $self = shift;
578   my $dbh = $self->dbh;
579   if ($self->{transaction_depth} == 0) {
580     unless ($dbh->{AutoCommit}) {
581       $self->debugobj->txn_commit()
582         if ($self->debug);
583       $dbh->commit;
584     }
585   }
586   else {
587     if (--$self->{transaction_depth} == 0) {
588       $self->debugobj->txn_commit()
589         if ($self->debug);
590       $dbh->commit;
591     }
592   }
593 }
594
595 =head2 txn_rollback
596
597 Issues a rollback against the current dbh. A nested rollback will
598 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
599 which allows the rollback to propagate to the outermost transaction.
600
601 =cut
602
603 sub txn_rollback {
604   my $self = shift;
605
606   eval {
607     my $dbh = $self->dbh;
608     if ($self->{transaction_depth} == 0) {
609       unless ($dbh->{AutoCommit}) {
610         $self->debugobj->txn_rollback()
611           if ($self->debug);
612         $dbh->rollback;
613       }
614     }
615     else {
616       if (--$self->{transaction_depth} == 0) {
617         $self->debugobj->txn_rollback()
618           if ($self->debug);
619         $dbh->rollback;
620       }
621       else {
622         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
623       }
624     }
625   };
626
627   if ($@) {
628     my $error = $@;
629     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
630     $error =~ /$exception_class/ and $self->throw_exception($error);
631     $self->{transaction_depth} = 0;          # ensure that a failed rollback
632     $self->throw_exception($error);          # resets the transaction depth
633   }
634 }
635
636 sub _execute {
637   my ($self, $op, $extra_bind, $ident, @args) = @_;
638   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
639   unshift(@bind, @$extra_bind) if $extra_bind;
640   if ($self->debug) {
641       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
642       $self->debugobj->query_start($sql, @debug_bind);
643   }
644   my $sth = eval { $self->sth($sql,$op) };
645
646   if (!$sth || $@) {
647     $self->throw_exception(
648       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
649     );
650   }
651   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
652   my $rv;
653   if ($sth) {
654     my $time = time();
655     $rv = eval { $sth->execute(@bind) };
656
657     if ($@ || !$rv) {
658       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
659     }
660   } else {
661     $self->throw_exception("'$sql' did not generate a statement.");
662   }
663   if ($self->debug) {
664       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
665       $self->debugobj->query_end($sql, @debug_bind);
666   }
667   return (wantarray ? ($rv, $sth, @bind) : $rv);
668 }
669
670 sub insert {
671   my ($self, $ident, $to_insert) = @_;
672   $self->throw_exception(
673     "Couldn't insert ".join(', ',
674       map "$_ => $to_insert->{$_}", keys %$to_insert
675     )." into ${ident}"
676   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
677   return $to_insert;
678 }
679
680 sub update {
681   return shift->_execute('update' => [], @_);
682 }
683
684 sub delete {
685   return shift->_execute('delete' => [], @_);
686 }
687
688 sub _select {
689   my ($self, $ident, $select, $condition, $attrs) = @_;
690   my $order = $attrs->{order_by};
691   if (ref $condition eq 'SCALAR') {
692     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
693   }
694   if (exists $attrs->{group_by} || $attrs->{having}) {
695     $order = {
696       group_by => $attrs->{group_by},
697       having => $attrs->{having},
698       ($order ? (order_by => $order) : ())
699     };
700   }
701   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
702   if ($attrs->{software_limit} ||
703       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
704         $attrs->{software_limit} = 1;
705   } else {
706     $self->throw_exception("rows attribute must be positive if present")
707       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
708     push @args, $attrs->{rows}, $attrs->{offset};
709   }
710   return $self->_execute(@args);
711 }
712
713 =head2 select
714
715 Handle a SQL select statement.
716
717 =cut
718
719 sub select {
720   my $self = shift;
721   my ($ident, $select, $condition, $attrs) = @_;
722   return $self->cursor->new($self, \@_, $attrs);
723 }
724
725 =head2 select_single
726
727 Performs a select, fetch and return of data - handles a single row
728 only.
729
730 =cut
731
732 # Need to call finish() to work round broken DBDs
733
734 sub select_single {
735   my $self = shift;
736   my ($rv, $sth, @bind) = $self->_select(@_);
737   my @row = $sth->fetchrow_array;
738   $sth->finish();
739   return @row;
740 }
741
742 =head2 sth
743
744 Returns a L<DBI> sth (statement handle) for the supplied SQL.
745
746 =cut
747
748 sub sth {
749   my ($self, $sql) = @_;
750   # 3 is the if_active parameter which avoids active sth re-use
751   return $self->dbh->prepare_cached($sql, {}, 3);
752 }
753
754 =head2 columns_info_for
755
756 Returns database type info for a given table columns.
757
758 =cut
759
760 sub columns_info_for {
761   my ($self, $table) = @_;
762
763   my $dbh = $self->dbh;
764
765   if ($dbh->can('column_info')) {
766     my %result;
767     my $old_raise_err = $dbh->{RaiseError};
768     my $old_print_err = $dbh->{PrintError};
769     $dbh->{RaiseError} = 1;
770     $dbh->{PrintError} = 0;
771     eval {
772       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
773       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
774       $sth->execute();
775       while ( my $info = $sth->fetchrow_hashref() ){
776         my %column_info;
777         $column_info{data_type}   = $info->{TYPE_NAME};
778         $column_info{size}      = $info->{COLUMN_SIZE};
779         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
780         $column_info{default_value} = $info->{COLUMN_DEF};
781         my $col_name = $info->{COLUMN_NAME};
782         $col_name =~ s/^\"(.*)\"$/$1/;
783
784         $result{$col_name} = \%column_info;
785       }
786     };
787     $dbh->{RaiseError} = $old_raise_err;
788     $dbh->{PrintError} = $old_print_err;
789     return \%result if !$@;
790   }
791
792   my %result;
793   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
794   $sth->execute;
795   my @columns = @{$sth->{NAME_lc}};
796   for my $i ( 0 .. $#columns ){
797     my %column_info;
798     my $type_num = $sth->{TYPE}->[$i];
799     my $type_name;
800     if(defined $type_num && $dbh->can('type_info')) {
801       my $type_info = $dbh->type_info($type_num);
802       $type_name = $type_info->{TYPE_NAME} if $type_info;
803     }
804     $column_info{data_type} = $type_name ? $type_name : $type_num;
805     $column_info{size} = $sth->{PRECISION}->[$i];
806     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
807
808     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
809       $column_info{data_type} = $1;
810       $column_info{size}    = $2;
811     }
812
813     $result{$columns[$i]} = \%column_info;
814   }
815
816   return \%result;
817 }
818
819 =head2 last_insert_id
820
821 Return the row id of the last insert.
822
823 =cut
824
825 sub last_insert_id {
826   my ($self, $row) = @_;
827     
828   return $self->dbh->func('last_insert_rowid');
829
830 }
831
832 =head2 sqlt_type
833
834 Returns the database driver name.
835
836 =cut
837
838 sub sqlt_type { shift->dbh->{Driver}->{Name} }
839
840 =head2 create_ddl_dir (EXPERIMENTAL)
841
842 =over 4
843
844 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
845
846 =back
847
848 Creates an SQL file based on the Schema, for each of the specified
849 database types, in the given directory.
850
851 Note that this feature is currently EXPERIMENTAL and may not work correctly
852 across all databases, or fully handle complex relationships.
853
854 =cut
855
856 sub create_ddl_dir
857 {
858   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
859
860   if(!$dir || !-d $dir)
861   {
862     warn "No directory given, using ./\n";
863     $dir = "./";
864   }
865   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
866   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
867   $version ||= $schema->VERSION || '1.x';
868
869   eval "use SQL::Translator";
870   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
871
872   my $sqlt = SQL::Translator->new({
873 #      debug => 1,
874       add_drop_table => 1,
875   });
876   foreach my $db (@$databases)
877   {
878     $sqlt->reset();
879     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
880 #    $sqlt->parser_args({'DBIx::Class' => $schema);
881     $sqlt->data($schema);
882     $sqlt->producer($db);
883
884     my $file;
885     my $filename = $schema->ddl_filename($db, $dir, $version);
886     if(-e $filename)
887     {
888       $self->throw_exception("$filename already exists, skipping $db");
889       next;
890     }
891     open($file, ">$filename") 
892       or $self->throw_exception("Can't open $filename for writing ($!)");
893     my $output = $sqlt->translate;
894 #use Data::Dumper;
895 #    print join(":", keys %{$schema->source_registrations});
896 #    print Dumper($sqlt->schema);
897     if(!$output)
898     {
899       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
900       next;
901     }
902     print $file $output;
903     close($file);
904   }
905
906 }
907
908 =head2 deployment_statements
909
910 Create the statements for L</deploy> and
911 L<DBIx::Class::Schema/deploy>.
912
913 =cut
914
915 sub deployment_statements {
916   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
917   $type ||= $self->sqlt_type;
918   $version ||= $schema->VERSION || '1.x';
919   $dir ||= './';
920   eval "use SQL::Translator";
921   if(!$@)
922   {
923     eval "use SQL::Translator::Parser::DBIx::Class;";
924     $self->throw_exception($@) if $@;
925     eval "use SQL::Translator::Producer::${type};";
926     $self->throw_exception($@) if $@;
927     my $tr = SQL::Translator->new(%$sqltargs);
928     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
929     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
930   }
931
932   my $filename = $schema->ddl_filename($type, $dir, $version);
933   if(!-f $filename)
934   {
935 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
936       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
937       return;
938   }
939   my $file;
940   open($file, "<$filename") 
941       or $self->throw_exception("Can't open $filename ($!)");
942   my @rows = <$file>;
943   close($file);
944
945   return join('', @rows);
946   
947 }
948
949 =head2 deploy
950
951 Sends the appropriate statements to create or modify tables to the
952 db. This would normally be called through
953 L<DBIx::Class::Schema/deploy>.
954
955 =cut
956
957 sub deploy {
958   my ($self, $schema, $type, $sqltargs) = @_;
959   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, $sqltargs) ) {
960     for ( split(";\n", $statement)) {
961       next if($_ =~ /^--/);
962       next if(!$_);
963 #      next if($_ =~ /^DROP/m);
964       next if($_ =~ /^BEGIN TRANSACTION/m);
965       next if($_ =~ /^COMMIT/m);
966       $self->debugobj->query_begin($_) if $self->debug;
967       $self->dbh->do($_) or warn "SQL was:\n $_";
968       $self->debugobj->query_end($_) if $self->debug;
969     }
970   }
971 }
972
973 =head2 datetime_parser
974
975 Returns the datetime parser class
976
977 =cut
978
979 sub datetime_parser {
980   my $self = shift;
981   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
982 }
983
984 =head2 datetime_parser_type
985
986 Defines (returns) the datetime parser class - currently hardwired to
987 L<DateTime::Format::MySQL>
988
989 =cut
990
991 sub datetime_parser_type { "DateTime::Format::MySQL"; }
992
993 =head2 build_datetime_parser
994
995 See L</datetime_parser>
996
997 =cut
998
999 sub build_datetime_parser {
1000   my $self = shift;
1001   my $type = $self->datetime_parser_type(@_);
1002   eval "use ${type}";
1003   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1004   return $type;
1005 }
1006
1007 sub DESTROY { shift->disconnect }
1008
1009 1;
1010
1011 =head1 SQL METHODS
1012
1013 The module defines a set of methods within the DBIC::SQL::Abstract
1014 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1015 SQL query functions.
1016
1017 The following methods are extended:-
1018
1019 =over 4
1020
1021 =item delete
1022
1023 =item insert
1024
1025 =item select
1026
1027 =item update
1028
1029 =item limit_dialect
1030
1031 =item quote_char
1032
1033 =item name_sep
1034
1035 =back
1036
1037 =head1 ENVIRONMENT VARIABLES
1038
1039 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1040
1041 If C<DBIX_CLASS_STORAGE_DBI_DEBUG> is set then SQL trace information
1042 is produced (as when the L<debug> method is set).
1043
1044 If the value is of the form C<1=/path/name> then the trace output is
1045 written to the file C</path/name>.
1046
1047 This environment variable is checked when the storage object is first
1048 created (when you call connect on your schema).  So, run-time changes 
1049 to this environment variable will not take effect unless you also 
1050 re-connect on your schema.
1051
1052 =head1 AUTHORS
1053
1054 Matt S. Trout <mst@shadowcatsystems.co.uk>
1055
1056 Andy Grundman <andy@hybridized.org>
1057
1058 =head1 LICENSE
1059
1060 You may distribute this code under the same terms as Perl itself.
1061
1062 =cut
1063