3 package Stem::Load::Ticker ;
7 use Time::HiRes qw( gettimeofday tv_interval ) ;
15 Name this Cell was registered with.
21 Address to send the insert messages
28 Maximum number of rows to insert
32 'name' => 'parallel_cnt',
35 Number of inserts to do in parallel
# Constructor body (the `sub` line itself is outside the visible lines).
# The class name is taken off the front of @_ so that only the remaining
# arguments are handed to Stem::Class::parse_args together with $attr_spec.
42 my( $class ) = shift ;
44 my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
# A non-reference result is handed straight back to the caller
# (presumably an error string from parse_args -- confirm).
45 return $self unless ref $self ;
# Handler body for a "go" request (the `sub` line is outside the visible
# lines; presumably a Stem command method -- confirm). It resets the run
# state, sends the first batch of inserts and returns a status string.
52 my( $self, $msg ) = @_ ;
# Optional overrides arrive in the message data, which is dereferenced as
# a scalar ref and scanned for key=value pairs (both sides are \S+, so the
# values are taken as-is with no numeric validation here).
56 if ( my $data = $msg->data() ) {
58 %go_args = ${$data} =~ /(\S+)=(\S+)/g if $$data ;
# Scalar-context gettimeofday() (Time::HiRes) yields the current time in
# seconds as a floating-point number; used later to compute elapsed time.
61 $self->{'start_time'} = gettimeofday() ;
# Remember the requester so the final report can be sent back to it.
62 $self->{'go_from_addr'} = $msg->from() ;
# Because of ||, a missing or zero max_cnt in the message falls back to
# the configured max_cnt attribute.
63 $self->{'go_max_cnt'} = $go_args{'max_cnt'} || $self->{'max_cnt'} ;
# inserted_cnt counts replies received; send_cnt counts rows still to send.
65 $self->{'inserted_cnt'} = 0 ;
66 $self->{'send_cnt'} = $self->{'go_max_cnt'} ;
# A para_cnt override replaces the object's parallel_cnt attribute itself,
# so it stays in effect for later runs that do not pass para_cnt.
67 $self->{'parallel_cnt'} = $go_args{'para_cnt'} if $go_args{'para_cnt'} ;
# Prime the pipeline with parallel_cnt inserts.
69 $self->send_ticker_msgs( $self->{'parallel_cnt'} ) ;
71 return "Ticker Started\n" ;
# send_ticker_msgs( $parallel_cnt )
#
# Calls insert_ticker_row() on this object $parallel_cnt times.
#
#   $parallel_cnt - number of rows to send. It is truncated to an integer;
#                   undef, zero, negative or non-numeric values send nothing.
#
# Returns nothing useful.
#
# The count can originate from a user-supplied "para_cnt=..." string, so it
# must not be trusted to be a non-negative integer. A bare
# `while ( $parallel_cnt-- )` only stops when the counter lands exactly on
# zero, which never happens for negative or fractional input.
sub send_ticker_msgs {

	my( $self, $parallel_cnt ) = @_ ;

	#print "PARA $parallel_cnt\n" ;

	# Normalise once so the loop below is guaranteed to terminate.
	my $remaining = int( $parallel_cnt || 0 ) ;

	while ( $remaining-- > 0 ) {

		$self->insert_ticker_row() ;
	}

	return ;
}
# insert_ticker_row: build one random ticker row and dispatch it as a
# message to the address held in dbi_addr. Does nothing once send_cnt has
# been used up.
88 sub insert_ticker_row {
# Stop when no rows are left to send; otherwise claim one from the quota.
92 return if $self->{'send_cnt'} <= 0 ;
93 $self->{'send_cnt'}-- ;
# Three random upper-case letters joined into one symbol.
95 my $ticker = join '', map ['A' .. 'Z']->[rand 26], 1 .. 3 ;
# Random integer in 100 .. 9999.
97 my $price = 100 + int rand 9900 ;
# Random integer in -1000 .. 999.
99 my $delta = -1000 + int rand 2000 ;
# The message names the statement 'insert_tick' and carries the three
# generated values as its bind list. The reply is requested with type
# 'insert_done' (presumably delivered back to this cell's insert_done
# handler -- confirm). Some arguments of this call are not visible here.
101 my $dbi_msg = Stem::Msg->new(
103 'to' => $self->{'dbi_addr'},
104 'from' => $self->{'reg_name'},
107 'reply_type' => 'insert_done',
109 statement => 'insert_tick',
110 bind => [ $ticker, $price, $delta ],
114 #print $dbi_msg->dump( 'SEND' ) ;
115 $dbi_msg->dispatch() ;
# Handler body for an insert reply (the `sub` line is outside the visible
# lines; the die message below names it insert_done_in). Each reply tops
# the pipeline up by one row, and the reply that completes the run sends a
# timing report back to the requester.
122 my( $self, $msg ) = @_ ;
124 #print $msg->dump( 'DONE' ) ;
# While rows remain to be sent, each completed insert triggers one more.
126 if ( $self->{'send_cnt'} ) {
128 $self->send_ticker_msgs( 1 ) ;
# Count this reply; the report is produced once the count reaches the
# number of rows requested for this run.
131 if ( ++$self->{'inserted_cnt'} >= $self->{'go_max_cnt'} ) {
133 my $data = $msg->data() ;
# NOTE(review): the reply payload appears to be inspected only for this
# final reply; earlier replies are counted without looking at their data.
# The message interpolates $$data, so a non-hash payload is expected to be
# a scalar ref (presumably an error string) -- confirm.
135 die "insert_done_in: $$data" unless ref $data eq 'HASH' ;
# Elapsed seconds since start_time, formatted to four decimals (the %8.4f
# width pads short values with leading spaces).
137 my $time_delta = sprintf( "%8.4f",
138 gettimeofday() - $self->{'start_time'} ) ;
# NOTE(review): $time_delta is the formatted string; if the elapsed time
# formats as 0.0000 this division dies with "Illegal division by zero".
140 my $rows_per_second = $self->{'inserted_cnt'} / $time_delta ;
# The summary goes back to the address that issued the go request.
142 my $done_msg = Stem::Msg->new(
143 'to' => $self->{'go_from_addr'},
144 'from' => $self->{'reg_name'},
145 'type' => 'response',
147 inserted $self->{'inserted_cnt'} rows in $time_delta seconds
148 $rows_per_second rows per second
149 with $self->{'parallel_cnt'} inserts in parallel
150 last row ID $data->{'insert_id'}
154 $done_msg->dispatch() ;