fixed perl event loop
[urisagit/Stem.git] / lib / Stem / Load / Ticker.pm
CommitLineData
4536f655 1
2
3package Stem::Load::Ticker ;
4
5use strict ;
6
7use Time::HiRes qw( gettimeofday tv_interval ) ;
8
# Attribute specification passed to Stem::Class::parse_args() by new().
# Each entry names one attribute of the cell and carries a help string;
# 'default' is given only for the attributes that have one.
my $attr_spec = [

	{
		name	=> 'reg_name',
		help	=> "Name this Cell was registered with.\n",
	},
	{
		name	=> 'dbi_addr',
		help	=> "Address to send the insert messages\n",
	},
	{
		name	=> 'max_cnt',
		default	=> 20,
		help	=> "Maximum number of rows to insert\n",
	},
	{
		name	=> 'parallel_cnt',
		default	=> 1,
		help	=> "Number of inserts to do in parallel\n",
	},
] ;
39
# new( %attrs )
#
# Constructor. Hands the caller's arguments, together with this module's
# attribute specification, to Stem::Class::parse_args() and returns
# whatever that call produces.
# NOTE(review): a non-reference result is presumably an error value from
# parse_args -- confirm against Stem::Class. It is returned unchanged.
sub new {

	# The class name is removed from @_ so that only the attribute
	# arguments are forwarded.
	my $class = shift ;

	my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;

	return $self ;
}
49
# _positive_cnt( $value )
#
# Returns $value when it is a plain string of decimal digits that is
# greater than zero; returns undef (scalar context) for anything else.
sub _positive_cnt {

	my( $value ) = @_ ;

	return unless defined $value && $value =~ /\A\d+\z/ && $value > 0 ;

	return $value ;
}

