root/lang/perl/TheSchwartz-Simple/trunk/lib/TheSchwartz/Simple.pm @ 29760

Revision 29760, 6.3 kB (checked in by miyagawa, 5 years ago)

Checking in changes prior to tagging of version 0.05. Changelog diff is:

Index: Changes
===================================================================
--- Changes (revision 29758)
+++ Changes (working copy)
@@ -1,5 +1,8 @@

Revision history for Perl extension TheSchwartz::Simple


+0.05 Sun Feb 8 23:48:18 PST 2009
+ - Support prefix (Thanks to Mark Fowler)
+

0.04 Sat Sep 6 10:25:09 JST 2008

  • Fixed a bug when new() is given an array ref of DBHs


Line 
1package TheSchwartz::Simple;
2
3use strict;
4use 5.8.1;
5our $VERSION = '0.05';
6
7use Carp;
8use Scalar::Util qw( refaddr );
9use Storable;
10use TheSchwartz::Simple::Job;
11
# Constructor.
#
# Takes either a single database handle or an array reference of handles.
# A lone handle is wrapped in a fresh array reference; an array reference
# passed by the caller is stored as-is (not copied).
#
# Returns a new client object with an empty funcmap cache and an empty
# table-name prefix.
sub new {
    my ( $class, $dbhs ) = @_;

    my $databases = ref $dbhs eq 'ARRAY' ? $dbhs : [$dbhs];

    my %self = (
        databases => $databases,
        _funcmap  => {},
        prefix    => "",
    );

    return bless \%self, $class;
}
22
# Insert a job into the first database that accepts it.
#
# Arguments: either a TheSchwartz::Simple::Job object (subclass instances are
# accepted too), or the list understood by
# TheSchwartz::Simple::Job->new_from_array, e.g. insert('funcname', $arg).
#
# Returns the id of the inserted job, or nothing (empty list / undef) when no
# database accepted it.  Errors raised while talking to one database are
# deliberately swallowed so the next handle in the list can be tried.
sub insert {
    my $self = shift;

    my $job;
    if ( Scalar::Util::blessed( $_[0] )
        && $_[0]->isa('TheSchwartz::Simple::Job') )
    {
        $job = $_[0];
    }
    else {
        $job = TheSchwartz::Simple::Job->new_from_array(@_);
    }

    # A reference argument is serialized in place on the job object, so a
    # second insert of the same job sees a plain string and does not freeze
    # it again.
    $job->arg( Storable::nfreeze( $job->arg ) ) if ref $job->arg;

    # The prefixed name is used for the INSERT and for the id lookup: the
    # Pg sequence name is derived from the real (prefixed) table name.
    my $table = "$self->{prefix}job";

    for my $dbh ( @{ $self->{databases} } ) {
        my $jobid;
        eval {
            $job->funcid( $self->funcname_to_id( $dbh, $job->funcname ) );
            $job->insert_time(time);

            my $row = $job->as_hashref;
            my @col = keys %$row;

            my $sql = sprintf 'INSERT INTO %s (%s) VALUES (%s)',
                $table,
                join( ", ", @col ), join( ", ", ("?") x @col );

            my $sth = $dbh->prepare_cached($sql);
            my $i   = 1;
            for my $col (@col) {
                $sth->bind_param(
                    $i++,
                    $row->{$col},
                    _bind_param_attr( $dbh, $col ),
                );
            }
            $sth->execute();

            $jobid = _insert_id( $dbh, $sth, $table, "jobid" );
        };

        return $jobid if defined $jobid;
    }

    return;
}
67
# Map a function name to its funcid in the given database, creating the
# funcmap row when it does not exist yet.
#
# Arguments: the database handle and the function name.
# Returns the funcid.
# Croaks when the row can be neither created nor found; errors from the
# handle itself propagate according to the handle's own error settings.
#
# Results are cached per handle (keyed by the handle's refaddr); the whole
# funcmap table is loaded the first time a handle is seen.
sub funcname_to_id {
    my ( $self, $dbh, $funcname ) = @_;

    my $dbid = refaddr $dbh;
    unless ( exists $self->{_funcmap}{$dbid} ) {
        my $sth = $dbh->prepare_cached(
            "SELECT funcid, funcname FROM $self->{prefix}funcmap");
        $sth->execute;
        while ( my $row = $sth->fetchrow_arrayref ) {
            $self->{_funcmap}{$dbid}{ $row->[1] } = $row->[0];
        }
        $sth->finish;
    }

    unless ( exists $self->{_funcmap}{$dbid}{$funcname} ) {
        my $table = "$self->{prefix}funcmap";

        ## This might fail in a race condition since funcname is UNIQUE
        my $sth = $dbh->prepare_cached(
            "INSERT INTO $table (funcname) VALUES (?)");

        # Capture the outcome immediately: $@ can be clobbered by any later
        # call, and a handle without RaiseError reports failure through a
        # false return value rather than an exception.
        my $inserted = eval { $sth->execute($funcname) };
        my $error = $inserted ? '' : ( $@ || $sth->errstr || '' );

        my $id;
        if ($inserted) {
            $id = _insert_id( $dbh, $sth, $table, "funcid" );
        }
        else {
            ## The insert failed, so try to load the record again
            my $sth = $dbh->prepare_cached(
                "SELECT funcid FROM $table WHERE funcname = ?");
            $sth->execute($funcname);
            my $row = $sth->fetchrow_arrayref;
            $id = $row->[0] if $row;

            # Only one row is read; finish so the cached handle is not left
            # active for the next prepare_cached call.
            $sth->finish;

            croak "Can't find or create funcname $funcname: $error"
                unless $id;
        }

        $self->{_funcmap}{$dbid}{$funcname} = $id;
    }

    return $self->{_funcmap}{$dbid}{$funcname};
}
104
# Fetch the auto-generated id of the row most recently inserted through $dbh.
#
# The lookup is driver specific (selected by $dbh->{Driver}{Name}):
#   mysql  - the handle's mysql_insertid attribute
#   Pg     - last_insert_id on the "<table>_<col>_seq" sequence
#   SQLite - the driver's last_insert_rowid function
# Croaks for any other driver.  $sth is accepted but not used.
sub _insert_id {
    my ( $dbh, $sth, $table, $col ) = @_;

    my $driver = $dbh->{Driver}{Name};

    my %fetch_id_for = (
        mysql => sub { $dbh->{mysql_insertid} },
        Pg    => sub {
            my $sequence = join( "_", $table, $col, 'seq' );
            $dbh->last_insert_id( undef, undef, undef, undef,
                { sequence => $sequence } );
        },
        SQLite => sub { $dbh->func('last_insert_rowid') },
    );

    my $fetch_id = $fetch_id_for{$driver}
        or croak "Don't know how to get last insert id for $driver";

    return $fetch_id->();
}
123
# List the jobs queued for one function across all configured databases.
#
# Takes a hash reference with these keys:
#   funcname      - required; name of the worker function
#   run_after     - optional; only jobs with run_after <= this value
#   grabbed_until - optional; only jobs with grabbed_until <= this value
#   coalesce      - optional; when true, filter on the coalesce column
#   coalesce_op   - optional; comparison operator for coalesce (default '=')
#
# Returns a list of TheSchwartz::Simple::Job objects.  Croaks when funcname
# is missing.  Errors from an individual database are swallowed so the
# remaining databases are still queried.  The caller's hash is not modified.
sub list_jobs {
    my ( $self, $arg ) = @_;

    croak "No funcname" unless exists $arg->{funcname};

    my @options;
    for my $key (qw( run_after grabbed_until )) {
        next unless exists $arg->{$key};
        push @options, { key => $key, op => '<=', value => $arg->{$key} };
    }

    if ( $arg->{coalesce} ) {

        # Default the operator locally instead of writing it back into the
        # caller's hash.
        # NOTE(review): the operator is interpolated into the SQL verbatim,
        # so it must never come from untrusted input.
        my $op = $arg->{coalesce_op} || '=';
        push @options,
            { key => 'coalesce', op => $op, value => $arg->{coalesce} };
    }

    my @jobs;
    for my $dbh ( @{ $self->{databases} } ) {
        eval {
            my $funcid = $self->funcname_to_id( $dbh, $arg->{funcname} );

            my $sql   = "SELECT * FROM $self->{prefix}job WHERE funcid = ?";
            my @value = ($funcid);
            for my $opt (@options) {
                $sql .= " AND $opt->{key} $opt->{op} ?";
                push @value, $opt->{value};
            }

            my $sth = $dbh->prepare_cached($sql);
            $sth->execute(@value);
            while ( my $ref = $sth->fetchrow_hashref ) {
                push @jobs, TheSchwartz::Simple::Job->new($ref);
            }
        };
    }

    return @jobs;
}
172
# Choose the bind_param type attribute for a column of the job table.
#
# Only the "arg" column gets special treatment: a pg_type => PG_BYTEA hash
# under the Pg driver and SQL_BLOB under SQLite.  Every other column, and
# every other driver, yields an empty list so that nothing extra is passed
# to bind_param.
sub _bind_param_attr {
    my ( $dbh, $col ) = @_;

    return unless $col eq 'arg';

    my $driver = $dbh->{Driver}{Name};

    return { pg_type => DBD::Pg::PG_BYTEA() } if $driver eq 'Pg';
    return DBI::SQL_BLOB()                    if $driver eq 'SQLite';

    return;
}
187
# Get or set the table-name prefix.
#
# Called with an argument, stores it (even when it is undef) as the new
# prefix.  Always returns the current prefix.
sub prefix {
    my ( $self, @new_prefix ) = @_;

    $self->{prefix} = $new_prefix[0] if @new_prefix;

    return $self->{prefix};
}
194
195
1961;
197__END__
198
199=encoding utf-8
200
201=for stopwords TheSchwartz DBI schwartz
202
203=head1 NAME
204
205TheSchwartz::Simple - Lightweight TheSchwartz job dispatcher using plain DBI
206
207=head1 SYNOPSIS
208
209  use DBI;
210  use TheSchwartz::Simple;
211
212  my $dbh = DBI->connect(...);
213  my $client = TheSchwartz::Simple->new([ $dbh ]);
214  $client->prefix("theschwartz_"); # optional
215  my $job_id = $client->insert('funcname', $arg);
216
217  my $job = TheSchwartz::Simple::Job->new;
218  $job->funcname("WorkerName");
219  $job->arg({ foo => "bar" });
220  $job->uniqkey("uniqkey");
221  $job->run_after( time + 60 );
222  $client->insert($job);
223
224  my @jobs = $client->list_jobs({ funcname => 'funcname' });
225  for my $job (@jobs) {
226      print $job->jobid;
227  }
228
229=head1 DESCRIPTION
230
TheSchwartz::Simple is yet another interface for inserting a new job into
a TheSchwartz database, using the plain DBI interface.

This module was created solely for the purpose of injecting new jobs
from web servers without loading the additional TheSchwartz and
Data::ObjectDriver modules onto your system. Your schwartz job worker
processes will still need to be implemented using the full-featured
TheSchwartz::Worker module.
239
240=head1 AUTHOR
241
242Tatsuhiko Miyagawa E<lt>miyagawa@cpan.orgE<gt>
243
244=head1 COPYRIGHT
245
246Six Apart, Ltd. 2008-
247
248=head1 LICENSE
249
250This library is free software; you can redistribute it and/or modify
251it under the same terms as Perl itself.
252
253=head1 SEE ALSO
254
255L<TheSchwartz>
256
257=cut
Note: See TracBrowser for help on using the browser.