Add unique constraint declaration and new ResultSet method, update_or_create
[dbsrgits/DBIx-Class.git] / lib / DBIx / Class / ResultSet.pm
1 package DBIx::Class::ResultSet;
2
3 use strict;
4 use warnings;
5 use Carp qw/croak/;
6 use overload
7         '0+'     => 'count',
8         'bool'   => sub { 1; },
9         fallback => 1;
10 use Data::Page;
11 use Storable;
12
13 =head1 NAME
14
15 DBIx::Class::ResultSet - Responsible for fetching and creating resultset.
16
17 =head1 SYNOPSIS
18
19   my $rs = MyApp::DB::Class->search(registered => 1);
20   my @rows = MyApp::DB::Class->search(foo => 'bar');
21
22 =head1 DESCRIPTION
23
24 The resultset is also known as an iterator. It is responsible for handling
25 queries that may return an arbitrary number of rows, e.g. via C<search>
26 or a C<has_many> relationship.
27
28 =head1 METHODS
29
30 =head2 new($source, \%$attrs)
31
32 The resultset constructor. Takes a source object (usually a DBIx::Class::Table)
33 and an attribute hash (see below for more information on attributes). Does
34 not perform any queries -- these are executed as needed by the other methods.
35
36 =cut
37
38 sub new {
39   my $class = shift;
40   return $class->new_result(@_) if ref $class;
41   my ($source, $attrs) = @_;
42   #use Data::Dumper; warn Dumper($attrs);
43   $attrs = Storable::dclone($attrs || {}); # { %{ $attrs || {} } };
44   my %seen;
45   my $alias = ($attrs->{alias} ||= 'me');
46   if ($attrs->{cols} || !$attrs->{select}) {
47     delete $attrs->{as} if $attrs->{cols};
48     my @cols = ($attrs->{cols}
49                  ? @{delete $attrs->{cols}}
50                  : $source->columns);
51     $attrs->{select} = [ map { m/\./ ? $_ : "${alias}.$_" } @cols ];
52   }
53   $attrs->{as} ||= [ map { m/^$alias\.(.*)$/ ? $1 : $_ } @{$attrs->{select}} ];
54   #use Data::Dumper; warn Dumper(@{$attrs}{qw/select as/});
55   $attrs->{from} ||= [ { $alias => $source->from } ];
56   if (my $join = delete $attrs->{join}) {
57     foreach my $j (ref $join eq 'ARRAY'
58               ? (@{$join}) : ($join)) {
59       if (ref $j eq 'HASH') {
60         $seen{$_} = 1 foreach keys %$j;
61       } else {
62         $seen{$j} = 1;
63       }
64     }
65     push(@{$attrs->{from}}, $source->resolve_join($join, $attrs->{alias}));
66   }
67   $attrs->{group_by} ||= $attrs->{select} if delete $attrs->{distinct};
68   foreach my $pre (@{delete $attrs->{prefetch} || []}) {
69     push(@{$attrs->{from}}, $source->resolve_join($pre, $attrs->{alias}))
70       unless $seen{$pre};
71     my @pre = 
72       map { "$pre.$_" }
73       $source->related_source($pre)->columns;
74     push(@{$attrs->{select}}, @pre);
75     push(@{$attrs->{as}}, @pre);
76   }
77   if ($attrs->{page}) {
78     $attrs->{rows} ||= 10;
79     $attrs->{offset} ||= 0;
80     $attrs->{offset} += ($attrs->{rows} * ($attrs->{page} - 1));
81   }
82   my $new = {
83     source => $source,
84     cond => $attrs->{where},
85     from => $attrs->{from},
86     count => undef,
87     page => delete $attrs->{page},
88     pager => undef,
89     attrs => $attrs };
90   bless ($new, $class);
91   return $new;
92 }
93
94 =head2 search
95
96   my @obj    = $rs->search({ foo => 3 }); # "... WHERE foo = 3"
97   my $new_rs = $rs->search({ foo => 3 });
98
99 If you need to pass in additional attributes but no additional condition,
100 call it as ->search(undef, \%attrs);
101
102   my @all = $class->search({}, { cols => [qw/foo bar/] }); # "SELECT foo, bar FROM $class_table"
103
104 =cut
105
106 sub search {
107   my $self = shift;
108
109   #use Data::Dumper;warn Dumper(@_);
110
111   my $attrs = { %{$self->{attrs}} };
112   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
113     $attrs = { %$attrs, %{ pop(@_) } };
114   }
115
116   my $where = (@_ ? ((@_ == 1 || ref $_[0] eq "HASH") ? shift : {@_}) : undef());
117   if (defined $where) {
118     $where = (defined $attrs->{where}
119                 ? { '-and' =>
120                     [ map { ref $_ eq 'ARRAY' ? [ -or => $_ ] : $_ }
121                         $where, $attrs->{where} ] }
122                 : $where);
123     $attrs->{where} = $where;
124   }
125
126   my $rs = (ref $self)->new($self->{source}, $attrs);
127
128   return (wantarray ? $rs->all : $rs);
129 }
130
131 =head2 search_literal
132
133   my @obj    = $rs->search_literal($literal_where_cond, @bind);
134   my $new_rs = $rs->search_literal($literal_where_cond, @bind);
135
136 Pass a literal chunk of SQL to be added to the conditional part of the
137 resultset.
138
139 =cut
140                                                          
141 sub search_literal {
142   my ($self, $cond, @vals) = @_;
143   my $attrs = (ref $vals[$#vals] eq 'HASH' ? { %{ pop(@vals) } } : {});
144   $attrs->{bind} = [ @{$self->{attrs}{bind}||[]}, @vals ];
145   return $self->search(\$cond, $attrs);
146 }
147
148 =head2 find(@colvalues), find(\%cols, \%attrs?)
149
150 Finds a row based on its primary key or unique constraint. For example:
151
152   # In your table class
153   package MyApp::Schema::CD;
154
155   __PACKAGE__->table('cd');
156   __PACKAGE__->add_columns(qw/cdid artist title year/);
157   __PACKAGE__->set_primary_key('cdid');
158   __PACKAGE__->add_unique_constraint(artist_title => [ qw/artist title/ ]);
159
160   1;
161
162   # In your application
163   my $cd = $schema->resultset('CD')->find(5);
164
165 Also takes an optional C<key> attribute, to search by a specific key or unique
166 constraint. For example:
167
168   my $cd = $schema->resultset('CD')->find_or_create(
169     {
170       artist => 'Massive Attack',
171       title  => 'Mezzanine',
172     },
173     { key => 'artist_title' }
174   );
175
176 =cut
177
178 sub find {
179   my ($self, @vals) = @_;
180   my $attrs = (@vals > 1 && ref $vals[$#vals] eq 'HASH' ? pop(@vals) : {});
181
182   my @cols = $self->{source}->primary_columns;
183   if (exists $attrs->{key}) {
184     my %uniq = $self->{source}->unique_constraints;
185     $self->( "Unknown key " . $attrs->{key} . " on " . $self->name )
186       unless exists $uniq{$attrs->{key}};
187     @cols = @{ $uniq{$attrs->{key}} };
188   }
189   #use Data::Dumper; warn Dumper($attrs, @vals, @cols);
190   $self->{source}->result_class->throw( "Can't find unless a primary key or unique constraint is defined" )
191     unless @cols;
192
193   my $query;
194   if (ref $vals[0] eq 'HASH') {
195     $query = $vals[0];
196   } elsif (@cols == @vals) {
197     $query = {};
198     @{$query}{@cols} = @vals;
199   } else {
200     $query = {@vals};
201   }
202   #warn Dumper($query);
203   # Useless -> disabled
204   #$self->{source}->result_class->throw( "Can't find unless all primary keys are specified" )
205   #  unless (keys %$query >= @pk); # If we check 'em we run afoul of uc/lc
206                                   # column names etc. Not sure what to do yet
207   return $self->search($query)->next;
208 }
209
210 =head2 search_related
211
212   $rs->search_related('relname', $cond?, $attrs?);
213
214 =cut
215
216 sub search_related {
217   my ($self, $rel, @rest) = @_;
218   my $rel_obj = $self->{source}->relationship_info($rel);
219   $self->{source}->result_class->throw(
220     "No such relationship ${rel} in search_related")
221       unless $rel_obj;
222   my $rs = $self->search(undef, { join => $rel });
223   return $self->{source}->schema->resultset($rel_obj->{class}
224            )->search( undef,
225              { %{$rs->{attrs}},
226                alias => $rel,
227                select => undef(),
228                as => undef() }
229            )->search(@rest);
230 }
231
232 =head2 cursor
233
234 Returns a storage-driven cursor to the given resultset.
235
236 =cut
237
238 sub cursor {
239   my ($self) = @_;
240   my ($source, $attrs) = @{$self}{qw/source attrs/};
241   $attrs = { %$attrs };
242   return $self->{cursor}
243     ||= $source->storage->select($self->{from}, $attrs->{select},
244           $attrs->{where},$attrs);
245 }
246
247 =head2 search_like
248
249 Identical to search except defaults to 'LIKE' instead of '=' in condition
250
251 =cut
252
253 sub search_like {
254   my $class    = shift;
255   my $attrs = { };
256   if (@_ > 1 && ref $_[$#_] eq 'HASH') {
257     $attrs = pop(@_);
258   }
259   my $query    = ref $_[0] eq "HASH" ? { %{shift()} }: {@_};
260   $query->{$_} = { 'like' => $query->{$_} } for keys %$query;
261   return $class->search($query, { %$attrs });
262 }
263
264 =head2 slice($first, $last)
265
266 Returns a subset of elements from the resultset.
267
268 =cut
269
270 sub slice {
271   my ($self, $min, $max) = @_;
272   my $attrs = { %{ $self->{attrs} || {} } };
273   $attrs->{offset} ||= 0;
274   $attrs->{offset} += $min;
275   $attrs->{rows} = ($max ? ($max - $min + 1) : 1);
276   my $slice = (ref $self)->new($self->{source}, $attrs);
277   return (wantarray ? $slice->all : $slice);
278 }
279
280 =head2 next
281
282 Returns the next element in the resultset (undef is there is none).
283
284 =cut
285
286 sub next {
287   my ($self) = @_;
288   my @row = $self->cursor->next;
289 #  warn Dumper(\@row); use Data::Dumper;
290   return unless (@row);
291   return $self->_construct_object(@row);
292 }
293
294 sub _construct_object {
295   my ($self, @row) = @_;
296   my @cols = @{ $self->{attrs}{as} };
297   #warn "@cols -> @row";
298   my (%me, %pre);
299   foreach my $col (@cols) {
300     if ($col =~ /([^\.]+)\.([^\.]+)/) {
301       $pre{$1}[0]{$2} = shift @row;
302     } else {
303       $me{$col} = shift @row;
304     }
305   }
306   my $new = $self->{source}->result_class->inflate_result(
307               $self->{source}, \%me, \%pre);
308   $new = $self->{attrs}{record_filter}->($new)
309     if exists $self->{attrs}{record_filter};
310   return $new;
311 }
312
313 =head2 count
314
315 Performs an SQL C<COUNT> with the same query as the resultset was built
316 with to find the number of elements. If passed arguments, does a search
317 on the resultset and counts the results of that.
318
319 =cut
320
321 sub count {
322   my $self = shift;
323   return $self->search(@_)->count if @_ && defined $_[0];
324   croak "Unable to ->count with a GROUP BY" if defined $self->{attrs}{group_by};
325   unless (defined $self->{count}) {
326     my $attrs = { %{ $self->{attrs} },
327                   select => { 'count' => '*' },
328                   as => [ 'count' ] };
329     # offset, order by and page are not needed to count. record_filter is cdbi
330     delete $attrs->{$_} for qw/rows offset order_by page pager record_filter/;
331         
332     ($self->{count}) = (ref $self)->new($self->{source}, $attrs)->cursor->next;
333   }
334   return 0 unless $self->{count};
335   my $count = $self->{count};
336   $count -= $self->{attrs}{offset} if $self->{attrs}{offset};
337   $count = $self->{attrs}{rows} if
338     ($self->{attrs}{rows} && $self->{attrs}{rows} < $count);
339   return $count;
340 }
341
342 =head2 count_literal
343
344 Calls search_literal with the passed arguments, then count.
345
346 =cut
347
348 sub count_literal { shift->search_literal(@_)->count; }
349
350 =head2 all
351
352 Returns all elements in the resultset. Called implictly if the resultset
353 is returned in list context.
354
355 =cut
356
357 sub all {
358   my ($self) = @_;
359   return map { $self->_construct_object(@$_); }
360            $self->cursor->all;
361 }
362
363 =head2 reset
364
365 Resets the resultset's cursor, so you can iterate through the elements again.
366
367 =cut
368
369 sub reset {
370   my ($self) = @_;
371   $self->cursor->reset;
372   return $self;
373 }
374
375 =head2 first
376
377 Resets the resultset and returns the first element.
378
379 =cut
380
381 sub first {
382   return $_[0]->reset->next;
383 }
384
385 =head2 update(\%values)
386
387 Sets the specified columns in the resultset to the supplied values
388
389 =cut
390
391 sub update {
392   my ($self, $values) = @_;
393   croak "Values for update must be a hash" unless ref $values eq 'HASH';
394   return $self->{source}->storage->update(
395            $self->{source}->from, $values, $self->{cond});
396 }
397
398 =head2 update_all(\%values)
399
400 Fetches all objects and updates them one at a time. ->update_all will run
401 cascade triggers, ->update will not.
402
403 =cut
404
405 sub update_all {
406   my ($self, $values) = @_;
407   croak "Values for update must be a hash" unless ref $values eq 'HASH';
408   foreach my $obj ($self->all) {
409     $obj->set_columns($values)->update;
410   }
411   return 1;
412 }
413
414 =head2 delete
415
416 Deletes the contents of the resultset from its result source.
417
418 =cut
419
420 sub delete {
421   my ($self) = @_;
422   $self->{source}->storage->delete($self->{source}->from, $self->{cond});
423   return 1;
424 }
425
426 =head2 delete_all
427
428 Fetches all objects and deletes them one at a time. ->delete_all will run
429 cascade triggers, ->delete will not.
430
431 =cut
432
433 sub delete_all {
434   my ($self) = @_;
435   $_->delete for $self->all;
436   return 1;
437 }
438
439 =head2 pager
440
441 Returns a L<Data::Page> object for the current resultset. Only makes
442 sense for queries with page turned on.
443
444 =cut
445
446 sub pager {
447   my ($self) = @_;
448   my $attrs = $self->{attrs};
449   croak "Can't create pager for non-paged rs" unless $self->{page};
450   $attrs->{rows} ||= 10;
451   $self->count;
452   return $self->{pager} ||= Data::Page->new(
453     $self->{count}, $attrs->{rows}, $self->{page});
454 }
455
456 =head2 page($page_num)
457
458 Returns a new resultset for the specified page.
459
460 =cut
461
462 sub page {
463   my ($self, $page) = @_;
464   my $attrs = { %{$self->{attrs}} };
465   $attrs->{page} = $page;
466   return (ref $self)->new($self->{source}, $attrs);
467 }
468
469 =head2 new_result(\%vals)
470
471 Creates a result in the resultset's result class.
472
473 =cut
474
475 sub new_result {
476   my ($self, $values) = @_;
477   $self->{source}->result_class->throw( "new_result needs a hash" )
478     unless (ref $values eq 'HASH');
479   $self->{source}->result_class->throw( "Can't abstract implicit construct, condition not a hash" )
480     if ($self->{cond} && !(ref $self->{cond} eq 'HASH'));
481   my %new = %$values;
482   my $alias = $self->{attrs}{alias};
483   foreach my $key (keys %{$self->{cond}||{}}) {
484     $new{$1} = $self->{cond}{$key} if ($key =~ m/^(?:$alias\.)?([^\.]+)$/);
485   }
486   my $obj = $self->{source}->result_class->new(\%new);
487   $obj->result_source($self->{source}) if $obj->can('result_source');
488   $obj;
489 }
490
491 =head2 create(\%vals)
492
493 Inserts a record into the resultset and returns the object.
494
495 Effectively a shortcut for ->new_result(\%vals)->insert
496
497 =cut
498
499 sub create {
500   my ($self, $attrs) = @_;
501   $self->{source}->result_class->throw( "create needs a hashref" ) unless ref $attrs eq 'HASH';
502   return $self->new_result($attrs)->insert;
503 }
504
505 =head2 find_or_create(\%vals, \%attrs?)
506
507   $class->find_or_create({ key => $val, ... });
508
509 Searches for a record matching the search condition; if it doesn't find one,    
510 creates one and returns that instead.                                           
511
512   # In your table class
513   package MyApp::Schema::CD;
514
515   __PACKAGE__->table('cd');
516   __PACKAGE__->add_columns(qw/cdid artist title year/);
517   __PACKAGE__->set_primary_key('cdid');
518   __PACKAGE__->add_unique_constraint(artist_title => [ qw/artist title/ ]);
519
520   1;
521
522   # In your application
523   my $cd = $schema->resultset('CD')->find_or_create({
524     cdid   => 5,
525     artist => 'Massive Attack',
526     title  => 'Mezzanine',
527     year   => 2005,
528   });
529
530 Also takes an optional C<key> attribute, to search by a specific key or unique
531 constraint. For example:
532
533   my $cd = $schema->resultset('CD')->find_or_create(
534     {
535       artist => 'Massive Attack',
536       title  => 'Mezzanine',
537     },
538     { key => 'artist_title' }
539   );
540
541 See also L</find> and L</update_or_create>.
542
543 =cut
544
545 sub find_or_create {
546   my $self     = shift;
547   my $attrs    = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
548   my $hash     = ref $_[0] eq "HASH" ? shift : {@_};
549   my $exists   = $self->find($hash, $attrs);
550   return defined($exists) ? $exists : $self->create($hash);
551 }
552
553 =head2 update_or_create
554
555   $class->update_or_create({ key => $val, ... });
556
557 First, search for an existing row matching one of the unique constraints
558 (including the primary key) on the source of this resultset.  If a row is
559 found, update it with the other given column values.  Otherwise, create a new
560 row.
561
562 Takes an optional C<key> attribute to search on a specific unique constraint.
563 For example:
564
565   # In your application
566   my $cd = $schema->resultset('CD')->update_or_create(
567     {
568       artist => 'Massive Attack',
569       title  => 'Mezzanine',
570       year   => 1998,
571     },
572     { key => 'artist_title' }
573   );
574
575 If no C<key> is specified, it searches on all unique constraints defined on the
576 source, including the primary key.
577
578 If the C<key> is specified as C<primary>, search only on the primary key.
579
580 =cut
581
582 sub update_or_create {
583   my $self = shift;
584
585   my $attrs = (@_ > 1 && ref $_[$#_] eq 'HASH' ? pop(@_) : {});
586   my $hash  = ref $_[0] eq "HASH" ? shift : {@_};
587
588   my %unique_constraints = $self->{source}->unique_constraints;
589   my @constraint_names   = (exists $attrs->{key}
590                             ? ($attrs->{key})
591                             : keys %unique_constraints);
592
593   my @unique_hashes;
594   foreach my $name (@constraint_names) {
595     my @unique_cols = @{ $unique_constraints{$name} };
596     my %unique_hash =
597       map  { $_ => $hash->{$_} }
598       grep { exists $hash->{$_} }
599       @unique_cols;
600
601     push @unique_hashes, \%unique_hash
602       if (scalar keys %unique_hash == scalar @unique_cols);
603   }
604
605   my $row;
606   if (@unique_hashes) {
607     $row = $self->search(\@unique_hashes, { rows => 1 })->first;
608     if ($row) {
609       $row->set_columns($hash);
610       $row->update;
611     }
612   }
613
614   unless ($row) {
615     $row = $self->create($hash);
616   }
617
618   return $row;
619 }
620
621 =head1 ATTRIBUTES
622
623 The resultset takes various attributes that modify its behavior.
624 Here's an overview of them:
625
626 =head2 order_by
627
628 Which column(s) to order the results by. This is currently passed
629 through directly to SQL, so you can give e.g. C<foo DESC> for a 
630 descending order.
631
632 =head2 cols (arrayref)
633
634 Shortcut to request a particular set of columns to be retrieved - adds
635 'me.' onto the start of any column without a '.' in it and sets 'select'
636 from that, then auto-populates 'as' from 'select' as normal
637
638 =head2 select (arrayref)
639
640 Indicates which columns should be selected from the storage
641
642 =head2 as (arrayref)
643
644 Indicates column names for object inflation
645
646 =head2 join
647
648 Contains a list of relationships that should be joined for this query. Can also 
649 contain a hash reference to refer to that relation's relations. So, if one column
650 in your class C<belongs_to> foo and another C<belongs_to> bar, you can do
651 C<< join => [qw/ foo bar /] >> to join both (and e.g. use them for C<order_by>).
652 If a foo contains many margles and you want to join those too, you can do
653 C<< join => { foo => 'margle' } >>. If you want to fetch the columns from the
654 related table as well, see C<prefetch> below.
655
656 =head2 prefetch
657
658 Contains a list of relationships that should be fetched along with the main 
659 query (when they are accessed afterwards they will have already been
660 "prefetched"). This is useful for when you know you will need the related
661 object(s), because it saves a query. Currently limited to prefetching
662 one relationship deep, so unlike C<join>, prefetch must be an arrayref.
663
664 =head2 from 
665
666 This attribute can contain a arrayref of elements. Each element can be another
667 arrayref, to nest joins, or it can be a hash which represents the two sides
668 of the join. 
669
670 NOTE: Use this on your own risk. This allows you to shoot your foot off!
671
672 =head2 page
673
674 For a paged resultset, specifies which page to retrieve. Leave unset
675 for an unpaged resultset.
676
677 =head2 rows
678
679 For a paged resultset, how many rows per page
680
681 =head2 group_by (listref)
682
683 A listref of columns to group by (note that 'count' doesn't work on grouped
684 resultsets)
685
686   group_by => [qw/ column1 column2 ... /]
687
688 =head2 distinct
689
690 Set to 1 to group by all columns
691
692 =cut
693
694 1;