root/lang/perl/Data-Model/trunk/lib/Data/Model/Driver/Queue/Q4M.pm @ 32452

Revision 32452, 4.4 kB (checked in by yappo, 4 years ago)

useing mixin manager for q4m support

Line 
1package Data::Model::Driver::Queue::Q4M;
2use strict;
3use warnings;
4use base 'Data::Model::Driver::DBI';
5
6use Carp ();
7
8sub timeout { $_[0]->{timeout} }
9
10sub _create_arguments {
11    my $arg_length = scalar(@_);
12    my $timeout;
13    my %callbacks;
14    my @queue_tables;
15    for (my $i = 0; $i < $arg_length; $i++) {
16        my($table, $value) = ($_[$i], $_[$i + 1]);
17        if (ref($value) eq 'CODE') {
18            # register callback
19            push @queue_tables, $table;
20            $callbacks{$table} = $value;
21
22        } elsif ($table eq 'timeout' && $value =~ /\A[0-9]+\z/) {
23            # timeout
24            $timeout = $value;
25        }
26        $i++;
27    }
28    (\@queue_tables, \%callbacks, $timeout);
29}
30
31sub queue_wait {
32    my($self, $timeout, @tables) = @_;
33
34    my $dbh = $self->r_handle;
35    my $sql = sprintf 'SELECT queue_wait(%s)', join(', ', (('?') x (scalar(@tables) + 1)));
36    my $sth = $dbh->prepare_cached($sql);
37
38    # bind params
39    my $i = 1;
40    for my $table (@tables) {
41        $sth->bind_param($i++, $table, undef);
42    }
43    $sth->bind_param($i, $timeout, undef);
44
45    $sth->execute;
46    $sth->bind_columns(undef, \my $retcode);
47
48    my $rv = $sth->fetch;
49    $sth->finish;
50    undef $sth;
51    return 0 unless $rv && $retcode;
52    return $retcode;
53}
54
55sub queue_abort {
56    my $self = shift;
57
58    my $dbh = $self->r_handle;
59    my $sql = 'SELECT queue_abort()';
60    my $sth = $dbh->prepare($sql);
61    $sth->execute;
62}
63
64sub queue_end {
65    my $self = shift;
66
67    my $dbh = $self->r_handle;
68    my $sql = 'SELECT queue_end()';
69    my $sth = $dbh->prepare($sql);
70    $sth->execute;
71}
72
73sub queue_running {
74    my($self, $c) = (shift, shift);
75    my $arg_length = scalar(@_);
76    Carp::croak 'illegal parameter' if $arg_length % 2;
77
78    # create table attributes
79    my($queue_tables, $callbacks, $timeout) = _create_arguments(@_);
80    Carp::croak 'required is callback handler' unless @{ $queue_tables };
81
82    my %schema  = map { $_ => 1 } $c->schema_names;
83    for my $table (@{ $queue_tables }) {
84        my($name) = split /:/, $table;
85        Carp::croak "'$name' is missing model name" unless $schema{$name};
86    }
87
88    $timeout ||= $self->timeout || 60;
89
90    # queue_wait
91    my $table_id = $self->queue_wait($timeout, @{ $queue_tables });
92    return unless $table_id;
93
94    # get record
95    my $running_table = $queue_tables->[$table_id - 1];
96    my($real_table) = split /:/, $running_table;
97    my($row) = $c->get( $real_table );
98    unless ($row) {
99        $self->queue_abort;
100        return;
101    }
102
103    # running callback
104    eval {
105        $callbacks->{$running_table}->($row);
106    };
107    if ($@) {
108        $self->queue_abort;
109        die $@; # throwing exception
110    }
111
112    $self->queue_end;
113    return $real_table;
114}
115
116# for schema
117sub _as_sql_hook {
118    my $self = shift;
119
120    if ($_[1] eq 'get_table_attributes') {
121        my $ret = $self->dbd->_as_sql_hook(@_);
122        unless ($ret =~ s/(\A|\W)\s*ENGINE\s*=\s*\w+\s*(\z|\W)/${1}TYPE=QUEUE${2}/) {
123            $ret ||= 'ENGINE=QUEUE';
124        }
125        return $ret;
126    } else {
127        return $self->dbd->_as_sql_hook(@_);
128    }
129}
130
1311;
132
133
134__END__
135
136
137=head1 NAME
138
139Data::Model::Driver::Queue::Q4M - Q4M manager for Data::Model
140
141=head1 SYNOPSIS
142
143  use Data::Model::Driver::Queue::Q4M;
144  my $driver = Data::Model::Driver::Queue::Q4M->new(
145      dsn      => 'dbi:mysql:database=test',
146      username => '',
147      password => '',
148      timeout  => 60, # queue_wait timeout
149  );
150
151  {
152    package MyQueue;
153    use base 'Data::Model';
154    use Data::Model::Mixin modules => ['Queue::Q4M'];
155    use Data::Model::Schema;
156
157    base_driver $driver;
158    install_model smtp => schema {
159        column id
160            => char => {};
161        column data
162            => int => {};
163    };
164
165    install_model pop => schema {
166        column id
167            => char => {};
168        column data
169            => int => {};
170    };
171  }
172
173  my $model = MyQueue->new;
174
175  # add queue
176  $model->set(
177      smtp => {
178          id   => 'foo',
179          data => 1,
180      }
181  );
182
183  # same queue_wait('smtp', 'pop', 10);
184  my $retval = $model->queue_running(
185      smtp => sub {
186          my $row = shift;
187          is($row->id, 'foo');
188          is($row->data, 1);
189      },
190      pop => sub {
191          my $row = shift;
192      },
193      timeout => 10, # optional
194  );
195
196  # same queue_wait('smtp:data>10');
197  my $retval = $model->queue_running(
198      'smtp:data>10' => sub {
199          my $row = shift;
200          is($row->id, 'foo');
201          is($row->data, 1);
202      },
203  );
204
205=cut
Note: See TracBrowser for help on using the browser.