Changeset 21309 for lang/perl/Net-Amazon-HadoopEC2
- Timestamp:
- 10/14/08 14:28:18 (5 years ago)
- Location:
- lang/perl/Net-Amazon-HadoopEC2/trunk
- Files:
-
- 7 added
- 1 removed
- 3 modified
-
Changes (added)
-
MANIFEST (modified) (2 diffs)
-
README (added)
-
lib/Net/Amazon/HadoopEC2.pm (modified) (3 diffs)
-
lib/Net/Amazon/HadoopEC2/Cluster.pm (modified) (7 diffs)
-
t/01_group.t (added)
-
t/01_module.t (deleted)
-
t/02_launch_cluster.t (added)
-
t/03_find_cluster.t (added)
-
t/04_launch_slave.t (added)
-
t/05_terminate_cluster.t (added)
Legend:
- Unmodified
- Added
- Removed
-
lang/perl/Net-Amazon-HadoopEC2/trunk/MANIFEST
r20994 r21309 1 Changes 1 2 inc/Module/Install.pm 2 3 inc/Module/Install/Base.pm … … 8 9 inc/Module/Install/WriteAll.pm 9 10 lib/Net/Amazon/HadoopEC2.pm 11 lib/Net/Amazon/HadoopEC2/Cluster.pm 10 12 Makefile.PL 11 13 MANIFEST This list of files 12 14 META.yml 15 README 13 16 t/00_compile.t 14 t/01_module.t 17 t/01_group.t 18 t/02_launch_cluster.t 19 t/03_find_cluster.t 20 t/04_launch_slave.t 21 t/05_terminate_cluster.t -
lang/perl/Net-Amazon-HadoopEC2/trunk/lib/Net/Amazon/HadoopEC2.pm
r21205 r21309 26 26 sub launch_cluster { 27 27 my ($self, $args) = @_; 28 $self->_assert_group({ group=> $args->{name}}) or return;28 $self->_assert_group({name => $args->{name}}) or return; 29 29 my $cluster = Net::Amazon::HadoopEC2::Cluster->new( 30 30 { 31 31 ec2 => $self->ec2, 32 32 name => $args->{name}, 33 image_id => $args->{image_id},34 slaves => $args->{slaves} || 2,35 key_name => $args->{key_name} || 'gsg-keypair',36 33 key_file => $args->{key_file}, 37 34 } 38 35 ); 39 $cluster->launch or return; 36 $cluster->launch_cluster( 37 { 38 slaves => $args->{slaves} || 2 , 39 image_id => $args->{image_id}, 40 key_name => $args->{key_name} || 'gsg-keypair', 41 } 42 ) or return; 40 43 return $cluster; 41 44 } 42 45 46 sub find_cluster { 47 my ($self, $args) = @_; 48 $self->_find_group({name => $args->{name}}) or return; 49 my $cluster = Net::Amazon::HadoopEC2::Cluster->new( 50 { 51 ec2 => $self->ec2, 52 name => $args->{name}, 53 } 54 ); 55 $cluster->find_cluster or return; 56 return $cluster; 57 } 58 43 59 sub _assert_group { 44 60 my ($self, $args) = @_; 45 my @groups = ($args->{group}, "$args->{group}-master"); 61 unless ($self->_find_group($args)) { 62 return $self->_create_group($args); 63 } 64 return 1; 65 } 66 67 sub _find_group { 68 my ($self, $args) = @_; 69 my @groups = ($args->{name}, "$args->{name}-master"); 46 70 my $g = $self->ec2->describe_security_groups( 47 71 GroupName => [ @groups ], 48 72 ); 49 73 if (ref $g eq 'Net::Amazon::EC2::Errors') { 50 $g->errors->[0]->code eq 'InvalidGroup.NotFound' or return; 74 $g->errors->[0]->code eq 'InvalidGroup.NotFound' or die $g->errors->[0]->message; 75 return; 51 76 } else { 52 return scalar @{$g} == 2 ? 1 : 0;53 }54 return $self->_create_group($args);77 scalar @{$g} == 2 or die "$args->{name} doesn't seem to be Hadoop cluster."; 78 return 1; 79 } 55 80 } 56 81 57 82 sub _create_group { 58 83 my ($self, $args) = @_; 59 my @groups = ($args->{ group}, "$args->{group}-master");84 my @groups = ($args->{name}, "$args->{name}-master"); 60 85 for my $target ( @groups ) { 61 86 my $desc = 'Group for Hadoop ' . ($target =~ m{master} ? 'Master' : 'Slaves') . '.'; … … 114 139 sub _remove_group { 115 140 my ($self, $args) = @_; 116 my @groups = ($args->{ group}, "$args->{group}-master");141 my @groups = ($args->{name}, "$args->{name}-master"); 117 142 my $success = 0; 118 143 my $result; … … 180 205 =item * launch_cluster($hashref) 181 206 207 =item * find_cluster($hashref) 208 182 209 =back 183 210 -
lang/perl/Net-Amazon-HadoopEC2/trunk/lib/Net/Amazon/HadoopEC2/Cluster.pm
r21205 r21309 1 1 package Net::Amazon::HadoopEC2::Cluster; 2 2 use Moose; 3 use Moose::Util::TypeConstraints;4 3 use Net::SSH::Perl; 5 4 use File::Spec; 6 5 use File::Basename; 7 6 use MIME::Base64; 7 use Carp; 8 8 9 9 has name => ( is => 'ro', isa => 'Str', required => 1 ); 10 10 has ec2 => ( is => 'ro', isa => 'Net::Amazon::EC2', required => 1 ); 11 has image_id => ( is => 'ro', isa => 'Str', required => 1 ); 12 has slaves => ( is => 'ro', isa => 'Str', required => 1 ); 13 has key_name => ( is => 'ro', isa => 'Str', required => 1, 14 default => 'gsg-keypair' ); 15 has key_file => ( is => 'ro', isa => 'Str', required => 1, 11 has key_file => ( is => 'rw', isa => 'Str', required => 1, lazy => 1, 16 12 default => "$ENV{HOME}/.ssh/id_rsa-gsg-keypair" ); 17 has master_instance => ( is => 'rw', isa => 'Net::Amazon::EC2::RunningInstances' ); 18 has slave_instances => ( is => 'rw', isa => 'ArrayRef[Net::Amazon::EC2::RunningInstances]' ); 19 has retry => ( is => 'ro', isa => 'Int', required => 1, default => 1 ); 20 has retry_interval => ( is => 'ro', isa => 'Int', default => 10 ); 21 has map_tasks => (is => 'rw', isa => 'Int', default => 2 ); 22 has reduce_tasks => (is => 'rw', isa => 'Int', default => 2 ); 23 has compress => (is => 'rw', isa => 'Str', default => 'false' ); 13 has master_instance => ( is => 'rw', isa => 'Maybe[Net::Amazon::EC2::RunningInstances]'); 14 has slave_instances => ( is => 'rw', isa => 'ArrayRef[Net::Amazon::EC2::RunningInstances]', default => sub { return [] } ); 15 16 has retry => ( is => 'rw', isa => 'Int', required => 1, default => 1 ); 17 has retry_interval => ( is => 'rw', isa => 'Int', required => 1, default => 10 ); 18 has map_tasks => (is => 'rw', isa => 'Int', required => 1, default => 2 ); 19 has reduce_tasks => (is => 'rw', isa => 'Int', required => 1, default => 2 ); 20 has compress => (is => 'rw', isa => 'Str', required => 1, default => 'false' ); 24 21 25 22 has ssh => ( … … 50 47 51 48 no Moose; 52 no Moose::Util::TypeConstraints; 53 54 sub launch { 55 my ($self, $args) = @_; 56 $self->launch_master($args) or return; 49 50 sub find_cluster { 51 my ($self, $args) = @_; 52 my $master_group = sprintf("%s-master", $self->name); 53 my @res = @{$self->ec2->describe_instances}; 54 my @master = $self->_wait_for_instances({name => $master_group}) or return; 55 scalar @master == 1 or return; 56 $self->master_instance($master[0]); 57 $self->_find_slaves; 58 return $self; 59 } 60 61 sub _find_slaves { 62 my ($self) = @_; 63 my @slaves = $self->_wait_for_instances({name => $self->name}); 64 $self->slave_instances([ @slaves ]); 65 } 66 67 sub launch_cluster { 68 my ($self, $args) = @_; 69 $self->_launch_master($args) or return; 57 70 $self->launch_slave($args) or return; 58 return 1;59 } 60 61 sub launch_master {71 return $self; 72 } 73 74 sub _launch_master { 62 75 my ($self, $args) = @_; 63 76 my $master_group = sprintf("%s-master", $self->name); … … 72 85 my $result; 73 86 $result = $self->ec2->run_instances( 74 ImageId => $ self->image_id,87 ImageId => $args->{image_id}, 75 88 MinCount => 1, 76 89 MaxCount => 1, 77 KeyName => $ self->key_name,90 KeyName => $args->{key_name}, 78 91 SecurityGroup => $master_group, 79 92 UserData => encode_base64($user_data_str), … … 81 94 ref $result eq 'Net::Amazon::EC2::ReservationInfo' or return; 82 95 my $master_id = $result->instances_set->[0]->instance_id; 83 while (1) { 84 $result = $self->ec2->describe_instances( 85 InstanceId => $master_id, 86 ); 87 ref $result eq 'Net::Amazon::EC2::Errors' and return; 88 ref $result->[0] eq 'Net::Amazon::EC2::ReservationInfo' or return; 89 my $state = $result->[0]->instances_set->[0]; 90 if ($state->instance_state->code == 16) { 91 $self->master_instance($state); 92 last; 93 } 94 $self->retry or last; 95 sleep $self->retry_interval; 96 } 97 $self->master_instance or return; 96 my ($master) = $self->_wait_for_instances({name => $master_group, instance_id => $master_id}) or return; 97 $self->master_instance($master); 98 98 $self->push_files( 99 99 { … … 103 103 ) or return; 104 104 $self->execute( { command => 'chmod 600 /root/.ssh/id_rsa' } )->{code} and return; 105 return 1;105 return $self; 106 106 } 107 107 108 108 sub launch_slave { 109 109 my ($self, $args) = @_; 110 $self->master_instance or return; 110 111 my $user_data = { 111 MASTER_HOST => $self->master_instance-> dns_name,112 MASTER_HOST => $self->master_instance->private_dns_name, 112 113 MAX_MAP_TASKS => $self->map_tasks, 113 114 MAX_REDUCE_TASKS => $self->reduce_tasks, … … 117 118 118 119 my $result = $self->ec2->run_instances( 119 ImageId => $self-> image_id,120 ImageId => $self->master_instance->image_id, 120 121 MinCount => 1, 121 MaxCount => $ self->slaves,122 KeyName => $self-> key_name,122 MaxCount => $args->{slaves}, 123 KeyName => $self->master_instance->key_name, 123 124 SecurityGroup => $self->name, 124 125 UserData => encode_base64($user_data_str), 125 126 ); 126 127 ref $result eq 'Net::Amazon::EC2::ReservationInfo' or return; 127 $self->slave_instances([ $result->instances_set ]); 128 return 1; 129 } 130 131 sub terminate { 128 my @instances = map { $_->instance_id} @{$result->instances_set}; 129 $self->_wait_for_instances({name => $self->name, instance_id => [ @instances ]}); 130 $self->_find_slaves; 131 return $self; 132 } 133 134 sub _wait_for_instances { 135 my ($self, $args) = @_; 136 my $name = $args->{name} or croak "name not specified"; 137 my $instances = $args->{instance_id} || []; 138 $instances = [ $instances ] unless ref $instances; 139 while (1) { 140 my $result = $self->ec2->describe_instances( 141 InstanceId => $instances, 142 ); 143 ref $result eq 'Net::Amazon::EC2::Errors' and return; 144 ref $result->[0] eq 'Net::Amazon::EC2::ReservationInfo' or return; 145 my @found = map {@{$_->instances_set}} grep { grep {$_->group_id eq $name} @{$_->group_set}} @{$result}; 146 if (grep {$_->instance_state->code == 0} @found) { 147 $self->retry or last; 148 sleep $self->retry_interval; 149 } 150 @found = grep {$_->instance_state->code == 16} @found; 151 if (my $count_expect = scalar @{$instances}) { 152 scalar @found == $count_expect or next; 153 } 154 return @found; 155 } 156 } 157 158 sub terminate_cluster { 132 159 my ($self, $args) = @_; 133 160 my @instances = map { $_->instance_id } ($self->master_instance, @{$self->slave_instances}); 134 $self->ec2->terminate_instances(161 my $res = $self->ec2->terminate_instances( 135 162 InstanceId => [ @instances ], 136 163 ); 164 $self->master_instance(undef); 165 $self->slave_instances( [] ); 166 return $res; 167 } 168 169 sub terminate_slaves { 170 my ($self, $args) = @_; 171 my $existing = scalar @{$self->slave_instances}; 172 my $count = $args->{slaves} || $existing; 173 $count = $existing if $count > $existing; 174 my @instances = map { $_->instance_id } @{$self->slave_instances}[ 0 .. $count - 1 ]; 175 my $res = $self->ec2->terminate_instances( 176 InstanceId => [ @instances ], 177 ); 178 $self->_find_slaves; 179 return $res; 137 180 } 138 181 … … 209 252 =over 4 210 253 211 =item * launch 212 213 =item * launch_master254 =item * launch_cluster 255 256 =item * find_cluster 214 257 215 258 =item * launch_slave 216 259 217 =item * terminate 260 =item * terminate_cluster 261 262 =item * terminate_slaves 218 263 219 264 =item * execute
![(please configure the [header_logo] section in trac.ini)](/share/chrome/site/your_project_logo.png)