| 1 | # $Id$ |
|---|
| 2 | |
|---|
| 3 | package Mvalve; |
|---|
| 4 | use Moose; |
|---|
| 5 | use Moose::Util::TypeConstraints; |
|---|
| 6 | use Mvalve::Message; |
|---|
| 7 | use Time::HiRes(); |
|---|
| 8 | |
|---|
| 9 | our $VERSION = '0.00006'; |
|---|
| 10 | our $AUTHORITY = "cpan:DMAKI"; |
|---|
| 11 | |
|---|
# Register the class and role names that the attributes below use as type
# constraints, so that coercions can be attached to them.
class_type('Data::Throttler');
role_type('Mvalve::Queue');
role_type('Mvalve::State');
|---|
| 15 | |
|---|
# Coerce a plain hashref of constructor arguments into a throttler object.
# The optional "module" key names the class to instantiate (defaults to
# Data::Throttler); every remaining key is passed to that class's
# constructor.
coerce 'Data::Throttler'
    => from 'HashRef'
    => via {
        # Work on a shallow copy: deleting "module" from the caller's own
        # hashref would make a reused config silently fall back to the
        # default class the second time it is coerced.
        my %args   = %{$_};
        my $module = delete $args{module} || 'Data::Throttler';
        Class::MOP::load_class($module);

        $module->new(%args);
    }
;
|---|
| 26 | |
|---|
# Coerce a plain hashref of constructor arguments into a queue object.
# The optional "module" key names the class to instantiate (defaults to
# Mvalve::Queue::Q4M); every remaining key is passed to that class's
# constructor.
coerce 'Mvalve::Queue'
    => from 'HashRef'
    => via {
        # Work on a shallow copy: deleting "module" from the caller's own
        # hashref would make a reused config silently fall back to the
        # default class the second time it is coerced.
        my %args   = %{$_};
        my $module = delete $args{module} || 'Mvalve::Queue::Q4M';
        Class::MOP::load_class($module);

        $module->new(%args);
    }
;
|---|
| 37 | |
|---|
# Coerce a plain hashref into a state-storage object.
# The optional "module" key names the class to instantiate (defaults to
# Mvalve::State::Memory). Unlike the other coercions, the remaining keys
# are not flattened into the constructor call; they are handed over as a
# single hashref under the "memcached" argument.
# NOTE(review): every module, including the Memory default, receives its
# options under the "memcached" key -- confirm this is intended.
coerce 'Mvalve::State'
    => from 'HashRef'
    => via {
        # Work on a shallow copy: deleting "module" from the caller's own
        # hashref would make a reused config silently fall back to the
        # default class the second time it is coerced.
        my %args   = %{$_};
        my $module = delete $args{module} || 'Mvalve::State::Memory';
        Class::MOP::load_class($module);

        $module->new(memcached => \%args);
    }
;
|---|
| 48 | |
|---|
# The throttler object. A hashref is accepted and coerced into an object
# (see the Data::Throttler coercion); try_push() is delegated to it.
has throttler => (
    is       => 'rw',
    isa      => 'Data::Throttler',
    coerce   => 1,
    required => 1,
    handles  => ['try_push'],
);
|---|
| 56 | |
|---|
# The set of queue tables this object works with. When none is supplied,
# Mvalve::QueueSet is loaded on demand and a fresh instance is created.
has queue_set => (
    is      => 'ro',
    isa     => 'Mvalve::QueueSet',
    default => sub {
        my $class = 'Mvalve::QueueSet';
        Class::MOP::load_class($class);
        return $class->new;
    },
);
|---|
| 65 | |
|---|
# Storage for retry bookkeeping. A hashref is accepted and coerced (see the
# Mvalve::State coercion); when nothing is supplied an in-memory store is
# created. get/set/remove/incr/decr are delegated with a "state_" prefix.
has state => (
    is       => 'rw',
    does     => 'Mvalve::State',
    required => 1,
    coerce   => 1,
    default  => sub {
        my $class = 'Mvalve::State::Memory';
        Class::MOP::load_class($class);
        return $class->new;
    },
    handles  => {
        state_get    => 'get',
        state_set    => 'set',
        state_remove => 'remove',
        state_incr   => 'incr',
        state_decr   => 'decr',
    },
);
|---|
| 79 | |
|---|
# Timeout value handed to the queue's next() call by Mvalve::next.
# Presumably expressed in seconds -- confirm against the queue backend.
has timeout => (
    is       => 'rw',
    isa      => 'Int',
    default  => 60,
    required => 1,
);
|---|
| 86 | |
|---|
# The queue backend. A hashref is accepted and coerced (see the
# Mvalve::Queue coercion). next/fetch/insert/clear are delegated with a
# "q_" prefix.
has queue => (
    is       => 'rw',
    does     => 'Mvalve::Queue',
    coerce   => 1,
    required => 1,
    handles  => {
        q_next   => 'next',
        q_fetch  => 'fetch',
        q_insert => 'insert',
        q_clear  => 'clear',
    },
);
|---|
| 97 | |
|---|
# All attributes are declared; finalise the class.
__PACKAGE__->meta->make_immutable;

# Remove the imported declaration sugar from this package so it cannot be
# invoked as a method on Mvalve objects. The type-constraint helpers
# (coerce, from, via, class_type, role_type, ...) are unimported as well;
# they are only needed by the declarations above.
no Moose;
no Moose::Util::TypeConstraints;
|---|
| 101 | |
|---|
# Message header names used by this module, plus the compile-time trace
# switch (true when the MVALVE_TRACE environment variable is set to a true
# value at load time).
use constant {
    EMERGENCY_HEADER   => 'X-Mvalve-Emergency',
    DESTINATION_HEADER => 'X-Mvalve-Destination',
    RETRY_HEADER       => 'X-Mvalve-Retry-Time',
    MVALVE_TRACE       => ( $ENV{MVALVE_TRACE} ? 1 : 0 ),
};
|---|
| 106 | |
|---|
# Debugging aid: writes all arguments to STDERR as one line, joined with the
# list separator and prefixed with "MVALVE: ". Callers in this file guard
# every call with the MVALVE_TRACE constant.
sub trace {
    my $line = join $", @_;
    print STDERR "MVALVE: $line\n";
}
|---|
| 111 | |
|---|
# Fetch the next message that may be handed out right now.
#
# Asks the queue (q_next) for a table that has data, passing the queue
# set's conditions and the configured timeout, then fetches one message
# from that table and decides what to do with it:
#
#   * a message from an emergency or timed table is always returned; the
#     throttler is still told about it so that later messages from other
#     tables are counted correctly
#   * any other message is returned only when the throttler accepts it and
#     no retries are pending for its destination; otherwise it is passed to
#     defer() and nothing is returned
#
# Returns the message object, or an empty list when there is nothing to
# hand out.
sub next {
    my ($self) = @_;

    my $queue_set = $self->queue_set;
    my @conds     = $queue_set->as_q4m_args;
    my $table     = $self->q_next(
        table_conds => \@conds,
        timeout     => $self->timeout + 0,
    );
    unless ($table) {
        MVALVE_TRACE
            and trace( "q_next did not return a table name, simply returning" );
        return ();
    }

    MVALVE_TRACE and trace( "issueing fetch on table '$table'" );
    my $message = $self->q_fetch( table => $table );
    unless ($message) {
        MVALVE_TRACE
            and trace( "q_fetch did not return a message, simply returning" );
        return ();
    }

    # The destination is an abstract symbol naming the endpoint service.
    # In this method it serves as the throttling key and as part of the
    # retry-counter key.
    my $destination = $message->header( DESTINATION_HEADER );

    my $bypasses_throttle
        = $queue_set->is_emergency($table) || $queue_set->is_timed($table);
    if ($bypasses_throttle) {
        # Emergency and timed messages go through unconditionally. A message
        # carrying a retry header was counted as a pending retry when it was
        # deferred, so that counter is decremented here.
        $self->state_decr( [ $destination, 'retry' ] )
            if $message->header(RETRY_HEADER);

        # Still record the delivery with the throttler so that the next
        # message from a normal queue is throttled correctly.
        $self->try_push( key => $destination );

        return $message;
    }

    # Normal queue: the message must pass both the pending-retry check and
    # the throttler. The throttler is consulted even when retries are
    # pending.
    my $pending   = $self->is_pending($destination);
    my $accepted  = $self->try_push( key => $destination );
    my $throttled = !$accepted;
    MVALVE_TRACE
        and trace( "checking if message to $destination should be throttled (pending: $pending, throttled: $throttled)" );

    unless ( $throttled || $pending ) {
        # Nothing stands in the way; hand the message out.
        MVALVE_TRACE and trace( "message", $message->id, "being returned" );
        return $message;
    }

    MVALVE_TRACE and trace( "message", $message->id, "is being throttled" );
    $self->defer($message);
    return ();    # no data for you!
}
|---|
| 167 | |
|---|
# Re-queue a message that may not be delivered yet.
#
# The message is stamped with a retry time (RETRY_HEADER) and inserted into
# the table the queue set chooses for 'timed'. The retry time is the later
# of "now + throttler interval" and the retry time previously stored for
# this table/destination pair. On a successful insert the stored retry time
# is advanced and the destination's pending-retry counter is incremented.
#
# Parameters:
#   $message - the Mvalve::Message object to defer
#
# Returns an empty list when $message is not an Mvalve::Message object,
# otherwise whatever q_insert returned.
sub defer {
    my ( $self, $message ) = @_;

    # blessed() is called by its fully qualified name: this file never
    # imports it from Scalar::Util, and the copy exported by Moose cannot be
    # relied on here because "no Moose" has already taken effect by the time
    # this sub is compiled.
    require Scalar::Util;
    if ( ! Scalar::Util::blessed($message) || ! $message->isa( 'Mvalve::Message' ) ) {
        return ();
    }

    my $qs = $self->queue_set;

    # NOTE(review): this reads the interval straight out of the throttler
    # object's internal hash layout -- confirm it holds for every throttler
    # module that can be configured.
    my $interval    = $self->throttler->{db}->{chain}->{interval};
    my $table       = $qs->choose_table('timed');
    my $destination = $message->header( DESTINATION_HEADER );
    my $time_key    = [ $table, $destination, 'retry time' ];
    my $retry_key   = [ $destination, 'retry' ];

    # Never schedule earlier than one interval from now; if a later retry
    # time is already on record for this table/destination, queue up behind
    # it instead.
    my $retry = $self->state_get($time_key);
    my $next  = time + $interval;

    if ( ! $retry || $retry < $next ) {
        $retry = $next;
    }
    $message->header( RETRY_HEADER, $retry );

    trace( "defer to $table" ) if MVALVE_TRACE;
    my $rv = $self->q_insert(
        table => $table,
        data  => {
            destination => $destination,
            ready       => $retry,
            message     => $message->serialize,
        }
    );

    trace( "q_insert results in $rv" ) if MVALVE_TRACE;

    if ($rv) {
        # Advance the stored retry time by the message's 'x-wfg-duration'
        # header, or by the throttler interval when that header is unset, so
        # the next deferred message for this table/destination is scheduled
        # after this one. Also count this message as a pending retry.
        $retry += $message->header('x-wfg-duration') ||
            $interval;
        $self->state_set( $time_key, $retry );
        $self->state_incr($retry_key);
    }

    return $rv;
}
|---|
| 213 | |
|---|
# Insert a message into one of the queue tables.
#
# Named arguments:
#   message - the message object to enqueue
#
# The queue set is asked for an 'emergency' table when the message carries
# the emergency header, and for its default table otherwise. The row stored
# holds the message's destination header and its serialized form.
#
# Returns whatever q_insert returns.
sub insert {
    my $self = shift;
    my %args = @_;

    my $message   = $args{message};
    my $queue_set = $self->queue_set;

    # Pick the target table depending on the headers.
    my $table = $message->header( EMERGENCY_HEADER )
        ? $queue_set->choose_table( 'emergency' )
        : $queue_set->choose_table();

    if (MVALVE_TRACE) {
        my $id = $message->id();
        trace( "insert message '$id' to $table" );
    }

    return $self->q_insert(
        table => $table,
        data  => {
            destination => $message->header( DESTINATION_HEADER ),
            message     => $message->serialize(),
        },
    );
}
|---|
| 238 | |
|---|
# Report whether the given destination still has deferred messages waiting
# to be retried, according to the retry counter kept in the state store.
#
# Returns 1 when the counter is true (non-zero), 0 otherwise.
sub is_pending {
    my $self        = shift;
    my $destination = shift;

    return 0 unless $self->state_get( [ $destination, 'retry' ] );
    return 1;
}
|---|
| 246 | |
|---|
# Clear every table known to the queue set by calling q_clear on each one
# in the order the queue set lists them.
sub clear_all {
    my ($self) = @_;

    my @tables = $self->queue_set->all_tables;
    for my $name (@tables) {
        $self->q_clear($name);
    }
}
|---|
| 254 | |
|---|
# A module file must finish by evaluating to a true value. State it
# explicitly instead of relying on the value of the last executed statement.
1;

