Merge 'trunk' into 'oracle_hierarchical_queries_rt39121'
Peter Rabbitson [Mon, 31 May 2010 23:10:46 +0000 (23:10 +0000)]
r9350@Thesaurus (orig r9337):  rabbit | 2010-05-08 11:23:56 +0200
Make sure missing author-deps do not kill makefile creation
r9358@Thesaurus (orig r9344):  rabbit | 2010-05-11 16:46:47 +0200
 r9147@Thesaurus (orig r9134):  frew | 2010-04-13 16:54:24 +0200
 branch for FilterColumn
 r9148@Thesaurus (orig r9135):  frew | 2010-04-13 18:09:57 +0200
 change names wrap accessors
 r9158@Thesaurus (orig r9145):  frew | 2010-04-14 17:55:14 +0200
 basic tests and a tiny fix
 r9159@Thesaurus (orig r9146):  frew | 2010-04-14 19:30:46 +0200
 working filter column impl
 r9160@Thesaurus (orig r9147):  frew | 2010-04-14 19:31:18 +0200
 useless var
 r9161@Thesaurus (orig r9148):  frew | 2010-04-14 20:10:57 +0200
 MultiCreate test
 r9163@Thesaurus (orig r9150):  frew | 2010-04-14 20:22:10 +0200
 test db in MC
 r9178@Thesaurus (orig r9165):  rabbit | 2010-04-14 23:35:00 +0200
 Not sure how this was never noticed, but it definitely doesn't seem right and all tests pass...
 r9191@Thesaurus (orig r9178):  frew | 2010-04-15 06:34:16 +0200
 better namiology
 r9193@Thesaurus (orig r9180):  frew | 2010-04-15 16:14:28 +0200
 method and arg rename
 r9194@Thesaurus (orig r9181):  frew | 2010-04-15 16:35:25 +0200
 use result source for filtering instead of result
 r9195@Thesaurus (orig r9182):  frew | 2010-04-15 17:04:38 +0200
 initial stab at incomplete docs
 r9278@Thesaurus (orig r9265):  frew | 2010-04-28 22:05:36 +0200
 doc, removal of source stuff, and Changes
 r9324@Thesaurus (orig r9311):  frew | 2010-05-06 01:49:25 +0200
 test caching
 r9327@Thesaurus (orig r9314):  rabbit | 2010-05-06 16:30:36 +0200
 Play nicer with lower-level methods
 r9328@Thesaurus (orig r9315):  frew | 2010-05-07 04:27:18 +0200
 no filter and inflate column
 r9352@Thesaurus (orig r9339):  rabbit | 2010-05-10 13:40:00 +0200
 Maintain full coherence between filtered cache and unfiltered results, including store_column
 r9353@Thesaurus (orig r9340):  rabbit | 2010-05-10 13:40:48 +0200
 Fix typo
 r9357@Thesaurus (orig r9343):  rabbit | 2010-05-11 16:45:50 +0200
 Comment weird looking code

r9360@Thesaurus (orig r9346):  caelum | 2010-05-11 17:44:15 +0200
clearer logic
r9364@Thesaurus (orig r9350):  wreis | 2010-05-12 03:44:39 +0200
add failing test for order_by using a function
r9378@Thesaurus (orig r9364):  rabbit | 2010-05-14 11:57:45 +0200
cleanup test by wreis
r9396@Thesaurus (orig r9382):  rabbit | 2010-05-15 17:50:58 +0200
Fix stupid typo-bug
r9397@Thesaurus (orig r9383):  rabbit | 2010-05-15 18:04:59 +0200
Revert erroneous commit (belongs in a branch)
r9402@Thesaurus (orig r9388):  ash | 2010-05-16 12:28:13 +0200
Fix how Schema::Versioned gets connection attributes
r9408@Thesaurus (orig r9394):  caelum | 2010-05-16 19:29:14 +0200
add sql_maker to @rdbms_specific_methods
r9420@Thesaurus (orig r9406):  caelum | 2010-05-20 16:28:18 +0200
support INSERT OR UPDATE triggers for Oracle
r9421@Thesaurus (orig r9407):  matthewt | 2010-05-20 19:19:14 +0200
don't try and ensure_class_loaded an object. this doesn't work.
r9422@Thesaurus (orig r9408):  matthewt | 2010-05-20 19:36:01 +0200
fix result_class setter behaviour to not stuff attrs (line commented out to prevent this regression being mistakenly re-introduced)
r9423@Thesaurus (orig r9409):  matthewt | 2010-05-20 19:49:32 +0200
forgot to commit fixes
r9424@Thesaurus (orig r9410):  matthewt | 2010-05-20 20:09:52 +0200
fix find() since that was also broken in r8754
r9435@Thesaurus (orig r9421):  rabbit | 2010-05-25 11:14:29 +0200
Fix undef warning
r9436@Thesaurus (orig r9422):  rabbit | 2010-05-25 11:15:01 +0200
Rewrite test as to not propagate several ways to do the same thing
r9452@Thesaurus (orig r9438):  caelum | 2010-05-25 21:33:37 +0200
 r24317@hlagh (orig r9367):  tonvoon | 2010-05-14 12:24:35 -0400
 Branch for converting eval {} to Try::Tiny

 r24319@hlagh (orig r9369):  tonvoon | 2010-05-14 17:25:02 -0400
 Conversion of eval => try (part 1)

 r24325@hlagh (orig r9375):  tonvoon | 2010-05-14 18:03:03 -0400
 Add eval => try

 r24326@hlagh (orig r9376):  tonvoon | 2010-05-14 18:22:57 -0400
 Another eval => try

 r24327@hlagh (orig r9377):  tonvoon | 2010-05-14 18:45:27 -0400
 Corrected usage of $@ in catch block

 r24328@hlagh (orig r9378):  tonvoon | 2010-05-14 19:29:52 -0400
 txn_do's eval => try

 r24329@hlagh (orig r9379):  tonvoon | 2010-05-14 19:46:44 -0400
 eval => try where tests for $@ done

 r24330@hlagh (orig r9380):  tonvoon | 2010-05-14 20:38:43 -0400
 All expected evals converted to try, except where no test is done,
 runtime evaluation, or base perl (such as "require"). Only one test
 failure due to string difference in output

 r24346@hlagh (orig r9396):  tonvoon | 2010-05-17 08:52:28 -0400
 Fix missing $@ in try::tiny conversion

 r24347@hlagh (orig r9397):  tonvoon | 2010-05-17 08:55:13 -0400
 Revert to eval instead of try::tiny because no check for $@

 r24348@hlagh (orig r9398):  tonvoon | 2010-05-17 08:55:45 -0400
 Added myself to contributors

 r24349@hlagh (orig r9399):  tonvoon | 2010-05-17 10:23:57 -0400
 Fixed exception logic due to not being able to use return with a catch{}

 r24350@hlagh (orig r9400):  tonvoon | 2010-05-17 10:31:32 -0400
 Removed tab

 r24430@hlagh (orig r9424):  ribasushi | 2010-05-25 10:09:39 -0400
 More try::tiny conversions
 r24432@hlagh (orig r9426):  ribasushi | 2010-05-25 11:40:45 -0400
 Try::Tiny conversion finished
 r24433@hlagh (orig r9427):  ribasushi | 2010-05-25 11:46:52 -0400
 Missed use
 r24440@hlagh (orig r9434):  rkitover | 2010-05-25 13:47:25 -0400
 fix Oracle
 r24441@hlagh (orig r9435):  rkitover | 2010-05-25 14:04:10 -0400
 fix odbc/mssql dynamic cursors
 r24442@hlagh (orig r9436):  rkitover | 2010-05-25 14:32:41 -0400
 fix hang in SQLAnywhere DateTime tests

