| 1 | #! /usr/bin/perl |
|---|
| 2 | |
|---|
| 3 | use strict; |
|---|
| 4 | use warnings; |
|---|
| 5 | |
|---|
| 6 | use DBI; |
|---|
| 7 | use Getopt::Long; |
|---|
| 8 | use File::Temp qw(tempfile); |
|---|
| 9 | use IO::File::AtomicChange; |
|---|
| 10 | use JSON; |
|---|
| 11 | use Storable qw(dclone); |
|---|
| 12 | |
|---|
# Script version, printed by --version.
our $VERSION = 0.01;

# Privilege lists interpolated into the GRANT / REVOKE statements below.
my $READ_PRIVS  = "SELECT";
my $WRITE_PRIVS = "INSERT,UPDATE,DELETE,CREATE TEMPORARY TABLES,EXECUTE,LOCK TABLES";

# Default port per supported driver.  The keys double as the list of
# accepted --rdbms values.
my %DEFAULT_PORT = (mysql => 3306);

# Command-line option values, filled in by GetOptions().
my $opt_rdbms;
my $opt_database;
my $opt_user;
my $opt_password;
my $opt_shard_def;
my $opt_new_host;
my $opt_new_port;
my $opt_from_id;
my $opt_incline_source;
my $opt_drop_old_user_after;
my $opt_help;
my $opt_version;
|---|
| 27 | |
|---|
# Print the command-line help to STDOUT and terminate with exit status 0.
#
# The option names listed here must match the ones registered with
# GetOptions(); in particular the lowest-id option is spelled --from-id.
sub usage {
    print <<"EOT";
Usage: $0 [options]

Options:
    --rdbms=driver        driver (default: mysql)
    --database=db_name    database name (mandatory)
    --user=user           administrator user of the rdb nodes (default: root)
    --password=pass       (default: empty)
    --shard-def=file      shard definition file (mandatory)
    --new-host=host       hostname of the new primary node (mandatory)
    --new-port=port       (optional)
    --from-id=lowest_id   lowest id of the data to be moved to the new node
                          (mandatory)
    --incline-source=file
                          if set, re-installs incline triggers (default: empty)
    --drop-old-user-after=seconds
                          drops read privileges granted to old user after given
                          seconds (default: 60)
    --help                print this help
    --version             print version
EOT
    exit 0;
}
|---|
| 51 | |
|---|
# Open a DBI connection to the configured database on the given node.
#
#   $host - hostname of the node
#   $port - port of the node; a false value selects the default port of
#           the configured driver
#
# Connects as the administrator user given on the command line, with
# RaiseError enabled.  Returns the database handle; dies with the DBI
# error string if no handle is returned.
sub db_connect {
    my ($host, $port) = @_;
    $port = $DEFAULT_PORT{$opt_rdbms}
        unless $port;
    my $dsn  = "DBI:$opt_rdbms:dbname=$opt_database;host=$host;port=$port";
    my %attr = (RaiseError => 1);
    my $dbh  = DBI->connect($dsn, $opt_user, $opt_password, \%attr);
    die $DBI::errstr
        unless $dbh;
    return $dbh;
}
|---|
| 63 | |
|---|
# Run SHOW MASTER STATUS on the given handle and return the first row as
# a hash reference keyed by column name (undef when no row is returned).
sub get_master_status {
    my ($dbh) = @_;
    my $sql = 'SHOW MASTER STATUS';
    return $dbh->selectall_arrayref($sql, { Slice => {} })->[0];
}
|---|
| 68 | |
|---|
# Run SHOW SLAVE STATUS on the given handle and return the first row as
# a hash reference keyed by column name (undef when no row is returned).
sub get_slave_status {
    my ($dbh) = @_;
    my %attr = (Slice => {});
    return $dbh->selectall_arrayref('SHOW SLAVE STATUS', \%attr)->[0];
}
|---|
| 73 | |
|---|
# Tell whether the node behind $dbh currently replicates from the given
# master.
#
#   $dbh  - handle of the node to inspect
#   $host - expected master hostname
#   $port - expected master port; a false value selects the default port
#           of the configured driver
#
# Returns a true value when the Master_Host / Master_Port columns of
# SHOW SLAVE STATUS match, a false value otherwise.
sub master_is {
    my ($dbh, $host, $port) = @_;
    $port ||= $DEFAULT_PORT{$opt_rdbms};
    my $status = get_slave_status($dbh);
    # get_slave_status() returns undef when the statement yields no row;
    # treat that as "not a slave of anything" rather than comparing undef
    # values (which only produced warnings before).
    return 0
        unless $status
            && defined $status->{Master_Host}
            && defined $status->{Master_Port};
    return $status->{Master_Host} eq $host && $status->{Master_Port} == $port;
}
|---|
| 80 | |
|---|
# Build the grantee list used in GRANT / REVOKE / DROP USER statements:
# the given user name at 'localhost' and at '%', quoted and joined by a
# comma.
sub build_grant_username {
    my ($user) = @_;
    return join ',', map { "'$user'\@'$_'" } 'localhost', '%';
}
|---|
| 85 | |
|---|
# parse command-line options; GetOptions() reports bad options itself
GetOptions(
    'rdbms=s'               => \$opt_rdbms,
    'database=s'            => \$opt_database,
    'user=s'                => \$opt_user,
    'password=s'            => \$opt_password,
    'shard-def=s'           => \$opt_shard_def,
    'new-host=s'            => \$opt_new_host,
    'new-port=i'            => \$opt_new_port,
    'from-id=i'             => \$opt_from_id,
    'incline-source=s'      => \$opt_incline_source,
    'drop-old-user-after=i' => \$opt_drop_old_user_after,
    help                    => \$opt_help,
    version                 => \$opt_version,
) or exit 1;
usage()
    if $opt_help;
