2b4b68b2b7b5ca8bcf11e5dc8bcdcc35a560986a
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 sub select {
21   my ($self, $table, $fields, $where, $order, @rest) = @_;
22   $table = $self->_quote($table) unless ref($table);
23   @rest = (-1) unless defined $rest[0];
24   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
25     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
26   local $self->{having_bind} = [];
27   my ($sql, @ret) = $self->SUPER::select(
28     $table, $self->_recurse_fields($fields), $where, $order, @rest
29   );
30   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
31 }
32
33 sub insert {
34   my $self = shift;
35   my $table = shift;
36   $table = $self->_quote($table) unless ref($table);
37   $self->SUPER::insert($table, @_);
38 }
39
40 sub update {
41   my $self = shift;
42   my $table = shift;
43   $table = $self->_quote($table) unless ref($table);
44   $self->SUPER::update($table, @_);
45 }
46
47 sub delete {
48   my $self = shift;
49   my $table = shift;
50   $table = $self->_quote($table) unless ref($table);
51   $self->SUPER::delete($table, @_);
52 }
53
54 sub _emulate_limit {
55   my $self = shift;
56   if ($_[3] == -1) {
57     return $_[1].$self->_order_by($_[2]);
58   } else {
59     return $self->SUPER::_emulate_limit(@_);
60   }
61 }
62
63 sub _recurse_fields {
64   my ($self, $fields) = @_;
65   my $ref = ref $fields;
66   return $self->_quote($fields) unless $ref;
67   return $$fields if $ref eq 'SCALAR';
68
69   if ($ref eq 'ARRAY') {
70     return join(', ', map { $self->_recurse_fields($_) } @$fields);
71   } elsif ($ref eq 'HASH') {
72     foreach my $func (keys %$fields) {
73       return $self->_sqlcase($func)
74         .'( '.$self->_recurse_fields($fields->{$func}).' )';
75     }
76   }
77 }
78
79 sub _order_by {
80   my $self = shift;
81   my $ret = '';
82   my @extra;
83   if (ref $_[0] eq 'HASH') {
84     if (defined $_[0]->{group_by}) {
85       $ret = $self->_sqlcase(' group by ')
86                .$self->_recurse_fields($_[0]->{group_by});
87     }
88     if (defined $_[0]->{having}) {
89       my $frag;
90       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
91       push(@{$self->{having_bind}}, @extra);
92       $ret .= $self->_sqlcase(' having ').$frag;
93     }
94     if (defined $_[0]->{order_by}) {
95       $ret .= $self->SUPER::_order_by($_[0]->{order_by});
96     }
97   } elsif(ref $_[0] eq 'SCALAR') {
98     $ret = $self->_sqlcase(' order by ').${ $_[0] };
99   } else {
100     $ret = $self->SUPER::_order_by(@_);
101   }
102   return $ret;
103 }
104
105 sub _order_directions {
106   my ($self, $order) = @_;
107   $order = $order->{order_by} if ref $order eq 'HASH';
108   return $self->SUPER::_order_directions($order);
109 }
110
111 sub _table {
112   my ($self, $from) = @_;
113   if (ref $from eq 'ARRAY') {
114     return $self->_recurse_from(@$from);
115   } elsif (ref $from eq 'HASH') {
116     return $self->_make_as($from);
117   } else {
118     return $from; # would love to quote here but _table ends up getting called
119                   # twice during an ->select without a limit clause due to
120                   # the way S::A::Limit->select works. should maybe consider
121                   # bypassing this and doing S::A::select($self, ...) in
122                   # our select method above. meantime, quoting shims have
123                   # been added to select/insert/update/delete here
124   }
125 }
126
127 sub _recurse_from {
128   my ($self, $from, @join) = @_;
129   my @sqlf;
130   push(@sqlf, $self->_make_as($from));
131   foreach my $j (@join) {
132     my ($to, $on) = @$j;
133
134     # check whether a join type exists
135     my $join_clause = '';
136     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
137     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
138       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
139     } else {
140       $join_clause = ' JOIN ';
141     }
142     push(@sqlf, $join_clause);
143
144     if (ref $to eq 'ARRAY') {
145       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
146     } else {
147       push(@sqlf, $self->_make_as($to));
148     }
149     push(@sqlf, ' ON ', $self->_join_condition($on));
150   }
151   return join('', @sqlf);
152 }
153
154 sub _make_as {
155   my ($self, $from) = @_;
156   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
157                      reverse each %{$self->_skip_options($from)});
158 }
159
160 sub _skip_options {
161   my ($self, $hash) = @_;
162   my $clean_hash = {};
163   $clean_hash->{$_} = $hash->{$_}
164     for grep {!/^-/} keys %$hash;
165   return $clean_hash;
166 }
167
168 sub _join_condition {
169   my ($self, $cond) = @_;
170   if (ref $cond eq 'HASH') {
171     my %j;
172     for (keys %$cond) {
173       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
174     };
175     return $self->_recurse_where(\%j);
176   } elsif (ref $cond eq 'ARRAY') {
177     return join(' OR ', map { $self->_join_condition($_) } @$cond);
178   } else {
179     die "Can't handle this yet!";
180   }
181 }
182
183 sub _quote {
184   my ($self, $label) = @_;
185   return '' unless defined $label;
186   return "*" if $label eq '*';
187   return $label unless $self->{quote_char};
188   if(ref $self->{quote_char} eq "ARRAY"){
189     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
190       if !defined $self->{name_sep};
191     my $sep = $self->{name_sep};
192     return join($self->{name_sep},
193         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
194        split(/\Q$sep\E/,$label));
195   }
196   return $self->SUPER::_quote($label);
197 }
198
199 sub _RowNum {
200    my $self = shift;
201    my $c;
202    $_[0] =~ s/SELECT (.*?) FROM/
203      'SELECT '.join(', ', map { $_.' AS col'.++$c } split(', ', $1)).' FROM'/e;
204    $self->SUPER::_RowNum(@_);
205 }
206
207 sub limit_dialect {
208     my $self = shift;
209     $self->{limit_dialect} = shift if @_;
210     return $self->{limit_dialect};
211 }
212
213 sub quote_char {
214     my $self = shift;
215     $self->{quote_char} = shift if @_;
216     return $self->{quote_char};
217 }
218
219 sub name_sep {
220     my $self = shift;
221     $self->{name_sep} = shift if @_;
222     return $self->{name_sep};
223 }
224
225 } # End of BEGIN block
226
227 use base qw/DBIx::Class/;
228
229 __PACKAGE__->load_components(qw/AccessorGroup/);
230
231 __PACKAGE__->mk_group_accessors('simple' =>
232   qw/_connect_info _dbh _sql_maker _conn_pid _conn_tid debug debugobj
233      cursor on_connect_do transaction_depth/);
234
235 =head2 new
236
237 =cut
238
239 sub new {
240   my $new = bless({}, ref $_[0] || $_[0]);
241   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
242   $new->transaction_depth(0);
243
244   $new->debugobj(new DBIx::Class::Storage::Statistics());
245
246   my $fh;
247   if (defined($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}) &&
248      ($ENV{DBIX_CLASS_STORAGE_DBI_DEBUG} =~ /=(.+)$/)) {
249     $fh = IO::File->new($1, 'w')
250       or $new->throw_exception("Cannot open trace file $1");
251   } else {
252     $fh = IO::File->new('>&STDERR');
253   }
254   $new->debugfh($fh);
255   $new->debug(1) if $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG};
256   return $new;
257 }
258
259 =head2 throw_exception
260
261 Throws an exception - croaks.
262
263 =cut
264
265 sub throw_exception {
266   my ($self, $msg) = @_;
267   croak($msg);
268 }
269
270 =head1 NAME
271
272 DBIx::Class::Storage::DBI - DBI storage handler
273
274 =head1 SYNOPSIS
275
276 =head1 DESCRIPTION
277
278 This class represents the connection to the database
279
280 =head1 METHODS
281
282 =cut
283
284 =head2 connect_info
285
286 The arguments of C<connect_info> are always a single array reference.
287
288 This is normally accessed via L<DBIx::Class::Schema/connection>, which
289 encapsulates its argument list in an arrayref before calling
290 C<connect_info> here.
291
292 The arrayref can either contain the same set of arguments one would
293 normally pass to L<DBI/connect>, or a lone code reference which returns
294 a connected database handle.
295
296 In either case, there is an optional final element within the arrayref
297 which can hold a hashref of connection-specific Storage::DBI options.
298 These include C<on_connect_do>, and the sql_maker options
299 C<limit_dialect>, C<quote_char>, and C<name_sep>.  Examples:
300
301   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
302
303   ->connect_info([ sub { DBI->connect(...) } ]);
304
305   ->connect_info(
306     [
307       'dbi:Pg:dbname=foo',
308       'postgres',
309       'my_pg_password',
310       { AutoCommit => 0 },
311       { quote_char => q{`}, name_sep => q{@} },
312     ]
313   );
314
315   ->connect_info(
316     [
317       sub { DBI->connect(...) },
318       { quote_char => q{`}, name_sep => q{@} },
319     ]
320   );
321
322 =head2 on_connect_do
323
324 Executes the sql statements given as a listref on every db connect.
325
326 This option can also be set via L</connect_info>.
327
328 =head2 debug
329
330 Causes SQL trace information to be emitted on the C<debugobj> object.
331 (or C<STDERR> if C<debugobj> has not specifically been set).
332
333 =head2 debugfh
334
335 Set or retrieve the filehandle used for trace/debug output.  This should be
336 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
337 set to be STDERR - although see information on the
338 L<DBIX_CLASS_STORAGE_DBI_DEBUG> environment variable.
339
340 =cut
341
342 sub debugfh {
343     my $self = shift;
344
345     if ($self->debugobj->can('debugfh')) {
346         return $self->debugobj->debugfh(@_);
347     }
348 }
349
350 =head2 debugobj
351
352 Sets or retrieves the object used for metric collection. Defaults to an instance
353 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
354 method of using a coderef as a callback.  See the aforementioned Statistics
355 class for more information.
356
357 =head2 debugcb
358
359 Sets a callback to be executed each time a statement is run; takes a sub
360 reference.  Callback is executed as $sub->($op, $info) where $op is
361 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
362
363 See L<debugobj> for a better way.
364
365 =cut
366
367 sub debugcb {
368     my $self = shift;
369
370     if ($self->debugobj->can('callback')) {
371         return $self->debugobj->callback(@_);
372     }
373 }
374
375 =head2 disconnect
376
377 Disconnect the L<DBI> handle, performing a rollback first if the
378 database is not in C<AutoCommit> mode.
379
380 =cut
381
382 sub disconnect {
383   my ($self) = @_;
384
385   if( $self->connected ) {
386     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
387     $self->_dbh->disconnect;
388     $self->_dbh(undef);
389   }
390 }
391
392 =head2 connected
393
394 Check if the L<DBI> handle is connected.  Returns true if the handle
395 is connected.
396
397 =cut
398
399 sub connected { my ($self) = @_;
400
401   if(my $dbh = $self->_dbh) {
402       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
403           $self->_sql_maker(undef);
404           return $self->_dbh(undef);
405       }
406       elsif($self->_conn_pid != $$) {
407           $self->_dbh->{InactiveDestroy} = 1;
408           $self->_sql_maker(undef);
409           return $self->_dbh(undef)
410       }
411       return ($dbh->FETCH('Active') && $dbh->ping);
412   }
413
414   return 0;
415 }
416
417 =head2 ensure_connected
418
419 Check whether the database handle is connected - if not then make a
420 connection.
421
422 =cut
423
424 sub ensure_connected {
425   my ($self) = @_;
426
427   unless ($self->connected) {
428     $self->_populate_dbh;
429   }
430 }
431
432 =head2 dbh
433
434 Returns the dbh - a data base handle of class L<DBI>.
435
436 =cut
437
438 sub dbh {
439   my ($self) = @_;
440
441   $self->ensure_connected;
442   return $self->_dbh;
443 }
444
445 sub _sql_maker_args {
446     my ($self) = @_;
447     
448     return ( limit_dialect => $self->dbh );
449 }
450
451 =head2 sql_maker
452
453 Returns a C<sql_maker> object - normally an object of class
454 C<DBIC::SQL::Abstract>.
455
456 =cut
457
458 sub sql_maker {
459   my ($self) = @_;
460   unless ($self->_sql_maker) {
461     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
462   }
463   return $self->_sql_maker;
464 }
465
466 sub connect_info {
467   my ($self, $info_arg) = @_;
468
469   if($info_arg) {
470     my $info = [ @$info_arg ]; # copy because we can alter it
471     my $last_info = $info->[-1];
472     if(ref $last_info eq 'HASH') {
473       my $used;
474       if(my $on_connect_do = $last_info->{on_connect_do}) {
475         $used = 1;
476         $self->on_connect_do($on_connect_do);
477       }
478       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
479         if(my $opt_val = $last_info->{$sql_maker_opt}) {
480           $used = 1;
481           $self->sql_maker->$sql_maker_opt($opt_val);
482         }
483       }
484
485       # remove our options hashref if it was there, to avoid confusing
486       #   DBI in the case the user didn't use all 4 DBI options, as in:
487       #   [ 'dbi:SQLite:foo.db', { quote_char => q{`} } ]
488       pop(@$info) if $used;
489     }
490
491     $self->_connect_info($info);
492   }
493
494   $self->_connect_info;
495 }
496
497 sub _populate_dbh {
498   my ($self) = @_;
499   my @info = @{$self->_connect_info || []};
500   $self->_dbh($self->_connect(@info));
501
502   if(ref $self eq 'DBIx::Class::Storage::DBI') {
503     my $driver = $self->_dbh->{Driver}->{Name};
504     eval "require DBIx::Class::Storage::DBI::${driver}";
505     unless ($@) {
506       bless $self, "DBIx::Class::Storage::DBI::${driver}";
507       $self->_rebless() if $self->can('_rebless');
508     }
509   }
510
511   # if on-connect sql statements are given execute them
512   foreach my $sql_statement (@{$self->on_connect_do || []}) {
513     $self->debugobj->query_start($sql_statement) if $self->debug();
514     $self->_dbh->do($sql_statement);
515     $self->debugobj->query_end($sql_statement) if $self->debug();
516   }
517
518   $self->_conn_pid($$);
519   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
520 }
521
522 sub _connect {
523   my ($self, @info) = @_;
524
525   $self->throw_exception("You failed to provide any connection info")
526       if !@info;
527
528   my ($old_connect_via, $dbh);
529
530   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
531       $old_connect_via = $DBI::connect_via;
532       $DBI::connect_via = 'connect';
533   }
534
535   eval {
536     $dbh = ref $info[0] eq 'CODE'
537          ? &{$info[0]}
538          : DBI->connect(@info);
539   };
540
541   $DBI::connect_via = $old_connect_via if $old_connect_via;
542
543   if (!$dbh || $@) {
544     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
545   }
546
547   $dbh;
548 }
549
550 =head2 txn_begin
551
552 Calls begin_work on the current dbh.
553
554 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
555 an entire code block to be executed transactionally.
556
557 =cut
558
559 sub txn_begin {
560   my $self = shift;
561   if ($self->{transaction_depth}++ == 0) {
562     my $dbh = $self->dbh;
563     if ($dbh->{AutoCommit}) {
564       $self->debugobj->txn_begin()
565         if ($self->debug);
566       $dbh->begin_work;
567     }
568   }
569 }
570
571 =head2 txn_commit
572
573 Issues a commit against the current dbh.
574
575 =cut
576
577 sub txn_commit {
578   my $self = shift;
579   my $dbh = $self->dbh;
580   if ($self->{transaction_depth} == 0) {
581     unless ($dbh->{AutoCommit}) {
582       $self->debugobj->txn_commit()
583         if ($self->debug);
584       $dbh->commit;
585     }
586   }
587   else {
588     if (--$self->{transaction_depth} == 0) {
589       $self->debugobj->txn_commit()
590         if ($self->debug);
591       $dbh->commit;
592     }
593   }
594 }
595
596 =head2 txn_rollback
597
598 Issues a rollback against the current dbh. A nested rollback will
599 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
600 which allows the rollback to propagate to the outermost transaction.
601
602 =cut
603
604 sub txn_rollback {
605   my $self = shift;
606
607   eval {
608     my $dbh = $self->dbh;
609     if ($self->{transaction_depth} == 0) {
610       unless ($dbh->{AutoCommit}) {
611         $self->debugobj->txn_rollback()
612           if ($self->debug);
613         $dbh->rollback;
614       }
615     }
616     else {
617       if (--$self->{transaction_depth} == 0) {
618         $self->debugobj->txn_rollback()
619           if ($self->debug);
620         $dbh->rollback;
621       }
622       else {
623         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
624       }
625     }
626   };
627
628   if ($@) {
629     my $error = $@;
630     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
631     $error =~ /$exception_class/ and $self->throw_exception($error);
632     $self->{transaction_depth} = 0;          # ensure that a failed rollback
633     $self->throw_exception($error);          # resets the transaction depth
634   }
635 }
636
637 sub _execute {
638   my ($self, $op, $extra_bind, $ident, @args) = @_;
639   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
640   unshift(@bind, @$extra_bind) if $extra_bind;
641   if ($self->debug) {
642       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
643       $self->debugobj->query_start($sql, @debug_bind);
644   }
645   my $sth = eval { $self->sth($sql,$op) };
646
647   if (!$sth || $@) {
648     $self->throw_exception(
649       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
650     );
651   }
652   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
653   my $rv;
654   if ($sth) {
655     my $time = time();
656     $rv = eval { $sth->execute(@bind) };
657
658     if ($@ || !$rv) {
659       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
660     }
661   } else {
662     $self->throw_exception("'$sql' did not generate a statement.");
663   }
664   if ($self->debug) {
665       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
666       $self->debugobj->query_end($sql, @debug_bind);
667   }
668   return (wantarray ? ($rv, $sth, @bind) : $rv);
669 }
670
671 sub insert {
672   my ($self, $ident, $to_insert) = @_;
673   $self->throw_exception(
674     "Couldn't insert ".join(', ',
675       map "$_ => $to_insert->{$_}", keys %$to_insert
676     )." into ${ident}"
677   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
678   return $to_insert;
679 }
680
681 sub update {
682   return shift->_execute('update' => [], @_);
683 }
684
685 sub delete {
686   return shift->_execute('delete' => [], @_);
687 }
688
689 sub _select {
690   my ($self, $ident, $select, $condition, $attrs) = @_;
691   my $order = $attrs->{order_by};
692   if (ref $condition eq 'SCALAR') {
693     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
694   }
695   if (exists $attrs->{group_by} || $attrs->{having}) {
696     $order = {
697       group_by => $attrs->{group_by},
698       having => $attrs->{having},
699       ($order ? (order_by => $order) : ())
700     };
701   }
702   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
703   if ($attrs->{software_limit} ||
704       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
705         $attrs->{software_limit} = 1;
706   } else {
707     $self->throw_exception("rows attribute must be positive if present")
708       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
709     push @args, $attrs->{rows}, $attrs->{offset};
710   }
711   return $self->_execute(@args);
712 }
713
714 =head2 select
715
716 Handle a SQL select statement.
717
718 =cut
719
720 sub select {
721   my $self = shift;
722   my ($ident, $select, $condition, $attrs) = @_;
723   return $self->cursor->new($self, \@_, $attrs);
724 }
725
726 =head2 select_single
727
728 Performs a select, fetch and return of data - handles a single row
729 only.
730
731 =cut
732
733 # Need to call finish() to work round broken DBDs
734
735 sub select_single {
736   my $self = shift;
737   my ($rv, $sth, @bind) = $self->_select(@_);
738   my @row = $sth->fetchrow_array;
739   $sth->finish();
740   return @row;
741 }
742
743 =head2 sth
744
745 Returns a L<DBI> sth (statement handle) for the supplied SQL.
746
747 =cut
748
749 sub sth {
750   my ($self, $sql) = @_;
751   # 3 is the if_active parameter which avoids active sth re-use
752   return $self->dbh->prepare_cached($sql, {}, 3);
753 }
754
755 =head2 columns_info_for
756
757 Returns database type info for a given table columns.
758
759 =cut
760
761 sub columns_info_for {
762   my ($self, $table) = @_;
763
764   my $dbh = $self->dbh;
765
766   if ($dbh->can('column_info')) {
767     my %result;
768     my $old_raise_err = $dbh->{RaiseError};
769     my $old_print_err = $dbh->{PrintError};
770     $dbh->{RaiseError} = 1;
771     $dbh->{PrintError} = 0;
772     eval {
773       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
774       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
775       $sth->execute();
776       while ( my $info = $sth->fetchrow_hashref() ){
777         my %column_info;
778         $column_info{data_type}   = $info->{TYPE_NAME};
779         $column_info{size}      = $info->{COLUMN_SIZE};
780         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
781         $column_info{default_value} = $info->{COLUMN_DEF};
782         my $col_name = $info->{COLUMN_NAME};
783         $col_name =~ s/^\"(.*)\"$/$1/;
784
785         $result{$col_name} = \%column_info;
786       }
787     };
788     $dbh->{RaiseError} = $old_raise_err;
789     $dbh->{PrintError} = $old_print_err;
790     return \%result if !$@;
791   }
792
793   my %result;
794   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
795   $sth->execute;
796   my @columns = @{$sth->{NAME_lc}};
797   for my $i ( 0 .. $#columns ){
798     my %column_info;
799     my $type_num = $sth->{TYPE}->[$i];
800     my $type_name;
801     if(defined $type_num && $dbh->can('type_info')) {
802       my $type_info = $dbh->type_info($type_num);
803       $type_name = $type_info->{TYPE_NAME} if $type_info;
804     }
805     $column_info{data_type} = $type_name ? $type_name : $type_num;
806     $column_info{size} = $sth->{PRECISION}->[$i];
807     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
808
809     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
810       $column_info{data_type} = $1;
811       $column_info{size}    = $2;
812     }
813
814     $result{$columns[$i]} = \%column_info;
815   }
816
817   return \%result;
818 }
819
820 =head2 last_insert_id
821
822 Return the row id of the last insert.
823
824 =cut
825
826 sub last_insert_id {
827   my ($self, $row) = @_;
828     
829   return $self->dbh->func('last_insert_rowid');
830
831 }
832
833 =head2 sqlt_type
834
835 Returns the database driver name.
836
837 =cut
838
839 sub sqlt_type { shift->dbh->{Driver}->{Name} }
840
841 =head2 create_ddl_dir (EXPERIMENTAL)
842
843 =over 4
844
845 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
846
847 =back
848
849 Creates an SQL file based on the Schema, for each of the specified
850 database types, in the given directory.
851
852 Note that this feature is currently EXPERIMENTAL and may not work correctly
853 across all databases, or fully handle complex relationships.
854
855 =cut
856
857 sub create_ddl_dir
858 {
859   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
860
861   if(!$dir || !-d $dir)
862   {
863     warn "No directory given, using ./\n";
864     $dir = "./";
865   }
866   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
867   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
868   $version ||= $schema->VERSION || '1.x';
869
870   eval "use SQL::Translator";
871   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
872
873   my $sqlt = SQL::Translator->new({
874 #      debug => 1,
875       add_drop_table => 1,
876   });
877   foreach my $db (@$databases)
878   {
879     $sqlt->reset();
880     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
881 #    $sqlt->parser_args({'DBIx::Class' => $schema);
882     $sqlt->data($schema);
883     $sqlt->producer($db);
884
885     my $file;
886     my $filename = $schema->ddl_filename($db, $dir, $version);
887     if(-e $filename)
888     {
889       $self->throw_exception("$filename already exists, skipping $db");
890       next;
891     }
892     open($file, ">$filename") 
893       or $self->throw_exception("Can't open $filename for writing ($!)");
894     my $output = $sqlt->translate;
895 #use Data::Dumper;
896 #    print join(":", keys %{$schema->source_registrations});
897 #    print Dumper($sqlt->schema);
898     if(!$output)
899     {
900       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
901       next;
902     }
903     print $file $output;
904     close($file);
905   }
906
907 }
908
909 =head2 deployment_statements
910
911 Create the statements for L</deploy> and
912 L<DBIx::Class::Schema/deploy>.
913
914 =cut
915
916 sub deployment_statements {
917   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
918   $type ||= $self->sqlt_type;
919   $version ||= $schema->VERSION || '1.x';
920   $dir ||= './';
921   eval "use SQL::Translator";
922   if(!$@)
923   {
924     eval "use SQL::Translator::Parser::DBIx::Class;";
925     $self->throw_exception($@) if $@;
926     eval "use SQL::Translator::Producer::${type};";
927     $self->throw_exception($@) if $@;
928     my $tr = SQL::Translator->new(%$sqltargs);
929     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
930     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
931   }
932
933   my $filename = $schema->ddl_filename($type, $dir, $version);
934   if(!-f $filename)
935   {
936 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
937       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
938       return;
939   }
940   my $file;
941   open($file, "<$filename") 
942       or $self->throw_exception("Can't open $filename ($!)");
943   my @rows = <$file>;
944   close($file);
945
946   return join('', @rows);
947   
948 }
949
950 =head2 deploy
951
952 Sends the appropriate statements to create or modify tables to the
953 db. This would normally be called through
954 L<DBIx::Class::Schema/deploy>.
955
956 =cut
957
958 sub deploy {
959   my ($self, $schema, $type, $sqltargs) = @_;
960   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, $sqltargs) ) {
961     for ( split(";\n", $statement)) {
962       next if($_ =~ /^--/);
963       next if(!$_);
964 #      next if($_ =~ /^DROP/m);
965       next if($_ =~ /^BEGIN TRANSACTION/m);
966       next if($_ =~ /^COMMIT/m);
967       $self->debugobj->query_begin($_) if $self->debug;
968       $self->dbh->do($_) or warn "SQL was:\n $_";
969       $self->debugobj->query_end($_) if $self->debug;
970     }
971   }
972 }
973
974 =head2 datetime_parser
975
976 Returns the datetime parser class
977
978 =cut
979
980 sub datetime_parser {
981   my $self = shift;
982   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
983 }
984
985 =head2 datetime_parser_type
986
987 Defines (returns) the datetime parser class - currently hardwired to
988 L<DateTime::Format::MySQL>
989
990 =cut
991
992 sub datetime_parser_type { "DateTime::Format::MySQL"; }
993
994 =head2 build_datetime_parser
995
996 See L</datetime_parser>
997
998 =cut
999
1000 sub build_datetime_parser {
1001   my $self = shift;
1002   my $type = $self->datetime_parser_type(@_);
1003   eval "use ${type}";
1004   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1005   return $type;
1006 }
1007
1008 sub DESTROY { shift->disconnect }
1009
1010 1;
1011
1012 =head1 SQL METHODS
1013
1014 The module defines a set of methods within the DBIC::SQL::Abstract
1015 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1016 SQL query functions.
1017
1018 The following methods are extended:-
1019
1020 =over 4
1021
1022 =item delete
1023
1024 =item insert
1025
1026 =item select
1027
1028 =item update
1029
1030 =item limit_dialect
1031
1032 Accessor for setting limit dialect. This is useful
1033 for JDBC-bridge among others where the remote SQL-dialect cannot
1034 be determined by the name of the driver alone.
1035
1036 This option can also be set via L</connect_info>.
1037
1038 =item quote_char
1039
1040 Specifies what characters to use to quote table and column names. If 
1041 you use this you will want to specify L<name_sep> as well.
1042
1043 quote_char expectes either a single character, in which case is it is placed
1044 on either side of the table/column, or an arrayref of length 2 in which case the
1045 table/column name is placed between the elements.
1046
1047 For example under MySQL you'd use C<quote_char('`')>, and user SQL Server you'd 
1048 use C<quote_char(qw/[ ]/)>.
1049
1050 This option can also be set via L</connect_info>.
1051
1052 =item name_sep
1053
1054 This only needs to be used in conjunction with L<quote_char>, and is used to 
1055 specify the charecter that seperates elements (schemas, tables, columns) from 
1056 each other. In most cases this is simply a C<.>.
1057
1058 This option can also be set via L</connect_info>.
1059
1060 =back
1061
1062 =head1 ENVIRONMENT VARIABLES
1063
1064 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1065
1066 If C<DBIX_CLASS_STORAGE_DBI_DEBUG> is set then SQL trace information
1067 is produced (as when the L<debug> method is set).
1068
1069 If the value is of the form C<1=/path/name> then the trace output is
1070 written to the file C</path/name>.
1071
1072 This environment variable is checked when the storage object is first
1073 created (when you call connect on your schema).  So, run-time changes 
1074 to this environment variable will not take effect unless you also 
1075 re-connect on your schema.
1076
1077 =head1 AUTHORS
1078
1079 Matt S. Trout <mst@shadowcatsystems.co.uk>
1080
1081 Andy Grundman <andy@hybridized.org>
1082
1083 =head1 LICENSE
1084
1085 You may distribute this code under the same terms as Perl itself.
1086
1087 =cut
1088