r9454@Thesaurus (orig r9440):  rabbit | 2010-05-26 11:28:37 +0200
Simplify oracle retrial logic
r9455@Thesaurus (orig r9441):  rabbit | 2010-05-26 12:00:20 +0200
Can not return from within a try block
r9456@Thesaurus (orig r9442):  rabbit | 2010-05-26 12:17:55 +0200
Really fix logic
r9464@Thesaurus (orig r9450):  jester | 2010-05-27 16:06:43 +0200
Light doc tweaks

r9475@Thesaurus (orig r9461):  ribasushi | 2010-05-31 00:17:29 +0200
Rewrite GenericSubQ from SQLA::L to be actually useful
Since it now works it is no longer necessary to turn on softlimit when genericsubq is detected
Switch all sprintf()ed limit/offset specs to unsigned integers
Lower the default rows-without-offset to 2^32
r9476@Thesaurus (orig r9462):  rabbit | 2010-05-31 00:25:01 +0200
New format of changelog (easier to read)
r9477@Thesaurus (orig r9463):  rabbit | 2010-05-31 00:27:18 +0200
Fix MC double-object creation (important for e.g. IC::FS which otherwise leaves orphaned files)
r9479@Thesaurus (orig r9465):  rabbit | 2010-05-31 00:37:23 +0200
Fix tests to survive the new SQLA bindtype checks
r9483@Thesaurus (orig r9469):  rabbit | 2010-05-31 13:21:04 +0200
Skip tests segfaulting with ancient DBD::Sybase versions
r9488@Thesaurus (orig r9474):  frew | 2010-05-31 17:11:51 +0200
use namespace::clean w/ Try::Tiny
r9489@Thesaurus (orig r9475):  rabbit | 2010-05-31 17:13:29 +0200
Fix Top-limit problem of missed bindvars
r9490@Thesaurus (orig r9476):  rabbit | 2010-05-31 17:21:20 +0200
Skip failing tests on old DBD
r9491@Thesaurus (orig r9477):  frew | 2010-05-31 17:23:49 +0200
add namespace::clean as regular dep
r9501@Thesaurus (orig r9487):  rabbit | 2010-05-31 19:45:27 +0200
Fix RT57467, simplify test
r9503@Thesaurus (orig r9489):  rabbit | 2010-05-31 23:52:17 +0200
Fix Schema::Versioned borkage
r9506@Thesaurus (orig r9492):  rabbit | 2010-06-01 00:08:45 +0200
 r9306@Thesaurus (orig r9293):  edenc | 2010-05-03 21:20:21 +0200
 braching for bug fixes (rt 54939)
 r9339@Thesaurus (orig r9326):  edenc | 2010-05-07 18:15:47 +0200
 added failing test case for non-versioned schema deploy attempt
 r9340@Thesaurus (orig r9327):  edenc | 2010-05-07 18:16:03 +0200
 dbicadmin can now install non-versioned schemas
 r9342@Thesaurus (orig r9329):  rabbit | 2010-05-07 18:28:27 +0200
 Trap erroneous warnings
 r9345@Thesaurus (orig r9332):  edenc | 2010-05-08 00:02:00 +0200
 test for the dbicadmin -I option
 r9346@Thesaurus (orig r9333):  edenc | 2010-05-08 00:02:25 +0200
 fixes to dbicadmin -I test
 r9347@Thesaurus (orig r9334):  edenc | 2010-05-08 00:02:41 +0200
 -I option functional and passing tests
 r9348@Thesaurus (orig r9335):  edenc | 2010-05-08 01:39:52 +0200
 moved mock schema out of t/var
 r9375@Thesaurus (orig r9361):  edenc | 2010-05-14 04:02:41 +0200
 added debug option
 r9376@Thesaurus (orig r9362):  edenc | 2010-05-14 04:03:00 +0200
 debug and include_dirs integration between dbicadmin and DBIx::Class::Admin
 r9377@Thesaurus (orig r9363):  edenc | 2010-05-14 04:03:21 +0200
 testing dbicadmin/DBIx::Class::Admin integration
 r9494@Thesaurus (orig r9480):  rabbit | 2010-05-31 18:03:08 +0200
 Simplify includedir testing
 r9496@Thesaurus (orig r9482):  rabbit | 2010-05-31 18:47:35 +0200
 Some comments
 r9497@Thesaurus (orig r9483):  rabbit | 2010-05-31 18:50:50 +0200
 Properly ignore contents of var
 r9498@Thesaurus (orig r9484):  rabbit | 2010-05-31 18:59:49 +0200
 Remove leftovers
 r9499@Thesaurus (orig r9485):  rabbit | 2010-05-31 19:24:55 +0200
 Cleanup debug output
 r9500@Thesaurus (orig r9486):  rabbit | 2010-05-31 19:35:31 +0200
 Fix RT#57732
 r9502@Thesaurus (orig r9488):  rabbit | 2010-05-31 19:48:41 +0200
 typos
 r9505@Thesaurus (orig r9491):  rabbit | 2010-06-01 00:08:29 +0200
 Changes

r9514@Thesaurus (orig r9500):  rabbit | 2010-06-01 00:25:35 +0200
 r9365@Thesaurus (orig r9351):  ribasushi | 2010-05-12 10:09:54 +0200
 New branch to cleanup resultset-wide update/delete
 r9419@Thesaurus (orig r9405):  wreis | 2010-05-19 02:49:47 +0200
 failing tests for RS->update
 r9511@Thesaurus (orig r9497):  rabbit | 2010-06-01 00:20:39 +0200
 Fix update/delete on prefetching resultsets
 r9512@Thesaurus (orig r9498):  rabbit | 2010-06-01 00:24:54 +0200
 Test cleanup
 r9513@Thesaurus (orig r9499):  rabbit | 2010-06-01 00:25:14 +0200
 test replication test fail