if ($opt_version) {
    print "$VERSION\n";
    exit 0;
}

# validate options
$opt_rdbms ||= 'mysql';
die "--rdbms=driver set to unknown rdbms:$opt_rdbms\n"
    unless $DEFAULT_PORT{$opt_rdbms};
die "--database=db_name not specified\n"
    unless $opt_database;
$opt_user ||= 'root';
# Only fall back to the defaults when the option was not given at all:
# a password of "0" and a delay of 0 seconds are legitimate values that
# a plain truth test would silently replace.
$opt_password = ''
    unless defined $opt_password;
die "--shard-def=file not specified\n"
    unless $opt_shard_def;
die "--new-host=host not specified\n"
    unless $opt_new_host;
die "--from-id=lowest id not specified\n"
    unless defined $opt_from_id;
$opt_drop_old_user_after = 60
    unless defined $opt_drop_old_user_after;
# the value is later passed to sleep(), which has no meaning for a
# negative number of seconds
die "--drop-old-user-after=seconds should not be negative\n"
    if $opt_drop_old_user_after < 0;
|---|
| 122 | |
|---|
# load definition file: read the whole file and decode it as UTF-8 JSON
my $shard_def = do {
    open my $fh, '<', $opt_shard_def
        or die "failed to open file:$opt_shard_def:$!";
    my @lines = <$fh>;
    close $fh;
    from_json(join('', @lines), { utf8 => 1 });
};

# the script only understands a hash whose "algorithm" is "range-int"
# and whose "map" is itself a hash
ref($shard_def) eq 'HASH'
    or die "failed to parse file:$opt_shard_def:root object is not a hash\n";
$shard_def->{algorithm} eq 'range-int'
    or die "algorithm is not \"range-int\" in file:$opt_shard_def\n";
ref($shard_def->{map}) eq 'HASH'
    or die "cannot parse \"map\" attribute in file:$opt_shard_def\n";
|---|
# get information of the original node to be divided
#
# $orig_from_id is the lower bound (map key) of the range that currently
# contains --from-id; $orig_node is that range's node list, normalised to
# an array reference whose first element is the node that gets divided.
my ($orig_from_id, $orig_node) = do {
    # keys are visited in ascending numeric order, so the last key that
    # does not exceed --from-id is the owning range
    my $cand_lb;
    for my $lb (sort { $a <=> $b } keys %{$shard_def->{map}}) {
        last
            if $lb > $opt_from_id;
        $cand_lb = $lb;
    }
    die "--from-id=$opt_from_id is out of range\n"
        unless defined $cand_lb;
    my $orig_node = $shard_def->{map}->{$cand_lb};
    $orig_node = [ $orig_node ]
        unless ref $orig_node eq 'ARRAY';
    # reject an empty node list or a non-hash entry up front instead of
    # letting the lookups below autovivify or die with a cryptic message
    die "cannot parse node definition of range:$cand_lb in file:$opt_shard_def\n"
        unless ref($orig_node->[0]) eq 'HASH'
            && defined $orig_node->[0]->{host};
    die "new node should be different from the original\n"
        if $opt_new_host eq $orig_node->[0]->{host}
            && ($opt_new_port || $DEFAULT_PORT{$opt_rdbms})
                == ($orig_node->[0]->{port} || $DEFAULT_PORT{$opt_rdbms});
    ($cand_lb, $orig_node);
};
# the port is optional in the definition file; show the port that will
# actually be used instead of interpolating an undefined value
printf "INFO: dividing node: %s:%s\n",
    $orig_node->[0]->{host},
    $orig_node->[0]->{port} || $DEFAULT_PORT{$opt_rdbms};
