| 1 | package Data::Model::Driver::Queue::Q4M; |
|---|
| 2 | use strict; |
|---|
| 3 | use warnings; |
|---|
| 4 | use base 'Data::Model::Driver::DBI'; |
|---|
| 5 | |
|---|
| 6 | use Carp (); |
|---|
| 7 | |
|---|
# Accessor for the driver-level timeout: returns whatever is stored under
# the object's `timeout` key (undef when the key is absent).
sub timeout {
    my ($self) = @_;
    return $self->{timeout};
}
|---|
| 9 | |
|---|
# Split a flat "name => value" argument list into callback registrations
# and an optional timeout.
#
# The list is consumed pairwise:
#   * a pair whose value is a CODE reference registers that callback under
#     the pair's name;
#   * the pair "timeout => N", where N consists solely of ASCII digits,
#     sets the timeout;
#   * every other pair is ignored.
#
# Returns a three-element list:
#   (\@queue_tables, \%callbacks, $timeout)
# @queue_tables holds the registered names in the order they were given,
# %callbacks maps each name to its CODE reference, and $timeout is undef
# when no valid timeout pair was supplied.
sub _create_arguments {
    my @queue_tables;
    my %callbacks;
    my $timeout;

    # Walk the list one pair at a time.  A trailing unpaired element simply
    # yields an undef value and is ignored below.
    for (my $i = 0; $i < @_; $i += 2) {
        my ($name, $value) = @_[ $i, $i + 1 ];

        if (ref($value) eq 'CODE') {
            # register callback
            push @queue_tables, $name;
            $callbacks{$name} = $value;
        }
        elsif (defined $name
            && defined $value
            && $name eq 'timeout'
            && $value =~ /\A[0-9]+\z/)
        {
            # The defined() guards keep "timeout => undef" (or an undef
            # name) from raising "uninitialized value" warnings; such
            # pairs are ignored just like any other unrecognised pair.
            $timeout = $value;
        }
    }

    return (\@queue_tables, \%callbacks, $timeout);
}
|---|
| 30 | |
|---|
# Run "SELECT queue_wait(?, ..., ?)" on the driver's read handle.
#
#   $timeout - bound as the last placeholder
#   @tables  - bound, in order, to the placeholders preceding the timeout
#
# Returns the single selected column when a row was fetched and that value
# is true; returns 0 otherwise.  queue_running in this file treats a true
# result as the 1-based position of the ready table within @tables.
sub queue_wait {
    my $self    = shift;
    my $timeout = shift;
    my @tables  = @_;

    # One placeholder per table plus a trailing one for the timeout.
    my $placeholders = join ', ', ('?') x (@tables + 1);
    my $sth = $self->r_handle->prepare_cached("SELECT queue_wait($placeholders)");

    # bind params: tables first, timeout last
    my @binds = (@tables, $timeout);
    for my $pos (1 .. @binds) {
        $sth->bind_param($pos, $binds[ $pos - 1 ], undef);
    }

    $sth->execute;
    $sth->bind_columns(undef, \my $retcode);

    my $fetched = $sth->fetch;
    $sth->finish;
    undef $sth;

    return ($fetched && $retcode) ? $retcode : 0;
}
|---|
| 54 | |
|---|
# Run "SELECT queue_abort()" on the driver's read handle and return the
# result of the statement's execute call.  queue_running in this file calls
# it when a claimed row could not be fetched or its callback failed.
sub queue_abort {
    my ($self) = @_;
    return $self->r_handle->prepare('SELECT queue_abort()')->execute;
}
|---|
| 63 | |
|---|
# Run "SELECT queue_end()" on the driver's read handle and return the
# result of the statement's execute call.  queue_running in this file calls
# it after the callback for a claimed row has completed successfully.
sub queue_end {
    my ($self) = @_;
    return $self->r_handle->prepare('SELECT queue_end()')->execute;
}
|---|
| 72 | |
|---|
# Wait for a row on one of the registered queue tables and pass it to the
# matching callback.
#
# Arguments (after the invocant):
#   $c    - model object; must provide schema_names() and get()
#   @args - "name => CODE" pairs plus an optional "timeout => N" pair.
#           Only the part of a name before the first ":" is checked
#           against $c->schema_names and passed to $c->get; the full name
#           is what gets handed to queue_wait.
#
# Timeout: the explicit "timeout" pair if given (and non-zero), otherwise
# $self->timeout, otherwise 60.
#
# Returns the model name (text before ":") whose callback ran.  Returns
# nothing when queue_wait reported no table, or when no row could be
# fetched (queue_abort is called in the latter case).
#
# Croaks when the argument list has odd length, when no callback was
# registered, or when a name does not match a known schema.  If the
# callback dies, queue_abort is called and the callback's exception is
# re-thrown unchanged.
sub queue_running {
    my ($self, $c, @args) = @_;
    Carp::croak 'illegal parameter' if @args % 2;

    # create table attributes
    my ($queue_tables, $callbacks, $timeout) = _create_arguments(@args);
    Carp::croak 'required is callback handler' unless @{$queue_tables};

    my %schema = map { $_ => 1 } $c->schema_names;
    for my $table (@{$queue_tables}) {
        my ($name) = split /:/, $table;
        Carp::croak "'$name' is missing model name" unless $schema{$name};
    }

    $timeout ||= $self->timeout || 60;

    # queue_wait
    my $table_id = $self->queue_wait($timeout, @{$queue_tables});
    return unless $table_id;

    # get record; $table_id is used as a 1-based index into the table list
    my $running_table = $queue_tables->[ $table_id - 1 ];
    my ($real_table) = split /:/, $running_table;
    my ($row) = $c->get($real_table);
    unless ($row) {
        $self->queue_abort;
        return;
    }

    # running callback
    # Success is detected through the eval's own return value rather than
    # the truth of $@, so an exception that evaluates to false is still
    # treated as a failure.
    my $ok = eval {
        $callbacks->{$running_table}->($row);
        1;
    };
    unless ($ok) {
        # Capture the error first: queue_abort runs more code (DBI calls)
        # that may reset $@ before it can be re-thrown.
        my $error = $@;
        $self->queue_abort;
        die $error;    # throwing exception
    }

    $self->queue_end;
    return $real_table;
}
|---|
| 115 | |
|---|
| 116 | # for schema |
|---|
# Schema hook.  Every call is delegated to $self->dbd->_as_sql_hook with
# the same arguments.  For the 'get_table_attributes' hook (hook name is
# the second argument after the invocant) the delegated result is adjusted
# so the table uses the QUEUE storage engine:
#   * an existing "ENGINE=<name>" clause is rewritten to "ENGINE=QUEUE";
#   * an empty/undefined attribute string becomes "ENGINE=QUEUE";
#   * a non-empty string without an ENGINE clause is returned unchanged.
# All other hooks return the delegated result untouched.
sub _as_sql_hook {
    my $self = shift;

    if (defined $_[1] && $_[1] eq 'get_table_attributes') {
        my $ret = $self->dbd->_as_sql_hook(@_);
        $ret = '' unless defined $ret;    # avoid matching against undef

        # Emit ENGINE= (not the legacy TYPE= synonym, which MySQL removed
        # in 5.5) so the rewrite agrees with the fallback below.
        unless ($ret =~ s/(\A|\W)\s*ENGINE\s*=\s*\w+\s*(\z|\W)/${1}ENGINE=QUEUE${2}/) {
            $ret ||= 'ENGINE=QUEUE';
        }
        return $ret;
    }

    return $self->dbd->_as_sql_hook(@_);
}
|---|
| 130 | |
|---|
| 131 | 1; |
|---|
| 132 | |
|---|
| 133 | |
|---|
| 134 | __END__ |
|---|
| 135 | |
|---|
| 136 | |
|---|
| 137 | =head1 NAME |
|---|
| 138 | |
|---|
| 139 | Data::Model::Driver::Queue::Q4M - Q4M manager for Data::Model |
|---|
| 140 | |
|---|
| 141 | =head1 SYNOPSIS |
|---|
| 142 | |
|---|
| 143 | use Data::Model::Driver::Queue::Q4M; |
|---|
| 144 | my $driver = Data::Model::Driver::Queue::Q4M->new( |
|---|
| 145 | dsn => 'dbi:mysql:database=test', |
|---|
| 146 | username => '', |
|---|
| 147 | password => '', |
|---|
| 148 | timeout => 60, # queue_wait timeout |
|---|
| 149 | ); |
|---|
| 150 | |
|---|
| 151 | { |
|---|
| 152 | package MyQueue; |
|---|
| 153 | use base 'Data::Model'; |
|---|
| 154 | use Data::Model::Mixin modules => ['Queue::Q4M']; |
|---|
| 155 | use Data::Model::Schema; |
|---|
| 156 | |
|---|
| 157 | base_driver $driver; |
|---|
| 158 | install_model smtp => schema { |
|---|
| 159 | column id |
|---|
| 160 | => char => {}; |
|---|
| 161 | column data |
|---|
| 162 | => int => {}; |
|---|
| 163 | }; |
|---|
| 164 | |
|---|
| 165 | install_model pop => schema { |
|---|
| 166 | column id |
|---|
| 167 | => char => {}; |
|---|
| 168 | column data |
|---|
| 169 | => int => {}; |
|---|
| 170 | }; |
|---|
| 171 | } |
|---|
| 172 | |
|---|
| 173 | my $model = MyQueue->new; |
|---|
| 174 | |
|---|
| 175 | # add queue |
|---|
| 176 | $model->set( |
|---|
| 177 | smtp => { |
|---|
| 178 | id => 'foo', |
|---|
| 179 | data => 1, |
|---|
| 180 | } |
|---|
| 181 | ); |
|---|
| 182 | |
|---|
| 183 | # equivalent to queue_wait('smtp', 'pop', 10); |
|---|
| 184 | my $retval = $model->queue_running( |
|---|
| 185 | smtp => sub { |
|---|
| 186 | my $row = shift; |
|---|
| 187 | is($row->id, 'foo'); |
|---|
| 188 | is($row->data, 1); |
|---|
| 189 | }, |
|---|
| 190 | pop => sub { |
|---|
| 191 | my $row = shift; |
|---|
| 192 | }, |
|---|
| 193 | timeout => 10, # optional |
|---|
| 194 | ); |
|---|
| 195 | |
|---|
| 196 | # equivalent to queue_wait('smtp:data>10'); |
|---|
| 197 | my $retval = $model->queue_running( |
|---|
| 198 | 'smtp:data>10' => sub { |
|---|
| 199 | my $row = shift; |
|---|
| 200 | is($row->id, 'foo'); |
|---|
| 201 | is($row->data, 1); |
|---|
| 202 | }, |
|---|
| 203 | ); |
|---|
| 204 | |
|---|
| 205 | =cut |
|---|