1  2 
lib/DBIx/Class/SQLAHacks.pm
lib/DBIx/Class/Storage/DBI/Oracle/Generic.pm
t/73oracle.t

@@@ -48,29 -48,25 +48,25 @@@ sub new 
  
  # !!! THIS IS ALSO HORRIFIC !!! /me ashamed
  #
- # generate inner/outer select lists for various limit dialects
+ # Generates inner/outer select lists for various limit dialects
  # which result in one or more subqueries (e.g. RNO, Top, RowNum)
  # Any non-root-table columns need to have their table qualifier
  # turned into a column alias (otherwise names in subqueries clash
  # and/or lose their source table)
  #
- # returns inner/outer strings of SQL QUOTED selectors with aliases
+ # Returns inner/outer strings of SQL QUOTED selectors with aliases
  # (to be used in whatever select statement), and an alias index hashref
  # of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used for string-subst
- # higher up)
- #
- # If the $scan_order option is supplied, it signals that the limit dialect
- # needs to order the outer side of the query, which in turn means that the
- # inner select needs to bring out columns used in implicit (non-selected)
- # orders, and the order condition itself needs to be realiased to the proper
- # names in the outer query.
- #
- # In this case ($scan_order os true) we also return a hashref (order doesn't
- # matter) of QUOTED EXTRA-SEL => QUOTED ALIAS pairs, which is a list of extra
- # selectors that do *not* exist in the original select list
+ # higher up).
+ # If an order_by is supplied, the inner select needs to bring out columns
+ # used in implicit (non-selected) orders, and the order condition itself
+ # needs to be realiased to the proper names in the outer query. Thus we
+ # also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL =>
+ # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
+ # exist in the original select list
  
  sub _subqueried_limit_attrs {
-   my ($self, $rs_attrs, $scan_order) = @_;
+   my ($self, $rs_attrs) = @_;
  
    croak 'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
      unless ref ($rs_attrs) eq 'HASH';
      $in_sel_index->{$sql_sel}++;
      $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias;
  
- # this *may* turn out to be necessary, not sure yet
- #    my ($sql_unqualified_sel) = $sql_sel =~ / $re_sep (.+) $/x
- #      if ! ref $s;
- #    $in_sel_index->{$sql_unqualified_sel}++;
+     # record unqualified versions too, so we do not have
+     # to reselect the same column twice (in qualified and
+     # unqualified form)
+     if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) {
+       $in_sel_index->{$1}++;
+     }
    }
  
  
      }
    }
  
+   # see if the order gives us anything
    my %extra_order_sel;
-   if ($scan_order) {
-     for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
-       # order with bind
-       $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
-       $chunk =~ s/\s+ (?: ASC|DESC ) \s* $//ix;
+   for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
+     # order with bind
+     $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
+     $chunk =~ s/\s+ (?: ASC|DESC ) \s* $//ix;
  
-       next if $in_sel_index->{$chunk};
+     next if $in_sel_index->{$chunk};
  
-       $extra_order_sel{$chunk} ||= $self->_quote (
-         'ORDER__BY__' . scalar keys %extra_order_sel
-       );
-     }
+     $extra_order_sel{$chunk} ||= $self->_quote (
+       'ORDER__BY__' . scalar keys %extra_order_sel
+     );
    }
    return (
      (map { join (', ', @$_ ) } (
        \@in_sel,
@@@ -163,9 -161,8 +161,8 @@@ sub _RowNumberOver 
      or croak "Unrecognizable SELECT: $sql";
  
    # get selectors, and scan the order_by (if any)
-   my ($in_sel, $out_sel, $alias_map, $extra_order_sel) = $self->_subqueried_limit_attrs (
-     $rs_attrs, 'scan_order_by',
-   );
+   my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
+     = $self->_subqueried_limit_attrs ( $rs_attrs );
  
    # make up an order if none exists
    my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
@@@ -207,7 -204,7 +204,7 @@@ SELECT $out_sel FROM 
    SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
      SELECT $in_sel ${sql}${group_having}
    ) $qalias
- ) $qalias WHERE $idx_name BETWEEN %d AND %d
+ ) $qalias WHERE $idx_name BETWEEN %u AND %u
  
  EOS
  
@@@ -229,10 -226,10 +226,10 @@@ sub _SkipFirst 
  
    return sprintf ('SELECT %s%s%s%s',
      $offset
-       ? sprintf ('SKIP %d ', $offset)
+       ? sprintf ('SKIP %u ', $offset)
        : ''
      ,
-     sprintf ('FIRST %d ', $rows),
+     sprintf ('FIRST %u ', $rows),
      $sql,
      $self->_parse_rs_attrs ($rs_attrs),
    );
@@@ -246,9 -243,9 +243,9 @@@ sub _FirstSkip 
      or croak "Unrecognizable SELECT: $sql";
  
    return sprintf ('SELECT %s%s%s%s',
-     sprintf ('FIRST %d ', $rows),
+     sprintf ('FIRST %u ', $rows),
      $offset
-       ? sprintf ('SKIP %d ', $offset)
+       ? sprintf ('SKIP %u ', $offset)
        : ''
      ,
      $sql,
@@@ -276,7 -273,7 +273,7 @@@ SELECT $outsel FROM 
    SELECT $outsel, ROWNUM $idx_name FROM (
      SELECT $insel ${sql}${order_group_having}
    ) $qalias
- ) $qalias WHERE $idx_name BETWEEN %d AND %d
+ ) $qalias WHERE $idx_name BETWEEN %u AND %u
  
  EOS
  
@@@ -294,7 -291,7 +291,7 @@@ sub _Top 
  
    # get selectors
    my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
-     = $self->_subqueried_limit_attrs ($rs_attrs, 'outer_order_by');
+     = $self->_subqueried_limit_attrs ($rs_attrs);
  
    my $requested_order = delete $rs_attrs->{order_by};
  
  
        $mid_sel .= ', ' . $extra_order_sel->{$extra_col};
      }
+     # since whatever order bindvals there are, they will be realiased
+     # and need to show up in front of the entire initial inner subquery
+     # Unshift *from_bind* to make this happen (horrible, horrible, but
+     # we don't have another mechanism yet)
+     unshift @{$self->{from_bind}}, @{$self->{order_bind}};
    }
  
    # and this is order re-alias magic
  
    my $quoted_rs_alias = $self->_quote ($rs_attrs->{alias});
  