__END__
|---|
| 256 | |
|---|
| 257 | =head1 NAME |
|---|
| 258 | |
|---|
| 259 | Mvalve - Generic Q4M Powered Message Pipe |
|---|
| 260 | |
|---|
| 261 | =head1 SYNOPSIS |
|---|
| 262 | |
|---|
| 263 | my $mvalve = Mvalve->new( |
|---|
| 264 | state => { |
|---|
| 265 | module => "...", |
|---|
| 266 | }, |
|---|
| 267 | queue => { |
|---|
| 268 | module => "...", |
|---|
| 269 | connect_info => [ ... ] |
|---|
| 270 | }, |
|---|
| 271 | throttler => { |
|---|
| 272 | module => 'Data::Throttler::Memcached', |
|---|
| 273 | max_items => $max, |
|---|
| 274 | interval => $interval, |
|---|
| 275 | cache => { |
|---|
| 276 | data => [ ... ] |
|---|
| 277 | } |
|---|
| 278 | } |
|---|
| 279 | ); |
|---|
| 280 | |
|---|
| 281 | while ( 1 ) { |
|---|
| 282 | my $message = $mvalve->next; |
|---|
| 283 | if ($message) { |
|---|
| 284 | # do whatever |
|---|
| 285 | } |
|---|
| 286 | } |
|---|
| 287 | |
|---|
| 288 | =head1 DESCRIPTION |
|---|
| 289 | |
|---|
| 290 | Mvalve stands for "Message Valve". It is a frontend for a set of Q4M-powered |
|---|
| 291 | queues, acting as a single pipe. |
|---|
| 292 | |
|---|
| 293 | =head1 METHODS |
|---|
| 294 | |
|---|
| 295 | =head2 next |
|---|
| 296 | |
|---|
| 297 | Fetches the next available message. |
|---|
| 298 | |
|---|
| 299 | =head2 insert |
|---|
| 300 | |
|---|
| 301 | Inserts into the normal queue |
|---|
| 302 | |
|---|
| 303 | =head2 defer |
|---|
| 304 | |
|---|
| 305 | Inserts into the retry_wait queue. |
|---|
| 306 | |
|---|
| 307 | =head2 is_pending( $destination ) |
|---|
| 308 | |
|---|
| 309 | Checks whether there are pending retries for that particular destination. |
|---|
| 310 | |
|---|
| 311 | =head2 throttler |
|---|
| 312 | |
|---|
| 313 | C<throttler> holds the Data::Throttler instance that does the dirty work of |
|---|
| 314 | determining if a message needs to be throttled or not |
|---|
| 315 | |
|---|
| 316 | $self->throttler( { |
|---|
| 317 | module => "Data::Throttler::Memcached", |
|---|
| 318 | } ); |
|---|
| 319 | |
|---|
| 320 | =head2 timeout |
|---|
| 321 | |
|---|
| 322 | C<timeout> specifies the timeout value while we wait to read from the queue. |
|---|
| 323 | |
|---|
| 324 | =head2 queue |
|---|
| 325 | |
|---|
| 326 | C<queue> is the actual queue instance that we'll be dealing with. |
|---|
| 327 | While the architecture is such that you can replace the queue with |
|---|
| 328 | your custom object, we currently only support Q4M |
|---|
| 329 | |
|---|
| 330 | $self->queue( { |
|---|
| 331 | module => "Q4M", |
|---|
| 332 | connect_info => [ 'dbi:mysql:...', ..., ... ] |
|---|
| 333 | } ); |
|---|
| 334 | |
|---|
| 335 | =head2 clear_all |
|---|
| 336 | |
|---|
| 337 | Clears all known queues that are listed under the registered QueueSet |
|---|
| 338 | |
|---|
| 339 | =head2 trace |
|---|
| 340 | |
|---|
| 341 | This is for debugging only |
|---|
| 342 | |
|---|
| 343 | =head1 CONSTANTS |
|---|
| 344 | |
|---|
| 345 | =head2 DESTINATION_HEADER |
|---|
| 346 | |
|---|
| 347 | =head2 EMERGENCY_HEADER |
|---|
| 348 | |
|---|
| 349 | =head2 MVALVE_TRACE |
|---|
| 350 | |
|---|
| 351 | =head2 RETRY_HEADER |
|---|
| 352 | |
|---|
| 353 | =head1 AUTHORS |
|---|
| 354 | |
|---|
| 355 | Daisuke Maki C<< <daisuke@endeworks.jp> >> |
|---|
| 356 | |
|---|
| 357 | Taro Funaki C<< <t@33rpm.jp> >> |
|---|
| 358 | |
|---|
| 359 | =head1 LICENSE |
|---|
| 360 | |
|---|
| 361 | This program is free software; you can redistribute it and/or modify it |
|---|
| 362 | under the same terms as Perl itself. |
|---|
| 363 | |
|---|
| 364 | See http://www.perl.com/perl/misc/Artistic.html |
|---|
| 365 | |
|---|
| 366 | =cut |
|---|
| 367 | |
|---|