added some of uri's utility actions for build script
[urisagit/Stem.git] / lib / Stem / Load / Ticker.pm
1
2
3 package Stem::Load::Ticker ;
4
5 use strict ;
6
7 use Time::HiRes qw( gettimeofday tv_interval ) ;
8
9 my $attr_spec = [
10
11
12         {
13                 'name'          => 'reg_name',
14                 'help'          => <<HELP,
15 Name this Cell was registered with.
16 HELP
17         },
18         {
19                 'name'          => 'dbi_addr',
20                 'help'          => <<HELP,
21 Address to send the insert messages
22 HELP
23         },
24         {
25                 'name'          => 'max_cnt',
26                 'default'       => 20,
27                 'help'          => <<HELP,
28 Maximum number of rows to insert
29 HELP
30         },
31         {
32                 'name'          => 'parallel_cnt',
33                 'default'       => 1,
34                 'help'          => <<HELP,
35 Number of inserts to do in parallel
36 HELP
37         },
38 ] ;
39
40 sub new {
41
42         my( $class ) = shift ;
43
44         my $self = Stem::Class::parse_args( $attr_spec, @_ ) ;
45         return $self unless ref $self ;
46
47         return $self ;
48 }
49
50 sub go_cmd {
51
52         my( $self, $msg ) = @_ ;
53
54         my %go_args ;
55
56         if ( my $data = $msg->data() ) {
57
58                 %go_args = ${$data} =~ /(\S+)=(\S+)/g if $$data ;
59         }
60
61         $self->{'start_time'} = gettimeofday() ;
62         $self->{'go_from_addr'} = $msg->from() ;
63         $self->{'go_max_cnt'} = $go_args{'max_cnt'} || $self->{'max_cnt'} ;
64
65         $self->{'inserted_cnt'} = 0 ;
66         $self->{'send_cnt'} = $self->{'go_max_cnt'} ;
67         $self->{'parallel_cnt'} = $go_args{'para_cnt'} if $go_args{'para_cnt'} ;
68
69         $self->send_ticker_msgs( $self->{'parallel_cnt'} ) ;
70
71         return "Ticker Started\n" ;
72 }
73
74 sub send_ticker_msgs {
75
76         my( $self, $parallel_cnt ) = @_ ;
77
78 #print "PARA $parallel_cnt\n" ;
79
80         while ( $parallel_cnt-- ) {
81
82                 $self->insert_ticker_row() ;
83         }
84
85         return ;
86 }
87
88 sub insert_ticker_row {
89
90         my( $self ) = @_ ;
91
92         return if $self->{'send_cnt'} <= 0 ;
93         $self->{'send_cnt'}-- ;
94
95         my $ticker = join '', map ['A' .. 'Z']->[rand 26], 1 .. 3 ;
96
97         my $price = 100 + int rand 9900 ;
98
99         my $delta = -1000 + int rand 2000 ;
100
101         my $dbi_msg = Stem::Msg->new(
102
103                 'to'            => $self->{'dbi_addr'},
104                 'from'          => $self->{'reg_name'},
105                 'type'          => 'cmd',
106                 'cmd'           => 'execute',
107                 'reply_type'    => 'insert_done',
108                 'data'          => {
109                         statement       => 'insert_tick',
110                         bind            => [ $ticker, $price, $delta ],
111                 },
112         );
113
114 #print $dbi_msg->dump( 'SEND' ) ;
115         $dbi_msg->dispatch() ;
116
117         return ;
118 }
119
120 sub insert_done_in {
121
122         my( $self, $msg ) = @_ ;
123
124 #print $msg->dump( 'DONE' ) ;
125
126         if ( $self->{'send_cnt'} ) {
127
128                 $self->send_ticker_msgs( 1 ) ;
129         }
130
131         if ( ++$self->{'inserted_cnt'} >= $self->{'go_max_cnt'} ) {
132
133                 my $data = $msg->data() ;
134
135                 die "insert_done_in: $$data" unless ref $data eq 'HASH' ;
136
137                 my $time_delta = sprintf( "%8.4f",
138                                      gettimeofday() - $self->{'start_time'} ) ;
139
140                 my $rows_per_second = $self->{'inserted_cnt'} / $time_delta ;
141
142                 my $done_msg = Stem::Msg->new(
143                         'to'    => $self->{'go_from_addr'},
144                         'from'  => $self->{'reg_name'},
145                         'type'  => 'response',
146                         'data'  => <<DATA,
147 inserted $self->{'inserted_cnt'} rows in $time_delta seconds
148 $rows_per_second rows per second
149 with $self->{'parallel_cnt'} inserts in parallel
150 last row ID $data->{'insert_id'}
151 DATA
152                 ) ;
153
154                 $done_msg->dispatch() ;
155
156                 return ;
157         }
158
159
160         return ;
161 }
162
163 1 ;