-   $sql = sprintf ('SELECT TOP %d %s %s %s %s',
+   $sql = sprintf ('SELECT TOP %u %s %s %s %s',
      $rows + ($offset||0),
      $in_sel,
      $sql,
      $order_by_inner,
    );
  
-   $sql = sprintf ('SELECT TOP %d %s FROM ( %s ) %s %s',
+   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
      $rows,
      $mid_sel,
      $sql,
      $order_by_reversed,
    ) if $offset;
  
-   $sql = sprintf ('SELECT TOP %d %s FROM ( %s ) %s %s',
+   $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
      $rows,
      $out_sel,
      $sql,
      $order_by_requested,
    ) if ( ($offset && $order_by_requested) || ($mid_sel ne $out_sel) );
  
+   $sql =~ s/\s*\n\s*/ /g;   # easier to read in the debugger
+   return $sql;
+ }
+ # This is the most evil limit "dialect" (more of a hack) for *really*
+ # stupid databases. It works by ordering the set by some unique column,
+ # and calculating amount of rows that have a less-er value (thus
+ # emulating a RowNum-like index). Of course this implies the set can
+ # only be ordered by a single unique columns.
+ sub _GenericSubQ {
+   my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
+   my $root_rsrc = $rs_attrs->{_rsroot_source_handle}->resolve;
+   my $root_tbl_name = $root_rsrc->name;
+   # mangle the input sql as we will be replacing the selector
+   $sql =~ s/^ \s* SELECT \s+ .+? \s+ (?= \b FROM \b )//ix
+     or croak "Unrecognizable SELECT: $sql";
+   my ($order_by, @rest) = do {
+     local $self->{quote_char};
+     $self->_order_by_chunks ($rs_attrs->{order_by})
+   };
+   unless (
+     $order_by
+       &&
+     ! @rest
+       &&
+     ( ! ref $order_by
+         ||
+       ( ref $order_by eq 'ARRAY' and @$order_by == 1 )
+     )
+   ) {
+     croak (
+       'Generic Subquery Limit does not work on resultsets without an order, or resultsets '
+     . 'with complex order criteria (multicolumn and/or functions). Provide a single, '
+     . 'unique-column order criteria.'
+     );
+   }
+   ($order_by) = @$order_by if ref $order_by;
+   $order_by =~ s/\s+ ( ASC|DESC ) \s* $//ix;
+   my $direction = lc ($1 || 'asc');
+   my ($unq_sort_col) = $order_by =~ /(?:^|\.)([^\.]+)$/;
+   my $inf = $root_rsrc->storage->_resolve_column_info (
+     $rs_attrs->{from}, [$order_by, $unq_sort_col]
+   );
+   my $ord_colinfo = $inf->{$order_by} || croak "Unable to determine source of order-criteria '$order_by'";
+   if ($ord_colinfo->{-result_source}->name ne $root_tbl_name) {
+     croak "Generic Subquery Limit order criteria can be only based on the root-source '"
+         . $root_rsrc->source_name . "' (aliased as '$rs_attrs->{alias}')";
+   }
+   # make sure order column is qualified
+   $order_by = "$rs_attrs->{alias}.$order_by"
+     unless $order_by =~ /^$rs_attrs->{alias}\./;
+   my $is_u;
+   my $ucs = { $root_rsrc->unique_constraints };
+   for (values %$ucs ) {
+     if (@$_ == 1 && "$rs_attrs->{alias}.$_->[0]" eq $order_by) {
+       $is_u++;
+       last;
+     }
+   }
+   croak "Generic Subquery Limit order criteria column '$order_by' must be unique (no unique constraint found)"
+     unless $is_u;
+   my ($in_sel, $out_sel, $alias_map, $extra_order_sel)
+     = $self->_subqueried_limit_attrs ($rs_attrs);
+   my $cmp_op = $direction eq 'desc' ? '>' : '<';
+   my $count_tbl_alias = 'rownum__emulation';
+   my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
+   # add the order supplement (if any) as this is what will be used for the outer WHERE
+   $in_sel .= ", $_" for keys %{$extra_order_sel||{}};
+   $sql = sprintf (<<EOS,
+ SELECT $out_sel
+   FROM (
+     SELECT $in_sel ${sql}${order_group_having}
+   ) %s
+ WHERE ( SELECT COUNT(*) FROM %s %s WHERE %s $cmp_op %s ) %s
+ EOS
+     ( map { $self->_quote ($_) } (
+       $rs_attrs->{alias},
+       $root_tbl_name,
+       $count_tbl_alias,
+       "$count_tbl_alias.$unq_sort_col",
+       $order_by,
+     )),
+     $offset
+       ? sprintf ('BETWEEN %u AND %u', $offset, $offset + $rows - 1)
+       : sprintf ('< %u', $rows )
+     ,
+   );
+   $sql =~ s/\s*\n\s*/ /g;   # easier to read in the debugger
    return $sql;
  }
  
