Merge 'trunk' into 'oracle_hierarchical_queries_rt39121'
Peter Rabbitson [Fri, 7 May 2010 21:35:07 +0000 (21:35 +0000)]
r9318@Thesaurus (orig r9305):  rabbit | 2010-05-05 11:49:51 +0200
 r9296@Thesaurus (orig r9283):  ribasushi | 2010-05-01 11:51:15 +0200
 Branch to clean up various limit dialects
 r9297@Thesaurus (orig r9284):  rabbit | 2010-05-01 11:55:04 +0200
 Preliminary version
 r9301@Thesaurus (orig r9288):  rabbit | 2010-05-03 18:31:24 +0200
 Fix incorrect comparison
 r9302@Thesaurus (orig r9289):  rabbit | 2010-05-03 18:32:36 +0200
 Do not add TOP prefixes to queries already containing it
 r9303@Thesaurus (orig r9290):  rabbit | 2010-05-03 18:33:15 +0200
 Add an as selector to a prefetch subquery to aid the subselecting-limit analyzer
 r9304@Thesaurus (orig r9291):  rabbit | 2010-05-03 18:34:49 +0200
 Rewrite mssql test to verify both types of limit dialects with and without quoting, rewrite the RNO, Top and RowNum dialects to rely on a factored out column re-aliaser
 r9305@Thesaurus (orig r9292):  rabbit | 2010-05-03 21:06:01 +0200
 Fix Top tests, make extra col selector order consistent
 r9307@Thesaurus (orig r9294):  ribasushi | 2010-05-04 00:50:35 +0200
 Fix test warning
 r9308@Thesaurus (orig r9295):  ribasushi | 2010-05-04 01:04:32 +0200
 Some databases (db2) do not like leading __s - use a different weird identifier for extra selector names
 r9313@Thesaurus (orig r9300):  rabbit | 2010-05-05 11:08:33 +0200
 Rename test
 r9314@Thesaurus (orig r9301):  rabbit | 2010-05-05 11:11:32 +0200
 If there was no offset, there is no sense in reordering
 r9315@Thesaurus (orig r9302):  rabbit | 2010-05-05 11:12:19 +0200
 Split and fix oracle tests
 r9317@Thesaurus (orig r9304):  rabbit | 2010-05-05 11:49:33 +0200
 Changes

r9321@Thesaurus (orig r9308):  rabbit | 2010-05-05 13:01:35 +0200
Changes
r9322@Thesaurus (orig r9309):  rabbit | 2010-05-05 13:02:39 +0200
Fix obsucre bug with as_subselect_rs (gah wrong commit msg)
r9323@Thesaurus (orig r9310):  rabbit | 2010-05-05 14:56:38 +0200
Forgotten pieces
r9329@Thesaurus (orig r9316):  rabbit | 2010-05-07 10:15:52 +0200
Failure to determine dbms version is *not* a fatal error - trap exceptions
r9330@Thesaurus (orig r9317):  caelum | 2010-05-07 11:57:24 +0200
detect row_number() over support in MSSQL if version detection fails
r9331@Thesaurus (orig r9318):  caelum | 2010-05-07 14:56:57 +0200
minor change
r9332@Thesaurus (orig r9319):  nigel | 2010-05-07 15:03:00 +0200
empty update OK even if row is not in database
r9333@Thesaurus (orig r9320):  nigel | 2010-05-07 15:28:06 +0200
Added reference to cascade_* in relationship attributes
r9334@Thesaurus (orig r9321):  nigel | 2010-05-07 15:39:37 +0200
empty update OK even if row is not in database (fixed)
r9335@Thesaurus (orig r9322):  nigel | 2010-05-07 15:48:19 +0200
empty update OK even if row is not in database (fixed2)
r9336@Thesaurus (orig r9323):  nigel | 2010-05-07 15:54:36 +0200
Clarification to cascade_update attribute documentation
r9337@Thesaurus (orig r9324):  nigel | 2010-05-07 16:08:17 +0200
Clarification cascade_* attribute defaults documentation

lib/DBIx/Class/SQLAHacks/Oracle.pm [new file with mode: 0644]
lib/DBIx/Class/Storage/DBI/Oracle/Generic.pm
t/73oracle.t
t/sqlahacks/oracle.t [new file with mode: 0644]

diff --git a/lib/DBIx/Class/SQLAHacks/Oracle.pm b/lib/DBIx/Class/SQLAHacks/Oracle.pm
new file mode 100644 (file)
index 0000000..0ca54b6
--- /dev/null
@@ -0,0 +1,131 @@
+package # Hide from PAUSE
+  DBIx::Class::SQLAHacks::Oracle;
+
+use warnings;
+use strict;
+
+use base qw( DBIx::Class::SQLAHacks );
+use Carp::Clan qw/^DBIx::Class|^SQL::Abstract/;
+
+# 
+#  TODO:
+#   - Review by experienced DBIC/SQL:A developers :-)
+#   - Problem with count and connect_by look the TODO in t/73oracle.t
+# 
+
+sub new {
+  my $self = shift;
+  my %opts = (ref $_[0] eq 'HASH') ? %{$_[0]} : @_;
+  push @{$opts{special_ops}}, {
+    regex => qr/^prior$/i,
+    handler => '_where_field_PRIOR',
+  };
+
+  $self->SUPER::new (\%opts);
+}
+
+sub select {
+    my ($self, $table, $fields, $where, $rs_attrs, @rest) = @_;
+
+    my ($sql, @bind) = $self->SUPER::select($table, $fields, $where, $rs_attrs, @rest);
+    push @bind, @{$self->{_oracle_connect_by_binds}};
+
+    return wantarray ? ($sql, @bind) : $sql;
+}
+
+sub _emulate_limit {
+    my ( $self, $syntax, $sql, $rs_attrs, $rows, $offset ) = @_;
+
+    my ($cb_sql, @cb_bind) = $self->_connect_by($rs_attrs);
+    $sql .= $cb_sql;
+    $self->{_oracle_connect_by_binds} = \@cb_bind;
+
+    return $self->SUPER::_emulate_limit($syntax, $sql, $rs_attrs, $rows, $offset);
+}
+
+sub _connect_by {
+    my ($self, $attrs) = @_;
+
+    my $sql = '';
+    my @bind;
+
+    if ( ref($attrs) eq 'HASH' ) {
+        if ( $attrs->{'start_with'} ) {
+            my ($ws, @wb) = $self->_recurse_where( $attrs->{'start_with'} );
+            $sql .= $self->_sqlcase(' start with ') . $ws;
+            push @bind, @wb;
+        }
+        if ( my $connect_by = $attrs->{'connect_by'} ) {
+            my ($connect_by_sql, @connect_by_sql_bind) = $self->_recurse_where( $attrs->{'connect_by'} );
+            $sql .= sprintf(" %s %s",
+                ( $attrs->{'nocycle'} ) ? $self->_sqlcase('connect by nocycle')
+                    : $self->_sqlcase('connect by'),
+                $connect_by_sql,
+            );
+            push @bind, @connect_by_sql_bind;
+            # $sql .= $self->_sqlcase(' connect by');
+            #             foreach my $key ( keys %$connect_by ) {
+            #                 $sql .= " $key = " . $connect_by->{$key};
+            #             }
+        }
+        if ( $attrs->{'order_siblings_by'} ) {
+            $sql .= $self->_order_siblings_by( $attrs->{'order_siblings_by'} );
+        }
+    }
+
+    return wantarray ? ($sql, @bind) : $sql;
+}
+
+sub _order_siblings_by {
+    my ( $self, $arg ) = @_;
+
+    my ( @sql, @bind );
+    for my $c ( $self->_order_by_chunks($arg) ) {
+        $self->_SWITCH_refkind(
+            $c,
+            {
+                SCALAR   => sub { push @sql, $c },
+                ARRAYREF => sub { push @sql, shift @$c; push @bind, @$c },
+            }
+        );
+    }
+
+    my $sql =
+      @sql
+      ? sprintf( '%s %s', $self->_sqlcase(' order siblings by'), join( ', ', @sql ) )
+      : '';
+
+    return wantarray ? ( $sql, @bind ) : $sql;
+}
+
+# we need to add a '=' only when PRIOR is used against a column diretly
+# i.e. when it is invoked by a special_op callback
+sub _where_field_PRIOR {
+  my ($self, $lhs, $op, $rhs) = @_;
+  my ($sql, @bind) = $self->_recurse_where ($rhs);
+
+  $sql = sprintf ('%s = %s %s ',
+    $self->_convert($self->_quote($lhs)),
+    $self->_sqlcase ($op),
+    $sql
+  );
+
+  return ($sql, @bind);
+}
+
+1;
+
+__END__
+
+=pod
+
+=head1 NAME
+
+DBIx::Class::SQLAHacks::Oracle - adds hierarchical query support for Oracle to SQL::Abstract
+
+=head1 DESCRIPTION
+
+See L<DBIx::Class::Storage::DBI::Oracle::Generic> for more informations about
+how to use hierarchical queries with DBIx::Class.
+
+=cut
index c832536..5f54bd6 100644 (file)
@@ -17,6 +17,51 @@ DBIx::Class::Storage::DBI::Oracle::Generic - Oracle Support for DBIx::Class
   __PACKAGE__->set_primary_key('id');
   __PACKAGE__->sequence('mysequence');
 
+  # Somewhere in your Code
+  # add some data to a table with a hierarchical relationship
+  $schema->resultset('Person')->create ({
+        firstname => 'foo',
+        lastname => 'bar',
+        children => [
+            {
+                firstname => 'child1',
+                lastname => 'bar',
+                children => [
+                    {
+                        firstname => 'grandchild',
+                        lastname => 'bar',
+                    }
+                ],
+            },
+            {
+                firstname => 'child2',
+                lastname => 'bar',
+            },
+        ],
+    });
+
+  # select from the hierarchical relationship
+  my $rs = $schema->resultset('Person')->search({},
+    {
+      'start_with' => { 'firstname' => 'foo', 'lastname' => 'bar' },
+      'connect_by' => { 'parentid' => { '-prior' => \'persionid' },
+      'order_siblings_by' => { -asc => 'name' },
+    };
+  );
+
+  # this will select the whole tree starting from person "foo bar", creating
+  # following query:
+  # SELECT
+  #     me.persionid me.firstname, me.lastname, me.parentid
+  # FROM
+  #     person me
+  # START WITH
+  #     firstname = 'foo' and lastname = 'bar'
+  # CONNECT BY
+  #     parentid = prior persionid
+  # ORDER SIBLINGS BY
+  #     firstname ASC
+
 =head1 DESCRIPTION
 
 This class implements base Oracle support. The subclass
@@ -30,6 +75,8 @@ versions before 9.
 use base qw/DBIx::Class::Storage::DBI/;
 use mro 'c3';
 
+__PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks::Oracle');
+
 sub deployment_statements {
   my $self = shift;;
   my ($schema, $type, $version, $dir, $sqltargs, @rest) = @_;
@@ -361,7 +408,7 @@ sub with_deferred_fk_checks {
   my $txn_scope_guard = $self->txn_scope_guard;
 
   $self->_do_query('alter session set constraints = deferred');
-  
+
   my $sg = Scope::Guard->new(sub {
     $self->_do_query('alter session set constraints = immediate');
   });
@@ -370,6 +417,99 @@ sub with_deferred_fk_checks {
     after => sub { $txn_scope_guard->commit });
 }
 
+=head1 ATTRIBUTES
+
+Following additional attributes can be used in resultsets.
+
+=head2 connect_by
+
+=over 4
+
+=item Value: \%connect_by
+
+=back
+
+A hashref of conditions used to specify the relationship between parent rows
+and child rows of the hierarchy.
+
+  connect_by => { parentid => 'prior personid' }
+
+  # adds a connect by statement to the query:
+  # SELECT
+  #     me.persionid me.firstname, me.lastname, me.parentid
+  # FROM
+  #     person me
+  # CONNECT BY
+  #     parentid = prior persionid
+
+=head2 nocycle
+
+=over 4
+
+=item Value: [1|0]
+
+=back
+
+If you want to use NOCYCLE set to 1.
+
+    connect_by => { parentid => 'prior personid' },
+    nocycle    => 1
+
+    # adds a connect by statement to the query:
+    # SELECT
+    #     me.persionid me.firstname, me.lastname, me.parentid
+    # FROM
+    #     person me
+    # CONNECT BY NOCYCLE
+    #     parentid = prior persionid
+
+
+=head2 start_with
+
+=over 4
+
+=item Value: \%condition
+
+=back
+
+A hashref of conditions which specify the root row(s) of the hierarchy.
+
+It uses the same syntax as L<DBIx::Class::ResultSet/search>
+
+  start_with => { firstname => 'Foo', lastname => 'Bar' }
+
+  # SELECT
+  #     me.persionid me.firstname, me.lastname, me.parentid
+  # FROM
+  #     person me
+  # START WITH
+  #     firstname = 'foo' and lastname = 'bar'
+  # CONNECT BY
+  #     parentid = prior persionid
+
+=head2 order_siblings_by
+
+=over 4
+
+=item Value: ($order_siblings_by | \@order_siblings_by)
+
+=back
+
+Which column(s) to order the siblings by.
+
+It uses the same syntax as L<DBIx::Class::ResultSet/order_by>
+
+  'order_siblings_by' => 'firstname ASC'
+
+  # SELECT
+  #     me.persionid me.firstname, me.lastname, me.parentid
+  # FROM
+  #     person me
+  # CONNECT BY
+  #     parentid = prior persionid
+  # ORDER SIBLINGS BY
+  #     firstname ASC
+
 =head1 AUTHOR
 
 See L<DBIx::Class/CONTRIBUTORS>.
index 0aa3ee7..8c94592 100644 (file)
@@ -30,8 +30,10 @@ use warnings;
 
 use Test::Exception;
 use Test::More;
+
 use lib qw(t/lib);
 use DBICTest;
+use DBIC::SqlMakerTest;
 
 my ($dsn, $user, $pass) = @ENV{map { "DBICTEST_ORA_${_}" } qw/DSN USER PASS/};
 
@@ -48,6 +50,7 @@ my $dbh = $schema->storage->dbh;
 eval {
   $dbh->do("DROP SEQUENCE artist_seq");
   $dbh->do("DROP SEQUENCE cd_seq");
+  $dbh->do("DROP SEQUENCE track_seq");
   $dbh->do("DROP SEQUENCE pkid1_seq");
   $dbh->do("DROP SEQUENCE pkid2_seq");
   $dbh->do("DROP SEQUENCE nonpkid_seq");
@@ -58,11 +61,12 @@ eval {
 };
 $dbh->do("CREATE SEQUENCE artist_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
 $dbh->do("CREATE SEQUENCE cd_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
+$dbh->do("CREATE SEQUENCE track_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
 $dbh->do("CREATE SEQUENCE pkid1_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
 $dbh->do("CREATE SEQUENCE pkid2_seq START WITH 10 MAXVALUE 999999 MINVALUE 0");
 $dbh->do("CREATE SEQUENCE nonpkid_seq START WITH 20 MAXVALUE 999999 MINVALUE 0");
 
-$dbh->do("CREATE TABLE artist (artistid NUMBER(12), name VARCHAR(255), rank NUMBER(38), charfield VARCHAR2(10))");
+$dbh->do("CREATE TABLE artist (artistid NUMBER(12), parentid NUMBER(12), name VARCHAR(255), rank NUMBER(38), charfield VARCHAR2(10))");
 $dbh->do("ALTER TABLE artist ADD (CONSTRAINT artist_pk PRIMARY KEY (artistid))");
 
 $dbh->do("CREATE TABLE sequence_test (pkid1 NUMBER(12), pkid2 NUMBER(12), nonpkid NUMBER(12), name VARCHAR(255))");
@@ -72,6 +76,7 @@ $dbh->do("CREATE TABLE cd (cdid NUMBER(12), artist NUMBER(12), title VARCHAR(255
 $dbh->do("ALTER TABLE cd ADD (CONSTRAINT cd_pk PRIMARY KEY (cdid))");
 
 $dbh->do("CREATE TABLE track (trackid NUMBER(12), cd NUMBER(12) REFERENCES cd(cdid) DEFERRABLE, position NUMBER(12), title VARCHAR(255), last_updated_on DATE, last_updated_at DATE, small_dt DATE)");
+$dbh->do("ALTER TABLE track ADD (CONSTRAINT track_pk PRIMARY KEY (trackid))");
 
 $dbh->do(qq{
   CREATE OR REPLACE TRIGGER artist_insert_trg
@@ -97,6 +102,30 @@ $dbh->do(qq{
     END IF;
   END;
 });
+$dbh->do(qq{
+  CREATE OR REPLACE TRIGGER cd_insert_trg
+  BEFORE INSERT ON cd
+  FOR EACH ROW
+  BEGIN
+    IF :new.cdid IS NULL THEN
+      SELECT cd_seq.nextval
+      INTO :new.cdid
+      FROM DUAL;
+    END IF;
+  END;
+});
+$dbh->do(qq{
+  CREATE OR REPLACE TRIGGER track_insert_trg
+  BEFORE INSERT ON track
+  FOR EACH ROW
+  BEGIN
+    IF :new.trackid IS NULL THEN
+      SELECT track_seq.nextval
+      INTO :new.trackid
+      FROM DUAL;
+    END IF;
+  END;
+});
 
 {
     # Swiped from t/bindtype_columns.t to avoid creating my own Resultset.
@@ -161,7 +190,7 @@ lives_and {
 
 # test join with row count ambiguity
 
-my $track = $schema->resultset('Track')->create({ trackid => 1, cd => 1,
+my $track = $schema->resultset('Track')->create({ cd => $cd->cdid,
     position => 1, title => 'Track1' });
 my $tjoin = $schema->resultset('Track')->search({ 'me.title' => 'Track1'},
         { join => 'cd',
@@ -173,7 +202,7 @@ ok(my $row = $tjoin->next);
 is($row->title, 'Track1', "ambiguous column ok");
 
 # check count distinct with multiple columns
-my $other_track = $schema->resultset('Track')->create({ trackid => 2, cd => 1, position => 1, title => 'Track2' });
+my $other_track = $schema->resultset('Track')->create({ cd => $cd->cdid, position => 1, title => 'Track2' });
 
 my $tcount = $schema->resultset('Track')->search(
   {},
@@ -284,6 +313,378 @@ SKIP: {
   }
 }
 
+
+### test hierarchical queries
+if ( $schema->storage->isa('DBIx::Class::Storage::DBI::Oracle::Generic') ) {
+    my $source = $schema->source('Artist');
+
+    $source->add_column( 'parentid' );
+
+    $source->add_relationship('children', 'DBICTest::Schema::Artist',
+        { 'foreign.parentid' => 'self.artistid' },
+        {
+            accessor => 'multi',
+            join_type => 'LEFT',
+            cascade_delete => 1,
+            cascade_copy => 1,
+        } );
+    $source->add_relationship('parent', 'DBICTest::Schema::Artist',
+        { 'foreign.artistid' => 'self.parentid' },
+        { accessor => 'single' } );
+    DBICTest::Schema::Artist->add_column( 'parentid' );
+    DBICTest::Schema::Artist->has_many(
+        children => 'DBICTest::Schema::Artist',
+        { 'foreign.parentid' => 'self.artistid' }
+    );
+    DBICTest::Schema::Artist->belongs_to(
+        parent => 'DBICTest::Schema::Artist',
+        { 'foreign.artistid' => 'self.parentid' }
+    );
+
+    $schema->resultset('Artist')->create ({
+        name => 'root',
+        cds => [],
+        children => [
+            {
+                name => 'child1',
+                children => [
+                    {
+                        name => 'grandchild',
+                        cds => [
+                            {
+                                title => "grandchilds's cd" ,
+                                year => '2008',
+                                tracks => [
+                                    {
+                                        position => 1,
+                                        title => 'Track 1 grandchild',
+                                    }
+                                ],
+                            }
+                        ],
+                        children => [
+                            {
+                                name => 'greatgrandchild',
+                            }
+                        ],
+                    }
+                ],
+            },
+            {
+                name => 'child2',
+            },
+        ],
+    });
+
+    $schema->resultset('Artist')->create(
+        {
+            name     => 'cycle-root',
+            children => [
+                {
+                    name     => 'cycle-child1',
+                    children => [ { name => 'cycle-grandchild' } ],
+                },
+                { name => 'cycle-child2' },
+            ],
+        }
+    );
+
+    $schema->resultset('Artist')->find({ name => 'cycle-root' })
+      ->update({ parentid => \'artistid' });
+
+    # select the whole tree
+    {
+      my $rs = $schema->resultset('Artist')->search({}, {
+        start_with => { name => 'root' },
+        connect_by => { parentid => { -prior => \ 'artistid' } },
+      });
+
+      is_same_sql_bind (
+        $rs->as_query,
+        '(
+          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
+            FROM artist me
+          START WITH name = ?
+          CONNECT BY parentid = PRIOR artistid 
+        )',
+        [ [ name => 'root'] ],
+      );
+      is_deeply (
+        [ $rs->get_column ('name')->all ],
+        [ qw/root child1 grandchild greatgrandchild child2/ ],
+        'got artist tree',
+      );
+
+
+      is_same_sql_bind (
+        $rs->count_rs->as_query,
+        '(
+          SELECT COUNT( * )
+            FROM artist me
+          START WITH name = ?
+          CONNECT BY parentid = PRIOR artistid 
+        )',
+        [ [ name => 'root'] ],
+      );
+
+      is( $rs->count, 5, 'Connect By count ok' );
+    }
+
+    # use order siblings by statement
+    {
+      my $rs = $schema->resultset('Artist')->search({}, {
+        start_with => { name => 'root' },
+        connect_by => { parentid => { -prior => \ 'artistid' } },
+        order_siblings_by => { -desc => 'name' },
+      });
+
+      is_same_sql_bind (
+        $rs->as_query,
+        '(
+          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
+            FROM artist me
+          START WITH name = ?
+          CONNECT BY parentid = PRIOR artistid 
+          ORDER SIBLINGS BY name DESC
+        )',
+        [ [ name => 'root'] ],
+      );
+
+      is_deeply (
+        [ $rs->get_column ('name')->all ],
+        [ qw/root child2 child1 grandchild greatgrandchild/ ],
+        'Order Siblings By ok',
+      );
+    }
+
+    # get the root node
+    {
+      my $rs = $schema->resultset('Artist')->search({ parentid => undef }, {
+        start_with => { name => 'root' },
+        connect_by => { parentid => { -prior => \ 'artistid' } },
+      });
+
+      is_same_sql_bind (
+        $rs->as_query,
+        '(
+          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
+            FROM artist me
+          WHERE ( parentid IS NULL )
+          START WITH name = ?
+          CONNECT BY parentid = PRIOR artistid 
+        )',
+        [ [ name => 'root'] ],
+      );
+
+      is_deeply(
+        [ $rs->get_column('name')->all ],
+        [ 'root' ],
+        'found root node',
+      );
+    }
+
+    # combine a connect by with a join
+    {
+      my $rs = $schema->resultset('Artist')->search(
+        {'cds.title' => { -like => '%cd'} },
+        {
+          join => 'cds',
+          start_with => { 'me.name' => 'root' },
+          connect_by => { parentid => { -prior => \ 'artistid' } },
+        }
+      );
+
+      is_same_sql_bind (
+        $rs->as_query,
+        '(
+          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
+            FROM artist me
+            LEFT JOIN cd cds ON cds.artist = me.artistid
+          WHERE ( cds.title LIKE ? )
+          START WITH me.name = ?
+          CONNECT BY parentid = PRIOR artistid 
+        )',
+        [ [ 'cds.title' => '%cd' ], [ 'me.name' => 'root' ] ],
+      );
+
+      is_deeply(
+        [ $rs->get_column('name')->all ],
+        [ 'grandchild' ],
+        'Connect By with a join result name ok'
+      );
+
+
+      is_same_sql_bind (
+        $rs->count_rs->as_query,
+        '(
+          SELECT COUNT( * )
+            FROM artist me
+            LEFT JOIN cd cds ON cds.artist = me.artistid
+          WHERE ( cds.title LIKE ? )
+          START WITH me.name = ?
+          CONNECT BY parentid = PRIOR artistid 
+        )',
+        [ [ 'cds.title' => '%cd' ], [ 'me.name' => 'root' ] ],
+      );
+
+      is( $rs->count, 1, 'Connect By with a join; count ok' );
+    }
+
+    # combine a connect by with order_by
+    {
+      my $rs = $schema->resultset('Artist')->search({}, {
+        start_with => { name => 'root' },
+        connect_by => { parentid => { -prior => \ 'artistid' } },
+        order_by => { -asc => [ 'LEVEL', 'name' ] },
+      });
+
+      is_same_sql_bind (
+        $rs->as_query,
+        '(
+          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
+            FROM artist me
+          START WITH name = ?
+          CONNECT BY parentid = PRIOR artistid 
+          ORDER BY LEVEL ASC, name ASC
+        )',
+        [ [ name => 'root' ] ],
+      );
+
+      is_deeply (
+        [ $rs->get_column ('name')->all ],
+        [ qw/root child1 child2 grandchild greatgrandchild/ ],
+        'Connect By with a order_by - result name ok'
+      );
+    }
+
+
+    # limit a connect by
+    {
+      my $rs = $schema->resultset('Artist')->search({}, {
+        start_with => { name => 'root' },
+        connect_by => { parentid => { -prior => \ 'artistid' } },
+        order_by => { -asc => 'name' },
+        rows => 2,
+      });
+
+      is_same_sql_bind (
+        $rs->as_query,
+        '( 
+            SELECT * FROM (
+                  SELECT A.*, ROWNUM r FROM (
+                      SELECT 
+                          me.artistid AS col1,
+                          me.name AS col2,
+                          me.rank AS col3,
+                          me.charfield AS col4,
+                          me.parentid AS col5 
+                      FROM artist me 
+                      START WITH name = ? 
+                      CONNECT BY parentid = PRIOR artistid
+                      ORDER BY name ASC
+                  ) A
+                  WHERE ROWNUM < 3
+              ) B
+              WHERE r >= 1 
+        )',
+        [ [ name => 'root' ] ],
+      );
+
+      is_deeply (
+        [ $rs->get_column ('name')->all ],
+        [qw/child1 child2/],
+        'LIMIT a Connect By query - correct names'
+      );
+
+      # TODO: 
+      # prints "START WITH name = ? 
+      # CONNECT BY artistid = PRIOR parentid "
+      # after count_subq, 
+      # I will fix this later...
+      # 
+      # is_same_sql_bind (
+      #   $rs->count_rs->as_query,
+      #   '( 
+      #       SELECT COUNT( * ) FROM (
+      #           SELECT * FROM (
+      #               SELECT A.*, ROWNUM r FROM (
+      #                   SELECT 
+      #                       me.artistid AS col1 
+      #                   FROM artist me 
+      #                   START WITH name = ? 
+      #                   CONNECT BY artistid = PRIOR parentid 
+      #               ) A
+      #               WHERE ROWNUM < 3
+      #           ) B
+      #           WHERE r >= 1
+      #       ) count_subq 
+      #   )',
+      #   [ [ name => 'greatgrandchild' ] ],
+      # );
+      # 
+      # is( $rs->count, 2, 'Connect By; LIMIT count ok' );
+    }
+
+    # select the whole cycle tree without nocylce
+    {
+      my $rs = $schema->resultset('Artist')->search({}, {
+        start_with => { name => 'cycle-root' },
+        connect_by => { parentid => { -prior => \ 'artistid' } },
+      });
+      eval { $rs->get_column ('name')->all };
+      if ( $@ =~ /ORA-01436/ ){ # ORA-01436:  CONNECT BY loop in user data
+        pass "connect by initify loop detection without nocycle";
+      }else{
+        fail "connect by initify loop detection without nocycle, not detected by oracle";
+      }
+    }
+
+    # select the whole cycle tree with nocylce
+    {
+      my $rs = $schema->resultset('Artist')->search({}, {
+        nocycle    => 1,
+        start_with => { name => 'cycle-root' },
+        '+select'  => [ \ 'CONNECT_BY_ISCYCLE' ],
+        connect_by => { parentid => { -prior => \ 'artistid' } },
+      });
+
+      is_same_sql_bind (
+        $rs->as_query,
+        '(
+          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid, CONNECT_BY_ISCYCLE
+            FROM artist me
+          START WITH name = ?
+          CONNECT BY NOCYCLE parentid = PRIOR artistid 
+        )',
+        [ [ name => 'cycle-root'] ],
+      );
+      is_deeply (
+        [ $rs->get_column ('name')->all ],
+        [ qw/cycle-root cycle-child1 cycle-grandchild cycle-child2/ ],
+        'got artist tree with nocycle (name)',
+      );
+      is_deeply (
+        [ $rs->get_column ('CONNECT_BY_ISCYCLE')->all ],
+        [ qw/1 0 0 0/ ],
+        'got artist tree with nocycle (CONNECT_BY_ISCYCLE)',
+      );
+
+
+      is_same_sql_bind (
+        $rs->count_rs->as_query,
+        '(
+          SELECT COUNT( * )
+            FROM artist me
+          START WITH name = ?
+          CONNECT BY NOCYCLE parentid = PRIOR artistid 
+        )',
+        [ [ name => 'cycle-root'] ],
+      );
+
+      is( $rs->count, 4, 'Connect By Nocycle count ok' );
+    }
+}
+
 done_testing;
 
 # clean up our mess
@@ -291,6 +692,7 @@ END {
     if($schema && ($dbh = $schema->storage->dbh)) {
         $dbh->do("DROP SEQUENCE artist_seq");
         $dbh->do("DROP SEQUENCE cd_seq");
+        $dbh->do("DROP SEQUENCE track_seq");
         $dbh->do("DROP SEQUENCE pkid1_seq");
         $dbh->do("DROP SEQUENCE pkid2_seq");
         $dbh->do("DROP SEQUENCE nonpkid_seq");
diff --git a/t/sqlahacks/oracle.t b/t/sqlahacks/oracle.t
new file mode 100644 (file)
index 0000000..1d36abf
--- /dev/null
@@ -0,0 +1,83 @@
+
+use strict;
+use warnings;
+use Test::More;
+use Test::Exception;
+use Data::Dumper;
+use lib qw(t/lib);
+use DBIC::SqlMakerTest;
+use DBIx::Class::SQLAHacks::Oracle;
+
+
+
+# 
+#  Offline test for connect_by 
+#  ( without acitve database connection)
+# 
+my @handle_tests = (
+    {
+        connect_by  => { 'parentid' => { '-prior' => \'artistid' } },
+        stmt        => '"parentid" = PRIOR artistid',
+        bind        => [],
+        msg         => 'Simple: "parentid" = PRIOR artistid',
+    },
+    {
+        connect_by  => { 'parentid' => { '!=' => { '-prior' => \'artistid' } } },
+        stmt        => '"parentid" != ( PRIOR artistid )',
+        bind        => [],
+        msg         => 'Simple: "parentid" != ( PRIOR artistid )',
+    },
+    # Examples from http://download.oracle.com/docs/cd/B19306_01/server.102/b14200/queries003.htm
+
+    # CONNECT BY last_name != 'King' AND PRIOR employee_id = manager_id ...
+    {
+        connect_by  => [
+            last_name => { '!=' => 'King' },
+            manager_id => { '-prior' => \'employee_id' },
+        ],
+        stmt        => '( "last_name" != ? OR "manager_id" = PRIOR employee_id )',
+        bind        => ['King'],
+        msg         => 'oracle.com example #1',
+    },
+    # CONNECT BY PRIOR employee_id = manager_id and 
+    #            PRIOR account_mgr_id = customer_id ...
+    {
+        connect_by  => {
+            manager_id => { '-prior' => \'employee_id' },
+            customer_id => { '>', { '-prior' => \'account_mgr_id' } },
+        },
+        stmt        => '( "customer_id" > ( PRIOR account_mgr_id ) AND "manager_id" = PRIOR employee_id )',
+        bind        => [],
+        msg         => 'oracle.com example #2',
+    },
+    # CONNECT BY NOCYCLE PRIOR employee_id = manager_id AND LEVEL <= 4;
+    # TODO: NOCYCLE parameter doesn't work
+);
+
+my $sqla_oracle = DBIx::Class::SQLAHacks::Oracle->new( quote_char => '"', name_sep => '.' );
+isa_ok($sqla_oracle, 'DBIx::Class::SQLAHacks::Oracle');
+
+
+my $test_count = ( @handle_tests * 2 ) + 1;
+
+for my $case (@handle_tests) {
+    local $Data::Dumper::Terse = 1;
+    my ( $stmt, @bind );
+    my $msg = sprintf("Offline: %s",
+        $case->{msg} || substr($case->{stmt},0,25),
+    );
+    lives_ok(
+        sub {
+            ( $stmt, @bind ) = $sqla_oracle->_recurse_where( $case->{connect_by} );
+            is_same_sql_bind( $stmt, \@bind, $case->{stmt}, $case->{bind},$msg )
+              || diag "Search term:\n" . Dumper $case->{connect_by};
+        }
+    ,sprintf("lives is ok from '%s'",$msg));
+}
+
+# 
+#   Online Tests?
+# 
+$test_count += 0;
+
+done_testing( $test_count );