Changeset 18621 for lang/perl/TheSchwartz-Simple
- Timestamp:
- 09/02/08 11:30:50 (5 years ago)
- Location:
- lang/perl/TheSchwartz-Simple/trunk
- Files:
-
- 1 added
- 1 modified
-
lib/TheSchwartz/Simple.pm (modified) (4 diffs)
-
t/03_list_jobs.t (added)
Legend:
- Unmodified
- Added
- Removed
-
lang/perl/TheSchwartz-Simple/trunk/lib/TheSchwartz/Simple.pm
r18470 r18621 12 12 sub new { 13 13 my $class = shift; 14 my ($dbhs) = @_;15 $dbhs = [ $dbhs] unless ref $dbhs eq 'ARRAYREF';14 my ($dbhs) = @_; 15 $dbhs = [$dbhs] unless ref $dbhs eq 'ARRAYREF'; 16 16 bless { 17 17 databases => $dbhs, … … 24 24 25 25 my $job; 26 if ( ref $_[0] eq 'TheSchwartz::Simple::Job') {26 if ( ref $_[0] eq 'TheSchwartz::Simple::Job' ) { 27 27 $job = $_[0]; 28 } else { 28 } 29 else { 29 30 $job = TheSchwartz::Simple::Job->new_from_array(@_); 30 31 } 31 $job->arg( Storable::nfreeze( $job->arg) ) if ref $job->arg;32 33 for my $dbh ( @{$self->{databases}}) {32 $job->arg( Storable::nfreeze( $job->arg ) ) if ref $job->arg; 33 34 for my $dbh ( @{ $self->{databases} } ) { 34 35 my $jobid; 35 36 eval { 36 $job->funcid( $self->funcname_to_id( $dbh, $job->funcname) );37 $job->insert_time( time);37 $job->funcid( $self->funcname_to_id( $dbh, $job->funcname ) ); 38 $job->insert_time(time); 38 39 39 40 my $row = $job->as_hashref; … … 41 42 42 43 my $sql = sprintf 'INSERT INTO job (%s) VALUES (%s)', 43 join( ", ", @col), join(", ", ("?") x @col);44 join( ", ", @col ), join( ", ", ("?") x @col ); 44 45 45 46 my $sth = $dbh->prepare_cached($sql); 46 47 $sth->execute( @$row{@col} ); 47 48 48 $jobid = _insert_id( $dbh, $sth, "job", "jobid");49 $jobid = _insert_id( $dbh, $sth, "job", "jobid" ); 49 50 }; 50 51 … … 56 57 57 58 sub funcname_to_id { 58 my ($self, $dbh, $funcname) = @_;59 my ( $self, $dbh, $funcname ) = @_; 59 60 60 61 my $dbid = refaddr $dbh; 61 unless (exists $self->{_funcmap}{$dbid}) { 62 my $sth = $dbh->prepare_cached('SELECT funcid, funcname FROM funcmap'); 62 unless ( exists $self->{_funcmap}{$dbid} ) { 63 my $sth 64 = $dbh->prepare_cached('SELECT funcid, funcname FROM funcmap'); 63 65 $sth->execute; 64 while ( my $row = $sth->fetchrow_arrayref) {65 $self->{_funcmap}{$dbid}{ $row->[1]} = $row->[0];66 while ( my $row = $sth->fetchrow_arrayref ) { 67 $self->{_funcmap}{$dbid}{ $row->[1] } = $row->[0]; 66 68 } 67 69 $sth->finish; 68 70 } 69 71 70 unless ( exists $self->{_funcmap}{$dbid}{$funcname}) {72 unless ( exists $self->{_funcmap}{$dbid}{$funcname} ) { 71 73 ## This might fail in a race condition since funcname is UNIQUE 72 my $sth = $dbh->prepare_cached('INSERT INTO funcmap (funcname) VALUES (?)'); 74 my $sth = $dbh->prepare_cached( 75 'INSERT INTO funcmap (funcname) VALUES (?)'); 73 76 eval { $sth->execute($funcname) }; 74 77 75 my $id = _insert_id( $dbh, $sth, "funcmap", "funcid");78 my $id = _insert_id( $dbh, $sth, "funcmap", "funcid" ); 76 79 77 80 ## If we got an exception, try to load the record again 78 81 if ($@) { 79 my $sth = $dbh->prepare_cached('SELECT funcid FROM funcmap WHERE funcname = ?'); 82 my $sth = $dbh->prepare_cached( 83 'SELECT funcid FROM funcmap WHERE funcname = ?'); 80 84 $sth->execute($funcname); 81 85 $id = $sth->fetchrow_arrayref->[0] 82 86 or croak "Can't find or create funcname $funcname: $@"; 87 } 88 89 $self->{_funcmap}{$dbid}{$funcname} = $id; 90 } 91 92 $self->{_funcmap}{$dbid}{$funcname}; 93 } 94 95 sub _insert_id { 96 my ( $dbh, $sth, $table, $col ) = @_; 97 98 my $driver = $dbh->{Driver}{Name}; 99 if ( $driver eq 'mysql' ) { 100 return $dbh->{mysql_insertid}; 101 } 102 elsif ( $driver eq 'Pg' ) { 103 return $dbh->last_insert_id( undef, undef, undef, undef, 104 { sequence => join( "_", $table, $col, 'seq' ) } ); 105 } 106 elsif ( $driver eq 'SQLite' ) { 107 return $dbh->func('last_insert_rowid'); 108 } 109 else { 110 croak "Don't know how to get last insert id for $driver"; 111 } 112 } 113 114 sub list_jobs { 115 my ( $self, $arg ) = @_; 116 117 die "No funcname" unless exists $arg->{funcname}; 118 119 my @options; 120 push @options, 121 { 122 key => 'run_after', 123 op => '<=', 124 value => $arg->{run_after} 125 } 126 if exists $arg->{run_after}; 127 push @options, 128 { 129 key => 'grabbed_until', 130 op => '<=', 131 value => $arg->{grabbed_until} 132 } 133 if exists $arg->{grabbed_until}; 134 135 if ( $arg->{coalesce} ) { 136 $arg->{coalesce_op} ||= '='; 137 push @options, 138 { 139 key => 'coalesce', 140 op => $arg->{coalesce_op}, 141 value => $arg->{coalesce} 142 }; 143 } 144 145 my @jobs; 146 for my $dbh ( @{ $self->{databases} } ) { 147 eval { 148 my $funcid = $self->funcname_to_id( $dbh, $arg->{funcname} ); 149 150 my $sql = 'SELECT * FROM job WHERE funcid = ?'; 151 my @value = ($funcid); 152 for (@options) { 153 $sql .= " AND $_->{key} $_->{op} ?"; 154 push @value, $_->{value}; 155 } 156 157 my $sth = $dbh->prepare_cached($sql); 158 $sth->execute(@value); 159 while ( my $ref = $sth->fetchrow_hashref ) { 160 push @jobs, TheSchwartz::Simple::Job->new($ref); 161 } 83 162 }; 84 85 $self->{_funcmap}{$dbid}{$funcname} = $id; 86 } 87 88 $self->{_funcmap}{$dbid}{$funcname}; 89 } 90 91 sub _insert_id { 92 my($dbh, $sth, $table, $col) = @_; 93 94 my $driver = $dbh->{Driver}{Name}; 95 if ($driver eq 'mysql') { 96 return $dbh->{mysql_insertid}; 97 } elsif ($driver eq 'Pg') { 98 return $dbh->last_insert_id(undef, undef, undef, undef, 99 { sequence => join("_", $table, $col, 'seq') }); 100 } elsif ($driver eq 'SQLite') { 101 return $dbh->func('last_insert_rowid'); 102 } else { 103 croak "Don't know how to get last insert id for $driver"; 104 } 163 } 164 165 return @jobs; 105 166 } 106 167
![(please configure the [header_logo] section in trac.ini)](/share/chrome/site/your_project_logo.png)