| 1 | # $Id$ |
|---|
| 2 | |
|---|
| 3 | # TODO I think we need locking! |
|---|
| 4 | package Data::Valve::BucketStore::Memcached; |
|---|
| 5 | use Moose; |
|---|
| 6 | use Moose::Util::TypeConstraints; |
|---|
| 7 | |
|---|
| 8 | use KeyedMutex; |
|---|
| 9 | |
|---|
| 10 | with 'Data::Valve::BucketStore'; |
|---|
| 11 | |
|---|
# Type constraint 'Memcached': any object that is-a one of the supported
# memcached client classes (Cache::Memcached, Cache::Memcached::Fast or
# Cache::Memcached::libmemcached).
subtype 'Memcached'
    => as 'Object'
    => where {
        my $h = $_;
        # The qw() list must be wrapped in parentheses: using a bare qw()
        # as the foreach list was deprecated in perl 5.14 and is a syntax
        # error from perl 5.18 onwards.
        foreach my $class (qw( Cache::Memcached Cache::Memcached::Fast Cache::Memcached::libmemcached )) {
            $h->isa($class) and return 1;
        }
        # None of the supported classes matched: the constraint fails.
        return ();
    }
;
|---|
| 22 | |
|---|
# Coercion for 'Memcached': turn a plain hashref of the form
#   { module => 'Client::Class', args => ... }
# into a client object. 'module' falls back to Cache::Memcached when it is
# missing or false; 'args' is handed to the client's constructor as-is.
coerce 'Memcached'
    => from 'HashRef'
    => via {
        my $spec         = $_;
        my $client_class = $spec->{module} || 'Cache::Memcached';

        # Make sure the chosen client class is loaded before instantiating it.
        Class::MOP::load_class($client_class);

        return $client_class->new( $spec->{args} );
    }
;
|---|
| 32 | |
|---|
# Register KeyedMutex as a class type so it can be named in 'isa', and let a
# hashref of the form { args => ... } be coerced into a KeyedMutex instance
# by passing 'args' straight to its constructor.
class_type 'KeyedMutex';

coerce 'KeyedMutex'
    => from 'HashRef'
    => via { KeyedMutex->new( $_->{args} ) }
;
|---|
| 42 | |
|---|
# memcached: the client object used to read and write serialized buckets.
# Required. May be given as an object satisfying the 'Memcached' type or as
# a hashref, which is coerced into a client object.
has 'memcached' => (
    is       => 'rw',
    isa      => 'Memcached',
    coerce   => 1,
    required => 1,
);

# mutex: optional KeyedMutex used to serialize access to a bucket key.
# May be given as a KeyedMutex object or as a hashref to be coerced into one.
# When it is not supplied, BUILD tries to create a default instance.
has 'mutex' => (
    is     => 'rw',
    isa    => 'KeyedMutex',
    coerce => 1,
);

# Remove the declarative sugar from this package so that it is not callable
# as methods on instances. Moose::Util::TypeConstraints must be unimported
# separately; 'no Moose' alone leaves subtype/coerce/as/where/from/via/...
# behind in the class namespace.
no Moose;
no Moose::Util::TypeConstraints;
|---|
| 57 | |
|---|
# Post-construction hook: make sure the object has a mutex whenever one can
# be created.
#
# A mutex that is already set is left untouched. Otherwise a default
# KeyedMutex is constructed; if that fails the error is emitted as a warning
# and the object carries on without a mutex (degraded mode, no locking).
# The return value carries no meaning.
sub BUILD {
    my ($self) = @_;

    # An already-set mutex always wins.
    return if $self->mutex;

    my $default_mutex = eval { KeyedMutex->new };
    if ( !$default_mutex ) {
        # Construction failed: report why and continue without locking.
        warn $@;
        return;
    }

    $self->mutex($default_mutex);
    return;
}
|---|
| 73 | |
|---|
# Try to record one event in the bucket stored under $args{key}.
#
# Arguments (named):
#   key - the memcached key identifying the bucket.
#
# The bucket is fetched from memcached (or created fresh when nothing is
# stored), asked to accept one more item, and written back only when the
# push was accepted. When a mutex is configured, the whole read/modify/write
# sequence runs while holding a lock on the key; a failed lock attempt is
# retried until it succeeds.
#
# Returns whatever the bucket's own try_push() returned.
sub try_push {
    my ($self, %args) = @_;

    my $key   = $args{key};
    my $mutex = $self->mutex;

    ATTEMPT:
    while (1) {
        # With a mutex, acquire a lock for this key; the result is kept in
        # $guard for the rest of this iteration. The second argument (1)
        # presumably requests a scope-bound lock object -- confirm against
        # the KeyedMutex documentation. Without a mutex we run unlocked.
        my $guard = $mutex ? $mutex->lock($key, 1) : 1;
        next ATTEMPT if !$guard;

        my $frozen = $self->memcached->get($key);
        my $bucket = $frozen
            ? Data::Valve::Bucket->deserialize($frozen, $self->interval, $self->max_items)
            : Data::Valve::Bucket->new(
                  interval  => $self->interval,
                  max_items => $self->max_items,
              );

        my $accepted = $bucket->try_push();

        # Only an accepted push is written back; after a rejected push the
        # stored value is left alone.
        $self->memcached->set($key, $bucket->serialize) if $accepted;

        return $accepted;
    }
}
|---|
| 109 | |
|---|
| 110 | 1; |
|---|
| 111 | |
|---|
| 112 | __END__ |
|---|
| 113 | |
|---|
| 114 | =head1 NAME |
|---|
| 115 | |
|---|
| 116 | Data::Valve::BucketStore::Memcached - Memcached Backend |
|---|
| 117 | |
|---|
| 118 | =head1 DESCRIPTION |
|---|
| 119 | |
|---|
| 120 | Data::Valve::BucketStore::Memcached uses Memcached as its storage backend, |
|---|
| 121 | and allows multiple processes to work together. |
|---|
| 122 | |
|---|
| 123 | This module also provides a locking mechanism by means of KeyedMutex. |
|---|
| 124 | You should specify one at construction time: |
|---|
| 125 | |
|---|
| 126 | Data::Valve->new( |
|---|
| 127 | bucket_store => { |
|---|
| 128 | module => "Memcached", |
|---|
| 129 | args => { |
|---|
| 130 | mutex => { |
|---|
| 131 | args => { |
|---|
| 132 | sock => "host:port" # <-- here |
|---|
| 133 | } |
|---|
| 134 | } |
|---|
| 135 | } |
|---|
| 136 | } |
|---|
| 137 | ); |
|---|
| 138 | |
|---|
| 139 | This allows all coordinating processes to share the same mutex, and you will |
|---|
| 140 | get "correct" throttling information. |
|---|
| 141 | |
|---|
| 142 | =head1 METHODS |
|---|
| 143 | |
|---|
| 144 | =head2 try_push |
|---|
| 145 | |
|---|
| 146 | =cut |
|---|