skip empty queries to eliminate spurious warnings on ->deploy
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI.pm
1 package DBIx::Class::Storage::DBI;
2 # -*- mode: cperl; cperl-indent-level: 2 -*-
3
4 use base 'DBIx::Class::Storage';
5
6 use strict;
7 use warnings;
8 use DBI;
9 use SQL::Abstract::Limit;
10 use DBIx::Class::Storage::DBI::Cursor;
11 use DBIx::Class::Storage::Statistics;
12 use IO::File;
13 use Carp::Clan qw/DBIx::Class/;
14 BEGIN {
15
16 package DBIC::SQL::Abstract; # Would merge upstream, but nate doesn't reply :(
17
18 use base qw/SQL::Abstract::Limit/;
19
20 # This prevents the caching of $dbh in S::A::L, I believe
21 sub new {
22   my $self = shift->SUPER::new(@_);
23
24   # If limit_dialect is a ref (like a $dbh), go ahead and replace
25   #   it with what it resolves to:
26   $self->{limit_dialect} = $self->_find_syntax($self->{limit_dialect})
27     if ref $self->{limit_dialect};
28
29   $self;
30 }
31
32 # While we're at it, this should make LIMIT queries more efficient,
33 #  without digging into things too deeply
34 sub _find_syntax {
35   my ($self, $syntax) = @_;
36   $self->{_cached_syntax} ||= $self->SUPER::_find_syntax($syntax);
37 }
38
39 sub select {
40   my ($self, $table, $fields, $where, $order, @rest) = @_;
41   $table = $self->_quote($table) unless ref($table);
42   local $self->{rownum_hack_count} = 1
43     if (defined $rest[0] && $self->{limit_dialect} eq 'RowNum');
44   @rest = (-1) unless defined $rest[0];
45   die "LIMIT 0 Does Not Compute" if $rest[0] == 0;
46     # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
47   local $self->{having_bind} = [];
48   my ($sql, @ret) = $self->SUPER::select(
49     $table, $self->_recurse_fields($fields), $where, $order, @rest
50   );
51   return wantarray ? ($sql, @ret, @{$self->{having_bind}}) : $sql;
52 }
53
54 sub insert {
55   my $self = shift;
56   my $table = shift;
57   $table = $self->_quote($table) unless ref($table);
58   $self->SUPER::insert($table, @_);
59 }
60
61 sub update {
62   my $self = shift;
63   my $table = shift;
64   $table = $self->_quote($table) unless ref($table);
65   $self->SUPER::update($table, @_);
66 }
67
68 sub delete {
69   my $self = shift;
70   my $table = shift;
71   $table = $self->_quote($table) unless ref($table);
72   $self->SUPER::delete($table, @_);
73 }
74
75 sub _emulate_limit {
76   my $self = shift;
77   if ($_[3] == -1) {
78     return $_[1].$self->_order_by($_[2]);
79   } else {
80     return $self->SUPER::_emulate_limit(@_);
81   }
82 }
83
84 sub _recurse_fields {
85   my ($self, $fields) = @_;
86   my $ref = ref $fields;
87   return $self->_quote($fields) unless $ref;
88   return $$fields if $ref eq 'SCALAR';
89
90   if ($ref eq 'ARRAY') {
91     return join(', ', map {
92       $self->_recurse_fields($_)
93       .(exists $self->{rownum_hack_count}
94          ? ' AS col'.$self->{rownum_hack_count}++
95          : '')
96      } @$fields);
97   } elsif ($ref eq 'HASH') {
98     foreach my $func (keys %$fields) {
99       return $self->_sqlcase($func)
100         .'( '.$self->_recurse_fields($fields->{$func}).' )';
101     }
102   }
103 }
104
105 sub _order_by {
106   my $self = shift;
107   my $ret = '';
108   my @extra;
109   if (ref $_[0] eq 'HASH') {
110     if (defined $_[0]->{group_by}) {
111       $ret = $self->_sqlcase(' group by ')
112                .$self->_recurse_fields($_[0]->{group_by});
113     }
114     if (defined $_[0]->{having}) {
115       my $frag;
116       ($frag, @extra) = $self->_recurse_where($_[0]->{having});
117       push(@{$self->{having_bind}}, @extra);
118       $ret .= $self->_sqlcase(' having ').$frag;
119     }
120     if (defined $_[0]->{order_by}) {
121       $ret .= $self->SUPER::_order_by($_[0]->{order_by});
122     }
123   } elsif (ref $_[0] eq 'SCALAR') {
124     $ret = $self->_sqlcase(' order by ').${ $_[0] };
125   } elsif (ref $_[0] eq 'ARRAY' && @{$_[0]}) {
126     my @order = @{+shift};
127     $ret = $self->_sqlcase(' order by ')
128           .join(', ', map {
129                         my $r = $self->_order_by($_, @_);
130                         $r =~ s/^ ?ORDER BY //i;
131                         $r;
132                       } @order);
133   } else {
134     $ret = $self->SUPER::_order_by(@_);
135   }
136   return $ret;
137 }
138
139 sub _order_directions {
140   my ($self, $order) = @_;
141   $order = $order->{order_by} if ref $order eq 'HASH';
142   return $self->SUPER::_order_directions($order);
143 }
144
145 sub _table {
146   my ($self, $from) = @_;
147   if (ref $from eq 'ARRAY') {
148     return $self->_recurse_from(@$from);
149   } elsif (ref $from eq 'HASH') {
150     return $self->_make_as($from);
151   } else {
152     return $from; # would love to quote here but _table ends up getting called
153                   # twice during an ->select without a limit clause due to
154                   # the way S::A::Limit->select works. should maybe consider
155                   # bypassing this and doing S::A::select($self, ...) in
156                   # our select method above. meantime, quoting shims have
157                   # been added to select/insert/update/delete here
158   }
159 }
160
161 sub _recurse_from {
162   my ($self, $from, @join) = @_;
163   my @sqlf;
164   push(@sqlf, $self->_make_as($from));
165   foreach my $j (@join) {
166     my ($to, $on) = @$j;
167
168     # check whether a join type exists
169     my $join_clause = '';
170     my $to_jt = ref($to) eq 'ARRAY' ? $to->[0] : $to;
171     if (ref($to_jt) eq 'HASH' and exists($to_jt->{-join_type})) {
172       $join_clause = ' '.uc($to_jt->{-join_type}).' JOIN ';
173     } else {
174       $join_clause = ' JOIN ';
175     }
176     push(@sqlf, $join_clause);
177
178     if (ref $to eq 'ARRAY') {
179       push(@sqlf, '(', $self->_recurse_from(@$to), ')');
180     } else {
181       push(@sqlf, $self->_make_as($to));
182     }
183     push(@sqlf, ' ON ', $self->_join_condition($on));
184   }
185   return join('', @sqlf);
186 }
187
188 sub _make_as {
189   my ($self, $from) = @_;
190   return join(' ', map { (ref $_ eq 'SCALAR' ? $$_ : $self->_quote($_)) }
191                      reverse each %{$self->_skip_options($from)});
192 }
193
194 sub _skip_options {
195   my ($self, $hash) = @_;
196   my $clean_hash = {};
197   $clean_hash->{$_} = $hash->{$_}
198     for grep {!/^-/} keys %$hash;
199   return $clean_hash;
200 }
201
202 sub _join_condition {
203   my ($self, $cond) = @_;
204   if (ref $cond eq 'HASH') {
205     my %j;
206     for (keys %$cond) {
207       my $x = '= '.$self->_quote($cond->{$_}); $j{$_} = \$x;
208     };
209     return $self->_recurse_where(\%j);
210   } elsif (ref $cond eq 'ARRAY') {
211     return join(' OR ', map { $self->_join_condition($_) } @$cond);
212   } else {
213     die "Can't handle this yet!";
214   }
215 }
216
217 sub _quote {
218   my ($self, $label) = @_;
219   return '' unless defined $label;
220   return "*" if $label eq '*';
221   return $label unless $self->{quote_char};
222   if(ref $self->{quote_char} eq "ARRAY"){
223     return $self->{quote_char}->[0] . $label . $self->{quote_char}->[1]
224       if !defined $self->{name_sep};
225     my $sep = $self->{name_sep};
226     return join($self->{name_sep},
227         map { $self->{quote_char}->[0] . $_ . $self->{quote_char}->[1]  }
228        split(/\Q$sep\E/,$label));
229   }
230   return $self->SUPER::_quote($label);
231 }
232
233 sub limit_dialect {
234     my $self = shift;
235     $self->{limit_dialect} = shift if @_;
236     return $self->{limit_dialect};
237 }
238
239 sub quote_char {
240     my $self = shift;
241     $self->{quote_char} = shift if @_;
242     return $self->{quote_char};
243 }
244
245 sub name_sep {
246     my $self = shift;
247     $self->{name_sep} = shift if @_;
248     return $self->{name_sep};
249 }
250
251 } # End of BEGIN block
252
253 use base qw/DBIx::Class/;
254
255 __PACKAGE__->load_components(qw/AccessorGroup/);
256
257 __PACKAGE__->mk_group_accessors('simple' =>
258   qw/_connect_info _dbh _sql_maker _sql_maker_opts _conn_pid _conn_tid
259      debug debugobj cursor on_connect_do transaction_depth/);
260
261 =head1 NAME
262
263 DBIx::Class::Storage::DBI - DBI storage handler
264
265 =head1 SYNOPSIS
266
267 =head1 DESCRIPTION
268
269 This class represents the connection to the database
270
271 =head1 METHODS
272
273 =head2 new
274
275 =cut
276
277 sub new {
278   my $new = bless({}, ref $_[0] || $_[0]);
279   $new->cursor("DBIx::Class::Storage::DBI::Cursor");
280   $new->transaction_depth(0);
281
282   $new->debugobj(new DBIx::Class::Storage::Statistics());
283
284   my $fh;
285
286   my $debug_env = $ENV{DBIX_CLASS_STORAGE_DBI_DEBUG}
287                   || $ENV{DBIC_TRACE};
288
289   if (defined($debug_env) && ($debug_env =~ /=(.+)$/)) {
290     $fh = IO::File->new($1, 'w')
291       or $new->throw_exception("Cannot open trace file $1");
292   } else {
293     $fh = IO::File->new('>&STDERR');
294   }
295   $new->debugfh($fh);
296   $new->debug(1) if $debug_env;
297   $new->_sql_maker_opts({});
298   return $new;
299 }
300
301 =head2 throw_exception
302
303 Throws an exception - croaks.
304
305 =cut
306
307 sub throw_exception {
308   my ($self, $msg) = @_;
309   croak($msg);
310 }
311
312 =head2 connect_info
313
314 The arguments of C<connect_info> are always a single array reference.
315
316 This is normally accessed via L<DBIx::Class::Schema/connection>, which
317 encapsulates its argument list in an arrayref before calling
318 C<connect_info> here.
319
320 The arrayref can either contain the same set of arguments one would
321 normally pass to L<DBI/connect>, or a lone code reference which returns
322 a connected database handle.
323
324 In either case, if the final argument in your connect_info happens
325 to be a hashref, C<connect_info> will look there for several
326 connection-specific options:
327
328 =over 4
329
330 =item on_connect_do
331
332 This can be set to an arrayref of literal sql statements, which will
333 be executed immediately after making the connection to the database
334 every time we [re-]connect.
335
336 =item limit_dialect 
337
338 Sets the limit dialect. This is useful for JDBC-bridge among others
339 where the remote SQL-dialect cannot be determined by the name of the
340 driver alone.
341
342 =item quote_char
343
344 Specifies what characters to use to quote table and column names. If 
345 you use this you will want to specify L<name_sep> as well.
346
347 quote_char expects either a single character, in which case is it is placed
348 on either side of the table/column, or an arrayref of length 2 in which case the
349 table/column name is placed between the elements.
350
351 For example under MySQL you'd use C<quote_char =E<gt> '`'>, and user SQL Server you'd 
352 use C<quote_char =E<gt> [qw/[ ]/]>.
353
354 =item name_sep
355
356 This only needs to be used in conjunction with L<quote_char>, and is used to 
357 specify the charecter that seperates elements (schemas, tables, columns) from 
358 each other. In most cases this is simply a C<.>.
359
360 =back
361
362 These options can be mixed in with your other L<DBI> connection attributes,
363 or placed in a seperate hashref after all other normal L<DBI> connection
364 arguments.
365
366 Every time C<connect_info> is invoked, any previous settings for
367 these options will be cleared before setting the new ones, regardless of
368 whether any options are specified in the new C<connect_info>.
369
370 Examples:
371
372   # Simple SQLite connection
373   ->connect_info([ 'dbi:SQLite:./foo.db' ]);
374
375   # Connect via subref
376   ->connect_info([ sub { DBI->connect(...) } ]);
377
378   # A bit more complicated
379   ->connect_info(
380     [
381       'dbi:Pg:dbname=foo',
382       'postgres',
383       'my_pg_password',
384       { AutoCommit => 0 },
385       { quote_char => q{"}, name_sep => q{.} },
386     ]
387   );
388
389   # Equivalent to the previous example
390   ->connect_info(
391     [
392       'dbi:Pg:dbname=foo',
393       'postgres',
394       'my_pg_password',
395       { AutoCommit => 0, quote_char => q{"}, name_sep => q{.} },
396     ]
397   );
398
399   # Subref + DBIC-specific connection options
400   ->connect_info(
401     [
402       sub { DBI->connect(...) },
403       {
404           quote_char => q{`},
405           name_sep => q{@},
406           on_connect_do => ['SET search_path TO myschema,otherschema,public'],
407       },
408     ]
409   );
410
411 =head2 on_connect_do
412
413 This method is deprecated in favor of setting via L</connect_info>.
414
415 =head2 debug
416
417 Causes SQL trace information to be emitted on the C<debugobj> object.
418 (or C<STDERR> if C<debugobj> has not specifically been set).
419
420 =head2 debugfh
421
422 Set or retrieve the filehandle used for trace/debug output.  This should be
423 an IO::Handle compatible ojbect (only the C<print> method is used.  Initially
424 set to be STDERR - although see information on the
425 L<DBIC_TRACE> environment variable.
426
427 =cut
428
429 sub debugfh {
430     my $self = shift;
431
432     if ($self->debugobj->can('debugfh')) {
433         return $self->debugobj->debugfh(@_);
434     }
435 }
436
437 =head2 debugobj
438
439 Sets or retrieves the object used for metric collection. Defaults to an instance
440 of L<DBIx::Class::Storage::Statistics> that is campatible with the original
441 method of using a coderef as a callback.  See the aforementioned Statistics
442 class for more information.
443
444 =head2 debugcb
445
446 Sets a callback to be executed each time a statement is run; takes a sub
447 reference.  Callback is executed as $sub->($op, $info) where $op is
448 SELECT/INSERT/UPDATE/DELETE and $info is what would normally be printed.
449
450 See L<debugobj> for a better way.
451
452 =cut
453
454 sub debugcb {
455     my $self = shift;
456
457     if ($self->debugobj->can('callback')) {
458         return $self->debugobj->callback(@_);
459     }
460 }
461
462 =head2 disconnect
463
464 Disconnect the L<DBI> handle, performing a rollback first if the
465 database is not in C<AutoCommit> mode.
466
467 =cut
468
469 sub disconnect {
470   my ($self) = @_;
471
472   if( $self->connected ) {
473     $self->_dbh->rollback unless $self->_dbh->{AutoCommit};
474     $self->_dbh->disconnect;
475     $self->_dbh(undef);
476   }
477 }
478
479 =head2 connected
480
481 Check if the L<DBI> handle is connected.  Returns true if the handle
482 is connected.
483
484 =cut
485
486 sub connected { my ($self) = @_;
487
488   if(my $dbh = $self->_dbh) {
489       if(defined $self->_conn_tid && $self->_conn_tid != threads->tid) {
490           return $self->_dbh(undef);
491       }
492       elsif($self->_conn_pid != $$) {
493           $self->_dbh->{InactiveDestroy} = 1;
494           return $self->_dbh(undef);
495       }
496       return ($dbh->FETCH('Active') && $dbh->ping);
497   }
498
499   return 0;
500 }
501
502 =head2 ensure_connected
503
504 Check whether the database handle is connected - if not then make a
505 connection.
506
507 =cut
508
509 sub ensure_connected {
510   my ($self) = @_;
511
512   unless ($self->connected) {
513     $self->_populate_dbh;
514   }
515 }
516
517 =head2 dbh
518
519 Returns the dbh - a data base handle of class L<DBI>.
520
521 =cut
522
523 sub dbh {
524   my ($self) = @_;
525
526   $self->ensure_connected;
527   return $self->_dbh;
528 }
529
530 sub _sql_maker_args {
531     my ($self) = @_;
532     
533     return ( limit_dialect => $self->dbh, %{$self->_sql_maker_opts} );
534 }
535
536 =head2 sql_maker
537
538 Returns a C<sql_maker> object - normally an object of class
539 C<DBIC::SQL::Abstract>.
540
541 =cut
542
543 sub sql_maker {
544   my ($self) = @_;
545   unless ($self->_sql_maker) {
546     $self->_sql_maker(new DBIC::SQL::Abstract( $self->_sql_maker_args ));
547   }
548   return $self->_sql_maker;
549 }
550
551 sub connect_info {
552   my ($self, $info_arg) = @_;
553
554   if($info_arg) {
555     # Kill sql_maker/_sql_maker_opts, so we get a fresh one with only
556     #  the new set of options
557     $self->_sql_maker(undef);
558     $self->_sql_maker_opts({});
559
560     my $info = [ @$info_arg ]; # copy because we can alter it
561     my $last_info = $info->[-1];
562     if(ref $last_info eq 'HASH') {
563       if(my $on_connect_do = delete $last_info->{on_connect_do}) {
564         $self->on_connect_do($on_connect_do);
565       }
566       for my $sql_maker_opt (qw/limit_dialect quote_char name_sep/) {
567         if(my $opt_val = delete $last_info->{$sql_maker_opt}) {
568           $self->_sql_maker_opts->{$sql_maker_opt} = $opt_val;
569         }
570       }
571
572       # Get rid of any trailing empty hashref
573       pop(@$info) if !keys %$last_info;
574     }
575
576     $self->_connect_info($info);
577   }
578
579   $self->_connect_info;
580 }
581
582 sub _populate_dbh {
583   my ($self) = @_;
584   my @info = @{$self->_connect_info || []};
585   $self->_dbh($self->_connect(@info));
586
587   if(ref $self eq 'DBIx::Class::Storage::DBI') {
588     my $driver = $self->_dbh->{Driver}->{Name};
589     if ($self->load_optional_class("DBIx::Class::Storage::DBI::${driver}")) {
590       bless $self, "DBIx::Class::Storage::DBI::${driver}";
591       $self->_rebless() if $self->can('_rebless');
592     }
593   }
594
595   # if on-connect sql statements are given execute them
596   foreach my $sql_statement (@{$self->on_connect_do || []}) {
597     $self->debugobj->query_start($sql_statement) if $self->debug();
598     $self->_dbh->do($sql_statement);
599     $self->debugobj->query_end($sql_statement) if $self->debug();
600   }
601
602   $self->_conn_pid($$);
603   $self->_conn_tid(threads->tid) if $INC{'threads.pm'};
604 }
605
606 sub _connect {
607   my ($self, @info) = @_;
608
609   $self->throw_exception("You failed to provide any connection info")
610       if !@info;
611
612   my ($old_connect_via, $dbh);
613
614   if ($INC{'Apache/DBI.pm'} && $ENV{MOD_PERL}) {
615       $old_connect_via = $DBI::connect_via;
616       $DBI::connect_via = 'connect';
617   }
618
619   eval {
620     $dbh = ref $info[0] eq 'CODE'
621          ? &{$info[0]}
622          : DBI->connect(@info);
623   };
624
625   $DBI::connect_via = $old_connect_via if $old_connect_via;
626
627   if (!$dbh || $@) {
628     $self->throw_exception("DBI Connection failed: " . ($@ || $DBI::errstr));
629   }
630
631   $dbh;
632 }
633
634 =head2 txn_begin
635
636 Calls begin_work on the current dbh.
637
638 See L<DBIx::Class::Schema> for the txn_do() method, which allows for
639 an entire code block to be executed transactionally.
640
641 =cut
642
643 sub txn_begin {
644   my $self = shift;
645   if ($self->{transaction_depth}++ == 0) {
646     my $dbh = $self->dbh;
647     if ($dbh->{AutoCommit}) {
648       $self->debugobj->txn_begin()
649         if ($self->debug);
650       $dbh->begin_work;
651     }
652   }
653 }
654
655 =head2 txn_commit
656
657 Issues a commit against the current dbh.
658
659 =cut
660
661 sub txn_commit {
662   my $self = shift;
663   my $dbh = $self->dbh;
664   if ($self->{transaction_depth} == 0) {
665     unless ($dbh->{AutoCommit}) {
666       $self->debugobj->txn_commit()
667         if ($self->debug);
668       $dbh->commit;
669     }
670   }
671   else {
672     if (--$self->{transaction_depth} == 0) {
673       $self->debugobj->txn_commit()
674         if ($self->debug);
675       $dbh->commit;
676     }
677   }
678 }
679
680 =head2 txn_rollback
681
682 Issues a rollback against the current dbh. A nested rollback will
683 throw a L<DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION> exception,
684 which allows the rollback to propagate to the outermost transaction.
685
686 =cut
687
688 sub txn_rollback {
689   my $self = shift;
690
691   eval {
692     my $dbh = $self->dbh;
693     if ($self->{transaction_depth} == 0) {
694       unless ($dbh->{AutoCommit}) {
695         $self->debugobj->txn_rollback()
696           if ($self->debug);
697         $dbh->rollback;
698       }
699     }
700     else {
701       if (--$self->{transaction_depth} == 0) {
702         $self->debugobj->txn_rollback()
703           if ($self->debug);
704         $dbh->rollback;
705       }
706       else {
707         die DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION->new;
708       }
709     }
710   };
711
712   if ($@) {
713     my $error = $@;
714     my $exception_class = "DBIx::Class::Storage::NESTED_ROLLBACK_EXCEPTION";
715     $error =~ /$exception_class/ and $self->throw_exception($error);
716     $self->{transaction_depth} = 0;          # ensure that a failed rollback
717     $self->throw_exception($error);          # resets the transaction depth
718   }
719 }
720
721 sub _execute {
722   my ($self, $op, $extra_bind, $ident, @args) = @_;
723   my ($sql, @bind) = $self->sql_maker->$op($ident, @args);
724   unshift(@bind, @$extra_bind) if $extra_bind;
725   if ($self->debug) {
726       my @debug_bind = map { defined $_ ? qq{'$_'} : q{'NULL'} } @bind;
727       $self->debugobj->query_start($sql, @debug_bind);
728   }
729   my $sth = eval { $self->sth($sql,$op) };
730
731   if (!$sth || $@) {
732     $self->throw_exception(
733       'no sth generated via sql (' . ($@ || $self->_dbh->errstr) . "): $sql"
734     );
735   }
736   @bind = map { ref $_ ? ''.$_ : $_ } @bind; # stringify args
737   my $rv;
738   if ($sth) {
739     my $time = time();
740     $rv = eval { $sth->execute(@bind) };
741
742     if ($@ || !$rv) {
743       $self->throw_exception("Error executing '$sql': ".($@ || $sth->errstr));
744     }
745   } else {
746     $self->throw_exception("'$sql' did not generate a statement.");
747   }
748   if ($self->debug) {
749       my @debug_bind = map { defined $_ ? qq{`$_'} : q{`NULL'} } @bind;
750       $self->debugobj->query_end($sql, @debug_bind);
751   }
752   return (wantarray ? ($rv, $sth, @bind) : $rv);
753 }
754
755 sub insert {
756   my ($self, $ident, $to_insert) = @_;
757   $self->throw_exception(
758     "Couldn't insert ".join(', ',
759       map "$_ => $to_insert->{$_}", keys %$to_insert
760     )." into ${ident}"
761   ) unless ($self->_execute('insert' => [], $ident, $to_insert));
762   return $to_insert;
763 }
764
765 sub update {
766   return shift->_execute('update' => [], @_);
767 }
768
769 sub delete {
770   return shift->_execute('delete' => [], @_);
771 }
772
773 sub _select {
774   my ($self, $ident, $select, $condition, $attrs) = @_;
775   my $order = $attrs->{order_by};
776   if (ref $condition eq 'SCALAR') {
777     $order = $1 if $$condition =~ s/ORDER BY (.*)$//i;
778   }
779   if (exists $attrs->{group_by} || $attrs->{having}) {
780     $order = {
781       group_by => $attrs->{group_by},
782       having => $attrs->{having},
783       ($order ? (order_by => $order) : ())
784     };
785   }
786   my @args = ('select', $attrs->{bind}, $ident, $select, $condition, $order);
787   if ($attrs->{software_limit} ||
788       $self->sql_maker->_default_limit_syntax eq "GenericSubQ") {
789         $attrs->{software_limit} = 1;
790   } else {
791     $self->throw_exception("rows attribute must be positive if present")
792       if (defined($attrs->{rows}) && !($attrs->{rows} > 0));
793     push @args, $attrs->{rows}, $attrs->{offset};
794   }
795   return $self->_execute(@args);
796 }
797
798 =head2 select
799
800 Handle a SQL select statement.
801
802 =cut
803
804 sub select {
805   my $self = shift;
806   my ($ident, $select, $condition, $attrs) = @_;
807   return $self->cursor->new($self, \@_, $attrs);
808 }
809
810 =head2 select_single
811
812 Performs a select, fetch and return of data - handles a single row
813 only.
814
815 =cut
816
817 # Need to call finish() to work round broken DBDs
818
819 sub select_single {
820   my $self = shift;
821   my ($rv, $sth, @bind) = $self->_select(@_);
822   my @row = $sth->fetchrow_array;
823   $sth->finish();
824   return @row;
825 }
826
827 =head2 sth
828
829 Returns a L<DBI> sth (statement handle) for the supplied SQL.
830
831 =cut
832
833 sub sth {
834   my ($self, $sql) = @_;
835   # 3 is the if_active parameter which avoids active sth re-use
836   return $self->dbh->prepare_cached($sql, {}, 3);
837 }
838
839 =head2 columns_info_for
840
841 Returns database type info for a given table columns.
842
843 =cut
844
845 sub columns_info_for {
846   my ($self, $table) = @_;
847
848   my $dbh = $self->dbh;
849
850   if ($dbh->can('column_info')) {
851     my %result;
852     my $old_raise_err = $dbh->{RaiseError};
853     my $old_print_err = $dbh->{PrintError};
854     $dbh->{RaiseError} = 1;
855     $dbh->{PrintError} = 0;
856     eval {
857       my ($schema,$tab) = $table =~ /^(.+?)\.(.+)$/ ? ($1,$2) : (undef,$table);
858       my $sth = $dbh->column_info( undef,$schema, $tab, '%' );
859       $sth->execute();
860       while ( my $info = $sth->fetchrow_hashref() ){
861         my %column_info;
862         $column_info{data_type}   = $info->{TYPE_NAME};
863         $column_info{size}      = $info->{COLUMN_SIZE};
864         $column_info{is_nullable}   = $info->{NULLABLE} ? 1 : 0;
865         $column_info{default_value} = $info->{COLUMN_DEF};
866         my $col_name = $info->{COLUMN_NAME};
867         $col_name =~ s/^\"(.*)\"$/$1/;
868
869         $result{$col_name} = \%column_info;
870       }
871     };
872     $dbh->{RaiseError} = $old_raise_err;
873     $dbh->{PrintError} = $old_print_err;
874     return \%result if !$@;
875   }
876
877   my %result;
878   my $sth = $dbh->prepare("SELECT * FROM $table WHERE 1=0");
879   $sth->execute;
880   my @columns = @{$sth->{NAME_lc}};
881   for my $i ( 0 .. $#columns ){
882     my %column_info;
883     my $type_num = $sth->{TYPE}->[$i];
884     my $type_name;
885     if(defined $type_num && $dbh->can('type_info')) {
886       my $type_info = $dbh->type_info($type_num);
887       $type_name = $type_info->{TYPE_NAME} if $type_info;
888     }
889     $column_info{data_type} = $type_name ? $type_name : $type_num;
890     $column_info{size} = $sth->{PRECISION}->[$i];
891     $column_info{is_nullable} = $sth->{NULLABLE}->[$i] ? 1 : 0;
892
893     if ($column_info{data_type} =~ m/^(.*?)\((.*?)\)$/) {
894       $column_info{data_type} = $1;
895       $column_info{size}    = $2;
896     }
897
898     $result{$columns[$i]} = \%column_info;
899   }
900
901   return \%result;
902 }
903
904 =head2 last_insert_id
905
906 Return the row id of the last insert.
907
908 =cut
909
910 sub last_insert_id {
911   my ($self, $row) = @_;
912     
913   return $self->dbh->func('last_insert_rowid');
914
915 }
916
917 =head2 sqlt_type
918
919 Returns the database driver name.
920
921 =cut
922
923 sub sqlt_type { shift->dbh->{Driver}->{Name} }
924
925 =head2 create_ddl_dir (EXPERIMENTAL)
926
927 =over 4
928
929 =item Arguments: $schema \@databases, $version, $directory, $sqlt_args
930
931 =back
932
933 Creates an SQL file based on the Schema, for each of the specified
934 database types, in the given directory.
935
936 Note that this feature is currently EXPERIMENTAL and may not work correctly
937 across all databases, or fully handle complex relationships.
938
939 =cut
940
941 sub create_ddl_dir
942 {
943   my ($self, $schema, $databases, $version, $dir, $sqltargs) = @_;
944
945   if(!$dir || !-d $dir)
946   {
947     warn "No directory given, using ./\n";
948     $dir = "./";
949   }
950   $databases ||= ['MySQL', 'SQLite', 'PostgreSQL'];
951   $databases = [ $databases ] if(ref($databases) ne 'ARRAY');
952   $version ||= $schema->VERSION || '1.x';
953
954   eval "use SQL::Translator";
955   $self->throw_exception("Can't deploy without SQL::Translator: $@") if $@;
956
957   my $sqlt = SQL::Translator->new({
958 #      debug => 1,
959       add_drop_table => 1,
960   });
961   foreach my $db (@$databases)
962   {
963     $sqlt->reset();
964     $sqlt->parser('SQL::Translator::Parser::DBIx::Class');
965 #    $sqlt->parser_args({'DBIx::Class' => $schema);
966     $sqlt->data($schema);
967     $sqlt->producer($db);
968
969     my $file;
970     my $filename = $schema->ddl_filename($db, $dir, $version);
971     if(-e $filename)
972     {
973       $self->throw_exception("$filename already exists, skipping $db");
974       next;
975     }
976     open($file, ">$filename") 
977       or $self->throw_exception("Can't open $filename for writing ($!)");
978     my $output = $sqlt->translate;
979 #use Data::Dumper;
980 #    print join(":", keys %{$schema->source_registrations});
981 #    print Dumper($sqlt->schema);
982     if(!$output)
983     {
984       $self->throw_exception("Failed to translate to $db. (" . $sqlt->error . ")");
985       next;
986     }
987     print $file $output;
988     close($file);
989   }
990
991 }
992
993 =head2 deployment_statements
994
995 Create the statements for L</deploy> and
996 L<DBIx::Class::Schema/deploy>.
997
998 =cut
999
1000 sub deployment_statements {
1001   my ($self, $schema, $type, $version, $dir, $sqltargs) = @_;
1002   # Need to be connected to get the correct sqlt_type
1003   $self->ensure_connected() unless $type;
1004   $type ||= $self->sqlt_type;
1005   $version ||= $schema->VERSION || '1.x';
1006   $dir ||= './';
1007   eval "use SQL::Translator";
1008   if(!$@)
1009   {
1010     eval "use SQL::Translator::Parser::DBIx::Class;";
1011     $self->throw_exception($@) if $@;
1012     eval "use SQL::Translator::Producer::${type};";
1013     $self->throw_exception($@) if $@;
1014     my $tr = SQL::Translator->new(%$sqltargs);
1015     SQL::Translator::Parser::DBIx::Class::parse( $tr, $schema );
1016     return "SQL::Translator::Producer::${type}"->can('produce')->($tr);
1017   }
1018
1019   my $filename = $schema->ddl_filename($type, $dir, $version);
1020   if(!-f $filename)
1021   {
1022 #      $schema->create_ddl_dir([ $type ], $version, $dir, $sqltargs);
1023       $self->throw_exception("No SQL::Translator, and no Schema file found, aborting deploy");
1024       return;
1025   }
1026   my $file;
1027   open($file, "<$filename") 
1028       or $self->throw_exception("Can't open $filename ($!)");
1029   my @rows = <$file>;
1030   close($file);
1031
1032   return join('', @rows);
1033   
1034 }
1035
1036 =head2 deploy
1037
1038 Sends the appropriate statements to create or modify tables to the
1039 db. This would normally be called through
1040 L<DBIx::Class::Schema/deploy>.
1041
1042 =cut
1043
1044 sub deploy {
1045   my ($self, $schema, $type, $sqltargs) = @_;
1046   foreach my $statement ( $self->deployment_statements($schema, $type, undef, undef, { no_comments => 1, %$sqltargs }) ) {
1047     for ( split(";\n", $statement)) {
1048       next if($_ =~ /^--/);
1049       next if(!$_);
1050 #      next if($_ =~ /^DROP/m);
1051       next if($_ =~ /^BEGIN TRANSACTION/m);
1052       next if($_ =~ /^COMMIT/m);
1053       next if $_ =~ /^\s+$/; # skip whitespace only
1054       $self->debugobj->query_start($_) if $self->debug;
1055       $self->dbh->do($_) or warn "SQL was:\n $_";
1056       $self->debugobj->query_end($_) if $self->debug;
1057     }
1058   }
1059 }
1060
1061 =head2 datetime_parser
1062
1063 Returns the datetime parser class
1064
1065 =cut
1066
1067 sub datetime_parser {
1068   my $self = shift;
1069   return $self->{datetime_parser} ||= $self->build_datetime_parser(@_);
1070 }
1071
1072 =head2 datetime_parser_type
1073
1074 Defines (returns) the datetime parser class - currently hardwired to
1075 L<DateTime::Format::MySQL>
1076
1077 =cut
1078
1079 sub datetime_parser_type { "DateTime::Format::MySQL"; }
1080
1081 =head2 build_datetime_parser
1082
1083 See L</datetime_parser>
1084
1085 =cut
1086
1087 sub build_datetime_parser {
1088   my $self = shift;
1089   my $type = $self->datetime_parser_type(@_);
1090   eval "use ${type}";
1091   $self->throw_exception("Couldn't load ${type}: $@") if $@;
1092   return $type;
1093 }
1094
1095 sub DESTROY { shift->disconnect }
1096
1097 1;
1098
1099 =head1 SQL METHODS
1100
1101 The module defines a set of methods within the DBIC::SQL::Abstract
1102 namespace.  These build on L<SQL::Abstract::Limit> to provide the
1103 SQL query functions.
1104
1105 The following methods are extended:-
1106
1107 =over 4
1108
1109 =item delete
1110
1111 =item insert
1112
1113 =item select
1114
1115 =item update
1116
1117 =item limit_dialect
1118
1119 See L</connect_info> for details.
1120 For setting, this method is deprecated in favor of L</connect_info>.
1121
1122 =item quote_char
1123
1124 See L</connect_info> for details.
1125 For setting, this method is deprecated in favor of L</connect_info>.
1126
1127 =item name_sep
1128
1129 See L</connect_info> for details.
1130 For setting, this method is deprecated in favor of L</connect_info>.
1131
1132 =back
1133
1134 =head1 ENVIRONMENT VARIABLES
1135
1136 =head2 DBIC_TRACE
1137
1138 If C<DBIC_TRACE> is set then SQL trace information
1139 is produced (as when the L<debug> method is set).
1140
1141 If the value is of the form C<1=/path/name> then the trace output is
1142 written to the file C</path/name>.
1143
1144 This environment variable is checked when the storage object is first
1145 created (when you call connect on your schema).  So, run-time changes 
1146 to this environment variable will not take effect unless you also 
1147 re-connect on your schema.
1148
1149 =head2 DBIX_CLASS_STORAGE_DBI_DEBUG
1150
1151 Old name for DBIC_TRACE
1152
1153 =head1 AUTHORS
1154
1155 Matt S. Trout <mst@shadowcatsystems.co.uk>
1156
1157 Andy Grundman <andy@hybridized.org>
1158
1159 =head1 LICENSE
1160
1161 You may distribute this code under the same terms as Perl itself.
1162
1163 =cut
1164