Show
Ignore:
Timestamp:
09/02/08 11:30:50 (5 years ago)
Author:
hidek
Message:

add list_jobs w/o want_handle option

Location:
lang/perl/TheSchwartz-Simple/trunk
Files:
1 added
1 modified

Legend:

Unmodified
Added
Removed
  • lang/perl/TheSchwartz-Simple/trunk/lib/TheSchwartz/Simple.pm

    r18470 r18621  
    1212sub new { 
    1313    my $class = shift; 
    14     my($dbhs) = @_; 
    15     $dbhs = [ $dbhs ] unless ref $dbhs eq 'ARRAYREF'; 
     14    my ($dbhs) = @_; 
     15    $dbhs = [$dbhs] unless ref $dbhs eq 'ARRAYREF'; 
    1616    bless { 
    1717        databases => $dbhs, 
     
    2424 
    2525    my $job; 
    26     if (ref $_[0] eq 'TheSchwartz::Simple::Job') { 
     26    if ( ref $_[0] eq 'TheSchwartz::Simple::Job' ) { 
    2727        $job = $_[0]; 
    28     } else { 
     28    } 
     29    else { 
    2930        $job = TheSchwartz::Simple::Job->new_from_array(@_); 
    3031    } 
    31     $job->arg( Storable::nfreeze($job->arg) ) if ref $job->arg; 
    32  
    33     for my $dbh (@{$self->{databases}}) { 
     32    $job->arg( Storable::nfreeze( $job->arg ) ) if ref $job->arg; 
     33 
     34    for my $dbh ( @{ $self->{databases} } ) { 
    3435        my $jobid; 
    3536        eval { 
    36             $job->funcid( $self->funcname_to_id($dbh, $job->funcname) ); 
    37             $job->insert_time( time ); 
     37            $job->funcid( $self->funcname_to_id( $dbh, $job->funcname ) ); 
     38            $job->insert_time(time); 
    3839 
    3940            my $row = $job->as_hashref; 
     
    4142 
    4243            my $sql = sprintf 'INSERT INTO job (%s) VALUES (%s)', 
    43                 join(", ", @col), join(", ", ("?") x @col); 
     44                join( ", ", @col ), join( ", ", ("?") x @col ); 
    4445 
    4546            my $sth = $dbh->prepare_cached($sql); 
    4647            $sth->execute( @$row{@col} ); 
    4748 
    48             $jobid = _insert_id($dbh, $sth, "job", "jobid"); 
     49            $jobid = _insert_id( $dbh, $sth, "job", "jobid" ); 
    4950        }; 
    5051 
     
    5657 
    5758sub funcname_to_id { 
    58     my($self, $dbh, $funcname) = @_; 
     59    my ( $self, $dbh, $funcname ) = @_; 
    5960 
    6061    my $dbid = refaddr $dbh; 
    61     unless (exists $self->{_funcmap}{$dbid}) { 
    62         my $sth = $dbh->prepare_cached('SELECT funcid, funcname FROM funcmap'); 
     62    unless ( exists $self->{_funcmap}{$dbid} ) { 
     63        my $sth 
     64            = $dbh->prepare_cached('SELECT funcid, funcname FROM funcmap'); 
    6365        $sth->execute; 
    64         while (my $row = $sth->fetchrow_arrayref) { 
    65             $self->{_funcmap}{$dbid}{$row->[1]} = $row->[0]; 
     66        while ( my $row = $sth->fetchrow_arrayref ) { 
     67            $self->{_funcmap}{$dbid}{ $row->[1] } = $row->[0]; 
    6668        } 
    6769        $sth->finish; 
    6870    } 
    6971 
    70     unless (exists $self->{_funcmap}{$dbid}{$funcname}) { 
     72    unless ( exists $self->{_funcmap}{$dbid}{$funcname} ) { 
    7173        ## This might fail in a race condition since funcname is UNIQUE 
    72         my $sth = $dbh->prepare_cached('INSERT INTO funcmap (funcname) VALUES (?)'); 
     74        my $sth = $dbh->prepare_cached( 
     75            'INSERT INTO funcmap (funcname) VALUES (?)'); 
    7376        eval { $sth->execute($funcname) }; 
    7477 
    75         my $id = _insert_id($dbh, $sth, "funcmap", "funcid"); 
     78        my $id = _insert_id( $dbh, $sth, "funcmap", "funcid" ); 
    7679 
    7780        ## If we got an exception, try to load the record again 
    7881        if ($@) { 
    79             my $sth = $dbh->prepare_cached('SELECT funcid FROM funcmap WHERE funcname = ?'); 
     82            my $sth = $dbh->prepare_cached( 
     83                'SELECT funcid FROM funcmap WHERE funcname = ?'); 
    8084            $sth->execute($funcname); 
    8185            $id = $sth->fetchrow_arrayref->[0] 
    8286                or croak "Can't find or create funcname $funcname: $@"; 
     87        } 
     88 
     89        $self->{_funcmap}{$dbid}{$funcname} = $id; 
     90    } 
     91 
     92    $self->{_funcmap}{$dbid}{$funcname}; 
     93} 
     94 
     95sub _insert_id { 
     96    my ( $dbh, $sth, $table, $col ) = @_; 
     97 
     98    my $driver = $dbh->{Driver}{Name}; 
     99    if ( $driver eq 'mysql' ) { 
     100        return $dbh->{mysql_insertid}; 
     101    } 
     102    elsif ( $driver eq 'Pg' ) { 
     103        return $dbh->last_insert_id( undef, undef, undef, undef, 
     104            { sequence => join( "_", $table, $col, 'seq' ) } ); 
     105    } 
     106    elsif ( $driver eq 'SQLite' ) { 
     107        return $dbh->func('last_insert_rowid'); 
     108    } 
     109    else { 
     110        croak "Don't know how to get last insert id for $driver"; 
     111    } 
     112} 
     113 
     114sub list_jobs { 
     115    my ( $self, $arg ) = @_; 
     116 
     117    die "No funcname" unless exists $arg->{funcname}; 
     118 
     119    my @options; 
     120    push @options, 
     121        { 
     122        key   => 'run_after', 
     123        op    => '<=', 
     124        value => $arg->{run_after} 
     125        } 
     126        if exists $arg->{run_after}; 
     127    push @options, 
     128        { 
     129        key   => 'grabbed_until', 
     130        op    => '<=', 
     131        value => $arg->{grabbed_until} 
     132        } 
     133        if exists $arg->{grabbed_until}; 
     134 
     135    if ( $arg->{coalesce} ) { 
     136        $arg->{coalesce_op} ||= '='; 
     137        push @options, 
     138            { 
     139            key   => 'coalesce', 
     140            op    => $arg->{coalesce_op}, 
     141            value => $arg->{coalesce} 
     142            }; 
     143    } 
     144 
     145    my @jobs; 
     146    for my $dbh ( @{ $self->{databases} } ) { 
     147        eval { 
     148            my $funcid = $self->funcname_to_id( $dbh, $arg->{funcname} ); 
     149 
     150            my $sql   = 'SELECT * FROM job WHERE funcid = ?'; 
     151            my @value = ($funcid); 
     152            for (@options) { 
     153                $sql .= " AND $_->{key} $_->{op} ?"; 
     154                push @value, $_->{value}; 
     155            } 
     156 
     157            my $sth = $dbh->prepare_cached($sql); 
     158            $sth->execute(@value); 
     159            while ( my $ref = $sth->fetchrow_hashref ) { 
     160                push @jobs, TheSchwartz::Simple::Job->new($ref); 
     161            } 
    83162        }; 
    84  
    85         $self->{_funcmap}{$dbid}{$funcname} = $id; 
    86     } 
    87  
    88     $self->{_funcmap}{$dbid}{$funcname}; 
    89 } 
    90  
    91 sub _insert_id { 
    92     my($dbh, $sth, $table, $col) = @_; 
    93  
    94     my $driver = $dbh->{Driver}{Name}; 
    95     if ($driver eq 'mysql') { 
    96         return $dbh->{mysql_insertid}; 
    97     } elsif ($driver eq 'Pg') { 
    98         return $dbh->last_insert_id(undef, undef, undef, undef, 
    99                                     { sequence => join("_", $table, $col, 'seq') }); 
    100     } elsif ($driver eq 'SQLite') { 
    101         return $dbh->func('last_insert_rowid'); 
    102     } else { 
    103         croak "Don't know how to get last insert id for $driver"; 
    104     } 
     163    } 
     164 
     165    return @jobs; 
    105166} 
    106167