--- /dev/null
+package DBIx::Data::Collection::Set;
+
+use Moose;
+use Method::Signatures::Simple;
+use Data::Perl::Stream::Array;
+
+has _store => (is => 'ro', required => 1, init_arg => 'store');
+
+has _column_order => (is => 'ro', required => 1, init_arg => 'column_order');
+
+has _class => (is => 'ro', predicate => '_has_class');
+
+has _member_cache => (is => 'rw', lazy_build => 1);
+
+method _build__member_cache {
+ my $stream = $self->_new_raw_stream;
+ my @cache;
+ while (my ($raw) = $stream->next) {
+ push @cache, $self->_inflate($raw);
+ }
+ \@cache;
+}
+
+method _new_raw_stream {
+ $self->_store->new_select_command([])->execute;
+}
+
+method _inflate ($raw) {
+ my @order = @{$self->_column_order};
+ my %final;
+ @final{@order} = @$raw;
+ bless(\%final, $self->_class) if $self->_has_class;
+ \%final;
+}
+
+method flatten {
+ @{$self->_member_cache};
+}
+
+method as_stream {
+ Data::Perl::Stream::Array->new(array => $self->_member_cache);
+}
+
+1;
--- /dev/null
+package DBIx::Data::Store;
+
+use Moose;
+use Method::Signatures::Simple;
+use DBIx::Connector;
+use DBIx::Data::Store::Command::Call;
+use DBIx::Data::Store::Command::Row;
+use DBIx::Data::Store::Command::Stream;
+
+has 'connection' => (is => 'ro', lazy_build => 1); # , isa => 'DBIx::Connector'
+
+has 'connect_info' => (is => 'ro', required => 1);
+
+method connect (@connect_info) { # for DBI heads because I'm kind
+ $self->new(
+ connect_info => \@connect_info
+ );
+}
+
+method _build_connection {
+ DBIx::Connector->new(@{$self->connect_info});
+}
+
+method new_call_command ($sql, $args) {
+ return DBIx::Data::Store::Command::Call->new(
+ run => $sql, with => $args, against => $self->connection
+ );
+}
+
+method new_row_command ($sql, $args) {
+ return DBIx::Data::Store::Command::Row->new(
+ run => $sql, with => $args, against => $self->connection
+ );
+}
+
+method new_stream_command ($sql, $args) {
+ return DBIx::Data::Store::Command::Stream->new(
+ run => $sql, with => $args, against => $self->connection
+ );
+}
+
+__PACKAGE__->meta->make_immutable;
+
+1;
--- /dev/null
+package DBIx::Data::Store::CRUD;
+
+use Moose;
+use Method::Signatures::Simple;
+
+has raw_store => (is => 'ro', required => 1); # DBIx::Data::Store object
+
+foreach my $type (qw(select insert update delete)) {
+ has "${type}_sql" => (is => 'ro', predicate => "has_${type}_sql");
+}
+
+method new_select_command ($args) {
+ die "$self->has_select_sql" unless $self->has_select_sql;
+ $self->raw_store->new_stream_command($self->select_sql, $args);
+}
+
+method new_insert_command ($args) {
+ die "$self->has_insert_sql" unless $self->has_insert_sql;
+ $self->raw_store->new_call_command($self->insert_sql, $args);
+}
+
+method new_update_command ($args) {
+ die "$self->has_update_sql" unless $self->has_update_sql;
+ $self->raw_store->new_call_command($self->update_sql, $args);
+}
+
+method new_delete_command ($args) {
+ die "$self->has_delete_sql" unless $self->has_delete_sql;
+ $self->raw_store->new_call_command($self->delete_sql, $args);
+}
+
+__PACKAGE__->meta->make_immutable;
+
+1;
--- /dev/null
+package DBIx::Data::Store::Command;
+
+use Moose::Role;
+use Method::Signatures::Simple;
+
+has [ 'run', 'with', 'against' ] => (is => 'ro', required => 1);
+
+requires 'execute';
+
+method _new_sth {
+ $self->against->run(sub {
+ $_->prepare_cached($self->run, {}, 3);
+ });
+}
+
+method _execute_sth ($sth) {
+ $sth->execute(@{$self->with});
+ $sth;
+}
+
+method _new_active_sth {
+ $self->_execute_sth($self->_new_sth);
+}
+
+1;
--- /dev/null
+package DBIx::Data::Store::Command::Call;
+
+use Moose;
+use Method::Signatures::Simple;
+
+method execute {
+ $self->_new_active_sth->rows;
+}
+
+with 'DBIx::Data::Store::Command';
+
+__PACKAGE__->meta->make_immutable;
+
+1;
--- /dev/null
+package DBIx::Data::Store::Command::Row;
+
+use Moose;
+use Carp qw(carp);
+use Method::Signatures::Simple;
+
+method execute {
+ my $sth = $self->_new_active_sth;
+ my @row = $sth->fetchrow_array;
+ my @nextrow = $sth->fetchrow_array if @row;
+ if(@row && @nextrow) {
+ carp "Query returned more than one row - did you want a stream command?";
+ }
+ # Need to call finish() to work round broken DBDs
+ $sth->finish();
+ return \@row;
+}
+
+with 'DBIx::Data::Store::Command';
+
+__PACKAGE__->meta->make_immutable;
+
+1;
+
--- /dev/null
+package DBIx::Data::Store::Command::Stream;
+
+use Moose;
+use Method::Signatures::Simple;
+use DBIx::Data::Stream::STH;
+
+method execute {
+ my $sth = $self->_new_active_sth;
+ DBIx::Data::Stream::STH->new(
+ sth => $sth
+ );
+}
+
+with 'DBIx::Data::Store::Command';
+
+__PACKAGE__->meta->make_immutable;
+
+1;
--- /dev/null
+package DBIx::Data::Stream::STH;
+
+use Moose;
+use Method::Signatures::Simple;
+
+has 'sth' => (is => 'ro', required => 1, clearer => '_clear_sth');
+
+method next {
+ my $sth = $self->sth;
+ return unless $sth;
+ # {Active} only means that there *may* be more results to fetch
+ if ($sth->{Active} and my @next = $self->sth->fetchrow_array) {
+ return \@next;
+ }
+ $sth->finish;
+ # prepare_cached might recycle it now we're finished so get rid of it
+ $self->_clear_sth;
+ return;
+}
+
+__PACKAGE__->meta->make_immutable;
+
+1;
--- /dev/null
+package Data::Perl::Stream::Array;
+
+use Moose;
+use Method::Signatures::Simple;
+
+has _array => (is => 'rw', required => 1, init_arg => 'array');
+
+method BUILD { $self->_array([ @{$self->_array} ]) }
+
+use Devel::Dwarn;
+
+method next {
+ my $ary = $self->_array;
+ return unless @$ary;
+ return shift @$ary;
+}
+
+1;
--- /dev/null
+use strict;
+use warnings FATAL => 'all';
+use Test::More;
+use DBIx::Data::Store;
+use DBIx::Data::Store::CRUD;
+use DBIx::Data::Collection::Set;
+
+use DBI;
+
+my $dsn = 'dbi:SQLite:tmp.db';
+
+my @expect;
+
+{
+ unlink('tmp.db');
+ my $dbh = DBI->connect($dsn);
+ $dbh->do(q{
+ CREATE TABLE person (
+ id INTEGER NOT NULL PRIMARY KEY,
+ name VARCHAR(255) NOT NULL
+ )
+ });
+ my $pop = $dbh->prepare(q{INSERT INTO person (name) VALUES (?)});
+ my @names = qw(Joe Jim Bob Pterry);
+ $pop->execute($_) for @names;
+ @expect = do { my $id = 0; map +{ id => ++$id, name => $_ }, @names };
+}
+
+sub make_set {
+ DBIx::Data::Collection::Set->new(
+ store => DBIx::Data::Store::CRUD->new(
+ raw_store => DBIx::Data::Store->connect($dsn),
+ select_sql => q{SELECT id, name FROM person},
+ ),
+ column_order => [ qw(id name) ],
+ );
+}
+
+my $set = make_set;
+
+is_deeply([ $set->flatten ], \@expect, 'Basic data out ok (flatten)');
+
+{
+ my $stream = $set->as_stream;
+
+ my @got; while (my ($next) = $stream->next) { push @got, $next }
+
+ is_deeply(\@got, \@expect, 'Basic data out ok (stream)');
+}
+
+done_testing;