root/lang/perl/Mvalve/trunk/lib/Mvalve/Base.pm @ 15914

Revision 15914, 3.7 kB (checked in by daisuke, 5 years ago)

Start implementing logger

  • Property svn:keywords set to Id
Line 
1# $Id$
2
3package Mvalve::Base;
4use Moose;
5use Mvalve;
6use Mvalve::QueueSet;
7use Mvalve::Logger;
8use Time::HiRes;
9
10with 'MooseX::KeyedMutex';
11
12has 'logger' => (
13    is       => 'rw',
14    does     => 'Mvalve::Logger',
15);
16
17has 'queue' => (
18    is       => 'rw',
19    does     => 'Mvalve::Queue',
20    required => 1,
21    coerce   => 1,
22    handles => {
23        map { ( "q_$_" => $_ ) }
24            qw(next fetch insert clear)
25    },
26);
27
28{
29    my $default = sub {
30        my $class = shift;
31        return sub {
32            Class::MOP::load_class($class);
33            $class->new;
34        };
35    };
36
37    has 'queue_set' => (
38        is  => 'rw',
39        isa => 'Mvalve::QueueSet',
40        required => 1,
41        default => $default->( 'Mvalve::QueueSet' )
42    );
43
44    has 'state' => (
45        is => 'rw',
46        does => 'Mvalve::State',
47        coerce => 1,
48        required => 1,
49        default => $default->( 'Mvalve::State::Memory' ),
50        handles => {
51            map { ("state_$_" => $_) } qw(get set remove incr decr)
52        }
53    );
54}
55
56__PACKAGE__->meta->make_immutable;
57
58no Moose;
59
60sub log {
61    my $self = shift;
62    my $logger = $self->logger ;
63    return () unless $logger;
64
65    $logger->log(@_);
66}
67
68sub clear_all {
69    my $self = shift;
70
71    foreach my $table ($self->queue_set->all_tables) {
72        $self->q_clear($table);
73    }
74}
75
76sub defer
77{
78    my( $self, %args ) = @_;
79
80    my $message  = $args{message};
81    my $interval = $args{interval} || 0;
82    my $duration = $args{duration} ||
83        $message->header( &Mvalve::Const::DURATION_HEADER ) ||
84        0;
85
86    my $factor = 100_000;
87    $interval *= $factor;
88    $duration *= $factor;
89
90    if ( ! blessed($message) || ! $message->isa( 'Mvalve::Message' ) ) {
91        return () ;
92    }
93
94    my $qs          = $self->queue_set;
95    my $destination = $message->header( &Mvalve::Const::DESTINATION_HEADER );
96    my $time_key    = [ $destination, 'retry time' ];
97    my $retry_key   = [ $destination, 'retry' ];
98
99    my $done = 0;
100    my $rv;
101    while (! $done) {
102        my $lock = $self->lock( join('.', @$time_key ) );
103        next unless $lock;
104
105        $done = 1;
106
107        my $now    = Time::HiRes::time() * $factor;
108        my $retry  = int($self->state_get($time_key) || $now);
109
110        # we always prefer duration
111        my $offset = $duration || $interval;
112        my $myturn = 0;
113
114        if ($retry > $now) {
115            $myturn = $retry;
116        } else {
117            if ( $retry + $offset >= $now ) {
118                $myturn = $retry + $offset;
119            } else {
120                $myturn = $now;
121            }
122        }
123        my $next   = $myturn + $offset;
124
125        $message->header( &Mvalve::Const::RETRY_HEADER, $myturn );
126
127        Mvalve::trace( "defer (retry = $retry)" ) if &Mvalve::Const::MVALVE_TRACE;
128        $rv = $self->q_insert(
129            table => $qs->choose_table('timed'),
130            data => {
131                destination => $destination,
132                ready       => $myturn,
133                message     => $message->serialize,
134            }
135        );
136
137        Mvalve::trace( "q_insert results in $rv" ) if &Mvalve::Const::MVALVE_TRACE;
138
139        if ($rv) {
140            $self->state_set($time_key, $next);
141        }
142    }
143
144    return $rv;
145}
146
1471;
148
149__END__
150
151=head1 NAME
152
153Mvalve::Base - Base Class For Mvalve Reader/Writer
154
155=head1 METHODS
156
157=head2 defer
158
159Inserts in the the retry_wait queue.
160
161=head2 clear_all
162
163Clears all known queues that are listed under the registered QueueSet
164
165=head2 queue
166
167C<queue> is the actual queue instance that we'll be dealing with.
168While the architecture is such that you can replace the queue with
169your custom object, we currently only support Q4M
170
171  $self->queue( {
172    module => "Q4M",
173    connect_info => [ 'dbi:mysql:...', ..., ... ]
174 } );
175
176=cut
Note: See TracBrowser for help on using the browser.