fix insert with all defaults
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Sybase.pm
1 package DBIx::Class::Storage::DBI::Sybase;
2
3 use strict;
4 use warnings;
5
6 use base qw/
7     DBIx::Class::Storage::DBI::Sybase::Common
8     DBIx::Class::Storage::DBI::AutoCast
9 /;
10 use mro 'c3';
11 use Carp::Clan qw/^DBIx::Class/;
12 use List::Util ();
13 use Sub::Name ();
14
15 __PACKAGE__->mk_group_accessors('simple' =>
16     qw/_identity _blob_log_on_update _writer_storage _is_extra_storage
17        _bulk_storage _is_bulk_storage _began_bulk_work
18        _bulk_disabled_due_to_coderef_connect_info_warned
19        _identity_method/
20 );
21
22 my @also_proxy_to_extra_storages = qw/
23   connect_call_set_auto_cast auto_cast connect_call_blob_setup
24   connect_call_datetime_setup
25
26   disconnect _connect_info _sql_maker _sql_maker_opts disable_sth_caching
27   auto_savepoint unsafe cursor_class debug debugobj schema
28 /;
29
30 =head1 NAME
31
32 DBIx::Class::Storage::DBI::Sybase - Sybase support for DBIx::Class
33
34 =head1 SYNOPSIS
35
36 This subclass supports L<DBD::Sybase> for real Sybase databases.  If you are
37 using an MSSQL database via L<DBD::Sybase>, your storage will be reblessed to
38 L<DBIx::Class::Storage::DBI::Sybase::Microsoft_SQL_Server>.
39
40 =head1 DESCRIPTION
41
42 If your version of Sybase does not support placeholders, then your storage
43 will be reblessed to L<DBIx::Class::Storage::DBI::Sybase::NoBindVars>. You can
44 also enable that driver explicitly, see the documentation for more details.
45
46 With this driver there is unfortunately no way to get the C<last_insert_id>
47 without doing a C<SELECT MAX(col)>. This is done safely in a transaction
48 (locking the table.) See L</INSERTS WITH PLACEHOLDERS>.
49
50 A recommended L<DBIx::Class::Storage::DBI/connect_info> setting:
51
52   on_connect_call => [['datetime_setup'], ['blob_setup', log_on_update => 0]]
53
54 =head1 METHODS
55
56 =cut
57
58 sub _rebless {
59   my $self = shift;
60
61   if (ref($self) eq 'DBIx::Class::Storage::DBI::Sybase') {
62     my $dbtype = eval {
63       @{$self->_get_dbh->selectrow_arrayref(qq{sp_server_info \@attribute_id=1})}[2]
64     } || '';
65
66     my $exception = $@;
67     $dbtype =~ s/\W/_/gi;
68     my $subclass = "DBIx::Class::Storage::DBI::Sybase::${dbtype}";
69
70     if (!$exception && $dbtype && $self->load_optional_class($subclass)) {
71       bless $self, $subclass;
72       $self->_rebless;
73     } else { # real Sybase
74       my $no_bind_vars = 'DBIx::Class::Storage::DBI::Sybase::NoBindVars';
75
76       if ($self->using_freetds) {
77         carp <<'EOF' unless $ENV{DBIC_SYBASE_FREETDS_NOWARN};
78
79 You are using FreeTDS with Sybase.
80
81 We will do our best to support this configuration, but please consider this
82 support experimental.
83
84 TEXT/IMAGE columns will definitely not work.
85
86 You are encouraged to recompile DBD::Sybase with the Sybase Open Client libraries
87 instead.
88
89 See perldoc DBIx::Class::Storage::DBI::Sybase for more details.
90
91 To turn off this warning set the DBIC_SYBASE_FREETDS_NOWARN environment
92 variable.
93 EOF
94         if (not $self->_typeless_placeholders_supported) {
95           if ($self->_placeholders_supported) {
96             $self->auto_cast(1);
97           } else {
98             $self->ensure_class_loaded($no_bind_vars);
99             bless $self, $no_bind_vars;
100             $self->_rebless;
101           }
102         }
103       }
104       elsif (not $self->_get_dbh->{syb_dynamic_supported}) {
105         # not necessarily FreeTDS, but no placeholders nevertheless
106         $self->ensure_class_loaded($no_bind_vars);
107         bless $self, $no_bind_vars;
108         $self->_rebless;
109       } elsif (not $self->_typeless_placeholders_supported) {
110         # this is highly unlikely, but we check just in case
111         $self->auto_cast(1);
112       }
113     }
114   }
115 }
116
117 sub _init {
118   my $self = shift;
119   $self->_set_max_connect(256);
120
121   # based on LongReadLen in connect_info
122   $self->set_textsize if $self->using_freetds;
123
124 # create storage for insert/(update blob) transactions,
125 # unless this is that storage
126   return if $self->_is_extra_storage;
127
128   my $writer_storage = (ref $self)->new;
129
130   $writer_storage->_is_extra_storage(1);
131   $writer_storage->connect_info($self->connect_info);
132   $writer_storage->auto_cast($self->auto_cast);
133
134   $self->_writer_storage($writer_storage);
135
136 # create a bulk storage unless connect_info is a coderef
137   return
138     if (Scalar::Util::reftype($self->_dbi_connect_info->[0])||'') eq 'CODE';
139
140   my $bulk_storage = (ref $self)->new;
141
142   $bulk_storage->_is_extra_storage(1);
143   $bulk_storage->_is_bulk_storage(1); # for special ->disconnect acrobatics
144   $bulk_storage->connect_info($self->connect_info);
145
146 # this is why
147   $bulk_storage->_dbi_connect_info->[0] .= ';bulkLogin=1';
148
149   $self->_bulk_storage($bulk_storage);
150 }
151
152 for my $method (@also_proxy_to_extra_storages) {
153   no strict 'refs';
154   no warnings 'redefine';
155
156   my $replaced = __PACKAGE__->can($method);
157
158   *{$method} = Sub::Name::subname $method => sub {
159     my $self = shift;
160     $self->_writer_storage->$replaced(@_) if $self->_writer_storage;
161     $self->_bulk_storage->$replaced(@_)   if $self->_bulk_storage;
162     return $self->$replaced(@_);
163   };
164 }
165
166 sub disconnect {
167   my $self = shift;
168
169 # Even though we call $sth->finish for uses off the bulk API, there's still an
170 # "active statement" warning on disconnect, which we throw away here.
171 # This is due to the bug described in insert_bulk.
172 # Currently a noop because 'prepare' is used instead of 'prepare_cached'.
173   local $SIG{__WARN__} = sub {
174     warn $_[0] unless $_[0] =~ /active statement/i;
175   } if $self->_is_bulk_storage;
176
177 # so that next transaction gets a dbh
178   $self->_began_bulk_work(0) if $self->_is_bulk_storage;
179
180   $self->next::method;
181 }
182
183 # Make sure we have CHAINED mode turned on if AutoCommit is off in non-FreeTDS
184 # DBD::Sybase (since we don't know how DBD::Sybase was compiled.) If however
185 # we're using FreeTDS, CHAINED mode turns on an implicit transaction which we
186 # only want when AutoCommit is off.
187 sub _populate_dbh {
188   my $self = shift;
189
190   $self->next::method(@_);
191   
192   if ($self->_is_bulk_storage) {
193 # this should be cleared on every reconnect
194     $self->_began_bulk_work(0);
195     return;
196   }
197
198   if (not $self->using_freetds) {
199     $self->_dbh->{syb_chained_txn} = 1;
200   } else {
201     if ($self->_dbh_autocommit) {
202       $self->_dbh->do('SET CHAINED OFF');
203     } else {
204       $self->_dbh->do('SET CHAINED ON');
205     }
206   }
207 }
208
209 =head2 connect_call_blob_setup
210
211 Used as:
212
213   on_connect_call => [ [ 'blob_setup', log_on_update => 0 ] ]
214
215 Does C<< $dbh->{syb_binary_images} = 1; >> to return C<IMAGE> data as raw binary
216 instead of as a hex string.
217
218 Recommended.
219
220 Also sets the C<log_on_update> value for blob write operations. The default is
221 C<1>, but C<0> is better if your database is configured for it.
222
223 See
224 L<DBD::Sybase/Handling_IMAGE/TEXT_data_with_syb_ct_get_data()/syb_ct_send_data()>.
225
226 =cut
227
228 sub connect_call_blob_setup {
229   my $self = shift;
230   my %args = @_;
231   my $dbh = $self->_dbh;
232   $dbh->{syb_binary_images} = 1;
233
234   $self->_blob_log_on_update($args{log_on_update})
235     if exists $args{log_on_update};
236 }
237
238 sub _is_lob_type {
239   my $self = shift;
240   my $type = shift;
241   $type && $type =~ /(?:text|image|lob|bytea|binary|memo)/i;
242 }
243
244 sub _is_lob_column {
245   my ($self, $source, $column) = @_;
246
247   return $self->_is_lob_type($source->column_info($column)->{data_type});
248 }
249
250 sub _prep_for_execute {
251   my $self = shift;
252   my ($op, $extra_bind, $ident, $args) = @_;
253
254   my ($sql, $bind) = $self->next::method (@_);
255
256   if ($op eq 'insert') {
257     my $table = $ident->from;
258
259     my $bind_info = $self->_resolve_column_info(
260       $ident, [map $_->[0], @{$bind}]
261     );
262     my $identity_col = List::Util::first
263       { $bind_info->{$_}{is_auto_increment} }
264       (keys %$bind_info)
265     ;
266
267     if ($identity_col) {
268       $sql = join ("\n",
269         "SET IDENTITY_INSERT $table ON",
270         $sql,
271         "SET IDENTITY_INSERT $table OFF",
272       );
273     }
274     else {
275       $identity_col = List::Util::first
276         { $ident->column_info($_)->{is_auto_increment} }
277         $ident->columns
278       ;
279     }
280
281     if ($identity_col) {
282       $sql =
283         "$sql\n" .
284         $self->_fetch_identity_sql($ident, $identity_col);
285     }
286   }
287
288   return ($sql, $bind);
289 }
290
291 # Stolen from SQLT, with some modifications. This is a makeshift
292 # solution before a sane type-mapping library is available, thus
293 # the 'our' for easy overrides.
294 our %TYPE_MAPPING  = (
295     number    => 'numeric',
296     money     => 'money',
297     varchar   => 'varchar',
298     varchar2  => 'varchar',
299     timestamp => 'datetime',
300     text      => 'varchar',
301     real      => 'double precision',
302     comment   => 'text',
303     bit       => 'bit',
304     tinyint   => 'smallint',
305     float     => 'double precision',
306     serial    => 'numeric',
307     bigserial => 'numeric',
308     boolean   => 'varchar',
309     long      => 'varchar',
310 );
311
312 sub _native_data_type {
313   my ($self, $type) = @_;
314
315   $type = lc $type;
316   $type =~ s/\s* identity//x;
317
318   return uc($TYPE_MAPPING{$type} || $type);
319 }
320
321 sub _fetch_identity_sql {
322   my ($self, $source, $col) = @_;
323
324   return sprintf ("SELECT MAX(%s) FROM %s",
325     map { $self->sql_maker->_quote ($_) } ($col, $source->from)
326   );
327 }
328
329 sub _execute {
330   my $self = shift;
331   my ($op) = @_;
332
333   my ($rv, $sth, @bind) = $self->dbh_do($self->can('_dbh_execute'), @_);
334
335   if ($op eq 'insert') {
336     $self->_identity($sth->fetchrow_array);
337     $sth->finish;
338   }
339
340   return wantarray ? ($rv, $sth, @bind) : $rv;
341 }
342
343 sub last_insert_id { shift->_identity }
344
345 # handles TEXT/IMAGE and transaction for last_insert_id
346 sub insert {
347   my $self = shift;
348   my ($source, $to_insert) = @_;
349
350   my $identity_col = (List::Util::first
351     { $source->column_info($_)->{is_auto_increment} }
352     $source->columns) || '';
353
354   # check for empty insert
355   # INSERT INTO foo DEFAULT VALUES -- does not work with Sybase
356   # try to insert explicit 'DEFAULT's instead (except for identity)
357   if (not %$to_insert) {
358     for my $col ($source->columns) {
359       next if $col eq $identity_col;
360       $to_insert->{$col} = \'DEFAULT';
361     }
362   }
363
364   my $blob_cols = $self->_remove_blob_cols($source, $to_insert);
365
366   # do we need the horrific SELECT MAX(COL) hack?
367   my $dumb_last_insert_id =
368        $identity_col
369     && (not exists $to_insert->{$identity_col})
370     && ($self->_identity_method||'') ne '@@IDENTITY';
371
372   my $next = $self->next::can;
373
374   # we are already in a transaction, or there are no blobs
375   # and we don't need the PK - just (try to) do it
376   if ($self->{transaction_depth}
377         || (!$blob_cols && !$dumb_last_insert_id) 
378   ) {
379     return $self->_insert (
380       $next, $source, $to_insert, $blob_cols, $identity_col
381     );
382   }
383
384   # otherwise use the _writer_storage to do the insert+transaction on another
385   # connection
386   my $guard = $self->_writer_storage->txn_scope_guard;
387
388   my $updated_cols = $self->_writer_storage->_insert (
389     $next, $source, $to_insert, $blob_cols, $identity_col
390   );
391
392   $self->_identity($self->_writer_storage->_identity);
393
394   $guard->commit;
395
396   return $updated_cols;
397 }
398
399 sub _insert {
400   my ($self, $next, $source, $to_insert, $blob_cols, $identity_col) = @_;
401
402   my $updated_cols = $self->$next ($source, $to_insert);
403
404   my $final_row = {
405     ($identity_col ?
406       ($identity_col => $self->last_insert_id($source, $identity_col)) : ()),
407     %$to_insert,
408     %$updated_cols,
409   };
410
411   $self->_insert_blobs ($source, $blob_cols, $final_row) if $blob_cols;
412
413   return $updated_cols;
414 }
415
416 sub update {
417   my $self = shift;
418   my ($source, $fields, $where, @rest) = @_;
419
420   my $wantarray = wantarray;
421
422   my $blob_cols = $self->_remove_blob_cols($source, $fields);
423
424   my $table = $source->name;
425
426   my $identity_col = List::Util::first
427     { $source->column_info($_)->{is_auto_increment} }
428     $source->columns;
429
430   my $is_identity_update = $identity_col && defined $fields->{$identity_col};
431
432   if (not $blob_cols) {
433     $self->_set_session_identity(UPDATE => $table, 'ON')
434       if $is_identity_update;
435
436     return $self->next::method(@_);
437
438     $self->_set_session_identity(UPDATE => $table, 'OFF')
439       if $is_identity_update;
440   }
441
442 # If there are any blobs in $where, Sybase will return a descriptive error
443 # message.
444
445 # update+blob update(s) done atomically on separate connection
446   $self = $self->_writer_storage;
447
448   my $guard = $self->txn_scope_guard;
449
450 # First update the blob columns to be updated to '' (taken from $fields, where
451 # it is originally put by _remove_blob_cols .)
452   my %blobs_to_empty = map { ($_ => delete $fields->{$_}) } keys %$blob_cols;
453
454   $self->next::method($source, \%blobs_to_empty, $where, @rest);
455
456 # Now update the blobs before the other columns in case the update of other
457 # columns makes the search condition invalid.
458   $self->_update_blobs($source, $blob_cols, $where);
459
460   my @res;
461   if (%$fields) {
462     $self->_set_session_identity(UPDATE => $table, 'ON')
463       if $is_identity_update;
464
465     if ($wantarray) {
466       @res    = $self->next::method(@_);
467     }
468     elsif (defined $wantarray) {
469       $res[0] = $self->next::method(@_);
470     }
471     else {
472       $self->next::method(@_);
473     }
474
475     $self->_set_session_identity(UPDATE => $table, 'OFF')
476       if $is_identity_update;
477   }
478
479   $guard->commit;
480
481   return $wantarray ? @res : $res[0];
482 }
483
484 # for IDENTITY_INSERT / IDENTITY_UPDATE
485 sub _set_session_identity {
486   my ($self, $op, $table, $off_on) = @_;
487
488   my $sql = sprintf (
489     'SET IDENTITY_%s %s %s',
490     uc $op,
491     $self->sql_maker->_quote($table),
492     uc $off_on,
493   );
494
495   $self->_query_start($sql);
496
497   my $dbh = $self->_get_dbh;
498   eval {
499     local $dbh->{RaiseError} = 1;
500     local $dbh->{PrintError} = 0;
501     $dbh->do ($sql)
502   };
503   my $exception = $@;
504
505   $self->_query_end($sql);
506
507   if ($exception) {
508     $self->throw_exception (sprintf "Error executing '%s': %s",
509       $sql,
510       $exception,
511     );
512   }
513 }
514
515 sub insert_bulk {
516   my $self = shift;
517   my ($source, $cols, $data) = @_;
518
519   my $identity_col = List::Util::first
520     { $source->column_info($_)->{is_auto_increment} }
521     $source->columns;
522
523   my $is_identity_insert = (List::Util::first
524     { $_ eq $identity_col }
525     @{$cols}
526   ) ? 1 : 0;
527
528   my @source_columns = $source->columns;
529
530   my $use_bulk_api =
531     $self->_bulk_storage &&
532     $self->_get_dbh->{syb_has_blk};
533
534   if ((not $use_bulk_api) &&
535       (Scalar::Util::reftype($self->_dbi_connect_info->[0])||'') eq 'CODE' &&
536       (not $self->_bulk_disabled_due_to_coderef_connect_info_warned)) {
537     carp <<'EOF';
538 Bulk API support disabled due to use of a CODEREF connect_info. Reverting to
539 regular array inserts.
540 EOF
541     $self->_bulk_disabled_due_to_coderef_connect_info_warned(1);
542   }
543
544   if (not $use_bulk_api) {
545     my $blob_cols = $self->_remove_blob_cols_array($source, $cols, $data);
546
547     ($self, my ($guard)) = do {
548       if ($self->{transaction_depth} == 0 && $blob_cols) {
549         ($self->_writer_storage, $self->_writer_storage->txn_scope_guard);
550       }
551       else {
552         ($self, undef);
553       }
554     };
555
556     $self->next::method(@_);
557
558     if ($blob_cols) {
559       if ($is_identity_insert) {
560         $self->_insert_blobs_array ($source, $blob_cols, $cols, $data);
561       }
562       else {
563         my @cols_with_identities = (@$cols, $identity_col);
564
565         ## calculate identities
566         # XXX This assumes identities always increase by 1, which may or may not
567         # be true.
568         my ($last_identity) =
569           $self->_dbh->selectrow_array (
570             $self->_fetch_identity_sql($source, $identity_col)
571           );
572         my @identities = (($last_identity - @$data + 1) .. $last_identity);
573
574         my @data_with_identities = map [@$_, shift @identities], @$data;
575
576         $self->_insert_blobs_array (
577           $source, $blob_cols, \@cols_with_identities, \@data_with_identities
578         );
579       }
580     }
581
582     $guard->commit if $guard;
583     return;
584   }
585
586 # otherwise, use the bulk API
587
588 # rearrange @$data so that columns are in database order
589   my %orig_idx;
590   @orig_idx{@$cols} = 0..$#$cols;
591
592   my %new_idx;
593   @new_idx{@source_columns} = 0..$#source_columns;
594
595   my @new_data;
596   for my $datum (@$data) {
597     my $new_datum = [];
598     for my $col (@source_columns) {
599 # identity data will be 'undef' if not $is_identity_insert
600 # columns with defaults will also be 'undef'
601       $new_datum->[ $new_idx{$col} ] =
602         exists $orig_idx{$col} ? $datum->[ $orig_idx{$col} ] : undef;
603     }
604     push @new_data, $new_datum;
605   }
606
607 # bcp identity index is 1-based
608   my $identity_idx = exists $new_idx{$identity_col} ?
609     $new_idx{$identity_col} + 1 : 0;
610
611 ## Set a client-side conversion error handler, straight from DBD::Sybase docs.
612 # This ignores any data conversion errors detected by the client side libs, as
613 # they are usually harmless.
614   my $orig_cslib_cb = DBD::Sybase::set_cslib_cb(
615     Sub::Name::subname insert_bulk => sub {
616       my ($layer, $origin, $severity, $errno, $errmsg, $osmsg, $blkmsg) = @_;
617
618       return 1 if $errno == 36;
619
620       carp
621         "Layer: $layer, Origin: $origin, Severity: $severity, Error: $errno" .
622         ($errmsg ? "\n$errmsg" : '') .
623         ($osmsg  ? "\n$osmsg"  : '')  .
624         ($blkmsg ? "\n$blkmsg" : '');
625
626       return 0;
627   });
628
629   eval {
630     my $bulk = $self->_bulk_storage;
631
632     my $guard = $bulk->txn_scope_guard;
633
634 ## XXX get this to work instead of our own $sth
635 ## will require SQLA or *Hacks changes for ordered columns
636 #    $bulk->next::method($source, \@source_columns, \@new_data, {
637 #      syb_bcp_attribs => {
638 #        identity_flag   => $is_identity_insert,
639 #        identity_column => $identity_idx,
640 #      }
641 #    });
642     my $sql = 'INSERT INTO ' .
643       $bulk->sql_maker->_quote($source->name) . ' (' .
644 # colname list is ignored for BCP, but does no harm
645       (join ', ', map $bulk->sql_maker->_quote($_), @source_columns) . ') '.
646       ' VALUES ('.  (join ', ', ('?') x @source_columns) . ')';
647
648 ## XXX there's a bug in the DBD::Sybase bulk support that makes $sth->finish for
649 ## a prepare_cached statement ineffective. Replace with ->sth when fixed, or
650 ## better yet the version above. Should be fixed in DBD::Sybase .
651     my $sth = $bulk->_get_dbh->prepare($sql,
652 #      'insert', # op
653       {
654         syb_bcp_attribs => {
655           identity_flag   => $is_identity_insert,
656           identity_column => $identity_idx,
657         }
658       }
659     );
660
661     my @bind = do {
662       my $idx = 0;
663       map [ $_, $idx++ ], @source_columns;
664     };
665
666     $self->_execute_array(
667       $source, $sth, \@bind, \@source_columns, \@new_data, $guard
668     );
669
670     $bulk->_query_end($sql);
671   };
672   my $exception = $@;
673   if ($exception =~ /-Y option/) {
674     carp <<"EOF";
675
676 Sybase bulk API operation failed due to character set incompatibility, reverting
677 to regular array inserts:
678
679 *** Try unsetting the LANG environment variable.
680
681 $@
682 EOF
683     $self->_bulk_storage(undef);
684     DBD::Sybase::set_cslib_cb($orig_cslib_cb);
685     unshift @_, $self;
686     goto \&insert_bulk;
687   }
688   elsif ($exception) {
689     DBD::Sybase::set_cslib_cb($orig_cslib_cb);
690 # rollback makes the bulkLogin connection unusable
691     $self->_bulk_storage->disconnect;
692     $self->throw_exception($exception);
693   }
694
695   DBD::Sybase::set_cslib_cb($orig_cslib_cb);
696 }
697
698 # Make sure blobs are not bound as placeholders, and return any non-empty ones
699 # as a hash.
700 sub _remove_blob_cols {
701   my ($self, $source, $fields) = @_;
702
703   my %blob_cols;
704
705   for my $col (keys %$fields) {
706     if ($self->_is_lob_column($source, $col)) {
707       my $blob_val = delete $fields->{$col};
708       if (not defined $blob_val) {
709         $fields->{$col} = \'NULL';
710       }
711       else {
712         $fields->{$col} = \"''";
713         $blob_cols{$col} = $blob_val unless $blob_val eq '';
714       }
715     }
716   }
717
718   return keys %blob_cols ? \%blob_cols : undef;
719 }
720
721 # same for insert_bulk
722 sub _remove_blob_cols_array {
723   my ($self, $source, $cols, $data) = @_;
724
725   my @blob_cols;
726
727   for my $i (0..$#$cols) {
728     my $col = $cols->[$i];
729
730     if ($self->_is_lob_column($source, $col)) {
731       for my $j (0..$#$data) {
732         my $blob_val = delete $data->[$j][$i];
733         if (not defined $blob_val) {
734           $data->[$j][$i] = \'NULL';
735         }
736         else {
737           $data->[$j][$i] = \"''";
738           $blob_cols[$j][$i] = $blob_val
739             unless $blob_val eq '';
740         }
741       }
742     }
743   }
744
745   return @blob_cols ? \@blob_cols : undef;
746 }
747
748 sub _update_blobs {
749   my ($self, $source, $blob_cols, $where) = @_;
750
751   my (@primary_cols) = $source->primary_columns;
752
753   $self->throw_exception('Cannot update TEXT/IMAGE column(s) without a primary key')
754     unless @primary_cols;
755
756 # check if we're updating a single row by PK
757   my $pk_cols_in_where = 0;
758   for my $col (@primary_cols) {
759     $pk_cols_in_where++ if defined $where->{$col};
760   }
761   my @rows;
762
763   if ($pk_cols_in_where == @primary_cols) {
764     my %row_to_update;
765     @row_to_update{@primary_cols} = @{$where}{@primary_cols};
766     @rows = \%row_to_update;
767   } else {
768     my $cursor = $self->select ($source, \@primary_cols, $where, {});
769     @rows = map {
770       my %row; @row{@primary_cols} = @$_; \%row
771     } $cursor->all;
772   }
773
774   for my $row (@rows) {
775     $self->_insert_blobs($source, $blob_cols, $row);
776   }
777 }
778
779 sub _insert_blobs {
780   my ($self, $source, $blob_cols, $row) = @_;
781   my $dbh = $self->_get_dbh;
782
783   my $table = $source->name;
784
785   my %row = %$row;
786   my (@primary_cols) = $source->primary_columns;
787
788   $self->throw_exception('Cannot update TEXT/IMAGE column(s) without a primary key')
789     unless @primary_cols;
790
791   $self->throw_exception('Cannot update TEXT/IMAGE column(s) without primary key values')
792     if ((grep { defined $row{$_} } @primary_cols) != @primary_cols);
793
794   for my $col (keys %$blob_cols) {
795     my $blob = $blob_cols->{$col};
796
797     my %where = map { ($_, $row{$_}) } @primary_cols;
798
799     my $cursor = $self->select ($source, [$col], \%where, {});
800     $cursor->next;
801     my $sth = $cursor->sth;
802
803     if (not $sth) {
804
805       $self->throw_exception(
806           "Could not find row in table '$table' for blob update:\n"
807         . $self->_pretty_print (\%where)
808       );
809     }
810
811     eval {
812       do {
813         $sth->func('CS_GET', 1, 'ct_data_info') or die $sth->errstr;
814       } while $sth->fetch;
815
816       $sth->func('ct_prepare_send') or die $sth->errstr;
817
818       my $log_on_update = $self->_blob_log_on_update;
819       $log_on_update    = 1 if not defined $log_on_update;
820
821       $sth->func('CS_SET', 1, {
822         total_txtlen => length($blob),
823         log_on_update => $log_on_update
824       }, 'ct_data_info') or die $sth->errstr;
825
826       $sth->func($blob, length($blob), 'ct_send_data') or die $sth->errstr;
827
828       $sth->func('ct_finish_send') or die $sth->errstr;
829     };
830     my $exception = $@;
831     $sth->finish if $sth;
832     if ($exception) {
833       if ($self->using_freetds) {
834         $self->throw_exception (
835           'TEXT/IMAGE operation failed, probably because you are using FreeTDS: '
836           . $exception
837         );
838       } else {
839         $self->throw_exception($exception);
840       }
841     }
842   }
843 }
844
845 sub _insert_blobs_array {
846   my ($self, $source, $blob_cols, $cols, $data) = @_;
847
848   for my $i (0..$#$data) {
849     my $datum = $data->[$i];
850
851     my %row;
852     @row{ @$cols } = @$datum;
853
854     my %blob_vals;
855     for my $j (0..$#$cols) {
856       if (exists $blob_cols->[$i][$j]) {
857         $blob_vals{ $cols->[$j] } = $blob_cols->[$i][$j];
858       }
859     }
860
861     $self->_insert_blobs ($source, \%blob_vals, \%row);
862   }
863 }
864
865 =head2 connect_call_datetime_setup
866
867 Used as:
868
869   on_connect_call => 'datetime_setup'
870
871 In L<DBIx::Class::Storage::DBI/connect_info> to set:
872
873   $dbh->syb_date_fmt('ISO_strict'); # output fmt: 2004-08-21T14:36:48.080Z
874   $dbh->do('set dateformat mdy');   # input fmt:  08/13/1979 18:08:55.080
875
876 On connection for use with L<DBIx::Class::InflateColumn::DateTime>, using
877 L<DateTime::Format::Sybase>, which you will need to install.
878
879 This works for both C<DATETIME> and C<SMALLDATETIME> columns, although
880 C<SMALLDATETIME> columns only have minute precision.
881
882 =cut
883
884 {
885   my $old_dbd_warned = 0;
886
887   sub connect_call_datetime_setup {
888     my $self = shift;
889     my $dbh = $self->_dbh;
890
891     if ($dbh->can('syb_date_fmt')) {
892       # amazingly, this works with FreeTDS
893       $dbh->syb_date_fmt('ISO_strict');
894     } elsif (not $old_dbd_warned) {
895       carp "Your DBD::Sybase is too old to support ".
896       "DBIx::Class::InflateColumn::DateTime, please upgrade!";
897       $old_dbd_warned = 1;
898     }
899
900     $dbh->do('SET DATEFORMAT mdy');
901
902     1;
903   }
904 }
905
906 sub datetime_parser_type { "DateTime::Format::Sybase" }
907
908 # ->begin_work and such have no effect with FreeTDS but we run them anyway to
909 # let the DBD keep any state it needs to.
910 #
911 # If they ever do start working, the extra statements will do no harm (because
912 # Sybase supports nested transactions.)
913
914 sub _dbh_begin_work {
915   my $self = shift;
916
917 # bulkLogin=1 connections are always in a transaction, and can only call BEGIN
918 # TRAN once. However, we need to make sure there's a $dbh.
919   return if $self->_is_bulk_storage && $self->_dbh && $self->_began_bulk_work;
920
921   $self->next::method(@_);
922
923   if ($self->using_freetds) {
924     $self->_get_dbh->do('BEGIN TRAN');
925   }
926
927   $self->_began_bulk_work(1) if $self->_is_bulk_storage;
928 }
929
930 sub _dbh_commit {
931   my $self = shift;
932   if ($self->using_freetds) {
933     $self->_dbh->do('COMMIT');
934   }
935   return $self->next::method(@_);
936 }
937
938 sub _dbh_rollback {
939   my $self = shift;
940   if ($self->using_freetds) {
941     $self->_dbh->do('ROLLBACK');
942   }
943   return $self->next::method(@_);
944 }
945
946 # savepoint support using ASE syntax
947
948 sub _svp_begin {
949   my ($self, $name) = @_;
950
951   $self->_get_dbh->do("SAVE TRANSACTION $name");
952 }
953
954 # A new SAVE TRANSACTION with the same name releases the previous one.
955 sub _svp_release { 1 }
956
957 sub _svp_rollback {
958   my ($self, $name) = @_;
959
960   $self->_get_dbh->do("ROLLBACK TRANSACTION $name");
961 }
962
963 1;
964
965 =head1 Schema::Loader Support
966
967 There is an experimental branch of L<DBIx::Class::Schema::Loader> that will
968 allow you to dump a schema from most (if not all) versions of Sybase.
969
970 It is available via subversion from:
971
972   http://dev.catalyst.perl.org/repos/bast/branches/DBIx-Class-Schema-Loader/current/
973
974 =head1 FreeTDS
975
976 This driver supports L<DBD::Sybase> compiled against FreeTDS
977 (L<http://www.freetds.org/>) to the best of our ability, however it is
978 recommended that you recompile L<DBD::Sybase> against the Sybase Open Client
979 libraries. They are a part of the Sybase ASE distribution:
980
981 The Open Client FAQ is here:
982 L<http://www.isug.com/Sybase_FAQ/ASE/section7.html>.
983
984 Sybase ASE for Linux (which comes with the Open Client libraries) may be
985 downloaded here: L<http://response.sybase.com/forms/ASE_Linux_Download>.
986
987 To see if you're using FreeTDS check C<< $schema->storage->using_freetds >>, or run:
988
989   perl -MDBI -le 'my $dbh = DBI->connect($dsn, $user, $pass); print $dbh->{syb_oc_version}'
990
991 Some versions of the libraries involved will not support placeholders, in which
992 case the storage will be reblessed to
993 L<DBIx::Class::Storage::DBI::Sybase::NoBindVars>.
994
995 In some configurations, placeholders will work but will throw implicit type
996 conversion errors for anything that's not expecting a string. In such a case,
997 the C<auto_cast> option from L<DBIx::Class::Storage::DBI::AutoCast> is
998 automatically set, which you may enable on connection with
999 L<DBIx::Class::Storage::DBI::AutoCast/connect_call_set_auto_cast>. The type info
1000 for the C<CAST>s is taken from the L<DBIx::Class::ResultSource/data_type>
1001 definitions in your Result classes, and are mapped to a Sybase type (if it isn't
1002 already) using a mapping based on L<SQL::Translator>.
1003
1004 In other configurations, placeholers will work just as they do with the Sybase
1005 Open Client libraries.
1006
1007 Inserts or updates of TEXT/IMAGE columns will B<NOT> work with FreeTDS.
1008
1009 =head1 INSERTS WITH PLACEHOLDERS
1010
1011 With placeholders enabled, inserts are done in a transaction so that there are
1012 no concurrency issues with getting the inserted identity value using
1013 C<SELECT MAX(col)>, which is the only way to get the C<IDENTITY> value in this
1014 mode.
1015
1016 In addition, they are done on a separate connection so that it's possible to
1017 have active cursors when doing an insert.
1018
1019 When using C<DBIx::Class::Storage::DBI::Sybase::NoBindVars> transactions are
1020 disabled, as there are no concurrency issues with C<SELECT @@IDENTITY> as it's a
1021 session variable.
1022
1023 =head1 TRANSACTIONS
1024
1025 Due to limitations of the TDS protocol, L<DBD::Sybase>, or both; you cannot
1026 begin a transaction while there are active cursors; nor can you use multiple
1027 active cursors within a transaction. An active cursor is, for example, a
1028 L<ResultSet|DBIx::Class::ResultSet> that has been executed using C<next> or
1029 C<first> but has not been exhausted or L<reset|DBIx::Class::ResultSet/reset>.
1030
1031 For example, this will not work:
1032
1033   $schema->txn_do(sub {
1034     my $rs = $schema->resultset('Book');
1035     while (my $row = $rs->next) {
1036       $schema->resultset('MetaData')->create({
1037         book_id => $row->id,
1038         ...
1039       });
1040     }
1041   });
1042
1043 This won't either:
1044
1045   my $first_row = $large_rs->first;
1046   $schema->txn_do(sub { ... });
1047
1048 Transactions done for inserts in C<AutoCommit> mode when placeholders are in use
1049 are not affected, as they are done on an extra database handle.
1050
1051 Some workarounds:
1052
1053 =over 4
1054
1055 =item * use L<DBIx::Class::Storage::DBI::Replicated>
1056
1057 =item * L<connect|DBIx::Class::Schema/connect> another L<Schema|DBIx::Class::Schema>
1058
1059 =item * load the data from your cursor with L<DBIx::Class::ResultSet/all>
1060
1061 =back
1062
1063 =head1 MAXIMUM CONNECTIONS
1064
1065 The TDS protocol makes separate connections to the server for active statements
1066 in the background. By default the number of such connections is limited to 25,
1067 on both the client side and the server side.
1068
1069 This is a bit too low for a complex L<DBIx::Class> application, so on connection
1070 the client side setting is set to C<256> (see L<DBD::Sybase/maxConnect>.) You
1071 can override it to whatever setting you like in the DSN.
1072
1073 See
1074 L<http://infocenter.sybase.com/help/index.jsp?topic=/com.sybase.help.ase_15.0.sag1/html/sag1/sag1272.htm>
1075 for information on changing the setting on the server side.
1076
1077 =head1 DATES
1078
1079 See L</connect_call_datetime_setup> to setup date formats
1080 for L<DBIx::Class::InflateColumn::DateTime>.
1081
1082 =head1 TEXT/IMAGE COLUMNS
1083
1084 L<DBD::Sybase> compiled with FreeTDS will B<NOT> allow you to insert or update
1085 C<TEXT/IMAGE> columns.
1086
1087 Setting C<< $dbh->{LongReadLen} >> will also not work with FreeTDS use either:
1088
1089   $schema->storage->dbh->do("SET TEXTSIZE $bytes");
1090
1091 or
1092
1093   $schema->storage->set_textsize($bytes);
1094
1095 instead.
1096
1097 However, the C<LongReadLen> you pass in
1098 L<DBIx::Class::Storage::DBI/connect_info> is used to execute the equivalent
1099 C<SET TEXTSIZE> command on connection.
1100
1101 See L</connect_call_blob_setup> for a L<DBIx::Class::Storage::DBI/connect_info>
1102 setting you need to work with C<IMAGE> columns.
1103
1104 =head1 BULK API
1105
1106 The experimental L<DBD::Sybase> Bulk API support is used for
1107 L<populate|DBIx::Class::ResultSet/populate> in B<void> context, in a transaction
1108 on a separate connection.
1109
1110 To use this feature effectively, use a large number of rows for each
1111 L<populate|DBIx::Class::ResultSet/populate> call, eg.:
1112
1113   while (my $rows = $data_source->get_100_rows()) {
1114     $rs->populate($rows);
1115   }
1116
1117 B<NOTE:> the L<add_columns|DBIx::Class::ResultSource/add_columns>
1118 calls in your C<Result> classes B<must> list columns in database order for this
1119 to work. Also, you may have to unset the C<LANG> environment variable before
1120 loading your app, if it doesn't match the character set of your database.
1121
1122 When inserting IMAGE columns using this method, you'll need to use
1123 L</connect_call_blob_setup> as well.
1124
1125 =head1 AUTHOR
1126
1127 See L<DBIx::Class/CONTRIBUTORS>.
1128
1129 =head1 LICENSE
1130
1131 You may distribute this code under the same terms as Perl itself.
1132
1133 =cut
1134 # vim:sts=2 sw=2: