Restore ability to handle underdefined root (t/prefetch/incomplete.t)
[dbsrgits/DBIx-Class.git] / t / 73oracle_hq.t
1 use strict;
2 use warnings;
3
4 use Test::Exception;
5 use Test::More;
6 use DBIx::Class::Optional::Dependencies ();
7 use lib qw(t/lib);
8 use DBICTest::RunMode;
9 use DBIC::SqlMakerTest;
10
11 use DBIx::Class::SQLMaker::LimitDialects;
12 my $ROWS = DBIx::Class::SQLMaker::LimitDialects->__rows_bindtype,
13 my $TOTAL = DBIx::Class::SQLMaker::LimitDialects->__total_bindtype,
14
15 $ENV{NLS_SORT} = "BINARY";
16 $ENV{NLS_COMP} = "BINARY";
17 $ENV{NLS_LANG} = "AMERICAN";
18
19 my ($dsn,  $user,  $pass)  = @ENV{map { "DBICTEST_ORA_${_}" }  qw/DSN USER PASS/};
20
21 plan skip_all => 'Set $ENV{DBICTEST_ORA_DSN}, _USER and _PASS to run this test.'
22  unless ($dsn && $user && $pass);
23
24 plan skip_all => 'Test needs ' . DBIx::Class::Optional::Dependencies->req_missing_for ('rdbms_oracle')
25   unless DBIx::Class::Optional::Dependencies->req_ok_for ('rdbms_oracle');
26
27 use DBICTest::Schema::Artist;
28 BEGIN {
29   DBICTest::Schema::Artist->add_column('parentid');
30
31   DBICTest::Schema::Artist->has_many(
32     children => 'DBICTest::Schema::Artist',
33     { 'foreign.parentid' => 'self.artistid' }
34   );
35
36   DBICTest::Schema::Artist->belongs_to(
37     parent => 'DBICTest::Schema::Artist',
38     { 'foreign.artistid' => 'self.parentid' }
39   );
40 }
41
42 use DBICTest;
43 use DBICTest::Schema;
44
45 my $schema = DBICTest::Schema->connect($dsn, $user, $pass);
46
47 note "Oracle Version: " . $schema->storage->_server_info->{dbms_version};
48
49 my $dbh = $schema->storage->dbh;
50 do_creates($dbh);
51
52 ### test hierarchical queries
53 {
54   $schema->resultset('Artist')->create ({
55     name => 'root',
56     rank => 1,
57     cds => [],
58     children => [
59       {
60         name => 'child1',
61         rank => 2,
62         children => [
63           {
64             name => 'grandchild',
65             rank => 3,
66             cds => [
67               {
68                 title => "grandchilds's cd" ,
69                 year => '2008',
70                 tracks => [
71                   {
72                     position => 1,
73                     title => 'Track 1 grandchild',
74                   }
75                 ],
76               }
77             ],
78             children => [
79               {
80                 name => 'greatgrandchild',
81                 rank => 3,
82               }
83             ],
84           }
85         ],
86       },
87       {
88         name => 'child2',
89         rank => 3,
90       },
91     ],
92   });
93
94   $schema->resultset('Artist')->create({
95     name => 'cycle-root',
96     children => [
97       {
98         name => 'cycle-child1',
99         children => [ { name => 'cycle-grandchild' } ],
100       },
101       {
102         name => 'cycle-child2'
103       },
104     ],
105   });
106
107   $schema->resultset('Artist')->find({ name => 'cycle-root' })
108     ->update({ parentid => { -ident => 'artistid' } });
109
110   # select the whole tree
111   {
112     my $rs = $schema->resultset('Artist')->search({}, {
113       start_with => { name => 'root' },
114       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
115     });
116
117     is_same_sql_bind (
118       $rs->as_query,
119       '(
120         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
121           FROM artist me
122         START WITH name = ?
123         CONNECT BY parentid = PRIOR artistid
124       )',
125       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
126             => 'root'] ],
127     );
128     is_deeply (
129       [ $rs->get_column ('name')->all ],
130       [ qw/root child1 grandchild greatgrandchild child2/ ],
131       'got artist tree',
132     );
133
134     is_same_sql_bind (
135       $rs->count_rs->as_query,
136       '(
137         SELECT COUNT( * )
138           FROM artist me
139         START WITH name = ?
140         CONNECT BY parentid = PRIOR artistid
141       )',
142       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
143             => 'root'] ],
144     );
145
146     is( $rs->count, 5, 'Connect By count ok' );
147   }
148
149   # use order siblings by statement
150   SKIP: {
151     # http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/state21b.htm#2066123
152     skip q{Oracle8i doesn't support ORDER SIBLINGS BY}, 1
153       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
154
155     my $rs = $schema->resultset('Artist')->search({}, {
156       start_with => { name => 'root' },
157       connect_by => { parentid => { -prior => { -ident =>  'artistid' } } },
158       order_siblings_by => { -desc => 'name' },
159     });
160
161     is_same_sql_bind (
162       $rs->as_query,
163       '(
164         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
165           FROM artist me
166         START WITH name = ?
167         CONNECT BY parentid = PRIOR artistid
168         ORDER SIBLINGS BY name DESC
169       )',
170       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
171             => 'root'] ],
172     );
173
174     is_deeply (
175       [ $rs->get_column ('name')->all ],
176       [ qw/root child2 child1 grandchild greatgrandchild/ ],
177       'Order Siblings By ok',
178     );
179   }
180
181   # get the root node
182   {
183     my $rs = $schema->resultset('Artist')->search({ parentid => undef }, {
184       start_with => { name => 'root' },
185       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
186     });
187
188     is_same_sql_bind (
189       $rs->as_query,
190       '(
191         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
192           FROM artist me
193         WHERE ( parentid IS NULL )
194         START WITH name = ?
195         CONNECT BY parentid = PRIOR artistid
196       )',
197       [ [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
198             => 'root'] ],
199     );
200
201     is_deeply(
202       [ $rs->get_column('name')->all ],
203       [ 'root' ],
204       'found root node',
205     );
206   }
207
208   # combine a connect by with a join
209   SKIP: {
210     # http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/state21b.htm#2066123
211     skip q{Oracle8i doesn't support connect by with join}, 1
212       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
213
214     my $rs = $schema->resultset('Artist')->search(
215       {'cds.title' => { -like => '%cd'} },
216       {
217         join => 'cds',
218         start_with => { 'me.name' => 'root' },
219         connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
220       }
221     );
222
223     is_same_sql_bind (
224       $rs->as_query,
225       '(
226         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
227           FROM artist me
228           LEFT JOIN cd cds ON cds.artist = me.artistid
229         WHERE ( cds.title LIKE ? )
230         START WITH me.name = ?
231         CONNECT BY parentid = PRIOR artistid
232       )',
233       [
234         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'cds.title', 'sqlt_size' => 100 }
235             => '%cd'],
236         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'me.name', 'sqlt_size' => 100 }
237             => 'root'],
238       ],
239     );
240
241     is_deeply(
242       [ $rs->get_column('name')->all ],
243       [ 'grandchild' ],
244       'Connect By with a join result name ok'
245     );
246
247     is_same_sql_bind (
248       $rs->count_rs->as_query,
249       '(
250         SELECT COUNT( * )
251           FROM artist me
252           LEFT JOIN cd cds ON cds.artist = me.artistid
253         WHERE ( cds.title LIKE ? )
254         START WITH me.name = ?
255         CONNECT BY parentid = PRIOR artistid
256       )',
257       [
258         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'cds.title', 'sqlt_size' => 100 }
259             => '%cd'],
260         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'me.name', 'sqlt_size' => 100 }
261             => 'root'],
262       ],
263     );
264
265     is( $rs->count, 1, 'Connect By with a join; count ok' );
266   }
267
268   # combine a connect by with order_by
269   {
270     my $rs = $schema->resultset('Artist')->search({}, {
271       start_with => { name => 'root' },
272       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
273       order_by => { -asc => [ 'LEVEL', 'name' ] },
274     });
275
276     is_same_sql_bind (
277       $rs->as_query,
278       '(
279         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
280           FROM artist me
281         START WITH name = ?
282         CONNECT BY parentid = PRIOR artistid
283         ORDER BY LEVEL ASC, name ASC
284       )',
285       [
286         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
287             => 'root'],
288       ],
289     );
290
291
292     # Don't use "$rs->get_column ('name')->all" they build a query arround the $rs.
293     #   If $rs has a order by, the order by is in the subquery and this doesn't work with Oracle 8i.
294     # TODO: write extra test and fix order by handling on Oracle 8i
295     is_deeply (
296       [ map { $_->[1] } $rs->cursor->all ],
297       [ qw/root child1 child2 grandchild greatgrandchild/ ],
298       'Connect By with a order_by - result name ok (without get_column)'
299     );
300
301     SKIP: {
302       skip q{Connect By with a order_by - result name ok (with get_column), Oracle8i doesn't support order by in a subquery},1
303         if $schema->storage->_server_info->{normalized_dbms_version} < 9;
304       is_deeply (
305         [  $rs->get_column ('name')->all ],
306         [ qw/root child1 child2 grandchild greatgrandchild/ ],
307         'Connect By with a order_by - result name ok (with get_column)'
308       );
309     }
310   }
311
312
313   # limit a connect by
314   SKIP: {
315     skip q{Oracle8i doesn't support order by in a subquery}, 1
316       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
317
318     my $rs = $schema->resultset('Artist')->search({}, {
319       start_with => { name => 'root' },
320       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
321       order_by => [ { -asc => 'name' }, {  -desc => 'artistid' } ],
322       rows => 2,
323     });
324
325     is_same_sql_bind (
326       $rs->as_query,
327       '(
328         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
329           FROM (
330             SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
331               FROM artist me
332             START WITH name = ?
333             CONNECT BY parentid = PRIOR artistid
334             ORDER BY name ASC, artistid DESC
335           ) me
336         WHERE ROWNUM <= ?
337       )',
338       [
339         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
340             => 'root'], [ $ROWS => 2 ],
341       ],
342     );
343
344     is_deeply (
345       [ $rs->get_column ('name')->all ],
346       [qw/child1 child2/],
347       'LIMIT a Connect By query - correct names'
348     );
349
350     is_same_sql_bind (
351       $rs->count_rs->as_query,
352       '(
353         SELECT COUNT( * )
354           FROM (
355             SELECT me.artistid
356               FROM (
357                 SELECT me.artistid
358                   FROM artist me
359                 START WITH name = ?
360                 CONNECT BY parentid = PRIOR artistid
361               ) me
362             WHERE ROWNUM <= ?
363           ) me
364       )',
365       [
366         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
367             => 'root'],
368         [ $ROWS => 2 ],
369       ],
370     );
371
372     is( $rs->count, 2, 'Connect By; LIMIT count ok' );
373   }
374
375   # combine a connect_by with group_by and having
376   # add some bindvals to make sure things still work
377   {
378     my $rs = $schema->resultset('Artist')->search({}, {
379       select => \[ 'COUNT(rank) + ?', [ __cbind => 3 ] ],
380       as => 'cnt',
381       start_with => { name => 'root' },
382       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
383       group_by => \[ 'rank + ? ', [ __gbind =>  1] ],
384       having => \[ 'count(rank) < ?', [ cnt => 2 ] ],
385     });
386
387     is_same_sql_bind (
388       $rs->as_query,
389       '(
390         SELECT COUNT(rank) + ?
391           FROM artist me
392         START WITH name = ?
393         CONNECT BY parentid = PRIOR artistid
394         GROUP BY( rank + ? ) HAVING count(rank) < ?
395       )',
396       [
397         [ { dbic_colname => '__cbind' }
398             => 3 ],
399         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
400             => 'root'],
401         [ { dbic_colname => '__gbind' }
402             => 1 ],
403         [ { dbic_colname => 'cnt' }
404             => 2 ],
405       ],
406     );
407
408     is_deeply (
409       [ $rs->get_column ('cnt')->all ],
410       [4, 4],
411       'Group By a Connect By query - correct values'
412     );
413   }
414
415   # select the whole cycle tree without nocylce
416   {
417     my $rs = $schema->resultset('Artist')->search({}, {
418       start_with => { name => 'cycle-root' },
419       connect_by => { parentid => { -prior => { -ident => 'artistid' } } },
420     });
421
422     # ORA-01436:  CONNECT BY loop in user data
423     throws_ok { $rs->get_column ('name')->all } qr/ORA-01436/,
424       "connect by initify loop detection without nocycle";
425   }
426
427   # select the whole cycle tree with nocylce
428   SKIP: {
429     # http://download.oracle.com/docs/cd/A87860_01/doc/server.817/a85397/expressi.htm#1023748
430     skip q{Oracle8i doesn't support connect by nocycle}, 1
431       if $schema->storage->_server_info->{normalized_dbms_version} < 9;
432
433     my $rs = $schema->resultset('Artist')->search({}, {
434       start_with => { name => 'cycle-root' },
435       '+select'  => \ 'CONNECT_BY_ISCYCLE',
436       '+as'      => [ 'connector' ],
437       connect_by_nocycle => { parentid => { -prior => { -ident => 'artistid' } } },
438     });
439
440     is_same_sql_bind (
441       $rs->as_query,
442       '(
443         SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid, CONNECT_BY_ISCYCLE
444           FROM artist me
445         START WITH name = ?
446         CONNECT BY NOCYCLE parentid = PRIOR artistid
447       )',
448       [
449         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
450             => 'cycle-root'],
451       ],
452     );
453     is_deeply (
454       [ $rs->get_column ('name')->all ],
455       [ qw/cycle-root cycle-child1 cycle-grandchild cycle-child2/ ],
456       'got artist tree with nocycle (name)',
457     );
458     is_deeply (
459       [ $rs->get_column ('connector')->all ],
460       [ qw/1 0 0 0/ ],
461       'got artist tree with nocycle (CONNECT_BY_ISCYCLE)',
462     );
463
464     is_same_sql_bind (
465       $rs->count_rs->as_query,
466       '(
467         SELECT COUNT( * )
468           FROM artist me
469         START WITH name = ?
470         CONNECT BY NOCYCLE parentid = PRIOR artistid
471       )',
472       [
473         [ { 'sqlt_datatype' => 'varchar', 'dbic_colname' => 'name', 'sqlt_size' => 100 }
474             => 'cycle-root'],
475       ],
476     );
477
478     is( $rs->count, 4, 'Connect By Nocycle count ok' );
479   }
480 }
481
482 done_testing;
483
484 sub do_creates {
485   my $dbh = shift;
486
487   eval {
488     $dbh->do("DROP SEQUENCE artist_autoinc_seq");
489     $dbh->do("DROP SEQUENCE artist_pk_seq");
490     $dbh->do("DROP SEQUENCE cd_seq");
491     $dbh->do("DROP SEQUENCE track_seq");
492     $dbh->do("DROP TABLE artist");
493     $dbh->do("DROP TABLE track");
494     $dbh->do("DROP TABLE cd");
495   };
496
497   $dbh->do("CREATE SEQUENCE artist_pk_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
498   $dbh->do("CREATE SEQUENCE cd_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
499   $dbh->do("CREATE SEQUENCE track_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
500
501   $dbh->do("CREATE TABLE artist (artistid NUMBER(12), parentid NUMBER(12), name VARCHAR(255), autoinc_col NUMBER(12), rank NUMBER(38), charfield VARCHAR2(10))");
502   $dbh->do("ALTER TABLE artist ADD (CONSTRAINT artist_pk PRIMARY KEY (artistid))");
503
504   $dbh->do("CREATE TABLE cd (cdid NUMBER(12), artist NUMBER(12), title VARCHAR(255), year VARCHAR(4), genreid NUMBER(12), single_track NUMBER(12))");
505   $dbh->do("ALTER TABLE cd ADD (CONSTRAINT cd_pk PRIMARY KEY (cdid))");
506
507   $dbh->do("CREATE TABLE track (trackid NUMBER(12), cd NUMBER(12) REFERENCES cd(cdid) DEFERRABLE, position NUMBER(12), title VARCHAR(255), last_updated_on DATE, last_updated_at DATE, small_dt DATE)");
508   $dbh->do("ALTER TABLE track ADD (CONSTRAINT track_pk PRIMARY KEY (trackid))");
509
510   $dbh->do(qq{
511     CREATE OR REPLACE TRIGGER artist_insert_trg_pk
512     BEFORE INSERT ON artist
513     FOR EACH ROW
514       BEGIN
515         IF :new.artistid IS NULL THEN
516           SELECT artist_pk_seq.nextval
517           INTO :new.artistid
518           FROM DUAL;
519         END IF;
520       END;
521   });
522   $dbh->do(qq{
523     CREATE OR REPLACE TRIGGER cd_insert_trg
524     BEFORE INSERT OR UPDATE ON cd
525     FOR EACH ROW
526
527       DECLARE
528       tmpVar NUMBER;
529
530       BEGIN
531         tmpVar := 0;
532
533         IF :new.cdid IS NULL THEN
534           SELECT cd_seq.nextval
535           INTO tmpVar
536           FROM dual;
537
538           :new.cdid := tmpVar;
539         END IF;
540       END;
541   });
542   $dbh->do(qq{
543     CREATE OR REPLACE TRIGGER track_insert_trg
544     BEFORE INSERT ON track
545     FOR EACH ROW
546       BEGIN
547         IF :new.trackid IS NULL THEN
548           SELECT track_seq.nextval
549           INTO :new.trackid
550           FROM DUAL;
551         END IF;
552       END;
553   });
554 }
555
556 # clean up our mess
557 END {
558   if ($schema and my $dbh = $schema->storage->dbh) {
559     eval { $dbh->do($_) } for (
560       'DROP SEQUENCE artist_pk_seq',
561       'DROP SEQUENCE cd_seq',
562       'DROP SEQUENCE track_seq',
563       'DROP TABLE artist',
564       'DROP TABLE track',
565       'DROP TABLE cd',
566     );
567   };
568   undef $schema;
569 }