fixed wrong debugging hook call query_begin() to query_start() in Storage::DBI
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 sub select {
21   my ($self, $table, $fields, $where, $order, @rest) = @_;
22   $table = $self->_quote($table) unless ref($table);
23   @rest = (-1) unless defined $rest[0];
24   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
25     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
26   local $self->{having_bind} = [];
27   my ($sql, @ret) = $self->SUPER::select(
28     $table, $self->_recurse_fields($fields), $where, $order, @rest
29   );
30   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
31 }
32
33 sub insert {
34   my $self = shift;
35   my $table = shift;
36   $table = $self->_quote($table) unless ref($table);
37   $self->SUPER::insert($table, @_);
38 }
39
40 sub update {
41   my $self = shift;
42   my $table = shift;
43   $table = $self->_quote($table) unless ref($table);
44   $self->SUPER::update($table, @_);
45 }
46
47 sub delete {
48   my $self = shift;
49   my $table = shift;
50   $table = $self->_quote($table) unless ref($table);
51   $self->SUPER::delete($table, @_);
52 }
53
54 sub _emulate_limit {
55   my $self = shift;
56   if ($_[3] == -1) {
57     return $_[1].$self->_order_by($_[2]);
58   } else {
59     return $self->SUPER::_emulate_limit(@_);
60   }
61 }
62
63 sub _recurse_fields {
64   my ($self, $fields) = @_;
65   my $ref = ref $fields;
66   return $self->_quote($fields) unless $ref;
67   return $$fields if $ref eq 'SCALAR';
68
69   if ($ref eq 'ARRAY') {
70     return join(', ', map { $self->_recurse_fields($_) } @$fields);
71   } elsif ($ref eq 'HASH') {
72     foreach my $func (keys %$fields) {
73       return $self->_sqlcase($func)
74         .'( '.$self->_recurse_fields($fields->{$func}).' )';
75     }
76   }
77 }
78
79 sub _order_by {
80   my $self = shift;
81   my $ret = '';
82   my @extra;
83   if (ref $_[0] eq 'HASH') {
84     if (defined $_[0]->{group_by}) {
85       $ret = $self->_sqlcase(' group by ')
86                .$self->_recurse_fields($_[0]->{group_by});
87     }
88     if (defined $_[0]->{having}) {
89       my $frag;
90       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
91       push(@{$self->{having_bind}}, @extra);
92       $ret .= $self->_sqlcase(' having ').$frag;
93     }
94     if (defined $_[0]->{order_by}) {
95       $ret .= $self->SUPER::_order_by($_[0]->{order_by});
96     }
97   } elsif(ref $_[0] eq 'SCALAR') {
98     $ret = $self->_sqlcase(' order by ').${ $_[0] };
99   } else {
100     $ret = $self->SUPER::_order_by(@_);
101   }
102   return $ret;
103 }
104
105 sub _order_directions {
106   my ($self, $order) = @_;
107   $order = $order->{order_by} if ref $order eq 'HASH';
108   return $self->SUPER::_order_directions($order);
109 }
110
111 sub _table {
112   my ($self, $from) = @_;
113   if (ref $from eq 'ARRAY') {
114     return $self->_recurse_from(@$from);
115   } elsif (ref $from eq 'HASH') {
116     return $self->_make_as($from);
117   } else {
118     return $from; # would love to quote here but _table ends up getting called
119                   # twice during an ->select without a limit clause due to
120                   # the way S::A::Limit->select works. should maybe consider
121                   # bypassing this and doing S::A::select($self, ...) in
122                   # our select method above. meantime, quoting shims have
123                   # been added to select/insert/update/delete here
124   }
125 }
126
127 sub _recurse_from {
128   my ($self, $from, @join) = @_;
129   my @sqlf;
130   push(@sqlf, $self->_make_as($from));
131   foreach my $j (@join) {
132     my ($to, $on) = @$j;
133
134     # check whether a join type exists
135     my $join_clause = '';
136     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
137     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
138       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
139     } else {
140       $join_clause = ' JOIN ';
141     }
142     push(@sqlf, $join_clause);
143
144     if (ref $to eq 'ARRAY') {
145       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
146     } else {
147       push(@sqlf, $self->_make_as($to));
148     }
149     push(@sqlf, ' ON ', $self->_join_condition($on));
150   }
151   return join('', @sqlf);
152 }
153
154 sub _make_as {
155   my ($self, $from) = @_;
156   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
157                      reverse each %{$self->_skip_options($from)});
158 }
159
160 sub _skip_options {
161   my ($self, $hash) = @_;
162   my $clean_hash = {};
163   $clean_hash->{$_} = $hash->{$_}
164     for grep {!/^-/} keys %$hash;
165   return $clean_hash;
166 }
167
168 sub _join_condition {
169   my ($self, $cond) = @_;
170   if (ref $cond eq 'HASH') {
171     my %j;
172     for (keys %$cond) {
173       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
174     };
175     return $self->_recurse_where(\%j);
176   } elsif (ref $cond eq 'ARRAY') {
177     return join(' OR ', map { $self->_join_condition($_) } @$cond);
178   } else {
179     die "Can't handle this yet!";
180   }
181 }
182
183 sub _quote {
184   my ($self, $label) = @_;
185   return '' unless defined $label;
186   return "*" if $label eq '*';
187   return $label unless $self->{quote_char};
188   if(ref $self->{quote_char} eq "ARRAY"){
189     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
190       if !defined $self->{name_sep};
191     my $sep = $self->{name_sep};
192     return join($self->{name_sep},
193         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
194        split(/\Q$sep\E/,$label));
195   }
196   return $self->SUPER::_quote($label);
197 }
198
199 sub _RowNum {
200    my $self = shift;
201    my $c;
202    $_[0] =~ s/SELECT (.*?) FROM/
203      'SELECT '.join(', ', map { $_.' AS col'.++$c } split(', ', $1)).' FROM'/e;
204    $self->SUPER::_RowNum(@_);
205 }
206
207 sub limit_dialect {
208     my $self = shift;
209     $self->{limit_dialect} = shift if @_;
210     return $self->{limit_dialect};
211 }
212
213 sub quote_char {
214     my $self = shift;
215     $self->{quote_char} = shift if @_;
216     return $self->{quote_char};
217 }
218
219 sub name_sep {
220     my $self = shift;
221     $self->{name_sep} = shift if @_;
222     return $self->{name_sep};
223 }
224
225 } # End of BEGIN block
226
227 use base qw/DBIx::Class/;
228
229 __PACKAGE__->load_components(qw/AccessorGroup/);
230
231 __PACKAGE__->mk_group_accessors('simple' =>
232   qw/_connect_info _dbh _sql_maker _conn_pid _conn_tid debug debugobj
233      cursor on_connect_do transaction_depth/);
234
235 =head2 new
236
237 =cut
238
239 sub new {
240   my $new = bless({}, ref $_[0] || $_[0]);
241   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
242   $new->transaction_depth(0);
243
244   $new->debugobj(new DBIx::Class::Storage::Statistics());
245
246   my $fh;
247   if (defined($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}) &&
248      ($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG} =~ /=(.+)$/)) {
249     $fh = IO::File->new($1, 'w')
250       or $new->throw_exception("Cannot open trace file $1");
251   } else {
252     $fh = IO::File->new('>&STDERR');
253   }
254   $new->debugfh($fh);
255   $new->debug(1) if $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG};
256   return $new;
257 }
258
259 =head2 throw_exception
260
261 Throws an exception - croaks.
262
263 =cut
264
265 sub throw_exception {
266   my ($self, $msg) = @_;
267   croak($msg);
268 }
269
270 =head1 NAME
271
272 DBIx::Class::Storage::DBI - DBI storage handler
273
274 =head1 SYNOPSIS
275
276 =head1 DESCRIPTION
277
278 This class represents the connection to the database
279
280 =head1 METHODS
281
282 =cut
283
284 =head2 connect_info
285
286 The arguments of C<connect_info> are always a single array reference.
287
288 This is normally accessed via L<DBIx::Class::Schema/connection>, which
289 encapsulates its argument list in an arrayref before calling
290 C<connect_info> here.
291
292 The arrayref can either contain the same set of arguments one would
293 normally pass to L<DBI/connect>, or a lone code reference which returns
294 a connected database handle.
295
296 In either case, there is an optional final element within the arrayref
297 which can hold a hashref of connection-specific Storage::DBI options.
298 These include C<on_connect_do>, and the sql_maker options
299 C<limit_dialect>, C<quote_char>, and C<name_sep>.  Examples:
300
301   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
302
303   ->connect_info([ sub { DBI->connect(...) } ]);
304
305   ->connect_info(
306     [
307       'dbi:Pg:dbname=foo',
308       'postgres',
309       'my_pg_password',
310       { AutoCommit => 0 },
311       { quote_char => q{`}, name_sep => q{@} },
312     ]
313   );
314
315   ->connect_info(
316     [
317       sub { DBI->connect(...) },
318       { quote_char => q{`}, name_sep => q{@} },
319     ]
320   );
321
322 =head2 on_connect_do
323
324 Executes the sql statements given as a listref on every db connect.
325
326 This option can also be set via L</connect_info>.
327
328 =head2 debug
329
330 Causes SQL trace information to be emitted on the C<debugobj> object.
331 (or C<STDERR> if C<debugobj> has not specifically been set).
332
333 =head2 debugfh
334
335 Set or retrieve the filehandle used for trace/debug output.  This should be
336 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
337 set to be STDERR - although see information on the
338 L<DBIX_CLASS_STORAGE_DBI_DEBUG> environment variable.
339
340 =cut
341
342 sub debugfh {
343     my $self = shift;
344
345     if ($self->debugobj->can('debugfh')) {
346         return $self->debugobj->debugfh(@_);
347     }
348 }
349
350 =head2 debugobj
351
352 Sets or retrieves the object used for metric collection. Defaults to an instance
353 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
354 method of using a coderef as a callback.  See the aforementioned Statistics
355 class for more information.
356
357 =head2 debugcb
358
359 Sets a callback to be executed each time a statement is run; takes a sub
360 reference.  Callback is executed as $sub->($op, $info) where $op is
361 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
362
363 See L<debugobj> for a better way.
364
365 =cut
366
367 sub debugcb {
368     my $self = shift;
369
370     if ($self->debugobj->can('callback')) {
371         return $self->debugobj->callback(@_);
372     }
373 }
374
375 =head2 disconnect
376
377 Disconnect the L<DBI> handle, performing a rollback first if the
378 database is not in C<AutoCommit> mode.
379
380 =cut
381
382 sub disconnect {
383   my ($self) = @_;
384
385   if( $self->connected ) {
386     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
387     $self->_dbh->disconnect;
388     $self->_dbh(undef);
389   }
390 }
391
392 =head2 connected
393
394 Check if the L<DBI> handle is connected.  Returns true if the handle
395 is connected.
396
397 =cut
398
399 sub connected { my ($self) = @_;
400
401   if(my $dbh = $self->_dbh) {
402       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
403           $self->_sql_maker(undef);
404           return $self->_dbh(undef);
405       }
406       elsif($self->_conn_pid != $$) {
407           $self->_dbh->{InactiveDestroy} = 1;
408           $self->_sql_maker(undef);
409           return $self->_dbh(undef)
410       }
411       return ($dbh->FETCH('Active') && $dbh->ping);
412   }
413
414   return 0;
415 }
416
417 =head2 ensure_connected
418
419 Check whether the database handle is connected - if not then make a
420 connection.
421
422 =cut
423
424 sub ensure_connected {
425   my ($self) = @_;
426
427   unless ($self->connected) {
428     $self->_populate_dbh;
429   }
430 }
431
432 =head2 dbh
433
434 Returns the dbh - a data base handle of class L<DBI>.
435
436 =cut
437
438 sub dbh {
439   my ($self) = @_;
440
441   $self->ensure_connected;
442   return $self->_dbh;
443 }
444
445 sub _sql_maker_args {
446     my ($self) = @_;
447     
448     return ( limit_dialect => $self->dbh );
449 }
450
451 =head2 sql_maker
452
453 Returns a C<sql_maker> object - normally an object of class
454 C<DBIC::SQL::Abstract>.
455
456 =cut
457
458 sub sql_maker {
459   my ($self) = @_;
460   unless ($self->_sql_maker) {
461     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
462   }
463   return $self->_sql_maker;
464 }
465
466 sub connect_info {
467   my ($self, $info_arg) = @_;
468
469   if($info_arg) {
470     my %sql_maker_opts;
471     my $info = [ @$info_arg ]; # copy because we can alter it
472     my $last_info = $info->[-1];
473     if(ref $last_info eq 'HASH') {
474       my $used;
475       if(my $on_connect_do = $last_info->{on_connect_do}) {
476         $used = 1;
477         $self->on_connect_do($on_connect_do);
478       }
479       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
480         if(my $opt_val = $last_info->{$sql_maker_opt}) {
481           $used = 1;
482           $sql_maker_opts{$sql_maker_opt} = $opt_val;
483         }
484       }
485
486       # remove our options hashref if it was there, to avoid confusing
487       #   DBI in the case the user didn't use all 4 DBI options, as in:
488       #   [ 'dbi:SQLite:foo.db', { quote_char => q{`} } ]
489       pop(@$info) if $used;
490     }
491
492     $self->_connect_info($info);
493     $self->sql_maker->$_($sql_maker_opts{$_}) for(keys %sql_maker_opts);
494   }
495
496   $self->_connect_info;
497 }
498
499 sub _populate_dbh {
500   my ($self) = @_;
501   my @info = @{$self->_connect_info || []};
502   $self->_dbh($self->_connect(@info));
503
504   if(ref $self eq 'DBIx::Class::Storage::DBI') {
505     my $driver = $self->_dbh->{Driver}->{Name};
506     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
507       bless $self, "DBIx::Class::Storage::DBI::${driver}";
508       $self->_rebless() if $self->can('_rebless');
509     }
510   }
511
512   # if on-connect sql statements are given execute them
513   foreach my $sql_statement (@{$self->on_connect_do || []}) {
514     $self->debugobj->query_start($sql_statement) if $self->debug();
515     $self->_dbh->do($sql_statement);
516     $self->debugobj->query_end($sql_statement) if $self->debug();
517   }
518
519   $self->_conn_pid($$);
520   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
521 }
522
523 sub _connect {
524   my ($self, @info) = @_;
525
526   $self->throw_exception("You failed to provide any connection info")
527       if !@info;
528
529   my ($old_connect_via, $dbh);
530
531   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
532       $old_connect_via = $DBI::connect_via;
533       $DBI::connect_via = 'connect';
534   }
535
536   eval {
537     $dbh = ref $info[0] eq 'CODE'
538          ? &{$info[0]}
539          : DBI->connect(@info);
540   };
541
542   $DBI::connect_via = $old_connect_via if $old_connect_via;
543
544   if (!$dbh || $@) {
545     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
546   }
547
548   $dbh;
549 }
550
551 =head2 txn_begin
552
553 Calls begin_work on the current dbh.
554
555 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
556 an entire code block to be executed transactionally.
557
558 =cut
559
560 sub txn_begin {
561   my $self = shift;
562   if ($self->{transaction_depth}++ == 0) {
563     my $dbh = $self->dbh;
564     if ($dbh->{AutoCommit}) {
565       $self->debugobj->txn_begin()
566         if ($self->debug);
567       $dbh->begin_work;
568     }
569   }
570 }
571
572 =head2 txn_commit
573
574 Issues a commit against the current dbh.
575
576 =cut
577
578 sub txn_commit {
579   my $self = shift;
580   my $dbh = $self->dbh;
581   if ($self->{transaction_depth} == 0) {
582     unless ($dbh->{AutoCommit}) {
583       $self->debugobj->txn_commit()
584         if ($self->debug);
585       $dbh->commit;
586     }
587   }
588   else {
589     if (--$self->{transaction_depth} == 0) {
590       $self->debugobj->txn_commit()
591         if ($self->debug);
592       $dbh->commit;
593     }
594   }
595 }
596
597 =head2 txn_rollback
598
599 Issues a rollback against the current dbh. A nested rollback will
600 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
601 which allows the rollback to propagate to the outermost transaction.
602
603 =cut
604
605 sub txn_rollback {
606   my $self = shift;
607
608   eval {
609     my $dbh = $self->dbh;
610     if ($self->{transaction_depth} == 0) {
611       unless ($dbh->{AutoCommit}) {
612         $self->debugobj->txn_rollback()
613           if ($self->debug);
614         $dbh->rollback;
615       }
616     }
617     else {
618       if (--$self->{transaction_depth} == 0) {
619         $self->debugobj->txn_rollback()
620           if ($self->debug);
621         $dbh->rollback;
622       }
623       else {
624         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
625       }
626     }
627   };
628
629   if ($@) {
630     my $error = $@;
631     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
632     $error =~ /$exception_class/ and $self->throw_exception($error);
633     $self->{transaction_depth} = 0;          # ensure that a failed rollback
634     $self->throw_exception($error);          # resets the transaction depth
635   }
636 }
637
638 sub _execute {
639   my ($self, $op, $extra_bind, $ident, @args) = @_;
640   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
641   unshift(@bind, @$extra_bind) if $extra_bind;
642   if ($self->debug) {
643       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
644       $self->debugobj->query_start($sql, @debug_bind);
645   }
646   my $sth = eval { $self->sth($sql,$op) };
647
648   if (!$sth || $@) {
649     $self->throw_exception(
650       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
651     );
652   }
653   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
654   my $rv;
655   if ($sth) {
656     my $time = time();
657     $rv = eval { $sth->execute(@bind) };
658
659     if ($@ || !$rv) {
660       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
661     }
662   } else {
663     $self->throw_exception("'$sql' did not generate a statement.");
664   }
665   if ($self->debug) {
666       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
667       $self->debugobj->query_end($sql, @debug_bind);
668   }
669   return (wantarray ? ($rv, $sth, @bind) : $rv);
670 }
671
672 sub insert {
673   my ($self, $ident, $to_insert) = @_;
674   $self->throw_exception(
675     "Couldn't insert ".join(', ',
676       map "$_ => $to_insert->{$_}", keys %$to_insert
677     )." into ${ident}"
678   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
679   return $to_insert;
680 }
681
682 sub update {
683   return shift->_execute('update' => [], @_);
684 }
685
686 sub delete {
687   return shift->_execute('delete' => [], @_);
688 }
689
690 sub _select {
691   my ($self, $ident, $select, $condition, $attrs) = @_;
692   my $order = $attrs->{order_by};
693   if (ref $condition eq 'SCALAR') {
694     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
695   }
696   if (exists $attrs->{group_by} || $attrs->{having}) {
697     $order = {
698       group_by => $attrs->{group_by},
699       having => $attrs->{having},
700       ($order ? (order_by => $order) : ())
701     };
702   }
703   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
704   if ($attrs->{software_limit} ||
705       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
706         $attrs->{software_limit} = 1;
707   } else {
708     $self->throw_exception("rows attribute must be positive if present")
709       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
710     push @args, $attrs->{rows}, $attrs->{offset};
711   }
712   return $self->_execute(@args);
713 }
714
715 =head2 select
716
717 Handle a SQL select statement.
718
719 =cut
720
721 sub select {
722   my $self = shift;
723   my ($ident, $select, $condition, $attrs) = @_;
724   return $self->cursor->new($self, \@_, $attrs);
725 }
726
727 =head2 select_single
728
729 Performs a select, fetch and return of data - handles a single row
730 only.
731
732 =cut
733
734 # Need to call finish() to work round broken DBDs
735
736 sub select_single {
737   my $self = shift;
738   my ($rv, $sth, @bind) = $self->_select(@_);
739   my @row = $sth->fetchrow_array;
740   $sth->finish();
741   return @row;
742 }
743
744 =head2 sth
745
746 Returns a L<DBI> sth (statement handle) for the supplied SQL.
747
748 =cut
749
750 sub sth {
751   my ($self, $sql) = @_;
752   # 3 is the if_active parameter which avoids active sth re-use
753   return $self->dbh->prepare_cached($sql, {}, 3);
754 }
755
756 =head2 columns_info_for
757
758 Returns database type info for a given table columns.
759
760 =cut
761
762 sub columns_info_for {
763   my ($self, $table) = @_;
764
765   my $dbh = $self->dbh;
766
767   if ($dbh->can('column_info')) {
768     my %result;
769     my $old_raise_err = $dbh->{RaiseError};
770     my $old_print_err = $dbh->{PrintError};
771     $dbh->{RaiseError} = 1;
772     $dbh->{PrintError} = 0;
773     eval {
774       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
775       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
776       $sth->execute();
777       while ( my $info = $sth->fetchrow_hashref() ){
778         my %column_info;
779         $column_info{data_type}   = $info->{TYPE_NAME};
780         $column_info{size}      = $info->{COLUMN_SIZE};
781         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
782         $column_info{default_value} = $info->{COLUMN_DEF};
783         my $col_name = $info->{COLUMN_NAME};
784         $col_name =~ s/^\"(.*)\"$/$1/;
785
786         $result{$col_name} = \%column_info;
787       }
788     };
789     $dbh->{RaiseError} = $old_raise_err;
790     $dbh->{PrintError} = $old_print_err;
791     return \%result if !$@;
792   }
793
794   my %result;
795   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
796   $sth->execute;
797   my @columns = @{$sth->{NAME_lc}};
798   for my $i ( 0 .. $#columns ){
799     my %column_info;
800     my $type_num = $sth->{TYPE}->[$i];
801     my $type_name;
802     if(defined $type_num && $dbh->can('type_info')) {
803       my $type_info = $dbh->type_info($type_num);
804       $type_name = $type_info->{TYPE_NAME} if $type_info;
805     }
806     $column_info{data_type} = $type_name ? $type_name : $type_num;
807     $column_info{size} = $sth->{PRECISION}->[$i];
808     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
809
810     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
811       $column_info{data_type} = $1;
812       $column_info{size}    = $2;
813     }
814
815     $result{$columns[$i]} = \%column_info;
816   }
817
818   return \%result;
819 }
820
821 =head2 last_insert_id
822
823 Return the row id of the last insert.
824
825 =cut
826
827 sub last_insert_id {
828   my ($self, $row) = @_;
829     
830   return $self->dbh->func('last_insert_rowid');
831
832 }
833
834 =head2 sqlt_type
835
836 Returns the database driver name.
837
838 =cut
839
840 sub sqlt_type { shift->dbh->{Driver}->{Name} }
841
842 =head2 create_ddl_dir (EXPERIMENTAL)
843
844 =over 4
845
846 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
847
848 =back
849
850 Creates an SQL file based on the Schema, for each of the specified
851 database types, in the given directory.
852
853 Note that this feature is currently EXPERIMENTAL and may not work correctly
854 across all databases, or fully handle complex relationships.
855
856 =cut
857
858 sub create_ddl_dir
859 {
860   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
861
862   if(!$dir || !-d $dir)
863   {
864     warn "No directory given, using ./\n";
865     $dir = "./";
866   }
867   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
868   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
869   $version ||= $schema->VERSION || '1.x';
870
871   eval "use SQL::Translator";
872   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
873
874   my $sqlt = SQL::Translator->new({
875 #      debug => 1,
876       add_drop_table => 1,
877   });
878   foreach my $db (@$databases)
879   {
880     $sqlt->reset();
881     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
882 #    $sqlt->parser_args({'DBIx::Class' => $schema);
883     $sqlt->data($schema);
884     $sqlt->producer($db);
885
886     my $file;
887     my $filename = $schema->ddl_filename($db, $dir, $version);
888     if(-e $filename)
889     {
890       $self->throw_exception("$filename already exists, skipping $db");
891       next;
892     }
893     open($file, ">$filename") 
894       or $self->throw_exception("Can't open $filename for writing ($!)");
895     my $output = $sqlt->translate;
896 #use Data::Dumper;
897 #    print join(":", keys %{$schema->source_registrations});
898 #    print Dumper($sqlt->schema);
899     if(!$output)
900     {
901       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
902       next;
903     }
904     print $file $output;
905     close($file);
906   }
907
908 }
909
910 =head2 deployment_statements
911
912 Create the statements for L</deploy> and
913 L<DBIx::Class::Schema/deploy>.
914
915 =cut
916
917 sub deployment_statements {
918   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
919   $type ||= $self->sqlt_type;
920   $version ||= $schema->VERSION || '1.x';
921   $dir ||= './';
922   eval "use SQL::Translator";
923   if(!$@)
924   {
925     eval "use SQL::Translator::Parser::DBIx::Class;";
926     $self->throw_exception($@) if $@;
927     eval "use SQL::Translator::Producer::${type};";
928     $self->throw_exception($@) if $@;
929     my $tr = SQL::Translator->new(%$sqltargs);
930     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
931     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
932   }
933
934   my $filename = $schema->ddl_filename($type, $dir, $version);
935   if(!-f $filename)
936   {
937 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
938       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
939       return;
940   }
941   my $file;
942   open($file, "<$filename") 
943       or $self->throw_exception("Can't open $filename ($!)");
944   my @rows = <$file>;
945   close($file);
946
947   return join('', @rows);
948   
949 }
950
951 =head2 deploy
952
953 Sends the appropriate statements to create or modify tables to the
954 db. This would normally be called through
955 L<DBIx::Class::Schema/deploy>.
956
957 =cut
958
959 sub deploy {
960   my ($self, $schema, $type, $sqltargs) = @_;
961   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, $sqltargs) ) {
962     for ( split(";\n", $statement)) {
963       next if($_ =~ /^--/);
964       next if(!$_);
965 #      next if($_ =~ /^DROP/m);
966       next if($_ =~ /^BEGIN TRANSACTION/m);
967       next if($_ =~ /^COMMIT/m);
968       $self->debugobj->query_start($_) if $self->debug;
969       $self->dbh->do($_) or warn "SQL was:\n $_";
970       $self->debugobj->query_end($_) if $self->debug;
971     }
972   }
973 }
974
975 =head2 datetime_parser
976
977 Returns the datetime parser class
978
979 =cut
980
981 sub datetime_parser {
982   my $self = shift;
983   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
984 }
985
986 =head2 datetime_parser_type
987
988 Defines (returns) the datetime parser class - currently hardwired to
989 L<DateTime::Format::MySQL>
990
991 =cut
992
993 sub datetime_parser_type { "DateTime::Format::MySQL"; }
994
995 =head2 build_datetime_parser
996
997 See L</datetime_parser>
998
999 =cut
1000
1001 sub build_datetime_parser {
1002   my $self = shift;
1003   my $type = $self->datetime_parser_type(@_);
1004   eval "use ${type}";
1005   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1006   return $type;
1007 }
1008
1009 sub DESTROY { shift->disconnect }
1010
1011 1;
1012
1013 =head1 SQL METHODS
1014
1015 The module defines a set of methods within the DBIC::SQL::Abstract
1016 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1017 SQL query functions.
1018
1019 The following methods are extended:-
1020
1021 =over 4
1022
1023 =item delete
1024
1025 =item insert
1026
1027 =item select
1028
1029 =item update
1030
1031 =item limit_dialect
1032
1033 Accessor for setting limit dialect. This is useful
1034 for JDBC-bridge among others where the remote SQL-dialect cannot
1035 be determined by the name of the driver alone.
1036
1037 This option can also be set via L</connect_info>.
1038
1039 =item quote_char
1040
1041 Specifies what characters to use to quote table and column names. If 
1042 you use this you will want to specify L<name_sep> as well.
1043
1044 quote_char expectes either a single character, in which case is it is placed
1045 on either side of the table/column, or an arrayref of length 2 in which case the
1046 table/column name is placed between the elements.
1047
1048 For example under MySQL you'd use C<quote_char('`')>, and user SQL Server you'd 
1049 use C<quote_char(qw/[ ]/)>.
1050
1051 This option can also be set via L</connect_info>.
1052
1053 =item name_sep
1054
1055 This only needs to be used in conjunction with L<quote_char>, and is used to 
1056 specify the charecter that seperates elements (schemas, tables, columns) from 
1057 each other. In most cases this is simply a C<.>.
1058
1059 This option can also be set via L</connect_info>.
1060
1061 =back
1062
1063 =head1 ENVIRONMENT VARIABLES
1064
1065 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1066
1067 If C<DBIX_CLASS_STORAGE_DBI_DEBUG> is set then SQL trace information
1068 is produced (as when the L<debug> method is set).
1069
1070 If the value is of the form C<1=/path/name> then the trace output is
1071 written to the file C</path/name>.
1072
1073 This environment variable is checked when the storage object is first
1074 created (when you call connect on your schema).  So, run-time changes 
1075 to this environment variable will not take effect unless you also 
1076 re-connect on your schema.
1077
1078 =head1 AUTHORS
1079
1080 Matt S. Trout <mst@shadowcatsystems.co.uk>
1081
1082 Andy Grundman <andy@hybridized.org>
1083
1084 =head1 LICENSE
1085
1086 You may distribute this code under the same terms as Perl itself.
1087
1088 =cut
1089