Fix pessimization of offset-less Oracle limits, introduced in 6a6394f1
[dbsrgits/DBIx-Class.git] / t / 73oracle_hq.t
1 use strict;
2 use warnings;
3
4 use Test::Exception;
5 use Test::More;
6 use DBIx::Class::Optional::Dependencies ();
7 use lib qw(t/lib);
8 use DBIC::SqlMakerTest;
9
10 use DBIx::Class::SQLMaker::LimitDialects;
11 my $ROWS = DBIx::Class::SQLMaker::LimitDialects->__rows_bindtype,
12 my $TOTAL = DBIx::Class::SQLMaker::LimitDialects->__total_bindtype,
13
14 $ENV{NLS_SORT} = "BINARY";
15 $ENV{NLS_COMP} = "BINARY";
16 $ENV{NLS_LANG} = "AMERICAN";
17
18 my ($dsn,  $user,  $pass)  = @ENV{map { "DBICTEST_ORA_${_}" }  qw/DSN USER PASS/};
19
20 plan skip_all => 'Set $ENV{DBICTEST_ORA_DSN}, _USER and _PASS to run this test.'
21  unless ($dsn && $user && $pass);
22
23 plan skip_all => 'Test needs ' . DBIx::Class::Optional::Dependencies->req_missing_for ('rdbms_oracle')
24   unless DBIx::Class::Optional::Dependencies->req_ok_for ('rdbms_oracle');
25
26 use DBICTest::Schema::Artist;
27 BEGIN {
28   DBICTest::Schema::Artist->add_column('parentid');
29
30   DBICTest::Schema::Artist->has_many(
31     children => 'DBICTest::Schema::Artist',
32     { 'foreign.parentid' => 'self.artistid' }
33   );
34
35   DBICTest::Schema::Artist->belongs_to(
36     parent => 'DBICTest::Schema::Artist',
37     { 'foreign.artistid' => 'self.parentid' }
38   );
39 }
40
41 use DBICTest::Schema;
42
43 my $schema = DBICTest::Schema->connect($dsn, $user, $pass);
44
45 note "Oracle Version: " . $schema->storage->_server_info->{dbms_version};
46
47 my $dbh = $schema->storage->dbh;
48 do_creates($dbh);
49
50 ### test hierarchical queries
51 {
52   $schema->resultset('Artist')->create ({
53     name => 'root',
54     rank => 1,
55     cds => [],
56     children => [
57       {
58         name => 'child1',
59         rank => 2,
60         children => [
61           {
62             name => 'grandchild',
63             rank => 3,
64             cds => [
65               {
66                 title => "grandchilds's cd" ,
67                 year => '2008',
68                 tracks => [
69                   {
70                     position => 1,
71                     title => 'Track 1 grandchild',
72                   }
73                 ],
74               }
75             ],
76             children => [
77               {
78                 name => 'greatgrandchild',
79                 rank => 3,
80               }
81             ],
82           }
83         ],
84       },
85       {
86         name => 'child2',
87         rank => 3,
88       },
89     ],
90   });
91
92   $schema->resultset('Artist')->create({
93     name => 'cycle-root',
94     children => [
95       {
96         name => 'cycle-child1',
97         children => [ { name => 'cycle-grandchild' } ],
98       },
99       {
100         name => 'cycle-child2'
101       },
102     ],
103   });
104
105   $schema->resultset('Artist')->find({ name => 'cycle-root' })
106     ->update({ parentid => { -ident => 'artistid' } });
107
108   # select the whole tree
109   {
110     my $rs = $schema->resultset('Artist')->search({}, {
111       start_with => { name => 'root' },
112       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
113     });
114
115     is_same_sql_bind (
116       $rs->as_query,
117       '(
118         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
119           FROM artist me
120         START WITH name = ?
121         CONNECT BY parentid = PRIOR artistid
122       )',
123       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
124             => 'root'] ],
125     );
126     is_deeply (
127       [ $rs->get_column ('name')->all ],
128       [ qw/root child1 grandchild greatgrandchild child2/ ],
129       'got artist tree',
130     );
131
132     is_same_sql_bind (
133       $rs->count_rs->as_query,
134       '(
135         SELECT COUNT( * )
136           FROM artist me
137         START WITH name = ?
138         CONNECT BY parentid = PRIOR artistid
139       )',
140       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
141             => 'root'] ],
142     );
143
144     is( $rs->count, 5, 'Connect By count ok' );
145   }
146
147   # use order siblings by statement
148   SKIP: {
149     # http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/state21b.htm#2066123
150     skip q{Oracle8i doesn't support ORDER SIBLINGS BY}, 1
151       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
152
153     my $rs = $schema->resultset('Artist')->search({}, {
154       start_with => { name => 'root' },
155       connect_by => { parentid => { -prior => { -ident =>  'artistid' } } },
156       order_siblings_by => { -desc => 'name' },
157     });
158
159     is_same_sql_bind (
160       $rs->as_query,
161       '(
162         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
163           FROM artist me
164         START WITH name = ?
165         CONNECT BY parentid = PRIOR artistid
166         ORDER SIBLINGS BY name DESC
167       )',
168       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
169             => 'root'] ],
170     );
171
172     is_deeply (
173       [ $rs->get_column ('name')->all ],
174       [ qw/root child2 child1 grandchild greatgrandchild/ ],
175       'Order Siblings By ok',
176     );
177   }
178
179   # get the root node
180   {
181     my $rs = $schema->resultset('Artist')->search({ parentid => undef }, {
182       start_with => { name => 'root' },
183       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
184     });
185
186     is_same_sql_bind (
187       $rs->as_query,
188       '(
189         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
190           FROM artist me
191         WHERE ( parentid IS NULL )
192         START WITH name = ?
193         CONNECT BY parentid = PRIOR artistid
194       )',
195       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
196             => 'root'] ],
197     );
198
199     is_deeply(
200       [ $rs->get_column('name')->all ],
201       [ 'root' ],
202       'found root node',
203     );
204   }
205
206   # combine a connect by with a join
207   SKIP: {
208     # http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/state21b.htm#2066123
209     skip q{Oracle8i doesn't support connect by with join}, 1
210       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
211
212     my $rs = $schema->resultset('Artist')->search(
213       {'cds.title' => { -like => '%cd'} },
214       {
215         join => 'cds',
216         start_with => { 'me.name' => 'root' },
217         connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
218       }
219     );
220
221     is_same_sql_bind (
222       $rs->as_query,
223       '(
224         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
225           FROM artist me
226           LEFT JOIN cd cds ON cds.artist = me.artistid
227         WHERE ( cds.title LIKE ? )
228         START WITH me.name = ?
229         CONNECT BY parentid = PRIOR artistid
230       )',
231       [
232         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'cds.title', 'sqlt_size' => 100 }
233             => '%cd'],
234         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'me.name', 'sqlt_size' => 100 }
235             => 'root'],
236       ],
237     );
238
239     is_deeply(
240       [ $rs->get_column('name')->all ],
241       [ 'grandchild' ],
242       'Connect By with a join result name ok'
243     );
244
245     is_same_sql_bind (
246       $rs->count_rs->as_query,
247       '(
248         SELECT COUNT( * )
249           FROM artist me
250           LEFT JOIN cd cds ON cds.artist = me.artistid
251         WHERE ( cds.title LIKE ? )
252         START WITH me.name = ?
253         CONNECT BY parentid = PRIOR artistid
254       )',
255       [
256         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'cds.title', 'sqlt_size' => 100 }
257             => '%cd'],
258         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'me.name', 'sqlt_size' => 100 }
259             => 'root'],
260       ],
261     );
262
263     is( $rs->count, 1, 'Connect By with a join; count ok' );
264   }
265
266   # combine a connect by with order_by
267   {
268     my $rs = $schema->resultset('Artist')->search({}, {
269       start_with => { name => 'root' },
270       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
271       order_by => { -asc => [ 'LEVEL', 'name' ] },
272     });
273
274     is_same_sql_bind (
275       $rs->as_query,
276       '(
277         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
278           FROM artist me
279         START WITH name = ?
280         CONNECT BY parentid = PRIOR artistid
281         ORDER BY LEVEL ASC, name ASC
282       )',
283       [
284         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
285             => 'root'],
286       ],
287     );
288
289
290     # Don't use "$rs->get_column ('name')->all" they build a query arround the $rs.
291     #   If $rs has a order by, the order by is in the subquery and this doesn't work with Oracle 8i.
292     # TODO: write extra test and fix order by handling on Oracle 8i
293     is_deeply (
294       [ map { $_->[1] } $rs->cursor->all ],
295       [ qw/root child1 child2 grandchild greatgrandchild/ ],
296       'Connect By with a order_by - result name ok (without get_column)'
297     );
298
299     SKIP: {
300       skip q{Connect By with a order_by - result name ok (with get_column), Oracle8i doesn't support order by in a subquery},1
301         if $schema->storage->_server_info->{normalized_dbms_version} < 9;
302       is_deeply (
303         [  $rs->get_column ('name')->all ],
304         [ qw/root child1 child2 grandchild greatgrandchild/ ],
305         'Connect By with a order_by - result name ok (with get_column)'
306       );
307     }
308   }
309
310
311   # limit a connect by
312   SKIP: {
313     skip q{Oracle8i doesn't support order by in a subquery}, 1
314       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
315
316     my $rs = $schema->resultset('Artist')->search({}, {
317       start_with => { name => 'root' },
318       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
319       order_by => [ { -asc => 'name' }, {  -desc => 'artistid' } ],
320       rows => 2,
321     });
322
323     is_same_sql_bind (
324       $rs->as_query,
325       '(
326         SELECT artistid, name, rank, charfield, parentid
327           FROM (
328             SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
329               FROM artist me
330             START WITH name = ?
331             CONNECT BY parentid = PRIOR artistid
332             ORDER BY name ASC, artistid DESC
333           ) me
334         WHERE ROWNUM <= ?
335       )',
336       [
337         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
338             => 'root'], [ $ROWS => 2 ],
339       ],
340     );
341
342     is_deeply (
343       [ $rs->get_column ('name')->all ],
344       [qw/child1 child2/],
345       'LIMIT a Connect By query - correct names'
346     );
347
348     is_same_sql_bind (
349       $rs->count_rs->as_query,
350       '(
351         SELECT COUNT( * )
352           FROM (
353             SELECT artistid
354               FROM (
355                 SELECT me.artistid
356                   FROM artist me
357                 START WITH name = ?
358                 CONNECT BY parentid = PRIOR artistid
359               ) me
360             WHERE ROWNUM <= ?
361           ) me
362       )',
363       [
364         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
365             => 'root'],
366         [ $ROWS => 2 ],
367       ],
368     );
369
370     is( $rs->count, 2, 'Connect By; LIMIT count ok' );
371   }
372
373   # combine a connect_by with group_by and having
374   # add some bindvals to make sure things still work
375   {
376     my $rs = $schema->resultset('Artist')->search({}, {
377       select => \[ 'COUNT(rank) + ?', [ __cbind => 3 ] ],
378       as => 'cnt',
379       start_with => { name => 'root' },
380       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
381       group_by => \[ 'rank + ? ', [ __gbind =>  1] ],
382       having => \[ 'count(rank) < ?', [ cnt => 2 ] ],
383     });
384
385     is_same_sql_bind (
386       $rs->as_query,
387       '(
388         SELECT COUNT(rank) + ?
389           FROM artist me
390         START WITH name = ?
391         CONNECT BY parentid = PRIOR artistid
392         GROUP BY( rank + ? ) HAVING count(rank) < ?
393       )',
394       [
395         [ { dbic_colname => '__cbind' }
396             => 3 ],
397         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
398             => 'root'],
399         [ { dbic_colname => '__gbind' }
400             => 1 ],
401         [ { dbic_colname => 'cnt' }
402             => 2 ],
403       ],
404     );
405
406     is_deeply (
407       [ $rs->get_column ('cnt')->all ],
408       [4, 4],
409       'Group By a Connect By query - correct values'
410     );
411   }
412
413   # select the whole cycle tree without nocylce
414   {
415     my $rs = $schema->resultset('Artist')->search({}, {
416       start_with => { name => 'cycle-root' },
417       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
418     });
419
420     # ORA-01436:  CONNECT BY loop in user data
421     throws_ok { $rs->get_column ('name')->all } qr/ORA-01436/,
422       "connect by initify loop detection without nocycle";
423   }
424
425   # select the whole cycle tree with nocylce
426   SKIP: {
427     # http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/expressi.htm#1023748
428     skip q{Oracle8i doesn't support connect by nocycle}, 1
429       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
430
431     my $rs = $schema->resultset('Artist')->search({}, {
432       start_with => { name => 'cycle-root' },
433       '+select'  => \ 'CONNECT_BY_ISCYCLE',
434       '+as'      => [ 'connector' ],
435       connect_by_nocycle => { parentid => { -prior => { -ident => 'artistid' } } },
436     });
437
438     is_same_sql_bind (
439       $rs->as_query,
440       '(
441         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid, CONNECT_BY_ISCYCLE
442           FROM artist me
443         START WITH name = ?
444         CONNECT BY NOCYCLE parentid = PRIOR artistid
445       )',
446       [
447         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
448             => 'cycle-root'],
449       ],
450     );
451     is_deeply (
452       [ $rs->get_column ('name')->all ],
453       [ qw/cycle-root cycle-child1 cycle-grandchild cycle-child2/ ],
454       'got artist tree with nocycle (name)',
455     );
456     is_deeply (
457       [ $rs->get_column ('connector')->all ],
458       [ qw/1 0 0 0/ ],
459       'got artist tree with nocycle (CONNECT_BY_ISCYCLE)',
460     );
461
462     is_same_sql_bind (
463       $rs->count_rs->as_query,
464       '(
465         SELECT COUNT( * )
466           FROM artist me
467         START WITH name = ?
468         CONNECT BY NOCYCLE parentid = PRIOR artistid
469       )',
470       [
471         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
472             => 'cycle-root'],
473       ],
474     );
475
476     is( $rs->count, 4, 'Connect By Nocycle count ok' );
477   }
478 }
479
480 done_testing;
481
482 sub do_creates {
483   my $dbh = shift;
484
485   eval {
486     $dbh->do("DROP SEQUENCE artist_autoinc_seq");
487     $dbh->do("DROP SEQUENCE artist_pk_seq");
488     $dbh->do("DROP SEQUENCE cd_seq");
489     $dbh->do("DROP SEQUENCE track_seq");
490     $dbh->do("DROP TABLE artist");
491     $dbh->do("DROP TABLE track");
492     $dbh->do("DROP TABLE cd");
493   };
494
495   $dbh->do("CREATE SEQUENCE artist_pk_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
496   $dbh->do("CREATE SEQUENCE cd_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
497   $dbh->do("CREATE SEQUENCE track_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
498
499   $dbh->do("CREATE TABLE artist (artistid NUMBER(12), parentid NUMBER(12), name VARCHAR(255), autoinc_col NUMBER(12), rank NUMBER(38), charfield VARCHAR2(10))");
500   $dbh->do("ALTER TABLE artist ADD (CONSTRAINT artist_pk PRIMARY KEY (artistid))");
501
502   $dbh->do("CREATE TABLE cd (cdid NUMBER(12), artist NUMBER(12), title VARCHAR(255), year VARCHAR(4), genreid NUMBER(12), single_track NUMBER(12))");
503   $dbh->do("ALTER TABLE cd ADD (CONSTRAINT cd_pk PRIMARY KEY (cdid))");
504
505   $dbh->do("CREATE TABLE track (trackid NUMBER(12), cd NUMBER(12) REFERENCES cd(cdid) DEFERRABLE, position NUMBER(12), title VARCHAR(255), last_updated_on DATE, last_updated_at DATE, small_dt DATE)");
506   $dbh->do("ALTER TABLE track ADD (CONSTRAINT track_pk PRIMARY KEY (trackid))");
507
508   $dbh->do(qq{
509     CREATE OR REPLACE TRIGGER artist_insert_trg_pk
510     BEFORE INSERT ON artist
511     FOR EACH ROW
512       BEGIN
513         IF :new.artistid IS NULL THEN
514           SELECT artist_pk_seq.nextval
515           INTO :new.artistid
516           FROM DUAL;
517         END IF;
518       END;
519   });
520   $dbh->do(qq{
521     CREATE OR REPLACE TRIGGER cd_insert_trg
522     BEFORE INSERT OR UPDATE ON cd
523     FOR EACH ROW
524
525       DECLARE
526       tmpVar NUMBER;
527
528       BEGIN
529         tmpVar := 0;
530
531         IF :new.cdid IS NULL THEN
532           SELECT cd_seq.nextval
533           INTO tmpVar
534           FROM dual;
535
536           :new.cdid := tmpVar;
537         END IF;
538       END;
539   });
540   $dbh->do(qq{
541     CREATE OR REPLACE TRIGGER track_insert_trg
542     BEFORE INSERT ON track
543     FOR EACH ROW
544       BEGIN
545         IF :new.trackid IS NULL THEN
546           SELECT track_seq.nextval
547           INTO :new.trackid
548           FROM DUAL;
549         END IF;
550       END;
551   });
552 }
553
554 # clean up our mess
555 END {
556   if ($schema and my $dbh = $schema->storage->dbh) {
557     eval { $dbh->do($_) } for (
558       'DROP SEQUENCE artist_pk_seq',
559       'DROP SEQUENCE cd_seq',
560       'DROP SEQUENCE track_seq',
561       'DROP TABLE artist',
562       'DROP TABLE track',
563       'DROP TABLE cd',
564     );
565   };
566   undef $schema;
567 }