fix connection setup for Sybase
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Sybase.pm
1 package DBIx::Class::Storage::DBI::Sybase;
2
3 use strict;
4 use warnings;
5
6 use base qw/
7     DBIx::Class::Storage::DBI::Sybase::Common
8     DBIx::Class::Storage::DBI::AutoCast
9 /;
10 use mro 'c3';
11 use Carp::Clan qw/^DBIx::Class/;
12 use List::Util();
13 use Sub::Name();
14 use Data::Dumper::Concise();
15
16 __PACKAGE__->mk_group_accessors('simple' =>
17     qw/_identity _blob_log_on_update _writer_storage _is_extra_storage
18        _bulk_storage _is_bulk_storage _began_bulk_work
19        _bulk_disabled_due_to_coderef_connect_info_warned
20        _identity_method/
21 );
22
23 my @also_proxy_to_extra_storages = qw/
24   connect_call_set_auto_cast auto_cast connect_call_blob_setup
25   connect_call_datetime_setup
26
27   disconnect _connect_info _sql_maker _sql_maker_opts disable_sth_caching
28   auto_savepoint unsafe cursor_class debug debugobj schema
29 /;
30
31 =head1 NAME
32
33 DBIx::Class::Storage::DBI::Sybase - Sybase support for DBIx::Class
34
35 =head1 SYNOPSIS
36
37 This subclass supports L<DBD::Sybase> for real Sybase databases.  If you are
38 using an MSSQL database via L<DBD::Sybase>, your storage will be reblessed to
39 L<DBIx::Class::Storage::DBI::Sybase::Microsoft_SQL_Server>.
40
41 =head1 DESCRIPTION
42
43 If your version of Sybase does not support placeholders, then your storage
44 will be reblessed to L<DBIx::Class::Storage::DBI::Sybase::NoBindVars>. You can
45 also enable that driver explicitly, see the documentation for more details.
46
47 With this driver there is unfortunately no way to get the C<last_insert_id>
48 without doing a C<SELECT MAX(col)>. This is done safely in a transaction
49 (locking the table.) See L</INSERTS WITH PLACEHOLDERS>.
50
51 A recommended L<DBIx::Class::Storage::DBI/connect_info> setting:
52
53   on_connect_call => [['datetime_setup'], ['blob_setup', log_on_update => 0]]
54
55 =head1 METHODS
56
57 =cut
58
59 sub _rebless {
60   my $self = shift;
61
62   if (ref($self) eq 'DBIx::Class::Storage::DBI::Sybase') {
63     my $dbtype = eval {
64       @{$self->_get_dbh->selectrow_arrayref(qq{sp_server_info \@attribute_id=1})}[2]
65     } || '';
66     $self->throw_exception("Unable to estable connection to determine database type: $@")
67       if $@;
68
69     $dbtype =~ s/\W/_/gi;
70     my $subclass = "DBIx::Class::Storage::DBI::Sybase::${dbtype}";
71
72     if ($dbtype && $self->load_optional_class($subclass)) {
73       bless $self, $subclass;
74       $self->_rebless;
75     } else { # real Sybase
76       my $no_bind_vars = 'DBIx::Class::Storage::DBI::Sybase::NoBindVars';
77
78       if ($self->using_freetds) {
79         carp <<'EOF' unless $ENV{DBIC_SYBASE_FREETDS_NOWARN};
80
81 You are using FreeTDS with Sybase.
82
83 We will do our best to support this configuration, but please consider this
84 support experimental.
85
86 TEXT/IMAGE columns will definitely not work.
87
88 You are encouraged to recompile DBD::Sybase with the Sybase Open Client libraries
89 instead.
90
91 See perldoc DBIx::Class::Storage::DBI::Sybase for more details.
92
93 To turn off this warning set the DBIC_SYBASE_FREETDS_NOWARN environment
94 variable.
95 EOF
96         if (not $self->_typeless_placeholders_supported) {
97           if ($self->_placeholders_supported) {
98             $self->auto_cast(1);
99           } else {
100             $self->ensure_class_loaded($no_bind_vars);
101             bless $self, $no_bind_vars;
102             $self->_rebless;
103           }
104         }
105       }
106       elsif (not $self->_get_dbh->{syb_dynamic_supported}) {
107         # not necessarily FreeTDS, but no placeholders nevertheless
108         $self->ensure_class_loaded($no_bind_vars);
109         bless $self, $no_bind_vars;
110         $self->_rebless;
111       } elsif (not $self->_typeless_placeholders_supported) {
112         # this is highly unlikely, but we check just in case
113         $self->auto_cast(1);
114       }
115     }
116   }
117 }
118
119 sub _init {
120   my $self = shift;
121   $self->_set_max_connect(256);
122
123 # this is also done on _populate_dbh, but storage may not be reblessed yet
124   $self->_syb_setup_connection;
125
126 # create storage for insert/(update blob) transactions,
127 # unless this is that storage
128   return if $self->_is_extra_storage;
129
130   my $writer_storage = (ref $self)->new;
131
132   $writer_storage->_is_extra_storage(1);
133   $writer_storage->connect_info($self->connect_info);
134   $writer_storage->auto_cast($self->auto_cast);
135
136   $self->_writer_storage($writer_storage);
137
138 # create a bulk storage unless connect_info is a coderef
139   return
140     if (Scalar::Util::reftype($self->_dbi_connect_info->[0])||'') eq 'CODE';
141
142   my $bulk_storage = (ref $self)->new;
143
144   $bulk_storage->_is_extra_storage(1);
145   $bulk_storage->_is_bulk_storage(1); # for special ->disconnect acrobatics
146   $bulk_storage->connect_info($self->connect_info);
147
148 # this is why
149   $bulk_storage->_dbi_connect_info->[0] .= ';bulkLogin=1';
150
151   $self->_bulk_storage($bulk_storage);
152 }
153
154 for my $method (@also_proxy_to_extra_storages) {
155   no strict 'refs';
156   no warnings 'redefine';
157
158   my $replaced = __PACKAGE__->can($method);
159
160   *{$method} = Sub::Name::subname $method => sub {
161     my $self = shift;
162     $self->_writer_storage->$replaced(@_) if $self->_writer_storage;
163     $self->_bulk_storage->$replaced(@_)   if $self->_bulk_storage;
164     return $self->$replaced(@_);
165   };
166 }
167
168 sub disconnect {
169   my $self = shift;
170
171 # Even though we call $sth->finish for uses off the bulk API, there's still an
172 # "active statement" warning on disconnect, which we throw away here.
173 # This is due to the bug described in insert_bulk.
174 # Currently a noop because 'prepare' is used instead of 'prepare_cached'.
175   local $SIG{__WARN__} = sub {
176     warn $_[0] unless $_[0] =~ /active statement/i;
177   } if $self->_is_bulk_storage;
178
179 # so that next transaction gets a dbh
180   $self->_began_bulk_work(0) if $self->_is_bulk_storage;
181
182   $self->next::method;
183 }
184
185 sub _populate_dbh {
186   my $self = shift;
187
188   $self->next::method(@_);
189
190   $self->_syb_setup_connection;
191 }
192
193 # Set up session settings for Sybase databases for the connection, called from
194 # _populate_dbh and _init (before _driver_determined .)
195 #
196 # Make sure we have CHAINED mode turned on if AutoCommit is off in non-FreeTDS
197 # DBD::Sybase (since we don't know how DBD::Sybase was compiled.) If however
198 # we're using FreeTDS, CHAINED mode turns on an implicit transaction which we
199 # only want when AutoCommit is off.
200 sub _syb_setup_connection {
201   my $self = shift;
202
203   return unless $self->_driver_determined; # otherwise we screw up MSSQL
204
205   if ($self->_is_bulk_storage) {
206 # this should be cleared on every reconnect
207     $self->_began_bulk_work(0);
208     return;
209   }
210
211   if (not $self->using_freetds) {
212     $self->_dbh->{syb_chained_txn} = 1;
213   } else {
214     # based on LongReadLen in connect_info
215     $self->set_textsize;
216
217     if ($self->_dbh_autocommit) {
218       $self->_dbh->do('SET CHAINED OFF');
219     } else {
220       $self->_dbh->do('SET CHAINED ON');
221     }
222   }
223 }
224
225 =head2 connect_call_blob_setup
226
227 Used as:
228
229   on_connect_call => [ [ 'blob_setup', log_on_update => 0 ] ]
230
231 Does C<< $dbh->{syb_binary_images} = 1; >> to return C<IMAGE> data as raw binary
232 instead of as a hex string.
233
234 Recommended.
235
236 Also sets the C<log_on_update> value for blob write operations. The default is
237 C<1>, but C<0> is better if your database is configured for it.
238
239 See
240 L<DBD::Sybase/Handling_IMAGE/TEXT_data_with_syb_ct_get_data()/syb_ct_send_data()>.
241
242 =cut
243
244 sub connect_call_blob_setup {
245   my $self = shift;
246   my %args = @_;
247   my $dbh = $self->_dbh;
248   $dbh->{syb_binary_images} = 1;
249
250   $self->_blob_log_on_update($args{log_on_update})
251     if exists $args{log_on_update};
252 }
253
254 sub _is_lob_type {
255   my $self = shift;
256   my $type = shift;
257   $type && $type =~ /(?:text|image|lob|bytea|binary|memo)/i;
258 }
259
260 sub _is_lob_column {
261   my ($self, $source, $column) = @_;
262
263   return $self->_is_lob_type($source->column_info($column)->{data_type});
264 }
265
266 sub _prep_for_execute {
267   my $self = shift;
268   my ($op, $extra_bind, $ident, $args) = @_;
269
270   my ($sql, $bind) = $self->next::method (@_);
271
272   my $table = Scalar::Util::blessed($ident) ? $ident->from : $ident;
273
274   my $bind_info = $self->_resolve_column_info(
275     $ident, [map $_->[0], @{$bind}]
276   );
277   my $bound_identity_col = List::Util::first
278     { $bind_info->{$_}{is_auto_increment} }
279     (keys %$bind_info)
280   ;
281   my $identity_col = Scalar::Util::blessed($ident) &&
282     List::Util::first
283     { $ident->column_info($_)->{is_auto_increment} }
284     $ident->columns
285   ;
286
287   if (($op eq 'insert' && $bound_identity_col) ||
288       ($op eq 'update' && exists $args->[0]{$identity_col})) {
289     $sql = join ("\n",
290       $self->_set_table_identity_sql($op => $table, 'on'),
291       $sql,
292       $self->_set_table_identity_sql($op => $table, 'off'),
293     );
294   }
295
296   if ($op eq 'insert' && (not $bound_identity_col) && $identity_col &&
297       (not $self->{insert_bulk})) {
298     $sql =
299       "$sql\n" .
300       $self->_fetch_identity_sql($ident, $identity_col);
301   }
302
303   return ($sql, $bind);
304 }
305
306 sub _set_table_identity_sql {
307   my ($self, $op, $table, $on_off) = @_;
308
309   return sprintf 'SET IDENTITY_%s %s %s',
310     uc($op), $self->sql_maker->_quote($table), uc($on_off);
311 }
312
313 # Stolen from SQLT, with some modifications. This is a makeshift
314 # solution before a sane type-mapping library is available, thus
315 # the 'our' for easy overrides.
316 our %TYPE_MAPPING  = (
317     number    => 'numeric',
318     money     => 'money',
319     varchar   => 'varchar',
320     varchar2  => 'varchar',
321     timestamp => 'datetime',
322     text      => 'varchar',
323     real      => 'double precision',
324     comment   => 'text',
325     bit       => 'bit',
326     tinyint   => 'smallint',
327     float     => 'double precision',
328     serial    => 'numeric',
329     bigserial => 'numeric',
330     boolean   => 'varchar',
331     long      => 'varchar',
332 );
333
334 sub _native_data_type {
335   my ($self, $type) = @_;
336
337   $type = lc $type;
338   $type =~ s/\s* identity//x;
339
340   return uc($TYPE_MAPPING{$type} || $type);
341 }
342
343 sub _fetch_identity_sql {
344   my ($self, $source, $col) = @_;
345
346   return sprintf ("SELECT MAX(%s) FROM %s",
347     map { $self->sql_maker->_quote ($_) } ($col, $source->from)
348   );
349 }
350
351 sub _execute {
352   my $self = shift;
353   my ($op) = @_;
354
355   my ($rv, $sth, @bind) = $self->dbh_do($self->can('_dbh_execute'), @_);
356
357   if ($op eq 'insert') {
358     $self->_identity($sth->fetchrow_array);
359     $sth->finish;
360   }
361
362   return wantarray ? ($rv, $sth, @bind) : $rv;
363 }
364
365 sub last_insert_id { shift->_identity }
366
367 # handles TEXT/IMAGE and transaction for last_insert_id
368 sub insert {
369   my $self = shift;
370   my ($source, $to_insert) = @_;
371
372   my $identity_col = (List::Util::first
373     { $source->column_info($_)->{is_auto_increment} }
374     $source->columns) || '';
375
376   # check for empty insert
377   # INSERT INTO foo DEFAULT VALUES -- does not work with Sybase
378   # try to insert explicit 'DEFAULT's instead (except for identity)
379   if (not %$to_insert) {
380     for my $col ($source->columns) {
381       next if $col eq $identity_col;
382       $to_insert->{$col} = \'DEFAULT';
383     }
384   }
385
386   my $blob_cols = $self->_remove_blob_cols($source, $to_insert);
387
388   # do we need the horrific SELECT MAX(COL) hack?
389   my $dumb_last_insert_id =
390        $identity_col
391     && (not exists $to_insert->{$identity_col})
392     && ($self->_identity_method||'') ne '@@IDENTITY';
393
394   my $next = $self->next::can;
395
396   # we are already in a transaction, or there are no blobs
397   # and we don't need the PK - just (try to) do it
398   if ($self->{transaction_depth}
399         || (!$blob_cols && !$dumb_last_insert_id)
400   ) {
401     return $self->_insert (
402       $next, $source, $to_insert, $blob_cols, $identity_col
403     );
404   }
405
406   # otherwise use the _writer_storage to do the insert+transaction on another
407   # connection
408   my $guard = $self->_writer_storage->txn_scope_guard;
409
410   my $updated_cols = $self->_writer_storage->_insert (
411     $next, $source, $to_insert, $blob_cols, $identity_col
412   );
413
414   $self->_identity($self->_writer_storage->_identity);
415
416   $guard->commit;
417
418   return $updated_cols;
419 }
420
421 sub _insert {
422   my ($self, $next, $source, $to_insert, $blob_cols, $identity_col) = @_;
423
424   my $updated_cols = $self->$next ($source, $to_insert);
425
426   my $final_row = {
427     ($identity_col ?
428       ($identity_col => $self->last_insert_id($source, $identity_col)) : ()),
429     %$to_insert,
430     %$updated_cols,
431   };
432
433   $self->_insert_blobs ($source, $blob_cols, $final_row) if $blob_cols;
434
435   return $updated_cols;
436 }
437
438 sub update {
439   my $self = shift;
440   my ($source, $fields, $where, @rest) = @_;
441
442   my $wantarray = wantarray;
443
444   my $blob_cols = $self->_remove_blob_cols($source, $fields);
445
446   my $table = $source->name;
447
448   my $identity_col = List::Util::first
449     { $source->column_info($_)->{is_auto_increment} }
450     $source->columns;
451
452   my $is_identity_update = $identity_col && defined $fields->{$identity_col};
453
454   return $self->next::method(@_) unless $blob_cols;
455
456 # If there are any blobs in $where, Sybase will return a descriptive error
457 # message.
458 # XXX blobs can still be used with a LIKE query, and this should be handled.
459
460 # update+blob update(s) done atomically on separate connection
461   $self = $self->_writer_storage;
462
463   my $guard = $self->txn_scope_guard;
464
465 # First update the blob columns to be updated to '' (taken from $fields, where
466 # it is originally put by _remove_blob_cols .)
467   my %blobs_to_empty = map { ($_ => delete $fields->{$_}) } keys %$blob_cols;
468
469 # We can't only update NULL blobs, because blobs cannot be in the WHERE clause.
470
471   $self->next::method($source, \%blobs_to_empty, $where, @rest);
472
473 # Now update the blobs before the other columns in case the update of other
474 # columns makes the search condition invalid.
475   $self->_update_blobs($source, $blob_cols, $where);
476
477   my @res;
478   if (%$fields) {
479     if ($wantarray) {
480       @res    = $self->next::method(@_);
481     }
482     elsif (defined $wantarray) {
483       $res[0] = $self->next::method(@_);
484     }
485     else {
486       $self->next::method(@_);
487     }
488   }
489
490   $guard->commit;
491
492   return $wantarray ? @res : $res[0];
493 }
494
495 sub insert_bulk {
496   my $self = shift;
497   my ($source, $cols, $data) = @_;
498
499   my $identity_col = List::Util::first
500     { $source->column_info($_)->{is_auto_increment} }
501     $source->columns;
502
503   my $is_identity_insert = (List::Util::first
504     { $_ eq $identity_col }
505     @{$cols}
506   ) ? 1 : 0;
507
508   my @source_columns = $source->columns;
509
510   my $use_bulk_api =
511     $self->_bulk_storage &&
512     $self->_get_dbh->{syb_has_blk};
513
514   if ((not $use_bulk_api) &&
515       (Scalar::Util::reftype($self->_dbi_connect_info->[0])||'') eq 'CODE' &&
516       (not $self->_bulk_disabled_due_to_coderef_connect_info_warned)) {
517     carp <<'EOF';
518 Bulk API support disabled due to use of a CODEREF connect_info. Reverting to
519 regular array inserts.
520 EOF
521     $self->_bulk_disabled_due_to_coderef_connect_info_warned(1);
522   }
523
524   if (not $use_bulk_api) {
525     my $blob_cols = $self->_remove_blob_cols_array($source, $cols, $data);
526
527 # _execute_array uses a txn anyway, but it ends too early in case we need to
528 # select max(col) to get the identity for inserting blobs.
529     ($self, my $guard) = $self->{transaction_depth} == 0 ?
530       ($self->_writer_storage, $self->_writer_storage->txn_scope_guard)
531       :
532       ($self, undef);
533
534     local $self->{insert_bulk} = 1;
535
536     $self->next::method(@_);
537
538     if ($blob_cols) {
539       if ($is_identity_insert) {
540         $self->_insert_blobs_array ($source, $blob_cols, $cols, $data);
541       }
542       else {
543         my @cols_with_identities = (@$cols, $identity_col);
544
545         ## calculate identities
546         # XXX This assumes identities always increase by 1, which may or may not
547         # be true.
548         my ($last_identity) =
549           $self->_dbh->selectrow_array (
550             $self->_fetch_identity_sql($source, $identity_col)
551           );
552         my @identities = (($last_identity - @$data + 1) .. $last_identity);
553
554         my @data_with_identities = map [@$_, shift @identities], @$data;
555
556         $self->_insert_blobs_array (
557           $source, $blob_cols, \@cols_with_identities, \@data_with_identities
558         );
559       }
560     }
561
562     $guard->commit if $guard;
563
564     return;
565   }
566
567 # otherwise, use the bulk API
568
569 # rearrange @$data so that columns are in database order
570   my %orig_idx;
571   @orig_idx{@$cols} = 0..$#$cols;
572
573   my %new_idx;
574   @new_idx{@source_columns} = 0..$#source_columns;
575
576   my @new_data;
577   for my $datum (@$data) {
578     my $new_datum = [];
579     for my $col (@source_columns) {
580 # identity data will be 'undef' if not $is_identity_insert
581 # columns with defaults will also be 'undef'
582       $new_datum->[ $new_idx{$col} ] =
583         exists $orig_idx{$col} ? $datum->[ $orig_idx{$col} ] : undef;
584     }
585     push @new_data, $new_datum;
586   }
587
588 # bcp identity index is 1-based
589   my $identity_idx = exists $new_idx{$identity_col} ?
590     $new_idx{$identity_col} + 1 : 0;
591
592 ## Set a client-side conversion error handler, straight from DBD::Sybase docs.
593 # This ignores any data conversion errors detected by the client side libs, as
594 # they are usually harmless.
595   my $orig_cslib_cb = DBD::Sybase::set_cslib_cb(
596     Sub::Name::subname insert_bulk => sub {
597       my ($layer, $origin, $severity, $errno, $errmsg, $osmsg, $blkmsg) = @_;
598
599       return 1 if $errno == 36;
600
601       carp
602         "Layer: $layer, Origin: $origin, Severity: $severity, Error: $errno" .
603         ($errmsg ? "\n$errmsg" : '') .
604         ($osmsg  ? "\n$osmsg"  : '')  .
605         ($blkmsg ? "\n$blkmsg" : '');
606
607       return 0;
608   });
609
610   eval {
611     my $bulk = $self->_bulk_storage;
612
613     my $guard = $bulk->txn_scope_guard;
614
615 ## XXX get this to work instead of our own $sth
616 ## will require SQLA or *Hacks changes for ordered columns
617 #    $bulk->next::method($source, \@source_columns, \@new_data, {
618 #      syb_bcp_attribs => {
619 #        identity_flag   => $is_identity_insert,
620 #        identity_column => $identity_idx,
621 #      }
622 #    });
623     my $sql = 'INSERT INTO ' .
624       $bulk->sql_maker->_quote($source->name) . ' (' .
625 # colname list is ignored for BCP, but does no harm
626       (join ', ', map $bulk->sql_maker->_quote($_), @source_columns) . ') '.
627       ' VALUES ('.  (join ', ', ('?') x @source_columns) . ')';
628
629 ## XXX there's a bug in the DBD::Sybase bulk support that makes $sth->finish for
630 ## a prepare_cached statement ineffective. Replace with ->sth when fixed, or
631 ## better yet the version above. Should be fixed in DBD::Sybase .
632     my $sth = $bulk->_get_dbh->prepare($sql,
633 #      'insert', # op
634       {
635         syb_bcp_attribs => {
636           identity_flag   => $is_identity_insert,
637           identity_column => $identity_idx,
638         }
639       }
640     );
641
642     my @bind = do {
643       my $idx = 0;
644       map [ $_, $idx++ ], @source_columns;
645     };
646
647     $self->_execute_array(
648       $source, $sth, \@bind, \@source_columns, \@new_data, sub {
649         $guard->commit
650       }
651     );
652
653     $bulk->_query_end($sql);
654   };
655
656   my $exception = $@;
657   DBD::Sybase::set_cslib_cb($orig_cslib_cb);
658
659   if ($exception =~ /-Y option/) {
660     carp <<"EOF";
661
662 Sybase bulk API operation failed due to character set incompatibility, reverting
663 to regular array inserts:
664
665 *** Try unsetting the LANG environment variable.
666
667 $exception
668 EOF
669     $self->_bulk_storage(undef);
670     unshift @_, $self;
671     goto \&insert_bulk;
672   }
673   elsif ($exception) {
674 # rollback makes the bulkLogin connection unusable
675     $self->_bulk_storage->disconnect;
676     $self->throw_exception($exception);
677   }
678 }
679
680 sub _dbh_execute_array {
681   my ($self, $sth, $tuple_status, $cb) = @_;
682
683   my $rv = $self->next::method($sth, $tuple_status);
684   $cb->() if $cb;
685
686   return $rv;
687 }
688
689 # Make sure blobs are not bound as placeholders, and return any non-empty ones
690 # as a hash.
691 sub _remove_blob_cols {
692   my ($self, $source, $fields) = @_;
693
694   my %blob_cols;
695
696   for my $col (keys %$fields) {
697     if ($self->_is_lob_column($source, $col)) {
698       my $blob_val = delete $fields->{$col};
699       if (not defined $blob_val) {
700         $fields->{$col} = \'NULL';
701       }
702       else {
703         $fields->{$col} = \"''";
704         $blob_cols{$col} = $blob_val unless $blob_val eq '';
705       }
706     }
707   }
708
709   return %blob_cols ? \%blob_cols : undef;
710 }
711
712 # same for insert_bulk
713 sub _remove_blob_cols_array {
714   my ($self, $source, $cols, $data) = @_;
715
716   my @blob_cols;
717
718   for my $i (0..$#$cols) {
719     my $col = $cols->[$i];
720
721     if ($self->_is_lob_column($source, $col)) {
722       for my $j (0..$#$data) {
723         my $blob_val = delete $data->[$j][$i];
724         if (not defined $blob_val) {
725           $data->[$j][$i] = \'NULL';
726         }
727         else {
728           $data->[$j][$i] = \"''";
729           $blob_cols[$j][$i] = $blob_val
730             unless $blob_val eq '';
731         }
732       }
733     }
734   }
735
736   return @blob_cols ? \@blob_cols : undef;
737 }
738
739 sub _update_blobs {
740   my ($self, $source, $blob_cols, $where) = @_;
741
742   my (@primary_cols) = $source->primary_columns;
743
744   $self->throw_exception('Cannot update TEXT/IMAGE column(s) without a primary key')
745     unless @primary_cols;
746
747 # check if we're updating a single row by PK
748   my $pk_cols_in_where = 0;
749   for my $col (@primary_cols) {
750     $pk_cols_in_where++ if defined $where->{$col};
751   }
752   my @rows;
753
754   if ($pk_cols_in_where == @primary_cols) {
755     my %row_to_update;
756     @row_to_update{@primary_cols} = @{$where}{@primary_cols};
757     @rows = \%row_to_update;
758   } else {
759     my $cursor = $self->select ($source, \@primary_cols, $where, {});
760     @rows = map {
761       my %row; @row{@primary_cols} = @$_; \%row
762     } $cursor->all;
763   }
764
765   for my $row (@rows) {
766     $self->_insert_blobs($source, $blob_cols, $row);
767   }
768 }
769
770 sub _insert_blobs {
771   my ($self, $source, $blob_cols, $row) = @_;
772   my $dbh = $self->_get_dbh;
773
774   my $table = $source->name;
775
776   my %row = %$row;
777   my (@primary_cols) = $source->primary_columns;
778
779   $self->throw_exception('Cannot update TEXT/IMAGE column(s) without a primary key')
780     unless @primary_cols;
781
782   $self->throw_exception('Cannot update TEXT/IMAGE column(s) without primary key values')
783     if ((grep { defined $row{$_} } @primary_cols) != @primary_cols);
784
785   for my $col (keys %$blob_cols) {
786     my $blob = $blob_cols->{$col};
787
788     my %where = map { ($_, $row{$_}) } @primary_cols;
789
790     my $cursor = $self->select ($source, [$col], \%where, {});
791     $cursor->next;
792     my $sth = $cursor->sth;
793
794     if (not $sth) {
795
796       $self->throw_exception(
797           "Could not find row in table '$table' for blob update:\n"
798         . Data::Dumper::Concise::Dumper (\%where)
799       );
800     }
801
802     eval {
803       do {
804         $sth->func('CS_GET', 1, 'ct_data_info') or die $sth->errstr;
805       } while $sth->fetch;
806
807       $sth->func('ct_prepare_send') or die $sth->errstr;
808
809       my $log_on_update = $self->_blob_log_on_update;
810       $log_on_update    = 1 if not defined $log_on_update;
811
812       $sth->func('CS_SET', 1, {
813         total_txtlen => length($blob),
814         log_on_update => $log_on_update
815       }, 'ct_data_info') or die $sth->errstr;
816
817       $sth->func($blob, length($blob), 'ct_send_data') or die $sth->errstr;
818
819       $sth->func('ct_finish_send') or die $sth->errstr;
820     };
821     my $exception = $@;
822     $sth->finish if $sth;
823     if ($exception) {
824       if ($self->using_freetds) {
825         $self->throw_exception (
826           'TEXT/IMAGE operation failed, probably because you are using FreeTDS: '
827           . $exception
828         );
829       } else {
830         $self->throw_exception($exception);
831       }
832     }
833   }
834 }
835
836 sub _insert_blobs_array {
837   my ($self, $source, $blob_cols, $cols, $data) = @_;
838
839   for my $i (0..$#$data) {
840     my $datum = $data->[$i];
841
842     my %row;
843     @row{ @$cols } = @$datum;
844
845     my %blob_vals;
846     for my $j (0..$#$cols) {
847       if (exists $blob_cols->[$i][$j]) {
848         $blob_vals{ $cols->[$j] } = $blob_cols->[$i][$j];
849       }
850     }
851
852     $self->_insert_blobs ($source, \%blob_vals, \%row);
853   }
854 }
855
856 =head2 connect_call_datetime_setup
857
858 Used as:
859
860   on_connect_call => 'datetime_setup'
861
862 In L<DBIx::Class::Storage::DBI/connect_info> to set:
863
864   $dbh->syb_date_fmt('ISO_strict'); # output fmt: 2004-08-21T14:36:48.080Z
865   $dbh->do('set dateformat mdy');   # input fmt:  08/13/1979 18:08:55.080
866
867 On connection for use with L<DBIx::Class::InflateColumn::DateTime>, using
868 L<DateTime::Format::Sybase>, which you will need to install.
869
870 This works for both C<DATETIME> and C<SMALLDATETIME> columns, although
871 C<SMALLDATETIME> columns only have minute precision.
872
873 =cut
874
875 {
876   my $old_dbd_warned = 0;
877
878   sub connect_call_datetime_setup {
879     my $self = shift;
880     my $dbh = $self->_get_dbh;
881
882     if ($dbh->can('syb_date_fmt')) {
883       # amazingly, this works with FreeTDS
884       $dbh->syb_date_fmt('ISO_strict');
885     } elsif (not $old_dbd_warned) {
886       carp "Your DBD::Sybase is too old to support ".
887       "DBIx::Class::InflateColumn::DateTime, please upgrade!";
888       $old_dbd_warned = 1;
889     }
890
891     $dbh->do('SET DATEFORMAT mdy');
892
893     1;
894   }
895 }
896
897 sub datetime_parser_type { "DateTime::Format::Sybase" }
898
899 # ->begin_work and such have no effect with FreeTDS but we run them anyway to
900 # let the DBD keep any state it needs to.
901 #
902 # If they ever do start working, the extra statements will do no harm (because
903 # Sybase supports nested transactions.)
904
905 sub _dbh_begin_work {
906   my $self = shift;
907
908 # bulkLogin=1 connections are always in a transaction, and can only call BEGIN
909 # TRAN once. However, we need to make sure there's a $dbh.
910   return if $self->_is_bulk_storage && $self->_dbh && $self->_began_bulk_work;
911
912   $self->next::method(@_);
913
914   if ($self->using_freetds) {
915     $self->_get_dbh->do('BEGIN TRAN');
916   }
917
918   $self->_began_bulk_work(1) if $self->_is_bulk_storage;
919 }
920
921 sub _dbh_commit {
922   my $self = shift;
923   if ($self->using_freetds) {
924     $self->_dbh->do('COMMIT');
925   }
926   return $self->next::method(@_);
927 }
928
929 sub _dbh_rollback {
930   my $self = shift;
931   if ($self->using_freetds) {
932     $self->_dbh->do('ROLLBACK');
933   }
934   return $self->next::method(@_);
935 }
936
937 # savepoint support using ASE syntax
938
939 sub _svp_begin {
940   my ($self, $name) = @_;
941
942   $self->_get_dbh->do("SAVE TRANSACTION $name");
943 }
944
945 # A new SAVE TRANSACTION with the same name releases the previous one.
946 sub _svp_release { 1 }
947
948 sub _svp_rollback {
949   my ($self, $name) = @_;
950
951   $self->_get_dbh->do("ROLLBACK TRANSACTION $name");
952 }
953
954 1;
955
956 =head1 Schema::Loader Support
957
958 There is an experimental branch of L<DBIx::Class::Schema::Loader> that will
959 allow you to dump a schema from most (if not all) versions of Sybase.
960
961 It is available via subversion from:
962
963   http://dev.catalyst.perl.org/repos/bast/branches/DBIx-Class-Schema-Loader/current/
964
965 =head1 FreeTDS
966
967 This driver supports L<DBD::Sybase> compiled against FreeTDS
968 (L<http://www.freetds.org/>) to the best of our ability, however it is
969 recommended that you recompile L<DBD::Sybase> against the Sybase Open Client
970 libraries. They are a part of the Sybase ASE distribution:
971
972 The Open Client FAQ is here:
973 L<http://www.isug.com/Sybase_FAQ/ASE/section7.html>.
974
975 Sybase ASE for Linux (which comes with the Open Client libraries) may be
976 downloaded here: L<http://response.sybase.com/forms/ASE_Linux_Download>.
977
978 To see if you're using FreeTDS check C<< $schema->storage->using_freetds >>, or run:
979
980   perl -MDBI -le 'my $dbh = DBI->connect($dsn, $user, $pass); print $dbh->{syb_oc_version}'
981
982 Some versions of the libraries involved will not support placeholders, in which
983 case the storage will be reblessed to
984 L<DBIx::Class::Storage::DBI::Sybase::NoBindVars>.
985
986 In some configurations, placeholders will work but will throw implicit type
987 conversion errors for anything that's not expecting a string. In such a case,
988 the C<auto_cast> option from L<DBIx::Class::Storage::DBI::AutoCast> is
989 automatically set, which you may enable on connection with
990 L<DBIx::Class::Storage::DBI::AutoCast/connect_call_set_auto_cast>. The type info
991 for the C<CAST>s is taken from the L<DBIx::Class::ResultSource/data_type>
992 definitions in your Result classes, and are mapped to a Sybase type (if it isn't
993 already) using a mapping based on L<SQL::Translator>.
994
995 In other configurations, placeholers will work just as they do with the Sybase
996 Open Client libraries.
997
998 Inserts or updates of TEXT/IMAGE columns will B<NOT> work with FreeTDS.
999
1000 =head1 INSERTS WITH PLACEHOLDERS
1001
1002 With placeholders enabled, inserts are done in a transaction so that there are
1003 no concurrency issues with getting the inserted identity value using
1004 C<SELECT MAX(col)>, which is the only way to get the C<IDENTITY> value in this
1005 mode.
1006
1007 In addition, they are done on a separate connection so that it's possible to
1008 have active cursors when doing an insert.
1009
1010 When using C<DBIx::Class::Storage::DBI::Sybase::NoBindVars> transactions are
1011 disabled, as there are no concurrency issues with C<SELECT @@IDENTITY> as it's a
1012 session variable.
1013
1014 =head1 TRANSACTIONS
1015
1016 Due to limitations of the TDS protocol, L<DBD::Sybase>, or both; you cannot
1017 begin a transaction while there are active cursors; nor can you use multiple
1018 active cursors within a transaction. An active cursor is, for example, a
1019 L<ResultSet|DBIx::Class::ResultSet> that has been executed using C<next> or
1020 C<first> but has not been exhausted or L<reset|DBIx::Class::ResultSet/reset>.
1021
1022 For example, this will not work:
1023
1024   $schema->txn_do(sub {
1025     my $rs = $schema->resultset('Book');
1026     while (my $row = $rs->next) {
1027       $schema->resultset('MetaData')->create({
1028         book_id => $row->id,
1029         ...
1030       });
1031     }
1032   });
1033
1034 This won't either:
1035
1036   my $first_row = $large_rs->first;
1037   $schema->txn_do(sub { ... });
1038
1039 Transactions done for inserts in C<AutoCommit> mode when placeholders are in use
1040 are not affected, as they are done on an extra database handle.
1041
1042 Some workarounds:
1043
1044 =over 4
1045
1046 =item * use L<DBIx::Class::Storage::DBI::Replicated>
1047
1048 =item * L<connect|DBIx::Class::Schema/connect> another L<Schema|DBIx::Class::Schema>
1049
1050 =item * load the data from your cursor with L<DBIx::Class::ResultSet/all>
1051
1052 =back
1053
1054 =head1 MAXIMUM CONNECTIONS
1055
1056 The TDS protocol makes separate connections to the server for active statements
1057 in the background. By default the number of such connections is limited to 25,
1058 on both the client side and the server side.
1059
1060 This is a bit too low for a complex L<DBIx::Class> application, so on connection
1061 the client side setting is set to C<256> (see L<DBD::Sybase/maxConnect>.) You
1062 can override it to whatever setting you like in the DSN.
1063
1064 See
1065 L<http://infocenter.sybase.com/help/index.jsp?topic=/com.sybase.help.ase_15.0.sag1/html/sag1/sag1272.htm>
1066 for information on changing the setting on the server side.
1067
1068 =head1 DATES
1069
1070 See L</connect_call_datetime_setup> to setup date formats
1071 for L<DBIx::Class::InflateColumn::DateTime>.
1072
1073 =head1 TEXT/IMAGE COLUMNS
1074
1075 L<DBD::Sybase> compiled with FreeTDS will B<NOT> allow you to insert or update
1076 C<TEXT/IMAGE> columns.
1077
1078 Setting C<< $dbh->{LongReadLen} >> will also not work with FreeTDS use either:
1079
1080   $schema->storage->dbh->do("SET TEXTSIZE $bytes");
1081
1082 or
1083
1084   $schema->storage->set_textsize($bytes);
1085
1086 instead.
1087
1088 However, the C<LongReadLen> you pass in
1089 L<DBIx::Class::Storage::DBI/connect_info> is used to execute the equivalent
1090 C<SET TEXTSIZE> command on connection.
1091
1092 See L</connect_call_blob_setup> for a L<DBIx::Class::Storage::DBI/connect_info>
1093 setting you need to work with C<IMAGE> columns.
1094
1095 =head1 BULK API
1096
1097 The experimental L<DBD::Sybase> Bulk API support is used for
1098 L<populate|DBIx::Class::ResultSet/populate> in B<void> context, in a transaction
1099 on a separate connection.
1100
1101 To use this feature effectively, use a large number of rows for each
1102 L<populate|DBIx::Class::ResultSet/populate> call, eg.:
1103
1104   while (my $rows = $data_source->get_100_rows()) {
1105     $rs->populate($rows);
1106   }
1107
1108 B<NOTE:> the L<add_columns|DBIx::Class::ResultSource/add_columns>
1109 calls in your C<Result> classes B<must> list columns in database order for this
1110 to work. Also, you may have to unset the C<LANG> environment variable before
1111 loading your app, if it doesn't match the character set of your database.
1112
1113 When inserting IMAGE columns using this method, you'll need to use
1114 L</connect_call_blob_setup> as well.
1115
1116 =head1 TODO
1117
1118 =over
1119
1120 =item *
1121
1122 Transitions to AutoCommit=0 (starting a transaction) mode by exhausting
1123 any active cursors, using eager cursors.
1124
1125 =item *
1126
1127 Real limits and limited counts using stored procedures deployed on startup.
1128
1129 =item *
1130
1131 Adaptive Server Anywhere (ASA) support, with possible SQLA::Limit support.
1132
1133 =item *
1134
1135 Blob update with a LIKE query on a blob, without invalidating the WHERE condition.
1136
1137 =item *
1138
1139 bulk_insert using prepare_cached (see comments.)
1140
1141 =back
1142
1143 =head1 AUTHOR
1144
1145 See L<DBIx::Class/CONTRIBUTORS>.
1146
1147 =head1 LICENSE
1148
1149 You may distribute this code under the same terms as Perl itself.
1150
1151 =cut
1152 # vim:sts=2 sw=2: