| 1 | # ----------------------------------------------------------------------------- |
|---|
| 2 | # $Id$ |
|---|
| 3 | # ----------------------------------------------------------------------------- |
|---|
| 4 | # Socket Connector |
|---|
| 5 | # ----------------------------------------------------------------------------- |
|---|
| 6 | # copyright (C) 2004 Topia <topia@clovery.jp>. all rights reserved. |
|---|
| 7 | package Tiarra::Socket::Connect; |
|---|
| 8 | use strict; |
|---|
| 9 | use warnings; |
|---|
| 10 | use Carp; |
|---|
| 11 | use Tiarra::Socket; |
|---|
| 12 | use base qw(Tiarra::Socket); |
|---|
| 13 | use Timer; |
|---|
| 14 | use Tiarra::OptionalModules; |
|---|
| 15 | use Tiarra::Utils; |
|---|
# Generate one accessor method per connector attribute.  The rest of this
# file uses them both as getters ($this->host) and as setters
# ($this->host($value)).
# NOTE(review): the leading 0 is handed to Tiarra::Utils unchanged; its
# meaning is not visible from this file.
utils->define_attr_accessor(
    0,
    qw(
        domain host addr port callback
        bind_addr prefer timeout
        retry_int retry_count try_count
    ),
);

# Presumably generates the domain predicates this file calls as
# $this->domain_tcp and $this->domain_unix (string comparison via 'eq').
utils->define_attr_enum_accessor('domain', 'eq', qw(tcp unix));
|---|
| 21 | |
|---|
| 22 | # Currently supported domains: tcp, unix. |
|---|
| 23 | |
|---|
| 24 | # tcp: |
|---|
| 25 | # my $connector = connect->new( |
|---|
| 26 | # host => [hostname], |
|---|
| 27 | # port => [port], |
|---|
| 28 | # callback => sub { |
|---|
| 29 | # my ($genre, $connector, $msg_or_sock, $errno) = @_; |
|---|
| 30 | # if ($genre eq 'warn') { |
|---|
| 31 | # # $msg_or_sock: msg |
|---|
| 32 | # # maybe don't have $errno |
|---|
| 33 | # warn $msg_or_sock; |
|---|
| 34 | # } elsif ($genre eq 'error') { |
|---|
| 35 | # # $msg_or_sock: msg |
|---|
| 36 | # # maybe has $errno |
|---|
| 37 | # die $msg_or_sock; |
|---|
| 38 | # } elsif ($genre eq 'sock') { |
|---|
| 39 | # # $msg_or_sock: sock |
|---|
| 40 | # # maybe don't have $errno |
|---|
| 41 | # attach($connector->current_addr, $connector->current_port, |
|---|
| 42 | # $msg_or_sock); |
|---|
| 43 | # # optional genre |
|---|
| 44 | # } elsif ($genre eq 'interrupt') { |
|---|
| 45 | # # $msg_or_sock: undef |
|---|
| 46 | # # maybe don't have $errno |
|---|
| 47 | # die 'interrupted'; |
|---|
| 48 | # } elsif ($genre eq 'timeout') { |
|---|
| 49 | # # $msg_or_sock: undef |
|---|
| 50 | # # maybe don't have $errno |
|---|
| 51 | # die 'timeout'; |
|---|
| 52 | # } |
|---|
| 53 | # }, |
|---|
| 54 | # # optional params |
|---|
| 55 | # addr => [already resolved addr], |
|---|
| 56 | # bind_addr => [bind_addr (cannot specify host)], |
|---|
| 57 | # timeout => [timeout], # not thoroughly tested yet; please report any bugs you find. |
|---|
| 58 | # retry_int => [retry interval], |
|---|
| 59 | # retry_count => [retry count], |
|---|
| 60 | # prefer => [array ref of socket type names ('ipv4', 'ipv6') in the order to try them; |
|---|
| 61 | # default is ['ipv6', 'ipv4'], or just ['ipv4'] when IPv6 support is unavailable], |
|---|
| 62 | # domain => 'tcp', # default |
|---|
| 63 | # ); |
|---|
| 64 | # $connector->interrupt; |
|---|
| 65 | # unix: |
|---|
| 66 | # my $connector = connect->new( |
|---|
| 67 | # addr => [path], |
|---|
| 68 | # callback => sub { |
|---|
| 69 | # # ... same as tcp domain's |
|---|
| 70 | # }, |
|---|
| 71 | # # optional params |
|---|
| 72 | # domain => 'unix', # required here: the default domain is 'tcp' |
|---|
| 73 | # ); |
|---|
| 74 | |
|---|
# Construct a connector and immediately start connecting.
#
# Arguments (name => value pairs):
#   callback    - code ref, required; croaks when missing.
#   host, addr, port, bind_addr, timeout, retry_int, retry_count
#               - stored through the accessor of the same name.
#   domain      - defaults to 'tcp' when not given.
#   prefer      - array ref of socket types to try, in order.  When omitted:
#                   tcp  -> ['ipv6', 'ipv4'] if Tiarra::OptionalModules->ipv6
#                           is true, otherwise ['ipv4']
#                   unix -> ['unix']
#
# Returns whatever connect() returns (the connector itself).
# Croaks on a missing callback, or on an unsupported domain when no
# explicit prefer list was supplied.
sub new {
    my ($class, %opts) = @_;

    $class->_increment_caller('socket-connector', \%opts);
    my $this = $class->SUPER::new(%opts);

    # A plain loop: the setters are called only for their side effects, so
    # there is no list worth building with map.
    for my $attr (qw(host addr port callback bind_addr timeout retry_int retry_count)) {
        $this->$attr($opts{$attr});
    }

    if (!defined $this->callback) {
        croak 'callback closure required';
    }

    $this->domain(utils->get_first_defined($opts{domain}, 'tcp'));
    $this->prefer($opts{prefer});

    if (!defined $this->prefer) {
        if ($this->domain_tcp) {
            # IPv6 goes first whenever it is available.
            my @prefer = ('ipv4');
            unshift @prefer, 'ipv6' if Tiarra::OptionalModules->ipv6;
            $this->prefer(\@prefer);
        }
        elsif ($this->domain_unix) {
            $this->prefer(['unix']);
        }
        else {
            croak 'Unsupported domain: ' . $this->domain;
        }
    }

    # Pending connection attempts, filled in by _connect_stage.
    $this->{queue} = [];
    return $this->connect;
}
|---|
| 109 | |
|---|
# Start (or restart) a connection attempt.
#
# When a timeout is configured, a Timer is created whose code calls
# interrupt('timeout').  If an address is already known, or the domain is
# unix, a ready-made resolver answer wrapping that address is handed straight
# to _connect_stage; otherwise the host name is passed to Tiarra::Resolver and
# _connect_stage runs from its callback.
#
# NOTE(review): no ->install is called on the Timer here; confirm that
# Timer->new schedules the timer by itself.
#
# Returns the connector.
sub connect {
    my ($this) = @_;

    my $limit = $this->timeout;
    if (defined $limit) {
        my $on_timeout = sub {
            $this->interrupt('timeout');
        };
        $this->{timer} = Timer->new(
            After => $limit,
            Code  => $on_timeout,
        );
    }

    my $have_target = defined($this->addr) || $this->domain_unix;
    if ($have_target) {
        # No lookup needed: fabricate a successful answer for the address.
        my $answer = Tiarra::Resolver::QueueData->new;
        $answer->answer_status($answer->ANSWER_OK);
        $answer->answer_data([$this->addr]);
        $this->_connect_stage($answer);
    }
    else {
        my $on_resolved = sub {
            # Anything thrown while connecting is reported through the
            # callback as an error instead of propagating to the resolver.
            eval {
                $this->_connect_stage(@_);
            };
            $this->_connect_error("internal error: $@") if $@;
        };
        Tiarra::Resolver->resolve('addr', $this->host, $on_resolved);
    }

    return $this;
}
|---|
| 138 | |
|---|
# Turn a resolver answer into a queue of connection attempts and start the
# first one.
#
# $entry is the resolver answer: its answer_status is compared against
# ANSWER_OK and its answer_data is read as an array ref of addresses.
#
# Each address is classified with probe_type_by_addr (defined outside this
# file).  For every socket type in the prefer list, in that order, one
# { type, addr, port } entry per matching address is appended to
# $this->{queue}.  Addresses whose type is not in the prefer list are not
# queued.
#
# Returns nothing after reporting a failed resolution; otherwise returns
# whatever _connect_try_next returns.
sub _connect_stage {
    my ($this, $entry) = @_;

    if ($entry->answer_status ne $entry->ANSWER_OK) {
        $this->_connect_error("Couldn't resolve hostname");
        return;    # end of the chain
    }

    # Bucket the resolved addresses by socket type.
    my %addrs_by_type;
    for my $addr (@{ $entry->answer_data }) {
        push @{ $addrs_by_type{ $this->probe_type_by_addr($addr) } }, $addr;
    }

    my $port = $this->port;
    for my $sock_type (@{ $this->prefer }) {
        # Skip types with no resolved address instead of relying on
        # autovivification of the missing bucket.
        my $addrs = $addrs_by_type{$sock_type} or next;
        push @{ $this->{queue} },
            map { +{ type => $sock_type, addr => $_, port => $port } } @$addrs;
    }

    return $this->_connect_try_next;
}
|---|
| 167 | |
|---|
# Take the next queued attempt and dispatch it to _try_connect_<type>.
#
# When the queue is empty, either schedule a retry or give up:
#   - retry_int true and the incremented try_count still <= retry_count:
#     a Timer is created (After => retry_int) whose code runs cleanup and
#     connect again, and a warning "all dead, <ordinal> retry" is reported;
#   - otherwise the error "all dead" is reported.
#
# try_count is only incremented when retry_int is true.  It is updated
# through the getter/setter form of the accessor, so this does not depend on
# the accessor being an lvalue sub.  An unset try_count or retry_count is
# treated as 0.
#
# NOTE(review): the retry Timer replaces whatever is in $this->{timer}
# (possibly the timeout timer created by connect) without uninstalling it --
# confirm this is intended.
sub _connect_try_next {
    my $this = shift;

    $this->{connecting} = shift @{$this->{queue}};
    if (defined $this->{connecting}) {
        my $methodname = '_try_connect_' . $this->{connecting}->{type};
        return $this->$methodname;
    }

    # Every queued address has failed.
    my $retry_int = $this->retry_int;
    if (!$retry_int) {
        return $this->_connect_error('all dead');
    }

    my $tries = ($this->try_count || 0) + 1;
    $this->try_count($tries);
    if ($tries > ($this->retry_count || 0)) {
        return $this->_connect_error('all dead');
    }

    $this->{timer} = Timer->new(
        After => $retry_int,
        Code  => sub {
            $this->cleanup;
            $this->connect;
        });
    return $this->_connect_warn(
        'all dead, ' . utils->to_ordinal_number($tries) . ' retry');
}
|---|
| 191 | |
|---|
# Attempt the current queue entry over IPv4 by delegating to
# _try_connect_tcp with the IO::Socket::INET socket class.
sub _try_connect_ipv4 {
    my ($this) = @_;

    my $socket_class = 'IO::Socket::INET';
    return $this->_try_connect_tcp($socket_class);
}
|---|
| 197 | |
|---|
# Attempt the current queue entry over IPv6 by delegating to
# _try_connect_tcp with the IO::Socket::INET6 socket class.
#
# When Tiarra::OptionalModules->ipv6 is false the problem is reported through
# _error and nothing further is attempted; previously the connection was
# still tried after the error had been delivered.  The message names the
# configured host, or the address being tried when no host was given.
sub _try_connect_ipv6 {
    my $this = shift;

    if (!Tiarra::OptionalModules->ipv6) {
        # host is undefined when the caller supplied a resolved addr only.
        my $host = $this->{host};
        $host = $this->current_addr unless defined $host;
        $this->_error(
            qq{Host $host seems to be an IPv6 address, }.
            qq{but IPv6 support is not enabled. }.
            qq{Use IPv4 or install Socket6 or IO::Socket::INET6 if possible.\n});
        return;
    }

    return $this->_try_connect_tcp('IO::Socket::INET6');
}
|---|
| 210 | |
|---|
# Create a TCP socket of class $package, switch it to non-blocking mode and
# start connecting to the address/port of the current queue entry.
#
# Parameters:
#   $package     - socket class to load and instantiate
#                  (the callers in this file pass IO::Socket::INET or
#                  IO::Socket::INET6).
#   $addr        - accepted for interface compatibility; not used here.
#   %additional  - extra constructor arguments, placed before the ones set
#                  here (LocalAddr from bind_addr, Timeout, Proto).
#
# Outcomes:
#   - the class cannot be loaded or the socket cannot be created:
#     _connect_error is reported and the chain ends;
#   - the connect call completes at once: the socket is attached and _call
#     runs;
#   - the connect is still in progress (EINPROGRESS / EWOULDBLOCK): the socket
#     is attached and install is called;
#   - any other failure: _connect_warn_try_next moves on to the next entry.
sub _try_connect_tcp {
    my ($this, $package, $addr, %additional) = @_;

    if (!eval("require $package")) {
        $this->_connect_error("Couldn\'t require socket package: $package");
        return;
    }
    my $sock = $package->new(
        %additional,
        (defined $this->{bind_addr} ?
             (LocalAddr => $this->{bind_addr}) : ()),
        Timeout => undef,
        Proto => 'tcp');
    if (!defined $sock) {
        $this->_connect_error(
            $this->sock_errno_to_msg($!, 'Couldn\'t prepare socket'),
            $!);
        return;
    }
    if (!defined $sock->blocking(0)) {
        # effect only on connecting; comment out
        #$this->_warn('cannot non-blocking') if ::debug_mode();

        if ($this->_is_winsock) {
            # winsock FIONBIO
            my $FIONBIO = 0x8004667e; # from Winsock2.h
            my $temp = chr(1);
            my $retval = $sock->ioctl($FIONBIO, $temp);
            if (!$retval) {
                $this->_warn($this->sock_errno_to_msg(
                    $!, 'Couldn\'t set non-blocking mode (winsock2)'), $!);
            }
        } else {
            $this->_warn($this->sock_errno_to_msg(
                $!, 'Couldn\'t set non-blocking mode (general)'), $!);
        }
    }

    # Resolve address + port into a packed socket address; the call is made
    # with a no-op callback and its return value is used directly.
    my $saddr = Tiarra::Resolver->resolve(
        'saddr', [$this->current_addr, $this->current_port],
        sub {}, 0);
    $this->{connecting}->{saddr} = $saddr->answer_data;

    # $! is examined below even when connect returns true, because
    # IO::Socket::connect may return the socket while the connection is still
    # in progress.  A successful call does not clear $!, so reset it first;
    # otherwise a stale EINPROGRESS/EWOULDBLOCK left by earlier I/O would be
    # mistaken for a pending connect.
    $! = 0;
    my $connected = $sock->connect($this->{connecting}->{saddr});
    # Sample the errno state right away, before attach can disturb $!.
    my $pending = $!{EINPROGRESS} || $!{EWOULDBLOCK};

    if ($connected || $pending) {
        $this->attach($sock);
        if ($pending) {
            $this->install;
        } else {
            $this->_call;
        }
    } else {
        $this->_connect_warn_try_next($!, 'connect error');
    }
}
|---|
| 266 | |
|---|
# Attempt the current queue entry as a Unix domain socket.
#
# When Tiarra::OptionalModules->unix_dom is false the problem is reported
# through _error and nothing further is attempted; previously the code went
# on to load IO::Socket::UNIX and connect anyway.
#
# On success the socket is attached and _call runs; on failure
# _connect_warn_try_next moves on to the next queue entry.
sub _try_connect_unix {
    my $this = shift;

    if (!Tiarra::OptionalModules->unix_dom) {
        $this->_error(
            qq{Address $this->{addr} seems to be an Unix Domain Socket address, }.
            qq{but Unix Domain Socket support is not enabled. }.
            qq{Use other protocol if possible.\n});
        return;
    }

    require IO::Socket::UNIX;
    my $sock = IO::Socket::UNIX->new(Peer => $this->{connecting}->{addr});
    if (defined $sock) {
        $this->attach($sock);
        $this->_call;
    } else {
        $this->_connect_warn_try_next($!, 'Couldn\'t connect');
    }
}
|---|
| 286 | |
|---|
# Report a failed attempt as a warning, then continue with the next queue
# entry.  $errno and $msg are turned into the warning text by
# sock_errno_to_msg; $errno is also forwarded with the warning.
# Returns whatever _connect_try_next returns.
sub _connect_warn_try_next {
    my ($this, $errno, $msg) = @_;

    my @warning = ($this->sock_errno_to_msg($errno, $msg), $errno);
    $this->_connect_warn(@warning);

    return $this->_connect_try_next;
}
|---|
| 293 | |
|---|
# Report a connection problem as an error; remaining arguments are forwarded
# to _connect_warn_or_error unchanged.
sub _connect_error {
    my $this = shift;
    return $this->_connect_warn_or_error('error', @_);
}

