fixed order of rows difference between first and subsequent pages for Oracle
[dbsrgits/DBIx-Class.git] / t / 73oracle_hq.t
1 use strict;
2 use warnings;
3
4 use Test::Exception;
5 use Test::More;
6
7 use lib qw(t/lib);
8 use DBIC::SqlMakerTest;
9
10 use DBIx::Class::SQLMaker::LimitDialects;
11 my $ROWS = DBIx::Class::SQLMaker::LimitDialects->__rows_bindtype,
12 my $TOTAL = DBIx::Class::SQLMaker::LimitDialects->__total_bindtype,
13
14 $ENV{NLS_SORT} = "BINARY";
15 $ENV{NLS_COMP} = "BINARY";
16 $ENV{NLS_LANG} = "AMERICAN";
17
18 plan skip_all => 'Test needs ' . DBIx::Class::Optional::Dependencies->req_missing_for ('test_rdbms_oracle')
19   unless DBIx::Class::Optional::Dependencies->req_ok_for ('test_rdbms_oracle');
20
21 my ($dsn,  $user,  $pass)  = @ENV{map { "DBICTEST_ORA_${_}" }  qw/DSN USER PASS/};
22
23 plan skip_all => 'Set $ENV{DBICTEST_ORA_DSN}, _USER and _PASS to run this test.'
24  unless ($dsn && $user && $pass);
25
26 use DBICTest::Schema::Artist;
27 BEGIN {
28   DBICTest::Schema::Artist->add_column('parentid');
29
30   DBICTest::Schema::Artist->has_many(
31     children => 'DBICTest::Schema::Artist',
32     { 'foreign.parentid' => 'self.artistid' }
33   );
34
35   DBICTest::Schema::Artist->belongs_to(
36     parent => 'DBICTest::Schema::Artist',
37     { 'foreign.artistid' => 'self.parentid' }
38   );
39 }
40
41 use DBICTest::Schema;
42
43 my $schema = DBICTest::Schema->connect($dsn, $user, $pass);
44
45 note "Oracle Version: " . $schema->storage->_server_info->{dbms_version};
46
47 my $dbh = $schema->storage->dbh;
48 do_creates($dbh);
49
50 ### test hierarchical queries
51 {
52   $schema->resultset('Artist')->create ({
53     name => 'root',
54     rank => 1,
55     cds => [],
56     children => [
57       {
58         name => 'child1',
59         rank => 2,
60         children => [
61           {
62             name => 'grandchild',
63             rank => 3,
64             cds => [
65               {
66                 title => "grandchilds's cd" ,
67                 year => '2008',
68                 tracks => [
69                   {
70                     position => 1,
71                     title => 'Track 1 grandchild',
72                   }
73                 ],
74               }
75             ],
76             children => [
77               {
78                 name => 'greatgrandchild',
79                 rank => 3,
80               }
81             ],
82           }
83         ],
84       },
85       {
86         name => 'child2',
87         rank => 3,
88       },
89     ],
90   });
91
92   $schema->resultset('Artist')->create({
93     name => 'cycle-root',
94     children => [
95       {
96         name => 'cycle-child1',
97         children => [ { name => 'cycle-grandchild' } ],
98       },
99       {
100         name => 'cycle-child2'
101       },
102     ],
103   });
104
105   $schema->resultset('Artist')->find({ name => 'cycle-root' })
106     ->update({ parentid => { -ident => 'artistid' } });
107
108   # select the whole tree
109   {
110     my $rs = $schema->resultset('Artist')->search({}, {
111       start_with => { name => 'root' },
112       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
113     });
114
115     is_same_sql_bind (
116       $rs->as_query,
117       '(
118         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
119           FROM artist me
120         START WITH name = ?
121         CONNECT BY parentid = PRIOR artistid
122       )',
123       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
124             => 'root'] ],
125     );
126     is_deeply (
127       [ $rs->get_column ('name')->all ],
128       [ qw/root child1 grandchild greatgrandchild child2/ ],
129       'got artist tree',
130     );
131
132     is_same_sql_bind (
133       $rs->count_rs->as_query,
134       '(
135         SELECT COUNT( * )
136           FROM artist me
137         START WITH name = ?
138         CONNECT BY parentid = PRIOR artistid
139       )',
140       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
141             => 'root'] ],
142     );
143
144     is( $rs->count, 5, 'Connect By count ok' );
145   }
146
147   # use order siblings by statement
148   SKIP: {
149     # http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/state21b.htm#2066123
150     skip q{Oracle8i doesn't support ORDER SIBLINGS BY}, 1
151       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
152
153     my $rs = $schema->resultset('Artist')->search({}, {
154       start_with => { name => 'root' },
155       connect_by => { parentid => { -prior => { -ident =>  'artistid' } } },
156       order_siblings_by => { -desc => 'name' },
157     });
158
159     is_same_sql_bind (
160       $rs->as_query,
161       '(
162         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
163           FROM artist me
164         START WITH name = ?
165         CONNECT BY parentid = PRIOR artistid
166         ORDER SIBLINGS BY name DESC
167       )',
168       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
169             => 'root'] ],
170     );
171
172     is_deeply (
173       [ $rs->get_column ('name')->all ],
174       [ qw/root child2 child1 grandchild greatgrandchild/ ],
175       'Order Siblings By ok',
176     );
177   }
178
179   # get the root node
180   {
181     my $rs = $schema->resultset('Artist')->search({ parentid => undef }, {
182       start_with => { name => 'root' },
183       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
184     });
185
186     is_same_sql_bind (
187       $rs->as_query,
188       '(
189         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
190           FROM artist me
191         WHERE ( parentid IS NULL )
192         START WITH name = ?
193         CONNECT BY parentid = PRIOR artistid
194       )',
195       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
196             => 'root'] ],
197     );
198
199     is_deeply(
200       [ $rs->get_column('name')->all ],
201       [ 'root' ],
202       'found root node',
203     );
204   }
205
206   # combine a connect by with a join
207   SKIP: {
208     # http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/state21b.htm#2066123
209     skip q{Oracle8i doesn't support connect by with join}, 1
210       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
211
212     my $rs = $schema->resultset('Artist')->search(
213       {'cds.title' => { -like => '%cd'} },
214       {
215         join => 'cds',
216         start_with => { 'me.name' => 'root' },
217         connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
218       }
219     );
220
221     is_same_sql_bind (
222       $rs->as_query,
223       '(
224         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
225           FROM artist me
226           LEFT JOIN cd cds ON cds.artist = me.artistid
227         WHERE ( cds.title LIKE ? )
228         START WITH me.name = ?
229         CONNECT BY parentid = PRIOR artistid
230       )',
231       [
232         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'cds.title', 'sqlt_size' => 100 }
233             => '%cd'],
234         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'me.name', 'sqlt_size' => 100 }
235             => 'root'],
236       ],
237     );
238
239     is_deeply(
240       [ $rs->get_column('name')->all ],
241       [ 'grandchild' ],
242       'Connect By with a join result name ok'
243     );
244
245     is_same_sql_bind (
246       $rs->count_rs->as_query,
247       '(
248         SELECT COUNT( * )
249           FROM artist me
250           LEFT JOIN cd cds ON cds.artist = me.artistid
251         WHERE ( cds.title LIKE ? )
252         START WITH me.name = ?
253         CONNECT BY parentid = PRIOR artistid
254       )',
255       [
256         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'cds.title', 'sqlt_size' => 100 }
257             => '%cd'],
258         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'me.name', 'sqlt_size' => 100 }
259             => 'root'],
260       ],
261     );
262
263     is( $rs->count, 1, 'Connect By with a join; count ok' );
264   }
265
266   # combine a connect by with order_by
267   {
268     my $rs = $schema->resultset('Artist')->search({}, {
269       start_with => { name => 'root' },
270       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
271       order_by => { -asc => [ 'LEVEL', 'name' ] },
272     });
273
274     is_same_sql_bind (
275       $rs->as_query,
276       '(
277         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
278           FROM artist me
279         START WITH name = ?
280         CONNECT BY parentid = PRIOR artistid
281         ORDER BY LEVEL ASC, name ASC
282       )',
283       [
284         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
285             => 'root'],
286       ],
287     );
288
289
290     # Don't use "$rs->get_column ('name')->all" they build a query arround the $rs.
291     #   If $rs has a order by, the order by is in the subquery and this doesn't work with Oracle 8i.
292     # TODO: write extra test and fix order by handling on Oracle 8i
293     is_deeply (
294       [ map { $_->[1] } $rs->cursor->all ],
295       [ qw/root child1 child2 grandchild greatgrandchild/ ],
296       'Connect By with a order_by - result name ok (without get_column)'
297     );
298
299     SKIP: {
300       skip q{Connect By with a order_by - result name ok (with get_column), Oracle8i doesn't support order by in a subquery},1
301         if $schema->storage->_server_info->{normalized_dbms_version} < 9;
302       is_deeply (
303         [  $rs->get_column ('name')->all ],
304         [ qw/root child1 child2 grandchild greatgrandchild/ ],
305         'Connect By with a order_by - result name ok (with get_column)'
306       );
307     }
308   }
309
310
311   # limit a connect by
312   SKIP: {
313     skip q{Oracle8i doesn't support order by in a subquery}, 1
314       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
315
316     my $rs = $schema->resultset('Artist')->search({}, {
317       start_with => { name => 'root' },
318       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
319       order_by => [ { -asc => 'name' }, {  -desc => 'artistid' } ],
320       rows => 2,
321     });
322
323     is_same_sql_bind (
324       $rs->as_query,
325       '(
326         SELECT artistid, name, rank, charfield, parentid
327           FROM (
328             SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
329               FROM artist me
330             START WITH name = ?
331             CONNECT BY parentid = PRIOR artistid
332             ORDER BY name ASC, artistid DESC
333           ) me
334         WHERE ROWNUM <= ?
335       )',
336       [
337         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
338             => 'root'], [ $ROWS => 2 ],
339       ],
340     );
341
342     is_deeply (
343       [ $rs->get_column ('name')->all ],
344       [qw/child1 child2/],
345       'LIMIT a Connect By query - correct names'
346     );
347
348     is_same_sql_bind (
349       $rs->count_rs->as_query,
350       '(
351         SELECT COUNT( * )
352           FROM (
353             SELECT artistid
354               FROM (
355                 SELECT artistid, ROWNUM rownum__index
356                   FROM (
357                     SELECT me.artistid
358                       FROM artist me
359                     START WITH name = ?
360                     CONNECT BY parentid = PRIOR artistid
361                   ) me
362               ) me
363             WHERE rownum__index BETWEEN ? AND ?
364           ) me
365       )',
366       [
367         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
368             => 'root'],
369         [ $ROWS => 1 ],
370         [ $TOTAL => 2 ],
371       ],
372     );
373
374     is( $rs->count, 2, 'Connect By; LIMIT count ok' );
375   }
376
377   # combine a connect_by with group_by and having
378   # add some bindvals to make sure things still work
379   {
380     my $rs = $schema->resultset('Artist')->search({}, {
381       select => \[ 'COUNT(rank) + ?', [ __cbind => 3 ] ],
382       as => 'cnt',
383       start_with => { name => 'root' },
384       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
385       group_by => \[ 'rank + ? ', [ __gbind =>  1] ],
386       having => \[ 'count(rank) < ?', [ cnt => 2 ] ],
387     });
388
389     is_same_sql_bind (
390       $rs->as_query,
391       '(
392         SELECT COUNT(rank) + ?
393           FROM artist me
394         START WITH name = ?
395         CONNECT BY parentid = PRIOR artistid
396         GROUP BY( rank + ? ) HAVING count(rank) < ?
397       )',
398       [
399         [ { dbic_colname => '__cbind' }
400             => 3 ],
401         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
402             => 'root'],
403         [ { dbic_colname => '__gbind' }
404             => 1 ],
405         [ { dbic_colname => 'cnt' }
406             => 2 ],
407       ],
408     );
409
410     is_deeply (
411       [ $rs->get_column ('cnt')->all ],
412       [4, 4],
413       'Group By a Connect By query - correct values'
414     );
415   }
416
417   # select the whole cycle tree without nocylce
418   {
419     my $rs = $schema->resultset('Artist')->search({}, {
420       start_with => { name => 'cycle-root' },
421       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
422     });
423
424     # ORA-01436:  CONNECT BY loop in user data
425     throws_ok { $rs->get_column ('name')->all } qr/ORA-01436/,
426       "connect by initify loop detection without nocycle";
427   }
428
429   # select the whole cycle tree with nocylce
430   SKIP: {
431     # http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/expressi.htm#1023748
432     skip q{Oracle8i doesn't support connect by nocycle}, 1
433       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
434
435     my $rs = $schema->resultset('Artist')->search({}, {
436       start_with => { name => 'cycle-root' },
437       '+select'  => \ 'CONNECT_BY_ISCYCLE',
438       '+as'      => [ 'connector' ],
439       connect_by_nocycle => { parentid => { -prior => { -ident => 'artistid' } } },
440     });
441
442     is_same_sql_bind (
443       $rs->as_query,
444       '(
445         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid, CONNECT_BY_ISCYCLE
446           FROM artist me
447         START WITH name = ?
448         CONNECT BY NOCYCLE parentid = PRIOR artistid
449       )',
450       [
451         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
452             => 'cycle-root'],
453       ],
454     );
455     is_deeply (
456       [ $rs->get_column ('name')->all ],
457       [ qw/cycle-root cycle-child1 cycle-grandchild cycle-child2/ ],
458       'got artist tree with nocycle (name)',
459     );
460     is_deeply (
461       [ $rs->get_column ('connector')->all ],
462       [ qw/1 0 0 0/ ],
463       'got artist tree with nocycle (CONNECT_BY_ISCYCLE)',
464     );
465
466     is_same_sql_bind (
467       $rs->count_rs->as_query,
468       '(
469         SELECT COUNT( * )
470           FROM artist me
471         START WITH name = ?
472         CONNECT BY NOCYCLE parentid = PRIOR artistid
473       )',
474       [
475         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
476             => 'cycle-root'],
477       ],
478     );
479
480     is( $rs->count, 4, 'Connect By Nocycle count ok' );
481   }
482 }
483
484 done_testing;
485
486 sub do_creates {
487   my $dbh = shift;
488
489   eval {
490     $dbh->do("DROP SEQUENCE artist_autoinc_seq");
491     $dbh->do("DROP SEQUENCE artist_pk_seq");
492     $dbh->do("DROP SEQUENCE cd_seq");
493     $dbh->do("DROP SEQUENCE track_seq");
494     $dbh->do("DROP TABLE artist");
495     $dbh->do("DROP TABLE track");
496     $dbh->do("DROP TABLE cd");
497   };
498
499   $dbh->do("CREATE SEQUENCE artist_pk_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
500   $dbh->do("CREATE SEQUENCE cd_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
501   $dbh->do("CREATE SEQUENCE track_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
502
503   $dbh->do("CREATE TABLE artist (artistid NUMBER(12), parentid NUMBER(12), name VARCHAR(255), autoinc_col NUMBER(12), rank NUMBER(38), charfield VARCHAR2(10))");
504   $dbh->do("ALTER TABLE artist ADD (CONSTRAINT artist_pk PRIMARY KEY (artistid))");
505
506   $dbh->do("CREATE TABLE cd (cdid NUMBER(12), artist NUMBER(12), title VARCHAR(255), year VARCHAR(4), genreid NUMBER(12), single_track NUMBER(12))");
507   $dbh->do("ALTER TABLE cd ADD (CONSTRAINT cd_pk PRIMARY KEY (cdid))");
508
509   $dbh->do("CREATE TABLE track (trackid NUMBER(12), cd NUMBER(12) REFERENCES cd(cdid) DEFERRABLE, position NUMBER(12), title VARCHAR(255), last_updated_on DATE, last_updated_at DATE, small_dt DATE)");
510   $dbh->do("ALTER TABLE track ADD (CONSTRAINT track_pk PRIMARY KEY (trackid))");
511
512   $dbh->do(qq{
513     CREATE OR REPLACE TRIGGER artist_insert_trg_pk
514     BEFORE INSERT ON artist
515     FOR EACH ROW
516       BEGIN
517         IF :new.artistid IS NULL THEN
518           SELECT artist_pk_seq.nextval
519           INTO :new.artistid
520           FROM DUAL;
521         END IF;
522       END;
523   });
524   $dbh->do(qq{
525     CREATE OR REPLACE TRIGGER cd_insert_trg
526     BEFORE INSERT OR UPDATE ON cd
527     FOR EACH ROW
528
529       DECLARE
530       tmpVar NUMBER;
531
532       BEGIN
533         tmpVar := 0;
534
535         IF :new.cdid IS NULL THEN
536           SELECT cd_seq.nextval
537           INTO tmpVar
538           FROM dual;
539
540           :new.cdid := tmpVar;
541         END IF;
542       END;
543   });
544   $dbh->do(qq{
545     CREATE OR REPLACE TRIGGER track_insert_trg
546     BEFORE INSERT ON track
547     FOR EACH ROW
548       BEGIN
549         IF :new.trackid IS NULL THEN
550           SELECT track_seq.nextval
551           INTO :new.trackid
552           FROM DUAL;
553         END IF;
554       END;
555   });
556 }
557
558 # clean up our mess
559 END {
560   eval {
561     my $dbh = $schema->storage->dbh;
562     $dbh->do("DROP SEQUENCE artist_pk_seq");
563     $dbh->do("DROP SEQUENCE cd_seq");
564     $dbh->do("DROP SEQUENCE track_seq");
565     $dbh->do("DROP TABLE artist");
566     $dbh->do("DROP TABLE track");
567     $dbh->do("DROP TABLE cd");
568   };
569 }