Tighten even more inflate_result() invocations, for crazy overloads
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
index f71bf38..8745c5f 100644 (file)
@@ -7,7 +7,7 @@ use DBIx::Class::Carp;
 use DBIx::Class::ResultSetColumn;
 use Scalar::Util qw/blessed weaken reftype/;
 use DBIx::Class::_Util qw(
-  fail_on_internal_wantarray UNRESOLVABLE_CONDITION
+  fail_on_internal_wantarray fail_on_internal_call UNRESOLVABLE_CONDITION
 );
 use Try::Tiny;
 use Data::Compare (); # no imports!!! guard against insane architecture
@@ -301,14 +301,18 @@ creation B<will not work>. See also warning pertaining to L</create>.
 
 sub new {
   my $class = shift;
-  return $class->new_result(@_) if ref $class;
+
+  if (ref $class) {
+    DBIx::Class::_ENV_::ASSERT_NO_INTERNAL_INDIRECT_CALLS and fail_on_internal_call;
+    return $class->new_result(@_);
+  }
 
   my ($source, $attrs) = @_;
   $source = $source->resolve
     if $source->isa('DBIx::Class::ResultSourceHandle');
 
   $attrs = { %{$attrs||{}} };
-  delete @{$attrs}{qw(_last_sqlmaker_alias_map _related_results_construction)};
+  delete @{$attrs}{qw(_last_sqlmaker_alias_map _simple_passthrough_construction)};
 
   if ($attrs->{page}) {
     $attrs->{rows} ||= 10;
@@ -389,7 +393,7 @@ sub search {
   my $rs = $self->search_rs( @_ );
 
   if (wantarray) {
-    DBIx::Class::_ENV_::ASSERT_NO_INTERNAL_WANTARRAY and my $sog = fail_on_internal_wantarray($rs);
+    DBIx::Class::_ENV_::ASSERT_NO_INTERNAL_WANTARRAY and my $sog = fail_on_internal_wantarray;
     return $rs->all;
   }
   elsif (defined wantarray) {
@@ -1411,8 +1415,8 @@ sub _construct_results {
   ) ? 1 : 0 ) unless defined $self->{_result_inflator}{is_hri};
 
 
-  if (! $attrs->{_related_results_construction}) {
-    # construct a much simpler array->hash folder for the one-table cases right here
+  if ($attrs->{_simple_passthrough_construction}) {
+    # construct a much simpler array->hash folder for the one-table HRI cases right here
     if ($self->{_result_inflator}{is_hri}) {
       for my $r (@$rows) {
         $r = { map { $infmap->[$_] => $r->[$_] } 0..$#$infmap };
@@ -1425,15 +1429,19 @@ sub _construct_results {
     #
     # crude unscientific benchmarking indicated the shortcut eval is not worth it for
     # this particular resultset size
-    elsif (@$rows < 60) {
+    elsif ( $self->{_result_inflator}{is_core_row} and @$rows < 60 ) {
       for my $r (@$rows) {
         $r = $inflator_cref->($res_class, $rsrc, { map { $infmap->[$_] => $r->[$_] } (0..$#$infmap) } );
       }
     }
     else {
       eval sprintf (
-        '$_ = $inflator_cref->($res_class, $rsrc, { %s }) for @$rows',
-        join (', ', map { "\$infmap->[$_] => \$_->[$_]" } 0..$#$infmap )
+        ( $self->{_result_inflator}{is_core_row}
+          ? '$_ = $inflator_cref->($res_class, $rsrc, { %s }) for @$rows'
+          # a custom inflator may be a multiplier/reductor - put it in direct list ctx
+          : '@$rows = map { $inflator_cref->($res_class, $rsrc, { %s } ) } @$rows'
+        ),
+        ( join (', ', map { "\$infmap->[$_] => \$_->[$_]" } 0..$#$infmap ) )
       );
     }
   }
@@ -1506,10 +1514,15 @@ EOS
       $next_cref ? ( $next_cref, $self->{_stashed_rows} = [] ) : (),
     );
 
-    # Special-case multi-object HRI - there is no $inflator_cref pass
-    unless ($self->{_result_inflator}{is_hri}) {
+    # simple in-place substitution, does not regrow $rows
+    if ($self->{_result_inflator}{is_core_row}) {
       $_ = $inflator_cref->($res_class, $rsrc, @$_) for @$rows
     }
+    # Special-case multi-object HRI - there is no $inflator_cref pass at all
+    elsif ( ! $self->{_result_inflator}{is_hri} ) {
+      # the inflator may be a multiplier/reductor - put it in list ctx
+      @$rows = map { $inflator_cref->($res_class, $rsrc, @$_) } @$rows;
+    }
   }
 
   # The @$rows check seems odd at first - why wouldn't we want to warn
@@ -2230,127 +2243,266 @@ case there are obviously no benefits to using this method over L</create>.
 sub populate {
   my $self = shift;
 
-  # cruft placed in standalone method
-  my $data = $self->_normalize_populate_args(@_);
+  my ($data, $guard);
 
-  return unless @$data;
+  # this is naive and just a quick check
+  # the types will need to be checked more thoroughly when the
+  # multi-source populate gets added
+  if (ref $_[0] eq 'ARRAY') {
+    return unless @{$_[0]};
 
-  if(defined wantarray) {
-    my @created = map { $self->create($_) } @$data;
-    return wantarray ? @created : \@created;
+    $data = $_[0] if (ref $_[0][0] eq 'HASH' or ref $_[0][0] eq 'ARRAY');
   }
-  else {
-    my $first = $data->[0];
 
-    # if a column is a registered relationship, and is a non-blessed hash/array, consider
-    # it relationship data
-    my (@rels, @columns);
-    my $rsrc = $self->result_source;
-    my $rels = { map { $_ => $rsrc->relationship_info($_) } $rsrc->relationships };
-    for (keys %$first) {
-      my $ref = ref $first->{$_};
-      $rels->{$_} && ($ref eq 'ARRAY' or $ref eq 'HASH')
-        ? push @rels, $_
-        : push @columns, $_
+  $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs')
+    unless $data;
+
+  # FIXME - no cref handling
+  # At this point assume either hashes or arrays
+
+  if(defined wantarray) {
+    my @results;
+
+    $guard = $self->result_source->schema->storage->txn_scope_guard
+      if ( @$data > 2 or ( @$data == 2 and ref $data->[0] eq 'ARRAY' ) );
+
+    if (ref $data->[0] eq 'ARRAY') {
+      @results = map
+        { my $vals = $_; $self->new_result({ map { $data->[0][$_] => $vals->[$_] } 0..$#{$data->[0]} })->insert }
+        @{$data}[1 .. $#$data]
       ;
     }
+    else {
+      @results = map { $self->new_result($_)->insert } @$data;
+    }
+
+    $guard->commit if $guard;
+    return wantarray ? @results : \@results;
+  }
+
+  # we have to deal with *possibly incomplete* related data
+  # this means we have to walk the data structure twice
+  # whether we want this or not
+  # jnap, I hate you ;)
+  my $rsrc = $self->result_source;
+  my $rel_info = { map { $_ => $rsrc->relationship_info($_) } $rsrc->relationships };
+
+  my ($colinfo, $colnames, $slices_with_rels);
+  my $data_start = 0;
+
+  DATA_SLICE:
+  for my $i (0 .. $#$data) {
 
-    my @pks = $rsrc->primary_columns;
+    my $current_slice_seen_rel_infos;
 
-    ## do the belongs_to relationships
-    foreach my $index (0..$#$data) {
+### Determine/Supplement collists
+### BEWARE - This is a hot piece of code, a lot of weird idioms were used
+    if( ref $data->[$i] eq 'ARRAY' ) {
 
-      # delegate to create() for any dataset without primary keys with specified relationships
-      if (grep { !defined $data->[$index]->{$_} } @pks ) {
-        for my $r (@rels) {
-          if (grep { ref $data->[$index]{$r} eq $_ } qw/HASH ARRAY/) {  # a related set must be a HASH or AoH
-            my @ret = $self->populate($data);
-            return;
+      # positional(!) explicit column list
+      if ($i == 0) {
+
+        $colinfo->{$data->[0][$_]} = { pos => $_, name => $data->[0][$_] } and push @$colnames, $data->[0][$_]
+          for 0 .. $#{$data->[0]};
+
+        $data_start = 1;
+
+        next DATA_SLICE;
+      }
+      else {
+        for (values %$colinfo) {
+          if ($_->{is_rel} ||= (
+            $rel_info->{$_->{name}}
+              and
+            (
+              ref $data->[$i][$_->{pos}] eq 'ARRAY'
+                or
+              ref $data->[$i][$_->{pos}] eq 'HASH'
+                or
+              ( defined blessed $data->[$i][$_->{pos}] and $data->[$i][$_->{pos}]->isa('DBIx::Class::Row') )
+            )
+              and
+            1
+          )) {
+
+            # moar sanity check... sigh
+            for ( ref $data->[$i][$_->{pos}] eq 'ARRAY' ? @{$data->[$i][$_->{pos}]} : $data->[$i][$_->{pos}] ) {
+              if ( defined blessed $_ and $_->isa('DBIx::Class::Row' ) ) {
+                carp_unique("Fast-path populate() with supplied related objects is not possible - falling back to regular create()");
+                return my $throwaway = $self->populate(@_);
+              }
+            }
+
+            push @$current_slice_seen_rel_infos, $rel_info->{$_->{name}};
           }
         }
       }
 
-      foreach my $rel (@rels) {
-        next unless ref $data->[$index]->{$rel} eq "HASH";
-        my $result = $self->related_resultset($rel)->create($data->[$index]->{$rel});
-        my (undef, $reverse_relinfo) = %{$rsrc->reverse_relationship_info($rel)};
-        my $related = $result->result_source->_resolve_condition(
-          $reverse_relinfo->{cond},
-          $self,
-          $result,
-          $rel,
-        );
-
-        delete $data->[$index]->{$rel};
-        $data->[$index] = {%{$data->[$index]}, %$related};
-
-        push @columns, keys %$related if $index == 0;
+     if ($current_slice_seen_rel_infos) {
+        push @$slices_with_rels, { map { $colnames->[$_] => $data->[$i][$_] } 0 .. $#$colnames };
+
+        # this is needed further down to decide whether or not to fallback to create()
+        $colinfo->{$colnames->[$_]}{seen_null} ||= ! defined $data->[$i][$_]
+          for 0 .. $#$colnames;
       }
     }
+    elsif( ref $data->[$i] eq 'HASH' ) {
 
-    ## inherit the data locked in the conditions of the resultset
-    my ($rs_data) = $self->_merge_with_rscond({});
-    delete @{$rs_data}{@columns};
-
-    ## do bulk insert on current row
-    $rsrc->storage->insert_bulk(
-      $rsrc,
-      [@columns, keys %$rs_data],
-      [ map { [ @$_{@columns}, values %$rs_data ] } @$data ],
-    );
+      for ( sort keys %{$data->[$i]} ) {
 
-    ## do the has_many relationships
-    foreach my $item (@$data) {
+        $colinfo->{$_} ||= do {
 
-      my $main_row;
+          $self->throw_exception("Column '$_' must be present in supplied explicit column list")
+            if $data_start; # it will be 0 on AoH, 1 on AoA
 
-      foreach my $rel (@rels) {
-        next unless ref $item->{$rel} eq "ARRAY" && @{ $item->{$rel} };
+          push @$colnames, $_;
 
-        $main_row ||= $self->new_result({map { $_ => $item->{$_} } @pks});
+          # RV
+          { pos => $#$colnames, name => $_ }
+        };
 
-        my $child = $main_row->$rel;
+        if ($colinfo->{$_}{is_rel} ||= (
+          $rel_info->{$_}
+            and
+          (
+            ref $data->[$i]{$_} eq 'ARRAY'
+              or
+            ref $data->[$i]{$_} eq 'HASH'
+              or
+            ( defined blessed $data->[$i]{$_} and $data->[$i]{$_}->isa('DBIx::Class::Row') )
+          )
+            and
+          1
+        )) {
+
+          # moar sanity check... sigh
+          for ( ref $data->[$i]{$_} eq 'ARRAY' ? @{$data->[$i]{$_}} : $data->[$i]{$_} ) {
+            if ( defined blessed $_ and $_->isa('DBIx::Class::Row' ) ) {
+              carp_unique("Fast-path populate() with supplied related objects is not possible - falling back to regular create()");
+              return my $throwaway = $self->populate(@_);
+            }
+          }
 
-        my $related = $child->result_source->_resolve_condition(
-          $rels->{$rel}{cond},
-          $child,
-          $main_row,
-          $rel,
-        );
+          push @$current_slice_seen_rel_infos, $rel_info->{$_};
+        }
+      }
 
-        my @rows_to_add = ref $item->{$rel} eq 'ARRAY' ? @{$item->{$rel}} : ($item->{$rel});
-        my @populate = map { {%$_, %$related} } @rows_to_add;
+      if ($current_slice_seen_rel_infos) {
+        push @$slices_with_rels, $data->[$i];
 
-        $child->populate( \@populate );
+        # this is needed further down to decide whether or not to fallback to create()
+        $colinfo->{$_}{seen_null} ||= ! defined $data->[$i]{$_}
+          for keys %{$data->[$i]};
       }
     }
+    else {
+      $self->throw_exception('Unexpected populate() data structure member type: ' . ref $data->[$i] );
+    }
+
+    if ( grep
+      { $_->{attrs}{is_depends_on} }
+      @{ $current_slice_seen_rel_infos || [] }
+    ) {
+      carp_unique("Fast-path populate() of belongs_to relationship data is not possible - falling back to regular create()");
+      return my $throwaway = $self->populate(@_);
+    }
   }
-}
 
+  if( $slices_with_rels ) {
 
-# populate() arguments went over several incarnations
-# What we ultimately support is AoH
-sub _normalize_populate_args {
-  my ($self, $arg) = @_;
+    # need to exclude the rel "columns"
+    $colnames = [ grep { ! $colinfo->{$_}{is_rel} } @$colnames ];
 
-  if (ref $arg eq 'ARRAY') {
-    if (!@$arg) {
-      return [];
-    }
-    elsif (ref $arg->[0] eq 'HASH') {
-      return $arg;
+    # extra sanity check - ensure the main source is in fact identifiable
+    # the localizing of nullability is insane, but oh well... the use-case is legit
+    my $ci = $rsrc->columns_info($colnames);
+
+    $ci->{$_} = { %{$ci->{$_}}, is_nullable => 0 }
+      for grep { ! $colinfo->{$_}{seen_null} } keys %$ci;
+
+    unless( $rsrc->_identifying_column_set($ci) ) {
+      carp_unique("Fast-path populate() of non-uniquely identifiable rows with related data is not possible - falling back to regular create()");
+      return my $throwaway = $self->populate(@_);
     }
-    elsif (ref $arg->[0] eq 'ARRAY') {
-      my @ret;
-      my @colnames = @{$arg->[0]};
-      foreach my $values (@{$arg}[1 .. $#$arg]) {
-        push @ret, { map { $colnames[$_] => $values->[$_] } (0 .. $#colnames) };
+  }
+
+### inherit the data locked in the conditions of the resultset
+  my ($rs_data) = $self->_merge_with_rscond({});
+  delete @{$rs_data}{@$colnames};  # passed-in stuff takes precedence
+
+  # if anything left - decompose rs_data
+  my $rs_data_vals;
+  if (keys %$rs_data) {
+     push @$rs_data_vals, $rs_data->{$_}
+      for sort keys %$rs_data;
+  }
+
+### start work
+  $guard = $rsrc->schema->storage->txn_scope_guard
+    if $slices_with_rels;
+
+### main source data
+  # FIXME - need to switch entirely to a coderef-based thing,
+  # so that large sets aren't copied several times... I think
+  $rsrc->storage->insert_bulk(
+    $rsrc,
+    [ @$colnames, sort keys %$rs_data ],
+    [ map {
+      ref $data->[$_] eq 'ARRAY'
+      ? (
+          $slices_with_rels ? [ @{$data->[$_]}[0..$#$colnames], @{$rs_data_vals||[]} ]  # the collist changed
+        : $rs_data_vals     ? [ @{$data->[$_]}, @$rs_data_vals ]
+        :                     $data->[$_]
+      )
+      : [ @{$data->[$_]}{@$colnames}, @{$rs_data_vals||[]} ]
+    } $data_start .. $#$data ],
+  );
+
+### do the children relationships
+  if ( $slices_with_rels ) {
+    my @rels = grep { $colinfo->{$_}{is_rel} } keys %$colinfo
+      or die 'wtf... please report a bug with DBIC_TRACE=1 output (stacktrace)';
+
+    for my $sl (@$slices_with_rels) {
+
+      my ($main_proto, $main_proto_rs);
+      for my $rel (@rels) {
+        next unless defined $sl->{$rel};
+
+        $main_proto ||= {
+          %$rs_data,
+          (map { $_ => $sl->{$_} } @$colnames),
+        };
+
+        unless (defined $colinfo->{$rel}{rs}) {
+
+          $colinfo->{$rel}{rs} = $rsrc->related_source($rel)->resultset;
+
+          $colinfo->{$rel}{fk_map} = { reverse %{ $rsrc->_resolve_relationship_condition(
+            rel_name => $rel,
+            self_alias => "\xFE", # irrelevant
+            foreign_alias => "\xFF", # irrelevant
+          )->{identity_map} || {} } };
+
+        }
+
+        $colinfo->{$rel}{rs}->search({ map # only so that we inherit them values properly, no actual search
+          {
+            $_ => { '=' =>
+              ( $main_proto_rs ||= $rsrc->resultset->search($main_proto) )
+                ->get_column( $colinfo->{$rel}{fk_map}{$_} )
+                 ->as_query
+            }
+          }
+          keys %{$colinfo->{$rel}{fk_map}}
+        })->populate( ref $sl->{$rel} eq 'ARRAY' ? $sl->{$rel} : [ $sl->{$rel} ] );
+
+        1;
       }
-      return \@ret;
     }
   }
 
-  $self->throw_exception('Populate expects an arrayref of hashrefs or arrayref of arrayrefs');
+  $guard->commit if $guard;
 }
 
 =head2 pager
@@ -2446,7 +2598,7 @@ sub new_result {
   $self->throw_exception( "new_result takes only one argument - a hashref of values" )
     if @_ > 2;
 
-  $self->throw_exception( "new_result expects a hashref" )
+  $self->throw_exception( "Result object instantiation requires a hashref as argument" )
     unless (ref $values eq 'HASH');
 
   my ($merged_cond, $cols_from_relations) = $self->_merge_with_rscond($values);
@@ -2747,10 +2899,9 @@ L</new>.
 =cut
 
 sub create {
-  my ($self, $col_data) = @_;
-  $self->throw_exception( "create needs a hashref" )
-    unless ref $col_data eq 'HASH';
-  return $self->new_result($col_data)->insert;
+  #my ($self, $col_data) = @_;
+  DBIx::Class::_ENV_::ASSERT_NO_INTERNAL_INDIRECT_CALLS and fail_on_internal_call;
+  return shift->new_result(shift)->insert;
 }
 
 =head2 find_or_create
@@ -2832,7 +2983,7 @@ sub find_or_create {
   if (keys %$hash and my $row = $self->find($hash, $attrs) ) {
     return $row;
   }
-  return $self->create($hash);
+  return $self->new_result($hash)->insert;
 }
 
 =head2 update_or_create
@@ -2902,7 +3053,7 @@ sub update_or_create {
     return $row;
   }
 
-  return $self->create($cond);
+  return $self->new_result($cond)->insert;
 }
 
 =head2 update_or_new
@@ -3607,10 +3758,11 @@ sub _resolved_attrs {
   push @{$attrs->{select}}, @prefetch_select;
   push @{$attrs->{as}}, @prefetch_as;
 
-  # whether we can get away with the dumbest (possibly DBI-internal) collapser
-  if ( List::Util::first { $_ =~ /\./ } @{$attrs->{as}} ) {
-    $attrs->{_related_results_construction} = 1;
-  }
+  $attrs->{_simple_passthrough_construction} = !(
+    $attrs->{collapse}
+      or
+    grep { $_ =~ /\./ } @{$attrs->{as}}
+  );
 
   # if both page and offset are specified, produce a combined offset
   # even though it doesn't make much sense, this is what pre 081xx has
@@ -3930,7 +4082,7 @@ syntax as outlined above.
 Shortcut to request a particular set of columns to be retrieved. Each
 column spec may be a string (a table column name), or a hash (in which
 case the key is the C<as> value, and the value is used as the C<select>
-expression). Adds C<me.> onto the start of any column without a C<.> in
+expression). Adds the L</current_source_alias> onto the start of any column without a C<.> in
 it and sets C<select> from that, then auto-populates C<as> from
 C<select> as normal. (You may also use the C<cols> attribute, as in
 earlier versions of DBIC, but this is deprecated)