# Report a connection problem as a warning; remaining arguments are forwarded
# to _connect_warn_or_error unchanged.
sub _connect_warn {
    my $this = shift;
    return $this->_connect_warn_or_error('warn', @_);
}
|---|
| 296 | |
|---|
# Shared implementation behind _connect_error and _connect_warn.
#
# Arguments after the invocant:
#   genre  - 'error' or 'warn'; selects the _error or _warn method.
#   detail - optional text appended after ": ".
#   errno  - optional error number, always forwarded (possibly undef).
#   ...    - any further arguments are forwarded as they are.
#
# The reported message is "Couldn't connect to <destination>" followed by the
# detail, if any.
sub _connect_warn_or_error {
    my ($this, $genre, $detail, $errno, @extra) = @_;

    my $reporter = '_' . $genre;
    my $suffix   = defined $detail ? ': ' . $detail : '';
    my $message  = "Couldn't connect to " . $this->destination . $suffix;

    return $this->$reporter($message, $errno, @extra);
}
|---|
| 310 | |
|---|
# Describe the target of the current attempt.  The configured host plus the
# address, port and type currently being tried are passed as name => value
# pairs to repr_destination, whose result is returned.
sub destination {
    my ($this) = @_;

    my @target = (
        host => $this->host,
        addr => $this->current_addr,
        port => $this->current_port,
        type => $this->current_type,
    );

    return $this->repr_destination(@target);
}
|---|
| 320 | |
|---|
# Address of the attempt in flight, falling back to the configured addr when
# the current queue entry has none.
sub current_addr {
    my ($this) = @_;

    my @candidates = ($this->{connecting}->{addr}, $this->addr);
    return utils->get_first_defined(@candidates);
}
|---|
| 328 | |
|---|
# Port of the attempt in flight, falling back to the configured port when
# the current queue entry has none.
sub current_port {
    my ($this) = @_;

    my @candidates = ($this->{connecting}->{port}, $this->port);
    return utils->get_first_defined(@candidates);
}
|---|
| 336 | |
|---|
# Socket type of the attempt in flight (the 'type' field of the current
# queue entry); undef when there is none.
sub current_type {
    my ($this) = @_;

    return $this->{connecting}{type};
}
|---|
| 342 | |
|---|
# Deliver a connection error to the user callback as
# ('error', connector, message, errno).  This ends the connect chain.
# Returns the callback's return value.
sub _error {
    my ($this, $msg, $errno) = @_;

    my $notify = $this->callback;
    return $notify->('error', $this, $msg, $errno);
}
|---|
| 349 | |
|---|
# Deliver a connection warning to the user callback as
# ('warn', connector, message, errno).  Connecting continues afterwards.
# Returns the callback's return value.
sub _warn {
    my ($this, $msg, $errno) = @_;

    my $notify = $this->callback;
    return $notify->('warn', $this, $msg, $errno);
}
|---|
| 356 | |
|---|
# Connection established: hand the socket to the user callback as
# ('sock', connector, socket).  Returns the callback's return value.
sub _call {
    my ($this) = @_;

    my $notify = $this->callback;
    return $notify->('sock', $this, $this->sock);
}
|---|
| 363 | |
|---|
# Release what the connector is holding while it waits: uninstall the
# connector itself when it reports being installed, and uninstall and drop
# the pending timer, if any.
sub cleanup {
    my ($this) = @_;

    $this->uninstall if $this->installed;

    my $has_timer = defined $this->{timer};
    if ($has_timer) {
        $this->{timer}->uninstall;
        $this->{timer} = undef;
    }
}
|---|
| 375 | |
|---|
# Abort the connection attempt: run cleanup, close the socket when one is
# attached, then notify the user callback as ($genre, connector).
# $genre defaults to 'interrupt'; the timeout timer passes 'timeout'.
# Returns the callback's return value.
sub interrupt {
    my ($this, $genre) = @_;

    $this->cleanup;
    $this->close if defined $this->sock;

    my $reason = defined $genre ? $genre : 'interrupt';
    return $this->callback->($reason, $this);
}
|---|
| 386 | |
|---|
# Always true: while connecting, the connector always asks to be told when
# its socket is writable.
sub want_to_write {
    return 1;
}
|---|
| 390 | |
|---|
# Socket became writable: let proc_sock decide, in 'write' state.
sub write {
    my ($this) = @_;
    return $this->proc_sock('write');
}

