Initial full test pass - all fetches are eager for now
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSource.pm
1 package DBIx::Class::ResultSource;
2
3 use strict;
4 use warnings;
5
6 use base 'DBIx::Class';
7
8 use DBIx::Class::ResultSet;
9 use DBIx::Class::ResultSourceHandle;
10
11 use DBIx::Class::Exception;
12 use DBIx::Class::Carp;
13 use DBIx::Class::GlobalDestruction;
14 use Try::Tiny;
15 use List::Util 'first';
16 use Scalar::Util qw/blessed weaken isweak/;
17 use B 'perlstring';
18
19 use namespace::clean;
20
21 __PACKAGE__->mk_group_accessors(simple => qw/
22   source_name name source_info
23   _ordered_columns _columns _primaries _unique_constraints
24   _relationships resultset_attributes
25   column_info_from_storage
26 /);
27
28 __PACKAGE__->mk_group_accessors(component_class => qw/
29   resultset_class
30   result_class
31 /);
32
33 __PACKAGE__->mk_classdata( sqlt_deploy_callback => 'default_sqlt_deploy_hook' );
34
35 =head1 NAME
36
37 DBIx::Class::ResultSource - Result source object
38
39 =head1 SYNOPSIS
40
41   # Create a table based result source, in a result class.
42
43   package MyApp::Schema::Result::Artist;
44   use base qw/DBIx::Class::Core/;
45
46   __PACKAGE__->table('artist');
47   __PACKAGE__->add_columns(qw/ artistid name /);
48   __PACKAGE__->set_primary_key('artistid');
49   __PACKAGE__->has_many(cds => 'MyApp::Schema::Result::CD');
50
51   1;
52
53   # Create a query (view) based result source, in a result class
54   package MyApp::Schema::Result::Year2000CDs;
55   use base qw/DBIx::Class::Core/;
56
57   __PACKAGE__->load_components('InflateColumn::DateTime');
58   __PACKAGE__->table_class('DBIx::Class::ResultSource::View');
59
60   __PACKAGE__->table('year2000cds');
61   __PACKAGE__->result_source_instance->is_virtual(1);
62   __PACKAGE__->result_source_instance->view_definition(
63       "SELECT cdid, artist, title FROM cd WHERE year ='2000'"
64       );
65
66
67 =head1 DESCRIPTION
68
69 A ResultSource is an object that represents a source of data for querying.
70
71 This class is a base class for various specialised types of result
72 sources, for example L<DBIx::Class::ResultSource::Table>. Table is the
73 default result source type, so one is created for you when defining a
74 result class as described in the synopsis above.
75
76 More specifically, the L<DBIx::Class::Core> base class pulls in the
77 L<DBIx::Class::ResultSourceProxy::Table> component, which defines
78 the L<table|DBIx::Class::ResultSourceProxy::Table/table> method.
79 When called, C<table> creates and stores an instance of
80 L<DBIx::Class::ResultSoure::Table>. Luckily, to use tables as result
81 sources, you don't need to remember any of this.
82
83 Result sources representing select queries, or views, can also be
84 created, see L<DBIx::Class::ResultSource::View> for full details.
85
86 =head2 Finding result source objects
87
88 As mentioned above, a result source instance is created and stored for
89 you when you define a L<Result Class|DBIx::Class::Manual::Glossary/Result Class>.
90
91 You can retrieve the result source at runtime in the following ways:
92
93 =over
94
95 =item From a Schema object:
96
97    $schema->source($source_name);
98
99 =item From a Row object:
100
101    $row->result_source;
102
103 =item From a ResultSet object:
104
105    $rs->result_source;
106
107 =back
108
109 =head1 METHODS
110
111 =pod
112
113 =cut
114
115 sub new {
116   my ($class, $attrs) = @_;
117   $class = ref $class if ref $class;
118
119   my $new = bless { %{$attrs || {}} }, $class;
120   $new->{resultset_class} ||= 'DBIx::Class::ResultSet';
121   $new->{resultset_attributes} = { %{$new->{resultset_attributes} || {}} };
122   $new->{_ordered_columns} = [ @{$new->{_ordered_columns}||[]}];
123   $new->{_columns} = { %{$new->{_columns}||{}} };
124   $new->{_relationships} = { %{$new->{_relationships}||{}} };
125   $new->{name} ||= "!!NAME NOT SET!!";
126   $new->{_columns_info_loaded} ||= 0;
127   return $new;
128 }
129
130 =pod
131
132 =head2 add_columns
133
134 =over
135
136 =item Arguments: @columns
137
138 =item Return value: The ResultSource object
139
140 =back
141
142   $source->add_columns(qw/col1 col2 col3/);
143
144   $source->add_columns('col1' => \%col1_info, 'col2' => \%col2_info, ...);
145
146 Adds columns to the result source. If supplied colname => hashref
147 pairs, uses the hashref as the L</column_info> for that column. Repeated
148 calls of this method will add more columns, not replace them.
149
150 The column names given will be created as accessor methods on your
151 L<DBIx::Class::Row> objects. You can change the name of the accessor
152 by supplying an L</accessor> in the column_info hash.
153
154 If a column name beginning with a plus sign ('+col1') is provided, the
155 attributes provided will be merged with any existing attributes for the
156 column, with the new attributes taking precedence in the case that an
157 attribute already exists. Using this without a hashref
158 (C<< $source->add_columns(qw/+col1 +col2/) >>) is legal, but useless --
159 it does the same thing it would do without the plus.
160
161 The contents of the column_info are not set in stone. The following
162 keys are currently recognised/used by DBIx::Class:
163
164 =over 4
165
166 =item accessor
167
168    { accessor => '_name' }
169
170    # example use, replace standard accessor with one of your own:
171    sub name {
172        my ($self, $value) = @_;
173
174        die "Name cannot contain digits!" if($value =~ /\d/);
175        $self->_name($value);
176
177        return $self->_name();
178    }
179
180 Use this to set the name of the accessor method for this column. If unset,
181 the name of the column will be used.
182
183 =item data_type
184
185    { data_type => 'integer' }
186
187 This contains the column type. It is automatically filled if you use the
188 L<SQL::Translator::Producer::DBIx::Class::File> producer, or the
189 L<DBIx::Class::Schema::Loader> module.
190
191 Currently there is no standard set of values for the data_type. Use
192 whatever your database supports.
193
194 =item size
195
196    { size => 20 }
197
198 The length of your column, if it is a column type that can have a size
199 restriction. This is currently only used to create tables from your
200 schema, see L<DBIx::Class::Schema/deploy>.
201
202 =item is_nullable
203
204    { is_nullable => 1 }
205
206 Set this to a true value for a columns that is allowed to contain NULL
207 values, default is false. This is currently only used to create tables
208 from your schema, see L<DBIx::Class::Schema/deploy>.
209
210 =item is_auto_increment
211
212    { is_auto_increment => 1 }
213
214 Set this to a true value for a column whose value is somehow
215 automatically set, defaults to false. This is used to determine which
216 columns to empty when cloning objects using
217 L<DBIx::Class::Row/copy>. It is also used by
218 L<DBIx::Class::Schema/deploy>.
219
220 =item is_numeric
221
222    { is_numeric => 1 }
223
224 Set this to a true or false value (not C<undef>) to explicitly specify
225 if this column contains numeric data. This controls how set_column
226 decides whether to consider a column dirty after an update: if
227 C<is_numeric> is true a numeric comparison C<< != >> will take place
228 instead of the usual C<eq>
229
230 If not specified the storage class will attempt to figure this out on
231 first access to the column, based on the column C<data_type>. The
232 result will be cached in this attribute.
233
234 =item is_foreign_key
235
236    { is_foreign_key => 1 }
237
238 Set this to a true value for a column that contains a key from a
239 foreign table, defaults to false. This is currently only used to
240 create tables from your schema, see L<DBIx::Class::Schema/deploy>.
241
242 =item default_value
243
244    { default_value => \'now()' }
245
246 Set this to the default value which will be inserted into a column by
247 the database. Can contain either a value or a function (use a
248 reference to a scalar e.g. C<\'now()'> if you want a function). This
249 is currently only used to create tables from your schema, see
250 L<DBIx::Class::Schema/deploy>.
251
252 See the note on L<DBIx::Class::Row/new> for more information about possible
253 issues related to db-side default values.
254
255 =item sequence
256
257    { sequence => 'my_table_seq' }
258
259 Set this on a primary key column to the name of the sequence used to
260 generate a new key value. If not specified, L<DBIx::Class::PK::Auto>
261 will attempt to retrieve the name of the sequence from the database
262 automatically.
263
264 =item retrieve_on_insert
265
266   { retrieve_on_insert => 1 }
267
268 For every column where this is set to true, DBIC will retrieve the RDBMS-side
269 value upon a new row insertion (normally only the autoincrement PK is
270 retrieved on insert). C<INSERT ... RETURNING> is used automatically if
271 supported by the underlying storage, otherwise an extra SELECT statement is
272 executed to retrieve the missing data.
273
274 =item auto_nextval
275
276    { auto_nextval => 1 }
277
278 Set this to a true value for a column whose value is retrieved automatically
279 from a sequence or function (if supported by your Storage driver.) For a
280 sequence, if you do not use a trigger to get the nextval, you have to set the
281 L</sequence> value as well.
282
283 Also set this for MSSQL columns with the 'uniqueidentifier'
284 L<data_type|DBIx::Class::ResultSource/data_type> whose values you want to
285 automatically generate using C<NEWID()>, unless they are a primary key in which
286 case this will be done anyway.
287
288 =item extra
289
290 This is used by L<DBIx::Class::Schema/deploy> and L<SQL::Translator>
291 to add extra non-generic data to the column. For example: C<< extra
292 => { unsigned => 1} >> is used by the MySQL producer to set an integer
293 column to unsigned. For more details, see
294 L<SQL::Translator::Producer::MySQL>.
295
296 =back
297
298 =head2 add_column
299
300 =over
301
302 =item Arguments: $colname, \%columninfo?
303
304 =item Return value: 1/0 (true/false)
305
306 =back
307
308   $source->add_column('col' => \%info);
309
310 Add a single column and optional column info. Uses the same column
311 info keys as L</add_columns>.
312
313 =cut
314
315 sub add_columns {
316   my ($self, @cols) = @_;
317   $self->_ordered_columns(\@cols) unless $self->_ordered_columns;
318
319   my @added;
320   my $columns = $self->_columns;
321   while (my $col = shift @cols) {
322     my $column_info = {};
323     if ($col =~ s/^\+//) {
324       $column_info = $self->column_info($col);
325     }
326
327     # If next entry is { ... } use that for the column info, if not
328     # use an empty hashref
329     if (ref $cols[0]) {
330       my $new_info = shift(@cols);
331       %$column_info = (%$column_info, %$new_info);
332     }
333     push(@added, $col) unless exists $columns->{$col};
334     $columns->{$col} = $column_info;
335   }
336   push @{ $self->_ordered_columns }, @added;
337   return $self;
338 }
339
340 sub add_column { shift->add_columns(@_); } # DO NOT CHANGE THIS TO GLOB
341
342 =head2 has_column
343
344 =over
345
346 =item Arguments: $colname
347
348 =item Return value: 1/0 (true/false)
349
350 =back
351
352   if ($source->has_column($colname)) { ... }
353
354 Returns true if the source has a column of this name, false otherwise.
355
356 =cut
357
358 sub has_column {
359   my ($self, $column) = @_;
360   return exists $self->_columns->{$column};
361 }
362
363 =head2 column_info
364
365 =over
366
367 =item Arguments: $colname
368
369 =item Return value: Hashref of info
370
371 =back
372
373   my $info = $source->column_info($col);
374
375 Returns the column metadata hashref for a column, as originally passed
376 to L</add_columns>. See L</add_columns> above for information on the
377 contents of the hashref.
378
379 =cut
380
381 sub column_info {
382   my ($self, $column) = @_;
383   $self->throw_exception("No such column $column")
384     unless exists $self->_columns->{$column};
385
386   if ( ! $self->_columns->{$column}{data_type}
387        and ! $self->{_columns_info_loaded}
388        and $self->column_info_from_storage
389        and my $stor = try { $self->storage } )
390   {
391     $self->{_columns_info_loaded}++;
392
393     # try for the case of storage without table
394     try {
395       my $info = $stor->columns_info_for( $self->from );
396       my $lc_info = { map
397         { (lc $_) => $info->{$_} }
398         ( keys %$info )
399       };
400
401       foreach my $col ( keys %{$self->_columns} ) {
402         $self->_columns->{$col} = {
403           %{ $self->_columns->{$col} },
404           %{ $info->{$col} || $lc_info->{lc $col} || {} }
405         };
406       }
407     };
408   }
409
410   return $self->_columns->{$column};
411 }
412
413 =head2 columns
414
415 =over
416
417 =item Arguments: None
418
419 =item Return value: Ordered list of column names
420
421 =back
422
423   my @column_names = $source->columns;
424
425 Returns all column names in the order they were declared to L</add_columns>.
426
427 =cut
428
429 sub columns {
430   my $self = shift;
431   $self->throw_exception(
432     "columns() is a read-only accessor, did you mean add_columns()?"
433   ) if @_;
434   return @{$self->{_ordered_columns}||[]};
435 }
436
437 =head2 columns_info
438
439 =over
440
441 =item Arguments: \@colnames ?
442
443 =item Return value: Hashref of column name/info pairs
444
445 =back
446
447   my $columns_info = $source->columns_info;
448
449 Like L</column_info> but returns information for the requested columns. If
450 the optional column-list arrayref is omitted it returns info on all columns
451 currently defined on the ResultSource via L</add_columns>.
452
453 =cut
454
455 sub columns_info {
456   my ($self, $columns) = @_;
457
458   my $colinfo = $self->_columns;
459
460   if (
461     first { ! $_->{data_type} } values %$colinfo
462       and
463     ! $self->{_columns_info_loaded}
464       and
465     $self->column_info_from_storage
466       and
467     my $stor = try { $self->storage }
468   ) {
469     $self->{_columns_info_loaded}++;
470
471     # try for the case of storage without table
472     try {
473       my $info = $stor->columns_info_for( $self->from );
474       my $lc_info = { map
475         { (lc $_) => $info->{$_} }
476         ( keys %$info )
477       };
478
479       foreach my $col ( keys %$colinfo ) {
480         $colinfo->{$col} = {
481           %{ $colinfo->{$col} },
482           %{ $info->{$col} || $lc_info->{lc $col} || {} }
483         };
484       }
485     };
486   }
487
488   my %ret;
489
490   if ($columns) {
491     for (@$columns) {
492       if (my $inf = $colinfo->{$_}) {
493         $ret{$_} = $inf;
494       }
495       else {
496         $self->throw_exception( sprintf (
497           "No such column '%s' on source %s",
498           $_,
499           $self->source_name,
500         ));
501       }
502     }
503   }
504   else {
505     %ret = %$colinfo;
506   }
507
508   return \%ret;
509 }
510
511 =head2 remove_columns
512
513 =over
514
515 =item Arguments: @colnames
516
517 =item Return value: undefined
518
519 =back
520
521   $source->remove_columns(qw/col1 col2 col3/);
522
523 Removes the given list of columns by name, from the result source.
524
525 B<Warning>: Removing a column that is also used in the sources primary
526 key, or in one of the sources unique constraints, B<will> result in a
527 broken result source.
528
529 =head2 remove_column
530
531 =over
532
533 =item Arguments: $colname
534
535 =item Return value: undefined
536
537 =back
538
539   $source->remove_column('col');
540
541 Remove a single column by name from the result source, similar to
542 L</remove_columns>.
543
544 B<Warning>: Removing a column that is also used in the sources primary
545 key, or in one of the sources unique constraints, B<will> result in a
546 broken result source.
547
548 =cut
549
550 sub remove_columns {
551   my ($self, @to_remove) = @_;
552
553   my $columns = $self->_columns
554     or return;
555
556   my %to_remove;
557   for (@to_remove) {
558     delete $columns->{$_};
559     ++$to_remove{$_};
560   }
561
562   $self->_ordered_columns([ grep { not $to_remove{$_} } @{$self->_ordered_columns} ]);
563 }
564
565 sub remove_column { shift->remove_columns(@_); } # DO NOT CHANGE THIS TO GLOB
566
567 =head2 set_primary_key
568
569 =over 4
570
571 =item Arguments: @cols
572
573 =item Return value: undefined
574
575 =back
576
577 Defines one or more columns as primary key for this source. Must be
578 called after L</add_columns>.
579
580 Additionally, defines a L<unique constraint|add_unique_constraint>
581 named C<primary>.
582
583 Note: you normally do want to define a primary key on your sources
584 B<even if the underlying database table does not have a primary key>.
585 See
586 L<DBIx::Class::Manual::Intro/The Significance and Importance of Primary Keys>
587 for more info.
588
589 =cut
590
591 sub set_primary_key {
592   my ($self, @cols) = @_;
593   # check if primary key columns are valid columns
594   foreach my $col (@cols) {
595     $self->throw_exception("No such column $col on table " . $self->name)
596       unless $self->has_column($col);
597   }
598   $self->_primaries(\@cols);
599
600   $self->add_unique_constraint(primary => \@cols);
601 }
602
603 =head2 primary_columns
604
605 =over 4
606
607 =item Arguments: None
608
609 =item Return value: Ordered list of primary column names
610
611 =back
612
613 Read-only accessor which returns the list of primary keys, supplied by
614 L</set_primary_key>.
615
616 =cut
617
618 sub primary_columns {
619   return @{shift->_primaries||[]};
620 }
621
622 # a helper method that will automatically die with a descriptive message if
623 # no pk is defined on the source in question. For internal use to save
624 # on if @pks... boilerplate
625 sub _pri_cols {
626   my $self = shift;
627   my @pcols = $self->primary_columns
628     or $self->throw_exception (sprintf(
629       "Operation requires a primary key to be declared on '%s' via set_primary_key",
630       # source_name is set only after schema-registration
631       $self->source_name || $self->result_class || $self->name || 'Unknown source...?',
632     ));
633   return @pcols;
634 }
635
636 =head2 sequence
637
638 Manually define the correct sequence for your table, to avoid the overhead
639 associated with looking up the sequence automatically. The supplied sequence
640 will be applied to the L</column_info> of each L<primary_key|/set_primary_key>
641
642 =over 4
643
644 =item Arguments: $sequence_name
645
646 =item Return value: undefined
647
648 =back
649
650 =cut
651
652 sub sequence {
653   my ($self,$seq) = @_;
654
655   my @pks = $self->primary_columns
656     or return;
657
658   $_->{sequence} = $seq
659     for values %{ $self->columns_info (\@pks) };
660 }
661
662
663 =head2 add_unique_constraint
664
665 =over 4
666
667 =item Arguments: $name?, \@colnames
668
669 =item Return value: undefined
670
671 =back
672
673 Declare a unique constraint on this source. Call once for each unique
674 constraint.
675
676   # For UNIQUE (column1, column2)
677   __PACKAGE__->add_unique_constraint(
678     constraint_name => [ qw/column1 column2/ ],
679   );
680
681 Alternatively, you can specify only the columns:
682
683   __PACKAGE__->add_unique_constraint([ qw/column1 column2/ ]);
684
685 This will result in a unique constraint named
686 C<table_column1_column2>, where C<table> is replaced with the table
687 name.
688
689 Unique constraints are used, for example, when you pass the constraint
690 name as the C<key> attribute to L<DBIx::Class::ResultSet/find>. Then
691 only columns in the constraint are searched.
692
693 Throws an error if any of the given column names do not yet exist on
694 the result source.
695
696 =cut
697
698 sub add_unique_constraint {
699   my $self = shift;
700
701   if (@_ > 2) {
702     $self->throw_exception(
703         'add_unique_constraint() does not accept multiple constraints, use '
704       . 'add_unique_constraints() instead'
705     );
706   }
707
708   my $cols = pop @_;
709   if (ref $cols ne 'ARRAY') {
710     $self->throw_exception (
711       'Expecting an arrayref of constraint columns, got ' . ($cols||'NOTHING')
712     );
713   }
714
715   my $name = shift @_;
716
717   $name ||= $self->name_unique_constraint($cols);
718
719   foreach my $col (@$cols) {
720     $self->throw_exception("No such column $col on table " . $self->name)
721       unless $self->has_column($col);
722   }
723
724   my %unique_constraints = $self->unique_constraints;
725   $unique_constraints{$name} = $cols;
726   $self->_unique_constraints(\%unique_constraints);
727 }
728
729 =head2 add_unique_constraints
730
731 =over 4
732
733 =item Arguments: @constraints
734
735 =item Return value: undefined
736
737 =back
738
739 Declare multiple unique constraints on this source.
740
741   __PACKAGE__->add_unique_constraints(
742     constraint_name1 => [ qw/column1 column2/ ],
743     constraint_name2 => [ qw/column2 column3/ ],
744   );
745
746 Alternatively, you can specify only the columns:
747
748   __PACKAGE__->add_unique_constraints(
749     [ qw/column1 column2/ ],
750     [ qw/column3 column4/ ]
751   );
752
753 This will result in unique constraints named C<table_column1_column2> and
754 C<table_column3_column4>, where C<table> is replaced with the table name.
755
756 Throws an error if any of the given column names do not yet exist on
757 the result source.
758
759 See also L</add_unique_constraint>.
760
761 =cut
762
763 sub add_unique_constraints {
764   my $self = shift;
765   my @constraints = @_;
766
767   if ( !(@constraints % 2) && first { ref $_ ne 'ARRAY' } @constraints ) {
768     # with constraint name
769     while (my ($name, $constraint) = splice @constraints, 0, 2) {
770       $self->add_unique_constraint($name => $constraint);
771     }
772   }
773   else {
774     # no constraint name
775     foreach my $constraint (@constraints) {
776       $self->add_unique_constraint($constraint);
777     }
778   }
779 }
780
781 =head2 name_unique_constraint
782
783 =over 4
784
785 =item Arguments: \@colnames
786
787 =item Return value: Constraint name
788
789 =back
790
791   $source->table('mytable');
792   $source->name_unique_constraint(['col1', 'col2']);
793   # returns
794   'mytable_col1_col2'
795
796 Return a name for a unique constraint containing the specified
797 columns. The name is created by joining the table name and each column
798 name, using an underscore character.
799
800 For example, a constraint on a table named C<cd> containing the columns
801 C<artist> and C<title> would result in a constraint name of C<cd_artist_title>.
802
803 This is used by L</add_unique_constraint> if you do not specify the
804 optional constraint name.
805
806 =cut
807
808 sub name_unique_constraint {
809   my ($self, $cols) = @_;
810
811   my $name = $self->name;
812   $name = $$name if (ref $name eq 'SCALAR');
813
814   return join '_', $name, @$cols;
815 }
816
817 =head2 unique_constraints
818
819 =over 4
820
821 =item Arguments: None
822
823 =item Return value: Hash of unique constraint data
824
825 =back
826
827   $source->unique_constraints();
828
829 Read-only accessor which returns a hash of unique constraints on this
830 source.
831
832 The hash is keyed by constraint name, and contains an arrayref of
833 column names as values.
834
835 =cut
836
837 sub unique_constraints {
838   return %{shift->_unique_constraints||{}};
839 }
840
841 =head2 unique_constraint_names
842
843 =over 4
844
845 =item Arguments: None
846
847 =item Return value: Unique constraint names
848
849 =back
850
851   $source->unique_constraint_names();
852
853 Returns the list of unique constraint names defined on this source.
854
855 =cut
856
857 sub unique_constraint_names {
858   my ($self) = @_;
859
860   my %unique_constraints = $self->unique_constraints;
861
862   return keys %unique_constraints;
863 }
864
865 =head2 unique_constraint_columns
866
867 =over 4
868
869 =item Arguments: $constraintname
870
871 =item Return value: List of constraint columns
872
873 =back
874
875   $source->unique_constraint_columns('myconstraint');
876
877 Returns the list of columns that make up the specified unique constraint.
878
879 =cut
880
881 sub unique_constraint_columns {
882   my ($self, $constraint_name) = @_;
883
884   my %unique_constraints = $self->unique_constraints;
885
886   $self->throw_exception(
887     "Unknown unique constraint $constraint_name on '" . $self->name . "'"
888   ) unless exists $unique_constraints{$constraint_name};
889
890   return @{ $unique_constraints{$constraint_name} };
891 }
892
893 =head2 sqlt_deploy_callback
894
895 =over
896
897 =item Arguments: $callback_name | \&callback_code
898
899 =item Return value: $callback_name | \&callback_code
900
901 =back
902
903   __PACKAGE__->sqlt_deploy_callback('mycallbackmethod');
904
905    or
906
907   __PACKAGE__->sqlt_deploy_callback(sub {
908     my ($source_instance, $sqlt_table) = @_;
909     ...
910   } );
911
912 An accessor to set a callback to be called during deployment of
913 the schema via L<DBIx::Class::Schema/create_ddl_dir> or
914 L<DBIx::Class::Schema/deploy>.
915
916 The callback can be set as either a code reference or the name of a
917 method in the current result class.
918
919 Defaults to L</default_sqlt_deploy_hook>.
920
921 Your callback will be passed the $source object representing the
922 ResultSource instance being deployed, and the
923 L<SQL::Translator::Schema::Table> object being created from it. The
924 callback can be used to manipulate the table object or add your own
925 customised indexes. If you need to manipulate a non-table object, use
926 the L<DBIx::Class::Schema/sqlt_deploy_hook>.
927
928 See L<DBIx::Class::Manual::Cookbook/Adding Indexes And Functions To
929 Your SQL> for examples.
930
931 This sqlt deployment callback can only be used to manipulate
932 SQL::Translator objects as they get turned into SQL. To execute
933 post-deploy statements which SQL::Translator does not currently
934 handle, override L<DBIx::Class::Schema/deploy> in your Schema class
935 and call L<dbh_do|DBIx::Class::Storage::DBI/dbh_do>.
936
937 =head2 default_sqlt_deploy_hook
938
939 This is the default deploy hook implementation which checks if your
940 current Result class has a C<sqlt_deploy_hook> method, and if present
941 invokes it B<on the Result class directly>. This is to preserve the
942 semantics of C<sqlt_deploy_hook> which was originally designed to expect
943 the Result class name and the
944 L<$sqlt_table instance|SQL::Translator::Schema::Table> of the table being
945 deployed.
946
947 =cut
948
949 sub default_sqlt_deploy_hook {
950   my $self = shift;
951
952   my $class = $self->result_class;
953
954   if ($class and $class->can('sqlt_deploy_hook')) {
955     $class->sqlt_deploy_hook(@_);
956   }
957 }
958
959 sub _invoke_sqlt_deploy_hook {
960   my $self = shift;
961   if ( my $hook = $self->sqlt_deploy_callback) {
962     $self->$hook(@_);
963   }
964 }
965
966 =head2 resultset
967
968 =over 4
969
970 =item Arguments: None
971
972 =item Return value: $resultset
973
974 =back
975
976 Returns a resultset for the given source. This will initially be created
977 on demand by calling
978
979   $self->resultset_class->new($self, $self->resultset_attributes)
980
981 but is cached from then on unless resultset_class changes.
982
983 =head2 resultset_class
984
985 =over 4
986
987 =item Arguments: $classname
988
989 =item Return value: $classname
990
991 =back
992
993   package My::Schema::ResultSet::Artist;
994   use base 'DBIx::Class::ResultSet';
995   ...
996
997   # In the result class
998   __PACKAGE__->resultset_class('My::Schema::ResultSet::Artist');
999
1000   # Or in code
1001   $source->resultset_class('My::Schema::ResultSet::Artist');
1002
1003 Set the class of the resultset. This is useful if you want to create your
1004 own resultset methods. Create your own class derived from
1005 L<DBIx::Class::ResultSet>, and set it here. If called with no arguments,
1006 this method returns the name of the existing resultset class, if one
1007 exists.
1008
1009 =head2 resultset_attributes
1010
1011 =over 4
1012
1013 =item Arguments: \%attrs
1014
1015 =item Return value: \%attrs
1016
1017 =back
1018
1019   # In the result class
1020   __PACKAGE__->resultset_attributes({ order_by => [ 'id' ] });
1021
1022   # Or in code
1023   $source->resultset_attributes({ order_by => [ 'id' ] });
1024
1025 Store a collection of resultset attributes, that will be set on every
1026 L<DBIx::Class::ResultSet> produced from this result source. For a full
1027 list see L<DBIx::Class::ResultSet/ATTRIBUTES>.
1028
1029 =cut
1030
1031 sub resultset {
1032   my $self = shift;
1033   $self->throw_exception(
1034     'resultset does not take any arguments. If you want another resultset, '.
1035     'call it on the schema instead.'
1036   ) if scalar @_;
1037
1038   $self->resultset_class->new(
1039     $self,
1040     {
1041       try { %{$self->schema->default_resultset_attributes} },
1042       %{$self->{resultset_attributes}},
1043     },
1044   );
1045 }
1046
1047 =head2 name
1048
1049 =over 4
1050
1051 =item Arguments: None
1052
1053 =item Result value: $name
1054
1055 =back
1056
1057 Returns the name of the result source, which will typically be the table
1058 name. This may be a scalar reference if the result source has a non-standard
1059 name.
1060
1061 =head2 source_name
1062
1063 =over 4
1064
1065 =item Arguments: $source_name
1066
1067 =item Result value: $source_name
1068
1069 =back
1070
1071 Set an alternate name for the result source when it is loaded into a schema.
1072 This is useful if you want to refer to a result source by a name other than
1073 its class name.
1074
1075   package ArchivedBooks;
1076   use base qw/DBIx::Class/;
1077   __PACKAGE__->table('books_archive');
1078   __PACKAGE__->source_name('Books');
1079
1080   # from your schema...
1081   $schema->resultset('Books')->find(1);
1082
1083 =head2 from
1084
1085 =over 4
1086
1087 =item Arguments: None
1088
1089 =item Return value: FROM clause
1090
1091 =back
1092
1093   my $from_clause = $source->from();
1094
1095 Returns an expression of the source to be supplied to storage to specify
1096 retrieval from this source. In the case of a database, the required FROM
1097 clause contents.
1098
1099 =cut
1100
1101 sub from { die 'Virtual method!' }
1102
1103 =head2 schema
1104
1105 =over 4
1106
1107 =item Arguments: $schema
1108
1109 =item Return value: A schema object
1110
1111 =back
1112
1113   my $schema = $source->schema();
1114
1115 Sets and/or returns the L<DBIx::Class::Schema> object to which this
1116 result source instance has been attached to.
1117
1118 =cut
1119
1120 sub schema {
1121   if (@_ > 1) {
1122     $_[0]->{schema} = $_[1];
1123   }
1124   else {
1125     $_[0]->{schema} || do {
1126       my $name = $_[0]->{source_name} || '_unnamed_';
1127       my $err = 'Unable to perform storage-dependent operations with a detached result source '
1128               . "(source '$name' is not associated with a schema).";
1129
1130       $err .= ' You need to use $schema->thaw() or manually set'
1131             . ' $DBIx::Class::ResultSourceHandle::thaw_schema while thawing.'
1132         if $_[0]->{_detached_thaw};
1133
1134       DBIx::Class::Exception->throw($err);
1135     };
1136   }
1137 }
1138
1139 =head2 storage
1140
1141 =over 4
1142
1143 =item Arguments: None
1144
1145 =item Return value: A Storage object
1146
1147 =back
1148
1149   $source->storage->debug(1);
1150
1151 Returns the storage handle for the current schema.
1152
1153 See also: L<DBIx::Class::Storage>
1154
1155 =cut
1156
1157 sub storage { shift->schema->storage; }
1158
1159 =head2 add_relationship
1160
1161 =over 4
1162
1163 =item Arguments: $relname, $related_source_name, \%cond, [ \%attrs ]
1164
1165 =item Return value: 1/true if it succeeded
1166
1167 =back
1168
1169   $source->add_relationship('relname', 'related_source', $cond, $attrs);
1170
1171 L<DBIx::Class::Relationship> describes a series of methods which
1172 create pre-defined useful types of relationships. Look there first
1173 before using this method directly.
1174
1175 The relationship name can be arbitrary, but must be unique for each
1176 relationship attached to this result source. 'related_source' should
1177 be the name with which the related result source was registered with
1178 the current schema. For example:
1179
1180   $schema->source('Book')->add_relationship('reviews', 'Review', {
1181     'foreign.book_id' => 'self.id',
1182   });
1183
1184 The condition C<$cond> needs to be an L<SQL::Abstract>-style
1185 representation of the join between the tables. For example, if you're
1186 creating a relation from Author to Book,
1187
1188   { 'foreign.author_id' => 'self.id' }
1189
1190 will result in the JOIN clause
1191
1192   author me JOIN book foreign ON foreign.author_id = me.id
1193
1194 You can specify as many foreign => self mappings as necessary.
1195
1196 Valid attributes are as follows:
1197
1198 =over 4
1199
1200 =item join_type
1201
1202 Explicitly specifies the type of join to use in the relationship. Any
1203 SQL join type is valid, e.g. C<LEFT> or C<RIGHT>. It will be placed in
1204 the SQL command immediately before C<JOIN>.
1205
1206 =item proxy
1207
1208 An arrayref containing a list of accessors in the foreign class to proxy in
1209 the main class. If, for example, you do the following:
1210
1211   CD->might_have(liner_notes => 'LinerNotes', undef, {
1212     proxy => [ qw/notes/ ],
1213   });
1214
1215 Then, assuming LinerNotes has an accessor named notes, you can do:
1216
1217   my $cd = CD->find(1);
1218   # set notes -- LinerNotes object is created if it doesn't exist
1219   $cd->notes('Notes go here');
1220
1221 =item accessor
1222
1223 Specifies the type of accessor that should be created for the
1224 relationship. Valid values are C<single> (for when there is only a single
1225 related object), C<multi> (when there can be many), and C<filter> (for
1226 when there is a single related object, but you also want the relationship
1227 accessor to double as a column accessor). For C<multi> accessors, an
1228 add_to_* method is also created, which calls C<create_related> for the
1229 relationship.
1230
1231 =back
1232
1233 Throws an exception if the condition is improperly supplied, or cannot
1234 be resolved.
1235
1236 =cut
1237
1238 sub add_relationship {
1239   my ($self, $rel, $f_source_name, $cond, $attrs) = @_;
1240   $self->throw_exception("Can't create relationship without join condition")
1241     unless $cond;
1242   $attrs ||= {};
1243
1244   # Check foreign and self are right in cond
1245   if ( (ref $cond ||'') eq 'HASH') {
1246     for (keys %$cond) {
1247       $self->throw_exception("Keys of condition should be of form 'foreign.col', not '$_'")
1248         if /\./ && !/^foreign\./;
1249     }
1250   }
1251
1252   my %rels = %{ $self->_relationships };
1253   $rels{$rel} = { class => $f_source_name,
1254                   source => $f_source_name,
1255                   cond  => $cond,
1256                   attrs => $attrs };
1257   $self->_relationships(\%rels);
1258
1259   return $self;
1260
1261 # XXX disabled. doesn't work properly currently. skip in tests.
1262
1263   my $f_source = $self->schema->source($f_source_name);
1264   unless ($f_source) {
1265     $self->ensure_class_loaded($f_source_name);
1266     $f_source = $f_source_name->result_source;
1267     #my $s_class = ref($self->schema);
1268     #$f_source_name =~ m/^${s_class}::(.*)$/;
1269     #$self->schema->register_class(($1 || $f_source_name), $f_source_name);
1270     #$f_source = $self->schema->source($f_source_name);
1271   }
1272   return unless $f_source; # Can't test rel without f_source
1273
1274   try { $self->_resolve_join($rel, 'me', {}, []) }
1275   catch {
1276     # If the resolve failed, back out and re-throw the error
1277     delete $rels{$rel};
1278     $self->_relationships(\%rels);
1279     $self->throw_exception("Error creating relationship $rel: $_");
1280   };
1281
1282   1;
1283 }
1284
1285 =head2 relationships
1286
1287 =over 4
1288
1289 =item Arguments: None
1290
1291 =item Return value: List of relationship names
1292
1293 =back
1294
1295   my @relnames = $source->relationships();
1296
1297 Returns all relationship names for this source.
1298
1299 =cut
1300
1301 sub relationships {
1302   return keys %{shift->_relationships};
1303 }
1304
1305 =head2 relationship_info
1306
1307 =over 4
1308
1309 =item Arguments: $relname
1310
1311 =item Return value: Hashref of relation data,
1312
1313 =back
1314
1315 Returns a hash of relationship information for the specified relationship
1316 name. The keys/values are as specified for L</add_relationship>.
1317
1318 =cut
1319
1320 sub relationship_info {
1321   my ($self, $rel) = @_;
1322   return $self->_relationships->{$rel};
1323 }
1324
1325 =head2 has_relationship
1326
1327 =over 4
1328
1329 =item Arguments: $rel
1330
1331 =item Return value: 1/0 (true/false)
1332
1333 =back
1334
1335 Returns true if the source has a relationship of this name, false otherwise.
1336
1337 =cut
1338
1339 sub has_relationship {
1340   my ($self, $rel) = @_;
1341   return exists $self->_relationships->{$rel};
1342 }
1343
1344 =head2 reverse_relationship_info
1345
1346 =over 4
1347
1348 =item Arguments: $relname
1349
1350 =item Return value: Hashref of relationship data
1351
1352 =back
1353
1354 Looks through all the relationships on the source this relationship
1355 points to, looking for one whose condition is the reverse of the
1356 condition on this relationship.
1357
1358 A common use of this is to find the name of the C<belongs_to> relation
1359 opposing a C<has_many> relation. For definition of these look in
1360 L<DBIx::Class::Relationship>.
1361
1362 The returned hashref is keyed by the name of the opposing
1363 relationship, and contains its data in the same manner as
1364 L</relationship_info>.
1365
1366 =cut
1367
1368 sub reverse_relationship_info {
1369   my ($self, $rel) = @_;
1370
1371   my $rel_info = $self->relationship_info($rel)
1372     or $self->throw_exception("No such relationship '$rel'");
1373
1374   my $ret = {};
1375
1376   return $ret unless ((ref $rel_info->{cond}) eq 'HASH');
1377
1378   my $stripped_cond = $self->__strip_relcond ($rel_info->{cond});
1379
1380   my $rsrc_schema_moniker = $self->source_name
1381     if try { $self->schema };
1382
1383   # this may be a partial schema or something else equally esoteric
1384   my $other_rsrc = try { $self->related_source($rel) }
1385     or return $ret;
1386
1387   # Get all the relationships for that source that related to this source
1388   # whose foreign column set are our self columns on $rel and whose self
1389   # columns are our foreign columns on $rel
1390   foreach my $other_rel ($other_rsrc->relationships) {
1391
1392     # only consider stuff that points back to us
1393     # "us" here is tricky - if we are in a schema registration, we want
1394     # to use the source_names, otherwise we will use the actual classes
1395
1396     # the schema may be partial
1397     my $roundtrip_rsrc = try { $other_rsrc->related_source($other_rel) }
1398       or next;
1399
1400     if ($rsrc_schema_moniker and try { $roundtrip_rsrc->schema } ) {
1401       next unless $rsrc_schema_moniker eq $roundtrip_rsrc->source_name;
1402     }
1403     else {
1404       next unless $self->result_class eq $roundtrip_rsrc->result_class;
1405     }
1406
1407     my $other_rel_info = $other_rsrc->relationship_info($other_rel);
1408
1409     # this can happen when we have a self-referential class
1410     next if $other_rel_info eq $rel_info;
1411
1412     next unless ref $other_rel_info->{cond} eq 'HASH';
1413     my $other_stripped_cond = $self->__strip_relcond($other_rel_info->{cond});
1414
1415     $ret->{$other_rel} = $other_rel_info if (
1416       $self->_compare_relationship_keys (
1417         [ keys %$stripped_cond ], [ values %$other_stripped_cond ]
1418       )
1419         and
1420       $self->_compare_relationship_keys (
1421         [ values %$stripped_cond ], [ keys %$other_stripped_cond ]
1422       )
1423     );
1424   }
1425
1426   return $ret;
1427 }
1428
1429 # all this does is removes the foreign/self prefix from a condition
1430 sub __strip_relcond {
1431   +{
1432     map
1433       { map { /^ (?:foreign|self) \. (\w+) $/x } ($_, $_[1]{$_}) }
1434       keys %{$_[1]}
1435   }
1436 }
1437
1438 sub compare_relationship_keys {
1439   carp 'compare_relationship_keys is a private method, stop calling it';
1440   my $self = shift;
1441   $self->_compare_relationship_keys (@_);
1442 }
1443
1444 # Returns true if both sets of keynames are the same, false otherwise.
1445 sub _compare_relationship_keys {
1446 #  my ($self, $keys1, $keys2) = @_;
1447   return
1448     join ("\x00", sort @{$_[1]})
1449       eq
1450     join ("\x00", sort @{$_[2]})
1451   ;
1452 }
1453
1454 # optionally takes either an arrayref of column names, or a hashref of already
1455 # retrieved colinfos
1456 # returns an arrayref of column names of the shortest unique constraint
1457 # (matching some of the input if any), giving preference to the PK
1458 sub _identifying_column_set {
1459   my ($self, $cols) = @_;
1460
1461   my %unique = $self->unique_constraints;
1462   my $colinfos = ref $cols eq 'HASH' ? $cols : $self->columns_info($cols||());
1463
1464   # always prefer the PK first, and then shortest constraints first
1465   USET:
1466   for my $set (delete $unique{primary}, sort { @$a <=> @$b } (values %unique) ) {
1467     next unless $set && @$set;
1468
1469     for (@$set) {
1470       next USET unless ($colinfos->{$_} && !$colinfos->{$_}{is_nullable} );
1471     }
1472
1473     # copy so we can mangle it at will
1474     return [ @$set ];
1475   }
1476
1477   return undef;
1478 }
1479
1480 # Returns the {from} structure used to express JOIN conditions
1481 sub _resolve_join {
1482   my ($self, $join, $alias, $seen, $jpath, $parent_force_left) = @_;
1483
1484   # we need a supplied one, because we do in-place modifications, no returns
1485   $self->throw_exception ('You must supply a seen hashref as the 3rd argument to _resolve_join')
1486     unless ref $seen eq 'HASH';
1487
1488   $self->throw_exception ('You must supply a joinpath arrayref as the 4th argument to _resolve_join')
1489     unless ref $jpath eq 'ARRAY';
1490
1491   $jpath = [@$jpath]; # copy
1492
1493   if (not defined $join or not length $join) {
1494     return ();
1495   }
1496   elsif (ref $join eq 'ARRAY') {
1497     return
1498       map {
1499         $self->_resolve_join($_, $alias, $seen, $jpath, $parent_force_left);
1500       } @$join;
1501   }
1502   elsif (ref $join eq 'HASH') {
1503
1504     my @ret;
1505     for my $rel (keys %$join) {
1506
1507       my $rel_info = $self->relationship_info($rel)
1508         or $self->throw_exception("No such relationship '$rel' on " . $self->source_name);
1509
1510       my $force_left = $parent_force_left;
1511       $force_left ||= lc($rel_info->{attrs}{join_type}||'') eq 'left';
1512
1513       # the actual seen value will be incremented by the recursion
1514       my $as = $self->storage->relname_to_table_alias(
1515         $rel, ($seen->{$rel} && $seen->{$rel} + 1)
1516       );
1517
1518       push @ret, (
1519         $self->_resolve_join($rel, $alias, $seen, [@$jpath], $force_left),
1520         $self->related_source($rel)->_resolve_join(
1521           $join->{$rel}, $as, $seen, [@$jpath, { $rel => $as }], $force_left
1522         )
1523       );
1524     }
1525     return @ret;
1526
1527   }
1528   elsif (ref $join) {
1529     $self->throw_exception("No idea how to resolve join reftype ".ref $join);
1530   }
1531   else {
1532     my $count = ++$seen->{$join};
1533     my $as = $self->storage->relname_to_table_alias(
1534       $join, ($count > 1 && $count)
1535     );
1536
1537     my $rel_info = $self->relationship_info($join)
1538       or $self->throw_exception("No such relationship $join on " . $self->source_name);
1539
1540     my $rel_src = $self->related_source($join);
1541     return [ { $as => $rel_src->from,
1542                -rsrc => $rel_src,
1543                -join_type => $parent_force_left
1544                   ? 'left'
1545                   : $rel_info->{attrs}{join_type}
1546                 ,
1547                -join_path => [@$jpath, { $join => $as } ],
1548                -is_single => (
1549                   (! $rel_info->{attrs}{accessor})
1550                     or
1551                   first { $rel_info->{attrs}{accessor} eq $_ } (qw/single filter/)
1552                 ),
1553                -alias => $as,
1554                -relation_chain_depth => $seen->{-relation_chain_depth} || 0,
1555              },
1556              scalar $self->_resolve_condition($rel_info->{cond}, $as, $alias, $join)
1557           ];
1558   }
1559 }
1560
1561 sub pk_depends_on {
1562   carp 'pk_depends_on is a private method, stop calling it';
1563   my $self = shift;
1564   $self->_pk_depends_on (@_);
1565 }
1566
1567 # Determines whether a relation is dependent on an object from this source
1568 # having already been inserted. Takes the name of the relationship and a
1569 # hashref of columns of the related object.
1570 sub _pk_depends_on {
1571   my ($self, $relname, $rel_data) = @_;
1572
1573   my $relinfo = $self->relationship_info($relname);
1574
1575   # don't assume things if the relationship direction is specified
1576   return $relinfo->{attrs}{is_foreign_key_constraint}
1577     if exists ($relinfo->{attrs}{is_foreign_key_constraint});
1578
1579   my $cond = $relinfo->{cond};
1580   return 0 unless ref($cond) eq 'HASH';
1581
1582   # map { foreign.foo => 'self.bar' } to { bar => 'foo' }
1583   my $keyhash = { map { my $x = $_; $x =~ s/.*\.//; $x; } reverse %$cond };
1584
1585   # assume anything that references our PK probably is dependent on us
1586   # rather than vice versa, unless the far side is (a) defined or (b)
1587   # auto-increment
1588   my $rel_source = $self->related_source($relname);
1589
1590   foreach my $p ($self->primary_columns) {
1591     if (exists $keyhash->{$p}) {
1592       unless (defined($rel_data->{$keyhash->{$p}})
1593               || $rel_source->column_info($keyhash->{$p})
1594                             ->{is_auto_increment}) {
1595         return 0;
1596       }
1597     }
1598   }
1599
1600   return 1;
1601 }
1602
1603 sub resolve_condition {
1604   carp 'resolve_condition is a private method, stop calling it';
1605   my $self = shift;
1606   $self->_resolve_condition (@_);
1607 }
1608
1609 our $UNRESOLVABLE_CONDITION = \ '1 = 0';
1610
1611 # Resolves the passed condition to a concrete query fragment and a flag
1612 # indicating whether this is a cross-table condition. Also an optional
1613 # list of non-triviail values (notmally conditions) returned as a part
1614 # of a joinfree condition hash
1615 sub _resolve_condition {
1616   my ($self, $cond, $as, $for, $relname) = @_;
1617
1618   my $obj_rel = !!blessed $for;
1619
1620   if (ref $cond eq 'CODE') {
1621     my $relalias = $obj_rel ? 'me' : $as;
1622
1623     my ($crosstable_cond, $joinfree_cond) = $cond->({
1624       self_alias => $obj_rel ? $as : $for,
1625       foreign_alias => $relalias,
1626       self_resultsource => $self,
1627       foreign_relname => $relname || ($obj_rel ? $as : $for),
1628       self_rowobj => $obj_rel ? $for : undef
1629     });
1630
1631     my $cond_cols;
1632     if ($joinfree_cond) {
1633
1634       # FIXME sanity check until things stabilize, remove at some point
1635       $self->throw_exception (
1636         "A join-free condition returned for relationship '$relname' without a row-object to chain from"
1637       ) unless $obj_rel;
1638
1639       # FIXME another sanity check
1640       if (
1641         ref $joinfree_cond ne 'HASH'
1642           or
1643         first { $_ !~ /^\Q$relalias.\E.+/ } keys %$joinfree_cond
1644       ) {
1645         $self->throw_exception (
1646           "The join-free condition returned for relationship '$relname' must be a hash "
1647          .'reference with all keys being valid columns on the related result source'
1648         );
1649       }
1650
1651       # normalize
1652       for (values %$joinfree_cond) {
1653         $_ = $_->{'='} if (
1654           ref $_ eq 'HASH'
1655             and
1656           keys %$_ == 1
1657             and
1658           exists $_->{'='}
1659         );
1660       }
1661
1662       # see which parts of the joinfree cond are conditionals
1663       my $relcol_list = { map { $_ => 1 } $self->related_source($relname)->columns };
1664
1665       for my $c (keys %$joinfree_cond) {
1666         my ($colname) = $c =~ /^ (?: \Q$relalias.\E )? (.+)/x;
1667
1668         unless ($relcol_list->{$colname}) {
1669           push @$cond_cols, $colname;
1670           next;
1671         }
1672
1673         if (
1674           ref $joinfree_cond->{$c}
1675             and
1676           ref $joinfree_cond->{$c} ne 'SCALAR'
1677             and
1678           ref $joinfree_cond->{$c} ne 'REF'
1679         ) {
1680           push @$cond_cols, $colname;
1681           next;
1682         }
1683       }
1684
1685       return wantarray ? ($joinfree_cond, 0, $cond_cols) : $joinfree_cond;
1686     }
1687     else {
1688       return wantarray ? ($crosstable_cond, 1) : $crosstable_cond;
1689     }
1690   }
1691   elsif (ref $cond eq 'HASH') {
1692     my %ret;
1693     foreach my $k (keys %{$cond}) {
1694       my $v = $cond->{$k};
1695       # XXX should probably check these are valid columns
1696       $k =~ s/^foreign\.// ||
1697         $self->throw_exception("Invalid rel cond key ${k}");
1698       $v =~ s/^self\.// ||
1699         $self->throw_exception("Invalid rel cond val ${v}");
1700       if (ref $for) { # Object
1701         #warn "$self $k $for $v";
1702         unless ($for->has_column_loaded($v)) {
1703           if ($for->in_storage) {
1704             $self->throw_exception(sprintf
1705               "Unable to resolve relationship '%s' from object %s: column '%s' not "
1706             . 'loaded from storage (or not passed to new() prior to insert()). You '
1707             . 'probably need to call ->discard_changes to get the server-side defaults '
1708             . 'from the database.',
1709               $as,
1710               $for,
1711               $v,
1712             );
1713           }
1714           return $UNRESOLVABLE_CONDITION;
1715         }
1716         $ret{$k} = $for->get_column($v);
1717         #$ret{$k} = $for->get_column($v) if $for->has_column_loaded($v);
1718         #warn %ret;
1719       } elsif (!defined $for) { # undef, i.e. "no object"
1720         $ret{$k} = undef;
1721       } elsif (ref $as eq 'HASH') { # reverse hashref
1722         $ret{$v} = $as->{$k};
1723       } elsif (ref $as) { # reverse object
1724         $ret{$v} = $as->get_column($k);
1725       } elsif (!defined $as) { # undef, i.e. "no reverse object"
1726         $ret{$v} = undef;
1727       } else {
1728         $ret{"${as}.${k}"} = { -ident => "${for}.${v}" };
1729       }
1730     }
1731
1732     return wantarray
1733       ? ( \%ret, ($obj_rel || !defined $as || ref $as) ? 0 : 1 )
1734       : \%ret
1735     ;
1736   }
1737   elsif (ref $cond eq 'ARRAY') {
1738     my (@ret, $crosstable);
1739     for (@$cond) {
1740       my ($cond, $crosstab) = $self->_resolve_condition($_, $as, $for, $relname);
1741       push @ret, $cond;
1742       $crosstable ||= $crosstab;
1743     }
1744     return wantarray ? (\@ret, $crosstable) : \@ret;
1745   }
1746   else {
1747     $self->throw_exception ("Can't handle condition $cond for relationship '$relname' yet :(");
1748   }
1749 }
1750
1751 # Accepts one or more relationships for the current source and returns an
1752 # array of column names for each of those relationships. Column names are
1753 # prefixed relative to the current source, in accordance with where they appear
1754 # in the supplied relationships.
1755 sub _resolve_prefetch {
1756   my ($self, $pre, $alias, $alias_map, $order, $pref_path) = @_;
1757   $pref_path ||= [];
1758
1759   if (not defined $pre or not length $pre) {
1760     return ();
1761   }
1762   elsif( ref $pre eq 'ARRAY' ) {
1763     return
1764       map { $self->_resolve_prefetch( $_, $alias, $alias_map, $order, [ @$pref_path ] ) }
1765         @$pre;
1766   }
1767   elsif( ref $pre eq 'HASH' ) {
1768     my @ret =
1769     map {
1770       $self->_resolve_prefetch($_, $alias, $alias_map, $order, [ @$pref_path ] ),
1771       $self->related_source($_)->_resolve_prefetch(
1772                $pre->{$_}, "${alias}.$_", $alias_map, $order, [ @$pref_path, $_] )
1773     } keys %$pre;
1774     return @ret;
1775   }
1776   elsif( ref $pre ) {
1777     $self->throw_exception(
1778       "don't know how to resolve prefetch reftype ".ref($pre));
1779   }
1780   else {
1781     my $p = $alias_map;
1782     $p = $p->{$_} for (@$pref_path, $pre);
1783
1784     $self->throw_exception (
1785       "Unable to resolve prefetch '$pre' - join alias map does not contain an entry for path: "
1786       . join (' -> ', @$pref_path, $pre)
1787     ) if (ref $p->{-join_aliases} ne 'ARRAY' or not @{$p->{-join_aliases}} );
1788
1789     my $as = shift @{$p->{-join_aliases}};
1790
1791     my $rel_info = $self->relationship_info( $pre );
1792     $self->throw_exception( $self->source_name . " has no such relationship '$pre'" )
1793       unless $rel_info;
1794
1795     my $as_prefix = ($alias =~ /^.*?\.(.+)$/ ? $1.'.' : '');
1796     my $rel_source = $self->related_source($pre);
1797
1798     if ($rel_info->{attrs}{accessor} && $rel_info->{attrs}{accessor} eq 'multi') {
1799       $self->throw_exception(
1800         "Can't prefetch has_many ${pre} (join cond too complex)")
1801         unless ref($rel_info->{cond}) eq 'HASH';
1802       my $dots = @{[$as_prefix =~ m/\./g]} + 1; # +1 to match the ".${as_prefix}"
1803
1804       #my @col = map { (/^self\.(.+)$/ ? ("${as_prefix}.$1") : ()); }
1805       #              values %{$rel_info->{cond}};
1806       my @key = map { (/^foreign\.(.+)$/ ? ($1) : ()); }
1807                     keys %{$rel_info->{cond}};
1808
1809       push @$order, map { "${as}.$_" } @key;
1810
1811       if (my $rel_order = $rel_info->{attrs}{order_by}) {
1812         # this is kludgy and incomplete, I am well aware
1813         # but the parent method is going away entirely anyway
1814         # so sod it
1815         my $sql_maker = $self->storage->sql_maker;
1816         my ($orig_ql, $orig_qr) = $sql_maker->_quote_chars;
1817         my $sep = $sql_maker->name_sep;
1818
1819         # install our own quoter, so we can catch unqualified stuff
1820         local $sql_maker->{quote_char} = ["\x00", "\xFF"];
1821
1822         my $quoted_prefix = "\x00${as}\xFF";
1823
1824         for my $chunk ( $sql_maker->_order_by_chunks ($rel_order) ) {
1825           my @bind;
1826           ($chunk, @bind) = @$chunk if ref $chunk;
1827
1828           $chunk = "${quoted_prefix}${sep}${chunk}"
1829             unless $chunk =~ /\Q$sep/;
1830
1831           $chunk =~ s/\x00/$orig_ql/g;
1832           $chunk =~ s/\xFF/$orig_qr/g;
1833           push @$order, \[$chunk, @bind];
1834         }
1835       }
1836     }
1837
1838     return map { [ "${as}.$_", "${as_prefix}${pre}.$_", ] }
1839       $rel_source->columns;
1840   }
1841 }
1842
1843 # adding a dep on MoreUtils *just* for this is retarded
1844 my $unique_numlist = sub { [ sort { $a <=> $b } keys %{ {map { $_ => 1 } @_ }} ] };
1845
1846 # This error must be thrown from two distinct codepaths, joining them is
1847 # rather hard. Go for this hack instead.
1848 my $get_related_source = sub {
1849   my ($rsrc, $rel, $relcols) = @_;
1850   try {
1851     $rsrc->related_source ($rel)
1852   } catch {
1853     $rsrc->throw_exception(sprintf(
1854       "Can't inflate prefetch into non-existent relationship '%s' from '%s', "
1855     . "check the inflation specification (columns/as) ending in '...%s.%s'.",
1856       $rel,
1857       $rsrc->source_name,
1858       $rel,
1859       (sort { length($a) <=> length ($b) } keys %$relcols)[0],
1860   ))};
1861 };
1862
1863 # Takes a selection list and generates a collapse-map representing
1864 # row-object fold-points. Every relationship is assigned a set of unique,
1865 # non-nullable columns (which may *not even be* from the same resultset)
1866 # and the collapser will use this information to correctly distinguish
1867 # data of individual to-be-row-objects.
1868 sub _resolve_collapse {
1869   my ($self, $as, $as_fq_idx, $rel_chain, $parent_info, $node_idx_ref) = @_;
1870
1871   # for comprehensible error messages put ourselves at the head of the relationship chain
1872   $rel_chain ||= [ $self->source_name ];
1873
1874   # record top-level fully-qualified column index
1875   $as_fq_idx ||= { %$as };
1876
1877   my ($my_cols, $rel_cols);
1878   for (keys %$as) {
1879     if ($_ =~ /^ ([^\.]+) \. (.+) /x) {
1880       $rel_cols->{$1}{$2} = 1;
1881     }
1882     else {
1883       $my_cols->{$_} = {};  # important for ||= below
1884     }
1885   }
1886
1887   my $relinfo;
1888   # run through relationships, collect metadata, inject non-left fk-bridges from
1889   # *INNER-JOINED* children (if any)
1890   for my $rel (keys %$rel_cols) {
1891     my $rel_src = $get_related_source->($self, $rel, $rel_cols->{$rel});
1892
1893     my $inf = $self->relationship_info ($rel);
1894
1895     $relinfo->{$rel}{is_single} = $inf->{attrs}{accessor} && $inf->{attrs}{accessor} ne 'multi';
1896     $relinfo->{$rel}{is_inner} = ( $inf->{attrs}{join_type} || '' ) !~ /^left/i;
1897     $relinfo->{$rel}{rsrc} = $rel_src;
1898
1899     my $cond = $inf->{cond};
1900
1901     if (
1902       ref $cond eq 'HASH'
1903         and
1904       keys %$cond
1905         and
1906       ! first { $_ !~ /^foreign\./ } (keys %$cond)
1907         and
1908       ! first { $_ !~ /^self\./ } (values %$cond)
1909     ) {
1910       for my $f (keys %$cond) {
1911         my $s = $cond->{$f};
1912         $_ =~ s/^ (?: foreign | self ) \.//x for ($f, $s);
1913         $relinfo->{$rel}{fk_map}{$s} = $f;
1914
1915         # need to know source from *our* pov, hnce $rel.
1916         $my_cols->{$s} ||= { via_fk => "$rel.$f" } if (
1917           defined $rel_cols->{$rel}{$f} # in fact selected
1918             and
1919           (! $node_idx_ref or $relinfo->{$rel}{is_inner}) # either top-level or an inner join
1920         );
1921       }
1922     }
1923   }
1924
1925   # if the parent is already defined, assume all of its related FKs are selected
1926   # (even if they in fact are NOT in the select list). Keep a record of what we
1927   # assumed, and if any such phantom-column becomes part of our own collapser,
1928   # throw everything assumed-from-parent away and replace with the collapser of
1929   # the parent (whatever it may be)
1930   my $assumed_from_parent;
1931   unless ($parent_info->{underdefined}) {
1932     $assumed_from_parent->{columns} = { map
1933       # only add to the list if we do not already select said columns
1934       { ! exists $my_cols->{$_} ? ( $_ => 1 ) : () }
1935       values %{$parent_info->{rel_condition} || {}}
1936     };
1937
1938     $my_cols->{$_} = { via_collapse => $parent_info->{collapse_on} }
1939       for keys %{$assumed_from_parent->{columns}};
1940   }
1941
1942   # get colinfo for everything
1943   if ($my_cols) {
1944     my $ci = $self->columns_info;
1945     $my_cols->{$_}{colinfo} = $ci->{$_} for keys %$my_cols;
1946   }
1947
1948   my $collapse_map;
1949
1950   # try to resolve based on our columns (plus already inserted FK bridges)
1951   if (
1952     $my_cols
1953       and
1954     my $uset = $self->_unique_column_set ($my_cols)
1955   ) {
1956     # see if the resulting collapser relies on any implied columns,
1957     # and fix stuff up if this is the case
1958
1959     my $parent_collapser_used = defined delete @{$uset}{keys %{$assumed_from_parent->{columns}}};
1960     $collapse_map->{-node_id} = $unique_numlist->(
1961       $parent_collapser_used ? @{$parent_info->{collapse_on}} : (),
1962       (map
1963         {
1964           my $fqc = join ('.',
1965             @{$rel_chain}[1 .. $#$rel_chain],
1966             ( $my_cols->{$_}{via_fk} || $_ ),
1967           );
1968
1969           $as_fq_idx->{$fqc};
1970         }
1971         keys %$uset
1972       ),
1973     );
1974   }
1975
1976   # Stil don't know how to collapse - keep descending down 1:1 chains - if
1977   # a related non-LEFT 1:1 is resolvable - its condition will collapse us
1978   # too
1979   unless ($collapse_map->{-node_id}) {
1980     my @candidates;
1981
1982     for my $rel (keys %$relinfo) {
1983       next unless ($relinfo->{$rel}{is_single} && $relinfo->{$rel}{is_inner});
1984
1985       if ( my $rel_collapse = $relinfo->{$rel}{rsrc}->_resolve_collapse (
1986         $rel_cols->{$rel},
1987         $as_fq_idx,
1988         [ @$rel_chain, $rel ],
1989         { underdefined => 1 }
1990       )) {
1991         push @candidates, $rel_collapse->{-node_id};
1992       }
1993     }
1994
1995     # get the set with least amount of columns
1996     # FIXME - maybe need to implement a data type order as well (i.e. prefer several ints
1997     # to a single varchar)
1998     if (@candidates) {
1999       ($collapse_map->{-node_id}) = sort { scalar @$a <=> scalar @$b } (@candidates);
2000     }
2001   }
2002
2003   # Still dont know how to collapse - see if the parent passed us anything
2004   # (i.e. reuse collapser over 1:1)
2005   unless ($collapse_map->{-node_id}) {
2006     $collapse_map->{-node_id} = $parent_info->{collapse_on}
2007       if $parent_info->{collapser_reusable};
2008   }
2009
2010   # stop descending into children if we were called by a parent for first-pass
2011   # and don't despair if nothing was found (there may be other parallel branches
2012   # to dive into)
2013   if ($parent_info->{underdefined}) {
2014     return $collapse_map->{-node_id} ? $collapse_map : undef
2015   }
2016   # nothing down the chain resolved - can't calculate a collapse-map
2017   elsif (! $collapse_map->{-node_id}) {
2018     $self->throw_exception ( sprintf
2019       "Unable to calculate a definitive collapse column set for %s%s: fetch more unique non-nullable columns",
2020       $self->source_name,
2021       @$rel_chain > 1
2022         ? sprintf (' (last member of the %s chain)', join ' -> ', @$rel_chain )
2023         : ''
2024       ,
2025     );
2026   }
2027
2028   # If we got that far - we are collapsable - GREAT! Now go down all children
2029   # a second time, and fill in the rest
2030
2031   $collapse_map->{-is_optional} = 1 if $parent_info->{is_optional};
2032   $collapse_map->{-node_index} = ${ $node_idx_ref ||= \do { my $x = 1 } }++;  # this is *deliberately* not 0-based
2033
2034   my (@id_sets, $multis_in_chain);
2035   for my $rel (sort keys %$relinfo) {
2036
2037     $collapse_map->{$rel} = $relinfo->{$rel}{rsrc}->_resolve_collapse (
2038       { map { $_ => 1 } ( keys %{$rel_cols->{$rel}} ) },
2039
2040       $as_fq_idx,
2041
2042       [ @$rel_chain, $rel],
2043
2044       {
2045         collapse_on => [ @{$collapse_map->{-node_id}} ],
2046
2047         rel_condition => $relinfo->{$rel}{fk_map},
2048
2049         is_optional => $collapse_map->{-is_optional},
2050
2051         # if this is a 1:1 our own collapser can be used as a collapse-map
2052         # (regardless of left or not)
2053         collapser_reusable => $relinfo->{$rel}{is_single},
2054       },
2055
2056       $node_idx_ref,
2057     );
2058
2059     $collapse_map->{$rel}{-is_single} = 1 if $relinfo->{$rel}{is_single};
2060     $collapse_map->{$rel}{-is_optional} ||= 1 unless $relinfo->{$rel}{is_inner};
2061     push @id_sets, @{ $collapse_map->{$rel}{-branch_id} };
2062   }
2063
2064   $collapse_map->{-branch_id} = $unique_numlist->( @id_sets, @{$collapse_map->{-node_id}} );
2065
2066   return $collapse_map;
2067 }
2068
2069 sub _unique_column_set {
2070   my ($self, $cols) = @_;
2071
2072   my %unique = $self->unique_constraints;
2073
2074   # always prefer the PK first, and then shortest constraints first
2075   USET:
2076   for my $set (delete $unique{primary}, sort { @$a <=> @$b } (values %unique) ) {
2077     next unless $set && @$set;
2078
2079     for (@$set) {
2080       next USET unless ($cols->{$_} && $cols->{$_}{colinfo} && !$cols->{$_}{colinfo}{is_nullable} );
2081     }
2082
2083     return { map { $_ => 1 } @$set };
2084   }
2085
2086   return undef;
2087 }
2088
2089 # Takes an arrayref of {as} dbic column aliases and the collapse and select
2090 # attributes from the same $rs (the slector requirement is a temporary
2091 # workaround), and returns a coderef capable of:
2092 # my $me_pref_clps = $coderef->([$rs->cursor->next])
2093 # Where the $me_pref_clps arrayref is the future argument to
2094 # ::ResultSet::_collapse_result.
2095 #
2096 # $me_pref_clps->[0] is always returned (even if as an empty hash with no
2097 # rowdata), however branches of related data in $me_pref_clps->[1] may be
2098 # pruned short of what was originally requested based on {as}, depending
2099 # on:
2100 #
2101 # * If collapse is requested, a definitive collapse map is calculated for
2102 #   every relationship "fold-point", consisting of a set of values (which
2103 #   may not even be contained in the future 'me' of said relationship
2104 #   (for example a cd.artist_id defines the related inner-joined artist)).
2105 #   Thus a definedness check is carried on all collapse-condition values
2106 #   and if at least one is undef it is assumed that we are dealing with a
2107 #   NULLed right-side of a left-join, so we don't return a related data
2108 #   container at all, which implies no related objects
2109 #
2110 # * If we are not collapsing, there is no constraint on having a selector
2111 #   uniquely identifying all possible objects, and the user might have very
2112 #   well requested a column that just *happens* to be all NULLs. What we do
2113 #   in this case is fallback to the old behavior (which is a potential FIXME)
2114 #   by always returning a data container, but only filling it with columns
2115 #   IFF at least one of them is defined. This way we do not get an object
2116 #   with a bunch of has_column_loaded to undef, but at the same time do not
2117 #   further relationships based off this "null" object (e.g. in case the user
2118 #   deliberately skipped link-table values). I am pretty sure there are some
2119 #   tests that codify this behavior, need to find the exact testname.
2120 #
2121 # For an example of this coderef in action (and to see its guts) look at
2122 # t/prefetch/_internals.t
2123 #
2124 # This is a huge performance win, as we call the same code for
2125 # every row returned from the db, thus avoiding repeated method
2126 # lookups when traversing relationships
2127 #
2128 # Also since the coderef is completely stateless (the returned structure is
2129 # always fresh on every new invocation) this is a very good opportunity for
2130 # memoization if further speed improvements are needed
2131 #
2132 # The way we construct this coderef is somewhat fugly, although I am not
2133 # sure if the string eval is *that* bad of an idea. The alternative is to
2134 # have a *very* large number of anon coderefs calling each other in a twisty
2135 # maze, whereas the current result is a nice, smooth, single-pass function.
2136 # In any case - the output of this thing is meticulously micro-tested, so
2137 # any sort of rewrite should be relatively easy
2138 #
2139 sub _mk_row_parser {
2140   my ($self, $args) = @_;
2141
2142   my $inflate_index = { map
2143     { $args->{inflate_map}[$_] => $_ }
2144     ( 0 .. $#{$args->{inflate_map}} )
2145   };
2146
2147   my ($parser_src);
2148   if ($args->{collapse}) {
2149     # FIXME - deal with unorderedness
2150     #    unordered => $unordered
2151
2152     my $collapse_map = $self->_resolve_collapse (
2153       # FIXME
2154       # only consider real columns (not functions) during collapse resolution
2155       # this check shouldn't really be here, as fucktards are not supposed to
2156       # alias random crap to existing column names anyway, but still - just in
2157       # case
2158       # FIXME !!!! - this does not yet deal with unbalanced selectors correctly
2159       # (it is now trivial as the attrs specify where things go out of sync)
2160       { map
2161         { ref $args->{selection}[$inflate_index->{$_}] ? () : ( $_ => $inflate_index->{$_} ) }
2162         keys %$inflate_index
2163       }
2164     );
2165
2166     my $unrolled_top_branch_id_indexes = join (', ', @{$collapse_map->{-branch_id}});
2167
2168     my ($sequenced_top_branch_id, $sequenced_top_node_id) = map
2169       { join ('', map { "{'\xFF__IDVALPOS__${_}__\xFF'}" } @$_ ) }
2170       $collapse_map->{-branch_id}, $collapse_map->{-node_id}
2171     ;
2172
2173     my $rolled_out_assemblers = __visit_infmap_collapse (
2174       $inflate_index, $collapse_map
2175     );
2176
2177     my @sprintf_args = (
2178       $unrolled_top_branch_id_indexes,
2179       $sequenced_top_branch_id,
2180       $sequenced_top_node_id,
2181       $rolled_out_assemblers,
2182       $sequenced_top_node_id,
2183     );
2184     $parser_src = sprintf (<<'EOS', @sprintf_args);
2185
2186 ### BEGIN STRING EVAL
2187   my ($rows_pos, $result_pos, $cur_row, @cur_row_id_values, $is_new_res, @collapse_idx) = (0,0);
2188
2189   # this loop is a bit arcane - the rationale is that the passed in
2190   # $_[0] will either have only one row (->next) or will have all
2191   # rows already pulled in (->all and/or unordered). Given that the
2192   # result can be rather large - we reuse the same already allocated
2193   # array, since the collapsed prefetch is smaller by definition.
2194   # At the end we cut the leftovers away and move on.
2195   while ($cur_row =
2196     ($rows_pos >= 0 and $_[0][$rows_pos++] or do { $rows_pos = -1; 0 } )
2197       ||
2198     ($_[1] and $_[1]->())
2199   ) {
2200
2201     # FIXME
2202     # optimize this away when we know we have no undefs in the collapse map
2203     $cur_row_id_values[$_] = defined $cur_row->[$_] ? $cur_row->[$_] : "\xFF\xFFN\xFFU\xFFL\xFFL\xFF\xFF"
2204       for (%s); # the top branch_id includes all id values
2205
2206     # check top branch for doubling via a has_many non-selecting join or something
2207     # 0 is reserved for this (node indexes start from 1)
2208     next if $collapse_idx[0]%s++;
2209
2210     $is_new_res = ! $collapse_idx[1]%s;
2211
2212     # lazify
2213     # fire on ordered only
2214 #    if ($is_new_res = ! $collapse_idx[1]{$cur_row_id_values[2]}) {
2215 #    }
2216
2217     %s
2218
2219     $_[0][$result_pos++] = $collapse_idx[1]%s
2220       if $is_new_res;
2221   }
2222
2223   splice @{$_[0]}, $result_pos; # truncate the passed in array for cases of collapsing ->all()
2224
2225 ### END STRING EVAL
2226 EOS
2227
2228     # change the quoted placeholders to unquoted alias-references
2229     $parser_src =~ s/ \' \xFF__VALPOS__(\d+)__\xFF \' /sprintf ('$cur_row->[%d]', $1)/gex;
2230     $parser_src =~ s/ \' \xFF__IDVALPOS__(\d+)__\xFF \' /sprintf ('$cur_row_id_values[%d]', $1)/gex;
2231   }
2232
2233   else {
2234     $parser_src = sprintf(
2235       '$_ = %s for @{$_[0]}',
2236       __visit_infmap_simple($inflate_index, { rsrc => $self }), # need the $rsrc to determine left-ness
2237     );
2238
2239     # change the quoted placeholders to unquoted alias-references
2240     $parser_src =~ s/ \' \xFF__VALPOS__(\d+)__\xFF \' /sprintf ('$_->[%d]', $1)/gex;
2241   }
2242
2243   eval "sub { no strict; no warnings; $parser_src }" or die "$@\n\n$parser_src";
2244 }
2245
2246 {
2247   # keep our own DD object around so we don't have to fitz with quoting
2248   my $dumper_obj;
2249   my $visit_dump = sub {
2250     # we actually will be producing functional perl code here,
2251     # thus no second-guessing of what these globals might have
2252     # been set to. DO NOT CHANGE!
2253     ($dumper_obj ||= do {
2254       require Data::Dumper;
2255       Data::Dumper->new([])
2256         ->Purity (1)
2257         ->Pad ('')
2258         ->Useqq (0)
2259         ->Terse (1)
2260         ->Quotekeys (1)
2261         ->Deepcopy (1)
2262         ->Deparse (0)
2263         ->Maxdepth (0)
2264         ->Indent (0)
2265     })->Values ([shift])->Dump,
2266   };
2267
2268   sub __visit_infmap_simple {
2269     my ($val_idx, $args) = @_;
2270
2271     my $my_cols = {};
2272     my $rel_cols;
2273     for (keys %$val_idx) {
2274       if ($_ =~ /^ ([^\.]+) \. (.+) /x) {
2275         $rel_cols->{$1}{$2} = $val_idx->{$_};
2276       }
2277       else {
2278         $my_cols->{$_} = $val_idx->{$_};
2279       }
2280     }
2281     my @relperl;
2282     for my $rel (sort keys %$rel_cols) {
2283
2284       my $rel_rsrc = $get_related_source->($args->{rsrc}, $rel, $rel_cols->{$rel});
2285
2286       #my $optional = $args->{is_optional};
2287       #$optional ||= ($args->{rsrc}->relationship_info($rel)->{attrs}{join_type} || '') =~ /^left/i;
2288
2289       push @relperl, join ' => ', perlstring($rel), __visit_infmap_simple($rel_cols->{$rel}, {
2290         non_top => 1,
2291         #is_optional => $optional,
2292         rsrc => $rel_rsrc,
2293       });
2294
2295       # FIXME SUBOPTIMAL - disabled to satisfy t/resultset/inflate_result_api.t
2296       #if ($optional and my @branch_null_checks = map
2297       #  { "(! defined '\xFF__VALPOS__${_}__\xFF')" }
2298       #  sort { $a <=> $b } values %{$rel_cols->{$rel}}
2299       #) {
2300       #  $relperl[-1] = sprintf ( '(%s) ? ( %s => [] ) : ( %s )',
2301       #    join (' && ', @branch_null_checks ),
2302       #    perlstring($rel),
2303       #    $relperl[-1],
2304       #  );
2305       #}
2306     }
2307
2308     my $me_struct = keys %$my_cols
2309       ? $visit_dump->({ map { $_ => "\xFF__VALPOS__$my_cols->{$_}__\xFF" } (keys %$my_cols) })
2310       : 'undef'
2311     ;
2312
2313     return sprintf '[%s]', join (',',
2314       $me_struct,
2315       @relperl ? sprintf ('{ %s }', join (',', @relperl)) : (),
2316     );
2317   }
2318
2319   sub __visit_infmap_collapse {
2320     my ($val_idx, $collapse_map, $parent_info) = @_;
2321
2322     my $my_cols = {};
2323     my $rel_cols;
2324     for (keys %$val_idx) {
2325       if ($_ =~ /^ ([^\.]+) \. (.+) /x) {
2326         $rel_cols->{$1}{$2} = $val_idx->{$_};
2327       }
2328       else {
2329         $my_cols->{$_} = $val_idx->{$_};
2330       }
2331     }
2332
2333     my $sequenced_node_id = join ('', map
2334       { "{'\xFF__IDVALPOS__${_}__\xFF'}" }
2335       @{$collapse_map->{-node_id}}
2336     );
2337
2338     my $me_struct = keys %$my_cols
2339       ? $visit_dump->([{ map { $_ => "\xFF__VALPOS__$my_cols->{$_}__\xFF" } (keys %$my_cols) }])
2340       : 'undef'
2341     ;
2342     my $node_idx_ref = sprintf '$collapse_idx[%d]%s', $collapse_map->{-node_index}, $sequenced_node_id;
2343
2344     my $parent_idx_ref = sprintf( '$collapse_idx[%d]%s[1]{%s}',
2345       @{$parent_info}{qw/node_idx sequenced_node_id/},
2346       perlstring($parent_info->{relname}),
2347     ) if $parent_info;
2348
2349     my @src;
2350     if ($collapse_map->{-node_index} == 1) {
2351       push @src, sprintf( '%s ||= %s;',
2352         $node_idx_ref,
2353         $me_struct,
2354       );
2355     }
2356     elsif ($collapse_map->{-is_single}) {
2357       push @src, sprintf ( '%s = %s ||= %s;',
2358         $parent_idx_ref,
2359         $node_idx_ref,
2360         $me_struct,
2361       );
2362     }
2363     else {
2364       push @src, sprintf('push @{%s}, %s = %s if !%s;',
2365         $parent_idx_ref,
2366         $node_idx_ref,
2367         $me_struct,
2368         $node_idx_ref,
2369       );
2370     }
2371
2372     #my $known_defined = { %{ $parent_info->{known_defined} || {} } };
2373     #$known_defined->{$_}++ for @{$collapse_map->{-node_id}};
2374
2375     for my $rel (sort keys %$rel_cols) {
2376
2377       push @src, sprintf( '%s[1]{%s} ||= [];', $node_idx_ref, perlstring($rel) );
2378
2379       push @src,  __visit_infmap_collapse($rel_cols->{$rel}, $collapse_map->{$rel}, {
2380         node_idx => $collapse_map->{-node_index},
2381         sequenced_node_id => $sequenced_node_id,
2382         relname => $rel,
2383         #known_defined => $known_defined,
2384       });
2385
2386       # FIXME SUBOPTIMAL - disabled to satisfy t/resultset/inflate_result_api.t
2387       #if ($collapse_map->{$rel}{-is_optional} and my @null_checks = map
2388       #  { "(! defined '\xFF__VALPOS__${_}__\xFF')" }
2389       #  sort { $a <=> $b } grep
2390       #    { ! $known_defined->{$_} }
2391       #    @{$collapse_map->{$rel}{-node_id}}
2392       #) {
2393       #  $src[-1] = sprintf( '(%s) or %s',
2394       #    join (' || ', @null_checks ),
2395       #    $src[-1],
2396       #  );
2397       #}
2398     }
2399
2400     join "\n", @src;
2401   }
2402 }
2403
2404 =head2 related_source
2405
2406 =over 4
2407
2408 =item Arguments: $relname
2409
2410 =item Return value: $source
2411
2412 =back
2413
2414 Returns the result source object for the given relationship.
2415
2416 =cut
2417
2418 sub related_source {
2419   my ($self, $rel) = @_;
2420   if( !$self->has_relationship( $rel ) ) {
2421     $self->throw_exception("No such relationship '$rel' on " . $self->source_name);
2422   }
2423
2424   # if we are not registered with a schema - just use the prototype
2425   # however if we do have a schema - ask for the source by name (and
2426   # throw in the process if all fails)
2427   if (my $schema = try { $self->schema }) {
2428     $schema->source($self->relationship_info($rel)->{source});
2429   }
2430   else {
2431     my $class = $self->relationship_info($rel)->{class};
2432     $self->ensure_class_loaded($class);
2433     $class->result_source_instance;
2434   }
2435 }
2436
2437 =head2 related_class
2438
2439 =over 4
2440
2441 =item Arguments: $relname
2442
2443 =item Return value: $classname
2444
2445 =back
2446
2447 Returns the class name for objects in the given relationship.
2448
2449 =cut
2450
2451 sub related_class {
2452   my ($self, $rel) = @_;
2453   if( !$self->has_relationship( $rel ) ) {
2454     $self->throw_exception("No such relationship '$rel' on " . $self->source_name);
2455   }
2456   return $self->schema->class($self->relationship_info($rel)->{source});
2457 }
2458
2459 =head2 handle
2460
2461 =over 4
2462
2463 =item Arguments: None
2464
2465 =item Return value: $source_handle
2466
2467 =back
2468
2469 Obtain a new L<result source handle instance|DBIx::Class::ResultSourceHandle>
2470 for this source. Used as a serializable pointer to this resultsource, as it is not
2471 easy (nor advisable) to serialize CODErefs which may very well be present in e.g.
2472 relationship definitions.
2473
2474 =cut
2475
2476 sub handle {
2477   return DBIx::Class::ResultSourceHandle->new({
2478     source_moniker => $_[0]->source_name,
2479
2480     # so that a detached thaw can be re-frozen
2481     $_[0]->{_detached_thaw}
2482       ? ( _detached_source  => $_[0]          )
2483       : ( schema            => $_[0]->schema  )
2484     ,
2485   });
2486 }
2487
2488 my $global_phase_destroy;
2489 sub DESTROY {
2490   return if $global_phase_destroy ||= in_global_destruction;
2491
2492 ######
2493 # !!! ACHTUNG !!!!
2494 ######
2495 #
2496 # Under no circumstances shall $_[0] be stored anywhere else (like copied to
2497 # a lexical variable, or shifted, or anything else). Doing so will mess up
2498 # the refcount of this particular result source, and will allow the $schema
2499 # we are trying to save to reattach back to the source we are destroying.
2500 # The relevant code checking refcounts is in ::Schema::DESTROY()
2501
2502   # if we are not a schema instance holder - we don't matter
2503   return if(
2504     ! ref $_[0]->{schema}
2505       or
2506     isweak $_[0]->{schema}
2507   );
2508
2509   # weaken our schema hold forcing the schema to find somewhere else to live
2510   # during global destruction (if we have not yet bailed out) this will throw
2511   # which will serve as a signal to not try doing anything else
2512   # however beware - on older perls the exception seems randomly untrappable
2513   # due to some weird race condition during thread joining :(((
2514   local $@;
2515   eval {
2516     weaken $_[0]->{schema};
2517
2518     # if schema is still there reintroduce ourselves with strong refs back to us
2519     if ($_[0]->{schema}) {
2520       my $srcregs = $_[0]->{schema}->source_registrations;
2521       for (keys %$srcregs) {
2522         next unless $srcregs->{$_};
2523         $srcregs->{$_} = $_[0] if $srcregs->{$_} == $_[0];
2524       }
2525     }
2526
2527     1;
2528   } or do {
2529     $global_phase_destroy = 1;
2530   };
2531
2532   return;
2533 }
2534
2535 sub STORABLE_freeze { Storable::nfreeze($_[0]->handle) }
2536
2537 sub STORABLE_thaw {
2538   my ($self, $cloning, $ice) = @_;
2539   %$self = %{ (Storable::thaw($ice))->resolve };
2540 }
2541
2542 =head2 throw_exception
2543
2544 See L<DBIx::Class::Schema/"throw_exception">.
2545
2546 =cut
2547
2548 sub throw_exception {
2549   my $self = shift;
2550
2551   $self->{schema}
2552     ? $self->{schema}->throw_exception(@_)
2553     : DBIx::Class::Exception->throw(@_)
2554   ;
2555 }
2556
2557 =head2 source_info
2558
2559 Stores a hashref of per-source metadata.  No specific key names
2560 have yet been standardized, the examples below are purely hypothetical
2561 and don't actually accomplish anything on their own:
2562
2563   __PACKAGE__->source_info({
2564     "_tablespace" => 'fast_disk_array_3',
2565     "_engine" => 'InnoDB',
2566   });
2567
2568 =head2 new
2569
2570   $class->new();
2571
2572   $class->new({attribute_name => value});
2573
2574 Creates a new ResultSource object.  Not normally called directly by end users.
2575
2576 =head2 column_info_from_storage
2577
2578 =over
2579
2580 =item Arguments: 1/0 (default: 0)
2581
2582 =item Return value: 1/0
2583
2584 =back
2585
2586   __PACKAGE__->column_info_from_storage(1);
2587
2588 Enables the on-demand automatic loading of the above column
2589 metadata from storage as necessary.  This is *deprecated*, and
2590 should not be used.  It will be removed before 1.0.
2591
2592
2593 =head1 AUTHORS
2594
2595 Matt S. Trout <mst@shadowcatsystems.co.uk>
2596
2597 =head1 LICENSE
2598
2599 You may distribute this code under the same terms as Perl itself.
2600
2601 =cut
2602
2603 1;