Commit | Line | Data |
4536f655 |
1 | |
2 | |
3 | package Stem::Load::Ticker ; |
4 | |
5 | use strict ; |
6 | |
7 | use Time::HiRes qw( gettimeofday tv_interval ) ; |
8 | |
# Attribute specification for this Cell.  new() hands it to
# Stem::Class::parse_args() together with the constructor arguments.
# Each entry names one attribute and carries its help text; 'default'
# is present only for the attributes that have one.
my $attr_spec = [

	{
		name	=> 'reg_name',
		help	=> "Name this Cell was registered with.\n",
	},
	{
		name	=> 'dbi_addr',
		help	=> "Address to send the insert messages\n",
	},
	{
		name	=> 'max_cnt',
		default	=> 20,
		help	=> "Maximum number of rows to insert\n",
	},
	{
		name	=> 'parallel_cnt',
		default	=> 1,
		help	=> "Number of inserts to do in parallel\n",
	},
] ;
39 | |
# Constructor.
#
# The arguments after the class name are passed, along with $attr_spec,
# to Stem::Class::parse_args().  Whatever that call returns is returned
# to the caller unchanged, whether or not it is a reference
# (NOTE(review): a non-reference result is presumably an error string
# - confirm against Stem::Class).
sub new {

	# the class name is dropped from the argument list and not used again
	my $class = shift ;

	# called in scalar context, exactly one value comes back
	my $cell = Stem::Class::parse_args( $attr_spec, @_ ) ;

	return $cell ;
}
49 | |
# Handle the 'go' command: start a timed run of ticker inserts.
#
# When the message data is a reference to a string, that string is
# scanned for name=value pairs.  Two names are used:
#
#	max_cnt		number of rows to insert in this run; otherwise
#			the 'max_cnt' attribute is used
#	para_cnt	number of inserts to start at once; it replaces
#			the 'parallel_cnt' attribute of this Cell
#
# A value that is missing, zero, negative or not a number is ignored
# and the configured attribute is used, so a bad argument can neither
# start a run that sends nothing and never reports, nor hand a
# nonsensical count to send_ticker_msgs().
#
# The start time, the sender's address and the counters are stored in
# the object for insert_done_in().
#
# Returns the string "Ticker Started\n".
sub go_cmd {

	my( $self, $msg ) = @_ ;

	my %go_args ;

	my $data = $msg->data() ;

	# only a reference to a string can carry arguments; any other
	# kind of reference would die when dereferenced as a scalar
	if ( ref $data eq 'SCALAR' && defined ${$data} ) {

		%go_args = ${$data} =~ /(\S+)=(\S+)/g ;
	}

	# int() of a non-numeric string is 0, which selects the fallback
	my $max_cnt = int( $go_args{'max_cnt'} || 0 ) ;
	my $para_cnt = int( $go_args{'para_cnt'} || 0 ) ;

	$self->{'start_time'} = gettimeofday() ;
	$self->{'go_from_addr'} = $msg->from() ;
	$self->{'go_max_cnt'} = $max_cnt > 0 ? $max_cnt : $self->{'max_cnt'} ;

	$self->{'inserted_cnt'} = 0 ;
	$self->{'send_cnt'} = $self->{'go_max_cnt'} ;
	$self->{'parallel_cnt'} = $para_cnt if $para_cnt > 0 ;

	$self->send_ticker_msgs( $self->{'parallel_cnt'} ) ;

	return "Ticker Started\n" ;
}
73 | |
# Start up to $parallel_cnt inserts by calling insert_ticker_row() that
# many times.
#
#	$parallel_cnt	number of calls to make; a fractional value is
#			truncated and a missing, zero, negative or
#			non-numeric value makes no calls
#
# Returns nothing.
sub send_ticker_msgs {

	my( $self, $parallel_cnt ) = @_ ;

	# a counted range ends for every input.  the earlier
	# 'while ( $parallel_cnt-- )' stepped past zero for a negative
	# or fractional count and never stopped.
	foreach my $slot ( 1 .. int( $parallel_cnt || 0 ) ) {

		$self->insert_ticker_row() ;
	}

	return ;
}
87 | |
# Send one 'execute' command message for the 'insert_tick' statement to
# the address in 'dbi_addr', bound to a random row:
#
#	ticker	three random upper case letters
#	price	random integer from 100 to 9999
#	delta	random integer from -1000 to 999
#
# The reply is requested as type 'insert_done'.  Does nothing once
# 'send_cnt' has reached zero; otherwise 'send_cnt' is decremented
# before the message is built.
#
# Returns nothing.
sub insert_ticker_row {

	my( $self ) = @_ ;

	# the quota for this run is used up
	return if $self->{'send_cnt'} <= 0 ;

	$self->{'send_cnt'} -= 1 ;

	# the random numbers are drawn in a fixed order: three letters,
	# then the price, then the delta
	my @letters = ( 'A' .. 'Z' ) ;
	my $symbol = join '', map { $letters[ rand 26 ] } 1 .. 3 ;
	my $price = int( rand( 9900 ) ) + 100 ;
	my $change = int( rand( 2000 ) ) - 1000 ;

	my $insert_msg = Stem::Msg->new(
		'to'		=> $self->{'dbi_addr'},
		'from'		=> $self->{'reg_name'},
		'type'		=> 'cmd',
		'cmd'		=> 'execute',
		'reply_type'	=> 'insert_done',
		'data'		=> {
			statement	=> 'insert_tick',
			bind		=> [ $symbol, $price, $change ],
		},
	) ;

	$insert_msg->dispatch() ;

	return ;
}
119 | |
# Handle an 'insert_done' message, the reply to one insert sent by
# insert_ticker_row().
#
# While 'send_cnt' is non-zero another insert is started.  The reply is
# then counted, and when 'inserted_cnt' has reached 'go_max_cnt' a
# 'response' message with the timing report is dispatched to the
# address stored by go_cmd().
#
# Dies with "insert_done_in: <text>" when the data of that final reply
# is not a hash reference.  Only the final reply is checked.
#
# Returns nothing.
sub insert_done_in {

	my( $self, $msg ) = @_ ;

	# each finished insert starts the next one
	if ( $self->{'send_cnt'} ) {

		$self->send_ticker_msgs( 1 ) ;
	}

	# more replies are still to come
	return if ++$self->{'inserted_cnt'} < $self->{'go_max_cnt'} ;

	my $data = $msg->data() ;

	unless ( ref $data eq 'HASH' ) {

		# build the text without dereferencing something that is
		# not a scalar reference, which would lose or mask the
		# error being reported
		my $error = ref $data eq 'SCALAR' ? ${$data} : $data ;
		$error = 'no data' unless defined $error ;

		die "insert_done_in: $error" ;
	}

	my $elapsed = gettimeofday() - $self->{'start_time'} ;
	my $time_delta = sprintf( "%8.4f", $elapsed ) ;

	# divide by the rounded figure that is reported so the two
	# numbers agree, but never by zero: a very short run rounds
	# to 0.0000
	my $divisor = $time_delta > 0 ? $time_delta : $elapsed ;
	my $rows_per_second = $divisor > 0 ?
		$self->{'inserted_cnt'} / $divisor : 0 ;

	my $done_msg = Stem::Msg->new(
		'to'	=> $self->{'go_from_addr'},
		'from'	=> $self->{'reg_name'},
		'type'	=> 'response',
		'data'	=> <<DATA,
inserted $self->{'inserted_cnt'} rows in $time_delta seconds
$rows_per_second rows per second
with $self->{'parallel_cnt'} inserts in parallel
last row ID $data->{'insert_id'}
DATA
	) ;

	$done_msg->dispatch() ;

	return ;
}
162 | |
163 | 1 ; |