@@@ -399,7 -508,7 +508,7 @@@ sub _find_syntax 
  sub select {
    my ($self, $table, $fields, $where, $rs_attrs, @rest) = @_;
  
 -  $self->{"${_}_bind"} = [] for (qw/having from order/);
 +  $self->{"${_}_bind"} = [] for (qw/having from order where/);
  
    if (not ref($table) or ref($table) eq 'SCALAR') {
      $table = $self->_quote($table);
    croak "LIMIT 0 Does Not Compute" if $rest[0] == 0;
      # and anyway, SQL::Abstract::Limit will cause a barf if we don't first
  
 -  my ($sql, @where_bind) = $self->SUPER::select(
 +  my $sql = '';
 +  ($sql, @{$self->{where_bind}}) = $self->SUPER::select(
      $table, $self->_recurse_fields($fields), $where, $rs_attrs, @rest
    );
 -  return wantarray ? ($sql, @{$self->{from_bind}}, @where_bind, @{$self->{having_bind}}, @{$self->{order_bind}} ) : $sql;
 +  return wantarray ? ($sql, @{$self->{from_bind}}, @{$self->{where_bind}}, @{$self->{having_bind}}, @{$self->{order_bind}} ) : $sql;
  }
  
  # Quotes table names, and handles default inserts
@@@ -4,6 -4,8 +4,8 @@@ use strict
  use warnings;
  use Scope::Guard ();
  use Context::Preserve ();
+ use Try::Tiny;
+ use namespace::clean;
  
  =head1 NAME
  
@@@ -17,51 -19,6 +19,51 @@@ DBIx::Class::Storage::DBI::Oracle::Gene
    __PACKAGE__->set_primary_key('id');
    __PACKAGE__->sequence('mysequence');
  
 +  # Somewhere in your Code
 +  # add some data to a table with a hierarchical relationship
 +  $schema->resultset('Person')->create ({
 +        firstname => 'foo',
 +        lastname => 'bar',
 +        children => [
 +            {
 +                firstname => 'child1',
 +                lastname => 'bar',
 +                children => [
 +                    {
 +                        firstname => 'grandchild',
 +                        lastname => 'bar',
 +                    }
 +                ],
 +            },
 +            {
 +                firstname => 'child2',
 +                lastname => 'bar',
 +            },
 +        ],
 +    });
 +
 +  # select from the hierarchical relationship
 +  my $rs = $schema->resultset('Person')->search({},
 +    {
 +      'start_with' => { 'firstname' => 'foo', 'lastname' => 'bar' },
 +      'connect_by' => { 'parentid' => { '-prior' => \'persionid' },
 +      'order_siblings_by' => { -asc => 'name' },
 +    };
 +  );
 +
 +  # this will select the whole tree starting from person "foo bar", creating
 +  # following query:
 +  # SELECT
 +  #     me.persionid me.firstname, me.lastname, me.parentid
 +  # FROM
 +  #     person me
 +  # START WITH
 +  #     firstname = 'foo' and lastname = 'bar'
 +  # CONNECT BY
 +  #     parentid = prior persionid
 +  # ORDER SIBLINGS BY
 +  #     firstname ASC
 +
  =head1 DESCRIPTION
  
  This class implements base Oracle support. The subclass
@@@ -75,8 -32,6 +77,8 @@@ versions before 9
  use base qw/DBIx::Class::Storage::DBI/;
  use mro 'c3';
  
 +__PACKAGE__->sql_maker_class('DBIx::Class::SQLAHacks::Oracle');
 +
  sub deployment_statements {
    my $self = shift;;
    my ($schema, $type, $version, $dir, $sqltargs, @rest) = @_;
@@@ -86,7 -41,7 +88,7 @@@
    $sqltargs->{quote_table_names} = $quote_char ? 1 : 0;
    $sqltargs->{quote_field_names} = $quote_char ? 1 : 0;
  
-   my $oracle_version = eval { $self->_get_dbh->get_info(18) };
+   my $oracle_version = try { $self->_get_dbh->get_info(18) };
  
    $sqltargs->{producer_args}{oracle_version} = $oracle_version;
  
@@@ -132,7 -87,7 +134,7 @@@ sub _dbh_get_autoinc_seq 
      {
        $schema ? (owner => $schema) : (),
        table_name => $table || $source_name,
-       triggering_event => 'INSERT',
+       triggering_event => { -like => '%INSERT%' },
        status => 'ENABLED',
       },
    );
@@@ -159,45 -114,51 +161,51 @@@ sub _ping 
    local $dbh->{RaiseError} = 1;
    local $dbh->{PrintError} = 0;
  
-   eval {
+   return try {
      $dbh->do('select 1 from dual');
+     1;
+   } catch {
+     0;
    };
-   return $@ ? 0 : 1;
  }
  
  sub _dbh_execute {
    my $self = shift;
    my ($dbh, $op, $extra_bind, $ident, $bind_attributes, @args) = @_;
  
-   my $wantarray = wantarray;
-   my (@res, $exception, $retried);
-   RETRY: {
-     do {
-       eval {
-         if ($wantarray) {
-           @res    = $self->next::method(@_);
-         } else {
-           $res[0] = $self->next::method(@_);
-         }
-       };
-       $exception = $@;
-       if ($exception =~ /ORA-01003/) {
+   my (@res, $tried);
+   my $wantarray = wantarray();
+   my $next = $self->next::can;
+   do {
+     try {
+       my $exec = sub { $self->$next($dbh, $op, $extra_bind, $ident, $bind_attributes, @args) };
+       if (!defined $wantarray) {
+         $exec->();
+       }
+       elsif (! $wantarray) {
+         $res[0] = $exec->();
+       }
+       else {
+         @res = $exec->();
+       }
+       $tried++;
+     }
+     catch {
+       if (! $tried and $_ =~ /ORA-01003/) {
          # ORA-01003: no statement parsed (someone changed the table somehow,
          # invalidating your cursor.)
          my ($sql, $bind) = $self->_prep_for_execute($op, $extra_bind, $ident, \@args);
          delete $dbh->{CachedKids}{$sql};
-       } else {
-         last RETRY;
        }
-     } while (not $retried++);
-   }
-   $self->throw_exception($exception) if $exception;
+       else {
+         $self->throw_exception($_);
+       }
+     };
+   } while (! $tried++);
  
-   $wantarray ? @res : $res[0]
+   return $wantarray ? @res : $res[0];
  }
  
  =head2 get_autoinc_seq
@@@ -212,19 -173,6 +220,6 @@@ sub get_autoinc_seq 
    $self->dbh_do('_dbh_get_autoinc_seq', $source, $col);
  }
  
- =head2 columns_info_for
- This wraps the superclass version of this method to force table
- names to uppercase
- =cut
- sub columns_info_for {
-   my ($self, $table) = @_;
-   $self->next::method($table);
- }
  =head2 datetime_parser_type
  
  This sets the proper DateTime::Format module for use with
@@@ -417,90 -365,6 +412,90 @@@ sub with_deferred_fk_checks 
      after => sub { $txn_scope_guard->commit });
  }
  
 +=head1 ATTRIBUTES
 +
 +Following additional attributes can be used in resultsets.
 +
 +=head2 connect_by or connect_by_nocycle
 +
 +=over 4
 +
 +=item Value: \%connect_by
 +
 +=back
 +
 +A hashref of conditions used to specify the relationship between parent rows
 +and child rows of the hierarchy.
 +
 +
 +  connect_by => { parentid => 'prior personid' }
 +
 +  # adds a connect by statement to the query:
 +  # SELECT
 +  #     me.persionid me.firstname, me.lastname, me.parentid
 +  # FROM
 +  #     person me
 +  # CONNECT BY
 +  #     parentid = prior persionid
 +  
 +
 +  connect_by_nocycle => { parentid => 'prior personid' }
 +
 +  # adds a connect by statement to the query:
 +  # SELECT
 +  #     me.persionid me.firstname, me.lastname, me.parentid
 +  # FROM
 +  #     person me
 +  # CONNECT BY NOCYCLE
 +  #     parentid = prior persionid
 +
 +
 +=head2 start_with
 +
 +=over 4
 +
 +=item Value: \%condition
 +
 +=back
 +
 +A hashref of conditions which specify the root row(s) of the hierarchy.
 +
 +It uses the same syntax as L<DBIx::Class::ResultSet/search>
 +
 +  start_with => { firstname => 'Foo', lastname => 'Bar' }
 +
 +  # SELECT
 +  #     me.persionid me.firstname, me.lastname, me.parentid
 +  # FROM
 +  #     person me
 +  # START WITH
 +  #     firstname = 'foo' and lastname = 'bar'
 +  # CONNECT BY
 +  #     parentid = prior persionid
 +
 +=head2 order_siblings_by
 +
 +=over 4
 +
 +=item Value: ($order_siblings_by | \@order_siblings_by)
 +
 +=back
 +
 +Which column(s) to order the siblings by.
 +
 +It uses the same syntax as L<DBIx::Class::ResultSet/order_by>
 +
 +  'order_siblings_by' => 'firstname ASC'
 +
 +  # SELECT
 +  #     me.persionid me.firstname, me.lastname, me.parentid
 +  # FROM
 +  #     person me
 +  # CONNECT BY
 +  #     parentid = prior persionid
 +  # ORDER SIBLINGS BY
 +  #     firstname ASC
 +
  =head1 AUTHOR
  
  See L<DBIx::Class/CONTRIBUTORS>.
diff --combined t/73oracle.t
@@@ -30,10 -30,8 +30,10 @@@ use warnings
  
  use Test::Exception;
  use Test::More;
 +
  use lib qw(t/lib);
  use DBICTest;
 +use DBIC::SqlMakerTest;
  
  my ($dsn, $user, $pass) = @ENV{map { "DBICTEST_ORA_${_}" } qw/DSN USER PASS/};
  
@@@ -50,7 -48,6 +50,7 @@@ my $dbh = $schema->storage->dbh
  eval {
    $dbh->do("DROP SEQUENCE artist_seq");
    $dbh->do("DROP SEQUENCE cd_seq");
 +  $dbh->do("DROP SEQUENCE track_seq");
    $dbh->do("DROP SEQUENCE pkid1_seq");
    $dbh->do("DROP SEQUENCE pkid2_seq");
    $dbh->do("DROP SEQUENCE nonpkid_seq");
  };
  $dbh->do("CREATE SEQUENCE artist_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
  $dbh->do("CREATE SEQUENCE cd_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
 +$dbh->do("CREATE SEQUENCE track_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
  $dbh->do("CREATE SEQUENCE pkid1_seq START WITH 1 MAXVALUE 999999 MINVALUE 0");
  $dbh->do("CREATE SEQUENCE pkid2_seq START WITH 10 MAXVALUE 999999 MINVALUE 0");
  $dbh->do("CREATE SEQUENCE nonpkid_seq START WITH 20 MAXVALUE 999999 MINVALUE 0");
  
 -$dbh->do("CREATE TABLE artist (artistid NUMBER(12), name VARCHAR(255), rank NUMBER(38), charfield VARCHAR2(10))");
 +$dbh->do("CREATE TABLE artist (artistid NUMBER(12), parentid NUMBER(12), name VARCHAR(255), rank NUMBER(38), charfield VARCHAR2(10))");
  $dbh->do("ALTER TABLE artist ADD (CONSTRAINT artist_pk PRIMARY KEY (artistid))");
  
  $dbh->do("CREATE TABLE sequence_test (pkid1 NUMBER(12), pkid2 NUMBER(12), nonpkid NUMBER(12), name VARCHAR(255))");
@@@ -76,7 -72,6 +76,7 @@@ $dbh->do("CREATE TABLE cd (cdid NUMBER(
  $dbh->do("ALTER TABLE cd ADD (CONSTRAINT cd_pk PRIMARY KEY (cdid))");
  
  $dbh->do("CREATE TABLE track (trackid NUMBER(12), cd NUMBER(12) REFERENCES cd(cdid) DEFERRABLE, position NUMBER(12), title VARCHAR(255), last_updated_on DATE, last_updated_at DATE, small_dt DATE)");
 +$dbh->do("ALTER TABLE track ADD (CONSTRAINT track_pk PRIMARY KEY (trackid))");
  
  $dbh->do(qq{
    CREATE OR REPLACE TRIGGER artist_insert_trg
@@@ -92,7 -87,7 +92,7 @@@
  });
  $dbh->do(qq{
    CREATE OR REPLACE TRIGGER cd_insert_trg
-   BEFORE INSERT ON cd
+   BEFORE INSERT OR UPDATE ON cd
    FOR EACH ROW
    BEGIN
      IF :new.cdid IS NULL THEN
      END IF;
    END;
  });
 +$dbh->do(qq{
 +  CREATE OR REPLACE TRIGGER cd_insert_trg
 +  BEFORE INSERT ON cd
 +  FOR EACH ROW
 +  BEGIN
 +    IF :new.cdid IS NULL THEN
 +      SELECT cd_seq.nextval
 +      INTO :new.cdid
 +      FROM DUAL;
 +    END IF;
 +  END;
 +});
 +$dbh->do(qq{
 +  CREATE OR REPLACE TRIGGER track_insert_trg
 +  BEFORE INSERT ON track
 +  FOR EACH ROW
 +  BEGIN
 +    IF :new.trackid IS NULL THEN
 +      SELECT track_seq.nextval
 +      INTO :new.trackid
 +      FROM DUAL;
 +    END IF;
 +  END;
 +});
  
  {
      # Swiped from t/bindtype_columns.t to avoid creating my own Resultset.
@@@ -190,7 -161,7 +190,7 @@@ lives_and 
  
  # test join with row count ambiguity
  
 -my $track = $schema->resultset('Track')->create({ trackid => 1, cd => 1,
 +my $track = $schema->resultset('Track')->create({ cd => $cd->cdid,
      position => 1, title => 'Track1' });
  my $tjoin = $schema->resultset('Track')->search({ 'me.title' => 'Track1'},
          { join => 'cd',
@@@ -202,7 -173,7 +202,7 @@@ ok(my $row = $tjoin->next)
  is($row->title, 'Track1', "ambiguous column ok");
  
  # check count distinct with multiple columns
 -my $other_track = $schema->resultset('Track')->create({ trackid => 2, cd => 1, position => 1, title => 'Track2' });
 +my $other_track = $schema->resultset('Track')->create({ cd => $cd->cdid, position => 1, title => 'Track2' });
  
  my $tcount = $schema->resultset('Track')->search(
    {},
@@@ -313,410 -284,6 +313,410 @@@ SKIP: 
    }
  }
  
 +
 +### test hierarchical queries
 +if ( $schema->storage->isa('DBIx::Class::Storage::DBI::Oracle::Generic') ) {
 +    my $source = $schema->source('Artist');
 +
 +    $source->add_column( 'parentid' );
 +
 +    $source->add_relationship('children', 'DBICTest::Schema::Artist',
 +        { 'foreign.parentid' => 'self.artistid' },
 +        {
 +            accessor => 'multi',
 +            join_type => 'LEFT',
 +            cascade_delete => 1,
 +            cascade_copy => 1,
 +        } );
 +    $source->add_relationship('parent', 'DBICTest::Schema::Artist',
 +        { 'foreign.artistid' => 'self.parentid' },
 +        { accessor => 'single' } );
 +    DBICTest::Schema::Artist->add_column( 'parentid' );
 +    DBICTest::Schema::Artist->has_many(
 +        children => 'DBICTest::Schema::Artist',
 +        { 'foreign.parentid' => 'self.artistid' }
 +    );
 +    DBICTest::Schema::Artist->belongs_to(
 +        parent => 'DBICTest::Schema::Artist',
 +        { 'foreign.artistid' => 'self.parentid' }
 +    );
 +
 +    $schema->resultset('Artist')->create ({
 +        name => 'root',
 +        rank => 1,
 +        cds => [],
 +        children => [
 +            {
 +                name => 'child1',
 +                rank => 2,
 +                children => [
 +                    {
 +                        name => 'grandchild',
 +                        rank => 3,
 +                        cds => [
 +                            {
 +                                title => "grandchilds's cd" ,
 +                                year => '2008',
 +                                tracks => [
 +                                    {
 +                                        position => 1,
 +                                        title => 'Track 1 grandchild',
 +                                    }
 +                                ],
 +                            }
 +                        ],
 +                        children => [
 +                            {
 +                                name => 'greatgrandchild',
 +                                rank => 3,
 +                            }
 +                        ],
 +                    }
 +                ],
 +            },
 +            {
 +                name => 'child2',
 +                rank => 3,
 +            },
 +        ],
 +    });
 +
 +    $schema->resultset('Artist')->create(
 +        {
 +            name     => 'cycle-root',
 +            children => [
 +                {
 +                    name     => 'cycle-child1',
 +                    children => [ { name => 'cycle-grandchild' } ],
 +                },
 +                { name => 'cycle-child2' },
 +            ],
 +        }
 +    );
 +
 +    $schema->resultset('Artist')->find({ name => 'cycle-root' })
 +      ->update({ parentid => \'artistid' });
 +
 +    # select the whole tree
 +    {
 +      my $rs = $schema->resultset('Artist')->search({}, {
 +        start_with => { name => 'root' },
 +        connect_by => { parentid => { -prior => \ 'artistid' } },
 +      });
 +
 +      is_same_sql_bind (
 +        $rs->as_query,
 +        '(
 +          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
 +            FROM artist me
 +          START WITH name = ?
 +          CONNECT BY parentid = PRIOR artistid 
 +        )',
 +        [ [ name => 'root'] ],
 +      );
 +      is_deeply (
 +        [ $rs->get_column ('name')->all ],
 +        [ qw/root child1 grandchild greatgrandchild child2/ ],
 +        'got artist tree',
 +      );
 +
 +
 +      is_same_sql_bind (
 +        $rs->count_rs->as_query,
 +        '(
 +          SELECT COUNT( * )
 +            FROM artist me
 +          START WITH name = ?
 +          CONNECT BY parentid = PRIOR artistid 
 +        )',
 +        [ [ name => 'root'] ],
 +      );
 +
 +      is( $rs->count, 5, 'Connect By count ok' );
 +    }
 +
 +    # use order siblings by statement
 +    {
 +      my $rs = $schema->resultset('Artist')->search({}, {
 +        start_with => { name => 'root' },
 +        connect_by => { parentid => { -prior => \ 'artistid' } },
 +        order_siblings_by => { -desc => 'name' },
 +      });
 +
 +      is_same_sql_bind (
 +        $rs->as_query,
 +        '(
 +          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
 +            FROM artist me
 +          START WITH name = ?
 +          CONNECT BY parentid = PRIOR artistid 
 +          ORDER SIBLINGS BY name DESC
 +        )',
 +        [ [ name => 'root'] ],
 +      );
 +
 +      is_deeply (
 +        [ $rs->get_column ('name')->all ],
 +        [ qw/root child2 child1 grandchild greatgrandchild/ ],
 +        'Order Siblings By ok',
 +      );
 +    }
 +
 +    # get the root node
 +    {
 +      my $rs = $schema->resultset('Artist')->search({ parentid => undef }, {
 +        start_with => { name => 'root' },
 +        connect_by => { parentid => { -prior => \ 'artistid' } },
 +      });
 +
 +      is_same_sql_bind (
 +        $rs->as_query,
 +        '(
 +          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
 +            FROM artist me
 +          WHERE ( parentid IS NULL )
 +          START WITH name = ?
 +          CONNECT BY parentid = PRIOR artistid 
 +        )',
 +        [ [ name => 'root'] ],
 +      );
 +
 +      is_deeply(
 +        [ $rs->get_column('name')->all ],
 +        [ 'root' ],
 +        'found root node',
 +      );
 +    }
 +
 +    # combine a connect by with a join
 +    {
 +      my $rs = $schema->resultset('Artist')->search(
 +        {'cds.title' => { -like => '%cd'} },
 +        {
 +          join => 'cds',
 +          start_with => { 'me.name' => 'root' },
 +          connect_by => { parentid => { -prior => \ 'artistid' } },
 +        }
 +      );
 +
 +      is_same_sql_bind (
 +        $rs->as_query,
 +        '(
 +          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
 +            FROM artist me
 +            LEFT JOIN cd cds ON cds.artist = me.artistid
 +          WHERE ( cds.title LIKE ? )
 +          START WITH me.name = ?
 +          CONNECT BY parentid = PRIOR artistid 
 +        )',
 +        [ [ 'cds.title' => '%cd' ], [ 'me.name' => 'root' ] ],
 +      );
 +
 +      is_deeply(
 +        [ $rs->get_column('name')->all ],
 +        [ 'grandchild' ],
 +        'Connect By with a join result name ok'
 +      );
 +
 +
 +      is_same_sql_bind (
 +        $rs->count_rs->as_query,
 +        '(
 +          SELECT COUNT( * )
 +            FROM artist me
 +            LEFT JOIN cd cds ON cds.artist = me.artistid
 +          WHERE ( cds.title LIKE ? )
 +          START WITH me.name = ?
 +          CONNECT BY parentid = PRIOR artistid 
 +        )',
 +        [ [ 'cds.title' => '%cd' ], [ 'me.name' => 'root' ] ],
 +      );
 +
 +      is( $rs->count, 1, 'Connect By with a join; count ok' );
 +    }
 +
 +    # combine a connect by with order_by
 +    {
 +      my $rs = $schema->resultset('Artist')->search({}, {
 +        start_with => { name => 'root' },
 +        connect_by => { parentid => { -prior => \ 'artistid' } },
 +        order_by => { -asc => [ 'LEVEL', 'name' ] },
 +      });
 +
 +      is_same_sql_bind (
 +        $rs->as_query,
 +        '(
 +          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid
 +            FROM artist me
 +          START WITH name = ?
 +          CONNECT BY parentid = PRIOR artistid 
 +          ORDER BY LEVEL ASC, name ASC
 +        )',
 +        [ [ name => 'root' ] ],
 +      );
 +
 +      is_deeply (
 +        [ $rs->get_column ('name')->all ],
 +        [ qw/root child1 child2 grandchild greatgrandchild/ ],
 +        'Connect By with a order_by - result name ok'
 +      );
 +    }
 +
 +
 +    # limit a connect by
 +    {
 +      my $rs = $schema->resultset('Artist')->search({}, {
 +        start_with => { name => 'root' },
 +        connect_by => { parentid => { -prior => \ 'artistid' } },
 +        order_by => { -asc => 'name' },
 +        rows => 2,
 +      });
 +
 +      is_same_sql_bind (
 +        $rs->as_query,
 +        '( 
 +            SELECT artistid, name, rank, charfield, parentid FROM (
 +                  SELECT artistid, name, rank, charfield, parentid, ROWNUM rownum__index FROM (
 +                      SELECT 
 +                          me.artistid,
 +                          me.name,
 +                          me.rank,
 +                          me.charfield,
 +                          me.parentid 
 +                      FROM artist me 
 +                      START WITH name = ? 
 +                      CONNECT BY parentid = PRIOR artistid
 +                      ORDER BY name ASC 
 +                  ) me 
 +            ) me
 +            WHERE rownum__index BETWEEN 1 AND 2
 +        )',
 +        [ [ name => 'root' ] ],
 +      );
 +
 +      is_deeply (
 +        [ $rs->get_column ('name')->all ],
 +        [qw/child1 child2/],
 +        'LIMIT a Connect By query - correct names'
 +      );
 +
 +      # TODO: 
 +      # prints "START WITH name = ? 
 +      # CONNECT BY artistid = PRIOR parentid "
 +      # after count_subq, 
 +      # I will fix this later...
 +      # 
 +      is_same_sql_bind (
 +        $rs->count_rs->as_query,
 +        '( 
 +            SELECT COUNT( * ) FROM (
 +                SELECT artistid FROM (
 +                    SELECT artistid, ROWNUM rownum__index FROM (
 +                        SELECT 
 +                            me.artistid
 +                        FROM artist me 
 +                        START WITH name = ? 
 +                        CONNECT BY parentid = PRIOR artistid
 +                    ) me
 +                ) me 
 +                WHERE rownum__index BETWEEN 1 AND 2
 +            ) me
 +        )',
 +        [ [ name => 'root' ] ],
 +      );
 +
 +      is( $rs->count, 2, 'Connect By; LIMIT count ok' );
 +    }
 +
 +    # combine a connect_by with group_by and having
 +    {
 +      my $rs = $schema->resultset('Artist')->search({}, {
 +        select => ['count(rank)'],
 +        start_with => { name => 'root' },
 +        connect_by => { parentid => { -prior => \ 'artistid' } },
 +        group_by => ['rank'],
 +        having => { 'count(rank)' => { '<', 2 } },
 +      });
 +
 +      is_same_sql_bind (
 +        $rs->as_query,
 +        '(
 +            SELECT count(rank)
 +            FROM artist me
 +            START WITH name = ?
 +            CONNECT BY parentid = PRIOR artistid
 +            GROUP BY rank HAVING count(rank) < ?
 +        )',
 +        [ [ name => 'root' ], [ 'count(rank)' => 2 ] ],
 +      );
 +
 +      is_deeply (
 +        [ $rs->get_column ('count(rank)')->all ],
 +        [1, 1],
 +        'Group By a Connect By query - correct values'
 +      );
 +    }
 +
 +
 +    # select the whole cycle tree without nocylce
 +    {
 +      my $rs = $schema->resultset('Artist')->search({}, {
 +        start_with => { name => 'cycle-root' },
 +        connect_by => { parentid => { -prior => \ 'artistid' } },
 +      });
 +      eval { $rs->get_column ('name')->all };
 +      if ( $@ =~ /ORA-01436/ ){ # ORA-01436:  CONNECT BY loop in user data
 +        pass "connect by initify loop detection without nocycle";
 +      }else{
 +        fail "connect by initify loop detection without nocycle, not detected by oracle";
 +      }
 +    }
 +
 +    # select the whole cycle tree with nocylce
 +    {
 +      my $rs = $schema->resultset('Artist')->search({}, {
 +        start_with => { name => 'cycle-root' },
 +        '+select'  => [ \ 'CONNECT_BY_ISCYCLE' ],
 +        connect_by_nocycle => { parentid => { -prior => \ 'artistid' } },
 +      });
 +
 +      is_same_sql_bind (
 +        $rs->as_query,
 +        '(
 +          SELECT me.artistid, me.name, me.rank, me.charfield, me.parentid, CONNECT_BY_ISCYCLE
 +            FROM artist me
 +          START WITH name = ?
 +          CONNECT BY NOCYCLE parentid = PRIOR artistid 
 +        )',
 +        [ [ name => 'cycle-root'] ],
 +      );
 +      is_deeply (
 +        [ $rs->get_column ('name')->all ],
 +        [ qw/cycle-root cycle-child1 cycle-grandchild cycle-child2/ ],
 +        'got artist tree with nocycle (name)',
 +      );
 +      is_deeply (
 +        [ $rs->get_column ('CONNECT_BY_ISCYCLE')->all ],
 +        [ qw/1 0 0 0/ ],
 +        'got artist tree with nocycle (CONNECT_BY_ISCYCLE)',
 +      );
 +
 +
 +      is_same_sql_bind (
 +        $rs->count_rs->as_query,
 +        '(
 +          SELECT COUNT( * )
 +            FROM artist me
 +          START WITH name = ?
 +          CONNECT BY NOCYCLE parentid = PRIOR artistid 
 +        )',
 +        [ [ name => 'cycle-root'] ],
 +      );
 +
 +      is( $rs->count, 4, 'Connect By Nocycle count ok' );
 +    }
 +}
 +
  done_testing;
  
  # clean up our mess
@@@ -724,7 -291,6 +724,7 @@@ END 
      if($schema && ($dbh = $schema->storage->dbh)) {
          $dbh->do("DROP SEQUENCE artist_seq");
          $dbh->do("DROP SEQUENCE cd_seq");
 +        $dbh->do("DROP SEQUENCE track_seq");
          $dbh->do("DROP SEQUENCE pkid1_seq");
          $dbh->do("DROP SEQUENCE pkid2_seq");
          $dbh->do("DROP SEQUENCE nonpkid_seq");