root/lang/perl/Mvalve/trunk/lib/Mvalve.pm @ 15776

Revision 15776, 9.6 kB (checked in by daisuke, 5 years ago)

Change nested constructor semantics

Line 
1# $Id$
2
3package Mvalve;
4use Moose;
5use Moose::Util::TypeConstraints;
6use Mvalve::Message;
7use Mvalve::Throttler;
8use Time::HiRes();
9
# Distribution metadata.
our $VERSION   = '0.00006';
our $AUTHORITY = 'cpan:DMAKI';

# Register role-based type constraints under the role names themselves, so
# that HashRef coercions can be attached to them further down in this file.
foreach my $role_name (qw(Mvalve::Queue Mvalve::State)) {
    role_type $role_name;
}
15
{
    # Factory for HashRef coercions.
    #
    # Given a default class name and a namespace prefix, returns a coercion
    # callback that turns a config hashref of the form
    #
    #     { module => 'Name', args => { ... } }
    #
    # into an object by loading the resolved class and calling
    # Class->new(%args). Only the contents of 'args' are handed to the
    # constructor; 'module' defaults to $default_class when missing/false.
    my $make_coercion = sub {
        my ($default_class, $prefix) = @_;
        return sub {
            my $config = $_;

            # Read the module name instead of deleting it: the hashref
            # belongs to the caller, and removing the key would make a
            # second coercion of the same config fall back to the default.
            my $module = $config->{module} || $default_class;

            # A leading '+' marks a fully qualified class name (the '+' is
            # stripped); anything else is resolved relative to $prefix.
            if ($prefix && $module !~ s/^\+//) {
                $module = join('::', $prefix, $module);
            }
            Class::MOP::load_class($module);

            # Fall back to an empty hash so a missing 'args' entry neither
            # dies nor gets autovivified into the caller's config.
            return $module->new( %{ $config->{args} || {} } );
        };
    };

    coerce 'Mvalve::Throttler'
        => from 'HashRef'
        => $make_coercion->('Data::Valve', 'Mvalve::Throttler'), # XXX - no need to use via () here
    ;

    coerce 'Mvalve::Queue'
        => from 'HashRef'
        => $make_coercion->('Q4M', 'Mvalve::Queue'), # XXX - no need to use via () here
    ;
}
42
# Coerce a config hashref of the form { module => 'Name', args => { ... } }
# into a state object. 'module' defaults to 'Memory' and is resolved under
# Mvalve::State:: unless it is prefixed with '+', in which case the '+' is
# stripped and the rest is used as a fully qualified class name. Only the
# contents of 'args' are passed to the constructor.
coerce 'Mvalve::State'
    => from 'HashRef'
        => via {
            my $config = $_;

            # Do not delete the key: the hashref belongs to the caller and
            # may be coerced again later.
            my $module = $config->{module} || 'Memory';
            if ($module !~ s/^\+//) {
                $module = "Mvalve::State::$module";
            }
            Class::MOP::load_class($module);

            # Tolerate a missing 'args' entry without autovivifying it in
            # the caller's hash.
            $module->new( %{ $config->{args} || {} } );
        }