# go_cmd( $msg )
#
# Command handler that starts an insert run.
#
# The message data may carry whitespace-separated key=value overrides:
#
#   max_cnt=N       number of rows to insert in this run
#   parallel_cnt=N  number of inserts kept in flight
#   para_cnt=N      older spelling of parallel_cnt, still accepted
#
# Overrides that are missing, zero, negative or not plain integers are
# ignored and the currently configured values are used instead. A
# negative or non-numeric count must never reach the send loop.
#
# Records the start time and the address of the requester, resets the
# per-run counters, sends the first batch of inserts and returns the
# string "Ticker Started\n".
sub go_cmd {

	my( $self, $msg ) = @_ ;

	my %go_args ;

	# Accept the arguments either as a scalar reference or as a plain
	# string; any other kind of data carries no overrides.
	my $data = $msg->data() ;
	my $arg_text = ref $data eq 'SCALAR' ? ${$data}
		     : ref $data             ? undef
		     :                         $data ;

	if ( defined $arg_text ) {

		%go_args = $arg_text =~ /(\S+)=(\S+)/g ;
	}

	my $max_cnt = _positive_cnt( $go_args{'max_cnt'} ) ;

	# 'parallel_cnt' matches the attribute name; 'para_cnt' is kept
	# for callers that already use the short form.
	my $parallel_cnt = _positive_cnt( $go_args{'parallel_cnt'} ) ||
			   _positive_cnt( $go_args{'para_cnt'} ) ;

	$self->{'start_time'} = gettimeofday() ;
	$self->{'go_from_addr'} = $msg->from() ;
	$self->{'go_max_cnt'} = $max_cnt || $self->{'max_cnt'} ;

	$self->{'inserted_cnt'} = 0 ;
	$self->{'send_cnt'} = $self->{'go_max_cnt'} ;
	$self->{'parallel_cnt'} = $parallel_cnt if $parallel_cnt ;

	$self->send_ticker_msgs( $self->{'parallel_cnt'} ) ;

	return "Ticker Started\n" ;
}
73
# send_ticker_msgs( $parallel_cnt )
#
# Calls insert_ticker_row() once for each unit of $parallel_cnt.
#
# The loop tests for "> 0" rather than plain truth: a negative,
# fractional or non-numeric count never decrements to exactly zero, so a
# truth test would loop forever. Counts that are zero, negative,
# undefined or non-numeric send nothing.
sub send_ticker_msgs {

	my( $self, $parallel_cnt ) = @_ ;

	while ( $parallel_cnt-- > 0 ) {

		$self->insert_ticker_row() ;
	}

	return ;
}
87
# insert_ticker_row()
#
# Sends one 'execute' command for the 'insert_tick' statement to the
# configured dbi_addr, bound to a randomly generated row, and uses up one
# unit of this run's send budget. Does nothing once the budget
# ( send_cnt ) is zero or less. The reply is requested with type
# 'insert_done'.
sub insert_ticker_row {

	my( $self ) = @_ ;

	if ( $self->{'send_cnt'} <= 0 ) {

		return ;
	}

	$self->{'send_cnt'} -= 1 ;

	# Random row: a three letter upper case symbol, a price in
	# 100 .. 9999 and a delta in -1000 .. 999.
	my @letters = ( 'A' .. 'Z' ) ;
	my $symbol = join '', map { $letters[ rand @letters ] } 1 .. 3 ;
	my $price = int( rand 9900 ) + 100 ;
	my $delta = int( rand 2000 ) - 1000 ;

	my %msg_args = (
		'to'		=> $self->{'dbi_addr'},
		'from'		=> $self->{'reg_name'},
		'type'		=> 'cmd',
		'cmd'		=> 'execute',
		'reply_type'	=> 'insert_done',
		'data'		=> {
			statement	=> 'insert_tick',
			bind		=> [ $symbol, $price, $delta ],
		},
	) ;

	my $dbi_msg = Stem::Msg->new( %msg_args ) ;

#print $dbi_msg->dump( 'SEND' ) ;
	$dbi_msg->dispatch() ;

	return ;
}
119
# insert_done_in( $msg )
#
# Handler for 'insert_done' replies. Each reply triggers one more insert
# while the run's send budget lasts. When the number of replies reaches
# go_max_cnt, a 'response' message with the run statistics is sent to the
# address that issued the go command.
#
# Dies if the final reply's data is not a hash reference; the reply
# content (or a description of it) is included in the error text.
#
# The rate is computed only when the rounded elapsed time is positive.
# A run that finishes within the timer's rounding, or a wall clock that
# stepped backwards, would otherwise cause a division by zero or a
# negative rate; in that case the rate is reported as 'n/a'.
sub insert_done_in {

	my( $self, $msg ) = @_ ;

#print $msg->dump( 'DONE' ) ;

	if ( $self->{'send_cnt'} ) {

		$self->send_ticker_msgs( 1 ) ;
	}

	# More replies are still expected for this run.
	return if ++$self->{'inserted_cnt'} < $self->{'go_max_cnt'} ;

	my $data = $msg->data() ;

	unless ( ref $data eq 'HASH' ) {

		# Dereference only a real scalar reference, so the actual
		# failure is reported rather than a strict-refs error.
		my $err = ref $data eq 'SCALAR' ? ${$data} : $data ;
		$err = 'unexpected reply data' unless defined $err ;

		die "insert_done_in: $err" ;
	}

	my $time_delta = sprintf( "%8.4f",
			gettimeofday() - $self->{'start_time'} ) ;

	my $rows_per_second = $time_delta > 0 ?
		$self->{'inserted_cnt'} / $time_delta : 'n/a' ;

	my $done_msg = Stem::Msg->new(
		'to'	=> $self->{'go_from_addr'},
		'from'	=> $self->{'reg_name'},
		'type'	=> 'response',
		'data'	=> <<DATA,
inserted $self->{'inserted_cnt'} rows in $time_delta seconds
$rows_per_second rows per second
with $self->{'parallel_cnt'} inserts in parallel
last row ID $data->{'insert_id'}
DATA
	) ;

	$done_msg->dispatch() ;

	return ;
}
162
1631 ;