die "no username defined for original host\n"
    unless $orig_node->[0]->{username};
my $orig_username = $orig_node->[0]->{username};
|---|
| 163 | |
|---|
# connect to orig and new node
print "INFO: connecting to database nodes\n";
my $orig_dbh = db_connect($orig_node->[0]->{host}, $orig_node->[0]->{port});
my $new_dbh = db_connect($opt_new_host, $opt_new_port);

print "INFO: checking replication status\n";
die "new node is not currently slave of the original node\n"
    unless master_is(
        $new_dbh, $orig_node->[0]->{host}, $orig_node->[0]->{port},
    );
{
    # Check the thread flags rather than the free-text Slave_IO_State:
    # the state text changes while the I/O thread is busy (e.g. while it
    # is queueing events) and differs between server versions, whereas
    # Slave_IO_Running / Slave_SQL_Running report 'Yes' whenever the
    # respective thread is up.
    my $status = get_slave_status($new_dbh) || {};
    die "new node is not receiving events from master\n"
        unless ($status->{Slave_IO_Running} || '') eq 'Yes';
    # a stopped SQL thread would never apply the received events
    die "new node is not applying events from master\n"
        unless ($status->{Slave_SQL_Running} || '') eq 'Yes';
}
|---|
| 177 | |
|---|
# drop write privileges of the old user on the original node, and tell
# the operator how to restore them by hand
print "INFO: dropping write privileges to original node\n";
{
    my $grantees = build_grant_username($orig_username);
    $orig_dbh->do("REVOKE $WRITE_PRIVS ON $opt_database.* FROM $grantees");
    print "INFO: dropped write privileges from original node, to recover:\n";
    print "INFO: GRANT $WRITE_PRIVS ON $opt_database.* TO $grantees;\n";
    print "INFO:\n";
}
|---|
| 187 | |
|---|
# TODO wait until all write queries on the original node stops running

print "INFO: waiting for new node to catch up\n";
# wait for the nodes to sync
#
# The comparison uses the position the SQL thread has *executed*
# (Relay_Master_Log_File / Exec_Master_Log_Pos), not the position the I/O
# thread has merely *read* (Master_Log_File / Read_Master_Log_Pos):
# replication is stopped right after this loop, so events that were
# received but not yet applied would otherwise be missing on the new node.
while (1) {
    my $m = get_master_status($orig_dbh)
        or die "original node did not return its master status\n";
    my $s = get_slave_status($new_dbh)
        or die "new node did not return its slave status\n";
    # without a running SQL thread the executed position never advances
    # and this loop would spin forever
    die "replication SQL thread is not running on the new node: "
        . ($s->{Last_Error} || 'unknown error') . "\n"
        unless ($s->{Slave_SQL_Running} || '') eq 'Yes';
    print "INFO: $m->{Position} vs. $s->{Exec_Master_Log_Pos}\n";
    last
        if $m->{File} eq $s->{Relay_Master_Log_File}
            && $m->{Position} == $s->{Exec_Master_Log_Pos};
    sleep 1;
}
|---|
| 201 | |
|---|
# upgrade new node to master by stopping replication on it
print "INFO: nodes are in sync, stopping replication\n";
$new_dbh->do('STOP SLAVE');

# drop old user from new node
print "INFO: dropping old user '$orig_username' from new node\n";
$new_dbh->do(sprintf('DROP USER %s', build_grant_username($orig_username)));

