Reduce mount of perlgolf in ResultSet.pm
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index cba01bc..5be8a14 100644 (file)
@@ -2,10 +2,7 @@ package DBIx::Class::ResultSet;
 
 use strict;
 use warnings;
-use overload
-        '0+'     => "count",
-        'bool'   => "_bool",
-        fallback => 1;
+use base qw/DBIx::Class/;
 use Carp::Clan qw/^DBIx::Class/;
 use DBIx::Class::Exception;
 use Data::Page;
@@ -13,8 +10,14 @@ use Storable;
 use DBIx::Class::ResultSetColumn;
 use DBIx::Class::ResultSourceHandle;
 use List::Util ();
-use Scalar::Util ();
-use base qw/DBIx::Class/;
+use Scalar::Util qw/blessed weaken/;
+use Try::Tiny;
+use namespace::clean;
+
+use overload
+        '0+'     => "count",
+        'bool'   => "_bool",
+        fallback => 1;
 
 __PACKAGE__->mk_group_accessors('simple' => qw/_result_class _source_handle/);
 
@@ -269,106 +272,96 @@ sub search_rs {
 
   # Special-case handling for (undef, undef).
   if ( @_ == 2 && !defined $_[1] && !defined $_[0] ) {
-    pop(@_); pop(@_);
+    @_ = ();
   }
 
-  my $attrs = {};
-  $attrs = pop(@_) if @_ > 1 and ref $_[$#_] eq 'HASH';
-  my $our_attrs = { %{$self->{attrs}} };
-  my $having = delete $our_attrs->{having};
-  my $where = delete $our_attrs->{where};
-
-  my $rows;
+  my $call_attrs = {};
+  $call_attrs = pop(@_) if @_ > 1 and ref $_[-1] eq 'HASH';
 
+  # see if we can keep the cache (no $rs changes)
+  my $cache;
   my %safe = (alias => 1, cache => 1);
-
-  unless (
-    (@_ && defined($_[0])) # @_ == () or (undef)
-    ||
-    (keys %$attrs # empty attrs or only 'safe' attrs
-    && List::Util::first { !$safe{$_} } keys %$attrs)
-  ) {
-    # no search, effectively just a clone
-    $rows = $self->get_cache;
+  if ( ! List::Util::first { !$safe{$_} } keys %$call_attrs and (
+    ! defined $_[0]
+      or
+    ref $_[0] eq 'HASH' && ! keys %{$_[0]}
+      or
+    ref $_[0] eq 'ARRAY' && ! @{$_[0]}
+  )) {
+    $cache = $self->get_cache;
   }
 
+  my $old_attrs = { %{$self->{attrs}} };
+  my $old_having = delete $old_attrs->{having};
+  my $old_where = delete $old_attrs->{where};
+
   # reset the selector list
-  if (List::Util::first { exists $attrs->{$_} } qw{columns select as}) {
-     delete @{$our_attrs}{qw{select as columns +select +as +columns include_columns}};
+  if (List::Util::first { exists $call_attrs->{$_} } qw{columns select as}) {
+     delete @{$old_attrs}{qw{select as columns +select +as +columns include_columns}};
   }
 
-  my $new_attrs = { %{$our_attrs}, %{$attrs} };
+  my $new_attrs = { %{$old_attrs}, %{$call_attrs} };
 
   # merge new attrs into inherited
   foreach my $key (qw/join prefetch +select +as +columns include_columns bind/) {
-    next unless exists $attrs->{$key};
-    $new_attrs->{$key} = $self->_merge_attr($our_attrs->{$key}, $attrs->{$key});
+    next unless exists $call_attrs->{$key};
+    $new_attrs->{$key} = $self->_merge_attr($old_attrs->{$key}, $call_attrs->{$key});
   }
 
-  my $cond = (@_
-    ? (
-        (@_ == 1 || ref $_[0] eq "HASH")
-          ? (
-              (ref $_[0] eq 'HASH')
-                ? (
-                    (keys %{ $_[0] }  > 0)
-                      ? shift
-                      : undef
-                   )
-                :  shift
-             )
-          : (
-              (@_ % 2)
-                ? $self->throw_exception("Odd number of arguments to search")
-                : {@_}
-             )
-      )
-    : undef
-  );
+  # rip apart the rest of @_, parse a condition
+  my $call_cond = do {
 
-  if (defined $where) {
-    $new_attrs->{where} = (
-      defined $new_attrs->{where}
-        ? { '-and' => [
-              map {
-                ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
-              } $where, $new_attrs->{where}
-            ]
-          }
-        : $where);
-  }
+    if (ref $_[0] eq 'HASH') {
+      (keys %{$_[0]}) ? $_[0] : undef
+    }
+    elsif (@_ == 1) {
+      $_[0]
+    }
+    elsif (@_ % 2) {
+      $self->throw_exception('Odd number of arguments to search')
+    }
+    else {
+      +{ @_ }
+    }
 
-  if (defined $cond) {
-    $new_attrs->{where} = (
-      defined $new_attrs->{where}
-        ? { '-and' => [
-              map {
-                ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
-              } $cond, $new_attrs->{where}
-            ]
-          }
-        : $cond);
+  } if @_;
+
+  for ($old_where, $call_cond) {
+    if (defined $_) {
+      $new_attrs->{where} = $self->_stack_cond (
+        $_, $new_attrs->{where}
+      );
+    }
   }
 
-  if (defined $having) {
-    $new_attrs->{having} = (
-      defined $new_attrs->{having}
-        ? { '-and' => [
-              map {
-                ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_
-              } $having, $new_attrs->{having}
-            ]
-          }
-        : $having);
+  if (defined $old_having) {
+    $new_attrs->{having} = $self->_stack_cond (
+      $old_having, $new_attrs->{having}
+    )
   }
 
   my $rs = (ref $self)->new($self->result_source, $new_attrs);
 
-  $rs->set_cache($rows) if ($rows);
+  $rs->set_cache($cache) if ($cache);
 
   return $rs;
 }
 
+sub _stack_cond {
+  my ($self, $left, $right) = @_;
+  if (defined $left xor defined $right) {
+    return defined $left ? $left : $right;
+  }
+  elsif (defined $left) {
+    return { -and => [ map
+      { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
+      ($left, $right)
+    ]};
+  }
+
+  return undef;
+}
+
 =head2 search_literal
 
 =over 4
@@ -468,47 +461,49 @@ sub find {
   my $self = shift;
   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
 
-  # Default to the primary key, but allow a specific key
-  my @cols = exists $attrs->{key}
-    ? $self->result_source->unique_constraint_columns($attrs->{key})
-    : $self->result_source->primary_columns;
-  $self->throw_exception(
-    "Can't find unless a primary key is defined or unique constraint is specified"
-  ) unless @cols;
-
-  # Parse out a hashref from input
+  # Parse out a query from input
   my $input_query;
   if (ref $_[0] eq 'HASH') {
     $input_query = { %{$_[0]} };
   }
-  elsif (@_ == @cols) {
-    $input_query = {};
-    @{$input_query}{@cols} = @_;
-  }
   else {
-    # Compatibility: Allow e.g. find(id => $value)
-    carp "Find by key => value deprecated; please use a hashref instead";
-    $input_query = {@_};
-  }
+    my $constraint = exists $attrs->{key} ? $attrs->{key} : 'primary';
+    my @c_cols = $self->result_source->unique_constraint_columns($constraint);
 
-  my (%related, $info);
+    $self->throw_exception(
+      "No constraint columns, maybe a malformed '$constraint' constraint?"
+    ) unless @c_cols;
+
+    $self->throw_exception (
+      'find() expects either a column/value hashref, or a list of values '
+    . "corresponding to the columns of the specified unique constraint '$constraint'"
+    ) unless @c_cols == @_;
 
-  KEY: foreach my $key (keys %$input_query) {
-    if (ref($input_query->{$key})
-        && ($info = $self->result_source->relationship_info($key))) {
+    $input_query = {};
+    @{$input_query}{@c_cols} = @_;
+  }
+
+  my %related;
+  for my $key (keys %$input_query) {
+    if (
+      my $keyref = ref($input_query->{$key})
+        and
+      my $relinfo = $self->result_source->relationship_info($key)
+    ) {
       my $val = delete $input_query->{$key};
-      next KEY if (ref($val) eq 'ARRAY'); # has_many for multi_create
+
+      next if $keyref eq 'ARRAY'; # has_many for multi_create
+
       my $rel_q = $self->result_source->_resolve_condition(
-                    $info->{cond}, $val, $key
-                  );
-      die "Can't handle OR join condition in find" if ref($rel_q) eq 'ARRAY';
+        $relinfo->{cond}, $val, $key
+      );
+      die "Can't handle complex relationship conditions in find" if ref($rel_q) ne 'HASH';
       @related{keys %$rel_q} = values %$rel_q;
     }
   }
-  if (my @keys = keys %related) {
-    @{$input_query}{@keys} = values %related;
-  }
 
+  # relationship conditions take precedence (?)
+  @{$input_query}{keys %related} = values %related;
 
   # Build the final query: Default to the disjunction of the unique queries,
   # but allow the input query in case the ResultSet defines the query or the
@@ -529,6 +524,10 @@ sub find {
     # relationship
   }
   else {
+    # no key was specified - fall down to heuristics mode
+    # get all possible unique queries based on the combination of $query
+    # and the condition available in $self, and then run a search with
+    # each and every possible constraint (as long as it's completely specified)
     my @unique_queries = $self->_unique_queries($input_query, $attrs);
     $query = @unique_queries
       ? [ map { $self->_add_alias($_, $alias) } @unique_queries ]
@@ -565,7 +564,7 @@ sub _add_alias {
 
 # _unique_queries
 #
-# Build a list of queries which satisfy unique constraints.
+# Build a list of queries which satisfy the unique constraint(s) as per $attrs
 
 sub _unique_queries {
   my ($self, $query, $attrs) = @_;
@@ -679,15 +678,15 @@ sub cursor {
 
 =item Arguments: $cond?
 
-=item Return Value: $row_object?
+=item Return Value: $row_object | undef
 
 =back
 
   my $cd = $schema->resultset('CD')->single({ year => 2001 });
 
 Inflates the first result without creating a cursor if the resultset has
-any records in it; if not returns nothing. Used by L</find> as a lean version of
-L</search>.
+any records in it; if not returns C<undef>. Used by L</find> as a lean version
+of L</search>.
 
 While this method can take an optional search condition (just like L</search>)
 being a fast-code-path it does not recognize search attributes. If you need to
@@ -742,12 +741,6 @@ sub single {
     }
   }
 
-#  XXX: Disabled since it doesn't infer uniqueness in all cases
-#  unless ($self->_is_unique_query($attrs->{where})) {
-#    carp "Query not guaranteed to return a single row"
-#      . "; please declare your unique constraints or use search instead";
-#  }
-
   my @data = $self->result_source->storage->select_single(
     $attrs->{from}, $attrs->{select},
     $attrs->{where}, $attrs
@@ -757,38 +750,6 @@ sub single {
 }
 
 
-# _is_unique_query
-#
-# Try to determine if the specified query is guaranteed to be unique, based on
-# the declared unique constraints.
-
-sub _is_unique_query {
-  my ($self, $query) = @_;
-
-  my $collapsed = $self->_collapse_query($query);
-  my $alias = $self->{attrs}{alias};
-
-  foreach my $name ($self->result_source->unique_constraint_names) {
-    my @unique_cols = map {
-      "$alias.$_"
-    } $self->result_source->unique_constraint_columns($name);
-
-    # Count the values for each unique column
-    my %seen = map { $_ => 0 } @unique_cols;
-
-    foreach my $key (keys %$collapsed) {
-      my $aliased = $key =~ /\./ ? $key : "$alias.$key";
-      next unless exists $seen{$aliased};  # Additional constraints are okay
-      $seen{$aliased} = scalar keys %{ $collapsed->{$key} };
-    }
-
-    # If we get 0 or more than 1 value for a column, it's not necessarily unique
-    return 1 unless grep { $_ != 1 } values %seen;
-  }
-
-  return 0;
-}
-
 # _collapse_query
 #
 # Recursively collapse the query, accumulating values for each column.
@@ -910,7 +871,7 @@ sub slice {
   $attrs->{offset} = $self->{attrs}{offset} || 0;
   $attrs->{offset} += $min;
   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
-  return $self->search(undef(), $attrs);
+  return $self->search(undef, $attrs);
   #my $slice = (ref $self)->new($self->result_source, $attrs);
   #return (wantarray ? $slice->all : $slice);
 }
@@ -921,7 +882,7 @@ sub slice {
 
 =item Arguments: none
 
-=item Return Value: $result?
+=item Return Value: $result | undef
 
 =back
 
@@ -947,6 +908,7 @@ sub next {
     return $cache->[$self->{all_cache_position}++];
   }
   if ($self->{attrs}{cache}) {
+    delete $self->{pager};
     $self->{all_cache_position} = 1;
     return ($self->all)[0];
   }
@@ -1237,14 +1199,12 @@ sub _count_rs {
   my $rsrc = $self->result_source;
   $attrs ||= $self->_resolved_attrs;
 
-  # only take pieces we need for a simple count
-  my $tmp_attrs = { map
-    { $_ => $attrs->{$_} }
-    qw/ alias from where bind join /
-  };
+  my $tmp_attrs = { %$attrs };
+  # take off any limits, record_filter is cdbi, and no point of ordering nor locking a count
+  delete @{$tmp_attrs}{qw/rows offset order_by record_filter for/};
 
   # overwrite the selector (supplied by the storage)
-  $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $tmp_attrs);
+  $tmp_attrs->{select} = $rsrc->storage->_count_select ($rsrc, $attrs);
   $tmp_attrs->{as} = 'count';
 
   my $tmp_rs = $rsrc->resultset_class->new($rsrc, $tmp_attrs)->get_column ('count');
@@ -1261,10 +1221,9 @@ sub _count_subq_rs {
   my $rsrc = $self->result_source;
   $attrs ||= $self->_resolved_attrs;
 
-  my $sub_attrs = { map
-    { $_ => $attrs->{$_} }
-    qw/ alias from where bind join group_by having rows offset /
-  };
+  my $sub_attrs = { %$attrs };
+  # extra selectors do not go in the subquery and there is no point of ordering it, nor locking it
+  delete @{$sub_attrs}{qw/collapse select _prefetch_select as order_by for/};
 
   # if we multi-prefetch we group_by primary keys only as this is what we would
   # get out of the rs via ->next/->all. We *DO WANT* to clobber old group_by regardless
@@ -1275,6 +1234,8 @@ sub _count_subq_rs {
   # Calculate subquery selector
   if (my $g = $sub_attrs->{group_by}) {
 
+    my $sql_maker = $rsrc->storage->sql_maker;
+
     # necessary as the group_by may refer to aliased functions
     my $sel_index;
     for my $sel (@{$attrs->{select}}) {
@@ -1283,7 +1244,17 @@ sub _count_subq_rs {
     }
 
     for my $g_part (@$g) {
-      push @{$sub_attrs->{select}}, $sel_index->{$g_part} || $g_part;
+      my $colpiece = $sel_index->{$g_part} || $g_part;
+
+      # disqualify join-based group_by's. Arcane but possible query
+      # also horrible horrible hack to alias a column (not a func.)
+      # (probably need to introduce SQLA syntax)
+      if ($colpiece =~ /\./ && $colpiece !~ /^$attrs->{alias}\./) {
+        my $as = $colpiece;
+        $as =~ s/\./__/;
+        $colpiece = \ sprintf ('%s AS %s', map { $sql_maker->_quote ($_) } ($colpiece, $as) );
+      }
+      push @{$sub_attrs->{select}}, $colpiece;
     }
   }
   else {
@@ -1295,7 +1266,7 @@ sub _count_subq_rs {
                ->new ($rsrc, $sub_attrs)
                 ->as_subselect_rs
                  ->search ({}, { columns => { count => $rsrc->storage->_count_select ($rsrc, $attrs) } })
-                  -> get_column ('count');
+                  ->get_column ('count');
 }
 
 sub _bool {
@@ -1396,12 +1367,12 @@ sub reset {
 
 =item Arguments: none
 
-=item Return Value: $object?
+=item Return Value: $object | undef
 
 =back
 
-Resets the resultset and returns an object for the first result (if the
-resultset returns anything).
+Resets the resultset and returns an object for the first result (or C<undef>
+if the resultset is empty).
 
 =cut
 
@@ -1490,8 +1461,16 @@ sub _rs_update_delete {
 =back
 
 Sets the specified columns in the resultset to the supplied values in a
-single query. Return value will be true if the update succeeded or false
-if no records were updated; exact type of success value is storage-dependent.
+single query. Note that this will not run any accessor/set_column/update
+triggers, nor will it update any row object instances derived from this
+resultset (this includes the contents of the L<resultset cache|/set_cache>
+if any). See L</update_all> if you need to execute any on-update
+triggers or cascades defined either by you or a
+L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
+
+The return value is a pass through of what the underlying
+storage backend returned, and may vary. See L<DBI/execute> for the most
+common case.
 
 =cut
 
@@ -1513,8 +1492,9 @@ sub update {
 
 =back
 
-Fetches all objects and updates them one at a time. Note that C<update_all>
-will run DBIC cascade triggers, while L</update> will not.
+Fetches all objects and updates them one at a time via
+L<DBIx::Class::Row/update>. Note that C<update_all> will run DBIC defined
+triggers, while L</update> will not.
 
 =cut
 
@@ -1539,12 +1519,16 @@ sub update_all {
 
 =back
 
-Deletes the contents of the resultset from its result source. Note that this
-will not run DBIC cascade triggers. See L</delete_all> if you need triggers
-to run. See also L<DBIx::Class::Row/delete>.
+Deletes the rows matching this resultset in a single query. Note that this
+will not run any delete triggers, nor will it alter the
+L<in_storage|DBIx::Class::Row/in_storage> status of any row object instances
+derived from this resultset (this includes the contents of the
+L<resultset cache|/set_cache> if any). See L</delete_all> if you need to
+execute any on-delete triggers or cascades defined either by you or a
+L<result component|DBIx::Class::Manual::Component/WHAT_IS_A_COMPONENT>.
 
-Return value will be the number of rows deleted; exact type of return value
-is storage-dependent.
+The return value is a pass through of what the underlying storage backend
+returned, and may vary. See L<DBI/execute> for the most common case.
 
 =cut
 
@@ -1566,8 +1550,9 @@ sub delete {
 
 =back
 
-Fetches all objects and deletes them one at a time. Note that C<delete_all>
-will run DBIC cascade triggers, while L</delete> will not.
+Fetches all objects and deletes them one at a time via
+L<DBIx::Class::Row/delete>. Note that C<delete_all> will run DBIC defined
+triggers, while L</delete> will not.
 
 =cut
 
@@ -1793,11 +1778,115 @@ C<total_entries> on the L<Data::Page> object.
 
 =cut
 
+# make a wizard good for both a scalar and a hashref
+my $mk_lazy_count_wizard = sub {
+  require Variable::Magic;
+
+  my $stash = { total_rs => shift };
+  my $slot = shift; # only used by the hashref magic
+
+  my $magic = Variable::Magic::wizard (
+    data => sub { $stash },
+
+    (!$slot)
+    ? (
+      # the scalar magic
+      get => sub {
+        # set value lazily, and dispell for good
+        ${$_[0]} = $_[1]{total_rs}->count;
+        Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref});
+        return 1;
+      },
+      set => sub {
+        # an explicit set implies dispell as well
+        # the unless() is to work around "fun and giggles" below
+        Variable::Magic::dispell (${$_[0]}, $_[1]{magic_selfref})
+          unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
+        return 1;
+      },
+    )
+    : (
+      # the uvar magic
+      fetch => sub {
+        if ($_[2] eq $slot and !$_[1]{inactive}) {
+          my $cnt = $_[1]{total_rs}->count;
+          $_[0]->{$slot} = $cnt;
+
+          # attempting to dispell in a fetch handle (works in store), seems
+          # to invariable segfault on 5.10, 5.12, 5.13 :(
+          # so use an inactivator instead
+          #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
+          $_[1]{inactive}++;
+        }
+        return 1;
+      },
+      store => sub {
+        if (! $_[1]{inactive} and $_[2] eq $slot) {
+          #Variable::Magic::dispell (%{$_[0]}, $_[1]{magic_selfref});
+          $_[1]{inactive}++
+            unless (caller(2))[3] eq 'DBIx::Class::ResultSet::pager';
+        }
+        return 1;
+      },
+    ),
+  );
+
+  $stash->{magic_selfref} = $magic;
+  weaken ($stash->{magic_selfref}); # this fails on 5.8.1
+
+  return $magic;
+};
+
+# the tie class for 5.8.1
+{
+  package DBIx::Class::__DBIC_LAZY_RS_COUNT__;
+  use base qw/Tie::Hash/;
+
+  sub FIRSTKEY { my $dummy = scalar keys %{$_[0]{data}}; each %{$_[0]{data}} }
+  sub NEXTKEY  { each %{$_[0]{data}} }
+  sub EXISTS   { exists $_[0]{data}{$_[1]} }
+  sub DELETE   { delete $_[0]{data}{$_[1]} }
+  sub CLEAR    { %{$_[0]{data}} = () }
+  sub SCALAR   { scalar %{$_[0]{data}} }
+
+  sub TIEHASH {
+    $_[1]{data} = {%{$_[1]{selfref}}};
+    %{$_[1]{selfref}} = ();
+    Scalar::Util::weaken ($_[1]{selfref});
+    return bless ($_[1], $_[0]);
+  };
+
+  sub FETCH {
+    if ($_[1] eq $_[0]{slot}) {
+      my $cnt = $_[0]{data}{$_[1]} = $_[0]{total_rs}->count;
+      untie %{$_[0]{selfref}};
+      %{$_[0]{selfref}} = %{$_[0]{data}};
+      return $cnt;
+    }
+    else {
+      $_[0]{data}{$_[1]};
+    }
+  }
+
+  sub STORE {
+    $_[0]{data}{$_[1]} = $_[2];
+    if ($_[1] eq $_[0]{slot}) {
+      untie %{$_[0]{selfref}};
+      %{$_[0]{selfref}} = %{$_[0]{data}};
+    }
+    $_[2];
+  }
+}
+
 sub pager {
   my ($self) = @_;
 
   return $self->{pager} if $self->{pager};
 
+  if ($self->get_cache) {
+    $self->throw_exception ('Pagers on cached resultsets are not supported');
+  }
+
   my $attrs = $self->{attrs};
   $self->throw_exception("Can't create pager for non-paged rs")
     unless $self->{attrs}{page};
@@ -1807,13 +1896,69 @@ sub pager {
   # with a subselect) to get the real total count
   my $count_attrs = { %$attrs };
   delete $count_attrs->{$_} for qw/rows offset page pager/;
-  my $total_count = (ref $self)->new($self->result_source, $count_attrs)->count;
+  my $total_rs = (ref $self)->new($self->result_source, $count_attrs);
 
-  return $self->{pager} = Data::Page->new(
-    $total_count,
+
+### the following may seem awkward and dirty, but it's a thought-experiment
+### necessary for future development of DBIx::DS. Do *NOT* change this code
+### before talking to ribasushi/mst
+
+  my $pager = Data::Page->new(
+    0,  #start with an empty set
     $attrs->{rows},
-    $self->{attrs}{page}
+    $self->{attrs}{page},
   );
+
+  my $data_slot = 'total_entries';
+
+  # Since we are interested in a cached value (once it's set - it's set), every
+  # technique will detach from the magic-host once the time comes to fire the
+  # ->count (or in the segfaulting case of >= 5.10 it will deactivate itself)
+
+  if ($] < 5.008003) {
+    # 5.8.1 throws 'Modification of a read-only value attempted' when one tries
+    # to weakref the magic container :(
+    # tested on 5.8.1
+    tie (%$pager, 'DBIx::Class::__DBIC_LAZY_RS_COUNT__',
+      { slot => $data_slot, total_rs => $total_rs, selfref => $pager }
+    );
+  }
+  elsif ($] < 5.010) {
+    # We can use magic on the hash value slot. It's interesting that the magic is
+    # attached to the hash-slot, and does *not* stop working once I do the dummy
+    # assignments after the cast()
+    # tested on 5.8.3 and 5.8.9
+    my $magic = $mk_lazy_count_wizard->($total_rs);
+    Variable::Magic::cast ( $pager->{$data_slot}, $magic );
+
+    # this is for fun and giggles
+    $pager->{$data_slot} = -1;
+    $pager->{$data_slot} = 0;
+
+    # this does not work for scalars, but works with
+    # uvar magic below
+    #my %vals = %$pager;
+    #%$pager = ();
+    #%{$pager} = %vals;
+  }
+  else {
+    # And the uvar magic
+    # works on 5.10.1, 5.12.1 and 5.13.4 in its current form,
+    # however see the wizard maker for more notes
+    my $magic = $mk_lazy_count_wizard->($total_rs, $data_slot);
+    Variable::Magic::cast ( %$pager, $magic );
+
+    # still works
+    $pager->{$data_slot} = -1;
+    $pager->{$data_slot} = 0;
+
+    # this now works
+    my %vals = %$pager;
+    %$pager = ();
+    %{$pager} = %vals;
+  }
+
+  return $self->{pager} = $pager;
 }
 
 =head2 page
@@ -1933,7 +2078,7 @@ sub _is_deterministic_value {
   my $value = shift;
   my $ref_type = ref $value;
   return 1 if $ref_type eq '' || $ref_type eq 'SCALAR';
-  return 1 if Scalar::Util::blessed($value);
+  return 1 if blessed $value;
   return 0;
 }
 
@@ -2180,7 +2325,7 @@ or C<has_one> resultset.  Note Arrayref.
   );
 
 Example of creating a new row and also creating a row in a related
-C<belongs_to>resultset. Note Hashref.
+C<belongs_to> resultset. Note Hashref.
 
   $cd_rs->create({
     title=>"Music for Silly Walks",
@@ -2410,7 +2555,7 @@ sub update_or_new {
 
 =item Arguments: none
 
-=item Return Value: \@cache_objects?
+=item Return Value: \@cache_objects | undef
 
 =back
 
@@ -2458,7 +2603,7 @@ sub set_cache {
 
 =item Arguments: none
 
-=item Return Value: []
+=item Return Value: undef
 
 =back
 
@@ -2501,7 +2646,7 @@ sub is_paged {
 
 sub is_ordered {
   my ($self) = @_;
-  return scalar $self->result_source->storage->_parse_order_by($self->{attrs}{order_by});
+  return scalar $self->result_source->storage->_extract_order_columns($self->{attrs}{order_by});
 }
 
 =head2 related_resultset
@@ -2544,7 +2689,7 @@ sub related_resultset {
     # (the select/as attrs were deleted in the beginning), we need to flip all
     # left joins to inner, so we get the expected results
     # read the comment on top of the actual function to see what this does
-    $attrs->{from} = $rsrc->schema->storage->_straight_join_to_node ($attrs->{from}, $alias);
+    $attrs->{from} = $rsrc->schema->storage->_inner_join_to_node ($attrs->{from}, $alias);
 
 
     #XXX - temp fix for result_class bug. There likely is a more elegant fix -groditi
@@ -2963,21 +3108,33 @@ sub _resolved_attrs {
       carp ("Useless use of distinct on a grouped resultset ('distinct' is ignored when a 'group_by' is present)");
     }
     else {
-      $attrs->{group_by} = [ grep { !ref($_) || (ref($_) ne 'HASH') } @{$attrs->{select}} ];
+      my $storage = $self->result_source->schema->storage;
+      my $rs_column_list = $storage->_resolve_column_info ($attrs->{from});
 
+      my $group_spec = $attrs->{group_by} = [];
+      my %group_index;
+
+      for (@{$attrs->{select}}) {
+        if (! ref($_) or ref ($_) ne 'HASH' ) {
+          push @$group_spec, $_;
+          $group_index{$_}++;
+          if ($rs_column_list->{$_} and $_ !~ /\./ ) {
+            # add a fully qualified version as well
+            $group_index{"$rs_column_list->{$_}{-source_alias}.$_"}++;
+          }
+        }
+      }
       # add any order_by parts that are not already present in the group_by
       # we need to be careful not to add any named functions/aggregates
       # i.e. select => [ ... { count => 'foo', -as 'foocount' } ... ]
-      my %already_grouped = map { $_ => 1 } (@{$attrs->{group_by}});
-
-      my $storage = $self->result_source->schema->storage;
+      for my $chunk ($storage->_extract_order_columns($attrs->{order_by})) {
 
-      my $rs_column_list = $storage->_resolve_column_info ($attrs->{from});
+        # only consider real columns (for functions the user got to do an explicit group_by)
+        my $colinfo = $rs_column_list->{$chunk}
+          or next;
 
-      for my $chunk ($storage->_parse_order_by($attrs->{order_by})) {
-        if ($rs_column_list->{$chunk} && not $already_grouped{$chunk}++) {
-          push @{$attrs->{group_by}}, $chunk;
-        }
+        $chunk = "$colinfo->{-source_alias}.$chunk" if $chunk !~ /\./;
+        push @$group_spec, $chunk unless $group_index{$chunk}++;
       }
     }
   }
@@ -3232,6 +3389,15 @@ it and sets C<select> from that, then auto-populates C<as> from
 C<select> as normal. (You may also use the C<cols> attribute, as in
 earlier versions of DBIC.)
 
+Essentially C<columns> does the same as L</select> and L</as>.
+
+    columns => [ 'foo', { bar => 'baz' } ]
+
+is the same as
+
+    select => [qw/foo baz/],
+    as => [qw/foo bar/]
+
 =head2 +columns
 
 =over 4