implemented _collapse_result and _merge_result
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSource.pm
1 package DBIx::Class::ResultSource;
2
3 use strict;
4 use warnings;
5
6 use base qw/DBIx::Class/;
7
8 use DBIx::Class::ResultSet;
9 use DBIx::Class::ResultSourceHandle;
10
11 use DBIx::Class::Exception;
12 use Carp::Clan qw/^DBIx::Class/;
13
14 __PACKAGE__->mk_group_accessors('simple' => qw/_ordered_columns
15   _columns _primaries _unique_constraints name resultset_attributes
16   schema from _relationships column_info_from_storage source_info
17   source_name sqlt_deploy_callback/);
18
19 __PACKAGE__->mk_group_accessors('component_class' => qw/resultset_class
20   result_class/);
21
22 =head1 NAME
23
24 DBIx::Class::ResultSource - Result source object
25
26 =head1 SYNOPSIS
27
28   # Create a table based result source, in a result class.
29
30   package MyDB::Schema::Result::Artist;
31   use base qw/DBIx::Class::Core/;
32
33   __PACKAGE__->table('artist');
34   __PACKAGE__->add_columns(qw/ artistid name /);
35   __PACKAGE__->set_primary_key('artistid');
36   __PACKAGE__->has_many(cds => 'MyDB::Schema::Result::CD');
37
38   1;
39
40   # Create a query (view) based result source, in a result class
41   package MyDB::Schema::Result::Year2000CDs;
42   use base qw/DBIx::Class::Core/;
43
44   __PACKAGE__->load_components('InflateColumn::DateTime');
45   __PACKAGE__->table_class('DBIx::Class::ResultSource::View');
46
47   __PACKAGE__->table('year2000cds');
48   __PACKAGE__->result_source_instance->is_virtual(1);
49   __PACKAGE__->result_source_instance->view_definition(
50       "SELECT cdid, artist, title FROM cd WHERE year ='2000'"
51       );
52
53
54 =head1 DESCRIPTION
55
56 A ResultSource is an object that represents a source of data for querying.
57
58 This class is a base class for various specialised types of result
59 sources, for example L<DBIx::Class::ResultSource::Table>. Table is the
60 default result source type, so one is created for you when defining a
61 result class as described in the synopsis above.
62
63 More specifically, the L<DBIx::Class::Core> base class pulls in the
64 L<DBIx::Class::ResultSourceProxy::Table> component, which defines
65 the L<table|DBIx::Class::ResultSourceProxy::Table/table> method.
66 When called, C<table> creates and stores an instance of
67 L<DBIx::Class::ResultSoure::Table>. Luckily, to use tables as result
68 sources, you don't need to remember any of this.
69
70 Result sources representing select queries, or views, can also be
71 created, see L<DBIx::Class::ResultSource::View> for full details.
72
73 =head2 Finding result source objects
74
75 As mentioned above, a result source instance is created and stored for
76 you when you define a L<Result Class|DBIx::Class::Manual::Glossary/Result Class>.
77
78 You can retrieve the result source at runtime in the following ways:
79
80 =over
81
82 =item From a Schema object:
83
84    $schema->source($source_name);
85
86 =item From a Row object:
87
88    $row->result_source;
89
90 =item From a ResultSet object:
91
92    $rs->result_source;
93
94 =back
95
96 =head1 METHODS
97
98 =pod
99
100 =cut
101
102 sub new {
103   my ($class, $attrs) = @_;
104   $class = ref $class if ref $class;
105
106   my $new = bless { %{$attrs || {}} }, $class;
107   $new->{resultset_class} ||= 'DBIx::Class::ResultSet';
108   $new->{resultset_attributes} = { %{$new->{resultset_attributes} || {}} };
109   $new->{_ordered_columns} = [ @{$new->{_ordered_columns}||[]}];
110   $new->{_columns} = { %{$new->{_columns}||{}} };
111   $new->{_relationships} = { %{$new->{_relationships}||{}} };
112   $new->{name} ||= "!!NAME NOT SET!!";
113   $new->{_columns_info_loaded} ||= 0;
114   $new->{sqlt_deploy_callback} ||= "default_sqlt_deploy_hook";
115   return $new;
116 }
117
118 =pod
119
120 =head2 add_columns
121
122 =over
123
124 =item Arguments: @columns
125
126 =item Return value: The ResultSource object
127
128 =back
129
130   $source->add_columns(qw/col1 col2 col3/);
131
132   $source->add_columns('col1' => \%col1_info, 'col2' => \%col2_info, ...);
133
134 Adds columns to the result source. If supplied colname => hashref
135 pairs, uses the hashref as the L</column_info> for that column. Repeated
136 calls of this method will add more columns, not replace them.
137
138 The column names given will be created as accessor methods on your
139 L<DBIx::Class::Row> objects. You can change the name of the accessor
140 by supplying an L</accessor> in the column_info hash.
141
142 The contents of the column_info are not set in stone. The following
143 keys are currently recognised/used by DBIx::Class:
144
145 =over 4
146
147 =item accessor
148
149    { accessor => '_name' }
150
151    # example use, replace standard accessor with one of your own:
152    sub name {
153        my ($self, $value) = @_;
154
155        die "Name cannot contain digits!" if($value =~ /\d/);
156        $self->_name($value);
157
158        return $self->_name();
159    }
160
161 Use this to set the name of the accessor method for this column. If unset,
162 the name of the column will be used.
163
164 =item data_type
165
166    { data_type => 'integer' }
167
168 This contains the column type. It is automatically filled if you use the
169 L<SQL::Translator::Producer::DBIx::Class::File> producer, or the
170 L<DBIx::Class::Schema::Loader> module. 
171
172 Currently there is no standard set of values for the data_type. Use
173 whatever your database supports.
174
175 =item size
176
177    { size => 20 }
178
179 The length of your column, if it is a column type that can have a size
180 restriction. This is currently only used to create tables from your
181 schema, see L<DBIx::Class::Schema/deploy>.
182
183 =item is_nullable
184
185    { is_nullable => 1 }
186
187 Set this to a true value for a columns that is allowed to contain NULL
188 values, default is false. This is currently only used to create tables
189 from your schema, see L<DBIx::Class::Schema/deploy>.
190
191 =item is_auto_increment
192
193    { is_auto_increment => 1 }
194
195 Set this to a true value for a column whose value is somehow
196 automatically set, defaults to false. This is used to determine which
197 columns to empty when cloning objects using
198 L<DBIx::Class::Row/copy>. It is also used by
199 L<DBIx::Class::Schema/deploy>.
200
201 =item is_numeric
202
203    { is_numeric => 1 }
204
205 Set this to a true or false value (not C<undef>) to explicitly specify
206 if this column contains numeric data. This controls how set_column
207 decides whether to consider a column dirty after an update: if
208 C<is_numeric> is true a numeric comparison C<< != >> will take place
209 instead of the usual C<eq>
210
211 If not specified the storage class will attempt to figure this out on
212 first access to the column, based on the column C<data_type>. The
213 result will be cached in this attribute.
214
215 =item is_foreign_key
216
217    { is_foreign_key => 1 }
218
219 Set this to a true value for a column that contains a key from a
220 foreign table, defaults to false. This is currently only used to
221 create tables from your schema, see L<DBIx::Class::Schema/deploy>.
222
223 =item default_value
224
225    { default_value => \'now()' }
226
227 Set this to the default value which will be inserted into a column by
228 the database. Can contain either a value or a function (use a
229 reference to a scalar e.g. C<\'now()'> if you want a function). This
230 is currently only used to create tables from your schema, see
231 L<DBIx::Class::Schema/deploy>.
232
233 See the note on L<DBIx::Class::Row/new> for more information about possible
234 issues related to db-side default values.
235
236 =item sequence
237
238    { sequence => 'my_table_seq' }
239
240 Set this on a primary key column to the name of the sequence used to
241 generate a new key value. If not specified, L<DBIx::Class::PK::Auto>
242 will attempt to retrieve the name of the sequence from the database
243 automatically.
244
245 =item auto_nextval
246
247 Set this to a true value for a column whose value is retrieved automatically
248 from a sequence or function (if supported by your Storage driver.) For a
249 sequence, if you do not use a trigger to get the nextval, you have to set the
250 L</sequence> value as well.
251
252 Also set this for MSSQL columns with the 'uniqueidentifier'
253 L<DBIx::Class::ResultSource/data_type> whose values you want to automatically
254 generate using C<NEWID()>, unless they are a primary key in which case this will
255 be done anyway.
256
257 =item extra
258
259 This is used by L<DBIx::Class::Schema/deploy> and L<SQL::Translator>
260 to add extra non-generic data to the column. For example: C<< extra
261 => { unsigned => 1} >> is used by the MySQL producer to set an integer
262 column to unsigned. For more details, see
263 L<SQL::Translator::Producer::MySQL>.
264
265 =back
266
267 =head2 add_column
268
269 =over
270
271 =item Arguments: $colname, \%columninfo?
272
273 =item Return value: 1/0 (true/false)
274
275 =back
276
277   $source->add_column('col' => \%info);
278
279 Add a single column and optional column info. Uses the same column
280 info keys as L</add_columns>.
281
282 =cut
283
284 sub add_columns {
285   my ($self, @cols) = @_;
286   $self->_ordered_columns(\@cols) unless $self->_ordered_columns;
287
288   my @added;
289   my $columns = $self->_columns;
290   while (my $col = shift @cols) {
291     # If next entry is { ... } use that for the column info, if not
292     # use an empty hashref
293     my $column_info = ref $cols[0] ? shift(@cols) : {};
294     push(@added, $col) unless exists $columns->{$col};
295     $columns->{$col} = $column_info;
296   }
297   push @{ $self->_ordered_columns }, @added;
298   return $self;
299 }
300
301 sub add_column { shift->add_columns(@_); } # DO NOT CHANGE THIS TO GLOB
302
303 =head2 has_column
304
305 =over
306
307 =item Arguments: $colname
308
309 =item Return value: 1/0 (true/false)
310
311 =back
312
313   if ($source->has_column($colname)) { ... }
314
315 Returns true if the source has a column of this name, false otherwise.
316
317 =cut
318
319 sub has_column {
320   my ($self, $column) = @_;
321   return exists $self->_columns->{$column};
322 }
323
324 =head2 column_info
325
326 =over
327
328 =item Arguments: $colname
329
330 =item Return value: Hashref of info
331
332 =back
333
334   my $info = $source->column_info($col);
335
336 Returns the column metadata hashref for a column, as originally passed
337 to L</add_columns>. See L</add_columns> above for information on the
338 contents of the hashref.
339
340 =cut
341
342 sub column_info {
343   my ($self, $column) = @_;
344   $self->throw_exception("No such column $column")
345     unless exists $self->_columns->{$column};
346   #warn $self->{_columns_info_loaded}, "\n";
347   if ( ! $self->_columns->{$column}{data_type}
348        and $self->column_info_from_storage
349        and ! $self->{_columns_info_loaded}
350        and $self->schema and $self->storage )
351   {
352     $self->{_columns_info_loaded}++;
353     my $info = {};
354     my $lc_info = {};
355     # eval for the case of storage without table
356     eval { $info = $self->storage->columns_info_for( $self->from ) };
357     unless ($@) {
358       for my $realcol ( keys %{$info} ) {
359         $lc_info->{lc $realcol} = $info->{$realcol};
360       }
361       foreach my $col ( keys %{$self->_columns} ) {
362         $self->_columns->{$col} = {
363           %{ $self->_columns->{$col} },
364           %{ $info->{$col} || $lc_info->{lc $col} || {} }
365         };
366       }
367     }
368   }
369   return $self->_columns->{$column};
370 }
371
372 =head2 columns
373
374 =over
375
376 =item Arguments: None
377
378 =item Return value: Ordered list of column names
379
380 =back
381
382   my @column_names = $source->columns;
383
384 Returns all column names in the order they were declared to L</add_columns>.
385
386 =cut
387
388 sub columns {
389   my $self = shift;
390   $self->throw_exception(
391     "columns() is a read-only accessor, did you mean add_columns()?"
392   ) if @_;
393   return @{$self->{_ordered_columns}||[]};
394 }
395
396 =head2 remove_columns
397
398 =over
399
400 =item Arguments: @colnames
401
402 =item Return value: undefined
403
404 =back
405
406   $source->remove_columns(qw/col1 col2 col3/);
407
408 Removes the given list of columns by name, from the result source.
409
410 B<Warning>: Removing a column that is also used in the sources primary
411 key, or in one of the sources unique constraints, B<will> result in a
412 broken result source.
413
414 =head2 remove_column
415
416 =over
417
418 =item Arguments: $colname
419
420 =item Return value: undefined
421
422 =back
423
424   $source->remove_column('col');
425
426 Remove a single column by name from the result source, similar to
427 L</remove_columns>.
428
429 B<Warning>: Removing a column that is also used in the sources primary
430 key, or in one of the sources unique constraints, B<will> result in a
431 broken result source.
432
433 =cut
434
435 sub remove_columns {
436   my ($self, @to_remove) = @_;
437
438   my $columns = $self->_columns
439     or return;
440
441   my %to_remove;
442   for (@to_remove) {
443     delete $columns->{$_};
444     ++$to_remove{$_};
445   }
446
447   $self->_ordered_columns([ grep { not $to_remove{$_} } @{$self->_ordered_columns} ]);
448 }
449
450 sub remove_column { shift->remove_columns(@_); } # DO NOT CHANGE THIS TO GLOB
451
452 =head2 set_primary_key
453
454 =over 4
455
456 =item Arguments: @cols
457
458 =item Return value: undefined
459
460 =back
461
462 Defines one or more columns as primary key for this source. Must be
463 called after L</add_columns>.
464
465 Additionally, defines a L<unique constraint|add_unique_constraint>
466 named C<primary>.
467
468 The primary key columns are used by L<DBIx::Class::PK::Auto> to
469 retrieve automatically created values from the database. They are also
470 used as default joining columns when specifying relationships, see
471 L<DBIx::Class::Relationship>.
472
473 =cut
474
475 sub set_primary_key {
476   my ($self, @cols) = @_;
477   # check if primary key columns are valid columns
478   foreach my $col (@cols) {
479     $self->throw_exception("No such column $col on table " . $self->name)
480       unless $self->has_column($col);
481   }
482   $self->_primaries(\@cols);
483
484   $self->add_unique_constraint(primary => \@cols);
485 }
486
487 =head2 primary_columns
488
489 =over 4
490
491 =item Arguments: None
492
493 =item Return value: Ordered list of primary column names
494
495 =back
496
497 Read-only accessor which returns the list of primary keys, supplied by
498 L</set_primary_key>.
499
500 =cut
501
502 sub primary_columns {
503   return @{shift->_primaries||[]};
504 }
505
506 sub _pri_cols {
507   my $self = shift;
508   my @pcols = $self->primary_columns
509     or $self->throw_exception (sprintf(
510       'Operation requires a primary key to be declared on %s via set_primary_key',
511       $self->source_name,
512     ));
513   return @pcols;
514 }
515
516 =head2 add_unique_constraint
517
518 =over 4
519
520 =item Arguments: $name?, \@colnames
521
522 =item Return value: undefined
523
524 =back
525
526 Declare a unique constraint on this source. Call once for each unique
527 constraint.
528
529   # For UNIQUE (column1, column2)
530   __PACKAGE__->add_unique_constraint(
531     constraint_name => [ qw/column1 column2/ ],
532   );
533
534 Alternatively, you can specify only the columns:
535
536   __PACKAGE__->add_unique_constraint([ qw/column1 column2/ ]);
537
538 This will result in a unique constraint named
539 C<table_column1_column2>, where C<table> is replaced with the table
540 name.
541
542 Unique constraints are used, for example, when you pass the constraint
543 name as the C<key> attribute to L<DBIx::Class::ResultSet/find>. Then
544 only columns in the constraint are searched.
545
546 Throws an error if any of the given column names do not yet exist on
547 the result source.
548
549 =cut
550
551 sub add_unique_constraint {
552   my $self = shift;
553   my $cols = pop @_;
554   my $name = shift;
555
556   $name ||= $self->name_unique_constraint($cols);
557
558   foreach my $col (@$cols) {
559     $self->throw_exception("No such column $col on table " . $self->name)
560       unless $self->has_column($col);
561   }
562
563   my %unique_constraints = $self->unique_constraints;
564   $unique_constraints{$name} = $cols;
565   $self->_unique_constraints(\%unique_constraints);
566 }
567
568 =head2 name_unique_constraint
569
570 =over 4
571
572 =item Arguments: @colnames
573
574 =item Return value: Constraint name
575
576 =back
577
578   $source->table('mytable');
579   $source->name_unique_constraint('col1', 'col2');
580   # returns
581   'mytable_col1_col2'
582
583 Return a name for a unique constraint containing the specified
584 columns. The name is created by joining the table name and each column
585 name, using an underscore character.
586
587 For example, a constraint on a table named C<cd> containing the columns
588 C<artist> and C<title> would result in a constraint name of C<cd_artist_title>.
589
590 This is used by L</add_unique_constraint> if you do not specify the
591 optional constraint name.
592
593 =cut
594
595 sub name_unique_constraint {
596   my ($self, $cols) = @_;
597
598   my $name = $self->name;
599   $name = $$name if (ref $name eq 'SCALAR');
600
601   return join '_', $name, @$cols;
602 }
603
604 =head2 unique_constraints
605
606 =over 4
607
608 =item Arguments: None
609
610 =item Return value: Hash of unique constraint data
611
612 =back
613
614   $source->unique_constraints();
615
616 Read-only accessor which returns a hash of unique constraints on this
617 source.
618
619 The hash is keyed by constraint name, and contains an arrayref of
620 column names as values.
621
622 =cut
623
624 sub unique_constraints {
625   return %{shift->_unique_constraints||{}};
626 }
627
628 =head2 unique_constraint_names
629
630 =over 4
631
632 =item Arguments: None
633
634 =item Return value: Unique constraint names
635
636 =back
637
638   $source->unique_constraint_names();
639
640 Returns the list of unique constraint names defined on this source.
641
642 =cut
643
644 sub unique_constraint_names {
645   my ($self) = @_;
646
647   my %unique_constraints = $self->unique_constraints;
648
649   return keys %unique_constraints;
650 }
651
652 =head2 unique_constraint_columns
653
654 =over 4
655
656 =item Arguments: $constraintname
657
658 =item Return value: List of constraint columns
659
660 =back
661
662   $source->unique_constraint_columns('myconstraint');
663
664 Returns the list of columns that make up the specified unique constraint.
665
666 =cut
667
668 sub unique_constraint_columns {
669   my ($self, $constraint_name) = @_;
670
671   my %unique_constraints = $self->unique_constraints;
672
673   $self->throw_exception(
674     "Unknown unique constraint $constraint_name on '" . $self->name . "'"
675   ) unless exists $unique_constraints{$constraint_name};
676
677   return @{ $unique_constraints{$constraint_name} };
678 }
679
680 =head2 sqlt_deploy_callback
681
682 =over
683
684 =item Arguments: $callback
685
686 =back
687
688   __PACKAGE__->sqlt_deploy_callback('mycallbackmethod');
689
690 An accessor to set a callback to be called during deployment of
691 the schema via L<DBIx::Class::Schema/create_ddl_dir> or
692 L<DBIx::Class::Schema/deploy>.
693
694 The callback can be set as either a code reference or the name of a
695 method in the current result class.
696
697 If not set, the L</default_sqlt_deploy_hook> is called.
698
699 Your callback will be passed the $source object representing the
700 ResultSource instance being deployed, and the
701 L<SQL::Translator::Schema::Table> object being created from it. The
702 callback can be used to manipulate the table object or add your own
703 customised indexes. If you need to manipulate a non-table object, use
704 the L<DBIx::Class::Schema/sqlt_deploy_hook>.
705
706 See L<DBIx::Class::Manual::Cookbook/Adding Indexes And Functions To
707 Your SQL> for examples.
708
709 This sqlt deployment callback can only be used to manipulate
710 SQL::Translator objects as they get turned into SQL. To execute
711 post-deploy statements which SQL::Translator does not currently
712 handle, override L<DBIx::Class::Schema/deploy> in your Schema class
713 and call L<dbh_do|DBIx::Class::Storage::DBI/dbh_do>.
714
715 =head2 default_sqlt_deploy_hook
716
717 =over
718
719 =item Arguments: $source, $sqlt_table
720
721 =item Return value: undefined
722
723 =back
724
725 This is the sensible default for L</sqlt_deploy_callback>.
726
727 If a method named C<sqlt_deploy_hook> exists in your Result class, it
728 will be called and passed the current C<$source> and the
729 C<$sqlt_table> being deployed.
730
731 =cut
732
733 sub default_sqlt_deploy_hook {
734   my $self = shift;
735
736   my $class = $self->result_class;
737
738   if ($class and $class->can('sqlt_deploy_hook')) {
739     $class->sqlt_deploy_hook(@_);
740   }
741 }
742
743 sub _invoke_sqlt_deploy_hook {
744   my $self = shift;
745   if ( my $hook = $self->sqlt_deploy_callback) {
746     $self->$hook(@_);
747   }
748 }
749
750 =head2 resultset
751
752 =over 4
753
754 =item Arguments: None
755
756 =item Return value: $resultset
757
758 =back
759
760 Returns a resultset for the given source. This will initially be created
761 on demand by calling
762
763   $self->resultset_class->new($self, $self->resultset_attributes)
764
765 but is cached from then on unless resultset_class changes.
766
767 =head2 resultset_class
768
769 =over 4
770
771 =item Arguments: $classname
772
773 =item Return value: $classname
774
775 =back
776
777   package My::Schema::ResultSet::Artist;
778   use base 'DBIx::Class::ResultSet';
779   ...
780
781   # In the result class
782   __PACKAGE__->resultset_class('My::Schema::ResultSet::Artist');
783
784   # Or in code
785   $source->resultset_class('My::Schema::ResultSet::Artist');
786
787 Set the class of the resultset. This is useful if you want to create your
788 own resultset methods. Create your own class derived from
789 L<DBIx::Class::ResultSet>, and set it here. If called with no arguments,
790 this method returns the name of the existing resultset class, if one
791 exists.
792
793 =head2 resultset_attributes
794
795 =over 4
796
797 =item Arguments: \%attrs
798
799 =item Return value: \%attrs
800
801 =back
802
803   # In the result class
804   __PACKAGE__->resultset_attributes({ order_by => [ 'id' ] });
805
806   # Or in code
807   $source->resultset_attributes({ order_by => [ 'id' ] });
808
809 Store a collection of resultset attributes, that will be set on every
810 L<DBIx::Class::ResultSet> produced from this result source. For a full
811 list see L<DBIx::Class::ResultSet/ATTRIBUTES>.
812
813 =cut
814
815 sub resultset {
816   my $self = shift;
817   $self->throw_exception(
818     'resultset does not take any arguments. If you want another resultset, '.
819     'call it on the schema instead.'
820   ) if scalar @_;
821
822   return $self->resultset_class->new(
823     $self,
824     {
825       %{$self->{resultset_attributes}},
826       %{$self->schema->default_resultset_attributes}
827     },
828   );
829 }
830
831 =head2 source_name
832
833 =over 4
834
835 =item Arguments: $source_name
836
837 =item Result value: $source_name
838
839 =back
840
841 Set an alternate name for the result source when it is loaded into a schema.
842 This is useful if you want to refer to a result source by a name other than
843 its class name.
844
845   package ArchivedBooks;
846   use base qw/DBIx::Class/;
847   __PACKAGE__->table('books_archive');
848   __PACKAGE__->source_name('Books');
849
850   # from your schema...
851   $schema->resultset('Books')->find(1);
852
853 =head2 from
854
855 =over 4
856
857 =item Arguments: None
858
859 =item Return value: FROM clause
860
861 =back
862
863   my $from_clause = $source->from();
864
865 Returns an expression of the source to be supplied to storage to specify
866 retrieval from this source. In the case of a database, the required FROM
867 clause contents.
868
869 =head2 schema
870
871 =over 4
872
873 =item Arguments: None
874
875 =item Return value: A schema object
876
877 =back
878
879   my $schema = $source->schema();
880
881 Returns the L<DBIx::Class::Schema> object that this result source 
882 belongs to.
883
884 =head2 storage
885
886 =over 4
887
888 =item Arguments: None
889
890 =item Return value: A Storage object
891
892 =back
893
894   $source->storage->debug(1);
895
896 Returns the storage handle for the current schema.
897
898 See also: L<DBIx::Class::Storage>
899
900 =cut
901
902 sub storage { shift->schema->storage; }
903
904 =head2 add_relationship
905
906 =over 4
907
908 =item Arguments: $relname, $related_source_name, \%cond, [ \%attrs ]
909
910 =item Return value: 1/true if it succeeded
911
912 =back
913
914   $source->add_relationship('relname', 'related_source', $cond, $attrs);
915
916 L<DBIx::Class::Relationship> describes a series of methods which
917 create pre-defined useful types of relationships. Look there first
918 before using this method directly.
919
920 The relationship name can be arbitrary, but must be unique for each
921 relationship attached to this result source. 'related_source' should
922 be the name with which the related result source was registered with
923 the current schema. For example:
924
925   $schema->source('Book')->add_relationship('reviews', 'Review', {
926     'foreign.book_id' => 'self.id',
927   });
928
929 The condition C<$cond> needs to be an L<SQL::Abstract>-style
930 representation of the join between the tables. For example, if you're
931 creating a relation from Author to Book,
932
933   { 'foreign.author_id' => 'self.id' }
934
935 will result in the JOIN clause
936
937   author me JOIN book foreign ON foreign.author_id = me.id
938
939 You can specify as many foreign => self mappings as necessary.
940
941 Valid attributes are as follows:
942
943 =over 4
944
945 =item join_type
946
947 Explicitly specifies the type of join to use in the relationship. Any
948 SQL join type is valid, e.g. C<LEFT> or C<RIGHT>. It will be placed in
949 the SQL command immediately before C<JOIN>.
950
951 =item proxy
952
953 An arrayref containing a list of accessors in the foreign class to proxy in
954 the main class. If, for example, you do the following:
955
956   CD->might_have(liner_notes => 'LinerNotes', undef, {
957     proxy => [ qw/notes/ ],
958   });
959
960 Then, assuming LinerNotes has an accessor named notes, you can do:
961
962   my $cd = CD->find(1);
963   # set notes -- LinerNotes object is created if it doesn't exist
964   $cd->notes('Notes go here');
965
966 =item accessor
967
968 Specifies the type of accessor that should be created for the
969 relationship. Valid values are C<single> (for when there is only a single
970 related object), C<multi> (when there can be many), and C<filter> (for
971 when there is a single related object, but you also want the relationship
972 accessor to double as a column accessor). For C<multi> accessors, an
973 add_to_* method is also created, which calls C<create_related> for the
974 relationship.
975
976 =back
977
978 Throws an exception if the condition is improperly supplied, or cannot
979 be resolved.
980
981 =cut
982
983 sub add_relationship {
984   my ($self, $rel, $f_source_name, $cond, $attrs) = @_;
985   $self->throw_exception("Can't create relationship without join condition")
986     unless $cond;
987   $attrs ||= {};
988
989   # Check foreign and self are right in cond
990   if ( (ref $cond ||'') eq 'HASH') {
991     for (keys %$cond) {
992       $self->throw_exception("Keys of condition should be of form 'foreign.col', not '$_'")
993         if /\./ && !/^foreign\./;
994     }
995   }
996
997   my %rels = %{ $self->_relationships };
998   $rels{$rel} = { class => $f_source_name,
999                   source => $f_source_name,
1000                   cond  => $cond,
1001                   attrs => $attrs };
1002   $self->_relationships(\%rels);
1003
1004   return $self;
1005
1006   # XXX disabled. doesn't work properly currently. skip in tests.
1007
1008   my $f_source = $self->schema->source($f_source_name);
1009   unless ($f_source) {
1010     $self->ensure_class_loaded($f_source_name);
1011     $f_source = $f_source_name->result_source;
1012     #my $s_class = ref($self->schema);
1013     #$f_source_name =~ m/^${s_class}::(.*)$/;
1014     #$self->schema->register_class(($1 || $f_source_name), $f_source_name);
1015     #$f_source = $self->schema->source($f_source_name);
1016   }
1017   return unless $f_source; # Can't test rel without f_source
1018
1019   eval { $self->_resolve_join($rel, 'me', {}, []) };
1020
1021   if ($@) { # If the resolve failed, back out and re-throw the error
1022     delete $rels{$rel}; #
1023     $self->_relationships(\%rels);
1024     $self->throw_exception("Error creating relationship $rel: $@");
1025   }
1026   1;
1027 }
1028
1029 =head2 relationships
1030
1031 =over 4
1032
1033 =item Arguments: None
1034
1035 =item Return value: List of relationship names
1036
1037 =back
1038
1039   my @relnames = $source->relationships();
1040
1041 Returns all relationship names for this source.
1042
1043 =cut
1044
1045 sub relationships {
1046   return keys %{shift->_relationships};
1047 }
1048
1049 =head2 relationship_info
1050
1051 =over 4
1052
1053 =item Arguments: $relname
1054
1055 =item Return value: Hashref of relation data,
1056
1057 =back
1058
1059 Returns a hash of relationship information for the specified relationship
1060 name. The keys/values are as specified for L</add_relationship>.
1061
1062 =cut
1063
1064 sub relationship_info {
1065   my ($self, $rel) = @_;
1066   return $self->_relationships->{$rel};
1067 }
1068
1069 =head2 has_relationship
1070
1071 =over 4
1072
1073 =item Arguments: $rel
1074
1075 =item Return value: 1/0 (true/false)
1076
1077 =back
1078
1079 Returns true if the source has a relationship of this name, false otherwise.
1080
1081 =cut
1082
1083 sub has_relationship {
1084   my ($self, $rel) = @_;
1085   return exists $self->_relationships->{$rel};
1086 }
1087
1088 =head2 reverse_relationship_info
1089
1090 =over 4
1091
1092 =item Arguments: $relname
1093
1094 =item Return value: Hashref of relationship data
1095
1096 =back
1097
1098 Looks through all the relationships on the source this relationship
1099 points to, looking for one whose condition is the reverse of the
1100 condition on this relationship.
1101
1102 A common use of this is to find the name of the C<belongs_to> relation
1103 opposing a C<has_many> relation. For definition of these look in
1104 L<DBIx::Class::Relationship>.
1105
1106 The returned hashref is keyed by the name of the opposing
1107 relationship, and contains its data in the same manner as
1108 L</relationship_info>.
1109
1110 =cut
1111
1112 sub reverse_relationship_info {
1113   my ($self, $rel) = @_;
1114   my $rel_info = $self->relationship_info($rel);
1115   my $ret = {};
1116
1117   return $ret unless ((ref $rel_info->{cond}) eq 'HASH');
1118
1119   my @cond = keys(%{$rel_info->{cond}});
1120   my @refkeys = map {/^\w+\.(\w+)$/} @cond;
1121   my @keys = map {$rel_info->{cond}->{$_} =~ /^\w+\.(\w+)$/} @cond;
1122
1123   # Get the related result source for this relationship
1124   my $othertable = $self->related_source($rel);
1125
1126   # Get all the relationships for that source that related to this source
1127   # whose foreign column set are our self columns on $rel and whose self
1128   # columns are our foreign columns on $rel.
1129   my @otherrels = $othertable->relationships();
1130   my $otherrelationship;
1131   foreach my $otherrel (@otherrels) {
1132     my $otherrel_info = $othertable->relationship_info($otherrel);
1133
1134     my $back = $othertable->related_source($otherrel);
1135     next unless $back->source_name eq $self->source_name;
1136
1137     my @othertestconds;
1138
1139     if (ref $otherrel_info->{cond} eq 'HASH') {
1140       @othertestconds = ($otherrel_info->{cond});
1141     }
1142     elsif (ref $otherrel_info->{cond} eq 'ARRAY') {
1143       @othertestconds = @{$otherrel_info->{cond}};
1144     }
1145     else {
1146       next;
1147     }
1148
1149     foreach my $othercond (@othertestconds) {
1150       my @other_cond = keys(%$othercond);
1151       my @other_refkeys = map {/^\w+\.(\w+)$/} @other_cond;
1152       my @other_keys = map {$othercond->{$_} =~ /^\w+\.(\w+)$/} @other_cond;
1153       next if (!$self->_compare_relationship_keys(\@refkeys, \@other_keys) ||
1154                !$self->_compare_relationship_keys(\@other_refkeys, \@keys));
1155       $ret->{$otherrel} =  $otherrel_info;
1156     }
1157   }
1158   return $ret;
1159 }
1160
1161 sub compare_relationship_keys {
1162   carp 'compare_relationship_keys is a private method, stop calling it';
1163   my $self = shift;
1164   $self->_compare_relationship_keys (@_);
1165 }
1166
1167 # Returns true if both sets of keynames are the same, false otherwise.
1168 sub _compare_relationship_keys {
1169   my ($self, $keys1, $keys2) = @_;
1170
1171   # Make sure every keys1 is in keys2
1172   my $found;
1173   foreach my $key (@$keys1) {
1174     $found = 0;
1175     foreach my $prim (@$keys2) {
1176       if ($prim eq $key) {
1177         $found = 1;
1178         last;
1179       }
1180     }
1181     last unless $found;
1182   }
1183
1184   # Make sure every key2 is in key1
1185   if ($found) {
1186     foreach my $prim (@$keys2) {
1187       $found = 0;
1188       foreach my $key (@$keys1) {
1189         if ($prim eq $key) {
1190           $found = 1;
1191           last;
1192         }
1193       }
1194       last unless $found;
1195     }
1196   }
1197
1198   return $found;
1199 }
1200
1201 # Returns the {from} structure used to express JOIN conditions
1202 sub _resolve_join {
1203   my ($self, $join, $alias, $seen, $jpath, $parent_force_left) = @_;
1204
1205   # we need a supplied one, because we do in-place modifications, no returns
1206   $self->throw_exception ('You must supply a seen hashref as the 3rd argument to _resolve_join')
1207     unless ref $seen eq 'HASH';
1208
1209   $self->throw_exception ('You must supply a joinpath arrayref as the 4th argument to _resolve_join')
1210     unless ref $jpath eq 'ARRAY';
1211
1212   $jpath = [@$jpath]; # copy
1213
1214   if (not defined $join) {
1215     return ();
1216   }
1217   elsif (ref $join eq 'ARRAY') {
1218     return
1219       map {
1220         $self->_resolve_join($_, $alias, $seen, $jpath, $parent_force_left);
1221       } @$join;
1222   }
1223   elsif (ref $join eq 'HASH') {
1224
1225     my @ret;
1226     for my $rel (keys %$join) {
1227
1228       my $rel_info = $self->relationship_info($rel)
1229         or $self->throw_exception("No such relationship ${rel}");
1230
1231       my $force_left = $parent_force_left;
1232       $force_left ||= lc($rel_info->{attrs}{join_type}||'') eq 'left';
1233
1234       # the actual seen value will be incremented by the recursion
1235       my $as = $self->storage->relname_to_table_alias(
1236         $rel, ($seen->{$rel} && $seen->{$rel} + 1)
1237       );
1238
1239       push @ret, (
1240         $self->_resolve_join($rel, $alias, $seen, [@$jpath], $force_left),
1241         $self->related_source($rel)->_resolve_join(
1242           $join->{$rel}, $as, $seen, [@$jpath, { $rel => $as }], $force_left
1243         )
1244       );
1245     }
1246     return @ret;
1247
1248   }
1249   elsif (ref $join) {
1250     $self->throw_exception("No idea how to resolve join reftype ".ref $join);
1251   }
1252   else {
1253     my $count = ++$seen->{$join};
1254     my $as = $self->storage->relname_to_table_alias(
1255       $join, ($count > 1 && $count)
1256     );
1257
1258     my $rel_info = $self->relationship_info($join)
1259       or $self->throw_exception("No such relationship ${join}");
1260
1261     my $rel_src = $self->related_source($join);
1262     return [ { $as => $rel_src->from,
1263                -source_handle => $rel_src->handle,
1264                -join_type => $parent_force_left
1265                   ? 'left'
1266                   : $rel_info->{attrs}{join_type}
1267                 ,
1268                -join_path => [@$jpath, { $join => $as } ],
1269                -is_single => (
1270                   $rel_info->{attrs}{accessor}
1271                     &&
1272                   List::Util::first { $rel_info->{attrs}{accessor} eq $_ } (qw/single filter/)
1273                 ),
1274                -alias => $as,
1275                -relation_chain_depth => $seen->{-relation_chain_depth} || 0,
1276              },
1277              $self->_resolve_condition($rel_info->{cond}, $as, $alias) ];
1278   }
1279 }
1280
1281 sub pk_depends_on {
1282   carp 'pk_depends_on is a private method, stop calling it';
1283   my $self = shift;
1284   $self->_pk_depends_on (@_);
1285 }
1286
1287 # Determines whether a relation is dependent on an object from this source
1288 # having already been inserted. Takes the name of the relationship and a
1289 # hashref of columns of the related object.
1290 sub _pk_depends_on {
1291   my ($self, $relname, $rel_data) = @_;
1292
1293   my $relinfo = $self->relationship_info($relname);
1294
1295   # don't assume things if the relationship direction is specified
1296   return $relinfo->{attrs}{is_foreign_key_constraint}
1297     if exists ($relinfo->{attrs}{is_foreign_key_constraint});
1298
1299   my $cond = $relinfo->{cond};
1300   return 0 unless ref($cond) eq 'HASH';
1301
1302   # map { foreign.foo => 'self.bar' } to { bar => 'foo' }
1303   my $keyhash = { map { my $x = $_; $x =~ s/.*\.//; $x; } reverse %$cond };
1304
1305   # assume anything that references our PK probably is dependent on us
1306   # rather than vice versa, unless the far side is (a) defined or (b)
1307   # auto-increment
1308   my $rel_source = $self->related_source($relname);
1309
1310   foreach my $p ($self->primary_columns) {
1311     if (exists $keyhash->{$p}) {
1312       unless (defined($rel_data->{$keyhash->{$p}})
1313               || $rel_source->column_info($keyhash->{$p})
1314                             ->{is_auto_increment}) {
1315         return 0;
1316       }
1317     }
1318   }
1319
1320   return 1;
1321 }
1322
1323 sub resolve_condition {
1324   carp 'resolve_condition is a private method, stop calling it';
1325   my $self = shift;
1326   $self->_resolve_condition (@_);
1327 }
1328
1329 # Resolves the passed condition to a concrete query fragment. If given an alias,
1330 # returns a join condition; if given an object, inverts that object to produce
1331 # a related conditional from that object.
1332 our $UNRESOLVABLE_CONDITION = \ '1 = 0';
1333
1334 sub _resolve_condition {
1335   my ($self, $cond, $as, $for) = @_;
1336   if (ref $cond eq 'HASH') {
1337     my %ret;
1338     foreach my $k (keys %{$cond}) {
1339       my $v = $cond->{$k};
1340       # XXX should probably check these are valid columns
1341       $k =~ s/^foreign\.// ||
1342         $self->throw_exception("Invalid rel cond key ${k}");
1343       $v =~ s/^self\.// ||
1344         $self->throw_exception("Invalid rel cond val ${v}");
1345       if (ref $for) { # Object
1346         #warn "$self $k $for $v";
1347         unless ($for->has_column_loaded($v)) {
1348           if ($for->in_storage) {
1349             $self->throw_exception(sprintf
1350               "Unable to resolve relationship '%s' from object %s: column '%s' not "
1351             . 'loaded from storage (or not passed to new() prior to insert()). You '
1352             . 'probably need to call ->discard_changes to get the server-side defaults '
1353             . 'from the database.',
1354               $as,
1355               $for,
1356               $v,
1357             );
1358           }
1359           return $UNRESOLVABLE_CONDITION;
1360         }
1361         $ret{$k} = $for->get_column($v);
1362         #$ret{$k} = $for->get_column($v) if $for->has_column_loaded($v);
1363         #warn %ret;
1364       } elsif (!defined $for) { # undef, i.e. "no object"
1365         $ret{$k} = undef;
1366       } elsif (ref $as eq 'HASH') { # reverse hashref
1367         $ret{$v} = $as->{$k};
1368       } elsif (ref $as) { # reverse object
1369         $ret{$v} = $as->get_column($k);
1370       } elsif (!defined $as) { # undef, i.e. "no reverse object"
1371         $ret{$v} = undef;
1372       } else {
1373         $ret{"${as}.${k}"} = "${for}.${v}";
1374       }
1375     }
1376     return \%ret;
1377   } elsif (ref $cond eq 'ARRAY') {
1378     return [ map { $self->_resolve_condition($_, $as, $for) } @$cond ];
1379   } else {
1380    die("Can't handle condition $cond yet :(");
1381   }
1382 }
1383
1384
1385 # Accepts one or more relationships for the current source and returns an
1386 # array of column names for each of those relationships. Column names are
1387 # prefixed relative to the current source, in accordance with where they appear
1388 # in the supplied relationships.
1389
1390 sub _resolve_prefetch {
1391   my ($self, $pre, $alias, $alias_map, $order, $pref_path) = @_;
1392   $pref_path ||= [];
1393
1394   if (not defined $pre) {
1395     return ();
1396   }
1397   elsif( ref $pre eq 'ARRAY' ) {
1398     return
1399       map { $self->_resolve_prefetch( $_, $alias, $alias_map, $order, [ @$pref_path ] ) }
1400         @$pre;
1401   }
1402   elsif( ref $pre eq 'HASH' ) {
1403     my @ret =
1404     map {
1405       $self->_resolve_prefetch($_, $alias, $alias_map, $order, [ @$pref_path ] ),
1406       $self->related_source($_)->_resolve_prefetch(
1407                $pre->{$_}, "${alias}.$_", $alias_map, $order, [ @$pref_path, $_] )
1408     } keys %$pre;
1409     return @ret;
1410   }
1411   elsif( ref $pre ) {
1412     $self->throw_exception(
1413       "don't know how to resolve prefetch reftype ".ref($pre));
1414   }
1415   else {
1416     my $p = $alias_map;
1417     $p = $p->{$_} for (@$pref_path, $pre);
1418
1419     $self->throw_exception (
1420       "Unable to resolve prefetch '$pre' - join alias map does not contain an entry for path: "
1421       . join (' -> ', @$pref_path, $pre)
1422     ) if (ref $p->{-join_aliases} ne 'ARRAY' or not @{$p->{-join_aliases}} );
1423
1424     my $as = shift @{$p->{-join_aliases}};
1425
1426     my $rel_info = $self->relationship_info( $pre );
1427     $self->throw_exception( $self->name . " has no such relationship '$pre'" )
1428       unless $rel_info;
1429     my $as_prefix = ($alias =~ /^.*?\.(.+)$/ ? $1.'.' : '');
1430     my $rel_source = $self->related_source($pre);
1431
1432     if ($rel_info->{attrs}{accessor} && $rel_info->{attrs}{accessor} eq 'multi') {
1433       $self->throw_exception(
1434         "Can't prefetch has_many ${pre} (join cond too complex)")
1435         unless ref($rel_info->{cond}) eq 'HASH';
1436       my $dots = @{[$as_prefix =~ m/\./g]} + 1; # +1 to match the ".${as_prefix}"
1437
1438       #my @col = map { (/^self\.(.+)$/ ? ("${as_prefix}.$1") : ()); }
1439       #              values %{$rel_info->{cond}};
1440       my @key = map { (/^foreign\.(.+)$/ ? ($1) : ()); }
1441                     keys %{$rel_info->{cond}};
1442       my @ord = (ref($rel_info->{attrs}{order_by}) eq 'ARRAY'
1443                    ? @{$rel_info->{attrs}{order_by}}
1444
1445                 : (defined $rel_info->{attrs}{order_by}
1446                        ? ($rel_info->{attrs}{order_by})
1447                        : ()
1448       ));
1449       push(@$order, map { "${as}.$_" } (@key, @ord));
1450     }
1451
1452     return map { [ "${as}.$_", "${as_prefix}${pre}.$_", ] }
1453       $rel_source->columns;
1454   }
1455 }
1456
1457 # Takes a selection list and generates a collapse-map representing
1458 # row-object fold-points. Every relationship is assigned a set of unique,
1459 # non-nullable columns (which may *not even be* from the same resultset)
1460 # and the collapser will use this information to correctly distinguish
1461 # data of individual to-be-row-objects.
1462 sub _resolve_collapse {
1463   my ($self, $as, $as_fq_idx, $rel_chain, $parent_info) = @_;
1464
1465   # for comprehensible error messages put ourselves at the head of the relationship chain
1466   $rel_chain ||= [ $self->source_name ];
1467
1468   # record top-level fully-qualified column index
1469   $as_fq_idx ||= { %$as };
1470
1471   my ($my_cols, $rel_cols);
1472   for (keys %$as) {
1473     if ($_ =~ /^ ([^\.]+) \. (.+) /x) {
1474       $rel_cols->{$1}{$2} = 1;
1475     }
1476     else {
1477       $my_cols->{$_} = {};  # important for ||= below
1478     }
1479   }
1480
1481   my $relinfo;
1482   # run through relationships, collect metadata, inject non-left fk-bridges from
1483   # *INNER-JOINED* children (if any)
1484   for my $rel (keys %$rel_cols) {
1485     my $rel_src = $self->related_source ($rel);
1486     my $inf = $self->relationship_info ($rel);
1487
1488     $relinfo->{$rel}{is_single} = $inf->{attrs}{accessor} && $inf->{attrs}{accessor} ne 'multi';
1489     $relinfo->{$rel}{is_inner} = ( $inf->{attrs}{join_type} || '' ) !~ /^left/i;
1490     $relinfo->{$rel}{rsrc} = $rel_src;
1491
1492     my $cond = $inf->{cond};
1493
1494     if (
1495       ref $cond eq 'HASH'
1496         and
1497       keys %$cond
1498         and
1499       ! List::Util::first { $_ !~ /^foreign\./ } (keys %$cond)
1500         and
1501       ! List::Util::first { $_ !~ /^self\./ } (values %$cond)
1502     ) {
1503       for my $f (keys %$cond) {
1504         my $s = $cond->{$f};
1505         $_ =~ s/^ (?: foreign | self ) \.//x for ($f, $s);
1506         $relinfo->{$rel}{fk_map}{$s} = $f;
1507
1508         $my_cols->{$s} ||= { via_fk => "$rel.$f" }  # need to know source from *our* pov
1509           if ($relinfo->{$rel}{is_inner} && defined $rel_cols->{$rel}{$f});  # only if it is inner and in fact selected of course
1510       }
1511     }
1512   }
1513
1514   # if the parent is already defined, assume all of its related FKs are selected
1515   # (even if they in fact are NOT in the select list). Keep a record of what we
1516   # assumed, and if any such phantom-column becomes part of our own collapser,
1517   # throw everything assumed-from-parent away and replace with the collapser of
1518   # the parent (whatever it may be)
1519   my $assumed_from_parent;
1520   unless ($parent_info->{underdefined}) {
1521     $assumed_from_parent->{columns} = { map
1522       # only add to the list if we do not already select said columns
1523       { ! exists $my_cols->{$_} ? ( $_ => 1 ) : () }
1524       values %{$parent_info->{rel_condition} || {}}
1525     };
1526
1527     $my_cols->{$_} = { via_collapse => $parent_info->{collapse_on} }
1528       for keys %{$assumed_from_parent->{columns}};
1529   }
1530
1531   # get colinfo for everything
1532   if ($my_cols) {
1533     $my_cols->{$_}{colinfo} = (
1534       $self->has_column ($_) ? $self->column_info ($_) : undef
1535     ) for keys %$my_cols;
1536   }
1537
1538   my $collapse_map;
1539
1540   # try to resolve based on our columns (plus already inserted FK bridges)
1541   if (
1542     $my_cols
1543       and
1544     my $uset = $self->_unique_column_set ($my_cols)
1545   ) {
1546     # see if the resulting collapser relies on any implied columns,
1547     # and fix stuff up if this is the case
1548
1549     my $parent_collapser_used;
1550
1551     if (List::Util::first
1552         { exists $assumed_from_parent->{columns}{$_} }
1553         keys %$uset
1554     ) {
1555       # remove implied stuff from the uset, we will inject the equivalent collapser a bit below
1556       delete @{$uset}{keys %{$assumed_from_parent->{columns}}};
1557       $parent_collapser_used = 1;
1558     }
1559
1560     $collapse_map->{-collapse_on} = {
1561       %{ $parent_collapser_used ? $parent_info->{collapse_on} : {} },
1562       (map
1563         {
1564           my $fqc = join ('.',
1565             @{$rel_chain}[1 .. $#$rel_chain],
1566             ( $my_cols->{$_}{via_fk} || $_ ),
1567           );
1568
1569           $fqc => $as_fq_idx->{$fqc};
1570         }
1571         keys %$uset
1572       ),
1573     };
1574   }
1575
1576   # don't know how to collapse - keep descending down 1:1 chains - if
1577   # a related non-LEFT 1:1 is resolvable - its condition will collapse us
1578   # too
1579   unless ($collapse_map->{-collapse_on}) {
1580     my @candidates;
1581
1582     for my $rel (keys %$relinfo) {
1583       next unless ($relinfo->{$rel}{is_single} && $relinfo->{$rel}{is_inner});
1584
1585       if ( my $rel_collapse = $relinfo->{$rel}{rsrc}->_resolve_collapse (
1586         $rel_cols->{$rel},
1587         $as_fq_idx,
1588         [ @$rel_chain, $rel ],
1589         { underdefined => 1 }
1590       )) {
1591         push @candidates, $rel_collapse->{-collapse_on};
1592       }
1593     }
1594
1595     # get the set with least amount of columns
1596     # FIXME - maybe need to implement a data type order as well (i.e. prefer several ints
1597     # to a single varchar)
1598     if (@candidates) {
1599       ($collapse_map->{-collapse_on}) = sort { keys %$a <=> keys %$b } (@candidates);
1600     }
1601   }
1602
1603   # Still dont know how to collapse - see if the parent passed us anything
1604   # (i.e. reuse collapser over 1:1)
1605   unless ($collapse_map->{-collapse_on}) {
1606     $collapse_map->{-collapse_on} = $parent_info->{collapse_on} 
1607       if $parent_info->{collapser_reusable};
1608   }
1609
1610
1611   # stop descending into children if we were called by a parent for first-pass
1612   # and don't despair if nothing was found (there may be other parallel branches
1613   # to dive into)
1614   if ($parent_info->{underdefined}) {
1615     return $collapse_map->{-collapse_on} ? $collapse_map : undef
1616   }
1617   # nothing down the chain resolved - can't calculate a collapse-map
1618   elsif (! $collapse_map->{-collapse_on}) {
1619     $self->throw_exception ( sprintf
1620       "Unable to calculate a definitive collapse column set for %s%s: fetch more unique non-nullable columns",
1621       $self->source_name,
1622       @$rel_chain > 1
1623         ? sprintf (' (last member of the %s chain)', join ' -> ', @$rel_chain )
1624         : ''
1625       ,
1626     );
1627   }
1628
1629
1630   # If we got that far - we are collapsable - GREAT! Now go down all children
1631   # a second time, and fill in the rest
1632
1633   for my $rel (keys %$relinfo) {
1634
1635     $collapse_map->{$rel} = $relinfo->{$rel}{rsrc}->_resolve_collapse (
1636       { map { $_ => 1 } ( keys %{$rel_cols->{$rel}} ) },
1637
1638       $as_fq_idx,
1639
1640       [ @$rel_chain, $rel],
1641
1642       {
1643         collapse_on => { %{$collapse_map->{-collapse_on}} },
1644
1645         rel_condition => $relinfo->{$rel}{fk_map},
1646
1647         # if this is a 1:1 our own collapser can be used as a collapse-map
1648         # (regardless of left or not)
1649         collapser_reusable =>  $relinfo->{$rel}{is_single},
1650       },
1651     );
1652   }
1653
1654   return $collapse_map;
1655 }
1656
1657 sub _unique_column_set {
1658   my ($self, $cols) = @_;
1659
1660   my %unique = $self->unique_constraints;
1661
1662   # always prefer the PK first, and then shortest constraints first
1663   USET:
1664   for my $set (delete $unique{primary}, sort { @$a <=> @$b } (values %unique) ) {
1665     next unless $set && @$set;
1666
1667     for (@$set) {
1668       next USET unless ($cols->{$_} && $cols->{$_}{colinfo} && !$cols->{$_}{colinfo}{is_nullable} );
1669     }
1670
1671     return { map { $_ => 1 } @$set };
1672   }
1673
1674   return undef;
1675 }
1676
1677 # Takes an arrayref of {as} dbic column aliases and the collapse and select
1678 # attributes from the same $rs (the slector requirement is a temporary 
1679 # workaround), and returns a coderef capable of:
1680 # my $me_pref_clps = $coderef->([$rs->cursor->next])
1681 # Where the $me_pref_clps arrayref is the future argument to
1682 # ::ResultSet::_collapse_result.
1683 #
1684 # $me_pref_clps->[0] is always returned (even if as an empty hash with no
1685 # rowdata), however branches of related data in $me_pref_clps->[1] may be
1686 # pruned short of what was originally requested based on {as}, depending
1687 # on:
1688 #
1689 # * If collapse is requested, a definitive collapse map is calculated for
1690 #   every relationship "fold-point", consisting of a set of values (which
1691 #   may not even be contained in the future 'me' of said relationship
1692 #   (for example a cd.artist_id defines the related inner-joined artist)).
1693 #   Thus a definedness check is carried on all collapse-condition values
1694 #   and if at least one is undef it is assumed that we are dealing with a
1695 #   NULLed right-side of a left-join, so we don't return a related data
1696 #   container at all, which implies no related objects
1697 #
1698 # * If we are not collapsing, there is no constraint on having a selector
1699 #   uniquely identifying all possible objects, and the user might have very
1700 #   well requested a column that just *happens* to be all NULLs. What we do
1701 #   in this case is fallback to the old behavior (which is a potential FIXME)
1702 #   by always returning a data container, but only filling it with columns
1703 #   IFF at least one of them is defined. This way we do not get an object
1704 #   with a bunch of has_column_loaded to undef, but at the same time do not
1705 #   further relationships based off this "null" object (e.g. in case the user
1706 #   deliberately skipped link-table values). I am pretty sure there are some
1707 #   tests that codify this behavior, need to find the exact testname.
1708 #
1709 # For an example of this coderef in action (and to see its guts) look at
1710 # t/prefetch/_internals.t
1711 #
1712 # This is a huge performance win, as we call the same code for
1713 # every row returned from the db, thus avoiding repeated method
1714 # lookups when traversing relationships
1715 #
1716 # Also since the coderef is completely stateless (the returned structure is
1717 # always fresh on every new invocation) this is a very good opportunity for
1718 # memoization if further speed improvements are needed
1719 #
1720 # The way we construct this coderef is somewhat fugly, although I am not
1721 # sure if the string eval is *that* bad of an idea. The alternative is to
1722 # have a *very* large number of anon coderefs calling each other in a twisty
1723 # maze, whereas the current result is a nice, smooth, single-pass function.
1724 # In any case - the output of this thing is meticulously micro-tested, so
1725 # any sort of rewrite should be relatively easy
1726 #
1727 sub _mk_row_parser {
1728   my ($self, $as, $with_collapse, $select) = @_;
1729
1730   my $as_indexed = { map
1731     { $as->[$_] => $_ }
1732     ( 0 .. $#$as )
1733   };
1734
1735   # calculate collapse fold-points if needed
1736   my $collapse_on = do {
1737     # FIXME
1738     # only consider real columns (not functions) during collapse resolution
1739     # this check shouldn't really be here, as fucktards are not supposed to
1740     # alias random crap to existing column names anyway, but still - just in
1741     # case (also saves us from select/as mismatches which need fixing as well...)
1742
1743     my $plain_as = { %$as_indexed };
1744     for (keys %$plain_as) {
1745       delete $plain_as->{$_} if ref $select->[$plain_as->{$_}];
1746     }
1747     $self->_resolve_collapse ($plain_as);
1748
1749   } if $with_collapse;
1750
1751   my $perl = $self->__visit_as ($as_indexed, $collapse_on);
1752   my $cref = eval "sub { $perl }"
1753     or die "Oops! _mk_row_parser generated invalid perl:\n$@\n\n$perl\n";
1754   return $cref;
1755 }
1756
1757 {
1758   my $visit_as_dumper; # keep our own DD object around so we don't have to fitz with quoting
1759
1760   sub __visit_as {
1761     my ($self, $as, $collapse_on, $known_defined) = @_;
1762     $known_defined ||= {};
1763
1764     # prepopulate the known defined map with our own collapse value positions
1765     # the rationale is that if an Artist needs column 0 to be uniquely
1766     # identified, and related CDs need columns 0 and 1, by the time we get to
1767     # CDs we already know that column 0 is defined (otherwise there would be
1768     # no related CDs as there is no Artist in the 1st place). So we use this
1769     # index to cut on repetitive defined() checks.
1770     $known_defined->{$_}++ for ( values %{$collapse_on->{-collapse_on} || {}} );
1771
1772     my $my_cols = {};
1773     my $rel_cols;
1774     for (keys %$as) {
1775       if ($_ =~ /^ ([^\.]+) \. (.+) /x) {
1776         $rel_cols->{$1}{$2} = $as->{$_};
1777       }
1778       else {
1779         $my_cols->{$_} = $as->{$_};
1780       }
1781     }
1782
1783     my @relperl;
1784     for my $rel (sort keys %$rel_cols) {
1785       my $rel_node = $self->__visit_as($rel_cols->{$rel}, $collapse_on->{$rel}, {%$known_defined} );
1786
1787       my @null_checks;
1788       if ($collapse_on->{$rel}{-collapse_on}) {
1789         @null_checks = map
1790           { "(! defined '__VALPOS__${_}__')" }
1791           ( grep
1792             { ! $known_defined->{$_} }
1793             ( sort
1794               { $a <=> $b }
1795               values %{$collapse_on->{$rel}{-collapse_on}}
1796             )
1797           )
1798         ;
1799       }
1800
1801       if (@null_checks) {
1802         push @relperl, sprintf ( '(%s) ? () : ( %s => %s )',
1803           join (' || ', @null_checks ),
1804           $rel,
1805           $rel_node,
1806         );
1807       }
1808       else {
1809         push @relperl, "$rel => $rel_node";
1810       }
1811     }
1812     my $rels = @relperl
1813       ? sprintf ('{ %s }', join (',', @relperl))
1814       : 'undef'
1815     ;
1816
1817     my $me = {
1818       map { $_ => "__VALPOS__$my_cols->{$_}__" } (keys %$my_cols)
1819     };
1820
1821     my $clps = undef; # funny thing, but this prevents a memory leak, I guess it's Data::Dumper#s fault (mo)
1822     $clps = [
1823       map { "__VALPOS__${_}__" } ( sort { $a <=> $b } (values %{$collapse_on->{-collapse_on}}) )
1824     ] if $collapse_on->{-collapse_on};
1825
1826     # we actually will be producing functional perl code here,
1827     # thus no second-guessing of what these globals might have
1828     # been set to. DO NOT CHANGE!
1829     $visit_as_dumper ||= do {
1830       require Data::Dumper;
1831       Data::Dumper->new([])
1832         ->Purity (1)
1833         ->Pad ('')
1834         ->Useqq (0)
1835         ->Terse (1)
1836         ->Quotekeys (1)
1837         ->Deepcopy (1)
1838         ->Deparse (0)
1839         ->Maxdepth (0)
1840         ->Indent (0)
1841     };
1842     for ($me, $clps) {
1843       $_ = $visit_as_dumper->Values ([$_])->Dump;
1844     }
1845
1846     unless ($collapse_on->{-collapse_on}) { # we are not collapsing, insert a definedness check on 'me'
1847       $me = sprintf ( '(%s) ? %s : {}',
1848         join (' || ', map { "( defined '__VALPOS__${_}__')" } (sort { $a <=> $b } values %$my_cols) ),
1849         $me,
1850       );
1851     }
1852
1853     my @rv_list = ($me, $rels, $clps);
1854     pop @rv_list while ($rv_list[-1] eq 'undef'); # strip trailing undefs
1855
1856     # change the quoted placeholders to unquoted alias-references
1857     $_ =~ s/ \' __VALPOS__(\d+)__ \' /sprintf ('$_[0][%d]', $1)/gex
1858       for grep { defined $_ } @rv_list;
1859     return sprintf '[%s]', join (',', @rv_list);
1860   }
1861 }
1862
1863 =head2 related_source
1864
1865 =over 4
1866
1867 =item Arguments: $relname
1868
1869 =item Return value: $source
1870
1871 =back
1872
1873 Returns the result source object for the given relationship.
1874
1875 =cut
1876
1877 sub related_source {
1878   my ($self, $rel) = @_;
1879   if( !$self->has_relationship( $rel ) ) {
1880     $self->throw_exception("No such relationship '$rel' on " . $self->source_name);
1881   }
1882   return $self->schema->source($self->relationship_info($rel)->{source});
1883 }
1884
1885 =head2 related_class
1886
1887 =over 4
1888
1889 =item Arguments: $relname
1890
1891 =item Return value: $classname
1892
1893 =back
1894
1895 Returns the class name for objects in the given relationship.
1896
1897 =cut
1898
1899 sub related_class {
1900   my ($self, $rel) = @_;
1901   if( !$self->has_relationship( $rel ) ) {
1902     $self->throw_exception("No such relationship '$rel'");
1903   }
1904   return $self->schema->class($self->relationship_info($rel)->{source});
1905 }
1906
1907 =head2 handle
1908
1909 Obtain a new handle to this source. Returns an instance of a 
1910 L<DBIx::Class::ResultSourceHandle>.
1911
1912 =cut
1913
1914 sub handle {
1915     return DBIx::Class::ResultSourceHandle->new({
1916         schema         => $_[0]->schema,
1917         source_moniker => $_[0]->source_name
1918     });
1919 }
1920
1921 =head2 throw_exception
1922
1923 See L<DBIx::Class::Schema/"throw_exception">.
1924
1925 =cut
1926
1927 sub throw_exception {
1928   my $self = shift;
1929
1930   if (defined $self->schema) {
1931     $self->schema->throw_exception(@_);
1932   }
1933   else {
1934     DBIx::Class::Exception->throw(@_);
1935   }
1936 }
1937
1938 =head2 source_info
1939
1940 Stores a hashref of per-source metadata.  No specific key names
1941 have yet been standardized, the examples below are purely hypothetical
1942 and don't actually accomplish anything on their own:
1943
1944   __PACKAGE__->source_info({
1945     "_tablespace" => 'fast_disk_array_3',
1946     "_engine" => 'InnoDB',
1947   });
1948
1949 =head2 new
1950
1951   $class->new();
1952
1953   $class->new({attribute_name => value});
1954
1955 Creates a new ResultSource object.  Not normally called directly by end users.
1956
1957 =head2 column_info_from_storage
1958
1959 =over
1960
1961 =item Arguments: 1/0 (default: 0)
1962
1963 =item Return value: 1/0
1964
1965 =back
1966
1967   __PACKAGE__->column_info_from_storage(1);
1968
1969 Enables the on-demand automatic loading of the above column
1970 metadata from storage as necessary.  This is *deprecated*, and
1971 should not be used.  It will be removed before 1.0.
1972
1973
1974 =head1 AUTHORS
1975
1976 Matt S. Trout <mst@shadowcatsystems.co.uk>
1977
1978 =head1 LICENSE
1979
1980 You may distribute this code under the same terms as Perl itself.
1981
1982 =cut
1983
1984 1;