# name of the replacement user: "pac" followed by the current epoch time
my $new_username = 'pac' . time();
|---|
| 215 | |
|---|
# Build the shard definition that describes the divided layout.
#
# Returns a deep copy of $shard_def in which
#   - the original node carries the new username, and
#   - the range starting at --from-id maps to a single node that copies
#     the original node's attributes but points at the new host/port.
#
# Side effect (kept from the previous implementation): the username of
# $orig_node->[0] is set to $new_username.
sub updated_shard_def {
    # $orig_node->[0] is the very hash stored inside $shard_def->{map},
    # so the rename has to happen before the copy is taken; doing it
    # afterwards left the old username in the copy on the first call.
    $orig_node->[0]->{username} = $new_username;
    my $r = dclone($shard_def);

    my %new_node = (%{$orig_node->[0]}, host => $opt_new_host);
    if ($opt_new_port) {
        $new_node{port} = $opt_new_port;
    }
    else {
        # no --new-port means the new node is reached on the default port
        # (that is what db_connect() does), so it must not inherit a
        # non-default port from the original node
        delete $new_node{port};
    }
    $r->{map}->{$opt_from_id} = [ \%new_node ];
    return $r;
}
|---|
| 228 | |
|---|
# reinstall incline triggers
#
# Writes the updated shard definition to a temporary file and runs the
# external "incline" command (drop-trigger, then create-trigger) against
# the original node - only when the divided range keeps its old lower
# bound - and then against the new node.
if ($opt_incline_source) {
    print "INFO: reinstalling incline triggers\n";
    # UNLINK => 1 removes the temporary definition when the script exits;
    # in list context tempfile() otherwise leaves the file behind
    my ($tmp_fh, $tmp_def) = tempfile(UNLINK => 1);
    print {$tmp_fh} to_json(updated_shard_def(), { utf8 => 1, pretty => 1 })
        or die "failed to write file:$tmp_def:$!";
    # closing (rather than only flushing) reports buffered write errors
    # before incline reads the file
    close $tmp_fh
        or die "failed to write file:$tmp_def:$!";
    my @incline_cmd = (
        q(incline),
        '--mode=shard',
        "--source=$opt_incline_source",
        "--shard-source=$tmp_def",
        "--rdbms=$opt_rdbms",
        "--database=$opt_database",
        ($opt_user ? "--user=$opt_user" : ()),
        ((defined($opt_password) && length($opt_password))
            ? "--password=$opt_password" : ()),
    );
    # never echo the administrator password into the INFO output
    my @logged_cmd
        = map { /\A--password=/ ? '--password=********' : $_ } @incline_cmd;

    # [ label used in error messages, host, port ]
    my @targets;
    push @targets,
        [ 'original', $orig_node->[0]->{host}, $orig_node->[0]->{port} ]
        if $orig_from_id != $opt_from_id;
    push @targets, [ 'new', $opt_new_host, $opt_new_port ];

    for my $target (@targets) {
        my ($label, $host, $port) = @$target;
        my @opts = (
            "--host=$host",
            "--port=" . ($port || $DEFAULT_PORT{$opt_rdbms}),
        );
        my @steps = (
            [ 'drop-trigger',
              "failed to drop incline triggers from the $label node" ],
            [ 'create-trigger',
              "failed to install incline triggers to the $label node" ],
        );
        for my $step (@steps) {
            my ($action, $error) = @$step;
            print "INFO: ", join(' ', @logged_cmd, @opts), " $action\n";
            # list form of system(): no shell is involved
            system(@incline_cmd, @opts, $action) == 0
                or die "$error:$?";
        }
    }
}
|---|
| 268 | |
|---|
# create new user on both nodes (original first, then new) by granting
# it the read and write privileges on the database
for my $target ([ 'original', $orig_dbh ], [ 'new', $new_dbh ]) {
    my ($label, $dbh) = @$target;
    print "INFO: creating user '$new_username' on the $label node\n";
    my $grantees = build_grant_username($new_username);
    $dbh->do("GRANT $READ_PRIVS,$WRITE_PRIVS ON $opt_database.* TO $grantees");
}
|---|
| 280 | |
|---|
# update shard definition
$shard_def = updated_shard_def();

{ # link current shard def as a backup
    # try "<file>.1", "<file>.2", ... until a free name is found
    my $i = 1;
    until (link($opt_shard_def, "$opt_shard_def.$i")) {
        # only "name already taken" justifies trying the next suffix; any
        # other failure (permissions, missing file, ...) would repeat for
        # every suffix and loop forever
        die "failed to link file:$opt_shard_def to $opt_shard_def.$i:$!\n"
            unless $!{EEXIST};
        $i++;
    }
    print "INFO: link(2)ed original shard definition file to:",
        "$opt_shard_def.$i\n";
}

{ # update shard def
    my $fh = IO::File::AtomicChange->new($opt_shard_def, 'w')
        or die "failed to open file:$opt_shard_def:$!";
    $fh->print(to_json($shard_def, { utf8 => 1, pretty => 1 }))
        or die "failed to write file:$opt_shard_def:$!";
    $fh->close;
}
|---|
# Report success, give the operator --drop-old-user-after seconds to
# interrupt, then drop the old user from the original node.
print <<"EOT";
INFO: updated the shard definition file
INFO:
INFO: CONGRATULATIONS!!! The node has been successfully divided!
INFO: Now you have to copy the shard definition file to your servers.
INFO:
INFO: After $opt_drop_old_user_after seconds, read privileges to old user: $orig_username
INFO: on the original node will be revoked. Or you can type Ctrl-C and
INFO: manually drop the privileges, by:
INFO: DROP USER '$orig_username'\@'localhost','$orig_username'\@'\%'
INFO:
EOT

sleep $opt_drop_old_user_after;

$orig_dbh->do(
    'DROP USER ' . build_grant_username($orig_username),
);
print "INFO: dropped old user from original node\n";
|---|