root/lang/perl/Mvalve/trunk/lib/Mvalve/Base.pm @ 15891

Revision 15891, 3.5 kB (checked in by daisuke, 5 years ago)

fixed timed scheduling

  • Property svn:keywords set to Id
Line 
1# $Id$
2
3package Mvalve::Base;
4use Moose;
5use Mvalve;
6use Mvalve::QueueSet;
7use Time::HiRes;
8
9with 'MooseX::KeyedMutex';
10
11has 'queue' => (
12    is       => 'rw',
13    does     => 'Mvalve::Queue',
14    required => 1,
15    coerce   => 1,
16    handles => {
17        map { ( "q_$_" => $_ ) }
18            qw(next fetch insert clear)
19    },
20);
21
22{
23    my $default = sub {
24        my $class = shift;
25        return sub {
26            Class::MOP::load_class($class);
27            $class->new;
28        };
29    };
30
31    has 'queue_set' => (
32        is  => 'rw',
33        isa => 'Mvalve::QueueSet',
34        required => 1,
35        default => $default->( 'Mvalve::QueueSet' )
36    );
37
38    has 'state' => (
39        is => 'rw',
40        does => 'Mvalve::State',
41        coerce => 1,
42        required => 1,
43        default => $default->( 'Mvalve::State::Memory' ),
44        handles => {
45            map { ("state_$_" => $_) } qw(get set remove incr decr)
46        }
47    );
48}
49
50__PACKAGE__->meta->make_immutable;
51
52no Moose;
53
54sub clear_all {
55    my $self = shift;
56
57    foreach my $table ($self->queue_set->all_tables) {
58        $self->q_clear($table);
59    }
60}
61
62sub defer
63{
64    my( $self, %args ) = @_;
65
66    my $message  = $args{message};
67    my $interval = $args{interval} || 0;
68    my $duration = $args{duration} ||
69        $message->header( &Mvalve::Const::DURATION_HEADER ) ||
70        0;
71
72    my $factor = 100_000;
73    $interval *= $factor;
74    $duration *= $factor;
75
76    if ( ! blessed($message) || ! $message->isa( 'Mvalve::Message' ) ) {
77        return () ;
78    }
79
80    my $qs          = $self->queue_set;
81    my $destination = $message->header( &Mvalve::Const::DESTINATION_HEADER );
82    my $time_key    = [ $destination, 'retry time' ];
83    my $retry_key   = [ $destination, 'retry' ];
84
85    my $done = 0;
86    my $rv;
87    while (! $done) {
88        my $lock = $self->lock( join('.', @$time_key ) );
89        next unless $lock;
90
91        $done = 1;
92
93        my $now    = Time::HiRes::time() * $factor;
94        my $retry  = int($self->state_get($time_key) || $now);
95
96        # we always prefer duration
97        my $offset = $duration || $interval;
98        my $myturn = 0;
99
100        if ($retry > $now) {
101            $myturn = $retry;
102        } else {
103            if ( $retry + $offset >= $now ) {
104                $myturn = $retry + $offset;
105            } else {
106                $myturn = $now;
107            }
108        }
109        my $next   = $myturn + $offset;
110
111        $message->header( &Mvalve::Const::RETRY_HEADER, $myturn );
112
113        Mvalve::trace( "defer (retry = $retry)" ) if &Mvalve::Const::MVALVE_TRACE;
114        $rv = $self->q_insert(
115            table => $qs->choose_table('timed'),
116            data => {
117                destination => $destination,
118                ready       => $myturn,
119                message     => $message->serialize,
120            }
121        );
122
123        Mvalve::trace( "q_insert results in $rv" ) if &Mvalve::Const::MVALVE_TRACE;
124
125        if ($rv) {
126            $self->state_set($time_key, $next);
127        }
128    }
129
130    return $rv;
131}
132
1331;
134
135__END__
136
137=head1 NAME
138
139Mvalve::Base - Base Class For Mvalve Reader/Writer
140
141=head1 METHODS
142
143=head2 defer
144
145Inserts in the the retry_wait queue.
146
147=head2 clear_all
148
149Clears all known queues that are listed under the registered QueueSet
150
151=head2 queue
152
153C<queue> is the actual queue instance that we'll be dealing with.
154While the architecture is such that you can replace the queue with
155your custom object, we currently only support Q4M
156
157  $self->queue( {
158    module => "Q4M",
159    connect_info => [ 'dbi:mysql:...', ..., ... ]
160 } );
161
162=cut
Note: See TracBrowser for help on using the browser.