Show
Ignore:
Timestamp:
10/14/08 14:28:18 (5 years ago)
Author:
lopnor
Message:

lang/perl/Net-Amazon-HadoopEC2: implemented launch_*, find_*, terminate_*

Location:
lang/perl/Net-Amazon-HadoopEC2/trunk
Files:
7 added
1 removed
3 modified

Legend:

Unmodified
Added
Removed
  • lang/perl/Net-Amazon-HadoopEC2/trunk/MANIFEST

    r20994 r21309  
     1Changes 
    12inc/Module/Install.pm 
    23inc/Module/Install/Base.pm 
     
    89inc/Module/Install/WriteAll.pm 
    910lib/Net/Amazon/HadoopEC2.pm 
     11lib/Net/Amazon/HadoopEC2/Cluster.pm 
    1012Makefile.PL 
    1113MANIFEST                        This list of files 
    1214META.yml 
     15README 
    1316t/00_compile.t 
    14 t/01_module.t 
     17t/01_group.t 
     18t/02_launch_cluster.t 
     19t/03_find_cluster.t 
     20t/04_launch_slave.t 
     21t/05_terminate_cluster.t 
  • lang/perl/Net-Amazon-HadoopEC2/trunk/lib/Net/Amazon/HadoopEC2.pm

    r21205 r21309  
    2626sub launch_cluster { 
    2727    my ($self, $args) = @_; 
    28     $self->_assert_group({group => $args->{name}}) or return; 
     28    $self->_assert_group({name => $args->{name}}) or return; 
    2929    my $cluster = Net::Amazon::HadoopEC2::Cluster->new( 
    3030        { 
    3131            ec2      => $self->ec2, 
    3232            name     => $args->{name}, 
    33             image_id => $args->{image_id}, 
    34             slaves   => $args->{slaves} || 2, 
    35             key_name => $args->{key_name} || 'gsg-keypair', 
    3633            key_file => $args->{key_file}, 
    3734        } 
    3835    ); 
    39     $cluster->launch or return; 
     36    $cluster->launch_cluster(  
     37        {  
     38            slaves => $args->{slaves} || 2 , 
     39            image_id => $args->{image_id}, 
     40            key_name => $args->{key_name} || 'gsg-keypair', 
     41        }  
     42    ) or return; 
    4043    return $cluster; 
    4144} 
    4245 
     46sub find_cluster { 
     47    my ($self, $args) = @_; 
     48    $self->_find_group({name => $args->{name}}) or return; 
     49    my $cluster = Net::Amazon::HadoopEC2::Cluster->new( 
     50        { 
     51            ec2 => $self->ec2, 
     52            name => $args->{name}, 
     53        } 
     54    ); 
     55    $cluster->find_cluster or return; 
     56    return $cluster; 
     57} 
     58 
    4359sub _assert_group { 
    4460    my ($self, $args) = @_; 
    45     my @groups = ($args->{group}, "$args->{group}-master"); 
     61    unless ($self->_find_group($args)) { 
     62        return $self->_create_group($args); 
     63    } 
     64    return 1; 
     65} 
     66 
     67sub _find_group { 
     68    my ($self, $args) = @_; 
     69    my @groups = ($args->{name}, "$args->{name}-master"); 
    4670    my $g = $self->ec2->describe_security_groups( 
    4771        GroupName => [ @groups ], 
    4872    ); 
    4973    if (ref $g eq 'Net::Amazon::EC2::Errors') { 
    50         $g->errors->[0]->code eq 'InvalidGroup.NotFound' or return; 
     74        $g->errors->[0]->code eq 'InvalidGroup.NotFound' or die $g->errors->[0]->message; 
     75        return; 
    5176    } else { 
    52         return scalar @{$g} == 2 ? 1 : 0; 
    53     } 
    54     return $self->_create_group($args); 
     77        scalar @{$g} == 2 or die "$args->{name} doesn't seem to be Hadoop cluster."; 
     78        return 1; 
     79    } 
    5580} 
    5681 
    5782sub _create_group { 
    5883    my ($self, $args) = @_; 
    59     my @groups = ($args->{group}, "$args->{group}-master"); 
     84    my @groups = ($args->{name}, "$args->{name}-master"); 
    6085    for my $target ( @groups ) { 
    6186        my $desc = 'Group for Hadoop ' . ($target =~ m{master} ? 'Master' : 'Slaves') . '.'; 
     
    114139sub _remove_group { 
    115140    my ($self, $args) = @_; 
    116     my @groups = ($args->{group}, "$args->{group}-master"); 
     141    my @groups = ($args->{name}, "$args->{name}-master"); 
    117142    my $success = 0; 
    118143    my $result; 
     
    180205=item * launch_cluster($hashref) 
    181206 
     207=item * find_cluster($hashref) 
     208 
    182209=back 
    183210 
  • lang/perl/Net-Amazon-HadoopEC2/trunk/lib/Net/Amazon/HadoopEC2/Cluster.pm

    r21205 r21309  
    11package Net::Amazon::HadoopEC2::Cluster; 
    22use Moose; 
    3 use Moose::Util::TypeConstraints; 
    43use Net::SSH::Perl; 
    54use File::Spec; 
    65use File::Basename; 
    76use MIME::Base64; 
     7use Carp; 
    88 
    99has name     => ( is => 'ro', isa => 'Str', required => 1 ); 
    1010has ec2      => ( is => 'ro', isa => 'Net::Amazon::EC2', required => 1 ); 
    11 has image_id => ( is => 'ro', isa => 'Str', required => 1 ); 
    12 has slaves   => ( is => 'ro', isa => 'Str', required => 1 ); 
    13 has key_name => ( is => 'ro', isa => 'Str', required => 1, 
    14     default => 'gsg-keypair' ); 
    15 has key_file => ( is => 'ro', isa => 'Str', required => 1, 
     11has key_file => ( is => 'rw', isa => 'Str', required => 1, lazy => 1, 
    1612    default => "$ENV{HOME}/.ssh/id_rsa-gsg-keypair" ); 
    17 has master_instance => ( is => 'rw', isa => 'Net::Amazon::EC2::RunningInstances' ); 
    18 has slave_instances => ( is => 'rw', isa => 'ArrayRef[Net::Amazon::EC2::RunningInstances]' ); 
    19 has retry => ( is => 'ro', isa => 'Int', required => 1, default => 1 ); 
    20 has retry_interval => ( is => 'ro', isa => 'Int', default => 10 ); 
    21 has map_tasks => (is => 'rw', isa => 'Int', default => 2 ); 
    22 has reduce_tasks => (is => 'rw', isa => 'Int', default => 2 ); 
    23 has compress => (is => 'rw', isa => 'Str', default => 'false' ); 
     13has master_instance => ( is => 'rw', isa => 'Maybe[Net::Amazon::EC2::RunningInstances]'); 
     14has slave_instances => ( is => 'rw', isa => 'ArrayRef[Net::Amazon::EC2::RunningInstances]', default => sub { return [] } ); 
     15 
     16has retry => ( is => 'rw', isa => 'Int', required => 1, default => 1 ); 
     17has retry_interval => ( is => 'rw', isa => 'Int', required => 1, default => 10 ); 
     18has map_tasks => (is => 'rw', isa => 'Int', required => 1, default => 2 ); 
     19has reduce_tasks => (is => 'rw', isa => 'Int', required => 1, default => 2 ); 
     20has compress => (is => 'rw', isa => 'Str', required => 1, default => 'false' ); 
    2421 
    2522has ssh => ( 
     
    5047 
    5148no Moose; 
    52 no Moose::Util::TypeConstraints; 
    53  
    54 sub launch { 
    55     my ($self, $args) = @_; 
    56     $self->launch_master($args) or return; 
     49 
     50sub find_cluster { 
     51    my ($self, $args) = @_; 
     52    my $master_group = sprintf("%s-master", $self->name); 
     53    my @res = @{$self->ec2->describe_instances}; 
     54    my @master = $self->_wait_for_instances({name => $master_group}) or return; 
     55    scalar @master == 1 or return; 
     56    $self->master_instance($master[0]); 
     57    $self->_find_slaves; 
     58    return $self; 
     59} 
     60 
     61sub _find_slaves { 
     62    my ($self) = @_; 
     63    my @slaves = $self->_wait_for_instances({name => $self->name}); 
     64    $self->slave_instances([ @slaves ]); 
     65} 
     66 
     67sub launch_cluster { 
     68    my ($self, $args) = @_; 
     69    $self->_launch_master($args) or return; 
    5770    $self->launch_slave($args) or return; 
    58     return 1; 
    59 } 
    60  
    61 sub launch_master { 
     71    return $self; 
     72} 
     73 
     74sub _launch_master { 
    6275    my ($self, $args) = @_; 
    6376    my $master_group = sprintf("%s-master", $self->name); 
     
    7285    my $result; 
    7386    $result = $self->ec2->run_instances( 
    74         ImageId => $self->image_id, 
     87        ImageId => $args->{image_id}, 
    7588        MinCount => 1, 
    7689        MaxCount => 1, 
    77         KeyName => $self->key_name, 
     90        KeyName => $args->{key_name}, 
    7891        SecurityGroup => $master_group, 
    7992        UserData => encode_base64($user_data_str), 
     
    8194    ref $result eq 'Net::Amazon::EC2::ReservationInfo' or return; 
    8295    my $master_id = $result->instances_set->[0]->instance_id; 
    83     while (1) { 
    84         $result = $self->ec2->describe_instances( 
    85             InstanceId => $master_id, 
    86         ); 
    87         ref $result eq 'Net::Amazon::EC2::Errors' and return; 
    88         ref $result->[0] eq 'Net::Amazon::EC2::ReservationInfo' or return; 
    89         my $state = $result->[0]->instances_set->[0]; 
    90         if ($state->instance_state->code == 16) { 
    91             $self->master_instance($state); 
    92             last; 
    93         } 
    94         $self->retry or last; 
    95         sleep $self->retry_interval; 
    96     } 
    97     $self->master_instance or return; 
     96    my ($master) = $self->_wait_for_instances({name => $master_group, instance_id => $master_id}) or return; 
     97    $self->master_instance($master); 
    9898    $self->push_files( 
    9999        { 
     
    103103    ) or return; 
    104104    $self->execute( { command => 'chmod 600 /root/.ssh/id_rsa' } )->{code} and return; 
    105     return 1; 
     105    return $self; 
    106106} 
    107107 
    108108sub launch_slave { 
    109109    my ($self, $args) = @_; 
     110    $self->master_instance or return; 
    110111    my $user_data = { 
    111         MASTER_HOST => $self->master_instance->dns_name, 
     112        MASTER_HOST => $self->master_instance->private_dns_name, 
    112113        MAX_MAP_TASKS => $self->map_tasks, 
    113114        MAX_REDUCE_TASKS => $self->reduce_tasks, 
     
    117118 
    118119    my $result = $self->ec2->run_instances( 
    119         ImageId => $self->image_id, 
     120        ImageId => $self->master_instance->image_id, 
    120121        MinCount => 1, 
    121         MaxCount => $self->slaves, 
    122         KeyName => $self->key_name, 
     122        MaxCount => $args->{slaves}, 
     123        KeyName => $self->master_instance->key_name, 
    123124        SecurityGroup => $self->name, 
    124125        UserData => encode_base64($user_data_str), 
    125126    ); 
    126127    ref $result eq 'Net::Amazon::EC2::ReservationInfo' or return; 
    127     $self->slave_instances([ $result->instances_set ]); 
    128     return 1; 
    129 } 
    130  
    131 sub terminate { 
     128    my @instances = map { $_->instance_id} @{$result->instances_set}; 
     129    $self->_wait_for_instances({name => $self->name, instance_id => [ @instances ]}); 
     130    $self->_find_slaves; 
     131    return $self; 
     132} 
     133 
     134sub _wait_for_instances { 
     135    my ($self, $args) = @_; 
     136    my $name = $args->{name} or croak "name not specified"; 
     137    my $instances = $args->{instance_id} || []; 
     138    $instances = [ $instances ] unless ref $instances; 
     139    while (1) { 
     140        my $result = $self->ec2->describe_instances( 
     141            InstanceId => $instances, 
     142        ); 
     143        ref $result eq 'Net::Amazon::EC2::Errors' and return; 
     144        ref $result->[0] eq 'Net::Amazon::EC2::ReservationInfo' or return; 
     145        my @found = map {@{$_->instances_set}} grep { grep {$_->group_id eq $name} @{$_->group_set}} @{$result}; 
     146        if (grep {$_->instance_state->code == 0} @found) { 
     147            $self->retry or last; 
     148            sleep $self->retry_interval; 
     149        } 
     150        @found = grep {$_->instance_state->code == 16} @found;  
     151        if (my $count_expect = scalar @{$instances}) { 
     152            scalar @found == $count_expect or next; 
     153        } 
     154        return @found; 
     155    } 
     156} 
     157 
     158sub terminate_cluster { 
    132159    my ($self, $args) = @_; 
    133160    my @instances = map { $_->instance_id } ($self->master_instance, @{$self->slave_instances}); 
    134     $self->ec2->terminate_instances( 
     161    my $res = $self->ec2->terminate_instances( 
    135162        InstanceId => [ @instances ], 
    136163    ); 
     164    $self->master_instance(undef); 
     165    $self->slave_instances( [] ); 
     166    return $res; 
     167} 
     168 
     169sub terminate_slaves { 
     170    my ($self, $args) = @_; 
     171    my $existing = scalar @{$self->slave_instances}; 
     172    my $count = $args->{slaves} || $existing; 
     173    $count = $existing if $count > $existing; 
     174    my @instances = map { $_->instance_id } @{$self->slave_instances}[ 0 .. $count - 1 ]; 
     175    my $res = $self->ec2->terminate_instances( 
     176        InstanceId => [ @instances ], 
     177    ); 
     178    $self->_find_slaves; 
     179    return $res; 
    137180} 
    138181 
     
    209252=over 4 
    210253 
    211 =item * launch 
    212  
    213 =item * launch_master 
     254=item * launch_cluster 
     255 
     256=item * find_cluster 
    214257 
    215258=item * launch_slave 
    216259 
    217 =item * terminate 
     260=item * terminate_cluster 
     261 
     262=item * terminate_slaves 
    218263 
    219264=item * execute