override _run_connection_actions for internal connection setup in sybase stuff, much...
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Sybase.pm
1 package DBIx::Class::Storage::DBI::Sybase;
2
3 use strict;
4 use warnings;
5
6 use base qw/
7     DBIx::Class::Storage::DBI::Sybase::Common
8     DBIx::Class::Storage::DBI::AutoCast
9 /;
10 use mro 'c3';
11 use Carp::Clan qw/^DBIx::Class/;
12 use List::Util();
13 use Sub::Name();
14 use Data::Dumper::Concise();
15
16 __PACKAGE__->mk_group_accessors('simple' =>
17     qw/_identity _blob_log_on_update _writer_storage _is_extra_storage
18        _bulk_storage _is_bulk_storage _began_bulk_work
19        _bulk_disabled_due_to_coderef_connect_info_warned
20        _identity_method/
21 );
22
23 my @also_proxy_to_extra_storages = qw/
24   connect_call_set_auto_cast auto_cast connect_call_blob_setup
25   connect_call_datetime_setup
26
27   disconnect _connect_info _sql_maker _sql_maker_opts disable_sth_caching
28   auto_savepoint unsafe cursor_class debug debugobj schema
29 /;
30
31 =head1 NAME
32
33 DBIx::Class::Storage::DBI::Sybase - Sybase support for DBIx::Class
34
35 =head1 SYNOPSIS
36
37 This subclass supports L<DBD::Sybase> for real Sybase databases.  If you are
38 using an MSSQL database via L<DBD::Sybase>, your storage will be reblessed to
39 L<DBIx::Class::Storage::DBI::Sybase::Microsoft_SQL_Server>.
40
41 =head1 DESCRIPTION
42
43 If your version of Sybase does not support placeholders, then your storage
44 will be reblessed to L<DBIx::Class::Storage::DBI::Sybase::NoBindVars>. You can
45 also enable that driver explicitly, see the documentation for more details.
46
47 With this driver there is unfortunately no way to get the C<last_insert_id>
48 without doing a C<SELECT MAX(col)>. This is done safely in a transaction
49 (locking the table.) See L</INSERTS WITH PLACEHOLDERS>.
50
51 A recommended L<DBIx::Class::Storage::DBI/connect_info> setting:
52
53   on_connect_call => [['datetime_setup'], ['blob_setup', log_on_update => 0]]
54
55 =head1 METHODS
56
57 =cut
58
59 sub _rebless {
60   my $self = shift;
61
62   if (ref($self) eq 'DBIx::Class::Storage::DBI::Sybase') {
63     my $dbtype = eval {
64       @{$self->_get_dbh->selectrow_arrayref(qq{sp_server_info \@attribute_id=1})}[2]
65     } || '';
66     $self->throw_exception("Unable to estable connection to determine database type: $@")
67       if $@;
68
69     $dbtype =~ s/\W/_/gi;
70     my $subclass = "DBIx::Class::Storage::DBI::Sybase::${dbtype}";
71
72     if ($dbtype && $self->load_optional_class($subclass)) {
73       bless $self, $subclass;
74       $self->_rebless;
75     } else { # real Sybase
76       my $no_bind_vars = 'DBIx::Class::Storage::DBI::Sybase::NoBindVars';
77
78       if ($self->using_freetds) {
79         carp <<'EOF' unless $ENV{DBIC_SYBASE_FREETDS_NOWARN};
80
81 You are using FreeTDS with Sybase.
82
83 We will do our best to support this configuration, but please consider this
84 support experimental.
85
86 TEXT/IMAGE columns will definitely not work.
87
88 You are encouraged to recompile DBD::Sybase with the Sybase Open Client libraries
89 instead.
90
91 See perldoc DBIx::Class::Storage::DBI::Sybase for more details.
92
93 To turn off this warning set the DBIC_SYBASE_FREETDS_NOWARN environment
94 variable.
95 EOF
96         if (not $self->_typeless_placeholders_supported) {
97           if ($self->_placeholders_supported) {
98             $self->auto_cast(1);
99           } else {
100             $self->ensure_class_loaded($no_bind_vars);
101             bless $self, $no_bind_vars;
102             $self->_rebless;
103           }
104         }
105       }
106       elsif (not $self->_get_dbh->{syb_dynamic_supported}) {
107         # not necessarily FreeTDS, but no placeholders nevertheless
108         $self->ensure_class_loaded($no_bind_vars);
109         bless $self, $no_bind_vars;
110         $self->_rebless;
111       } elsif (not $self->_typeless_placeholders_supported) {
112         # this is highly unlikely, but we check just in case
113         $self->auto_cast(1);
114       }
115     }
116   }
117 }
118
119 sub _init {
120   my $self = shift;
121   $self->_set_max_connect(256);
122
123 # create storage for insert/(update blob) transactions,
124 # unless this is that storage
125   return if $self->_is_extra_storage;
126
127   my $writer_storage = (ref $self)->new;
128
129   $writer_storage->_is_extra_storage(1);
130   $writer_storage->connect_info($self->connect_info);
131   $writer_storage->auto_cast($self->auto_cast);
132
133   $self->_writer_storage($writer_storage);
134
135 # create a bulk storage unless connect_info is a coderef
136   return
137     if (Scalar::Util::reftype($self->_dbi_connect_info->[0])||'') eq 'CODE';
138
139   my $bulk_storage = (ref $self)->new;
140
141   $bulk_storage->_is_extra_storage(1);
142   $bulk_storage->_is_bulk_storage(1); # for special ->disconnect acrobatics
143   $bulk_storage->connect_info($self->connect_info);
144
145 # this is why
146   $bulk_storage->_dbi_connect_info->[0] .= ';bulkLogin=1';
147
148   $self->_bulk_storage($bulk_storage);
149 }
150
151 for my $method (@also_proxy_to_extra_storages) {
152   no strict 'refs';
153   no warnings 'redefine';
154
155   my $replaced = __PACKAGE__->can($method);
156
157   *{$method} = Sub::Name::subname $method => sub {
158     my $self = shift;
159     $self->_writer_storage->$replaced(@_) if $self->_writer_storage;
160     $self->_bulk_storage->$replaced(@_)   if $self->_bulk_storage;
161     return $self->$replaced(@_);
162   };
163 }
164
165 sub disconnect {
166   my $self = shift;
167
168 # Even though we call $sth->finish for uses off the bulk API, there's still an
169 # "active statement" warning on disconnect, which we throw away here.
170 # This is due to the bug described in insert_bulk.
171 # Currently a noop because 'prepare' is used instead of 'prepare_cached'.
172   local $SIG{__WARN__} = sub {
173     warn $_[0] unless $_[0] =~ /active statement/i;
174   } if $self->_is_bulk_storage;
175
176 # so that next transaction gets a dbh
177   $self->_began_bulk_work(0) if $self->_is_bulk_storage;
178
179   $self->next::method;
180 }
181
182 # Set up session settings for Sybase databases for the connection.
183 #
184 # Make sure we have CHAINED mode turned on if AutoCommit is off in non-FreeTDS
185 # DBD::Sybase (since we don't know how DBD::Sybase was compiled.) If however
186 # we're using FreeTDS, CHAINED mode turns on an implicit transaction which we
187 # only want when AutoCommit is off.
188 #
189 # Also SET TEXTSIZE for FreeTDS because LongReadLen doesn't work.
190 sub _run_connection_actions {
191   my $self = shift;
192
193   if ($self->_is_bulk_storage) {
194 # this should be cleared on every reconnect
195     $self->_began_bulk_work(0);
196     return;
197   }
198
199   if (not $self->using_freetds) {
200     $self->_dbh->{syb_chained_txn} = 1;
201   } else {
202     # based on LongReadLen in connect_info
203     $self->set_textsize;
204
205     if ($self->_dbh_autocommit) {
206       $self->_dbh->do('SET CHAINED OFF');
207     } else {
208       $self->_dbh->do('SET CHAINED ON');
209     }
210   }
211
212   $self->next::method(@_);
213 }
214
215 =head2 connect_call_blob_setup
216
217 Used as:
218
219   on_connect_call => [ [ 'blob_setup', log_on_update => 0 ] ]
220
221 Does C<< $dbh->{syb_binary_images} = 1; >> to return C<IMAGE> data as raw binary
222 instead of as a hex string.
223
224 Recommended.
225
226 Also sets the C<log_on_update> value for blob write operations. The default is
227 C<1>, but C<0> is better if your database is configured for it.
228
229 See
230 L<DBD::Sybase/Handling_IMAGE/TEXT_data_with_syb_ct_get_data()/syb_ct_send_data()>.
231
232 =cut
233
234 sub connect_call_blob_setup {
235   my $self = shift;
236   my %args = @_;
237   my $dbh = $self->_dbh;
238   $dbh->{syb_binary_images} = 1;
239
240   $self->_blob_log_on_update($args{log_on_update})
241     if exists $args{log_on_update};
242 }
243
244 sub _is_lob_type {
245   my $self = shift;
246   my $type = shift;
247   $type && $type =~ /(?:text|image|lob|bytea|binary|memo)/i;
248 }
249
250 sub _is_lob_column {
251   my ($self, $source, $column) = @_;
252
253   return $self->_is_lob_type($source->column_info($column)->{data_type});
254 }
255
256 sub _prep_for_execute {
257   my $self = shift;
258   my ($op, $extra_bind, $ident, $args) = @_;
259
260   my ($sql, $bind) = $self->next::method (@_);
261
262   my $table = Scalar::Util::blessed($ident) ? $ident->from : $ident;
263
264   my $bind_info = $self->_resolve_column_info(
265     $ident, [map $_->[0], @{$bind}]
266   );
267   my $bound_identity_col = List::Util::first
268     { $bind_info->{$_}{is_auto_increment} }
269     (keys %$bind_info)
270   ;
271   my $identity_col = Scalar::Util::blessed($ident) &&
272     List::Util::first
273     { $ident->column_info($_)->{is_auto_increment} }
274     $ident->columns
275   ;
276
277   if (($op eq 'insert' && $bound_identity_col) ||
278       ($op eq 'update' && exists $args->[0]{$identity_col})) {
279     $sql = join ("\n",
280       $self->_set_table_identity_sql($op => $table, 'on'),
281       $sql,
282       $self->_set_table_identity_sql($op => $table, 'off'),
283     );
284   }
285
286   if ($op eq 'insert' && (not $bound_identity_col) && $identity_col &&
287       (not $self->{insert_bulk})) {
288     $sql =
289       "$sql\n" .
290       $self->_fetch_identity_sql($ident, $identity_col);
291   }
292
293   return ($sql, $bind);
294 }
295
296 sub _set_table_identity_sql {
297   my ($self, $op, $table, $on_off) = @_;
298
299   return sprintf 'SET IDENTITY_%s %s %s',
300     uc($op), $self->sql_maker->_quote($table), uc($on_off);
301 }
302
303 # Stolen from SQLT, with some modifications. This is a makeshift
304 # solution before a sane type-mapping library is available, thus
305 # the 'our' for easy overrides.
306 our %TYPE_MAPPING  = (
307     number    => 'numeric',
308     money     => 'money',
309     varchar   => 'varchar',
310     varchar2  => 'varchar',
311     timestamp => 'datetime',
312     text      => 'varchar',
313     real      => 'double precision',
314     comment   => 'text',
315     bit       => 'bit',
316     tinyint   => 'smallint',
317     float     => 'double precision',
318     serial    => 'numeric',
319     bigserial => 'numeric',
320     boolean   => 'varchar',
321     long      => 'varchar',
322 );
323
324 sub _native_data_type {
325   my ($self, $type) = @_;
326
327   $type = lc $type;
328   $type =~ s/\s* identity//x;
329
330   return uc($TYPE_MAPPING{$type} || $type);
331 }
332
333 sub _fetch_identity_sql {
334   my ($self, $source, $col) = @_;
335
336   return sprintf ("SELECT MAX(%s) FROM %s",
337     map { $self->sql_maker->_quote ($_) } ($col, $source->from)
338   );
339 }
340
341 sub _execute {
342   my $self = shift;
343   my ($op) = @_;
344
345   my ($rv, $sth, @bind) = $self->dbh_do($self->can('_dbh_execute'), @_);
346
347   if ($op eq 'insert') {
348     $self->_identity($sth->fetchrow_array);
349     $sth->finish;
350   }
351
352   return wantarray ? ($rv, $sth, @bind) : $rv;
353 }
354
355 sub last_insert_id { shift->_identity }
356
357 # handles TEXT/IMAGE and transaction for last_insert_id
358 sub insert {
359   my $self = shift;
360   my ($source, $to_insert) = @_;
361
362   my $identity_col = (List::Util::first
363     { $source->column_info($_)->{is_auto_increment} }
364     $source->columns) || '';
365
366   # check for empty insert
367   # INSERT INTO foo DEFAULT VALUES -- does not work with Sybase
368   # try to insert explicit 'DEFAULT's instead (except for identity)
369   if (not %$to_insert) {
370     for my $col ($source->columns) {
371       next if $col eq $identity_col;
372       $to_insert->{$col} = \'DEFAULT';
373     }
374   }
375
376   my $blob_cols = $self->_remove_blob_cols($source, $to_insert);
377
378   # do we need the horrific SELECT MAX(COL) hack?
379   my $dumb_last_insert_id =
380        $identity_col
381     && (not exists $to_insert->{$identity_col})
382     && ($self->_identity_method||'') ne '@@IDENTITY';
383
384   my $next = $self->next::can;
385
386   # we are already in a transaction, or there are no blobs
387   # and we don't need the PK - just (try to) do it
388   if ($self->{transaction_depth}
389         || (!$blob_cols && !$dumb_last_insert_id)
390   ) {
391     return $self->_insert (
392       $next, $source, $to_insert, $blob_cols, $identity_col
393     );
394   }
395
396   # otherwise use the _writer_storage to do the insert+transaction on another
397   # connection
398   my $guard = $self->_writer_storage->txn_scope_guard;
399
400   my $updated_cols = $self->_writer_storage->_insert (
401     $next, $source, $to_insert, $blob_cols, $identity_col
402   );
403
404   $self->_identity($self->_writer_storage->_identity);
405
406   $guard->commit;
407
408   return $updated_cols;
409 }
410
411 sub _insert {
412   my ($self, $next, $source, $to_insert, $blob_cols, $identity_col) = @_;
413
414   my $updated_cols = $self->$next ($source, $to_insert);
415
416   my $final_row = {
417     ($identity_col ?
418       ($identity_col => $self->last_insert_id($source, $identity_col)) : ()),
419     %$to_insert,
420     %$updated_cols,
421   };
422
423   $self->_insert_blobs ($source, $blob_cols, $final_row) if $blob_cols;
424
425   return $updated_cols;
426 }
427
428 sub update {
429   my $self = shift;
430   my ($source, $fields, $where, @rest) = @_;
431
432   my $wantarray = wantarray;
433
434   my $blob_cols = $self->_remove_blob_cols($source, $fields);
435
436   my $table = $source->name;
437
438   my $identity_col = List::Util::first
439     { $source->column_info($_)->{is_auto_increment} }
440     $source->columns;
441
442   my $is_identity_update = $identity_col && defined $fields->{$identity_col};
443
444   return $self->next::method(@_) unless $blob_cols;
445
446 # If there are any blobs in $where, Sybase will return a descriptive error
447 # message.
448 # XXX blobs can still be used with a LIKE query, and this should be handled.
449
450 # update+blob update(s) done atomically on separate connection
451   $self = $self->_writer_storage;
452
453   my $guard = $self->txn_scope_guard;
454
455 # First update the blob columns to be updated to '' (taken from $fields, where
456 # it is originally put by _remove_blob_cols .)
457   my %blobs_to_empty = map { ($_ => delete $fields->{$_}) } keys %$blob_cols;
458
459 # We can't only update NULL blobs, because blobs cannot be in the WHERE clause.
460
461   $self->next::method($source, \%blobs_to_empty, $where, @rest);
462
463 # Now update the blobs before the other columns in case the update of other
464 # columns makes the search condition invalid.
465   $self->_update_blobs($source, $blob_cols, $where);
466
467   my @res;
468   if (%$fields) {
469     if ($wantarray) {
470       @res    = $self->next::method(@_);
471     }
472     elsif (defined $wantarray) {
473       $res[0] = $self->next::method(@_);
474     }
475     else {
476       $self->next::method(@_);
477     }
478   }
479
480   $guard->commit;
481
482   return $wantarray ? @res : $res[0];
483 }
484
485 sub insert_bulk {
486   my $self = shift;
487   my ($source, $cols, $data) = @_;
488
489   my $identity_col = List::Util::first
490     { $source->column_info($_)->{is_auto_increment} }
491     $source->columns;
492
493   my $is_identity_insert = (List::Util::first
494     { $_ eq $identity_col }
495     @{$cols}
496   ) ? 1 : 0;
497
498   my @source_columns = $source->columns;
499
500   my $use_bulk_api =
501     $self->_bulk_storage &&
502     $self->_get_dbh->{syb_has_blk};
503
504   if ((not $use_bulk_api) &&
505       (Scalar::Util::reftype($self->_dbi_connect_info->[0])||'') eq 'CODE' &&
506       (not $self->_bulk_disabled_due_to_coderef_connect_info_warned)) {
507     carp <<'EOF';
508 Bulk API support disabled due to use of a CODEREF connect_info. Reverting to
509 regular array inserts.
510 EOF
511     $self->_bulk_disabled_due_to_coderef_connect_info_warned(1);
512   }
513
514   if (not $use_bulk_api) {
515     my $blob_cols = $self->_remove_blob_cols_array($source, $cols, $data);
516
517 # _execute_array uses a txn anyway, but it ends too early in case we need to
518 # select max(col) to get the identity for inserting blobs.
519     ($self, my $guard) = $self->{transaction_depth} == 0 ?
520       ($self->_writer_storage, $self->_writer_storage->txn_scope_guard)
521       :
522       ($self, undef);
523
524     local $self->{insert_bulk} = 1;
525
526     $self->next::method(@_);
527
528     if ($blob_cols) {
529       if ($is_identity_insert) {
530         $self->_insert_blobs_array ($source, $blob_cols, $cols, $data);
531       }
532       else {
533         my @cols_with_identities = (@$cols, $identity_col);
534
535         ## calculate identities
536         # XXX This assumes identities always increase by 1, which may or may not
537         # be true.
538         my ($last_identity) =
539           $self->_dbh->selectrow_array (
540             $self->_fetch_identity_sql($source, $identity_col)
541           );
542         my @identities = (($last_identity - @$data + 1) .. $last_identity);
543
544         my @data_with_identities = map [@$_, shift @identities], @$data;
545
546         $self->_insert_blobs_array (
547           $source, $blob_cols, \@cols_with_identities, \@data_with_identities
548         );
549       }
550     }
551
552     $guard->commit if $guard;
553
554     return;
555   }
556
557 # otherwise, use the bulk API
558
559 # rearrange @$data so that columns are in database order
560   my %orig_idx;
561   @orig_idx{@$cols} = 0..$#$cols;
562
563   my %new_idx;
564   @new_idx{@source_columns} = 0..$#source_columns;
565
566   my @new_data;
567   for my $datum (@$data) {
568     my $new_datum = [];
569     for my $col (@source_columns) {
570 # identity data will be 'undef' if not $is_identity_insert
571 # columns with defaults will also be 'undef'
572       $new_datum->[ $new_idx{$col} ] =
573         exists $orig_idx{$col} ? $datum->[ $orig_idx{$col} ] : undef;
574     }
575     push @new_data, $new_datum;
576   }
577
578 # bcp identity index is 1-based
579   my $identity_idx = exists $new_idx{$identity_col} ?
580     $new_idx{$identity_col} + 1 : 0;
581
582 ## Set a client-side conversion error handler, straight from DBD::Sybase docs.
583 # This ignores any data conversion errors detected by the client side libs, as
584 # they are usually harmless.
585   my $orig_cslib_cb = DBD::Sybase::set_cslib_cb(
586     Sub::Name::subname insert_bulk => sub {
587       my ($layer, $origin, $severity, $errno, $errmsg, $osmsg, $blkmsg) = @_;
588
589       return 1 if $errno == 36;
590
591       carp
592         "Layer: $layer, Origin: $origin, Severity: $severity, Error: $errno" .
593         ($errmsg ? "\n$errmsg" : '') .
594         ($osmsg  ? "\n$osmsg"  : '')  .
595         ($blkmsg ? "\n$blkmsg" : '');
596
597       return 0;
598   });
599
600   eval {
601     my $bulk = $self->_bulk_storage;
602
603     my $guard = $bulk->txn_scope_guard;
604
605 ## XXX get this to work instead of our own $sth
606 ## will require SQLA or *Hacks changes for ordered columns
607 #    $bulk->next::method($source, \@source_columns, \@new_data, {
608 #      syb_bcp_attribs => {
609 #        identity_flag   => $is_identity_insert,
610 #        identity_column => $identity_idx,
611 #      }
612 #    });
613     my $sql = 'INSERT INTO ' .
614       $bulk->sql_maker->_quote($source->name) . ' (' .
615 # colname list is ignored for BCP, but does no harm
616       (join ', ', map $bulk->sql_maker->_quote($_), @source_columns) . ') '.
617       ' VALUES ('.  (join ', ', ('?') x @source_columns) . ')';
618
619 ## XXX there's a bug in the DBD::Sybase bulk support that makes $sth->finish for
620 ## a prepare_cached statement ineffective. Replace with ->sth when fixed, or
621 ## better yet the version above. Should be fixed in DBD::Sybase .
622     my $sth = $bulk->_get_dbh->prepare($sql,
623 #      'insert', # op
624       {
625         syb_bcp_attribs => {
626           identity_flag   => $is_identity_insert,
627           identity_column => $identity_idx,
628         }
629       }
630     );
631
632     my @bind = do {
633       my $idx = 0;
634       map [ $_, $idx++ ], @source_columns;
635     };
636
637     $self->_execute_array(
638       $source, $sth, \@bind, \@source_columns, \@new_data, sub {
639         $guard->commit
640       }
641     );
642
643     $bulk->_query_end($sql);
644   };
645
646   my $exception = $@;
647   DBD::Sybase::set_cslib_cb($orig_cslib_cb);
648
649   if ($exception =~ /-Y option/) {
650     carp <<"EOF";
651
652 Sybase bulk API operation failed due to character set incompatibility, reverting
653 to regular array inserts:
654
655 *** Try unsetting the LANG environment variable.
656
657 $exception
658 EOF
659     $self->_bulk_storage(undef);
660     unshift @_, $self;
661     goto \&insert_bulk;
662   }
663   elsif ($exception) {
664 # rollback makes the bulkLogin connection unusable
665     $self->_bulk_storage->disconnect;
666     $self->throw_exception($exception);
667   }
668 }
669
670 sub _dbh_execute_array {
671   my ($self, $sth, $tuple_status, $cb) = @_;
672
673   my $rv = $self->next::method($sth, $tuple_status);
674   $cb->() if $cb;
675
676   return $rv;
677 }
678
679 # Make sure blobs are not bound as placeholders, and return any non-empty ones
680 # as a hash.
681 sub _remove_blob_cols {
682   my ($self, $source, $fields) = @_;
683
684   my %blob_cols;
685
686   for my $col (keys %$fields) {
687     if ($self->_is_lob_column($source, $col)) {
688       my $blob_val = delete $fields->{$col};
689       if (not defined $blob_val) {
690         $fields->{$col} = \'NULL';
691       }
692       else {
693         $fields->{$col} = \"''";
694         $blob_cols{$col} = $blob_val unless $blob_val eq '';
695       }
696     }
697   }
698
699   return %blob_cols ? \%blob_cols : undef;
700 }
701
702 # same for insert_bulk
703 sub _remove_blob_cols_array {
704   my ($self, $source, $cols, $data) = @_;
705
706   my @blob_cols;
707
708   for my $i (0..$#$cols) {
709     my $col = $cols->[$i];
710
711     if ($self->_is_lob_column($source, $col)) {
712       for my $j (0..$#$data) {
713         my $blob_val = delete $data->[$j][$i];
714         if (not defined $blob_val) {
715           $data->[$j][$i] = \'NULL';
716         }
717         else {
718           $data->[$j][$i] = \"''";
719           $blob_cols[$j][$i] = $blob_val
720             unless $blob_val eq '';
721         }
722       }
723     }
724   }
725
726   return @blob_cols ? \@blob_cols : undef;
727 }
728
729 sub _update_blobs {
730   my ($self, $source, $blob_cols, $where) = @_;
731
732   my (@primary_cols) = $source->primary_columns;
733
734   $self->throw_exception('Cannot update TEXT/IMAGE column(s) without a primary key')
735     unless @primary_cols;
736
737 # check if we're updating a single row by PK
738   my $pk_cols_in_where = 0;
739   for my $col (@primary_cols) {
740     $pk_cols_in_where++ if defined $where->{$col};
741   }
742   my @rows;
743
744   if ($pk_cols_in_where == @primary_cols) {
745     my %row_to_update;
746     @row_to_update{@primary_cols} = @{$where}{@primary_cols};
747     @rows = \%row_to_update;
748   } else {
749     my $cursor = $self->select ($source, \@primary_cols, $where, {});
750     @rows = map {
751       my %row; @row{@primary_cols} = @$_; \%row
752     } $cursor->all;
753   }
754
755   for my $row (@rows) {
756     $self->_insert_blobs($source, $blob_cols, $row);
757   }
758 }
759
760 sub _insert_blobs {
761   my ($self, $source, $blob_cols, $row) = @_;
762   my $dbh = $self->_get_dbh;
763
764   my $table = $source->name;
765
766   my %row = %$row;
767   my (@primary_cols) = $source->primary_columns;
768
769   $self->throw_exception('Cannot update TEXT/IMAGE column(s) without a primary key')
770     unless @primary_cols;
771
772   $self->throw_exception('Cannot update TEXT/IMAGE column(s) without primary key values')
773     if ((grep { defined $row{$_} } @primary_cols) != @primary_cols);
774
775   for my $col (keys %$blob_cols) {
776     my $blob = $blob_cols->{$col};
777
778     my %where = map { ($_, $row{$_}) } @primary_cols;
779
780     my $cursor = $self->select ($source, [$col], \%where, {});
781     $cursor->next;
782     my $sth = $cursor->sth;
783
784     if (not $sth) {
785
786       $self->throw_exception(
787           "Could not find row in table '$table' for blob update:\n"
788         . Data::Dumper::Concise::Dumper (\%where)
789       );
790     }
791
792     eval {
793       do {
794         $sth->func('CS_GET', 1, 'ct_data_info') or die $sth->errstr;
795       } while $sth->fetch;
796
797       $sth->func('ct_prepare_send') or die $sth->errstr;
798
799       my $log_on_update = $self->_blob_log_on_update;
800       $log_on_update    = 1 if not defined $log_on_update;
801
802       $sth->func('CS_SET', 1, {
803         total_txtlen => length($blob),
804         log_on_update => $log_on_update
805       }, 'ct_data_info') or die $sth->errstr;
806
807       $sth->func($blob, length($blob), 'ct_send_data') or die $sth->errstr;
808
809       $sth->func('ct_finish_send') or die $sth->errstr;
810     };
811     my $exception = $@;
812     $sth->finish if $sth;
813     if ($exception) {
814       if ($self->using_freetds) {
815         $self->throw_exception (
816           'TEXT/IMAGE operation failed, probably because you are using FreeTDS: '
817           . $exception
818         );
819       } else {
820         $self->throw_exception($exception);
821       }
822     }
823   }
824 }
825
826 sub _insert_blobs_array {
827   my ($self, $source, $blob_cols, $cols, $data) = @_;
828
829   for my $i (0..$#$data) {
830     my $datum = $data->[$i];
831
832     my %row;
833     @row{ @$cols } = @$datum;
834
835     my %blob_vals;
836     for my $j (0..$#$cols) {
837       if (exists $blob_cols->[$i][$j]) {
838         $blob_vals{ $cols->[$j] } = $blob_cols->[$i][$j];
839       }
840     }
841
842     $self->_insert_blobs ($source, \%blob_vals, \%row);
843   }
844 }
845
846 =head2 connect_call_datetime_setup
847
848 Used as:
849
850   on_connect_call => 'datetime_setup'
851
852 In L<DBIx::Class::Storage::DBI/connect_info> to set:
853
854   $dbh->syb_date_fmt('ISO_strict'); # output fmt: 2004-08-21T14:36:48.080Z
855   $dbh->do('set dateformat mdy');   # input fmt:  08/13/1979 18:08:55.080
856
857 On connection for use with L<DBIx::Class::InflateColumn::DateTime>, using
858 L<DateTime::Format::Sybase>, which you will need to install.
859
860 This works for both C<DATETIME> and C<SMALLDATETIME> columns, although
861 C<SMALLDATETIME> columns only have minute precision.
862
863 =cut
864
865 {
866   my $old_dbd_warned = 0;
867
868   sub connect_call_datetime_setup {
869     my $self = shift;
870     my $dbh = $self->_get_dbh;
871
872     if ($dbh->can('syb_date_fmt')) {
873       # amazingly, this works with FreeTDS
874       $dbh->syb_date_fmt('ISO_strict');
875     } elsif (not $old_dbd_warned) {
876       carp "Your DBD::Sybase is too old to support ".
877       "DBIx::Class::InflateColumn::DateTime, please upgrade!";
878       $old_dbd_warned = 1;
879     }
880
881     $dbh->do('SET DATEFORMAT mdy');
882
883     1;
884   }
885 }
886
887 sub datetime_parser_type { "DateTime::Format::Sybase" }
888
889 # ->begin_work and such have no effect with FreeTDS but we run them anyway to
890 # let the DBD keep any state it needs to.
891 #
892 # If they ever do start working, the extra statements will do no harm (because
893 # Sybase supports nested transactions.)
894
895 sub _dbh_begin_work {
896   my $self = shift;
897
898 # bulkLogin=1 connections are always in a transaction, and can only call BEGIN
899 # TRAN once. However, we need to make sure there's a $dbh.
900   return if $self->_is_bulk_storage && $self->_dbh && $self->_began_bulk_work;
901
902   $self->next::method(@_);
903
904   if ($self->using_freetds) {
905     $self->_get_dbh->do('BEGIN TRAN');
906   }
907
908   $self->_began_bulk_work(1) if $self->_is_bulk_storage;
909 }
910
911 sub _dbh_commit {
912   my $self = shift;
913   if ($self->using_freetds) {
914     $self->_dbh->do('COMMIT');
915   }
916   return $self->next::method(@_);
917 }
918
919 sub _dbh_rollback {
920   my $self = shift;
921   if ($self->using_freetds) {
922     $self->_dbh->do('ROLLBACK');
923   }
924   return $self->next::method(@_);
925 }
926
927 # savepoint support using ASE syntax
928
929 sub _svp_begin {
930   my ($self, $name) = @_;
931
932   $self->_get_dbh->do("SAVE TRANSACTION $name");
933 }
934
935 # A new SAVE TRANSACTION with the same name releases the previous one.
936 sub _svp_release { 1 }
937
938 sub _svp_rollback {
939   my ($self, $name) = @_;
940
941   $self->_get_dbh->do("ROLLBACK TRANSACTION $name");
942 }
943
944 1;
945
946 =head1 Schema::Loader Support
947
948 There is an experimental branch of L<DBIx::Class::Schema::Loader> that will
949 allow you to dump a schema from most (if not all) versions of Sybase.
950
951 It is available via subversion from:
952
953   http://dev.catalyst.perl.org/repos/bast/branches/DBIx-Class-Schema-Loader/current/
954
955 =head1 FreeTDS
956
957 This driver supports L<DBD::Sybase> compiled against FreeTDS
958 (L<http://www.freetds.org/>) to the best of our ability, however it is
959 recommended that you recompile L<DBD::Sybase> against the Sybase Open Client
960 libraries. They are a part of the Sybase ASE distribution:
961
962 The Open Client FAQ is here:
963 L<http://www.isug.com/Sybase_FAQ/ASE/section7.html>.
964
965 Sybase ASE for Linux (which comes with the Open Client libraries) may be
966 downloaded here: L<http://response.sybase.com/forms/ASE_Linux_Download>.
967
968 To see if you're using FreeTDS check C<< $schema->storage->using_freetds >>, or run:
969
970   perl -MDBI -le 'my $dbh = DBI->connect($dsn, $user, $pass); print $dbh->{syb_oc_version}'
971
972 Some versions of the libraries involved will not support placeholders, in which
973 case the storage will be reblessed to
974 L<DBIx::Class::Storage::DBI::Sybase::NoBindVars>.
975
976 In some configurations, placeholders will work but will throw implicit type
977 conversion errors for anything that's not expecting a string. In such a case,
978 the C<auto_cast> option from L<DBIx::Class::Storage::DBI::AutoCast> is
979 automatically set, which you may enable on connection with
980 L<DBIx::Class::Storage::DBI::AutoCast/connect_call_set_auto_cast>. The type info
981 for the C<CAST>s is taken from the L<DBIx::Class::ResultSource/data_type>
982 definitions in your Result classes, and are mapped to a Sybase type (if it isn't
983 already) using a mapping based on L<SQL::Translator>.
984
985 In other configurations, placeholers will work just as they do with the Sybase
986 Open Client libraries.
987
988 Inserts or updates of TEXT/IMAGE columns will B<NOT> work with FreeTDS.
989
990 =head1 INSERTS WITH PLACEHOLDERS
991
992 With placeholders enabled, inserts are done in a transaction so that there are
993 no concurrency issues with getting the inserted identity value using
994 C<SELECT MAX(col)>, which is the only way to get the C<IDENTITY> value in this
995 mode.
996
997 In addition, they are done on a separate connection so that it's possible to
998 have active cursors when doing an insert.
999
1000 When using C<DBIx::Class::Storage::DBI::Sybase::NoBindVars> transactions are
1001 disabled, as there are no concurrency issues with C<SELECT @@IDENTITY> as it's a
1002 session variable.
1003
1004 =head1 TRANSACTIONS
1005
1006 Due to limitations of the TDS protocol, L<DBD::Sybase>, or both; you cannot
1007 begin a transaction while there are active cursors; nor can you use multiple
1008 active cursors within a transaction. An active cursor is, for example, a
1009 L<ResultSet|DBIx::Class::ResultSet> that has been executed using C<next> or
1010 C<first> but has not been exhausted or L<reset|DBIx::Class::ResultSet/reset>.
1011
1012 For example, this will not work:
1013
1014   $schema->txn_do(sub {
1015     my $rs = $schema->resultset('Book');
1016     while (my $row = $rs->next) {
1017       $schema->resultset('MetaData')->create({
1018         book_id => $row->id,
1019         ...
1020       });
1021     }
1022   });
1023
1024 This won't either:
1025
1026   my $first_row = $large_rs->first;
1027   $schema->txn_do(sub { ... });
1028
1029 Transactions done for inserts in C<AutoCommit> mode when placeholders are in use
1030 are not affected, as they are done on an extra database handle.
1031
1032 Some workarounds:
1033
1034 =over 4
1035
1036 =item * use L<DBIx::Class::Storage::DBI::Replicated>
1037
1038 =item * L<connect|DBIx::Class::Schema/connect> another L<Schema|DBIx::Class::Schema>
1039
1040 =item * load the data from your cursor with L<DBIx::Class::ResultSet/all>
1041
1042 =back
1043
1044 =head1 MAXIMUM CONNECTIONS
1045
1046 The TDS protocol makes separate connections to the server for active statements
1047 in the background. By default the number of such connections is limited to 25,
1048 on both the client side and the server side.
1049
1050 This is a bit too low for a complex L<DBIx::Class> application, so on connection
1051 the client side setting is set to C<256> (see L<DBD::Sybase/maxConnect>.) You
1052 can override it to whatever setting you like in the DSN.
1053
1054 See
1055 L<http://infocenter.sybase.com/help/index.jsp?topic=/com.sybase.help.ase_15.0.sag1/html/sag1/sag1272.htm>
1056 for information on changing the setting on the server side.
1057
1058 =head1 DATES
1059
1060 See L</connect_call_datetime_setup> to setup date formats
1061 for L<DBIx::Class::InflateColumn::DateTime>.
1062
1063 =head1 TEXT/IMAGE COLUMNS
1064
1065 L<DBD::Sybase> compiled with FreeTDS will B<NOT> allow you to insert or update
1066 C<TEXT/IMAGE> columns.
1067
1068 Setting C<< $dbh->{LongReadLen} >> will also not work with FreeTDS use either:
1069
1070   $schema->storage->dbh->do("SET TEXTSIZE $bytes");
1071
1072 or
1073
1074   $schema->storage->set_textsize($bytes);
1075
1076 instead.
1077
1078 However, the C<LongReadLen> you pass in
1079 L<DBIx::Class::Storage::DBI/connect_info> is used to execute the equivalent
1080 C<SET TEXTSIZE> command on connection.
1081
1082 See L</connect_call_blob_setup> for a L<DBIx::Class::Storage::DBI/connect_info>
1083 setting you need to work with C<IMAGE> columns.
1084
1085 =head1 BULK API
1086
1087 The experimental L<DBD::Sybase> Bulk API support is used for
1088 L<populate|DBIx::Class::ResultSet/populate> in B<void> context, in a transaction
1089 on a separate connection.
1090
1091 To use this feature effectively, use a large number of rows for each
1092 L<populate|DBIx::Class::ResultSet/populate> call, eg.:
1093
1094   while (my $rows = $data_source->get_100_rows()) {
1095     $rs->populate($rows);
1096   }
1097
1098 B<NOTE:> the L<add_columns|DBIx::Class::ResultSource/add_columns>
1099 calls in your C<Result> classes B<must> list columns in database order for this
1100 to work. Also, you may have to unset the C<LANG> environment variable before
1101 loading your app, if it doesn't match the character set of your database.
1102
1103 When inserting IMAGE columns using this method, you'll need to use
1104 L</connect_call_blob_setup> as well.
1105
1106 =head1 TODO
1107
1108 =over
1109
1110 =item *
1111
1112 Transitions to AutoCommit=0 (starting a transaction) mode by exhausting
1113 any active cursors, using eager cursors.
1114
1115 =item *
1116
1117 Real limits and limited counts using stored procedures deployed on startup.
1118
1119 =item *
1120
1121 Adaptive Server Anywhere (ASA) support, with possible SQLA::Limit support.
1122
1123 =item *
1124
1125 Blob update with a LIKE query on a blob, without invalidating the WHERE condition.
1126
1127 =item *
1128
1129 bulk_insert using prepare_cached (see comments.)
1130
1131 =back
1132
1133 =head1 AUTHOR
1134
1135 See L<DBIx::Class/CONTRIBUTORS>.
1136
1137 =head1 LICENSE
1138
1139 You may distribute this code under the same terms as Perl itself.
1140
1141 =cut
1142 # vim:sts=2 sw=2: