root/lang/perl/Mvalve/trunk/lib/Mvalve.pm @ 15560

Revision 15560, 8.3 kB (checked in by daisuke, 6 years ago)

version++

Line 
1# $Id$
2
3package Mvalve;
4use Moose;
5use Moose::Util::TypeConstraints;
6use Mvalve::Message;
7use Time::HiRes();
8
9our $VERSION   = '0.00006';
10our $AUTHORITY = "cpan:DMAKI";
11
12class_type 'Data::Throttler';
13role_type 'Mvalve::Queue';
14role_type 'Mvalve::State';
15
16coerce 'Data::Throttler'
17    => from 'HashRef'
18        => via {
19            my $h = $_;
20            my $module = delete $h->{module} || 'Data::Throttler';
21            Class::MOP::load_class($module);
22
23            $module->new(%$h);
24        }
25;
26
27coerce 'Mvalve::Queue'
28    => from 'HashRef'
29        => via {
30            my $h = $_;
31            my $module = delete $h->{module} || 'Mvalve::Queue::Q4M';
32            Class::MOP::load_class($module);
33
34            $module->new(%$h);
35        }
36;
37
38coerce 'Mvalve::State'
39    => from 'HashRef'
40        => via {
41            my $h = $_;
42            my $module = delete $h->{module} || 'Mvalve::State::Memory';
43            Class::MOP::load_class($module);
44
45            $module->new(memcached => $h);
46        }
47;
48
49has 'throttler' => (
50    is       => 'rw',
51    isa      => 'Data::Throttler',
52    required => 1,
53    coerce   => 1,
54    handles  => [ qw(try_push) ],
55);
56
57has 'queue_set' => (
58    is  => 'ro',
59    isa => 'Mvalve::QueueSet',
60    default => sub {
61        Class::MOP::load_class('Mvalve::QueueSet');
62        Mvalve::QueueSet->new;
63    }
64);
65
66has 'state' => (
67    is => 'rw',
68    does => 'Mvalve::State',
69    coerce => 1,
70    required => 1,
71    default => sub {
72        Class::MOP::load_class('Mvalve::State::Memory');
73        Mvalve::State::Memory->new
74    },
75    handles => {
76        map { ("state_$_" => $_) } qw(get set remove incr decr)
77    }
78);
79
80has 'timeout' => (
81    is => 'rw',
82    isa => 'Int',
83    required => 1,
84    default => 60
85);
86
87has 'queue' => (
88    is       => 'rw',
89    does     => 'Mvalve::Queue',
90    required => 1,
91    coerce   => 1,
92    handles => {
93        map { ( "q_$_" => $_ ) }
94            qw(next fetch insert clear)
95    },
96);
97
98__PACKAGE__->meta->make_immutable;
99
100no Moose;
101
102use constant EMERGENCY_HEADER   => 'X-Mvalve-Emergency';
103use constant DESTINATION_HEADER => 'X-Mvalve-Destination';
104use constant RETRY_HEADER       => 'X-Mvalve-Retry-Time';
105use constant MVALVE_TRACE       => $ENV{MVALVE_TRACE} ? 1 : 0;
106
107sub trace
108{
109    print STDERR "MVALVE: @_\n";
110}
111
112sub next
113{
114    my $self = shift;
115
116    my $qs    = $self->queue_set;
117    my @names = $qs->as_q4m_args;
118    my $table = $self->q_next(
119        table_conds => \@names,
120        timeout     => $self->timeout + 0
121    );
122
123    if (! $table) {
124        trace( "q_next did not return a table name, simply returning" ) if MVALVE_TRACE;
125        return ();
126    }
127
128    trace( "issueing fetch on table '$table'") if MVALVE_TRACE;
129    my $message = $self->q_fetch(table => $table);
130    if (! $message) {
131        trace( "q_fetch did not return a message, simply returning" ) if MVALVE_TRACE;
132        return ();
133    }
134
135    # destination is an abstract symbol representing the endpoint
136    # service name. this /could/ be used by the queue consumer, but it
137    # is *not* a
138    my $destination = $message->header( DESTINATION_HEADER );
139
140    if ( $qs->is_emergency( $table ) ||  $qs->is_timed( $table ) ) {
141        # if this is from an emergency queue or a timed queue, we go ahead
142        # and allow the message, but we also update the throttler's count
143        # so the next message from a normal queue would be throttled correctly
144        if ($message->header(RETRY_HEADER)) {
145            $self->state_decr( [ $destination, 'retry' ] );
146        }
147        $self->try_push( key => $destination );
148
149        return $message;
150    }
151
152    # otherwise, we need to check if this message is going to be throttled
153    my $is_pending   = $self->is_pending( $destination );
154    my $is_throttled = ! $self->try_push( key => $destination );
155    trace( "checking if message to $destination should be throttled (pending: $is_pending, throttled: $is_throttled)" ) if MVALVE_TRACE;
156
157    if ($is_throttled || $is_pending) {
158        trace( "message", $message->id, "is being throttled") if MVALVE_TRACE;
159        $self->defer( $message );
160        return (); # no data for you!
161    }
162
163    # if we got here, we can just return the data
164    trace( "message", $message->id, "being returned") if MVALVE_TRACE;
165    return $message;
166}
167
168sub defer
169{
170    my( $self, $message ) = @_;
171
172    if ( ! blessed($message) || ! $message->isa( 'Mvalve::Message' ) ) {
173        return () ;
174    }
175
176    my $qs          = $self->queue_set;
177    my $interval    = $self->throttler->{db}->{chain}->{interval};
178    my $table       = $qs->choose_table('timed');
179    my $destination = $message->header( DESTINATION_HEADER );
180    my $time_key    = [ $table, $destination, 'retry time' ];
181    my $retry_key   = [ $destination, 'retry' ];
182
183    my $retry = $self->state_get($time_key);
184    my $next  = time + $interval;
185
186    if ( ! $retry || $retry < $next ) {
187        $retry = $next;
188    }
189    $message->header( RETRY_HEADER, $retry );
190
191    trace( "defer to $table" ) if MVALVE_TRACE;
192    my $rv = $self->q_insert(
193        table => $table,
194        data => {
195            destination => $destination,
196            ready       => $retry,
197            message     => $message->serialize,
198        }
199    );
200
201    trace( "q_insert results in $rv" ) if MVALVE_TRACE;
202
203    if ($rv) {
204        # duration specifies t
205        $retry += $message->header('x-wfg-duration') ||
206                  $interval;
207        $self->state_set($time_key, $retry);
208        $self->state_incr($retry_key);
209    }
210
211    return $rv;
212}
213
214sub insert {
215    my ($self, %args) = @_;
216
217    my $message = $args{message};
218
219    my $qs = $self->queue_set;
220
221    # Choose one of the queues, depending on the headers
222    my $table;
223    if ($message->header( EMERGENCY_HEADER ) ) {
224        $table = $qs->choose_table( 'emergency' );
225    } else {
226        $table = $qs->choose_table();
227    }
228
229    trace( "insert message '" . $message->id() . "' to $table" ) if MVALVE_TRACE;
230    $self->q_insert(
231        table => $table,
232        data => {
233            destination => $message->header( DESTINATION_HEADER ),
234            message => $message->serialize()
235        }
236    );
237}
238
239sub is_pending {
240    my( $self, $destination ) = @_;
241
242    my $retry_key = [ $destination, 'retry' ];
243    my $count = $self->state_get($retry_key);
244    return $count ? 1 : 0;
245}
246
247sub clear_all {
248    my $self = shift;
249
250    foreach my $table ($self->queue_set->all_tables) {
251        $self->q_clear($table);
252    }
253}
254
255__END__
256
257=head1 NAME
258
259Mvalve - Generic Q4M Powered Message Pipe
260
261=head1 SYNOPSIS
262
263  my $mvalve = Mvalve->new(
264    state => {
265      module => "...",
266    },
267    queue => {
268      module => "...",
269      connect_info => [ ... ]
270    },
271    throttler => {
272      module => 'Data::Throttler::Memcached',
273      max_items => $max,
274      interval  => $interval,
275      cache     => {
276        data => [ ... ]
277      }
278    }
279  );
280
281  while ( 1 ) {
282    my $message = $mvalve->next;
283    if ($message) {
284      # do whatever
285    }
286  }
287
288=head1 DESCRIPTION
289
290Mvalve stands for "Messave Valve". It is a frontend for Q4M powered set of
291queues, acting as a single pipe.
292
293=head1 METHODS
294
295=head2 next
296
297Fetches the next available message.
298
299=head2 insert
300
301Inserts into the normal queue
302
303=head2 defer
304
305Inserts in the the retry_wait queue.
306
307=head2 is_pending( $destination )
308
309Checks whethere there are pending retries for that particular destination.
310
311=head2 throttler
312
313C<throttler> holds the Data::Throttler instance that does the dirty work of
314determining if a message needs to be throttled or not
315
316  $self->throttler( {
317    module => "Data::Throttler::Memcached",
318  } );
319
320=head2 timeout
321
322C<timeout> specifies the timeout value while we wait to read from the queue.
323
324=head2 queue
325
326C<queue> is the actual queue instance that we'll be dealing with.
327While the architecture is such that you can replace the queue with
328your custom object, we currently only support Q4M
329
330  $self->queue( {
331    module => "Q4M",
332    connect_info => [ 'dbi:mysql:...', ..., ... ]
333 } );
334
335=head2 clear_all
336
337Clears all known queues that are listed under the registered QueueSet
338
339=head2 trace
340
341This is for debugging only
342
343=head1 CONSTANTS
344
345=head2 DESTINATION_HEADER
346
347=head2 EMERGENCY_HEADER
348
349=head2 MVALVE_TRACE
350
351=head2 RETRY_HEADER
352
353=head1 AUTHORS
354
355Daisuke Maki C<< <daisuke@endeworks.jp> >>
356
357Taro Funaki C<< <t@33rpm.jp> >>
358
359=head1 LICENSE
360
361This program is free software; you can redistribute it and/or modify it
362under the same terms as Perl itself.
363
364See http://www.perl.com/perl/misc/Artistic.html
365
366=cut
367
Note: See TracBrowser for help on using the browser.