;
56
# The throttler object. It must consume the Mvalve::Throttler role; a plain
# hashref is accepted and coerced. try_push() is delegated to it.
has throttler => (
    is       => 'rw',
    does     => 'Mvalve::Throttler',
    coerce   => 1,
    required => 1,
    handles  => ['try_push'],
);
64
{
    # Returns a 'default' callback that loads the given class on first use
    # and instantiates it with no constructor arguments.
    my $instance_of = sub {
        my ($class_name) = @_;
        return sub {
            Class::MOP::load_class($class_name);
            return $class_name->new;
        };
    };

    # state_get / state_set / state_remove / state_incr / state_decr are
    # delegated to the same-named methods (minus the prefix) on the state
    # object.
    my %state_delegates =
        map { ( "state_$_" => $_ ) } qw(get set remove incr decr);

    # The set of queue tables this instance reads from and writes to.
    has queue_set => (
        isa     => 'Mvalve::QueueSet',
        is      => 'ro',
        default => $instance_of->('Mvalve::QueueSet'),
    );

    # Shared state storage; a hashref is accepted and coerced, and an
    # Mvalve::State::Memory instance is used when nothing is supplied.
    has state => (
        is       => 'rw',
        does     => 'Mvalve::State',
        required => 1,
        coerce   => 1,
        default  => $instance_of->('Mvalve::State::Memory'),
        handles  => { %state_delegates },
    );
}
91
# Timeout value handed to the queue's next() call; defaults to 60.
has timeout => (
    isa      => 'Int',
    is       => 'rw',
    default  => 60,
    required => 1,
);
98
# The queue backend. It must consume the Mvalve::Queue role; a plain hashref
# is accepted and coerced. Its next/fetch/insert/clear methods are exposed
# on this class as q_next/q_fetch/q_insert/q_clear.
has queue => (
    is       => 'rw',
    does     => 'Mvalve::Queue',
    coerce   => 1,
    required => 1,
    handles  => {
        map { ( "q_$_" => $_ ) } qw(next fetch insert clear)
    },
);
109
# Finalize the class definition, then remove the keywords Moose exported
# into this package so they are not left behind as methods.
__PACKAGE__->meta->make_immutable();

