remove unsafe_insert
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / Storage / DBI / Sybase.pm
1 package DBIx::Class::Storage::DBI::Sybase;
2
3 use strict;
4 use warnings;
5
6 use base qw/
7     DBIx::Class::Storage::DBI::Sybase::Common
8     DBIx::Class::Storage::DBI::AutoCast
9 /;
10 use mro 'c3';
11 use Carp::Clan qw/^DBIx::Class/;
12 use List::Util ();
13
14 __PACKAGE__->mk_group_accessors('simple' =>
15     qw/_identity _blob_log_on_update _insert_dbh _identity_method/
16 );
17
18 =head1 NAME
19
20 DBIx::Class::Storage::DBI::Sybase - Sybase support for DBIx::Class
21
22 =head1 SYNOPSIS
23
24 This subclass supports L<DBD::Sybase> for real Sybase databases.  If you are
25 using an MSSQL database via L<DBD::Sybase>, your storage will be reblessed to
26 L<DBIx::Class::Storage::DBI::Sybase::Microsoft_SQL_Server>.
27
28 =head1 DESCRIPTION
29
30 If your version of Sybase does not support placeholders, then your storage
31 will be reblessed to L<DBIx::Class::Storage::DBI::Sybase::NoBindVars>. You can
32 also enable that driver explicitly, see the documentation for more details.
33
34 With this driver there is unfortunately no way to get the C<last_insert_id>
35 without doing a C<SELECT MAX(col)>. This is done safely in a transaction
36 (locking the table.) See L</INSERTS WITH PLACEHOLDERS>.
37
38 A recommended L<DBIx::Class::Storage::DBI/connect_info> setting:
39
40   on_connect_call => [['datetime_setup'], ['blob_setup', log_on_update => 0]]
41
42 =head1 METHODS
43
44 =cut
45
46 sub _rebless {
47   my $self = shift;
48
49   if (ref($self) eq 'DBIx::Class::Storage::DBI::Sybase') {
50     my $dbtype = eval {
51       @{$self->_get_dbh->selectrow_arrayref(qq{sp_server_info \@attribute_id=1})}[2]
52     } || '';
53
54     my $exception = $@;
55     $dbtype =~ s/\W/_/gi;
56     my $subclass = "DBIx::Class::Storage::DBI::Sybase::${dbtype}";
57
58     if (!$exception && $dbtype && $self->load_optional_class($subclass)) {
59       bless $self, $subclass;
60       $self->_rebless;
61     } else { # real Sybase
62       my $no_bind_vars = 'DBIx::Class::Storage::DBI::Sybase::NoBindVars';
63
64       if ($self->using_freetds) {
65         carp <<'EOF' unless $ENV{DBIC_SYBASE_FREETDS_NOWARN};
66
67 You are using FreeTDS with Sybase.
68
69 We will do our best to support this configuration, but please consider this
70 support experimental.
71
72 TEXT/IMAGE columns will definitely not work.
73
74 You are encouraged to recompile DBD::Sybase with the Sybase Open Client libraries
75 instead.
76
77 See perldoc DBIx::Class::Storage::DBI::Sybase for more details.
78
79 To turn off this warning set the DBIC_SYBASE_FREETDS_NOWARN environment
80 variable.
81 EOF
82         if (not $self->_typeless_placeholders_supported) {
83           if ($self->_placeholders_supported) {
84             $self->auto_cast(1);
85           } else {
86             $self->ensure_class_loaded($no_bind_vars);
87             bless $self, $no_bind_vars;
88             $self->_rebless;
89           }
90         }
91
92         $self->set_textsize; # based on LongReadLen in connect_info
93
94       }
95       elsif (not $self->_get_dbh->{syb_dynamic_supported}) {
96         # not necessarily FreeTDS, but no placeholders nevertheless
97         $self->ensure_class_loaded($no_bind_vars);
98         bless $self, $no_bind_vars;
99         $self->_rebless;
100       } elsif (not $self->_typeless_placeholders_supported) {
101 # this is highly unlikely, but we check just in case
102         $self->auto_cast(1);
103       }
104
105       $self->_set_max_connect(256);
106     }
107   }
108 }
109
110 # Make sure we have CHAINED mode turned on if AutoCommit is off in non-FreeTDS
111 # DBD::Sybase (since we don't know how DBD::Sybase was compiled.) If however
112 # we're using FreeTDS, CHAINED mode turns on an implicit transaction which we
113 # only want when AutoCommit is off.
114 sub _populate_dbh {
115   my $self = shift;
116
117   $self->next::method(@_);
118
119   if (not $self->using_freetds) {
120     $self->_dbh->{syb_chained_txn} = 1;
121   } else {
122     if ($self->_dbh_autocommit) {
123       $self->_dbh->do('SET CHAINED OFF');
124     } else {
125       $self->_dbh->do('SET CHAINED ON');
126     }
127   }
128 }
129
130 =head2 connect_call_blob_setup
131
132 Used as:
133
134   on_connect_call => [ [ 'blob_setup', log_on_update => 0 ] ]
135
136 Does C<< $dbh->{syb_binary_images} = 1; >> to return C<IMAGE> data as raw binary
137 instead of as a hex string.
138
139 Recommended.
140
141 Also sets the C<log_on_update> value for blob write operations. The default is
142 C<1>, but C<0> is better if your database is configured for it.
143
144 See
145 L<DBD::Sybase/Handling_IMAGE/TEXT_data_with_syb_ct_get_data()/syb_ct_send_data()>.
146
147 =cut
148
149 sub connect_call_blob_setup {
150   my $self = shift;
151   my %args = @_;
152   my $dbh = $self->_dbh;
153   $dbh->{syb_binary_images} = 1;
154
155   $self->_blob_log_on_update($args{log_on_update})
156     if exists $args{log_on_update};
157 }
158
159 sub _is_lob_type {
160   my $self = shift;
161   my $type = shift;
162   $type && $type =~ /(?:text|image|lob|bytea|binary|memo)/i;
163 }
164
165 sub _prep_for_execute {
166   my $self = shift;
167   my ($op, $extra_bind, $ident, $args) = @_;
168
169   my ($sql, $bind) = $self->next::method (@_);
170
171   if ($op eq 'insert') {
172     my $table = $ident->from;
173
174     my $bind_info = $self->_resolve_column_info(
175       $ident, [map $_->[0], @{$bind}]
176     );
177     my $identity_col = List::Util::first
178       { $bind_info->{$_}{is_auto_increment} }
179       (keys %$bind_info)
180     ;
181
182     if ($identity_col) {
183       $sql = join ("\n",
184         "SET IDENTITY_INSERT $table ON",
185         $sql,
186         "SET IDENTITY_INSERT $table OFF",
187       );
188     }
189     else {
190       $identity_col = List::Util::first
191         { $ident->column_info($_)->{is_auto_increment} }
192         $ident->columns
193       ;
194     }
195
196     if ($identity_col) {
197       $sql =
198         "$sql\n" .
199         $self->_fetch_identity_sql($ident, $identity_col);
200     }
201   }
202
203   return ($sql, $bind);
204 }
205
206 # Stolen from SQLT, with some modifications. This is a makeshift
207 # solution before a sane type-mapping library is available, thus
208 # the 'our' for easy overrides.
209 our %TYPE_MAPPING  = (
210     number    => 'numeric',
211     money     => 'money',
212     varchar   => 'varchar',
213     varchar2  => 'varchar',
214     timestamp => 'datetime',
215     text      => 'varchar',
216     real      => 'double precision',
217     comment   => 'text',
218     bit       => 'bit',
219     tinyint   => 'smallint',
220     float     => 'double precision',
221     serial    => 'numeric',
222     bigserial => 'numeric',
223     boolean   => 'varchar',
224     long      => 'varchar',
225 );
226
227 sub _native_data_type {
228   my ($self, $type) = @_;
229
230   $type = lc $type;
231   $type =~ s/\s* identity//x;
232
233   return uc($TYPE_MAPPING{$type} || $type);
234 }
235
236 sub _fetch_identity_sql {
237   my ($self, $source, $col) = @_;
238
239   return "SELECT MAX($col) FROM ".$source->from;
240 }
241
242 sub _execute {
243   my $self = shift;
244   my ($op) = @_;
245
246   my ($rv, $sth, @bind) = $self->dbh_do($self->can('_dbh_execute'), @_);
247
248   if ($op eq 'insert') {
249     $self->_identity($sth->fetchrow_array);
250     $sth->finish;
251   }
252
253   return wantarray ? ($rv, $sth, @bind) : $rv;
254 }
255
256 sub last_insert_id { shift->_identity }
257
258 # handles TEXT/IMAGE and transaction for last_insert_id
259 sub insert {
260   my $self = shift;
261   my ($source, $to_insert) = @_;
262
263   my $blob_cols = $self->_remove_blob_cols($source, $to_insert);
264
265 # insert+blob insert done atomically, on _insert_dbh
266   (my ($guard), local ($self->{_dbh})) = do {
267     $self->_insert_dbh($self->_connect(@{ $self->_dbi_connect_info }))
268       unless $self->_insert_dbh;
269
270     my $new_guard = $self->txn_scope_guard;
271
272 # _dbh_begin_work may reconnect, if so we need to update _insert_dbh
273     $self->_insert_dbh($self->_dbh);
274
275     ($new_guard, $self->_insert_dbh)
276   } if $blob_cols;
277
278   my $need_last_insert_id = 0;
279
280   my ($identity_col) =
281     map $_->[0],
282     grep $_->[1]{is_auto_increment},
283     map [ $_, $source->column_info($_) ],
284     $source->columns;
285
286   $need_last_insert_id = 1
287     if $identity_col && (not exists $to_insert->{$identity_col});
288
289   # We have to do the insert in a transaction to avoid race conditions with the
290   # SELECT MAX(COL) identity method used when placeholders are enabled.
291   my $updated_cols = do {
292     no warnings 'uninitialized';
293     if (
294       $need_last_insert_id &&
295       $self->_identity_method ne '@@IDENTITY' &&
296       !$self->{transaction_depth}
297     ) {
298       $self->_insert_dbh($self->_connect(@{ $self->_dbi_connect_info }))
299         unless $self->_insert_dbh;
300       local $self->{_dbh} = $self->_insert_dbh;
301
302       my $guard = $self->txn_scope_guard;
303
304 # _dbh_begin_work may reconnect, if so we need to update _insert_dbh
305       $self->_insert_dbh($self->_dbh);
306
307       my $upd_cols = $self->next::method (@_);
308       $guard->commit;
309
310       $upd_cols;
311     }
312     else {
313       $self->next::method(@_);
314     }
315   };
316
317   $self->_insert_blobs($source, $blob_cols, $to_insert) if $blob_cols;
318
319   $guard->commit if $guard;
320
321   return $updated_cols;
322 }
323
324 sub update {
325   my $self = shift;
326   my ($source, $fields, $where) = @_;
327
328   my $wantarray = wantarray;
329
330   my $blob_cols = $self->_remove_blob_cols($source, $fields);
331
332 # update+blob update(s) done atomically
333   my $guard = $self->txn_scope_guard if $blob_cols;
334
335   my @res;
336   if ($wantarray) {
337     @res    = $self->next::method(@_);
338   }
339   elsif (defined $wantarray) {
340     $res[0] = $self->next::method(@_);
341   }
342   else {
343     $self->next::method(@_);
344   }
345
346   $self->_update_blobs($source, $blob_cols, $where) if $blob_cols;
347
348   $guard->commit if $guard;
349
350   return $wantarray ? @res : $res[0];
351 }
352
353 sub _remove_blob_cols {
354   my ($self, $source, $fields) = @_;
355
356   my %blob_cols;
357
358   for my $col (keys %$fields) {
359     if ($self->_is_lob_type($source->column_info($col)->{data_type})) {
360       $blob_cols{$col} = delete $fields->{$col};
361       $fields->{$col} = \"''";
362     }
363   }
364
365   return keys %blob_cols ? \%blob_cols : undef;
366 }
367
368 sub _update_blobs {
369   my ($self, $source, $blob_cols, $where) = @_;
370
371   my (@primary_cols) = $source->primary_columns;
372
373   croak "Cannot update TEXT/IMAGE column(s) without a primary key"
374     unless @primary_cols;
375
376 # check if we're updating a single row by PK
377   my $pk_cols_in_where = 0;
378   for my $col (@primary_cols) {
379     $pk_cols_in_where++ if defined $where->{$col};
380   }
381   my @rows;
382
383   if ($pk_cols_in_where == @primary_cols) {
384     my %row_to_update;
385     @row_to_update{@primary_cols} = @{$where}{@primary_cols};
386     @rows = \%row_to_update;
387   } else {
388     my $rs = $source->resultset->search(
389       $where,
390       {
391         result_class => 'DBIx::Class::ResultClass::HashRefInflator',
392         select => \@primary_cols
393       }
394     );
395     @rows = $rs->all; # statement must finish
396   }
397
398   for my $row (@rows) {
399     $self->_insert_blobs($source, $blob_cols, $row);
400   }
401 }
402
403 sub _insert_blobs {
404   my ($self, $source, $blob_cols, $row) = @_;
405   my $dbh = $self->_get_dbh;
406
407   my $table = $source->from;
408
409   my %row = %$row;
410   my (@primary_cols) = $source->primary_columns;
411
412   croak "Cannot update TEXT/IMAGE column(s) without a primary key"
413     unless @primary_cols;
414
415   if ((grep { defined $row{$_} } @primary_cols) != @primary_cols) {
416     if (@primary_cols == 1) {
417       my $col = $primary_cols[0];
418       $row{$col} = $self->last_insert_id($source, $col);
419     } else {
420       croak "Cannot update TEXT/IMAGE column(s) without primary key values";
421     }
422   }
423
424   for my $col (keys %$blob_cols) {
425     my $blob = $blob_cols->{$col};
426
427     my %where = map { ($_, $row{$_}) } @primary_cols;
428     my $cursor = $source->resultset->search(\%where, {
429       select => [$col]
430     })->cursor;
431     $cursor->next;
432     my $sth = $cursor->sth;
433
434     eval {
435       do {
436         $sth->func('CS_GET', 1, 'ct_data_info') or die $sth->errstr;
437       } while $sth->fetch;
438
439       $sth->func('ct_prepare_send') or die $sth->errstr;
440
441       my $log_on_update = $self->_blob_log_on_update;
442       $log_on_update    = 1 if not defined $log_on_update;
443
444       $sth->func('CS_SET', 1, {
445         total_txtlen => length($blob),
446         log_on_update => $log_on_update
447       }, 'ct_data_info') or die $sth->errstr;
448
449       $sth->func($blob, length($blob), 'ct_send_data') or die $sth->errstr;
450
451       $sth->func('ct_finish_send') or die $sth->errstr;
452     };
453     my $exception = $@;
454     $sth->finish if $sth;
455     if ($exception) {
456       if ($self->using_freetds) {
457         croak (
458           'TEXT/IMAGE operation failed, probably because you are using FreeTDS: '
459           . $exception
460         );
461       } else {
462         croak $exception;
463       }
464     }
465   }
466 }
467
468 =head2 connect_call_datetime_setup
469
470 Used as:
471
472   on_connect_call => 'datetime_setup'
473
474 In L<DBIx::Class::Storage::DBI/connect_info> to set:
475
476   $dbh->syb_date_fmt('ISO_strict'); # output fmt: 2004-08-21T14:36:48.080Z
477   $dbh->do('set dateformat mdy');   # input fmt:  08/13/1979 18:08:55.080
478
479 On connection for use with L<DBIx::Class::InflateColumn::DateTime>, using
480 L<DateTime::Format::Sybase>, which you will need to install.
481
482 This works for both C<DATETIME> and C<SMALLDATETIME> columns, although
483 C<SMALLDATETIME> columns only have minute precision.
484
485 =cut
486
487 {
488   my $old_dbd_warned = 0;
489
490   sub connect_call_datetime_setup {
491     my $self = shift;
492     my $dbh = $self->_dbh;
493
494     if ($dbh->can('syb_date_fmt')) {
495       # amazingly, this works with FreeTDS
496       $dbh->syb_date_fmt('ISO_strict');
497     } elsif (not $old_dbd_warned) {
498       carp "Your DBD::Sybase is too old to support ".
499       "DBIx::Class::InflateColumn::DateTime, please upgrade!";
500       $old_dbd_warned = 1;
501     }
502
503     $dbh->do('SET DATEFORMAT mdy');
504
505     1;
506   }
507 }
508
509 sub datetime_parser_type { "DateTime::Format::Sybase" }
510
511 # ->begin_work and such have no effect with FreeTDS but we run them anyway to
512 # let the DBD keep any state it needs to.
513 #
514 # If they ever do start working, the extra statements will do no harm (because
515 # Sybase supports nested transactions.)
516
517 sub _dbh_begin_work {
518   my $self = shift;
519   $self->next::method(@_);
520   if ($self->using_freetds) {
521     $self->_get_dbh->do('BEGIN TRAN');
522   }
523 }
524
525 sub _dbh_commit {
526   my $self = shift;
527   if ($self->using_freetds) {
528     $self->_dbh->do('COMMIT');
529   }
530   return $self->next::method(@_);
531 }
532
533 sub _dbh_rollback {
534   my $self = shift;
535   if ($self->using_freetds) {
536     $self->_dbh->do('ROLLBACK');
537   }
538   return $self->next::method(@_);
539 }
540
541 # savepoint support using ASE syntax
542
543 sub _svp_begin {
544   my ($self, $name) = @_;
545
546   $self->_get_dbh->do("SAVE TRANSACTION $name");
547 }
548
549 # A new SAVE TRANSACTION with the same name releases the previous one.
550 sub _svp_release { 1 }
551
552 sub _svp_rollback {
553   my ($self, $name) = @_;
554
555   $self->_get_dbh->do("ROLLBACK TRANSACTION $name");
556 }
557
558 1;
559
560 =head1 Schema::Loader Support
561
562 There is an experimental branch of L<DBIx::Class::Schema::Loader> that will
563 allow you to dump a schema from most (if not all) versions of Sybase.
564
565 It is available via subversion from:
566
567   http://dev.catalyst.perl.org/repos/bast/branches/DBIx-Class-Schema-Loader/current/
568
569 =head1 FreeTDS
570
571 This driver supports L<DBD::Sybase> compiled against FreeTDS
572 (L<http://www.freetds.org/>) to the best of our ability, however it is
573 recommended that you recompile L<DBD::Sybase> against the Sybase Open Client
574 libraries. They are a part of the Sybase ASE distribution:
575
576 The Open Client FAQ is here:
577 L<http://www.isug.com/Sybase_FAQ/ASE/section7.html>.
578
579 Sybase ASE for Linux (which comes with the Open Client libraries) may be
580 downloaded here: L<http://response.sybase.com/forms/ASE_Linux_Download>.
581
582 To see if you're using FreeTDS check C<< $schema->storage->using_freetds >>, or run:
583
584   perl -MDBI -le 'my $dbh = DBI->connect($dsn, $user, $pass); print $dbh->{syb_oc_version}'
585
586 Some versions of the libraries involved will not support placeholders, in which
587 case the storage will be reblessed to
588 L<DBIx::Class::Storage::DBI::Sybase::NoBindVars>.
589
590 In some configurations, placeholders will work but will throw implicit type
591 conversion errors for anything that's not expecting a string. In such a case,
592 the C<auto_cast> option from L<DBIx::Class::Storage::DBI::AutoCast> is
593 automatically set, which you may enable on connection with
594 L<DBIx::Class::Storage::DBI::AutoCast/connect_call_set_auto_cast>. The type info
595 for the C<CAST>s is taken from the L<DBIx::Class::ResultSource/data_type>
596 definitions in your Result classes, and are mapped to a Sybase type (if it isn't
597 already) using a mapping based on L<SQL::Translator>.
598
599 In other configurations, placeholers will work just as they do with the Sybase
600 Open Client libraries.
601
602 Inserts or updates of TEXT/IMAGE columns will B<NOT> work with FreeTDS.
603
604 =head1 INSERTS WITH PLACEHOLDERS
605
606 With placeholders enabled, inserts are done in a transaction so that there are
607 no concurrency issues with getting the inserted identity value using
608 C<SELECT MAX(col)>, which is the only way to get the C<IDENTITY> value in this
609 mode.
610
611 When using C<DBIx::Class::Storage::DBI::Sybase::NoBindVars> transactions are
612 disabled, as there are no concurrency issues with C<SELECT @@IDENTITY> as it's a
613 session variable.
614
615 =head1 TRANSACTIONS
616
617 Due to limitations of the TDS protocol, L<DBD::Sybase>, or both; you cannot
618 begin a transaction while there are active cursors. An active cursor is, for
619 example, a L<ResultSet|DBIx::Class::ResultSet> that has been executed using
620 C<next> or C<first> but has not been exhausted or
621 L<reset|DBIx::Class::ResultSet/reset>.
622
623 For example, this will not work:
624
625   $schema->txn_do(sub {
626     my $rs = $schema->resultset('Book');
627     while (my $row = $rs->next) {
628       $schema->resultset('MetaData')->create({
629         book_id => $row->id,
630         ...
631       });
632     }
633   });
634
635 Transactions done for inserts in C<AutoCommit> mode when placeholders are in use
636 are not affected, as they use an extra database handle to do the insert.
637
638 Some workarounds:
639
640 =over 4
641
642 =item * use L<DBIx::Class::Storage::DBI::Replicated>
643
644 =item * L<connect|DBIx::Class::Schema/connect> another L<Schema|DBIx::Class::Schema>
645
646 =item * load the data from your cursor with L<DBIx::Class::ResultSet/all>
647
648 =back
649
650 =head1 MAXIMUM CONNECTIONS
651
652 The TDS protocol makes separate connections to the server for active statements
653 in the background. By default the number of such connections is limited to 25,
654 on both the client side and the server side.
655
656 This is a bit too low for a complex L<DBIx::Class> application, so on connection
657 the client side setting is set to C<256> (see L<DBD::Sybase/maxConnect>.) You
658 can override it to whatever setting you like in the DSN.
659
660 See
661 L<http://infocenter.sybase.com/help/index.jsp?topic=/com.sybase.help.ase_15.0.sag1/html/sag1/sag1272.htm>
662 for information on changing the setting on the server side.
663
664 =head1 DATES
665
666 See L</connect_call_datetime_setup> to setup date formats
667 for L<DBIx::Class::InflateColumn::DateTime>.
668
669 =head1 TEXT/IMAGE COLUMNS
670
671 L<DBD::Sybase> compiled with FreeTDS will B<NOT> allow you to insert or update
672 C<TEXT/IMAGE> columns.
673
674 Setting C<< $dbh->{LongReadLen} >> will also not work with FreeTDS use either:
675
676   $schema->storage->dbh->do("SET TEXTSIZE $bytes");
677
678 or
679
680   $schema->storage->set_textsize($bytes);
681
682 instead.
683
684 However, the C<LongReadLen> you pass in
685 L<DBIx::Class::Storage::DBI/connect_info> is used to execute the equivalent
686 C<SET TEXTSIZE> command on connection.
687
688 See L</connect_call_blob_setup> for a L<DBIx::Class::Storage::DBI/connect_info>
689 setting you need to work with C<IMAGE> columns.
690
691 =head1 AUTHOR
692
693 See L<DBIx::Class/CONTRIBUTORS>.
694
695 =head1 LICENSE
696
697 You may distribute this code under the same terms as Perl itself.
698
699 =cut
700 # vim:sts=2 sw=2: