# !!! THIS IS ALSO HORRIFIC !!! /me ashamed
#
- # generate inner/outer select lists for various limit dialects
+ # Generates inner/outer select lists for various limit dialects
# which result in one or more subqueries (e.g. RNO, Top, RowNum)
# Any non-root-table columns need to have their table qualifier
# turned into a column alias (otherwise names in subqueries clash
# and/or lose their source table)
#
- # returns inner/outer strings of SQL QUOTED selectors with aliases
+ # Returns inner/outer strings of SQL QUOTED selectors with aliases
# (to be used in whatever select statement), and an alias index hashref
# of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used for string-subst
- # higher up)
- #
- # If the $scan_order option is supplied, it signals that the limit dialect
- # needs to order the outer side of the query, which in turn means that the
- # inner select needs to bring out columns used in implicit (non-selected)
- # orders, and the order condition itself needs to be realiased to the proper
- # names in the outer query.
- #
- # In this case ($scan_order os true) we also return a hashref (order doesn't
- # matter) of QUOTED EXTRA-SEL => QUOTED ALIAS pairs, which is a list of extra
- # selectors that do *not* exist in the original select list
+ # higher up).
+ # If an order_by is supplied, the inner select needs to bring out columns
+ # used in implicit (non-selected) orders, and the order condition itself
+ # needs to be realiased to the proper names in the outer query. Thus we
+ # also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL =>
+ # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
+ # exist in the original select list
sub _subqueried_limit_attrs {
- my ($self, $rs_attrs, $scan_order) = @_;
+ my ($self, $rs_attrs) = @_;
croak 'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
unless ref ($rs_attrs) eq 'HASH';
$in_sel_index->{$sql_sel}++;
$in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias;
- # this *may* turn out to be necessary, not sure yet
- # my ($sql_unqualified_sel) = $sql_sel =~ / $re_sep (.+) $/x
- # if ! ref $s;
- # $in_sel_index->{$sql_unqualified_sel}++;
+ # record unqualified versions too, so we do not have
+ # to reselect the same column twice (in qualified and
+ # unqualified form)
+ if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) {
+ $in_sel_index->{$1}++;
+ }
}
}
}
+ # see if the order gives us anything
my %extra_order_sel;
- if ($scan_order) {
- for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
- # order with bind
- $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
- $chunk =~ s/\s+ (?: ASC|DESC ) \s* $//ix;
+ for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
+ # order with bind
+ $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
+ $chunk =~ s/\s+ (?: ASC|DESC ) \s* $//ix;
- next if $in_sel_index->{$chunk};
+ next if $in_sel_index->{$chunk};
- $extra_order_sel{$chunk} ||= $self->_quote (
- 'ORDER__BY__' . scalar keys %extra_order_sel
- );
- }
+ $extra_order_sel{$chunk} ||= $self->_quote (
+ 'ORDER__BY__' . scalar keys %extra_order_sel
+ );
}
+
return (
(map { join (', ', @$_ ) } (
\@in_sel,
or croak "Unrecognizable SELECT: $sql";
# get selectors, and scan the order_by (if any)
- my ($in_sel, $out_sel, $alias_map, $extra_order_sel) = $self->_subqueried_limit_attrs (
- $rs_attrs, 'scan_order_by',
- );
+ my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
+ = $self->_subqueried_limit_attrs ( $rs_attrs );
# make up an order if none exists
my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
SELECT $in_sel ${sql}${group_having}
) $qalias
- ) $qalias WHERE $idx_name BETWEEN %d AND %d
+ ) $qalias WHERE $idx_name BETWEEN %u AND %u
EOS
return sprintf ('SELECT %s%s%s%s',
$offset
- ? sprintf ('SKIP %d ', $offset)
+ ? sprintf ('SKIP %u ', $offset)
: ''
,
- sprintf ('FIRST %d ', $rows),
+ sprintf ('FIRST %u ', $rows),
$sql,
$self->_parse_rs_attrs ($rs_attrs),
);
or croak "Unrecognizable SELECT: $sql";
return sprintf ('SELECT %s%s%s%s',
- sprintf ('FIRST %d ', $rows),
+ sprintf ('FIRST %u ', $rows),
$offset
- ? sprintf ('SKIP %d ', $offset)
+ ? sprintf ('SKIP %u ', $offset)
: ''
,
$sql,
SELECT $outsel, ROWNUM $idx_name FROM (
SELECT $insel ${sql}${order_group_having}
) $qalias
- ) $qalias WHERE $idx_name BETWEEN %d AND %d
+ ) $qalias WHERE $idx_name BETWEEN %u AND %u
EOS
# get selectors
my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
- = $self->_subqueried_limit_attrs ($rs_attrs, 'outer_order_by');
+ = $self->_subqueried_limit_attrs ($rs_attrs);
my $requested_order = delete $rs_attrs->{order_by};
$mid_sel .= ', ' . $extra_order_sel->{$extra_col};
}
+
+ # since whatever order bindvals there are, they will be realiased
+ # and need to show up in front of the entire initial inner subquery
+ # Unshift *from_bind* to make this happen (horrible, horrible, but
+ # we don't have another mechanism yet)
+ unshift @{$self->{from_bind}}, @{$self->{order_bind}};
}
# and this is order re-alias magic
my $quoted_rs_alias = $self->_quote ($rs_attrs->{alias});
- $sql = sprintf ('SELECT TOP %d %s %s %s %s',
+ $sql = sprintf ('SELECT TOP %u %s %s %s %s',
$rows + ($offset||0),
$in_sel,
$sql,
$order_by_inner,
);
- $sql = sprintf ('SELECT TOP %d %s FROM ( %s ) %s %s',
+ $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
$rows,
$mid_sel,
$sql,
$order_by_reversed,
) if $offset;
- $sql = sprintf ('SELECT TOP %d %s FROM ( %s ) %s %s',
+ $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
$rows,
$out_sel,
$sql,
$order_by_requested,
) if ( ($offset && $order_by_requested) || ($mid_sel ne $out_sel) );
+ $sql =~ s/\s*\n\s*/ /g; # easier to read in the debugger
+ return $sql;
+ }
+
+ # This is the most evil limit "dialect" (more of a hack) for *really*
+ # stupid databases. It works by ordering the set by some unique column,
+ # and calculating amount of rows that have a less-er value (thus
+ # emulating a RowNum-like index). Of course this implies the set can
+ # only be ordered by a single unique columns.
+ sub _GenericSubQ {
+ my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
+
+ my $root_rsrc = $rs_attrs->{_rsroot_source_handle}->resolve;
+ my $root_tbl_name = $root_rsrc->name;
+
+ # mangle the input sql as we will be replacing the selector
+ $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
+ or croak "Unrecognizable SELECT: $sql";
+
+ my ($order_by, @rest) = do {
+ local $self->{quote_char};
+ $self->_order_by_chunks ($rs_attrs->{order_by})
+ };
+
+ unless (
+ $order_by
+ &&
+ ! @rest
+ &&
+ ( ! ref $order_by
+ ||
+ ( ref $order_by eq 'ARRAY' and @$order_by == 1 )
+ )
+ ) {
+ croak (
+ 'Generic Subquery Limit does not work on resultsets without an order, or resultsets '
+ . 'with complex order criteria (multicolumn and/or functions). Provide a single, '
+ . 'unique-column order criteria.'
+ );
+ }
+
+ ($order_by) = @$order_by if ref $order_by;
+
+ $order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix;
+ my $direction = lc ($1 || 'asc');
+
+ my ($unq_sort_col) = $order_by =~ /(?:^|\.)([^\.]+)$/;
+
+ my $inf = $root_rsrc->storage->_resolve_column_info (
+ $rs_attrs->{from}, [$order_by, $unq_sort_col]
+ );
+
+ my $ord_colinfo = $inf->{$order_by} || croak "Unable to determine source of order-criteria '$order_by'";
+
+ if ($ord_colinfo->{-result_source}->name ne $root_tbl_name) {
+ croak "Generic Subquery Limit order criteria can be only based on the root-source '"
+ . $root_rsrc->source_name . "' (aliased as '$rs_attrs->{alias}')";
+ }
+
+ # make sure order column is qualified
+ $order_by = "$rs_attrs->{alias}.$order_by"
+ unless $order_by =~ /^$rs_attrs->{alias}\./;
+
+ my $is_u;
+ my $ucs = { $root_rsrc->unique_constraints };
+ for (values %$ucs ) {
+ if (@$_ == 1 && "$rs_attrs->{alias}.$_->[0]" eq $order_by) {
+ $is_u++;
+ last;
+ }
+ }
+ croak "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)"
+ unless $is_u;
+
+ my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
+ = $self->_subqueried_limit_attrs ($rs_attrs);
+
+ my $cmp_op = $direction eq 'desc' ? '>' : '<';
+ my $count_tbl_alias = 'rownum__emulation';
+
+ my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
+
+ # add the order supplement (if any) as this is what will be used for the outer WHERE
+ $in_sel .= ", $_" for keys %{$extra_order_sel||{}};
+
+ $sql = sprintf (<<EOS,
+ SELECT $out_sel
+ FROM (
+ SELECT $in_sel ${sql}${order_group_having}
+ ) %s
+ WHERE ( SELECT COUNT(*) FROM %s %s WHERE %s $cmp_op %s ) %s
+ EOS
+ ( map { $self->_quote ($_) } (
+ $rs_attrs->{alias},
+ $root_tbl_name,
+ $count_tbl_alias,
+ "$count_tbl_alias.$unq_sort_col",
+ $order_by,
+ )),
+ $offset
+ ? sprintf ('BETWEEN %u AND %u', $offset, $offset + $rows - 1)
+ : sprintf ('< %u', $rows )
+ ,
+ );
+
+ $sql =~ s/\s*\n\s*/ /g; # easier to read in the debugger
return $sql;
}
sub select {
my ($self, $table, $fields, $where, $rs_attrs, @rest) = @_;
- $self->{"${_}_bind"} = [] for (qw/having from order/);
+ $self->{"${_}_bind"} = [] for (qw/having from order where/);
if (not ref($table) or ref($table) eq 'SCALAR') {
$table = $self->_quote($table);
croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
# and anyway, SQL::Abstract::Limit will cause a barf if we don't first
- my ($sql, @where_bind) = $self->SUPER::select(
+ my $sql = '';
+ ($sql, @{$self->{where_bind}}) = $self->SUPER::select(
$table, $self->_recurse_fields($fields), $where, $rs_attrs, @rest
);
- return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}, @{$self->{order_bind}} ) : $sql;
+ return wantarray ? ($sql, @{$self->{from_bind}}, @{$self->{where_bind}}, @{$self->{having_bind}}, @{$self->{order_bind}} ) : $sql;
}
# Quotes table names, and handles default inserts
use warnings;
use Scope::Guard ();
use Context::Preserve ();
+ use Try::Tiny;
+ use namespace::clean;
=head1 NAME
__PACKAGE__->set_primary_key('id');
__PACKAGE__->sequence('mysequence');
+ # Somewhere in your Code
+ # add some data to a table with a hierarchical relationship
+ $schema->resultset('Person')->create ({
+ firstname => 'foo',
+ lastname => 'bar',
+ children => [
+ {
+ firstname => 'child1',
+ lastname => 'bar',
+ children => [
+ {
+ firstname => 'grandchild',
+ lastname => 'bar',
+ }
+ ],
+ },
+ {
+ firstname => 'child2',
+ lastname => 'bar',
+ },
+ ],
+ });
+
+ # select from the hierarchical relationship
+ my $rs = $schema->resultset('Person')->search({},
+ {
+ 'start_with' => { 'firstname' => 'foo', 'lastname' => 'bar' },
+ 'connect_by' => { 'parentid' => { '-prior' => \'persionid' },
+ 'order_siblings_by' => { -asc => 'name' },
+ };
+ );
+
+ # this will select the whole tree starting from person "foo bar", creating
+ # following query:
+ # SELECT
+ # me.persionid me.firstname, me.lastname, me.parentid
+ # FROM
+ # person me
+ # START WITH
+ # firstname = 'foo' and lastname = 'bar'
+ # CONNECT BY
+ # parentid = prior persionid
+ # ORDER SIBLINGS BY
+ # firstname ASC
+
=head1 DESCRIPTION
This class implements base Oracle support. The subclass
use base qw/DBIx::Class::Storage::DBI/;
use mro 'c3';
+__PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks::Oracle');
+
sub deployment_statements {
my $self = shift;;
my ($schema, $type, $version, $dir, $sqltargs, @rest) = @_;
$sqltargs->{quote_table_names} = $quote_char ? 1 : 0;
$sqltargs->{quote_field_names} = $quote_char ? 1 : 0;
- my $oracle_version = eval { $self->_get_dbh->get_info(18) };
+ my $oracle_version = try { $self->_get_dbh->get_info(18) };
$sqltargs->{producer_args}{oracle_version} = $oracle_version;
{
$schema ? (owner => $schema) : (),
table_name => $table || $source_name,
- triggering_event => 'INSERT',
+ triggering_event => { -like => '%INSERT%' },
status => 'ENABLED',
},
);
local $dbh->{RaiseError} = 1;
local $dbh->{PrintError} = 0;
- eval {
+ return try {
$dbh->do('select 1 from dual');
+ 1;
+ } catch {
+ 0;
};
-
- return $@ ? 0 : 1;
}
sub _dbh_execute {
my $self = shift;
my ($dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
- my $wantarray = wantarray;
-
- my (@res, $exception, $retried);
-
- RETRY: {
- do {
- eval {
- if ($wantarray) {
- @res = $self->next::method(@_);
- } else {
- $res[0] = $self->next::method(@_);
- }
- };
- $exception = $@;
- if ($exception =~ /ORA-01003/) {
+ my (@res, $tried);
+ my $wantarray = wantarray();
+ my $next = $self->next::can;
+ do {
+ try {
+ my $exec = sub { $self->$next($dbh, $op, $extra_bind, $ident, $bind_attributes, @args) };
+
+ if (!defined $wantarray) {
+ $exec->();
+ }
+ elsif (! $wantarray) {
+ $res[0] = $exec->();
+ }
+ else {
+ @res = $exec->();
+ }
+
+ $tried++;
+ }
+ catch {
+ if (! $tried and $_ =~ /ORA-01003/) {
# ORA-01003: no statement parsed (someone changed the table somehow,
# invalidating your cursor.)
my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
delete $dbh->{CachedKids}{$sql};
- } else {
- last RETRY;
}
- } while (not $retried++);
- }
-
- $self->throw_exception($exception) if $exception;
+ else {
+ $self->throw_exception($_);
+ }
+ };
+ } while (! $tried++);
- $wantarray ? @res : $res[0]
+ return $wantarray ? @res : $res[0];
}
=head2 get_autoinc_seq
$self->dbh_do('_dbh_get_autoinc_seq', $source, $col);
}
- =head2 columns_info_for
-
- This wraps the superclass version of this method to force table
- names to uppercase
-
- =cut
-
- sub columns_info_for {
- my ($self, $table) = @_;
-
- $self->next::method($table);
- }
-
=head2 datetime_parser_type
This sets the proper DateTime::Format module for use with
after => sub { $txn_scope_guard->commit });
}
+=head1 ATTRIBUTES
+
+Following additional attributes can be used in resultsets.
+
+=head2 connect_by or connect_by_nocycle
+
+=over 4
+
+=item Value: \%connect_by
+
+=back
+
+A hashref of conditions used to specify the relationship between parent rows
+and child rows of the hierarchy.
+
+
+ connect_by => { parentid => 'prior personid' }
+
+ # adds a connect by statement to the query:
+ # SELECT
+ # me.persionid me.firstname, me.lastname, me.parentid
+ # FROM
+ # person me
+ # CONNECT BY
+ # parentid = prior persionid
+
+
+ connect_by_nocycle => { parentid => 'prior personid' }
+
+ # adds a connect by statement to the query:
+ # SELECT
+ # me.persionid me.firstname, me.lastname, me.parentid
+ # FROM
+ # person me
+ # CONNECT BY NOCYCLE
+ # parentid = prior persionid
+
+
+=head2 start_with
+
+=over 4
+
+=item Value: \%condition
+
+=back
+
+A hashref of conditions which specify the root row(s) of the hierarchy.
+
+It uses the same syntax as L<DBIx::Class::ResultSet/search>
+
+ start_with => { firstname => 'Foo', lastname => 'Bar' }
+
+ # SELECT
+ # me.persionid me.firstname, me.lastname, me.parentid
+ # FROM
+ # person me
+ # START WITH
+ # firstname = 'foo' and lastname = 'bar'
+ # CONNECT BY
+ # parentid = prior persionid
+
+=head2 order_siblings_by
+
+=over 4
+
+=item Value: ($order_siblings_by | \@order_siblings_by)
+
+=back
+
+Which column(s) to order the siblings by.
+
+It uses the same syntax as L<DBIx::Class::ResultSet/order_by>
+
+ 'order_siblings_by' => 'firstname ASC'
+
+ # SELECT
+ # me.persionid me.firstname, me.lastname, me.parentid
+ # FROM
+ # person me
+ # CONNECT BY
+ # parentid = prior persionid
+ # ORDER SIBLINGS BY
+ # firstname ASC
+
=head1 AUTHOR
See L<DBIx::Class/CONTRIBUTORS>.
use Test::Exception;
use Test::More;
+
use lib qw(t/lib);
use DBICTest;
+use DBIC::SqlMakerTest;
my ($dsn, $user, $pass) = @ENV{map { "DBICTEST_ORA_${_}" } qw/DSN USER PASS/};
eval {
$dbh->do("DROP SEQUENCE artist_seq");
$dbh->do("DROP SEQUENCE cd_seq");
+ $dbh->do("DROP SEQUENCE track_seq");
$dbh->do("DROP SEQUENCE pkid1_seq");
$dbh->do("DROP SEQUENCE pkid2_seq");
$dbh->do("DROP SEQUENCE nonpkid_seq");
};
$dbh->do("CREATE SEQUENCE artist_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
$dbh->do("CREATE SEQUENCE cd_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
+$dbh->do("CREATE SEQUENCE track_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
$dbh->do("CREATE SEQUENCE pkid1_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
$dbh->do("CREATE SEQUENCE pkid2_seq START WITH 10 MAXVALUE 999999 MINVALUE 0");
$dbh->do("CREATE SEQUENCE nonpkid_seq START WITH 20 MAXVALUE 999999 MINVALUE 0");
-$dbh->do("CREATE TABLE artist (artistid NUMBER(12), name VARCHAR(255), rank NUMBER(38), charfield VARCHAR2(10))");
+$dbh->do("CREATE TABLE artist (artistid NUMBER(12), parentid NUMBER(12), name VARCHAR(255), rank NUMBER(38), charfield VARCHAR2(10))");
$dbh->do("ALTER TABLE artist ADD (CONSTRAINT artist_pk PRIMARY KEY (artistid))");
$dbh->do("CREATE TABLE sequence_test (pkid1 NUMBER(12), pkid2 NUMBER(12), nonpkid NUMBER(12), name VARCHAR(255))");
$dbh->do("ALTER TABLE cd ADD (CONSTRAINT cd_pk PRIMARY KEY (cdid))");
$dbh->do("CREATE TABLE track (trackid NUMBER(12), cd NUMBER(12) REFERENCES cd(cdid) DEFERRABLE, position NUMBER(12), title VARCHAR(255), last_updated_on DATE, last_updated_at DATE, small_dt DATE)");
+$dbh->do("ALTER TABLE track ADD (CONSTRAINT track_pk PRIMARY KEY (trackid))");
$dbh->do(qq{
CREATE OR REPLACE TRIGGER artist_insert_trg
});
$dbh->do(qq{
CREATE OR REPLACE TRIGGER cd_insert_trg
- BEFORE INSERT ON cd
+ BEFORE INSERT OR UPDATE ON cd
FOR EACH ROW
BEGIN
IF :new.cdid IS NULL THEN
END IF;
END;
});
+$dbh->do(qq{
+ CREATE OR REPLACE TRIGGER cd_insert_trg
+ BEFORE INSERT ON cd
+ FOR EACH ROW
+ BEGIN
+ IF :new.cdid IS NULL THEN
+ SELECT cd_seq.nextval
+ INTO :new.cdid
+ FROM DUAL;
+ END IF;
+ END;
+});
+$dbh->do(qq{
+ CREATE OR REPLACE TRIGGER track_insert_trg
+ BEFORE INSERT ON track
+ FOR EACH ROW
+ BEGIN
+ IF :new.trackid IS NULL THEN
+ SELECT track_seq.nextval
+ INTO :new.trackid
+ FROM DUAL;
+ END IF;
+ END;
+});
{
# Swiped from t/bindtype_columns.t to avoid creating my own Resultset.
# test join with row count ambiguity
-my $track = $schema->resultset('Track')->create({ trackid => 1, cd => 1,
+my $track = $schema->resultset('Track')->create({ cd => $cd->cdid,
position => 1, title => 'Track1' });
my $tjoin = $schema->resultset('Track')->search({ 'me.title' => 'Track1'},
{ join => 'cd',
is($row->title, 'Track1', "ambiguous column ok");
# check count distinct with multiple columns
-my $other_track = $schema->resultset('Track')->create({ trackid => 2, cd => 1, position => 1, title => 'Track2' });
+my $other_track = $schema->resultset('Track')->create({ cd => $cd->cdid, position => 1, title => 'Track2' });
my $tcount = $schema->resultset('Track')->search(
{},
}
}
+
+### test hierarchical queries
+if ( $schema->storage->isa('DBIx::Class::Storage::DBI::Oracle::Generic') ) {
+ my $source = $schema->source('Artist');
+
+ $source->add_column( 'parentid' );
+
+ $source->add_relationship('children', 'DBICTest::Schema::Artist',
+ { 'foreign.parentid' => 'self.artistid' },
+ {
+ accessor => 'multi',
+ join_type => 'LEFT',
+ cascade_delete => 1,
+ cascade_copy => 1,
+ } );
+ $source->add_relationship('parent', 'DBICTest::Schema::Artist',
+ { 'foreign.artistid' => 'self.parentid' },
+ { accessor => 'single' } );
+ DBICTest::Schema::Artist->add_column( 'parentid' );
+ DBICTest::Schema::Artist->has_many(
+ children => 'DBICTest::Schema::Artist',
+ { 'foreign.parentid' => 'self.artistid' }
+ );
+ DBICTest::Schema::Artist->belongs_to(
+ parent => 'DBICTest::Schema::Artist',
+ { 'foreign.artistid' => 'self.parentid' }
+ );
+
+ $schema->resultset('Artist')->create ({
+ name => 'root',
+ rank => 1,
+ cds => [],
+ children => [
+ {
+ name => 'child1',
+ rank => 2,
+ children => [
+ {
+ name => 'grandchild',
+ rank => 3,
+ cds => [
+ {
+ title => "grandchilds's cd" ,
+ year => '2008',
+ tracks => [
+ {
+ position => 1,
+ title => 'Track 1 grandchild',
+ }
+ ],
+ }
+ ],
+ children => [
+ {
+ name => 'greatgrandchild',
+ rank => 3,
+ }
+ ],
+ }
+ ],
+ },
+ {
+ name => 'child2',
+ rank => 3,
+ },
+ ],
+ });
+
+ $schema->resultset('Artist')->create(
+ {
+ name => 'cycle-root',
+ children => [
+ {
+ name => 'cycle-child1',
+ children => [ { name => 'cycle-grandchild' } ],
+ },
+ { name => 'cycle-child2' },
+ ],
+ }
+ );
+
+ $schema->resultset('Artist')->find({ name => 'cycle-root' })
+ ->update({ parentid => \'artistid' });
+
+ # select the whole tree
+ {
+ my $rs = $schema->resultset('Artist')->search({}, {
+ start_with => { name => 'root' },
+ connect_by => { parentid => { -prior => \ 'artistid' } },
+ });
+
+ is_same_sql_bind (
+ $rs->as_query,
+ '(
+ SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
+ FROM artist me
+ START WITH name = ?
+ CONNECT BY parentid = PRIOR artistid
+ )',
+ [ [ name => 'root'] ],
+ );
+ is_deeply (
+ [ $rs->get_column ('name')->all ],
+ [ qw/root child1 grandchild greatgrandchild child2/ ],
+ 'got artist tree',
+ );
+
+
+ is_same_sql_bind (
+ $rs->count_rs->as_query,
+ '(
+ SELECT COUNT( * )
+ FROM artist me
+ START WITH name = ?
+ CONNECT BY parentid = PRIOR artistid
+ )',
+ [ [ name => 'root'] ],
+ );
+
+ is( $rs->count, 5, 'Connect By count ok' );
+ }
+
+ # use order siblings by statement
+ {
+ my $rs = $schema->resultset('Artist')->search({}, {
+ start_with => { name => 'root' },
+ connect_by => { parentid => { -prior => \ 'artistid' } },
+ order_siblings_by => { -desc => 'name' },
+ });
+
+ is_same_sql_bind (
+ $rs->as_query,
+ '(
+ SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
+ FROM artist me
+ START WITH name = ?
+ CONNECT BY parentid = PRIOR artistid
+ ORDER SIBLINGS BY name DESC
+ )',
+ [ [ name => 'root'] ],
+ );
+
+ is_deeply (
+ [ $rs->get_column ('name')->all ],
+ [ qw/root child2 child1 grandchild greatgrandchild/ ],
+ 'Order Siblings By ok',
+ );
+ }
+
+ # get the root node
+ {
+ my $rs = $schema->resultset('Artist')->search({ parentid => undef }, {
+ start_with => { name => 'root' },
+ connect_by => { parentid => { -prior => \ 'artistid' } },
+ });
+
+ is_same_sql_bind (
+ $rs->as_query,
+ '(
+ SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
+ FROM artist me
+ WHERE ( parentid IS NULL )
+ START WITH name = ?
+ CONNECT BY parentid = PRIOR artistid
+ )',
+ [ [ name => 'root'] ],
+ );
+
+ is_deeply(
+ [ $rs->get_column('name')->all ],
+ [ 'root' ],
+ 'found root node',
+ );
+ }
+
+ # combine a connect by with a join
+ {
+ my $rs = $schema->resultset('Artist')->search(
+ {'cds.title' => { -like => '%cd'} },
+ {
+ join => 'cds',
+ start_with => { 'me.name' => 'root' },
+ connect_by => { parentid => { -prior => \ 'artistid' } },
+ }
+ );
+
+ is_same_sql_bind (
+ $rs->as_query,
+ '(
+ SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
+ FROM artist me
+ LEFT JOIN cd cds ON cds.artist = me.artistid
+ WHERE ( cds.title LIKE ? )
+ START WITH me.name = ?
+ CONNECT BY parentid = PRIOR artistid
+ )',
+ [ [ 'cds.title' => '%cd' ], [ 'me.name' => 'root' ] ],
+ );
+
+ is_deeply(
+ [ $rs->get_column('name')->all ],
+ [ 'grandchild' ],
+ 'Connect By with a join result name ok'
+ );
+
+
+ is_same_sql_bind (
+ $rs->count_rs->as_query,
+ '(
+ SELECT COUNT( * )
+ FROM artist me
+ LEFT JOIN cd cds ON cds.artist = me.artistid
+ WHERE ( cds.title LIKE ? )
+ START WITH me.name = ?
+ CONNECT BY parentid = PRIOR artistid
+ )',
+ [ [ 'cds.title' => '%cd' ], [ 'me.name' => 'root' ] ],
+ );
+
+ is( $rs->count, 1, 'Connect By with a join; count ok' );
+ }
+
+ # combine a connect by with order_by
+ {
+ my $rs = $schema->resultset('Artist')->search({}, {
+ start_with => { name => 'root' },
+ connect_by => { parentid => { -prior => \ 'artistid' } },
+ order_by => { -asc => [ 'LEVEL', 'name' ] },
+ });
+
+ is_same_sql_bind (
+ $rs->as_query,
+ '(
+ SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
+ FROM artist me
+ START WITH name = ?
+ CONNECT BY parentid = PRIOR artistid
+ ORDER BY LEVEL ASC, name ASC
+ )',
+ [ [ name => 'root' ] ],
+ );
+
+ is_deeply (
+ [ $rs->get_column ('name')->all ],
+ [ qw/root child1 child2 grandchild greatgrandchild/ ],
+ 'Connect By with a order_by - result name ok'
+ );
+ }
+
+
+ # limit a connect by
+ {
+ my $rs = $schema->resultset('Artist')->search({}, {
+ start_with => { name => 'root' },
+ connect_by => { parentid => { -prior => \ 'artistid' } },
+ order_by => { -asc => 'name' },
+ rows => 2,
+ });
+
+ is_same_sql_bind (
+ $rs->as_query,
+ '(
+ SELECT artistid, name, rank, charfield, parentid FROM (
+ SELECT artistid, name, rank, charfield, parentid, ROWNUM rownum__index FROM (
+ SELECT
+ me.artistid,
+ me.name,
+ me.rank,
+ me.charfield,
+ me.parentid
+ FROM artist me
+ START WITH name = ?
+ CONNECT BY parentid = PRIOR artistid
+ ORDER BY name ASC
+ ) me
+ ) me
+ WHERE rownum__index BETWEEN 1 AND 2
+ )',
+ [ [ name => 'root' ] ],
+ );
+
+ is_deeply (
+ [ $rs->get_column ('name')->all ],
+ [qw/child1 child2/],
+ 'LIMIT a Connect By query - correct names'
+ );
+
+ # TODO:
+ # prints "START WITH name = ?
+ # CONNECT BY artistid = PRIOR parentid "
+ # after count_subq,
+ # I will fix this later...
+ #
+ is_same_sql_bind (
+ $rs->count_rs->as_query,
+ '(
+ SELECT COUNT( * ) FROM (
+ SELECT artistid FROM (
+ SELECT artistid, ROWNUM rownum__index FROM (
+ SELECT
+ me.artistid
+ FROM artist me
+ START WITH name = ?
+ CONNECT BY parentid = PRIOR artistid
+ ) me
+ ) me
+ WHERE rownum__index BETWEEN 1 AND 2
+ ) me
+ )',
+ [ [ name => 'root' ] ],
+ );
+
+ is( $rs->count, 2, 'Connect By; LIMIT count ok' );
+ }
+
+ # combine a connect_by with group_by and having
+ {
+ my $rs = $schema->resultset('Artist')->search({}, {
+ select => ['count(rank)'],
+ start_with => { name => 'root' },
+ connect_by => { parentid => { -prior => \ 'artistid' } },
+ group_by => ['rank'],
+ having => { 'count(rank)' => { '<', 2 } },
+ });
+
+ is_same_sql_bind (
+ $rs->as_query,
+ '(
+ SELECT count(rank)
+ FROM artist me
+ START WITH name = ?
+ CONNECT BY parentid = PRIOR artistid
+ GROUP BY rank HAVING count(rank) < ?
+ )',
+ [ [ name => 'root' ], [ 'count(rank)' => 2 ] ],
+ );
+
+ is_deeply (
+ [ $rs->get_column ('count(rank)')->all ],
+ [1, 1],
+ 'Group By a Connect By query - correct values'
+ );
+ }
+
+
+ # select the whole cycle tree without nocylce
+ {
+ my $rs = $schema->resultset('Artist')->search({}, {
+ start_with => { name => 'cycle-root' },
+ connect_by => { parentid => { -prior => \ 'artistid' } },
+ });
+ eval { $rs->get_column ('name')->all };
+ if ( $@ =~ /ORA-01436/ ){ # ORA-01436: CONNECT BY loop in user data
+ pass "connect by initify loop detection without nocycle";
+ }else{
+ fail "connect by initify loop detection without nocycle, not detected by oracle";
+ }
+ }
+
+ # select the whole cycle tree with nocylce
+ {
+ my $rs = $schema->resultset('Artist')->search({}, {
+ start_with => { name => 'cycle-root' },
+ '+select' => [ \ 'CONNECT_BY_ISCYCLE' ],
+ connect_by_nocycle => { parentid => { -prior => \ 'artistid' } },
+ });
+
+ is_same_sql_bind (
+ $rs->as_query,
+ '(
+ SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid, CONNECT_BY_ISCYCLE
+ FROM artist me
+ START WITH name = ?
+ CONNECT BY NOCYCLE parentid = PRIOR artistid
+ )',
+ [ [ name => 'cycle-root'] ],
+ );
+ is_deeply (
+ [ $rs->get_column ('name')->all ],
+ [ qw/cycle-root cycle-child1 cycle-grandchild cycle-child2/ ],
+ 'got artist tree with nocycle (name)',
+ );
+ is_deeply (
+ [ $rs->get_column ('CONNECT_BY_ISCYCLE')->all ],
+ [ qw/1 0 0 0/ ],
+ 'got artist tree with nocycle (CONNECT_BY_ISCYCLE)',
+ );
+
+
+ is_same_sql_bind (
+ $rs->count_rs->as_query,
+ '(
+ SELECT COUNT( * )
+ FROM artist me
+ START WITH name = ?
+ CONNECT BY NOCYCLE parentid = PRIOR artistid
+ )',
+ [ [ name => 'cycle-root'] ],
+ );
+
+ is( $rs->count, 4, 'Connect By Nocycle count ok' );
+ }
+}
+
done_testing;
# clean up our mess
if($schema && ($dbh = $schema->storage->dbh)) {
$dbh->do("DROP SEQUENCE artist_seq");
$dbh->do("DROP SEQUENCE cd_seq");
+ $dbh->do("DROP SEQUENCE track_seq");
$dbh->do("DROP SEQUENCE pkid1_seq");
$dbh->do("DROP SEQUENCE pkid2_seq");
$dbh->do("DROP SEQUENCE nonpkid_seq");