no Moose;
113
# Names of the message headers that Mvalve itself interprets, plus a
# compile-time debug switch: MVALVE_TRACE is 1 when the MVALVE_TRACE
# environment variable is set to a true value, 0 otherwise.
use constant {
    EMERGENCY_HEADER   => 'X-Mvalve-Emergency',
    DESTINATION_HEADER => 'X-Mvalve-Destination',
    RETRY_HEADER       => 'X-Mvalve-Retry-Time',
    DURATION_HEADER    => 'X-Mvalve-Duration',
    MVALVE_TRACE       => ( $ENV{MVALVE_TRACE} ? 1 : 0 ),
};
120
# Debug helper: writes its arguments to STDERR on a single line, prefixed
# with "MVALVE: ". Callers guard it with the MVALVE_TRACE constant.
sub trace {
    my @parts = @_;
    print STDERR "MVALVE: @parts\n";
}
122
# Fetch the next message that may be delivered right now.
#
# Asks the queue which table has data (waiting up to the configured
# timeout), fetches one message from it, and then decides:
#   * messages from an emergency or timed table are always returned, while
#     still being counted by the throttler;
#   * other messages are returned only if the destination has no pending
#     retries and the throttler accepts them; otherwise they are deferred.
#
# Returns the message, or an empty list when nothing can be delivered.
sub next
{
    my ($self) = @_;

    my $queue_set   = $self->queue_set;
    my @table_conds = $queue_set->as_q4m_args;

    my $table = $self->q_next(
        table_conds => \@table_conds,
        timeout     => $self->timeout + 0,
    );
    unless ($table) {
        trace( "q_next did not return a table name, simply returning" ) if MVALVE_TRACE;
        return;
    }

    trace( "issueing fetch on table '$table'") if MVALVE_TRACE;
    my $message = $self->q_fetch( table => $table );
    unless ($message) {
        trace( "q_fetch did not return a message, simply returning" ) if MVALVE_TRACE;
        return;
    }

    # The destination is an abstract symbol naming the endpoint service.
    # Here it is used as the throttling key and as part of the retry
    # counter key.
    my $destination = $message->header( DESTINATION_HEADER );

    my $bypass_throttle =
        $queue_set->is_emergency($table) || $queue_set->is_timed($table);

    if ($bypass_throttle) {
        # Emergency and timed messages are always let through, but the
        # throttler is still told about them so that the next message from
        # a normal queue is throttled correctly.
        $self->state_decr( [ $destination, 'retry' ] )
            if $message->header(RETRY_HEADER);
        $self->try_push( key => $destination );

        return $message;
    }

    # Normal queue: both checks are always performed, in this order,
    # because try_push() also updates the throttler's count.
    my $is_pending   = $self->is_pending( $destination );
    my $is_throttled = ! $self->try_push( key => $destination );
    trace( "checking if message to $destination should be throttled (pending: $is_pending, throttled: $is_throttled)" ) if MVALVE_TRACE;

    unless ($is_throttled || $is_pending) {
        # Nothing is holding this message back.
        trace( "message", $message->id, "being returned") if MVALVE_TRACE;
        return $message;
    }

    trace( "message", $message->id, "is being throttled") if MVALVE_TRACE;
    $self->defer( $message );
    return; # no data for you!
}
178
# Put a message into the timed queue so it is retried later.
#
# Parameters:
#   $message - an Mvalve::Message instance (anything else is ignored).
#
# The retry time is the later of "now + throttler interval" and the retry
# time previously recorded in the state store for this table/destination.
# It is written to the message's retry header and, in milliseconds, to the
# 'ready' column of the inserted row.
#
# Returns an empty list if $message is not an Mvalve::Message, otherwise
# the return value of the queue insert.
sub defer
{
    my ( $self, $message ) = @_;

    # Call blessed() by its full name: this sub is compiled after
    # 'no Moose', which may have removed the 'blessed' that Moose exported
    # into this package.
    require Scalar::Util;
    if ( ! Scalar::Util::blessed($message) || ! $message->isa( 'Mvalve::Message' ) ) {
        return () ;
    }

    my $qs          = $self->queue_set;
    my $interval    = $self->throttler->interval;
    my $table       = $qs->choose_table('timed');
    my $destination = $message->header( DESTINATION_HEADER );
    my $time_key    = [ $table, $destination, 'retry time' ];
    my $retry_key   = [ $destination, 'retry' ];

    my $retry = $self->state_get($time_key);
    my $next  = time + $interval;

    # Never schedule earlier than one interval from now.
    if ( ! $retry || $retry < $next ) {
        $retry = $next;
    }
    $message->header( RETRY_HEADER, $retry );

    trace( "defer to $table" ) if MVALVE_TRACE;
    my $rv = $self->q_insert(
        table => $table,
        data => {
            destination => $destination,
            ready       => int($retry * 1000),
            message     => $message->serialize,
        }
    );

    trace( "q_insert results in $rv" ) if MVALVE_TRACE;

    if ($rv) {
        # Advance the recorded retry time for this table/destination by the
        # message's duration header, or by the throttler interval when the
        # header is absent or false, and count one more pending retry.
        $retry += $message->header( DURATION_HEADER ) || $interval;
        $self->state_set($time_key, $retry);
        $self->state_incr($retry_key);
    }

    return $rv;
}
223
# Insert a message into one of the queue tables.
#
# Named arguments:
#   message - the message object to enqueue.
#
# Messages carrying the emergency header go to a table chosen with
# choose_table('emergency'); all others go to whatever choose_table()
# returns with no argument. Returns the result of the queue insert.
sub insert {
    my ($self, %args) = @_;

    my $message   = $args{message};
    my $queue_set = $self->queue_set;

    # Pick the target table based on the message headers.
    my $table = $message->header( EMERGENCY_HEADER )
        ? $queue_set->choose_table( 'emergency' )
        : $queue_set->choose_table();

    trace( "insert message '" . $message->id() . "' to $table" ) if MVALVE_TRACE;

    my %row = (
        destination => $message->header( DESTINATION_HEADER ),
        message     => $message->serialize(),
    );
    return $self->q_insert(
        table => $table,
        data  => \%row,
    );
}
248
# Report whether the given destination has pending retries.
#
# Looks up the [ $destination, 'retry' ] counter in the state store and
# returns 1 if it holds a true value, 0 otherwise.
sub is_pending {
    my ($self, $destination) = @_;

    my $pending = $self->state_get( [ $destination, 'retry' ] );
    return 0 unless $pending;
    return 1;
}
256
# Clear every table known to the queue set, one q_clear() call per table.
sub clear_all {
    my ($self) = @_;

    my $queue_set = $self->queue_set;
    for my $table_name ($queue_set->all_tables) {
        $self->q_clear($table_name);
    }
}
264
2651;
266
267__END__
268
269=head1 NAME
270
271Mvalve - Generic Q4M Powered Message Pipe
272
273=head1 SYNOPSIS
274
275  my $mvalve = Mvalve->new(
276    state => {
277      module => "...",
278    },
279    queue => {
280      module => "...",
281      args => { connect_info => [ ... ] }
282    },
283    throttler => {
284      module => 'Data::Throttler::Memcached',
285      args => {
286        max_items => $max,
287        interval  => $interval,
288        cache     => {
289          data => [ ... ]
290        }
291      }
292    }
293  );
294
295  while ( 1 ) {
296    my $message = $mvalve->next;
297    if ($message) {
298      # do whatever
299    }
300  }
301
302=head1 DESCRIPTION
303
304Mvalve stands for "Message Valve". It is a frontend for Q4M powered set of
305queues, acting as a single pipe.
306
307=head1 SETUP
308
309You need to have installed mysql 5.1 or later and q4m. You can grab
310them at:
311
312  http://dev.mysql.com/
313  http://q4m.31tools.com/
314
315Once you have a q4m-enabled mysql running, you need to create these q4m
316enabled tables in your mysql database.
317
318  CREATE TABLE q_emerg (
319     destination VARCHAR(40) NOT NULL,
320     message     BLOB NOT NULL
321  ) ENGINE=QUEUE DEFAULT CHARSET=utf8;
322 
323  CREATE TABLE q_timed (
324     destination VARCHAR(40) NOT NULL,
325     ready       BIGINT NOT NULL,
326     message     BLOB NOT NULL
327  ) ENGINE=QUEUE DEFAULT CHARSET=utf8;
328 
329  CREATE TABLE q_incoming (
330     destination VARCHAR(40) NOT NULL,
331     message     BLOB NOT NULL
332  ) ENGINE=QUEUE DEFAULT CHARSET=utf8;
333
334You also need to setup a memcached compatible distributed cache/storage.
335This will be used to share certain key data across multiple instances
336of Mvalve.
337 
338=head1 METHODS
339
340=head2 next
341
342Fetches the next available message.
343
344=head2 insert
345
346Inserts into the normal queue
347
348=head2 defer
349
350Inserts the message into the timed (retry) queue.
351
352=head2 is_pending( $destination )
353
354Checks whether there are pending retries for that particular destination.
355
356=head2 throttler
357
358C<throttler> holds the Data::Throttler instance that does the dirty work of
359determining if a message needs to be throttled or not
360
361  $self->throttler( {
362    module => "Data::Throttler::Memcached",
363  } );
364
365=head2 timeout
366
367C<timeout> specifies the timeout value while we wait to read from the queue.
368
369=head2 queue
370
371C<queue> is the actual queue instance that we'll be dealing with.
372While the architecture is such that you can replace the queue with
373your custom object, we currently only support Q4M
374
375  $self->queue( {
376    module => "Q4M",
377    args => { connect_info => [ 'dbi:mysql:...', ..., ... ] }
378 } );
379
380=head2 clear_all
381
382Clears all known queues that are listed under the registered QueueSet
383
384=head2 trace
385
386This is for debugging only
387
388=head1 CONSTANTS
389
390=head2 DESTINATION_HEADER
391
392=head2 EMERGENCY_HEADER
393
394=head2 MVALVE_TRACE
395
396=head2 RETRY_HEADER
397
398=head1 AUTHORS
399
400Daisuke Maki C<< <daisuke@endeworks.jp> >>
401
402Taro Funaki C<< <t@33rpm.jp> >>
403
404=head1 LICENSE
405
406This program is free software; you can redistribute it and/or modify it
407under the same terms as Perl itself.
408
409See http://www.perl.com/perl/misc/Artistic.html
410
411=cut
412
Note: See TracBrowser for help on using the browser.