# Socket became readable: let proc_sock decide, in 'read' state.
sub read {
    my ($this) = @_;
    return $this->proc_sock('read');
}

# Socket raised an exceptional condition: treat it as a failed attempt.
sub exception {
    my ($this) = @_;
    return $this->_handle_sock_error;
}
|---|
| 394 | |
|---|
# React to socket readiness while a non-blocking connect is pending.
# $event is 'write' or 'read' (see write / read).
#
#   'write' - the attempt has finished: cleanup, then either hand over the
#             socket (_call) when $this->errno is false, or close it and move
#             on to the next queue entry.
#   other   - connect is called again to learn the state:
#               * it fails with EISCONN (or, on winsock, 10022 / EWOULDBLOCK /
#                 EALREADY): treated as connected -> cleanup and _call;
#               * it fails otherwise: warn and run _handle_sock_error;
#               * it succeeds but the socket is not writable: warn, close and
#                 move on to the next queue entry;
#               * it succeeds and the socket is writable: nothing to do.  The
#                 first such 'read' event is ignored silently; later ones,
#                 and any other state, produce a warning.
sub proc_sock {
    my ($this, $event) = @_;

    if ($event eq 'write') {
        my $so_error = $this->errno;
        $this->cleanup;
        if (!$so_error) {
            return $this->_call;
        }
        $this->close;
        return $this->_connect_warn_try_next($so_error);
    }

    if (!$this->sock->connect($this->{connecting}->{saddr})) {
        # Evaluate the errno tests immediately after the failed connect.
        my $already_connected =
            $!{EISCONN} ||
            ($this->_is_winsock && (($! == 10022) || $!{EWOULDBLOCK} ||
                                    $!{EALREADY}));
        if ($already_connected) {
            $this->cleanup;
            return $this->_call;
        }
        $this->_warn(
            $this->sock_errno_to_msg($!, 'connection try error'), $!);
        return $this->_handle_sock_error;
    }

    if (!IO::Select->new($this->sock)->can_write(0)) {
        $this->_warn('cannot write socket error');
        my $so_error = $this->errno;
        $this->cleanup;
        $this->close;
        return $this->_connect_warn_try_next($so_error, "cant write on $event");
    }

    # ignore first ready-to-read
    if ($event ne 'read' || $this->{unexpected_want_to_read_count}++) {
        $this->_warn("connect successful, why called this on $event?");
    }
}
|---|
| 432 | |
|---|
# The socket of the current attempt failed: remember its error, release the
# connector's resources, close the socket and move on to the next queue
# entry.  Returns whatever _connect_warn_try_next returns.
sub _handle_sock_error {
    my ($this) = @_;

    # Read the error before cleanup and close get a chance to disturb it.
    my $so_error = $this->errno;

    $this->cleanup;
    $this->close;

    return $this->_connect_warn_try_next($so_error);
}
|---|
| 441 | |
|---|
| 442 | 1; |
|---|